Spaces:
Sleeping
Sleeping
File size: 7,016 Bytes
43c34cc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
import re
from typing import List
from tenacity import retry, stop_after_attempt, wait_random_exponential
from agentreview.arguments import parse_args
from agentreview.utility.authentication_utils import get_openai_client
from .base import IntelligenceBackend
from ..message import SYSTEM_NAME, Message
# Default config follows the OpenAI playground
DEFAULT_TEMPERATURE = 1.0
DEFAULT_MAX_TOKENS = 4096
# Check https://platform.openai.com/docs/models for more models
DEFAULT_MODEL = "gpt-4o"
# Sentinel appended to each agent turn so the model can tell where a message ends.
END_OF_MESSAGE = "<EOS>"  # End of message token specified by us not OpenAI
STOP = ("<|endoftext|>", END_OF_MESSAGE)  # End of sentence token
# Instruction injected into every system prompt explaining the sentinel above.
BASE_PROMPT = f"The messages always end with the token {END_OF_MESSAGE}."
class OpenAIChat(IntelligenceBackend):
    """Interface to the ChatGPT style model with system, user, assistant roles separation."""

    stateful = False
    type_name = "openai-chat"

    def __init__(
        self,
        temperature: float = DEFAULT_TEMPERATURE,
        max_tokens: int = DEFAULT_MAX_TOKENS,
        model: str = DEFAULT_MODEL,
        merge_other_agents_as_one_user: bool = True,
        **kwargs,
    ):
        """
        Instantiate the OpenAIChat backend.

        args:
            temperature: the temperature of the sampling
            max_tokens: the maximum number of tokens to sample
            model: the model to use
            merge_other_agents_as_one_user: whether to merge messages from other agents as one user message
        """
        super().__init__(
            temperature=temperature,
            max_tokens=max_tokens,
            model=model,
            merge_other_agents_as_one_user=merge_other_agents_as_one_user,
            **kwargs,
        )

        # Either "openai" or "azure_openai"; anything else fails in _get_response.
        self.client_type = kwargs.get("openai_client_type", None)
        self.client = get_openai_client(self.client_type)

        self.temperature = temperature
        self.max_tokens = max_tokens
        self.model = model
        self.merge_other_agent_as_user = merge_other_agents_as_one_user

    @retry(stop=stop_after_attempt(6), wait=wait_random_exponential(min=1, max=60))
    def _get_response(self, messages):
        """Call the chat completions API and return the stripped response text.

        Retries up to 6 times with random exponential backoff on any exception.

        args:
            messages: list of {"role": ..., "content": ...} dicts in OpenAI chat format

        raises:
            NotImplementedError: if self.client_type is not a supported client type
        """
        # Refer to https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/switching-endpoints
        # for how to make API calls.
        # FIX: the "openai" and "azure_openai" branches were byte-identical
        # duplicated code — both SDK clients expose the same
        # chat.completions.create interface, so a single call suffices.
        if self.client_type not in ("openai", "azure_openai"):
            raise NotImplementedError(f"Unsupported client type: {self.client_type!r}")

        completion = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            stop=STOP,
        )

        response = completion.choices[0].message.content
        return response.strip()

    def query(
        self,
        agent_name: str,
        role_desc: str,
        history_messages: List[Message],
        global_prompt: str = None,
        request_msg: Message = None,
        *args,
        **kwargs,
    ) -> str:
        """
        Format the input and call the ChatGPT/GPT-4 API.

        args:
            agent_name: the name of the agent
            role_desc: the description of the role of the agent
            history_messages: the history of the conversation, or the observation for the agent
            global_prompt: optional environment-level prompt prepended to the system prompt
            request_msg: the request from the system to guide the agent's next response

        returns:
            The model's reply with any leading "[name]:" tag and trailing
            end-of-message token stripped.
        """
        # Merge the role description and the global prompt as the system prompt for the agent
        if global_prompt:  # Prepend the global prompt if it exists
            system_prompt = f"You are a helpful assistant.\n{global_prompt.strip()}\n{BASE_PROMPT}\n\nYour name is {agent_name}.\n\nYour role:{role_desc}"
        else:
            system_prompt = f"You are a helpful assistant. Your name is {agent_name}.\n\nYour role:{role_desc}\n\n{BASE_PROMPT}"

        all_messages = [(SYSTEM_NAME, system_prompt)]
        for msg in history_messages:
            if msg.agent_name == SYSTEM_NAME:
                all_messages.append((SYSTEM_NAME, msg.content))
            else:  # non-system messages are suffixed with the end of message token
                all_messages.append((msg.agent_name, f"{msg.content}{END_OF_MESSAGE}"))

        if request_msg:
            all_messages.append((SYSTEM_NAME, request_msg.content))
        else:  # The default request message that reminds the agent its role and instruct it to speak
            all_messages.append(
                (SYSTEM_NAME, f"Now you speak, {agent_name}.{END_OF_MESSAGE}")
            )

        # Convert the (speaker, content) pairs into OpenAI chat roles:
        # this agent's own turns become "assistant", everything else "user"
        # (or is merged into the previous user turn), system stays "system".
        messages = []
        for i, msg in enumerate(all_messages):
            if i == 0:
                assert (
                    msg[0] == SYSTEM_NAME
                )  # The first message should be from the system
                messages.append({"role": "system", "content": msg[1]})
            else:
                if msg[0] == agent_name:
                    messages.append({"role": "assistant", "content": msg[1]})
                else:
                    if messages[-1]["role"] == "user":  # last message is from user
                        if self.merge_other_agent_as_user:
                            messages[-1][
                                "content"
                            ] = f"{messages[-1]['content']}\n\n[{msg[0]}]: {msg[1]}"
                        else:
                            messages.append(
                                {"role": "user", "content": f"[{msg[0]}]: {msg[1]}"}
                            )
                    elif (
                        messages[-1]["role"] == "assistant"
                    ):  # consecutive assistant messages
                        # Merge the assistant messages
                        messages[-1]["content"] = f"{messages[-1]['content']}\n{msg[1]}"
                    elif messages[-1]["role"] == "system":
                        messages.append(
                            {"role": "user", "content": f"[{msg[0]}]: {msg[1]}"}
                        )
                    else:
                        raise ValueError(f"Invalid role: {messages[-1]['role']}")

        # FIX: the original forwarded *args/**kwargs to _get_response(), which
        # accepts no extra parameters — any caller actually passing extras got
        # a TypeError. Extras are accepted for interface compatibility but ignored.
        response = self._get_response(messages)

        # Remove the agent name if the response starts with it.
        # FIX: non-greedy match so only a leading "[name]:" tag is removed,
        # not everything up to the LAST "]:" occurring in the response.
        response = re.sub(r"^\s*\[.*?\]:", "", response).strip()
        # Escape the agent name in case it contains regex metacharacters.
        response = re.sub(
            rf"^\s*{re.escape(agent_name)}\s*:", "", response
        ).strip()

        # Remove the tailing end of message token
        response = re.sub(rf"{re.escape(END_OF_MESSAGE)}$", "", response).strip()
        return response
|