File size: 10,688 Bytes
d8d14f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
import asyncio
import os
from unittest.mock import patch

import pytest

from swarm_models import OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.sequential_workflow import (
    SequentialWorkflow,
    Task,
)

# Mock the OpenAI API key using environment variables
os.environ["OPENAI_API_KEY"] = "mocked_api_key"


# Mock OpenAIChat class for testing
class MockOpenAIChat:
    def __init__(self, *args, **kwargs):
        pass

    def run(self, *args, **kwargs):
        return "Mocked result"


# Mock Agent class for testing
class MockAgent:
    def __init__(self, *args, **kwargs):
        pass

    def run(self, *args, **kwargs):
        return "Mocked result"


# Mock SequentialWorkflow class for testing
class MockSequentialWorkflow:
    def __init__(self, *args, **kwargs):
        pass

    def add(self, *args, **kwargs):
        pass

    def run(self):
        pass


# Test Task class
def test_task_initialization():
    description = "Sample Task"
    agent = MockOpenAIChat()
    task = Task(description=description, agent=agent)
    assert task.description == description
    assert task.agent == agent


def test_task_execute():
    description = "Sample Task"
    agent = MockOpenAIChat()
    task = Task(description=description, agent=agent)
    task.run()
    assert task.result == "Mocked result"


# Test SequentialWorkflow class
def test_sequential_workflow_initialization():
    workflow = SequentialWorkflow()
    assert isinstance(workflow, SequentialWorkflow)
    assert len(workflow.tasks) == 0
    assert workflow.max_loops == 1
    assert workflow.autosave is False
    assert (
        workflow.saved_state_filepath
        == "sequential_workflow_state.json"
    )
    assert workflow.restore_state_filepath is None
    assert workflow.dashboard is False


def test_sequential_workflow_add_task():
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = MockOpenAIChat()
    workflow.add(task_description, task_flow)
    assert len(workflow.tasks) == 1
    assert workflow.tasks[0].description == task_description
    assert workflow.tasks[0].agent == task_flow


def test_sequential_workflow_reset_workflow():
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = MockOpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.reset_workflow()
    assert workflow.tasks[0].result is None


def test_sequential_workflow_get_task_results():
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = MockOpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.run()
    results = workflow.get_task_results()
    assert len(results) == 1
    assert task_description in results
    assert results[task_description] == "Mocked result"


def test_sequential_workflow_remove_task():
    workflow = SequentialWorkflow()
    task1_description = "Task 1"
    task2_description = "Task 2"
    task1_flow = MockOpenAIChat()
    task2_flow = MockOpenAIChat()
    workflow.add(task1_description, task1_flow)
    workflow.add(task2_description, task2_flow)
    workflow.remove_task(task1_description)
    assert len(workflow.tasks) == 1
    assert workflow.tasks[0].description == task2_description


def test_sequential_workflow_update_task():
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = MockOpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.update_task(task_description, max_tokens=1000)
    assert workflow.tasks[0].kwargs["max_tokens"] == 1000


def test_sequential_workflow_save_workflow_state():
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = MockOpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.save_workflow_state("test_state.json")
    assert os.path.exists("test_state.json")
    os.remove("test_state.json")


def test_sequential_workflow_load_workflow_state():
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = MockOpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.save_workflow_state("test_state.json")
    workflow.load_workflow_state("test_state.json")
    assert len(workflow.tasks) == 1
    assert workflow.tasks[0].description == task_description
    os.remove("test_state.json")


def test_sequential_workflow_run():
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = MockOpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.run()
    assert workflow.tasks[0].result == "Mocked result"


def test_sequential_workflow_workflow_bootup(capfd):
    workflow = SequentialWorkflow()
    workflow.workflow_bootup()
    out, _ = capfd.readouterr()
    assert "Sequential Workflow Initializing..." in out


def test_sequential_workflow_workflow_dashboard(capfd):
    workflow = SequentialWorkflow()
    workflow.workflow_dashboard()
    out, _ = capfd.readouterr()
    assert "Sequential Workflow Dashboard" in out


# Mock Agent class for async testing
class MockAsyncAgent:
    def __init__(self, *args, **kwargs):
        pass

    async def arun(self, *args, **kwargs):
        return "Mocked result"


# Test async execution in SequentialWorkflow
@pytest.mark.asyncio
async def test_sequential_workflow_arun():
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = MockAsyncAgent()
    workflow.add(task_description, task_flow)
    await workflow.arun()
    assert workflow.tasks[0].result == "Mocked result"


def test_real_world_usage_with_openai_key():
    # Initialize the language model
    llm = OpenAIChat()
    assert isinstance(llm, OpenAIChat)


def test_real_world_usage_with_flow_and_openai_key():
    # Initialize a agent with the language model
    agent = Agent(llm=OpenAIChat())
    assert isinstance(agent, Agent)


def test_real_world_usage_with_sequential_workflow():
    # Initialize a sequential workflow
    workflow = SequentialWorkflow()
    assert isinstance(workflow, SequentialWorkflow)


def test_real_world_usage_add_tasks():
    # Create a sequential workflow and add tasks
    workflow = SequentialWorkflow()
    task1_description = "Task 1"
    task2_description = "Task 2"
    task1_flow = OpenAIChat()
    task2_flow = OpenAIChat()
    workflow.add(task1_description, task1_flow)
    workflow.add(task2_description, task2_flow)
    assert len(workflow.tasks) == 2
    assert workflow.tasks[0].description == task1_description
    assert workflow.tasks[1].description == task2_description


def test_real_world_usage_run_workflow():
    # Create a sequential workflow, add a task, and run the workflow
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = OpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.run()
    assert workflow.tasks[0].result is not None


def test_real_world_usage_dashboard_display():
    # Create a sequential workflow, add tasks, and display the dashboard
    workflow = SequentialWorkflow()
    task1_description = "Task 1"
    task2_description = "Task 2"
    task1_flow = OpenAIChat()
    task2_flow = OpenAIChat()
    workflow.add(task1_description, task1_flow)
    workflow.add(task2_description, task2_flow)
    with patch("builtins.print") as mock_print:
        workflow.workflow_dashboard()
        mock_print.assert_called()


def test_real_world_usage_async_execution():
    # Create a sequential workflow, add an async task, and run the workflow asynchronously
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    async_task_flow = OpenAIChat()

    async def async_run_workflow():
        await workflow.arun()

    workflow.add(task_description, async_task_flow)
    asyncio.run(async_run_workflow())
    assert workflow.tasks[0].result is not None


def test_real_world_usage_multiple_loops():
    # Create a sequential workflow with multiple loops, add a task, and run the workflow
    workflow = SequentialWorkflow(max_loops=3)
    task_description = "Sample Task"
    task_flow = OpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.run()
    assert workflow.tasks[0].result is not None


def test_real_world_usage_autosave_state():
    # Create a sequential workflow with autosave, add a task, run the workflow, and check if state is saved
    workflow = SequentialWorkflow(autosave=True)
    task_description = "Sample Task"
    task_flow = OpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.run()
    assert workflow.tasks[0].result is not None
    assert os.path.exists("sequential_workflow_state.json")
    os.remove("sequential_workflow_state.json")


def test_real_world_usage_load_state():
    # Create a sequential workflow, add a task, save state, load state, and run the workflow
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = OpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.run()
    workflow.save_workflow_state("test_state.json")
    workflow.load_workflow_state("test_state.json")
    workflow.run()
    assert workflow.tasks[0].result is not None
    os.remove("test_state.json")


def test_real_world_usage_update_task_args():
    # Create a sequential workflow, add a task, and update task arguments
    workflow = SequentialWorkflow()
    task_description = "Sample Task"
    task_flow = OpenAIChat()
    workflow.add(task_description, task_flow)
    workflow.update_task(task_description, max_tokens=1000)
    assert workflow.tasks[0].kwargs["max_tokens"] == 1000


def test_real_world_usage_remove_task():
    # Create a sequential workflow, add tasks, remove a task, and run the workflow
    workflow = SequentialWorkflow()
    task1_description = "Task 1"
    task2_description = "Task 2"
    task1_flow = OpenAIChat()
    task2_flow = OpenAIChat()
    workflow.add(task1_description, task1_flow)
    workflow.add(task2_description, task2_flow)
    workflow.remove_task(task1_description)
    workflow.run()
    assert len(workflow.tasks) == 1
    assert workflow.tasks[0].description == task2_description


def test_real_world_usage_with_environment_variables():
    # Ensure that the OpenAI API key is set using environment variables
    assert "OPENAI_API_KEY" in os.environ
    assert os.environ["OPENAI_API_KEY"] == "mocked_api_key"
    del os.environ["OPENAI_API_KEY"]  # Clean up after the test


def test_real_world_usage_no_openai_key():
    # Ensure that an exception is raised when the OpenAI API key is not set
    with pytest.raises(ValueError):
        OpenAIChat()  # API key not provided, should raise an exception