# OpenAI chat-completion backend for AgentReview agents.
import re
from typing import List, Optional

from tenacity import retry, stop_after_attempt, wait_random_exponential

from agentreview.arguments import parse_args
from agentreview.utility.authentication_utils import get_openai_client

from .base import IntelligenceBackend
from ..message import SYSTEM_NAME, Message
# Default config follows the OpenAI playground
DEFAULT_TEMPERATURE = 1.0
DEFAULT_MAX_TOKENS = 4096

# Check https://platform.openai.com/docs/models for more models
DEFAULT_MODEL = "gpt-4o"

END_OF_MESSAGE = "<EOS>"  # End of message token specified by us not OpenAI
STOP = ("<|endoftext|>", END_OF_MESSAGE)  # End of sentence token

BASE_PROMPT = f"The messages always end with the token {END_OF_MESSAGE}."
class OpenAIChat(IntelligenceBackend):
    """Interface to the ChatGPT style model with system, user, assistant roles separation."""

    stateful = False
    type_name = "openai-chat"

    def __init__(
        self,
        temperature: float = DEFAULT_TEMPERATURE,
        max_tokens: int = DEFAULT_MAX_TOKENS,
        model: str = DEFAULT_MODEL,
        merge_other_agents_as_one_user: bool = True,
        **kwargs,
    ):
        """
        Instantiate the OpenAIChat backend.

        args:
            temperature: the temperature of the sampling
            max_tokens: the maximum number of tokens to sample
            model: the model to use
            merge_other_agents_as_one_user: whether to merge messages from
                other agents into a single user message
        """
        super().__init__(
            temperature=temperature,
            max_tokens=max_tokens,
            model=model,
            merge_other_agents_as_one_user=merge_other_agents_as_one_user,
            **kwargs,
        )

        # Expected to be "openai" or "azure_openai"; get_openai_client builds
        # the matching SDK client (presumably reading credentials from env —
        # TODO confirm against authentication_utils).
        self.client_type = kwargs.get("openai_client_type", None)
        self.client = get_openai_client(self.client_type)

        self.temperature = temperature
        self.max_tokens = max_tokens
        self.model = model
        self.merge_other_agent_as_user = merge_other_agents_as_one_user

    # Retry transient API failures (rate limits, timeouts) with exponential
    # backoff. tenacity is imported at module level but the decorator had been
    # dropped, making the imports dead code and every transient failure fatal.
    @retry(stop=stop_after_attempt(6), wait=wait_random_exponential(min=1, max=60))
    def _get_response(self, messages, *args, **kwargs):
        """Call the chat-completions API once and return the stripped reply text.

        args:
            messages: conversation in OpenAI format, i.e. a list of
                {"role": ..., "content": ...} dicts.

        raises:
            NotImplementedError: if ``self.client_type`` is not supported.

        Extra positional/keyword arguments are accepted (and ignored) because
        ``query`` forwards its own ``*args, **kwargs`` here; the original
        signature raised TypeError whenever any were passed.
        """
        # Refer to https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/switching-endpoints
        # for how to make API calls. The OpenAI and Azure OpenAI clients expose
        # the same chat.completions surface, so one call path serves both
        # (the two branches were byte-identical duplicates).
        if self.client_type not in ("openai", "azure_openai"):
            raise NotImplementedError(f"Unsupported OpenAI client type: {self.client_type}")

        completion = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            stop=STOP,
        )

        return completion.choices[0].message.content.strip()

    def query(
        self,
        agent_name: str,
        role_desc: str,
        history_messages: List[Message],
        global_prompt: Optional[str] = None,
        request_msg: Optional[Message] = None,
        *args,
        **kwargs,
    ) -> str:
        """
        Format the input and call the ChatGPT/GPT-4 API.

        args:
            agent_name: the name of the agent
            role_desc: the description of the role of the agent
            history_messages: the history of the conversation, or the observation for the agent
            global_prompt: shared environment description prepended to the system prompt
            request_msg: the request from the system to guide the agent's next response

        returns:
            The model's reply with any leading "[Name]:" prefix and the
            trailing END_OF_MESSAGE token stripped.
        """
        # Merge the role description and the global prompt as the system prompt for the agent
        if global_prompt:  # Prepend the global prompt if it exists
            system_prompt = (
                f"You are a helpful assistant.\n{global_prompt.strip()}\n{BASE_PROMPT}\n\n"
                f"Your name is {agent_name}.\n\nYour role:{role_desc}"
            )
        else:
            system_prompt = (
                f"You are a helpful assistant. Your name is {agent_name}.\n\n"
                f"Your role:{role_desc}\n\n{BASE_PROMPT}"
            )

        all_messages = [(SYSTEM_NAME, system_prompt)]
        for msg in history_messages:
            if msg.agent_name == SYSTEM_NAME:
                all_messages.append((SYSTEM_NAME, msg.content))
            else:  # non-system messages are suffixed with the end of message token
                all_messages.append((msg.agent_name, f"{msg.content}{END_OF_MESSAGE}"))

        if request_msg:
            all_messages.append((SYSTEM_NAME, request_msg.content))
        else:  # The default request message that reminds the agent its role and instructs it to speak
            all_messages.append(
                (SYSTEM_NAME, f"Now you speak, {agent_name}.{END_OF_MESSAGE}")
            )

        # Convert (speaker, content) pairs to OpenAI chat format: this agent's
        # own turns become "assistant", everyone else's become "user"
        # (optionally merged into one), and consecutive same-role turns are
        # concatenated so roles alternate as the API expects.
        messages = []
        for i, msg in enumerate(all_messages):
            if i == 0:
                assert msg[0] == SYSTEM_NAME  # The first message should be from the system
                messages.append({"role": "system", "content": msg[1]})
            elif msg[0] == agent_name:
                messages.append({"role": "assistant", "content": msg[1]})
            elif messages[-1]["role"] == "user":  # last message is from user
                if self.merge_other_agent_as_user:
                    messages[-1]["content"] = f"{messages[-1]['content']}\n\n[{msg[0]}]: {msg[1]}"
                else:
                    messages.append({"role": "user", "content": f"[{msg[0]}]: {msg[1]}"})
            elif messages[-1]["role"] == "assistant":  # consecutive assistant messages
                # Merge the assistant messages
                messages[-1]["content"] = f"{messages[-1]['content']}\n{msg[1]}"
            elif messages[-1]["role"] == "system":
                messages.append({"role": "user", "content": f"[{msg[0]}]: {msg[1]}"})
            else:
                raise ValueError(f"Invalid role: {messages[-1]['role']}")

        response = self._get_response(messages, *args, **kwargs)

        # Remove the agent name if the response starts with it. Non-greedy so
        # a literal "]:" later in the reply is not swallowed along with it.
        response = re.sub(r"^\s*\[.*?\]:", "", response).strip()
        response = re.sub(rf"^\s*{re.escape(agent_name)}\s*:", "", response).strip()

        # Remove the trailing end of message token (escaped: "<EOS>" is meant literally)
        response = re.sub(rf"{re.escape(END_OF_MESSAGE)}$", "", response).strip()

        return response