Spaces:
Running
Running
from huggingface_hub import InferenceClient, HfApi, HfFileSystem | |
#from html2image import Html2Image | |
import gradio as gr | |
#import markdown | |
import requests | |
import random | |
import prompts | |
#import im_prompts | |
import uuid | |
import json | |
#import PIL | |
#import bs4 | |
import re | |
import os | |
fs = HfFileSystem() | |
loc_folder="chat_history" | |
loc_file="chat_json" | |
user_="community-pool/" | |
repo_="test3" | |
clients = [ | |
{'type':'image','name':'black-forest-labs/FLUX.1-dev','rank':'op','max_tokens':16384,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}}, | |
{'type':'text','name':'deepseek-ai/DeepSeek-V2.5-1210','rank':'op','max_tokens':16384,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}}, | |
{'type':'text','name':'Qwen/Qwen2.5-Coder-32B-Instruct','rank':'op','max_tokens':32768,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}}, | |
{'type':'text','name':'meta-llama/Meta-Llama-3-8B','rank':'op','max_tokens':32768,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}}, | |
{'type':'text','name':'Snowflake/snowflake-arctic-embed-l-v2.0','rank':'op','max_tokens':4096,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}}, | |
{'type':'text','name':'Snowflake/snowflake-arctic-embed-m-v2.0','rank':'op','max_tokens':4096,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}}, | |
{'type':'text','name':'HuggingFaceTB/SmolLM2-1.7B-Instruct','rank':'op','max_tokens':4096,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}}, | |
{'type':'text','name':'Qwen/QwQ-32B-Preview','rank':'op','max_tokens':16384,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}}, | |
{'type':'text','name':'meta-llama/Llama-3.3-70B-Instruct','rank':'pro','max_tokens':16384,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}}, | |
{'type':'text','name':'mistralai/Mixtral-8x7B-Instruct-v0.1','rank':'op','max_tokens':40000,'schema':{'bos':'<s>','eos':'</s>'}}, | |
] | |
def file_template(inp): | |
if "readme.md" in inp.lower(): | |
template=prompts.README | |
else:template="NONE" | |
return template | |
def format_prompt(message, mod, system): | |
eos=f"{clients[int(mod)]['schema']['eos']}\n" | |
bos=f"{clients[int(mod)]['schema']['bos']}\n" | |
prompt="" | |
prompt+=bos | |
prompt+=system | |
prompt+=eos | |
prompt+=bos | |
prompt += f"[INST] {message} [/INST]" | |
prompt+=eos | |
prompt+=bos | |
return prompt | |
def generate(prompt,history,mod=2,tok=4000,seed=1,role="ASSISTANT",data=None): | |
#print("#####",history,"######") | |
try: | |
file_list = fs.ls(f'spaces/{user_}{repo_}',detail=False) | |
except Exception as e: | |
print(e) | |
file_list=["NO FILES YET"] | |
print('file list\n',file_list) | |
gen_images=False | |
client=InferenceClient(clients[int(mod)]['name']) | |
client_tok=clients[int(mod)]['max_tokens'] | |
good_seed=[947385642222,7482965345792,8584806344673] | |
if not os.path.isdir(loc_folder):os.mkdir(loc_folder) | |
if os.path.isfile(f'{loc_folder}/{loc_file}.json'): | |
with open(f'{loc_folder}/{loc_file}.json','r') as word_dict: | |
lod=json.loads(word_dict.read()) | |
word_dict.close() | |
else: | |
lod=[] | |
if role == "MANAGER": | |
system_prompt = prompts.MANAGER.replace("**TIMELINE**",data[4]).replace("**HISTORY**",str(history)) | |
formatted_prompt = format_prompt(prompt, mod, system_prompt) | |
elif role == "PATHMAKER": | |
system_prompt = prompts.PATH_MAKER.replace("**FILE_LIST**",str(file_list)).replace("**CURRENT_OR_NONE**",str(data[4])).replace("**PROMPT**",json.dumps(data[0],indent=4)).replace("**HISTORY**",str(history)) | |
formatted_prompt = format_prompt(prompt, mod, system_prompt) | |
elif role == "CREATE_FILE": | |
system_prompt = prompts.CREATE_FILE.replace("**FILE_LIST**",str(file_list)).replace("**TIMELINE**",data[4]).replace("**FILENAME**",str(data[1])).replace("**TEMPLATE_OR_NONE**",str(data[2])) | |
formatted_prompt = format_prompt(prompt, mod, system_prompt) | |
elif role == "SEARCH": | |
system_prompt = prompts.SEARCH.replace("**DATA**",data) | |
formatted_prompt = format_prompt(f'USER:{prompt}', mod, system_prompt) | |
else: system_prompt = "";formatted_prompt = format_prompt(f'USER:{prompt}', mod, system_prompt) | |
if tok==None:tok=client_tok-len(formatted_prompt)+10 | |
print("tok",tok) | |
generate_kwargs = dict( | |
temperature=0.9, | |
max_new_tokens=tok, #total tokens - input tokens | |
top_p=0.99, | |
repetition_penalty=1.0, | |
do_sample=True, | |
seed=seed, | |
) | |
output = "" | |
if role=="MANAGER": | |
print("Running Manager") | |
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=True) | |
for response in stream: | |
output += response.token.text | |
yield output | |
yield history | |
yield prompt | |
elif role=="PATHMAKER": | |
print("Runnning ", role) | |
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=True) | |
#prompt=f"We just completed role:{role}, now choose the next tool to complete the task:{prompt}, or COMPLETE" | |
for response in stream: | |
output += response.token.text | |
print(output) | |
yield output | |
yield history | |
yield prompt | |
elif role=="CREATE_FILE": | |
print("Running Create File") | |
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=True) | |
for response in stream: | |
output += response.token.text | |
#print(file_content) | |
print(output) | |
yield 'test1' | |
yield data[1] | |
yield output | |
#yield output | |
#yield history | |
#yield prompt | |
#with open(f'{loc_folder}/{loc_file}.json','w') as jobj: | |
# lod.append({'prompt':prompt,'response':output,'image':im_box,'model':clients[1]['name'],'seed':seed}), | |
# jobj.write(json.dumps(lod,indent=4)) | |
#jobj.close() | |
#chat_im_out=chat_img(output) | |
'''def gen_im(prompt,seed): | |
print('generating image') | |
image_out = im_client.text_to_image(prompt=prompt['text'],height=128,width=128,num_inference_steps=10,seed=seed) | |
#print(type(image_out)) | |
output=f'images/{uuid.uuid4()}.png' | |
image_out.save(output) | |
print('Done: ',output) | |
return [{'role':'assistant','content': {'path':output}}]''' | |
def parse_json(inp): | |
print("PARSE INPUT") | |
print(inp) | |
if type(inp)==type(""): | |
lines="" | |
if "```" in inp: | |
start = inp.find("```json") + 7 # Find index after the start character | |
end = inp.find("```", start) # Find index of end character from the start index | |
if start >= 0 and end >= 0: | |
inp= inp[start:end] # Slice the string between start and end | |
else: | |
inp="NONE" # Return None if characters not found | |
'''tog=False | |
cnt=0 | |
for line in inp.split("\n"): | |
if tog==True: | |
if not "```" in line: | |
lines+=line | |
if "```" in line: | |
cnt+=1 | |
if cnt==1: | |
tog = True | |
elif cnt==2: | |
tog = False''' | |
print("Extracted Lines") | |
print(inp) | |
try: | |
out_json=eval(inp) | |
out1=str(out_json['filename']) | |
out2=str(out_json['filecontent']) | |
return out1,out2 | |
except Exception as e: | |
print(e) | |
return "None","None" | |
if type(inp)==type({}): | |
out1=str(inp['filename']) | |
out2=str(inp['filecontent']) | |
return out1,out2 | |
def build_space(repo_name,file_name,file_content,access_token=""): | |
try: | |
repo_path=user_+str(repo_) | |
if not access_token:access_token=os.environ['HF_TOKEN'] | |
api=HfApi(endpoint="https://huggingface.co", token=access_token) | |
repo_url = api.create_repo( | |
repo_id=repo_path, | |
repo_type="space", | |
space_sdk="gradio", | |
exist_ok=True, | |
private=False, | |
) | |
# Create a new Space | |
#response = api.create_repo(repo_path) | |
#space_info = repo_url.json() | |
#print(space_info) | |
#space_id = space_info["name"] | |
#print(f"Created Space with ID: {space_id}") | |
local_file_path=str(uuid.uuid4()) | |
with open(local_file_path, 'w') as f: | |
f.write(str(file_content)) | |
f.close() | |
# Upload a local file to the Space | |
commit_message = "Adding file test: "+ str(file_name) | |
api.upload_file(path_or_fileobj=local_file_path, path_in_repo=file_name, repo_id=repo_path, repo_type='space', commit_message=commit_message) | |
print("File uploaded successfully.") | |
# Commit changes | |
commit_message += "\nInitial commit to the repository."+ f'{repo_path}/' + f'{file_name}' | |
#api.commit_repo(space_id, message=commit_message) | |
return [{'role':'assistant','content': commit_message+'\nCommit Success' }] | |
except Exception as e: | |
print("ERROR ",e) | |
return [{'role':'assistant','content': 'There was an Error: ' + str(e)}] | |
def agent(prompt_in,history,mod=2): | |
print(prompt_in) | |
print('mod ',mod) | |
in_data=[None,None,None,None,None,] | |
#in_data[0]=prompt_in['text'] | |
in_data[0]=prompt_in | |
prompt=prompt_in | |
fn="" | |
com="" | |
go=True | |
MAX_DATA=int(clients[int(mod)]['max_tokens'])*2 | |
if not history:history=[{'role':'user','content':prompt_in['text']}] | |
while go == True: | |
seed = random.randint(1,9999999999999) | |
c=0 | |
#history = [history[-4:]] | |
if len(str(history)) > MAX_DATA*4: | |
history = [history[-2:]] | |
print('history',history) | |
role="PATHMAKER" | |
outph= list(generate(prompt,history,mod,2400,seed,role,in_data))[0] | |
in_data[4]=outph | |
print(outph) | |
history.extend([{'role':'assistant','content':str(outph)}]) | |
yield history | |
role="MANAGER" | |
outp=generate(prompt,history,mod,128,seed,role,in_data) | |
outp0=list(outp)[0].split('<|im_end|>')[0] | |
#outp0 = re.sub('[^a-zA-Z0-9\s.,?!%()]', '', outpp) | |
history.extend([{'role':'assistant','content':str(outp0)}]) | |
yield history | |
for line in outp0.split("\n"): | |
if "action:" in line: | |
try: | |
com_line = line.split('action:')[1] | |
fn = com_line.split('action_input=')[0] | |
com = com_line.split('action_input=')[1].split('<|im_end|>')[0] | |
#com = com_line.split('action_input=')[1].replace('<|im_end|>','').replace("}","").replace("]","").replace("'","") | |
print(com) | |
except Exception as e: | |
pass | |
fn="NONE" | |
if 'CREATE_FILE' in fn: | |
print('CREATE_FILE called') | |
in_data[1]=com | |
temp1=file_template(com) | |
in_data[2]=temp1 | |
out_o =generate(prompt,history,mod=mod,tok=10000,seed=seed,role="CREATE_FILE",data=in_data) | |
out_w=list(out_o) | |
ret1,ret2 = parse_json(out_w[2].split('<|im_end|>')[0]) | |
print('ret1',ret1) | |
print('ret2',ret2) | |
build_out = build_space(repo_,ret1,ret2) | |
history+=[{'role':'system','content':f'observation:{build_out}'}] | |
yield history | |
elif 'READ_FILE' in fn: | |
try: | |
file_read = fs.read_text(f'spaces/{user_}{repo_}/{com}',detail=False) | |
except Exception as e: | |
print(e) | |
file_read="FILE HAS NO CONTENT" | |
print('file list\n',file_read) | |
history+=[{'role':'system','content':f'RETURNED FILE CONTENT: NAME: spaces/{user_}{repo_}/{com} CONTENT:{build_out}'}] | |
yield history | |
elif 'IMAGE' in fn: | |
print('IMAGE called') | |
#out_im=gen_im(prompt,seed) | |
#yield [{'role':'assistant','content': out_im}] | |
elif 'SEARCH' in fn: | |
print('SEARCH called') | |
elif 'COMPLETE' in fn: | |
print('COMPLETE') | |
go=False | |
break | |
elif 'NONE' in fn: | |
print('ERROR ACTION NOT FOUND') | |
history+=[{'role':'system','content':f'observation:The last thing we attempted resulted in an error, check formatting on the tool call'}] | |
else:pass;seed = random.randint(1,9999999999999) | |
with gr.Blocks() as ux: | |
with gr.Row(): | |
with gr.Column(): | |
gr.HTML("""<center><div style='font-size:xx-large;font-weight:900;'>Chatbo</div>""") | |
chatbot=gr.Chatbot(type='messages',show_label=False, show_share_button=False, show_copy_button=True, layout="panel") | |
prompt=gr.MultimodalTextbox(label="Prompt",file_count="multiple", file_types=["image"]) | |
mod_c=gr.Dropdown(choices=[n['name'] for n in clients],value='Qwen/Qwen2.5-Coder-32B-Instruct',type='index') | |
#chat_ux=gr.ChatInterface(fn=agent,chatbot=chatbot,additional_inputs=[mod_c]).load() | |
#chat_ux.additional_inputs=[mod_c] | |
#chat_ux.load() | |
with gr.Row(): | |
submit_b = gr.Button() | |
stop_b = gr.Button("Stop") | |
clear = gr.ClearButton([chatbot,prompt]) | |
with gr.Row(visible=False): | |
stt=gr.Textbox() | |
with gr.Column(): | |
html_view=gr.HTML("""<iframe src='https://huggingface.co/spaces/community-pool/test1/tree/main' height='1000' width='200'>Viewer Space</iframe>""") | |
sub_b = submit_b.click(agent, [prompt,chatbot,mod_c],chatbot) | |
sub_p = prompt.submit(agent, [prompt,chatbot,mod_c],chatbot) | |
stop_b.click(None,None,None, cancels=[sub_b,sub_p]) | |
ux.queue(default_concurrency_limit=20).launch(max_threads=40) | |