|
import gradio as gr |
|
import ssl |
|
from openai import OpenAI |
|
import time |
|
import os |
|
import shutil |
|
from datetime import datetime |
|
import Arcana |
|
from nylon import * |
|
import pandas as pd |
|
import json |
|
|
|
|
|
try: |
|
_create_unverified_https_context = ssl._create_unverified_context |
|
except AttributeError: |
|
pass |
|
else: |
|
ssl._create_default_https_context = _create_unverified_https_context |
|
|
|
def query_database2(query): |
|
db = ChatDatabase('memory.txt') |
|
|
|
sender = 'Arcana' |
|
N = 10 |
|
cache = {} |
|
query_tag = None |
|
|
|
relevant_messages = db.get_relevant_messages(sender, query, N, cache, query_tag) |
|
|
|
print("Relevant messages:") |
|
for message in relevant_messages: |
|
print(f"Sender: {message[0]}, Time: {message[1]}, Tag: {message[3]}") |
|
print(f"Message: {message[2][:100]}...") |
|
print() |
|
|
|
df_data = [str(message) for message in relevant_messages] |
|
return ';'.join(df_data) |
|
|
|
|
|
client = OpenAI( |
|
base_url='https://api.openai-proxy.org/v1', |
|
api_key='sk-Nxf8HmLpfIMhCd83n3TOr00TR57uBZ0jMbAgGCOzppXvlsx1', |
|
) |
|
|
|
|
|
function_list = [ |
|
{ |
|
"name": "query_database", |
|
"description": "Query the database and return a list of results as strings", |
|
"parameters": { |
|
"type": "object", |
|
"properties": { |
|
"query": { |
|
"type": "string", |
|
"description": "The query to execute against the database" |
|
}, |
|
}, |
|
"required": ["query"] |
|
} |
|
} |
|
] |
|
|
|
|
|
function_map = { |
|
"query_database": query_database2 |
|
} |
|
|
|
def execute_function(function_name, function_args): |
|
if function_name in function_map: |
|
return function_map[function_name](**function_args) |
|
else: |
|
return f"Error: Function {function_name} not found" |
|
|
|
|
|
def openai_api_call(messages, retries=3, delay=5): |
|
for attempt in range(retries): |
|
try: |
|
completion = client.chat.completions.create( |
|
model="gpt-3.5-turbo", |
|
messages=messages, |
|
functions=function_list, |
|
function_call='auto', |
|
timeout=10 |
|
) |
|
response_message = completion.choices[0].message |
|
|
|
|
|
if response_message.function_call: |
|
function_name = response_message.function_call.name |
|
function_args = json.loads(response_message.function_call.arguments) |
|
function_response = execute_function(function_name, function_args) |
|
|
|
messages.append(response_message.model_dump()) |
|
messages.append({ |
|
"role": "function", |
|
"name": function_name, |
|
"content": json.dumps(function_response) |
|
}) |
|
|
|
return openai_api_call(messages) |
|
else: |
|
return response_message.content |
|
|
|
except Exception as e: |
|
print(f"Attempt {attempt + 1} failed: {e}") |
|
if attempt < retries - 1: |
|
time.sleep(delay) |
|
else: |
|
return "Sorry, I am having trouble connecting to the server. Please try again later." |
|
|
|
|
|
def chatbot_response(message, history): |
|
messages = [{"role": "system", "content": '''You are Arcana, a dynamic study resource database designed to help students excel in their exams. Your responses should be accurate, informative, and evidence-based whenever possible. Follow these guidelines: |
|
Your primary goal is to provide students with the most helpful and accurate study information, utilizing both your internal knowledge and the PDF resources at your disposal.'''}] |
|
|
|
for human, assistant in history: |
|
messages.append({"role": "user", "content": human}) |
|
messages.append({"role": "assistant", "content": assistant}) |
|
messages.append({"role": "user", "content": message}) |
|
|
|
response = openai_api_call(messages) |
|
|
|
return response |
|
|
|
selected = None |
|
|
|
def upload_file(file): |
|
foldername = 'cache' |
|
if not os.path.exists(foldername): |
|
os.mkdir(foldername) |
|
file_path = os.path.join(foldername, os.path.basename(file.name)) |
|
shutil.copy(file.name, file_path) |
|
return list_uploaded_files() |
|
|
|
def list_uploaded_files(): |
|
foldername = 'cache' |
|
if not os.path.exists(foldername): |
|
return [] |
|
files = os.listdir(foldername) |
|
return [[file] for file in files] |
|
|
|
def on_select(evt: gr.SelectData): |
|
global selected |
|
selected_value = evt.value |
|
selected_index = evt.index |
|
selected = selected_value |
|
print(f"Selected value: {selected_value} at index: {selected_index}") |
|
|
|
file_path = os.path.join("cache", selected_value) if selected_value else None |
|
status_message = f"Selected: {selected_value}" if selected_value else "No file selected" |
|
|
|
file_size = get_file_size(file_path) if file_path else "" |
|
file_creation_time = get_file_creation_time(file_path) if file_path else "" |
|
|
|
return file_path, status_message, file_size, file_creation_time |
|
|
|
def get_file_size(file_path): |
|
if file_path and os.path.exists(file_path): |
|
size_bytes = os.path.getsize(file_path) |
|
if size_bytes < 1024: |
|
return f"{size_bytes} bytes" |
|
elif size_bytes < 1024 * 1024: |
|
return f"{size_bytes / 1024:.2f} KB" |
|
else: |
|
return f"{size_bytes / (1024 * 1024):.2f} MB" |
|
return "" |
|
|
|
def get_file_creation_time(file_path): |
|
if file_path and os.path.exists(file_path): |
|
creation_time = os.path.getctime(file_path) |
|
return datetime.fromtimestamp(creation_time).strftime("%Y-%m-%d %H:%M:%S") |
|
return "" |
|
|
|
def delete_file(): |
|
global selected |
|
if selected: |
|
foldername = 'cache' |
|
file_path = os.path.join(foldername, selected) |
|
if os.path.exists(file_path): |
|
os.remove(file_path) |
|
return list_uploaded_files(), None, f"File {selected} deleted successfully", "", "" |
|
else: |
|
return list_uploaded_files(), None, f"File {selected} not found", "", "" |
|
else: |
|
return list_uploaded_files(), None, "No file selected for deletion", "", "" |
|
|
|
def refresh_files(): |
|
return list_uploaded_files() |
|
|
|
def display_file(evt: gr.SelectData, df): |
|
file_path = os.path.join("cache", evt.value) |
|
return file_path, file_path if file_path.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')) else None, f"Displaying: {evt.value}" |
|
|
|
def render_to_database(): |
|
|
|
Arcana.main() |
|
|
|
def change_theme(theme): |
|
gr.Interface.theme = theme |
|
|
|
def rename_file(new_name): |
|
global selected |
|
if selected and new_name: |
|
old_path = os.path.join('cache', selected) |
|
new_path = os.path.join('cache', new_name+'.'+selected.split('.')[-1]) |
|
if os.path.exists(old_path): |
|
os.rename(old_path, new_path) |
|
selected = new_name |
|
return list_uploaded_files(), f"File renamed to {new_name}", new_path, get_file_size(new_path), get_file_creation_time(new_path) |
|
else: |
|
return list_uploaded_files(), f"File {selected} not found", None, "", "" |
|
return list_uploaded_files(), "No file selected or new name not provided", None, "", "" |
|
|
|
def query_database(query): |
|
|
|
db = ChatDatabase('memory.txt') |
|
|
|
|
|
sender = 'Arcana' |
|
N = 10 |
|
cache = {} |
|
query_tag = None |
|
|
|
relevant_messages = db.get_relevant_messages(sender, query, N, cache, query_tag) |
|
|
|
print("Relevant messages:") |
|
for message in relevant_messages: |
|
print(f"Sender: {message[0]}, Time: {message[1]}, Tag: {message[3]}") |
|
print(f"Message: {message[2][:100]}...") |
|
print() |
|
|
|
df_data = [{"Nylon Returned Query": str(message)} for message in relevant_messages] |
|
|
|
|
|
df = pd.DataFrame(df_data) |
|
|
|
return df |
|
|
|
|
|
example_database = [ |
|
"What is Hydrogen Bonding?", |
|
"Tell me the difference between impulse and force.", |
|
"Tell me a joke that Calculus students will understand.", |
|
"How should I review for the AP Biology Exam?", |
|
"What kind of resources are available in PA and Indexademics?", |
|
"What is the StandardCAS™ group?", |
|
"Explain the concept of quantum entanglement.", |
|
"What are the main differences between mitosis and meiosis?", |
|
"How does the Doppler effect work?", |
|
"Explain the process of photosynthesis.", |
|
"What is the significance of the Pythagorean theorem?", |
|
"How does natural selection contribute to evolution?", |
|
"What is the most important chapter in AP Statistics?", |
|
"How should I prepare on the IB Chinese Exam?" |
|
] |
|
|
|
import random |
|
|
|
def get_random_examples(num_examples=5): |
|
return random.sample(example_database, min(num_examples, len(example_database))) |
|
|
|
|
|
chatbot_interface = gr.ChatInterface( |
|
chatbot_response, |
|
chatbot=gr.Chatbot(height=400), |
|
textbox=gr.Textbox(placeholder="Type your message here...", container=True, scale=10), |
|
title="Review With Arcana", |
|
description="ArcanaUI v0.8 - Chatbot", |
|
theme="soft", |
|
examples=get_random_examples(), |
|
cache_examples=False, |
|
retry_btn=None, |
|
undo_btn="Delete Previous", |
|
clear_btn="Clear" |
|
) |
|
|
|
|
|
def relaunch(): |
|
global demo |
|
demo.close() |
|
demo.launch(share=True) |
|
|
|
|
|
with gr.Blocks() as demo: |
|
gr.Markdown("# ArcanaUI v0.8") |
|
with gr.Tabs(): |
|
with gr.TabItem("Welcome Page"): |
|
with open('introduction.txt',mode='r') as file: |
|
intro_content = file.read() |
|
gr.Markdown(intro_content) |
|
|
|
with gr.TabItem("Chatbot"): |
|
chatbot_interface.render() |
|
|
|
|
|
with gr.TabItem('Upload'): |
|
gr.Markdown('# Upload and View Files') |
|
|
|
with gr.Row(): |
|
|
|
with gr.Column(scale=1): |
|
uploaded_files_list = gr.DataFrame(headers=["Uploaded Files"], datatype="str", interactive=False) |
|
|
|
with gr.Row(): |
|
upload_button = gr.UploadButton('Upload File') |
|
refresh_button = gr.Button('Refresh') |
|
delete_button = gr.Button('Delete Selected File') |
|
|
|
|
|
with gr.Column(scale=1): |
|
with gr.Tab("File Viewer"): |
|
file_viewer = gr.File(label="File Restore") |
|
file_status = gr.Textbox(label="File Status", interactive=False) |
|
file_size = gr.Textbox(label="File Size", interactive=False) |
|
file_creation_time = gr.Textbox(label="File Creation Time", interactive=False) |
|
|
|
with gr.Row(): |
|
new_file_name = gr.Textbox(label="New File Name", placeholder="Enter new file name") |
|
rename_button = gr.Button("Rename File") |
|
|
|
|
|
with gr.Tab("Image Viewer"): |
|
image_viewer = gr.Image(label="Image Viewer", type="filepath") |
|
|
|
|
|
refresh_button.click(fn=refresh_files, outputs=uploaded_files_list) |
|
upload_button.upload(upload_file, inputs=upload_button, outputs=uploaded_files_list) |
|
delete_button.click(fn=delete_file, outputs=[uploaded_files_list, file_viewer, file_status, file_size, file_creation_time]) |
|
uploaded_files_list.select(fn=display_file, inputs=uploaded_files_list, outputs=[file_viewer, image_viewer, file_status]) |
|
uploaded_files_list.select(fn=on_select, outputs=[file_viewer, file_status, file_size, file_creation_time]) |
|
rename_button.click(fn=rename_file, |
|
inputs=new_file_name, |
|
outputs=[uploaded_files_list, file_status, file_viewer, file_size, file_creation_time]) |
|
|
|
render_button = gr.Button("Render all PDFs to Database") |
|
render_button.click(fn=render_to_database) |
|
|
|
with gr.TabItem('Settings'): |
|
with gr.TabItem('Database'): |
|
gr.Markdown('Settings') |
|
|
|
test_nylon = gr.Textbox(label='Test Nylon', placeholder='Query') |
|
uploaded_files_list2 = gr.DataFrame(headers=["Nylon Returned Query"], datatype="str", interactive=False) |
|
|
|
query_button = gr.Button('Query') |
|
|
|
query_button.click(fn=query_database, inputs=test_nylon, outputs=uploaded_files_list2) |
|
with gr.TabItem('Theme'): |
|
gr.Markdown('Change Theme') |
|
|
|
theme_dropdown = gr.Dropdown(choices=['default', 'compact', 'huggingface', 'soft', 'dark'], label='Choose Theme') |
|
theme_button = gr.Button('Apply Theme') |
|
|
|
theme_button.click(fn=change_theme, inputs=theme_dropdown) |
|
relaunch_button = gr.Button('Relaunch') |
|
relaunch_button.click(fn=relaunch) |
|
|
|
|
|
|
|
demo.launch(share=True) |
|
|
|
|
|
|
|
|