import os
import pprint
import re
from typing import List, Optional, Union
from qwen_agent import Agent, MultiAgentHub
from qwen_agent.agents.user_agent import PENDING_USER_INPUT
from qwen_agent.gui.gradio_utils import format_cover_html
from qwen_agent.gui.utils import convert_fncall_to_text, convert_history_to_chatbot, get_avatar_image
from qwen_agent.llm.schema import CONTENT, FILE, IMAGE, NAME, ROLE, USER, Message
from qwen_agent.log import logger
from qwen_agent.utils.utils import print_traceback
from patching import common_programming_language_extensions
class WebUI:
"""A Common chatbot application for agent."""
def __init__(self, agent: Union[Agent, MultiAgentHub, List[Agent]], chatbot_config: Optional[dict] = None):
"""
Initialization the chatbot.
Args:
agent: The agent or a list of agents,
supports various types of agents such as Assistant, GroupChat, Router, etc.
chatbot_config: The chatbot configuration.
Set the configuration as {'user.name': '', 'user.avatar': '', 'agent.avatar': '', 'input.placeholder': '', 'prompt.suggestions': []}.
"""
chatbot_config = chatbot_config or {}
if isinstance(agent, MultiAgentHub):
self.agent_list = [agent for agent in agent.nonuser_agents]
self.agent_hub = agent
elif isinstance(agent, list):
self.agent_list = agent
self.agent_hub = None
else:
self.agent_list = [agent]
self.agent_hub = None
user_name = chatbot_config.get('user.name', 'user')
self.user_config = {
'name': user_name,
'avatar': chatbot_config.get(
'user.avatar',
get_avatar_image(user_name),
),
}
self.agent_config_list = [{
'name': agent.name,
'avatar': chatbot_config.get(
'agent.avatar',
os.path.join(os.path.dirname(__file__), 'assets/logo.jpeg'),
),
'description': agent.description or "I'm a helpful assistant.",
} for agent in self.agent_list]
self.input_placeholder = chatbot_config.get('input.placeholder', '跟我聊聊吧~')
self.prompt_suggestions = chatbot_config.get('prompt.suggestions', [])
self.verbose = chatbot_config.get('verbose', False)
"""
Run the chatbot.
Args:
messages: The chat history.
"""
def run(self,
messages: List[Message] = None,
share: bool = False,
server_name: str = None,
server_port: int = None,
concurrency_limit: int = 10,
enable_mention: bool = False,
**kwargs):
self.run_kwargs = kwargs
from qwen_agent.gui.gradio import gr, mgr
customTheme = gr.themes.Default(
primary_hue=gr.themes.utils.colors.blue,
radius_size=gr.themes.utils.sizes.radius_none,
)
with gr.Blocks(
css=os.path.join(os.path.dirname(__file__), 'assets/appBot.css'),
theme=customTheme,
) as demo:
history = gr.State([])
with gr.Row(elem_classes='container'):
with gr.Column(scale=4):
chatbot = mgr.Chatbot(value=convert_history_to_chatbot(messages=messages),
avatar_images=[
self.user_config,
self.agent_config_list,
],
height=650,
avatar_image_width=80,
flushing=False,
show_copy_button=True,
latex_delimiters=[{
'left': '\\(',
'right': '\\)',
'display': True
}, {
'left': '\\begin{equation}',
'right': '\\end{equation}',
'display': True
}, {
'left': '\\begin{align}',
'right': '\\end{align}',
'display': True
}, {
'left': '\\begin{alignat}',
'right': '\\end{alignat}',
'display': True
}, {
'left': '\\begin{gather}',
'right': '\\end{gather}',
'display': True
}, {
'left': '\\begin{CD}',
'right': '\\end{CD}',
'display': True
}, {
'left': '\\[',
'right': '\\]',
'display': True
}])
input = mgr.MultimodalInput(placeholder=self.input_placeholder, upload_button_props=dict(file_types=[".pdf", ".docx", ".pptx", ".txt", ".html", ".csv", ".tsv", ".xlsx", ".xls"] + ["." + file_type for file_type in common_programming_language_extensions]))
with gr.Column(scale=1):
if len(self.agent_list) > 1:
agent_selector = gr.Dropdown(
[(agent.name, i) for i, agent in enumerate(self.agent_list)],
label='Agents',
info='选择一个Agent',
value=0,
interactive=True,
)
agent_info_block = self._create_agent_info_block()
# agent_plugins_block = self._create_agent_plugins_block()
if self.prompt_suggestions:
gr.Examples(
label='推荐对话',
examples=self.prompt_suggestions,
inputs=[input],
)
if len(self.agent_list) > 1:
agent_selector.change(
fn=self.change_agent,
inputs=[agent_selector],
outputs=[agent_selector, agent_info_block, agent_plugins_block],
queue=False,
)
input_promise = input.submit(
fn=self.add_text,
inputs=[input, chatbot, history],
outputs=[input, chatbot, history],
queue=False,
)
if len(self.agent_list) > 1 and enable_mention:
input_promise = input_promise.then(
self.add_mention,
[chatbot, agent_selector],
[chatbot, agent_selector],
).then(
self.agent_run,
[chatbot, history, agent_selector],
[chatbot, history, agent_selector],
)
else:
input_promise = input_promise.then(
self.agent_run,
[chatbot, history],
[chatbot, history],
)
input_promise.then(self.flushed, None, [input])
demo.load(None)
demo.queue(default_concurrency_limit=concurrency_limit).launch(share=share,
server_name=server_name,
server_port=server_port)
def change_agent(self, agent_selector):
yield agent_selector, self._create_agent_info_block(agent_selector), self._create_agent_plugins_block(
agent_selector)
def add_text(self, _input, _chatbot, _history):
from qwen_agent.gui.gradio import gr
if _input.text == "/clear":
_chatbot = []
_history.clear()
yield gr.update(interactive=False, value=""), _chatbot, _history
return
_history.append({
ROLE: USER,
CONTENT: [{
'text': _input.text
}],
})
if self.user_config[NAME]:
_history[-1][NAME] = self.user_config[NAME]
if _input.files:
for file in _input.files:
if file.mime_type.startswith('image/'):
_history[-1][CONTENT].append({IMAGE: 'file://' + file.path})
else:
_history[-1][CONTENT].append({FILE: file.path})
_chatbot.append([_input, None])
yield gr.update(interactive=False, value=None), _chatbot, _history
def add_mention(self, _chatbot, _agent_selector):
if len(self.agent_list) == 1:
yield _chatbot, _agent_selector
query = _chatbot[-1][0].text
match = re.search(r'@\w+\b', query)
if match:
_agent_selector = self._get_agent_index_by_name(match.group()[1:])
agent_name = self.agent_list[_agent_selector].name
if ('@' + agent_name) not in query and self.agent_hub is None:
_chatbot[-1][0].text = '@' + agent_name + ' ' + query
yield _chatbot, _agent_selector
def agent_run(self, _chatbot, _history, _agent_selector=None):
if not _history:
if _agent_selector is not None:
yield _chatbot, _history, _agent_selector
else:
yield _chatbot, _history
return
if self.verbose:
logger.info('agent_run input:\n' + pprint.pformat(_history, indent=2))
num_input_bubbles = len(_chatbot) - 1
num_output_bubbles = 1
_chatbot[-1][1] = [None for _ in range(len(self.agent_list))]
agent_runner = self.agent_list[_agent_selector or 0]
if self.agent_hub:
agent_runner = self.agent_hub
responses = []
for responses in agent_runner.run(_history, **self.run_kwargs):
# usage = responses.usage
# responses = [Message(ASSISTANT, responses.output.choices[0].message.content)]
if not responses:
continue
if responses[-1][CONTENT] == PENDING_USER_INPUT:
logger.info('Interrupted. Waiting for user input!')
break
display_responses = convert_fncall_to_text(responses)
# display_responses[-1][CONTENT] += "\n" + repr({"usage": usage}) + ""
if not display_responses:
continue
if display_responses[-1][CONTENT] is None:
continue
while len(display_responses) > num_output_bubbles:
# Create a new chat bubble
_chatbot.append([None, None])
_chatbot[-1][1] = [None for _ in range(len(self.agent_list))]
num_output_bubbles += 1
assert num_output_bubbles == len(display_responses)
assert num_input_bubbles + num_output_bubbles == len(_chatbot)
for i, rsp in enumerate(display_responses):
agent_index = self._get_agent_index_by_name(rsp[NAME])
_chatbot[num_input_bubbles + i][1][agent_index] = rsp[CONTENT]
if len(self.agent_list) > 1:
_agent_selector = agent_index
if _agent_selector is not None:
yield _chatbot, _history, _agent_selector
else:
yield _chatbot, _history
if responses:
for res in responses:
res['content'] = re.sub(r"\ninput tokens.*", "", res['content'])
_history.extend([res for res in responses if res[CONTENT] != PENDING_USER_INPUT])
if _agent_selector is not None:
yield _chatbot, _history, _agent_selector
else:
yield _chatbot, _history
if self.verbose:
logger.info('agent_run response:\n' + pprint.pformat(responses, indent=2))
def flushed(self):
from qwen_agent.gui.gradio import gr
return gr.update(interactive=True)
def _get_agent_index_by_name(self, agent_name):
if agent_name is None:
return 0
try:
agent_name = agent_name.strip()
for i, agent in enumerate(self.agent_list):
if agent.name == agent_name:
return i
return 0
except Exception:
print_traceback()
return 0
def _create_agent_info_block(self, agent_index=0):
from qwen_agent.gui.gradio import gr
agent_config_interactive = self.agent_config_list[agent_index]
return gr.HTML(
format_cover_html(
bot_name=agent_config_interactive['name'],
bot_description=agent_config_interactive['description'],
bot_avatar=agent_config_interactive['avatar'],
))
def _create_agent_plugins_block(self, agent_index=0):
from qwen_agent.gui.gradio import gr
agent_interactive = self.agent_list[agent_index]
if agent_interactive.function_map:
capabilities = [key for key in agent_interactive.function_map.keys()]
return gr.CheckboxGroup(
label='插件',
value=capabilities,
choices=capabilities,
interactive=False,
)
else:
return gr.CheckboxGroup(
label='插件',
value=[],
choices=[],
interactive=False,
)