# Source: AutoAgents/autoagents/environment.py (uploaded by Orami01, "Upload 274 files", commit 9c48ae2, 8.11 kB)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/5/11 22:12
@Author : alexanderwu
@File : environment.py
@Modified From: https://github.com/geekan/MetaGPT/blob/main/metagpt/environment.py
"""
import asyncio
import re
import json
import datetime
import websockets
from common import MessageType, format_message, timestamp
from typing import Iterable
from pydantic import BaseModel, Field
from .roles import Role
from .actions import Requirement
from .roles import CustomRole, ActionObserver, Group, ROLES_LIST, ROLES_MAPPING
from .system.memory import Memory
from .system.schema import Message
class Environment(BaseModel):
    """Environment that hosts a group of roles.

    Roles publish messages to the environment. Every published message is
    recorded in ``memory`` and ``history`` and, when ``alg_msg_queue`` is set,
    forwarded to that queue as a task message.
    """

    # Roles registered in this environment, keyed by ``role.profile``.
    roles: dict[str, Role] = Field(default_factory=dict)
    # Store that receives every published message via ``memory.add``.
    memory: Memory = Field(default_factory=Memory)
    # Text rendering of every published message, one per line.
    history: str = Field(default='')
    # Role definitions parsed from the latest Manager message.
    # NOTE(review): annotated as dict, but publish_message assigns the list
    # returned by _parser_roles; annotation kept for backward compatibility.
    new_roles_args: dict = Field(default_factory=dict)
    # Roles created from the latest Manager message, keyed by profile.
    new_roles: dict[str, Role] = Field(default_factory=dict)
    # Plan steps parsed from the latest Manager message (index 0 is '').
    steps: list = Field(default_factory=list)
    # Not read or written by any method of this class.
    msg_json: list = Field(default_factory=list)
    json_log: str = Field(default='./logs/json_log.json')
    # Identifier attached to every message forwarded to ``alg_msg_queue``.
    task_id: str = Field(default='')
    # Settings handed to the roles built by ``create_roles``.
    proxy: str = Field(default='')
    llm_api_key: str = Field(default='')
    serpapi_key: str = Field(default='')
    # Optional outbound queue; only ``put_nowait`` is called on it.
    alg_msg_queue: object = Field(default=None)

    class Config:
        arbitrary_types_allowed = True

    def add_role(self, role: Role):
        """Register ``role`` in this environment, keyed by its profile."""
        role.set_env(self)
        self.roles[role.profile] = role

    def add_roles(self, roles: Iterable[Role]):
        """Register every role in ``roles`` in this environment."""
        for role in roles:
            self.add_role(role)

    def _parser_roles(self, text):
        """Parse the role definitions embedded in ``text``.

        Every ``{...}`` fragment is decoded as JSON. Empty objects are
        dropped, and fragments that are not valid JSON (stray braces in the
        surrounding prose) are reported and skipped instead of aborting the
        whole parse.

        Returns:
            List of non-empty role-definition dicts, in order of appearance.
        """
        fragments = re.findall(r'{[\s\S]*?}', str(text))
        agents_args = []
        for fragment in fragments:
            try:
                agent = json.loads(fragment.strip())
            except json.JSONDecodeError:
                print('Skip invalid role definition:', fragment)
                continue
            if agent:
                agents_args.append(agent)

        print('---------------Agents---------------')
        for i, agent in enumerate(agents_args):
            print('Role', i, agent)
        return agents_args

    def _parser_plan(self, context):
        """Parse the numbered steps of the ``## Execution Plan`` section.

        The section ends at the next ``##`` heading, or at the end of the
        text when the plan is the last section. Only the first line of each
        numbered item is kept.

        Returns:
            List of step strings with an empty string inserted at index 0.

        Raises:
            IndexError: if ``context`` has no ``## Execution Plan`` section
                (same exception type as before, now with a message).
        """
        section = re.search(r'## Execution Plan([\s\S]*?)(?:##|\Z)', str(context))
        if section is None:
            raise IndexError("no '## Execution Plan' section found in context")
        plan_context = section.group(1)

        # The text before the first "N. " marker is preamble, hence [1:].
        items = re.split(r"\n\d+\. ", plan_context)[1:]
        steps = [item.split("\n")[0] for item in items]

        print('---------------Steps---------------')
        for i, step in enumerate(steps):
            print('Step', i, step)
        steps.insert(0, '')
        return steps

    def create_roles(self, plan: list, args: dict):
        """Create the ``Group`` role for a plan and register it.

        Args:
            plan: Plan steps, passed to ``Group`` as ``steps``.
            args: Parsed role definitions, passed to ``Group`` as ``roles``.

        Returns:
            Mapping from profile to the newly created role, so the caller can
            store it in ``new_roles`` (previously nothing was returned and
            ``new_roles`` ended up as ``None``).
        """
        # Dedicated Requirement subclass watched alongside the base class.
        requirement_type = type('Requirement_Group', (Requirement,), {})
        group = Group(
            roles=args,
            steps=plan,
            watch_actions=[Requirement, requirement_type],
            proxy=self.proxy,
            serpapi_api_key=self.serpapi_key,
            llm_api_key=self.llm_api_key,
        )
        self.add_role(group)
        # NOTE: an earlier design built one role object per entry in ``args``
        # (predefined via ROLES_MAPPING or a CustomRole) plus an
        # ActionObserver; it was replaced by the single Group role above.
        return {group.profile: group}

    def _extract_fenced_block(self, text):
        """Return the body of the first ``` fenced block in ``text``.

        The body starts right after the fence's info string (so it normally
        begins with a newline) and ends before the next ```. Returns ``None``
        when ``text`` has no complete fenced block.
        """
        opener = re.search(r'```(.*?)\n', text)
        if opener is None:
            return None
        # Escape the info string: values such as "c++" contain regex
        # metacharacters and must be matched literally.
        file_type = re.escape(opener.group(1))
        block = re.search(rf'```{file_type}([\s\S]*?)```', text)
        return block.group(1) if block else None

    def _send_task_message(self, role, content, file_type=None, file_data=None):
        """Build a task message and put it on ``alg_msg_queue`` if one is set."""
        msg = {
            'timestamp': timestamp(),
            'role': role,
            'content': content,
            'file': {
                'file_type': file_type,
                'file_data': file_data,
            }
        }
        if self.alg_msg_queue:
            self.alg_msg_queue.put_nowait(
                format_message(
                    action=MessageType.RunTask.value,
                    data={'task_id': self.task_id, 'task_message': msg},
                )
            )

    async def publish_message(self, message: Message):
        """Publish ``message`` to the environment.

        The message is added to ``memory`` and appended to ``history``.
        Depending on ``message.role``:

        * a role containing ``'Manager'`` triggers parsing of the plan and the
          role definitions from the content and creation of the Group role;
        * any non-empty role other than ``'ActionObserver'`` is forwarded to
          ``alg_msg_queue``, together with the fenced file block when
          ``instruct_content.Type`` contains ``'FILE'``;
        * a role containing ``'Agents Observer'`` additionally forwards the
          current ``new_roles_args`` as a "Revised Role List" message.
        """
        self.memory.add(message)
        self.history += f"\n{message}"

        # The role may be empty or None; normalise once so the substring
        # checks below cannot raise TypeError.
        role = message.role or ''

        if 'Manager' in role:
            self.steps = self._parser_plan(message.content)
            self.new_roles_args = self._parser_roles(message.content)
            self.new_roles = self.create_roles(self.steps, self.new_roles_args)

        filename, file_content = None, None
        instruct = message.instruct_content
        if hasattr(instruct, 'Type') and 'FILE' in instruct.Type:
            filename = instruct.Key
            file_content = self._extract_fenced_block(str(message.content))

        if role and role != 'ActionObserver':
            if hasattr(instruct, 'Response'):
                content = instruct.Response
            else:
                content = message.content
            # 'file_type' carries instruct_content.Key; the key name is part
            # of the outgoing message format and is kept unchanged.
            self._send_task_message(role, content, filename, file_content)

        if 'Agents Observer' in role:
            # Send the role list.
            self._send_task_message("Revised Role List", self.new_roles_args)

    async def run(self, k=1):
        """Run all roles for ``k`` rounds, then drive roles created meanwhile.

        Each round runs every currently registered role concurrently. If
        roles that were never part of a round exist afterwards, only those
        roles keep being run until the ``Group`` role has no steps left or no
        ``Group`` role is registered.
        """
        # A set, so that k > 1 rounds do not count the same role repeatedly.
        seen = set()
        for _ in range(k):
            # Snapshot the names: roles may be added while the round runs.
            names = list(self.roles)
            seen.update(names)
            await asyncio.gather(*[self.roles[name].run() for name in names])

        if any(name not in seen for name in self.roles):
            while True:
                # Looked up on every pass because the Group role can be
                # replaced by a later Manager message.
                group = self.get_role(name='Group')
                if group is None or len(group.steps) == 0:
                    break
                pending = [
                    role.run()
                    for name, role in list(self.roles.items())
                    if name not in seen
                ]
                await asyncio.gather(*pending)

    def get_roles(self) -> dict[str, Role]:
        """Return all roles registered in this environment."""
        return self.roles

    def get_role(self, name: str) -> Role:
        """Return the role registered under ``name``, or ``None`` if absent."""
        return self.roles.get(name, None)