# encoding=utf-8
import multiprocessing as mp
import warnings
import requests
import tiktoken
from tqdm import tqdm
from dataclasses import dataclass, field
from typing import (
    AbstractSet,
    Any,
    Callable,
    Collection,
    Dict,
    Generator,
    List,
    Literal,
    Mapping,
    Optional,
    Set,
    Tuple,
    Union,
)
from pydantic import Extra, Field, root_validator
from loguru import logger
from langchain.llms.base import BaseLLM
from langchain.schema import Generation, LLMResult
from langchain.utils import get_from_dict_or_env
from langchain.callbacks.manager import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
import sys
import json


@dataclass
class ChatGPTConfig:
    r"""Defines the parameters for generating chat completions using the
    OpenAI API.

    Args:
        temperature (float, optional): Sampling temperature to use, between
            :obj:`0` and :obj:`2`. Higher values make the output more random,
            while lower values make it more focused and deterministic.
            (default: :obj:`0.2`)
        top_p (float, optional): An alternative to sampling with temperature,
            called nucleus sampling, where the model considers the results of
            the tokens with top_p probability mass. So :obj:`0.1` means only
            the tokens comprising the top 10% probability mass are considered.
            (default: :obj:`1.0`)
        n (int, optional): How many chat completion choices to generate for
            each input message. (default: :obj:`1`)
        stream (bool, optional): If True, partial message deltas will be sent
            as data-only server-sent events as they become available.
            (default: :obj:`False`)
        stop (str or list, optional): Up to :obj:`4` sequences where the API
            will stop generating further tokens. (default: :obj:`None`)
        max_tokens (int, optional): The maximum number of tokens to generate
            in the chat completion. The total length of input tokens and
            generated tokens is limited by the model's context length.
            (default: :obj:`None`)
        presence_penalty (float, optional): Number between :obj:`-2.0` and
            :obj:`2.0`. Positive values penalize new tokens based on whether
            they appear in the text so far, increasing the model's likelihood
            to talk about new topics. See more information about frequency and
            presence penalties. (default: :obj:`0.0`)
        frequency_penalty (float, optional): Number between :obj:`-2.0` and
            :obj:`2.0`. Positive values penalize new tokens based on their
            existing frequency in the text so far, decreasing the model's
            likelihood to repeat the same line verbatim. See more information
            about frequency and presence penalties. (default: :obj:`0.0`)
        logit_bias (dict, optional): Modify the likelihood of specified tokens
            appearing in the completion. Accepts a json object that maps tokens
            (specified by their token ID in the tokenizer) to an associated
            bias value from :obj:`-100` to :obj:`100`. Mathematically, the bias
            is added to the logits generated by the model prior to sampling.
            The exact effect will vary per model, but values between :obj:`-1`
            and :obj:`1` should decrease or increase likelihood of selection;
            values like :obj:`-100` or :obj:`100` should result in a ban or
            exclusive selection of the relevant token. (default: :obj:`{}`)
        user (str, optional): A unique identifier representing your end-user,
            which can help OpenAI to monitor and detect abuse.
            (default: :obj:`""`)
    """
    temperature: float = 1.0  # OpenAI default: 1.0
    top_p: float = 1.0
    max_in_tokens: int = 3200
    timeout: int = 20
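

# Minimal usage sketch (illustrative only): ChatSession below consumes this
# config via `config.__dict__`, so every field set here is forwarded verbatim
# into the chat completions request body.
#
#     config = ChatGPTConfig(temperature=0.2, max_in_tokens=2048)
#     session = ChatSession(prompt="You are a helpful assistant.",
#                           chatgpt_config=config)

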
def get_userid_and_token(
        url='http://avatar.aicubes.cn/vtuber/auth/api/oauth/v1/login',
        app_id='6027294018fd496693d0b8c77e2d20a1',
        app_secret='52806a6fff8a452497061b9dcc5779f4'
):
    d = {'app_id': app_id, 'app_secret': app_secret}
    h = {'Content-Type': 'application/json'}
    r = requests.post(url, json=d, headers=h)
    data = r.json()['data']
    return data['user_id'], data['token']


class ChatAPI:
    def __init__(self, timeout=20, verbose=False) -> None:
        self.timeout = timeout
        self.verbose = verbose
        self.user_id, self.token = get_userid_and_token()

    def create_chat_completion(self, messages: List[Dict[str, str]], model: str, temperature: float, max_tokens=None) -> str:
        res = self.create_chat_completion_response_data(messages, model, temperature, max_tokens)
        return res['choices'][0]['message']['content']

    def create_chat_completion_response_data(self, messages: List[Dict[str, str]], model: str, temperature: float, max_tokens=None):
        res = self.create_chat_completion_response(messages, model, temperature, max_tokens)
        res = res.json()['data']
        return res

    def create_chat_completion_response(self, messages: List[Dict[str, str]], model: str, temperature: float, max_tokens=None):
        chat_url = 'http://avatar.aicubes.cn/vtuber/ai_access/chatgpt/v1/chat/completions'
        chat_header = {
            'Content-Type': 'application/json',
            'userId': self.user_id,
            'token': self.token
        }
        payload = {
            'model': model,
            'messages': messages,
            'temperature': temperature,
            'max_tokens': max_tokens,
        }
        timeout = self.timeout
        res = requests.post(chat_url, json=payload, headers=chat_header, timeout=timeout)
        if self.verbose:
            data = res.json()["data"]
            if data is None:
                logger.debug(res.json())
            else:
                logger.debug(data["choices"][0]["message"]["content"])
        return res
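

# Usage sketch (illustrative; assumes the login endpoint above is reachable
# and the bundled credentials are still valid):
#
#     api = ChatAPI(timeout=30, verbose=True)
#     answer = api.create_chat_completion(
#         messages=[{"role": "user", "content": "Hello"}],
#         model="gpt-3.5-turbo",
#         temperature=0.2,
#     )
#     print(answer)

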
class OpenAIChat(BaseLLM):
    """Wrapper around OpenAI Chat large language models.

    To use, you should have the ``openai`` python package installed, and the
    environment variable ``OPENAI_API_KEY`` set with your API key.

    Any parameters that are valid to be passed to the openai.create call can be passed
    in, even if not explicitly saved on this class.

    Example:
        .. code-block:: python

            from langchain.llms import OpenAIChat
            openaichat = OpenAIChat(model_name="gpt-3.5-turbo")
    """

    model_name: str = "gpt-3.5-turbo"
    """Model name to use."""
    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Holds any model parameters valid for `create` call not explicitly specified."""
    max_retries: int = 6
    """Maximum number of retries to make when generating."""
    prefix_messages: List = Field(default_factory=list)
    """Series of messages for Chat input."""
    streaming: bool = False
    """Whether to stream the results or not."""
    allowed_special: Union[Literal["all"], AbstractSet[str]] = set()
    """Set of special tokens that are allowed."""
    disallowed_special: Union[Literal["all"], Collection[str]] = "all"
    """Set of special tokens that are not allowed."""
    # Shared proxy-API client; note it is created (and logs in) at import time.
    api = ChatAPI(timeout=60)
    generate_verbose: bool = False

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.ignore

    @root_validator(pre=True)
    def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Build extra kwargs from additional params that were passed in."""
        all_required_field_names = {field.alias for field in cls.__fields__.values()}
        extra = values.get("model_kwargs", {})
        for field_name in list(values):
            if field_name not in all_required_field_names:
                if field_name in extra:
                    raise ValueError(f"Found {field_name} supplied twice.")
                extra[field_name] = values.pop(field_name)
        values["model_kwargs"] = extra
        return values

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key and python package exists in environment."""
        return values

    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling OpenAI API."""
        return self.model_kwargs

    def _get_chat_params(
        self, prompts: List[str], stop: Optional[List[str]] = None
    ) -> Tuple:
        if len(prompts) > 1:
            raise ValueError(
                f"OpenAIChat currently only supports single prompt, got {prompts}"
            )
        messages = self.prefix_messages + [{"role": "user", "content": prompts[0]}]
        params: Dict[str, Any] = {**{"model": self.model_name}, **self._default_params}
        if stop is not None:
            if "stop" in params:
                raise ValueError("`stop` found in both the input and default params.")
            params["stop"] = stop
        if params.get("max_tokens") == -1:
            # for ChatGPT api, omitting max_tokens is equivalent to having no limit
            del params["max_tokens"]
        return messages, params

    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
    ) -> LLMResult:
        messages, params = self._get_chat_params(prompts, stop)
        if self.generate_verbose:
            logger.debug(json.dumps(params, indent=2))
            for msg in messages:
                logger.debug(msg["role"] + " : " + msg["content"])
        resp = self.api.create_chat_completion_response_data(messages, self.model_name, self.model_kwargs['temperature'])
        full_response = resp
        llm_output = {
            "token_usage": full_response["usage"],
            "model_name": self.model_name,
        }
        return LLMResult(
            generations=[
                [Generation(text=full_response["choices"][0]["message"]["content"])]
            ],
            llm_output=llm_output,
        )

    async def _agenerate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
    ) -> LLMResult:
        # messages, params = self._get_chat_params(prompts, stop)
        # full_response = await acompletion_with_retry(
        #     self, messages=messages, **params
        # )
        # llm_output = {
        #     "token_usage": full_response["usage"],
        #     "model_name": self.model_name,
        # }
        # return LLMResult(
        #     generations=[
        #         [Generation(text=full_response["choices"][0]["message"]["content"])]
        #     ],
        #     llm_output=llm_output,
        # )
        raise NotImplementedError("Async not supported for OpenAIChat")

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {**{"model_name": self.model_name}, **self._default_params}

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "openai-chat"

    def get_num_tokens(self, text: str) -> int:
        """Calculate num tokens with tiktoken package."""
        # tiktoken NOT supported for Python < 3.8
        if sys.version_info[1] < 8:
            return super().get_num_tokens(text)
        try:
            import tiktoken
        except ImportError:
            raise ValueError(
                "Could not import tiktoken python package. "
                "This is needed in order to calculate get_num_tokens. "
                "Please install it with `pip install tiktoken`."
            )
        # create a GPT-3.5-Turbo encoder instance
        enc = tiktoken.encoding_for_model("gpt-3.5-turbo")
        # encode the text using the GPT-3.5-Turbo encoder
        tokenized_text = enc.encode(
            text,
            allowed_special=self.allowed_special,
            disallowed_special=self.disallowed_special,
        )
        # calculate the number of tokens in the encoded text
        return len(tokenized_text)
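

# Usage sketch (illustrative): OpenAIChat implements the standard LangChain
# LLM interface, so it can be called directly or composed into chains. Any
# extra keyword such as `temperature` is moved into `model_kwargs` by the
# `build_extra` validator, which is where `_generate` reads it from.
#
#     llm = OpenAIChat(model_name="gpt-3.5-turbo", temperature=0.2)
#     result = llm.generate(["Summarize what nucleus sampling is."])
#     print(result.generations[0][0].text)

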
class ChatSession:
    def __init__(self, prompt: str = '', chatgpt_config: ChatGPTConfig = ChatGPTConfig()) -> None:
        self.chatgpt_config = chatgpt_config.__dict__
        self.user_id, self.token = self.get_userid_and_token()
        encoding = tiktoken.encoding_for_model("gpt-3.5-turbo-0301")
        self.count = lambda x: len(encoding.encode(x))
        self.history = []
        self.system = [self.make_msg("system", prompt)] if prompt else []

    def restart(self, prompt: str = '') -> None:
        self.system = [self.make_msg("system", prompt)] if prompt else []

    @staticmethod
    def make_msg(role: str, msg: str) -> Dict:
        assert role in {"system", "assistant", "user"}
        return {"role": role, "content": msg}

    @staticmethod
    def get_userid_and_token(
            url='http://avatar.aicubes.cn/vtuber/auth/api/oauth/v1/login',
            app_id='6027294018fd496693d0b8c77e2d20a1',
            app_secret='52806a6fff8a452497061b9dcc5779f4'
    ):
        d = {'app_id': app_id, 'app_secret': app_secret}
        h = {'Content-Type': 'application/json'}
        r = requests.post(url, json=d, headers=h)
        data = r.json()['data']
        return data['user_id'], data['token']

    def make_chat_session(self, user_id: str, token: str, input_message: List[Dict[str, str]]):
        chat_h = {
            'Content-Type': 'application/json',
            'userId': user_id,
            'token': token
        }
        chat_url = 'http://avatar.aicubes.cn/vtuber/ai_access/chatgpt/v1/chat/completions'
        # The whole config dict (including client-side fields such as
        # max_in_tokens and timeout) is forwarded in the request body.
        res = requests.post(chat_url, json={
            'messages': input_message, **self.chatgpt_config
        }, headers=chat_h, timeout=self.chatgpt_config['timeout'])
        return res.json()['data']['choices'][0]['message']['content']

    def create_chat_completion(self, messages: List[Dict[str, str]], model: str, temperature: float, max_tokens=None) -> str:
        chat_url = 'http://avatar.aicubes.cn/vtuber/ai_access/chatgpt/v1/chat/completions'
        chat_header = {
            'Content-Type': 'application/json',
            'userId': self.user_id,
            'token': self.token
        }
        payload = {
            'model': model,
            'messages': messages,
            'temperature': temperature,
            'max_tokens': max_tokens,
        }
        timeout = self.chatgpt_config['timeout']
        res = requests.post(chat_url, json=payload, headers=chat_header, timeout=timeout)
        return res.json()['data']['choices'][0]['message']['content']

    def chat(self, msg: str):
        self.history.append(self.make_msg("user", msg))
        # Walk backwards from the newest message, keeping as many trailing
        # messages as fit into max_in_tokens (the system prompt always counts).
        init_tokenCnt = self.count(self.system[0]['content']) if self.system else 0
        inputStaMsgIdx, tokenCnt = len(self.history), init_tokenCnt
        while inputStaMsgIdx and (
                tokenCnt := tokenCnt + self.count(self.history[inputStaMsgIdx - 1]['content'])) < \
                self.chatgpt_config['max_in_tokens']:
            inputStaMsgIdx -= 1
        # If not even the newest message fits the budget, send just that one.
        inputStaMsgIdx = inputStaMsgIdx if inputStaMsgIdx < len(self.history) else -1
        res = self.make_chat_session(self.user_id, self.token, self.system + self.history[inputStaMsgIdx:])
        self.history.append(self.make_msg("assistant", res))
        return res


def batch_chat(info_lst: List, request_num: int = 6) -> List:
    res = []
    pool = mp.Pool(processes=request_num)
    for id, res_text in tqdm(pool.imap(single_chat, info_lst), desc="Asking API", total=len(info_lst)):
        if res_text:
            res.append((id, res_text))
    pool.close()
    pool.join()
    return res


def single_chat(info: Dict) -> Tuple[int, str]:
    sess = ChatSession(info['sys'], info['config'])
    try:
        res = sess.chat(info['query'])
        return info['id'], res
    except Exception as e:
        print(e)
        return info['id'], ""
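

# Usage sketch (illustrative; `queries` is a hypothetical list of prompt
# strings, and the dict keys are exactly the ones single_chat expects):
#
#     infos = [
#         {"id": i, "sys": "You are a helpful assistant.", "query": q,
#          "config": ChatGPTConfig()}
#         for i, q in enumerate(queries)
#     ]
#     scored = batch_chat(infos, request_num=4)

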
if __name__ == '__main__':
    # Grader system prompt (in Chinese): the model acts as a strict reviewer,
    # checks a reply against its instruction from several angles (accuracy,
    # completeness, harmlessness, adherence to the instruction), reasons out
    # loud, and ends with a 1-5 score.
    sys_prompt = """
    你是一位严格的评分员,我会给你一个指令和这个指令的回复,你需要仔细检查回复并给出分数,你可以从多个角度评判这个回复,比如:
    回复是否准确、是否详尽、是否无害、是否完全符合指令里的要求,等等。分数分为5个等级,1分:完全不可用,2分:不可用但完成了部分指令,
    3分:可用但有明显缺陷,4分:可用但有少许缺陷,5分:可用且没有缺陷。你在工作时需要加入自己的思考,并在最后给出分数。
    下面是一个例子:
    User: \n\n<指令>马云的妻子是谁?</指令>\n\n<回复>马云的妻子是张英琪。</回复>
    Assistant: 这个回复错误,马云是阿里巴巴创始人,他的妻子是张瑛,因此回复错误,因此,我的分数是[1分]。
    """
    # RoPE-style attention equations used as a sample "explain the formula" input.
    aaa = """
    fq(xm, m) = (Wqxm)e^(imθ)
    fk(xn, n) = (Wkxn)e^(inθ)
    g(xm, xn, m − n) = Re[(Wqxm)(Wkxn)∗e^(i(m−n)θ)]
    """
    # Example grading input (in Chinese): instruction "How tall is Yao Ming?"
    # with the obviously wrong reply "18m".
    prompt = 'User: \n\n<指令>姚明多高</指令>\n\n<回复>18m</回复>\nAssistant:'
    # An English explanation of the equations in `aaa` (not used below).
    bbb = "The given equation defines a function g(xm, xn, m-n) in terms of two complex functions fq(xm, m) and fk(xn, n) and their corresponding Fourier coefficients Wq and Wk, respectively. The function g(xm, xn, m-n) takes the real part of the product of the two complex exponential terms with phase angles m-theta and n-theta, respectively, where theta is an arbitrary constant angle. The term (m-n)theta in the exponent indicates that the two exponential terms are shifted by a phase difference of (m-n)theta."
    # System prompt (in Chinese): "Explain the meaning of the formula."
    session = ChatSession('解释公式的含义')
    # print(session.chat(aaa))
    # The question (in Chinese): "Who are you? Who created you? What is your
    # knowledge cutoff? You may pick a name for yourself; tell me your name."
    print(session.chat("你是谁?谁创造了你?你的知识截止于什么时候?你可以给自己取一个名字,请告诉我你的名字"))