Basic_Agent / app.py
broadfield's picture
Update app.py
ac71a91 verified
from huggingface_hub import InferenceClient, HfApi, HfFileSystem
import gradio as gr
import requests
import random
import prompts
import uuid
import json
import re
import os
fs = HfFileSystem()
loc_folder="chat_history"
loc_file="chat_json"
user_="community-pool/"
repo_="test3"
clients = [
{'type':'image','name':'black-forest-labs/FLUX.1-dev','rank':'op','max_tokens':16384,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}},
{'type':'text','name':'deepseek-ai/DeepSeek-V2.5-1210','rank':'op','max_tokens':16384,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}},
{'type':'text','name':'Qwen/Qwen2.5-Coder-32B-Instruct','rank':'op','max_tokens':32768,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}},
{'type':'text','name':'meta-llama/Meta-Llama-3-8B','rank':'op','max_tokens':32768,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}},
{'type':'text','name':'Snowflake/snowflake-arctic-embed-l-v2.0','rank':'op','max_tokens':4096,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}},
{'type':'text','name':'Snowflake/snowflake-arctic-embed-m-v2.0','rank':'op','max_tokens':4096,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}},
{'type':'text','name':'HuggingFaceTB/SmolLM2-1.7B-Instruct','rank':'op','max_tokens':4096,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}},
{'type':'text','name':'Qwen/QwQ-32B-Preview','rank':'op','max_tokens':16384,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}},
{'type':'text','name':'meta-llama/Llama-3.3-70B-Instruct','rank':'pro','max_tokens':16384,'schema':{'bos':'<|im_start|>','eos':'<|im_end|>'}},
{'type':'text','name':'mistralai/Mixtral-8x7B-Instruct-v0.1','rank':'op','max_tokens':40000,'schema':{'bos':'<s>','eos':'</s>'}},
]
def file_template(inp):
if "readme.md" in inp.lower():
template=prompts.README
else:template="NONE"
return template
def format_prompt(message, mod, system):
eos=f"{clients[int(mod)]['schema']['eos']}\n"
bos=f"{clients[int(mod)]['schema']['bos']}\n"
prompt=""
prompt+=bos
prompt+=system
prompt+=eos
prompt+=bos
prompt += f"[INST] {message} [/INST]"
prompt+=eos
prompt+=bos
return prompt
def generate(prompt,history,mod=2,tok=4000,seed=1,role="ASSISTANT",data=None):
#print("#####",history,"######")
gen_images=False
client=InferenceClient(clients[int(mod)]['name'])
client_tok=clients[int(mod)]['max_tokens']
good_seed=[947385642222,7482965345792,8584806344673]
if not os.path.isdir(loc_folder):os.mkdir(loc_folder)
if os.path.isfile(f'{loc_folder}/{loc_file}.json'):
with open(f'{loc_folder}/{loc_file}.json','r') as word_dict:
lod=json.loads(word_dict.read())
word_dict.close()
else:
lod=[]
if role == "MANAGER":
system_prompt = prompts.MANAGER.replace("**TIMELINE**",data[4]).replace("**HISTORY**",str(history))
formatted_prompt = format_prompt(prompt, mod, system_prompt)
elif role == "PATHMAKER":
system_prompt = prompts.PATH_MAKER.replace("**FILE_LIST**",str(data[3])).replace("**CURRENT_OR_NONE**",str(data[4])).replace("**PROMPT**",json.dumps(data[0],indent=4)).replace("**HISTORY**",str(history))
formatted_prompt = format_prompt(prompt, mod, system_prompt)
elif role == "CREATE_FILE":
system_prompt = prompts.CREATE_FILE.replace("**FILE_LIST**",str(data[3])).replace("**TIMELINE**",data[4]).replace("**FILENAME**",str(data[1])).replace("**TEMPLATE_OR_NONE**",str(data[2]))
formatted_prompt = format_prompt(prompt, mod, system_prompt)
elif role == "SEARCH":
system_prompt = prompts.SEARCH.replace("**DATA**",data)
formatted_prompt = format_prompt(f'USER:{prompt}', mod, system_prompt)
else: system_prompt = "";formatted_prompt = format_prompt(f'USER:{prompt}', mod, system_prompt)
if tok==None:tok=client_tok-len(formatted_prompt)+10
print("tok",tok)
generate_kwargs = dict(
temperature=0.9,
max_new_tokens=tok, #total tokens - input tokens
top_p=0.99,
repetition_penalty=1.0,
do_sample=True,
seed=seed,
)
output = ""
if role=="MANAGER":
print("Running Manager")
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=True)
for response in stream:
output += response.token.text
yield output
yield history
yield prompt
elif role=="PATHMAKER":
print("Runnning ", role)
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=True)
#prompt=f"We just completed role:{role}, now choose the next tool to complete the task:{prompt}, or COMPLETE"
for response in stream:
output += response.token.text
print(output)
yield output
yield history
yield prompt
elif role=="CREATE_FILE":
print("Running Create File")
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=True)
for response in stream:
output += response.token.text
#print(file_content)
print(output)
yield 'test1'
yield data[1]
yield output
def parse_json(inp):
print("PARSE INPUT")
print(inp)
if type(inp)==type(""):
lines=""
if "```" in inp:
start = inp.find("```json") + 7
end = inp.find("```", start)
if start >= 0 and end >= 0:
inp= inp[start:end]
else:
inp="NONE"
print("Extracted Lines")
print(inp)
try:
out_json=eval(inp)
out1=str(out_json['filename'])
out2=str(out_json['filecontent'])
return out1,out2
except Exception as e:
print(e)
return "None","None"
if type(inp)==type({}):
out1=str(inp['filename'])
out2=str(inp['filecontent'])
return out1,out2
def build_space(repo_name,file_name,file_content,access_token=""):
try:
repo_path=user_+str(repo_)
if not access_token:
#access_token=os.environ['HF_TOKEN']
return [{'role':'assistant','content': 'ENTER A HUGGINGFACE TOKEN'}]
api=HfApi(endpoint="https://huggingface.co", token=access_token)
repo_url = api.create_repo(
repo_id=repo_path,
repo_type="space",
space_sdk="gradio",
exist_ok=True,
private=False,
)
local_file_path=str(uuid.uuid4())
with open(local_file_path, 'w') as f:
f.write(str(file_content))
f.close()
# Upload a local file to the Space
commit_message = "Adding file test: "+ str(file_name)
api.upload_file(path_or_fileobj=local_file_path, path_in_repo=file_name, repo_id=repo_path, repo_type='space', commit_message=commit_message)
print("File uploaded successfully.")
# Commit changes
commit_message += "\nInitial commit to the repository."+ f'{repo_path}/' + f'{file_name}'
#api.commit_repo(space_id, message=commit_message)
return [{'role':'assistant','content': commit_message+'\nCommit Success' }]
except Exception as e:
print("ERROR ",e)
return [{'role':'assistant','content': 'There was an Error: ' + str(e)}]
def agent(prompt_in,history,mod=2,tok_in=""):
print(prompt_in)
print('mod ',mod)
in_data=[None,None,None,None,None,]
#in_data[0]=prompt_in['text']
in_data[0]=prompt_in
prompt=prompt_in
fn=""
com=""
go=True
MAX_DATA=int(clients[int(mod)]['max_tokens'])*2
if not history:history=[{'role':'user','content':prompt_in['text']}]
while go == True:
try:
file_list = fs.ls(f'spaces/{user_}{repo_}',detail=False)
except Exception as e:
print(e)
file_list=["NO FILES YET"]
print('file list\n',file_list)
seed = random.randint(1,9999999999999)
c=0
#history = [history[-4:]]
if len(str(history)) > MAX_DATA*4:
history = [history[-2:]]
print('history',history)
role="PATHMAKER"
in_data[3]=file_list
outph= list(generate(prompt,history,mod,2400,seed,role,in_data))[0]
in_data[4]=outph
print(outph)
history.extend([{'role':'assistant','content':str(outph)}])
yield history
role="MANAGER"
outp=generate(prompt,history,mod,128,seed,role,in_data)
outp0=list(outp)[0].split('<|im_end|>')[0]
#outp0 = re.sub('[^a-zA-Z0-9\s.,?!%()]', '', outpp)
history.extend([{'role':'assistant','content':str(outp0)}])
yield history
for line in outp0.split("\n"):
if "action:" in line:
try:
com_line = line.split('action:')[1]
fn = com_line.split('action_input=')[0]
com = com_line.split('action_input=')[1].split('<|im_end|>')[0]
#com = com_line.split('action_input=')[1].replace('<|im_end|>','').replace("}","").replace("]","").replace("'","")
print(com)
except Exception as e:
pass
fn="NONE"
if 'CREATE_FILE' in fn:
print('CREATE_FILE called')
in_data[1]=com
temp1=file_template(com)
in_data[2]=temp1
in_data[3]=file_list
out_o =generate(prompt,history,mod=mod,tok=10000,seed=seed,role="CREATE_FILE",data=in_data)
out_w=list(out_o)
ret1,ret2 = parse_json(out_w[2].split('<|im_end|>')[0])
print('ret1',ret1)
print('ret2',ret2)
build_out = build_space(repo_,ret1,ret2,access_token=tok_in)
if build_out[0]["content"]=="ENTER A HUGGINGFACE TOKEN":
yield [{'role':'assistant','content':"ENTER A HUGGINGFACE TOKEN"}]
go=False
break
history+=[{'role':'system','content':f'observation:{build_out}'}]
yield history
elif 'READ_FILE' in fn:
try:
file_read = fs.read_text(f'spaces/{user_}{repo_}/{com}',detail=False)
except Exception as e:
print(e)
file_read="FILE HAS NO CONTENT"
print('file list\n',file_read)
history+=[{'role':'system','content':f'RETURNED FILE CONTENT: NAME: spaces/{user_}{repo_}/{com} CONTENT:{build_out}'}]
yield history
elif 'IMAGE' in fn:
print('IMAGE called')
#out_im=gen_im(prompt,seed)
#yield [{'role':'assistant','content': out_im}]
elif 'SEARCH' in fn:
print('SEARCH called')
elif 'COMPLETE' in fn:
print('COMPLETE')
go=False
break
elif 'NONE' in fn:
print('ERROR ACTION NOT FOUND')
history+=[{'role':'system','content':f'observation:The last thing we attempted resulted in an error, check formatting on the tool call'}]
else:pass;seed = random.randint(1,9999999999999)
with gr.Blocks() as ux:
with gr.Row():
with gr.Column():
gr.HTML("""<center><div style='font-size:xx-large;font-weight:900;'>Chatbo</div><br>
<div style='font-size:large;font-weight:700;'>This will make changes to your Huggingface File System</div><br>
<div style='font-size:large;font-weight:900;'>Use at your own risk!</div><br>
""")
chatbot=gr.Chatbot(type='messages',show_label=False, show_share_button=False, show_copy_button=True, layout="panel")
prompt=gr.MultimodalTextbox(label="Prompt",file_count="multiple", file_types=["image"])
mod_c=gr.Dropdown(choices=[n['name'] for n in clients],value='Qwen/Qwen2.5-Coder-32B-Instruct',type='index')
tok_in=gr.Textbox(label='HF TOKEN')
#chat_ux=gr.ChatInterface(fn=agent,chatbot=chatbot,additional_inputs=[mod_c]).load()
#chat_ux.additional_inputs=[mod_c]
#chat_ux.load()
with gr.Row():
submit_b = gr.Button()
stop_b = gr.Button("Stop")
clear = gr.ClearButton([chatbot,prompt])
with gr.Row(visible=False):
stt=gr.Textbox()
with gr.Column():
gr.HTML()
#html_view=gr.HTML("""<iframe src='https://huggingface.co/spaces/community-pool/test1/tree/main' height='1000' width='200'>Viewer Space</iframe>""")
sub_b = submit_b.click(agent, [prompt,chatbot,mod_c,tok_in],chatbot)
sub_p = prompt.submit(agent, [prompt,chatbot,mod_c,tok_in],chatbot)
stop_b.click(None,None,None, cancels=[sub_b,sub_p])
ux.queue(default_concurrency_limit=20).launch(max_threads=40)