Spaces:
Running
Running
File size: 1,485 Bytes
d8d14f1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
from concurrent.futures import Future
from unittest.mock import Mock, create_autospec, patch
from swarms.structs import Agent, ConcurrentWorkflow, Task
def test_add():
    """Verify that ``add`` stores the given task in ``workflow.tasks``.

    A ``Mock`` restricted to the ``Task`` spec stands in for a real task.
    """
    mocked_task = Mock(spec=Task)
    concurrent_workflow = ConcurrentWorkflow(max_workers=2)

    concurrent_workflow.add(mocked_task)

    assert mocked_task in concurrent_workflow.tasks
def test_run():
    """Verify that ``run`` executes every task added to the workflow.

    ``concurrent.futures.ThreadPoolExecutor`` is patched so no real threads
    are started. The fake ``submit`` runs the submitted callable inline and
    returns an already-completed ``Future``; a ``submit`` stub that only
    returns canned futures would never call ``task.execute``, so the
    assertions below could not pass while the patch is in effect.

    ``as_completed`` is deliberately not stubbed on the executor: it is a
    module-level function of ``concurrent.futures`` rather than an executor
    method, and it handles already-completed ``Future`` objects natively.

    NOTE(review): assumes ``ConcurrentWorkflow.run`` uses the executor as a
    context manager and dispatches work through ``submit`` -- confirm
    against the implementation.
    """
    workflow = ConcurrentWorkflow(max_workers=2)
    task1 = create_autospec(Task)
    task2 = create_autospec(Task)
    workflow.add(task1)
    workflow.add(task2)

    def submit_inline(fn, *args, **kwargs):
        # Mirror Executor.submit: the outcome (value or exception) is
        # captured on the Future instead of propagating to the caller.
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future

    with patch(
        "concurrent.futures.ThreadPoolExecutor"
    ) as mock_executor:
        # The object bound by ``with ThreadPoolExecutor(...) as executor``.
        executor = mock_executor.return_value.__enter__.return_value
        executor.submit.side_effect = submit_inline
        workflow.run()

    task1.execute.assert_called_once()
    task2.execute.assert_called_once()
def test_execute_task():
    """Verify that ``_execute_task`` calls the task's ``execute`` exactly once.

    The task is an autospec of ``Task`` and is passed straight to the
    private helper without being added to the workflow first.
    """
    stub_task = create_autospec(Task)
    concurrent_workflow = ConcurrentWorkflow(max_workers=2)

    concurrent_workflow._execute_task(stub_task)

    stub_task.execute.assert_called_once()
def test_agent_execution():
    """Verify that executing a real ``Task`` reaches its agent's ``execute``.

    The agent is an autospec of ``Agent`` wrapped in a genuine ``Task``; the
    task is added to the workflow and then run through ``_execute_task``.
    """
    fake_agent = create_autospec(Agent)
    agent_task = Task(fake_agent)
    concurrent_workflow = ConcurrentWorkflow(max_workers=2)

    concurrent_workflow.add(agent_task)
    concurrent_workflow._execute_task(agent_task)

    fake_agent.execute.assert_called_once()
|