Spaces:
Running
Running
import os | |
from typing import ( | |
Any, | |
Union, | |
) | |
import zipfile | |
import streamlit as st | |
from streamlit.runtime.uploaded_file_manager import ( | |
UploadedFile, | |
UploadedFileRec, | |
UploadedFileManager, | |
) | |
from streamlit.runtime.scriptrunner import get_script_run_ctx | |
from supabase.client import Client | |
from langchain.vectorstores.supabase import SupabaseVectorStore | |
from components_keys import ComponentsKeys | |
from loaders.audio import process_audio | |
from loaders.txt import process_txt | |
from loaders.csv import process_csv | |
from loaders.markdown import process_markdown | |
from loaders.pdf import process_pdf | |
from loaders.html import ( | |
create_html_file, | |
delete_tempfile, | |
get_html, | |
process_html, | |
) | |
from loaders.powerpoint import process_powerpoint | |
from loaders.docx import process_docx | |
from utils import compute_sha1_from_content | |
# Script-run context looked up once, when this module is imported. It may be
# `None`; `file_to_uploaded_file` checks for that before using it.
ctx = get_script_run_ctx()

# Upload manager that `file_to_uploaded_file` registers converted files with.
manager = UploadedFileManager()

# Maps a file extension (leading dot included) to the loader function that
# ingests files of that type. The key order is also the order in which the
# extensions are handed to the upload widget in `file_uploader`.
file_processors = {
    ".txt": process_txt,
    ".csv": process_csv,
    ".md": process_markdown,
    ".markdown": process_markdown,
    # Every audio/video container below goes through the same audio loader.
    **dict.fromkeys(
        (".m4a", ".mp3", ".webm", ".mp4", ".mpga", ".wav", ".mpeg"),
        process_audio,
    ),
    ".pdf": process_pdf,
    ".html": process_html,
    ".pptx": process_powerpoint,
    ".docx": process_docx,
}
def file_uploader(supabase, vector_store):
    """Render the upload widget and ingest the selected files on request.

    Parameters
    ----------
    supabase
        The supabase client, forwarded to `filter_file`.
    vector_store
        The vector store, forwarded to `filter_file`.
    """
    self_hosted_flag = st.secrets.self_hosted
    allow_many = self_hosted_flag == "true"

    # Zip archives are only offered when `st.secrets.self_hosted` is "true":
    # an archive can contain several files, so accepting it in the demo would
    # circumvent the limit of one uploaded file at a time.
    extensions = [*file_processors]
    if allow_many:
        extensions.append(".zip")

    selection = st.file_uploader(
        "**Upload a file**",
        accept_multiple_files=allow_many,
        type=extensions,
        key=ComponentsKeys.FILE_UPLOADER,
    )

    if self_hosted_flag == "false":
        st.markdown("**In demo mode, the max file size is 1MB**")

    if not st.button("Add to Database"):
        return

    # The widget yields a single `UploadedFile` or a list of them depending
    # on `accept_multiple_files`; anything else means nothing to ingest.
    if isinstance(selection, UploadedFile):
        pending = [selection]
    elif isinstance(selection, list):
        pending = selection
    else:
        pending = []

    for item in pending:
        filter_file(item, supabase, vector_store)
def file_already_exists(supabase, file):
    """Return whether a document with the same content hash is stored.

    Parameters
    ----------
    supabase
        The supabase client used to query the "documents" table.
    file
        The uploaded file; its full content (`file.getvalue()`) is hashed.

    Returns
    -------
    bool
        `True` if at least one row of "documents" has a
        `metadata->>file_sha1` equal to the SHA-1 of the file content.
    """
    file_sha1 = compute_sha1_from_content(file.getvalue())
    response = (
        supabase.table("documents")
        .select("id")
        .eq("metadata->>file_sha1", file_sha1)
        # One matching row is enough to answer an existence check, so do not
        # transfer the id of every row that shares this hash.
        .limit(1)
        .execute()
    )
    return len(response.data) > 0
def file_to_uploaded_file(file: Any) -> Union[None, UploadedFile]:
    """Convert a file to a streamlit `UploadedFile` object.

    This allows us to unzip files and treat them the same way
    streamlit treats files uploaded through the file uploader.

    Parameters
    ----------
    file : Any
        The file. Can be any file supported by this app. It must expose a
        `name` attribute and a `read()` method.

    Returns
    -------
    Union[None, UploadedFile]
        The file converted to a streamlit `UploadedFile` object.
        Returns `None` if the script context cannot be grabbed.
    """
    # Resolve the script-run context at call time. The module-level `ctx` is
    # captured once at import, and imported modules are cached, so it can
    # belong to an earlier run or session. Keep it only as a fallback.
    run_ctx = get_script_run_ctx()
    if run_ctx is None:
        run_ctx = ctx
    if run_ctx is None:
        print("script context not found, skipping uploading file:", file.name)
        return None

    file_extension = os.path.splitext(file.name)[-1]
    file_name = file.name
    file_data = file.read()

    # The file manager will automatically assign an ID so pass `None`
    # Reference: https://github.com/streamlit/streamlit/blob/9a6ce804b7977bdc1f18906d1672c45f9a9b3398/lib/streamlit/runtime/uploaded_file_manager.py#LL98C6-L98C6
    uploaded_file_rec = UploadedFileRec(None, file_name, file_extension, file_data)
    uploaded_file_rec = manager.add_file(
        run_ctx.session_id,
        ComponentsKeys.FILE_UPLOADER,
        uploaded_file_rec,
    )
    return UploadedFile(uploaded_file_rec)
def filter_zip_file(
    file: UploadedFile,
    supabase: Client,
    vector_store: SupabaseVectorStore,
) -> None:
    """Unzip the zip file then filter each unzipped file.

    Directory entries inside the archive are skipped; only regular members
    are handed to `filter_file`.

    Parameters
    ----------
    file : UploadedFile
        The uploaded file from the file uploader.
    supabase : Client
        The supabase client.
    vector_store : SupabaseVectorStore
        The vector store in the database.
    """
    with zipfile.ZipFile(file, "r") as archive:
        for member in archive.infolist():
            # A directory entry has no content; processing it would only
            # trigger a database lookup and an "is empty" message.
            if member.is_dir():
                continue
            with archive.open(member, "r") as extracted:
                filter_file(extracted, supabase, vector_store)
def filter_file(file, supabase, vector_store):
    """Validate one file and hand it to the loader matching its extension.

    Parameters
    ----------
    file
        A streamlit `UploadedFile`, or any other file object, which is first
        converted with `file_to_uploaded_file`.
    supabase
        The supabase client.
    vector_store
        The vector store in the database.

    Returns
    -------
    bool
        `True` if the file was processed (or was a zip archive whose members
        were filtered); `False` if it could not be converted, already
        exists, is empty, or has an unsupported extension.
    """
    # NOTE(review): the leading glyphs in the messages below look like
    # mis-decoded emoji -- confirm the encoding of this file.

    # Streamlit file uploads are of type `UploadedFile` which has the
    # necessary methods and attributes for this app to work.
    if not isinstance(file, UploadedFile):
        file = file_to_uploaded_file(file)
        # The conversion returns `None` when no script context is available;
        # it has already reported that, so just signal failure here.
        if file is None:
            return False

    # Match extensions case-insensitively so e.g. "REPORT.PDF" is accepted.
    file_extension = os.path.splitext(file.name)[-1].lower()

    if file_extension == ".zip":
        filter_zip_file(file, supabase, vector_store)
        return True

    if file_already_exists(supabase, file):
        st.write(f"π {file.name} is already in the database.")
        return False

    if file.size < 1:
        st.write(f"π¨ {file.name} is empty.")
        return False

    processor = file_processors.get(file_extension)
    if processor is None:
        st.write(f"β {file.name} is not a valid file type.")
        return False

    # The supabase client is passed as `stats_db` only when
    # `st.secrets.self_hosted` is "false".
    stats_db = supabase if st.secrets.self_hosted == "false" else None
    processor(vector_store, file, stats_db=stats_db)
    st.write(f"β {file.name} ")
    return True
def url_uploader(supabase, vector_store):
    """Render the URL form and ingest the page behind the URL on request.

    Parameters
    ----------
    supabase
        The supabase client, forwarded to `filter_file`.
    vector_store
        The vector store, forwarded to `filter_file`.
    """
    url = st.text_area("**Add an url**", placeholder="https://meraGPT.com")
    if not st.button("Add the URL to the database"):
        return

    if st.session_state["overused"]:
        st.write("You have reached your daily limit. Please come back later or self host the solution.")
        return

    html = get_html(url)
    if not html:
        st.write(f"β Failed to access to {url} .")
        return

    st.write(f"Getting content ... {url} ")
    try:
        file, temp_file_path = create_html_file(url, html)
    except UnicodeEncodeError as e:
        # Repeating the identical call, as the code used to do here, would
        # presumably fail the same way with nothing left to catch it.
        # Report the problem and stop.
        st.write(f"β Error encoding character: {e}")
        return

    ret = filter_file(file, supabase, vector_store)
    delete_tempfile(temp_file_path, url, ret)