|
import gradio as gr |
|
import ssl |
|
from openai import OpenAI |
|
import time |
|
import os |
|
import shutil |
|
from datetime import datetime |
|
import Arcana |
|
from nylon import * |
|
import pandas as pd |
|
import json |
|
import fiber |
|
import cite_source |
|
|
|
foldername = 'Celsiaaa' |
|
dbmsmode = 'Fiber' |
|
|
|
try: |
|
with open('settings.arcana',mode='r') as file: |
|
foldername,dbmsmode = file.read().split('\n') |
|
except Exception as e: |
|
print(e) |
|
with open('settings.arcana',mode='w') as file: |
|
newsettings = foldername+'\n'+dbmsmode |
|
file.write(newsettings) |
|
|
|
|
|
try: |
|
_create_unverified_https_context = ssl._create_unverified_context |
|
except AttributeError: |
|
pass |
|
else: |
|
ssl._create_default_https_context = _create_unverified_https_context |
|
|
|
def query_database2(query): |
|
print(dbmsmode) |
|
if dbmsmode == 'Nylon': |
|
db = ChatDatabase(foldername+'.txt') |
|
|
|
sender = 'Arcana' |
|
N = 10 |
|
cache = {} |
|
query_tag = None |
|
|
|
relevant_messages = db.get_relevant_messages(sender, query, N, cache, query_tag) |
|
|
|
print("Relevant messages:") |
|
for message in relevant_messages: |
|
print(f"Sender: {message[0]}, Time: {message[1]}, Tag: {message[3]}") |
|
print(f"Message: {message[2][:100]}...") |
|
print() |
|
|
|
df_data = [str(message) for message in relevant_messages] |
|
return ';'.join(df_data) |
|
elif dbmsmode == 'Fiber': |
|
dbms = fiber.FiberDBMS() |
|
|
|
dbms.load_or_create(foldername+'.txt') |
|
results = dbms.query(query, 3) |
|
|
|
|
|
result_strings = [] |
|
for result in results: |
|
result_str = f"Name: {result['name']}\nContent: {result['content']}\nTags: {result['tags']}\nIndex: {result['index']}" |
|
result_strings.append(result_str) |
|
|
|
|
|
return ';'.join(result_strings) |
|
|
|
def cite(style=None,author=None,title=None,publisher=None,year=None,url=None,date_accessed=None): |
|
return cite_source.generate_citation(style=style,author=author,title=title,publisher=publisher,year=year,url=url,access_date=date_accessed) |
|
|
|
|
|
def list_files_indb(directory=foldername): |
|
""" |
|
List all files in the given directory, separated by semicolons. |
|
|
|
:param directory: The directory to list files from. Defaults to the current directory. |
|
:return: A string of filenames separated by semicolons. |
|
""" |
|
try: |
|
|
|
files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))] |
|
|
|
|
|
return ';'.join(files) |
|
except Exception as e: |
|
return f"An error occurred: {str(e)}" |
|
|
|
search_mode = 0 |
|
|
|
|
|
|
|
client = OpenAI( |
|
base_url='https://api.openai-proxy.org/v1', |
|
api_key='sk-JPHnp3AyBA0TOxXuhJ01sbhuq8dRlW6YHQjbGpla279c4dAn', |
|
) |
|
|
|
|
|
function_list = [ |
|
{ |
|
"name": "search_database", |
|
"description": "Query the database and return a list of results as strings", |
|
"parameters": { |
|
"type": "object", |
|
"properties": { |
|
"query": { |
|
"type": "string", |
|
"description": "The query to execute against the database" |
|
}, |
|
}, |
|
"required": ["query"] |
|
} |
|
}, |
|
|
|
{ |
|
"name": "list_database_files", |
|
"description": "Check what files are present in the database", |
|
"parameters":{ |
|
"type":"object", |
|
"properties":{ |
|
"query":{ |
|
"type":"string", |
|
"description":"Gives a list of semicolon seperated file names in the database" |
|
}, |
|
}, |
|
} |
|
} |
|
] |
|
|
|
|
|
function_map = { |
|
"search_database": query_database2, |
|
"list_database_files":list_files_indb |
|
} |
|
|
|
def execute_function(function_name, function_args): |
|
if function_name in function_map: |
|
return function_map[function_name](**function_args) |
|
else: |
|
return f"Error: Function {function_name} not found" |
|
|
|
mapsearchmode = ['always', 'auto', 'none'] |
|
|
|
def openai_api_call(messages, retries=3, delay=5): |
|
global search_mode |
|
|
|
for attempt in range(retries): |
|
try: |
|
|
|
if search_mode == 0: |
|
messages[-1]['content'] = "[System: SEARCH when the user ASKED A QUESTION & remember to CITE(the source is the first tag). Otherwise do not search. The User's question:"];" + messages[-1]['content'] |
|
|
|
completion = client.chat.completions.create( |
|
model="gpt-4o", |
|
messages=messages, |
|
functions=function_list, |
|
function_call='auto', |
|
timeout=10 |
|
) |
|
response_message = completion.choices[0].message |
|
|
|
# Check if the model wants to call a function |
|
if response_message.function_call: |
|
function_name = response_message.function_call.name |
|
function_args = json.loads(response_message.function_call.arguments) |
|
function_response = execute_function(function_name, function_args) |
|
# Add the function response to the conversation |
|
messages.append(response_message.model_dump()) # The model's request to call the function |
|
messages.append({ |
|
"role": "function", |
|
"name": function_name, |
|
"content": json.dumps(function_response) |
|
}) |
|
# Make a follow-up call to the model with the function response |
|
return openai_api_call(messages) |
|
else: |
|
return response_message.content |
|
|
|
except Exception as e: |
|
print(f"Attempt {attempt + 1} failed: {e}") |
|
if attempt < retries - 1: |
|
time.sleep(delay) |
|
else: |
|
return "Sorry, I am having trouble connecting to the server. Please try again later." |
|
|
|
return "Failed to get a response after multiple attempts." |
|
|
|
|
|
def handle_search_mode(mode): |
|
print(mode) |
|
global search_mode |
|
if mode == "Always": |
|
search_mode = 0 |
|
return "You are in Mode 1" |
|
elif mode == "Automatic": |
|
search_mode = 1 |
|
return "You are in Mode 2" |
|
else: |
|
search_mode = 0 |
|
return "Select a mode" |
|
|
|
def handle_dbms_mode(mode): |
|
print(mode) |
|
global dbmsmode |
|
with open('settings.arcana',mode='w') as file: |
|
newsettings = foldername+'\n'+mode |
|
file.write(newsettings) |
|
|
|
if mode == "Nylon": |
|
dbmsmode = "Nylon" |
|
return "You are in Mode 1" |
|
elif mode == "Fiber": |
|
dbmsmode = "Fiber" |
|
return "You are in Mode 2" |
|
else: |
|
search_mode = 0 |
|
return "Select a mode" |
|
|
|
# Chatbot response function |
|
def chatbot_response(message, history): |
|
messages = [{"role": "system", "content": '''Your name is Arcana. You are a chatbot made by Indexademics. Your goal is to solve questions. You can query the database at anytime when the user asks a question specific to their tasks and you think is unlikely to be searchable online. |
|
If they ask anything about academics, search your database first, then use your own knowledge. Cite the file name when you use the dbms.'''}] |
|
|
|
for human, assistant in history: |
|
messages.append({"role": "user", "content": human}) |
|
messages.append({"role": "assistant", "content": assistant}) |
|
messages.append({"role": "user", "content": message}) |
|
|
|
response = openai_api_call(messages) |
|
|
|
return response |
|
|
|
selected = None |
|
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
# Function to handle the file upload |
|
def handle_file_upload(file): |
|
# Ensure the cache2 directory exists |
|
cache_dir = foldername |
|
os.makedirs(cache_dir, exist_ok=True) |
|
|
|
# Get the uploaded file path |
|
file_path = file.name |
|
|
|
# Define the new path for the uploaded file |
|
new_file_path = os.path.join(cache_dir, os.path.basename(file_path)) |
|
|
|
# Move the file to the cache2 directory |
|
shutil.move(file_path, new_file_path) |
|
|
|
# Get the file size |
|
file_size = os.path.getsize(new_file_path) |
|
|
|
return f"File saved to {new_file_path} with size: {file_size} bytes" |
|
|
|
# Wrapper function to run the file upload in a thread |
|
def handle_file_upload_threaded(file): |
|
with ThreadPoolExecutor() as executor: |
|
future = executor.submit(handle_file_upload, file) |
|
return future.result() |
|
|
|
def list_uploaded_files(): |
|
global foldername |
|
if not os.path.exists(foldername): |
|
return [] |
|
files = os.listdir(foldername) |
|
return [[file] for file in files] |
|
|
|
def on_select(evt: gr.SelectData): |
|
global selected |
|
selected_value = evt.value |
|
selected_index = evt.index |
|
selected = selected_value |
|
print(f"Selected value: {selected_value} at index: {selected_index}") |
|
|
|
file_path = os.path.join(foldername,selected_value) if selected_value else None |
|
status_message = f"Selected: {selected_value}" if selected_value else "No file selected" |
|
|
|
file_size = get_file_size(file_path) if file_path else "" |
|
file_creation_time = get_file_creation_time(file_path) if file_path else "" |
|
|
|
return file_path, status_message, file_size, file_creation_time |
|
|
|
def get_file_size(file_path): |
|
if file_path and os.path.exists(file_path): |
|
size_bytes = os.path.getsize(file_path) |
|
if size_bytes < 1024: |
|
return f"{size_bytes} bytes" |
|
elif size_bytes < 1024 * 1024: |
|
return f"{size_bytes / 1024:.2f} KB" |
|
else: |
|
return f"{size_bytes / (1024 * 1024):.2f} MB" |
|
return "" |
|
|
|
def get_file_creation_time(file_path): |
|
if file_path and os.path.exists(file_path): |
|
creation_time = os.path.getctime(file_path) |
|
return datetime.fromtimestamp(creation_time).strftime("%Y-%m-%d %H:%M:%S") |
|
return "" |
|
|
|
def delete_file(): |
|
global selected,foldername |
|
if selected: |
|
file_path = os.path.join(foldername, selected) |
|
if os.path.exists(file_path): |
|
os.remove(file_path) |
|
return list_uploaded_files(), None, f"File {selected} deleted successfully", "", "" |
|
else: |
|
return list_uploaded_files(), None, f"File {selected} not found", "", "" |
|
else: |
|
return list_uploaded_files(), None, "No file selected for deletion", "", "" |
|
|
|
def refresh_files(): |
|
return list_uploaded_files() |
|
|
|
def display_file(evt: gr.SelectData, df): |
|
file_path = os.path.join(foldername, evt.value) |
|
return file_path, file_path if file_path.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')) else None, f"Displaying: {evt.value}" |
|
|
|
def render_to_database(): |
|
# This function is undefined as per your request |
|
Arcana.main(foldername) |
|
|
|
def change_theme(theme): |
|
gr.Interface.theme = theme |
|
|
|
def rename_file(new_name): |
|
global selected |
|
if selected and new_name: |
|
old_path = os.path.join(foldername, selected) |
|
new_path = os.path.join(foldername, new_name+'.'+selected.split('.')[-1]) |
|
if os.path.exists(old_path): |
|
os.rename(old_path, new_path) |
|
selected = new_name |
|
return list_uploaded_files(), f"File renamed to {new_name}", new_path, get_file_size(new_path), get_file_creation_time(new_path) |
|
else: |
|
return list_uploaded_files(), f"File {selected} not found", None, "", "" |
|
return list_uploaded_files(), "No file selected or new name not provided", None, "", "" |
|
|
|
def query_database(query): |
|
# Usage example |
|
db = ChatDatabase(foldername+'.txt') |
|
|
|
# Example 1: Get relevant messages |
|
sender = 'Arcana' |
|
N = 10 |
|
cache = {} |
|
query_tag = None |
|
|
|
relevant_messages = db.get_relevant_messages(sender, query, N, cache, query_tag) |
|
|
|
print("Relevant messages:") |
|
for message in relevant_messages: |
|
print(f"Sender: {message[0]}, Time: {message[1]}, Tag: {message[3]}") |
|
print(f"Message: {message[2][:100]}...") |
|
print() |
|
|
|
df_data = [{"Nylon Returned Query": str(message)} for message in relevant_messages] |
|
|
|
# Create a pandas DataFrame |
|
df = pd.DataFrame(df_data) |
|
|
|
return df |
|
|
|
def query_database_fiber(query): |
|
dbms = fiber.FiberDBMS() |
|
# Load or create the database |
|
dbms.load_or_create(foldername+'.txt') |
|
results = dbms.query(query, 10) |
|
|
|
# Convert the results to a pandas DataFrame |
|
df = pd.DataFrame(results) |
|
|
|
# Reorder columns if needed |
|
columns_order = ['name', 'content', 'tags', 'index'] |
|
df = df[columns_order] |
|
|
|
return df |
|
|
|
def setdbname(name): |
|
global foldername |
|
foldername = name |
|
with open('settings.arcana',mode='w') as file: |
|
newsettings = foldername+'\n'+dbmsmode |
|
file.write(newsettings) |
|
|
|
example_database = [ |
|
"What is Hydrogen Bonding?", |
|
"Tell me the difference between impulse and force.", |
|
"Tell me a joke that Calculus students will understand.", |
|
"How should I review for the AP Biology Exam?", |
|
"What kind of resources are available in PA and Indexademics?", |
|
"What is the StandardCAS™ group?", |
|
"Explain the concept of quantum entanglement.", |
|
"What are the main differences between mitosis and meiosis?", |
|
"How does the Doppler effect work?", |
|
"Explain the process of photosynthesis.", |
|
"What is the significance of the Pythagorean theorem?", |
|
"How does natural selection contribute to evolution?", |
|
"What is the most important chapter in AP Statistics?", |
|
"How should I prepare on the IB Chinese Exam?" |
|
] |
|
|
|
import random |
|
|
|
def get_random_examples(num_examples=5): |
|
return random.sample(example_database, min(num_examples, len(example_database))) |
|
|
|
# Create the Gradio interface for the chatbot |
|
chatbot_interface = gr.ChatInterface( |
|
chatbot_response, |
|
chatbot=gr.Chatbot(height=400), |
|
textbox=gr.Textbox(placeholder="Type your message here...", container=False, scale=100), |
|
title="Indexademics ChatBot", |
|
description="Arcana v1", |
|
theme="default", |
|
examples=get_random_examples(), |
|
cache_examples=False, |
|
retry_btn=gr.Button('Retry'), |
|
undo_btn="Delete Previous", |
|
clear_btn="Clear", |
|
) |
|
|
|
def chatbot_response(message): |
|
# Your chatbot response logic here |
|
return f"Response to: {message}" |
|
|
|
def relaunch(): |
|
global demo |
|
demo.close() |
|
demo.launch(share=True) |
|
|
|
# Combine the interfaces using Tabs |
|
with gr.Blocks(js=""" |
|
async () => { |
|
const originalFetch = window.fetch; |
|
window.fetch = (url, options) => { |
|
if (options && options.signal) { |
|
const controller = new AbortController(); |
|
options.signal = controller.signal; |
|
setTimeout(() => controller.abort(), 3600000); // 300000 ms = 5 minutes |
|
} |
|
return originalFetch(url, options); |
|
}; |
|
} |
|
""") as demo: |
|
gr.Markdown("# ArcanaV1") |
|
with gr.Tabs(): |
|
with gr.TabItem("Welcome Page"): |
|
with open('introduction.txt',mode='r') as file: |
|
intro_content = file.read() |
|
gr.Markdown(intro_content) |
|
|
|
with gr.TabItem("Chatbot"): |
|
chatbot_interface.render() |
|
|
|
# File uploading interface |
|
with gr.TabItem('Upload'): |
|
gr.Markdown('# Upload and View Files') |
|
|
|
with gr.Row(): |
|
|
|
# Left column: File list and buttons |
|
with gr.Column(scale=1): |
|
gr.Markdown("## Upload File") |
|
file_input = gr.File(label="Upload your file here", file_types=["pdf", "jpeg", "jpg", "gif", "docx", "pptx"]) |
|
file_input.change(handle_file_upload_threaded, inputs=file_input) |
|
|
|
uploaded_files_list = gr.DataFrame(headers=["Uploaded Files"], datatype="str", interactive=False) |
|
|
|
with gr.Row(): |
|
refresh_button = gr.Button('Refresh') |
|
delete_button = gr.Button('Delete Selected File') |
|
|
|
|
|
# Right column: File viewer and Image viewer |
|
with gr.Column(scale=1): |
|
with gr.Tab("File Viewer"): |
|
file_viewer = gr.File(label="File Restore") |
|
file_status = gr.Textbox(label="File Status", interactive=False) |
|
file_size = gr.Textbox(label="File Size", interactive=False) |
|
file_creation_time = gr.Textbox(label="File Creation Time", interactive=False) |
|
|
|
with gr.Row(): |
|
new_file_name = gr.Textbox(label="New File Name", placeholder="Enter new file name") |
|
rename_button = gr.Button("Rename File") |
|
|
|
|
|
with gr.Tab("Image Viewer"): |
|
image_viewer = gr.Image(label="Image Viewer", type="filepath") |
|
|
|
|
|
|
|
# Event handlers |
|
refresh_button.click(fn=refresh_files, outputs=uploaded_files_list) |
|
delete_button.click(fn=delete_file, outputs=[uploaded_files_list, file_viewer, file_status, file_size, file_creation_time]) |
|
uploaded_files_list.select(fn=display_file, inputs=uploaded_files_list, outputs=[file_viewer, image_viewer, file_status]) |
|
uploaded_files_list.select(fn=on_select, outputs=[file_viewer, file_status, file_size, file_creation_time]) |
|
rename_button.click(fn=rename_file, |
|
inputs=new_file_name, |
|
outputs=[uploaded_files_list, file_status, file_viewer, file_size, file_creation_time]) |
|
|
|
render_button = gr.Button("Render All Files to Database") |
|
render_button.click(fn=render_to_database) |
|
|
|
with gr.TabItem('Settings'): |
|
with gr.TabItem('Database'): |
|
gr.Markdown('Settings') |
|
|
|
test_nylon = gr.Textbox(label='Test Nylon', placeholder='Query') |
|
uploaded_files_list2 = gr.DataFrame(headers=["Nylon Returned Query"], datatype="str", interactive=False) |
|
query_button2 = gr.Button('Query') |
|
query_button2.click(fn=query_database, inputs=test_nylon, outputs=uploaded_files_list2) |
|
|
|
test_fiber = gr.Textbox(label='Test Fiber', placeholder='Query') |
|
uploaded_files_list3 = gr.DataFrame(headers=["Fiber Returned Query"], datatype="str", interactive=False) |
|
query_button3 = gr.Button('Query') |
|
query_button3.click(fn=query_database_fiber, inputs=test_fiber, outputs=uploaded_files_list3) |
|
|
|
gr.Markdown('Nylon 2.1 will be deprecated in text-text selections, as it is built for image-text selections.\nDefault model is Fiber.') |
|
dbmsmode_selector = gr.Radio(["Nylon", "Fiber"], label="Select Model") |
|
dbmsmode_selector.change(handle_dbms_mode, dbmsmode_selector) |
|
|
|
database_name = gr.Textbox(label='Database Name', placeholder='cache') |
|
set_dbname = gr.Button('Set Database Name') |
|
set_dbname.click(fn=setdbname, inputs=database_name) |
|
|
|
with gr.TabItem('Theme'): |
|
gr.Markdown('Change Theme') |
|
|
|
theme_dropdown = gr.Dropdown(choices=['default', 'compact', 'huggingface', 'soft', 'dark'], label='Choose Theme') |
|
theme_button = gr.Button('Apply Theme') |
|
|
|
theme_button.click(fn=change_theme, inputs=theme_dropdown) |
|
relaunch_button = gr.Button('Relaunch') |
|
relaunch_button.click(fn=relaunch) |
|
with gr.TabItem('Search'): |
|
gr.Markdown('Set Search Modes') |
|
|
|
searchmode_selector = gr.Radio(["Always", "Automatic"], label="Select Mode") |
|
output = gr.Textbox(label="Output") |
|
searchmode_selector.change(handle_search_mode, searchmode_selector, output) |
|
|
|
|
|
# Launch the interface |
|
demo.launch(share=True) |
|
|
|
|
|
|
|
|