import asyncio
import json
import os
import tempfile
import time

import yaml

from swarm_models import OpenAIChat
from swarms import Agent
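
# NOTE (assumption): these tests call a live LLM through swarm_models'
# OpenAIChat, so a valid OpenAI credential (typically the OPENAI_API_KEY
# environment variable) is assumed to be configured before running them.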


def test_basic_agent_functionality():
    """Test basic agent initialization and simple task execution"""
    print("\nTesting basic agent functionality...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(agent_name="Test-Agent", llm=model, max_loops=1)

    response = agent.run("What is 2+2?")
    assert response is not None, "Agent response should not be None"

    # Test agent properties
    assert (
        agent.agent_name == "Test-Agent"
    ), "Agent name not set correctly"
    assert agent.max_loops == 1, "Max loops not set correctly"
    assert agent.llm is not None, "LLM not initialized"

    print("✓ Basic agent functionality test passed")


def test_memory_management():
    """Test agent memory management functionality"""
    print("\nTesting memory management...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Memory-Test-Agent",
        llm=model,
        max_loops=1,
        context_length=8192,
    )

    # Test adding to memory
    agent.add_memory("Test memory entry")
    assert (
        "Test memory entry"
        in agent.short_memory.return_history_as_string()
    )

    # Test memory query
    agent.memory_query("Test query")

    # Test token counting
    tokens = agent.check_available_tokens()
    assert isinstance(tokens, int), "Token count should be an integer"

    print("✓ Memory management test passed")


def test_agent_output_formats():
    """Test all available output formats"""
    print("\nTesting all output formats...")

    model = OpenAIChat(model_name="gpt-4o")
    test_task = "Say hello!"

    output_types = {
        "str": str,
        "string": str,
        "list": str,  # JSON string containing list
        "json": str,  # JSON string
        "dict": dict,
        "yaml": str,
    }

    for output_type, expected_type in output_types.items():
        agent = Agent(
            agent_name=f"{output_type.capitalize()}-Output-Agent",
            llm=model,
            max_loops=1,
            output_type=output_type,
        )
        response = agent.run(test_task)
        assert (
            response is not None
        ), f"{output_type} output should not be None"

        if output_type == "yaml":
            # Verify YAML can be parsed
            try:
                yaml.safe_load(response)
                print(f"✓ {output_type} output valid")
            except yaml.YAMLError:
                assert False, f"Invalid YAML output for {output_type}"
        elif output_type in ["json", "list"]:
            # Verify JSON can be parsed
            try:
                json.loads(response)
                print(f"✓ {output_type} output valid")
            except json.JSONDecodeError:
                assert False, f"Invalid JSON output for {output_type}"

    print("✓ Output formats test passed")


def test_agent_state_management():
    """Test comprehensive state management functionality"""
    print("\nTesting state management...")

    model = OpenAIChat(model_name="gpt-4o")

    # Create temporary directory for test files
    with tempfile.TemporaryDirectory() as temp_dir:
        state_path = os.path.join(temp_dir, "agent_state.json")

        # Create agent with initial state
        agent1 = Agent(
            agent_name="State-Test-Agent",
            llm=model,
            max_loops=1,
            saved_state_path=state_path,
        )

        # Add some data to the agent
        agent1.run("Remember this: Test message 1")
        agent1.add_memory("Test message 2")

        # Save state
        agent1.save()
        assert os.path.exists(state_path), "State file not created"

        # Create new agent and load state
        agent2 = Agent(
            agent_name="State-Test-Agent", llm=model, max_loops=1
        )
        agent2.load(state_path)

        # Verify state loaded correctly
        history2 = agent2.short_memory.return_history_as_string()
        assert (
            "Test message 1" in history2
        ), "State not loaded correctly"
        assert (
            "Test message 2" in history2
        ), "Memory not loaded correctly"

        # Test autosave functionality
        agent3 = Agent(
            agent_name="Autosave-Test-Agent",
            llm=model,
            max_loops=1,
            saved_state_path=os.path.join(
                temp_dir, "autosave_state.json"
            ),
            autosave=True,
        )
        agent3.run("Test autosave")
        time.sleep(2)  # Wait for autosave
        assert os.path.exists(
            os.path.join(temp_dir, "autosave_state.json")
        ), "Autosave file not created"

    print("✓ State management test passed")


def test_agent_tools_and_execution():
    """Test agent tool handling and execution"""
    print("\nTesting tools and execution...")

    def sample_tool(x: int, y: int) -> int:
        """Sample tool that adds two numbers"""
        return x + y

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Tools-Test-Agent",
        llm=model,
        max_loops=1,
        tools=[sample_tool],
    )

    # Test adding tools
    agent.add_tool(lambda x: x * 2)
    assert len(agent.tools) == 2, "Tool not added correctly"

    # Test removing tools
    agent.remove_tool(sample_tool)
    assert len(agent.tools) == 1, "Tool not removed correctly"

    # Test tool execution
    response = agent.run("Calculate 2 + 2 using the sample tool")
    assert response is not None, "Tool execution failed"

    print("✓ Tools and execution test passed")


def test_agent_concurrent_execution():
    """Test agent concurrent execution capabilities"""
    print("\nTesting concurrent execution...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Concurrent-Test-Agent", llm=model, max_loops=1
    )

    # Test bulk run
    tasks = [
        {"task": "Count to 3"},
        {"task": "Say hello"},
        {"task": "Tell a short joke"},
    ]
    responses = agent.bulk_run(tasks)
    assert len(responses) == len(tasks), "Not all tasks completed"
    assert all(
        response is not None for response in responses
    ), "Some tasks failed"

    # Test concurrent tasks
    concurrent_responses = agent.run_concurrent_tasks(
        ["Task 1", "Task 2", "Task 3"]
    )
    assert (
        len(concurrent_responses) == 3
    ), "Not all concurrent tasks completed"

    print("✓ Concurrent execution test passed")


def test_agent_error_handling():
    """Test agent error handling and recovery"""
    print("\nTesting error handling...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Error-Test-Agent",
        llm=model,
        max_loops=1,
        retry_attempts=3,
        retry_interval=1,
    )

    # Test invalid tool execution: either a graceful return or a raised
    # exception counts as handling the bad input
    try:
        agent.parse_and_execute_tools("invalid_json")
        print("✓ Invalid tool execution handled")
    except Exception:
        assert True, "Expected error caught"

    # Test recovery after error
    response = agent.run("Continue after error")
    assert response is not None, "Agent failed to recover after error"

    print("✓ Error handling test passed")


def test_agent_configuration():
    """Test agent configuration and parameters"""
    print("\nTesting agent configuration...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Config-Test-Agent",
        llm=model,
        max_loops=1,
        temperature=0.7,
        max_tokens=4000,
        context_length=8192,
    )

    # Test configuration methods
    agent.update_system_prompt("New system prompt")
    agent.update_max_loops(2)
    agent.update_loop_interval(2)

    # Verify updates
    assert agent.max_loops == 2, "Max loops not updated"
    assert agent.loop_interval == 2, "Loop interval not updated"

    # Test configuration export
    config_dict = agent.to_dict()
    assert isinstance(
        config_dict, dict
    ), "Configuration export failed"

    # Test YAML export
    yaml_config = agent.to_yaml()
    assert isinstance(yaml_config, str), "YAML export failed"

    print("✓ Configuration test passed")


def test_agent_with_stopping_condition():
    """Test agent with custom stopping condition"""
    print("\nTesting agent with stopping condition...")

    def custom_stopping_condition(response: str) -> bool:
        return "STOP" in response.upper()

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Stopping-Condition-Agent",
        llm=model,
        max_loops=5,
        stopping_condition=custom_stopping_condition,
    )
    response = agent.run("Count up until you see the word STOP")
    assert response is not None, "Stopping condition test failed"
    print("✓ Stopping condition test passed")


def test_agent_with_retry_mechanism():
    """Test agent retry mechanism"""
    print("\nTesting agent retry mechanism...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Retry-Test-Agent",
        llm=model,
        max_loops=1,
        retry_attempts=3,
        retry_interval=1,
    )
    response = agent.run("Tell me a joke.")
    assert response is not None, "Retry mechanism test failed"
    print("✓ Retry mechanism test passed")


def test_bulk_and_filtered_operations():
    """Test bulk operations and response filtering"""
    print("\nTesting bulk and filtered operations...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Bulk-Filter-Test-Agent", llm=model, max_loops=1
    )

    # Test bulk run
    bulk_tasks = [
        {"task": "What is 2+2?"},
        {"task": "Name a color"},
        {"task": "Count to 3"},
    ]
    bulk_responses = agent.bulk_run(bulk_tasks)
    assert len(bulk_responses) == len(
        bulk_tasks
    ), "Bulk run should return same number of responses as tasks"

    # Test response filtering
    agent.add_response_filter("color")
    filtered_response = agent.filtered_run(
        "What is your favorite color?"
    )
    assert (
        "[FILTERED]" in filtered_response
    ), "Response filter not applied"

    print("✓ Bulk and filtered operations test passed")


async def test_async_operations():
    """Test asynchronous operations"""
    print("\nTesting async operations...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Async-Test-Agent", llm=model, max_loops=1
    )

    # Test single async run
    response = await agent.arun("What is 1+1?")
    assert response is not None, "Async run failed"

    # Test concurrent async runs
    tasks = ["Task 1", "Task 2", "Task 3"]
    responses = await asyncio.gather(
        *[agent.arun(task) for task in tasks]
    )
    assert len(responses) == len(
        tasks
    ), "Not all async tasks completed"

    print("✓ Async operations test passed")


def test_memory_and_state_persistence():
    """Test memory management and state persistence"""
    print("\nTesting memory and state persistence...")

    with tempfile.TemporaryDirectory() as temp_dir:
        state_path = os.path.join(temp_dir, "test_state.json")

        # Create agent with memory configuration
        model = OpenAIChat(model_name="gpt-4o")
        agent1 = Agent(
            agent_name="Memory-State-Test-Agent",
            llm=model,
            max_loops=1,
            saved_state_path=state_path,
            context_length=8192,
            autosave=True,
        )

        # Test memory operations
        agent1.add_memory("Important fact: The sky is blue")
        agent1.memory_query("What color is the sky?")

        # Save state
        agent1.save()

        # Create new agent and load state
        agent2 = Agent(
            agent_name="Memory-State-Test-Agent",
            llm=model,
            max_loops=1,
        )
        agent2.load(state_path)

        # Verify memory persistence
        memory_content = (
            agent2.short_memory.return_history_as_string()
        )
        assert (
            "sky is blue" in memory_content
        ), "Memory not properly persisted"

    print("✓ Memory and state persistence test passed")


def test_sentiment_and_evaluation():
    """Test sentiment analysis and response evaluation"""
    print("\nTesting sentiment analysis and evaluation...")

    def mock_sentiment_analyzer(text):
        """Mock sentiment analyzer that returns a score between 0 and 1"""
        return 0.7 if "positive" in text.lower() else 0.3

    def mock_evaluator(response):
        """Mock evaluator that checks response quality"""
        return "GOOD" if len(response) > 10 else "BAD"

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Sentiment-Eval-Test-Agent",
        llm=model,
        max_loops=1,
        sentiment_analyzer=mock_sentiment_analyzer,
        sentiment_threshold=0.5,
        evaluator=mock_evaluator,
    )

    # Test sentiment analysis
    agent.run("Generate a positive message")

    # Test evaluation
    agent.run("Generate a detailed response")

    print("✓ Sentiment and evaluation test passed")


def test_tool_management():
    """Test tool management functionality"""
    print("\nTesting tool management...")

    def tool1(x: int) -> int:
        """Sample tool 1"""
        return x * 2

    def tool2(x: int) -> int:
        """Sample tool 2"""
        return x + 2

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Tool-Test-Agent",
        llm=model,
        max_loops=1,
        tools=[tool1],
    )

    # Test adding tools
    agent.add_tool(tool2)
    assert len(agent.tools) == 2, "Tool not added correctly"

    # Test removing tools
    agent.remove_tool(tool1)
    assert len(agent.tools) == 1, "Tool not removed correctly"

    # Test adding multiple tools
    agent.add_tools([tool1, tool2])
    assert len(agent.tools) == 3, "Multiple tools not added correctly"

    print("✓ Tool management test passed")


def test_system_prompt_and_configuration():
    """Test system prompt and configuration updates"""
    print("\nTesting system prompt and configuration...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Config-Test-Agent", llm=model, max_loops=1
    )

    # Test updating system prompt
    new_prompt = "You are a helpful assistant."
    agent.update_system_prompt(new_prompt)
    assert (
        agent.system_prompt == new_prompt
    ), "System prompt not updated"

    # Test configuration updates
    agent.update_max_loops(5)
    assert agent.max_loops == 5, "Max loops not updated"
    agent.update_loop_interval(2)
    assert agent.loop_interval == 2, "Loop interval not updated"

    # Test configuration export
    config_dict = agent.to_dict()
    assert isinstance(
        config_dict, dict
    ), "Configuration export failed"

    print("✓ System prompt and configuration test passed")


def test_agent_with_dynamic_temperature():
    """Test agent with dynamic temperature"""
    print("\nTesting agent with dynamic temperature...")

    model = OpenAIChat(model_name="gpt-4o")
    agent = Agent(
        agent_name="Dynamic-Temp-Agent",
        llm=model,
        max_loops=2,
        dynamic_temperature_enabled=True,
    )
    response = agent.run("Generate a creative story.")
    assert response is not None, "Dynamic temperature test failed"
    print("✓ Dynamic temperature test passed")


def run_all_tests():
    """Run all test functions"""
    print("Starting Extended Agent functional tests...\n")

    test_functions = [
        test_basic_agent_functionality,
        test_memory_management,
        test_agent_output_formats,
        test_agent_state_management,
        test_agent_tools_and_execution,
        test_agent_concurrent_execution,
        test_agent_error_handling,
        test_agent_configuration,
        test_agent_with_stopping_condition,
        test_agent_with_retry_mechanism,
        test_agent_with_dynamic_temperature,
        test_bulk_and_filtered_operations,
        test_memory_and_state_persistence,
        test_sentiment_and_evaluation,
        test_tool_management,
        test_system_prompt_and_configuration,
    ]

    # Run synchronous tests
    total_tests = len(test_functions) + 1  # +1 for async test
    passed_tests = 0

    for test in test_functions:
        try:
            test()
            passed_tests += 1
        except Exception as e:
            print(f"✗ Test {test.__name__} failed: {str(e)}")

    # Run async test
    try:
        asyncio.run(test_async_operations())
        passed_tests += 1
    except Exception as e:
        print(f"✗ Async operations test failed: {str(e)}")

    print("\nExtended Test Summary:")
    print(f"Total Tests: {total_tests}")
    print(f"Passed: {passed_tests}")
    print(f"Failed: {total_tests - passed_tests}")
    print(f"Success Rate: {(passed_tests / total_tests) * 100:.2f}%")


if __name__ == "__main__":
    run_all_tests()