#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time    : 2023/5/11 22:12
@Author  : alexanderwu
@File    : environment.py
@Modified From: https://github.com/geekan/MetaGPT/blob/main/metagpt/environment.py
"""
import asyncio
import re
import json
import datetime
import websockets
from common import MessageType, format_message, timestamp
from typing import Iterable, Optional

from pydantic import BaseModel, Field

from .roles import Role
from .actions import Requirement
from .roles import CustomRole, ActionObserver, Group, ROLES_LIST, ROLES_MAPPING

from .system.memory import Memory
from .system.schema import Message


class Environment(BaseModel):
    """Environment that hosts a set of roles.

    Roles publish messages to the environment; every published message is
    stored in ``memory`` and appended to ``history``. Messages are also
    forwarded to ``alg_msg_queue`` when one is configured.
    """

    roles: dict[str, Role] = Field(default_factory=dict)
    memory: Memory = Field(default_factory=Memory)
    history: str = Field(default='')
    # NOTE(review): declared as dict, but publish_message stores the list
    # returned by _parser_roles here -- confirm the intended type.
    new_roles_args: dict = Field(default_factory=dict)
    new_roles: dict[str, Role] = Field(default_factory=dict)
    steps: list = Field(default_factory=list)
    msg_json: list = Field(default_factory=list)
    json_log: str = Field(default='./logs/json_log.json')
    task_id: str = Field(default='')
    proxy: str = Field(default='')
    llm_api_key: str = Field(default='')
    serpapi_key: str = Field(default='')
    alg_msg_queue: object = Field(default=None)

    class Config:
        arbitrary_types_allowed = True

    def add_role(self, role: Role):
        """Register ``role`` in this environment, keyed by ``role.profile``.

        The role is bound to this environment via ``role.set_env(self)``.
        A role with the same profile replaces the previous entry.
        """
        role.set_env(self)
        self.roles[role.profile] = role

    def add_roles(self, roles: Iterable[Role]):
        """Register every role in ``roles`` (see :meth:`add_role`)."""
        for role in roles:
            self.add_role(role)

    def _parser_roles(self, text):
        """Extract the role definitions embedded as JSON objects in ``text``.

        Every non-greedy ``{...}`` span is parsed with ``json.loads``; empty
        objects are dropped. Because the match is non-greedy, nested JSON
        objects are not supported.

        Returns:
            list of the parsed, non-empty dicts, in order of appearance.

        Raises:
            json.JSONDecodeError: if a ``{...}`` span is not valid JSON.
        """
        fragments = re.findall(r'{[\s\S]*?}', text)
        agents_args = []
        for fragment in fragments:
            agent = json.loads(fragment.strip())
            if agent:
                agents_args.append(agent)

        print('---------------Agents---------------')
        for i, agent in enumerate(agents_args):
            print('Role', i, agent)

        return agents_args

    def _parser_plan(self, context):
        """Extract the numbered steps of the ``## Execution Plan`` section.

        Only the first line of each numbered item (``1. ...``) is kept.

        Returns:
            list of step strings with an empty string inserted at index 0.

        Raises:
            IndexError: if ``context`` has no ``## Execution Plan`` section
                terminated by a following ``##``.
        """
        plan_context = re.findall(r'## Execution Plan([\s\S]*?)##', str(context))[0]
        steps = [v.split("\n")[0] for v in re.split(r"\n\d+\. ", plan_context)[1:]]
        print('---------------Steps---------------')
        for i, step in enumerate(steps):
            print('Step', i, step)

        # The leading empty entry is part of the returned contract.
        steps.insert(0, '')
        return steps

    def create_roles(self, plan: list, args: dict):
        """Create a ``Group`` role for the plan and register it.

        Args:
            plan: the step list passed to ``Group`` as ``steps``.
            args: the role definitions passed to ``Group`` as ``roles``
                (publish_message passes the list built by _parser_roles).

        Returns:
            dict mapping the created role's profile to the role, so the
            result can be stored in ``new_roles``.
        """
        # A dedicated Requirement subclass is created for the group to watch.
        requirement_type = type('Requirement_Group', (Requirement,), {})
        group = Group(
            roles=args,
            steps=plan,
            watch_actions=[Requirement, requirement_type],
            proxy=self.proxy,
            serpapi_api_key=self.serpapi_key,
            llm_api_key=self.llm_api_key,
        )
        self.add_role(group)
        # NOTE: an earlier, disabled implementation instantiated one role per
        # entry of ``args`` (predefined roles via ROLES_MAPPING, otherwise
        # CustomRole) plus an ActionObserver; see version control history.
        return {group.profile: group}

    def _extract_fenced_block(self, text: str):
        """Return the body of the first fenced code block, or None.

        The language tag is whatever follows the first fence up to the end
        of that line; it is escaped before reuse so tags such as ``c++``
        are matched literally.
        """
        opening = re.search(r'```(.*?)\n', text)
        if opening is None:
            return None
        file_type = opening.group(1)
        fenced = re.search(rf'```{re.escape(file_type)}([\s\S]*?)```', text)
        return fenced.group(1) if fenced else None

    def _send_task_message(self, msg: dict) -> None:
        """Put ``msg`` on ``alg_msg_queue`` as a RunTask message, if a queue is set."""
        if self.alg_msg_queue:
            self.alg_msg_queue.put_nowait(format_message(action=MessageType.RunTask.value, data={'task_id': self.task_id, 'task_message': msg}))

    async def publish_message(self, message: Message):
        """Publish ``message`` to this environment.

        The message is stored in memory and appended to the history. A
        message whose role contains ``'Manager'`` is additionally parsed for
        a plan and role definitions, from which a Group role is created.
        Unless the role is empty or exactly ``'ActionObserver'``, the message
        is forwarded to ``alg_msg_queue``; a role containing
        ``'Agents Observer'`` also triggers sending the revised role list.
        """
        self.memory.add(message)
        self.history += f"\n{message}"

        # Normalise once so a missing role cannot break the substring checks.
        role = message.role or ''

        if 'Manager' in role:
            self.steps = self._parser_plan(message.content)
            self.new_roles_args = self._parser_roles(message.content)
            self.new_roles = self.create_roles(self.steps, self.new_roles_args)

        filename, file_content = None, None
        instruct_content = message.instruct_content
        if hasattr(instruct_content, 'Type') and 'FILE' in instruct_content.Type:
            filename = instruct_content.Key
            file_content = self._extract_fenced_block(str(message.content))

        if role and role != 'ActionObserver':
            if hasattr(instruct_content, 'Response'):
                content = instruct_content.Response
            else:
                content = message.content

            self._send_task_message({
                'timestamp': timestamp(),
                'role': message.role,
                'content': content,
                'file': {
                    # 'file_type' carries the file name (instruct_content.Key).
                    'file_type': filename,
                    'file_data': file_content,
                }
            })

        if 'Agents Observer' in role:
            # Send the role list.
            self._send_task_message({
                'timestamp': timestamp(),
                'role': "Revised Role List",
                'content': self.new_roles_args,
                'file': {
                    'file_type': None,
                    'file_data': None,
                }
            })

    async def run(self, k=1):
        """Run every role for ``k`` rounds, then drive newly added roles.

        Each round runs all currently registered roles concurrently. If
        roles were added while running, those roles are then run repeatedly
        until the ``'Group'`` role has no steps left.

        NOTE(review): the source's indentation was lost; the new-role phase
        is placed after the ``k`` rounds -- confirm against the original.
        """
        # A set keeps the count meaningful across rounds (no duplicates).
        old_roles = set()
        for _ in range(k):
            # Snapshot the coroutines before awaiting: roles may be added to
            # self.roles while the gathered roles run.
            old_roles.update(self.roles)
            futures = [role.run() for role in self.roles.values()]
            await asyncio.gather(*futures)

        if len(old_roles) < len(self.roles):
            while True:
                group = self.get_role(name='Group')
                if group is None or len(group.steps) == 0:
                    break
                futures = [
                    role.run()
                    for key, role in self.roles.items()
                    if key not in old_roles
                ]
                await asyncio.gather(*futures)

    def get_roles(self) -> dict[str, Role]:
        """Return all roles in this environment, keyed by profile."""
        return self.roles

    def get_role(self, name: str) -> Optional[Role]:
        """Return the role registered under ``name``, or None if absent."""
        return self.roles.get(name, None)