import io
import os
import tempfile
import hashlib
import json
import logging
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from PyPDF2 import PdfReader
from docx import Document
# from transformers import pipeline

# Load environment variables
load_dotenv()
open_api_key_token = os.getenv('OPENAI_API_KEY')

class FileHandler:
    def __init__(self, vector_db_path):
        self.vector_db_path = vector_db_path
        self.embeddings = OpenAIEmbeddings(api_key=open_api_key_token)
        # self.summarizer = pipeline("summarization")

    def prepare_metadata_string(self, document_name, document_description, department, version, last_updated):
        metadata_string = (
            f"\nDocument Name: {document_name}"
            f"\nDocument Description: {document_description}"
            f"\nDepartment: {department}"
            f"\nVersion: {version}"
            f"\nLast Updated: {last_updated}"
        )
        return metadata_string

    async def handle_file_upload(self, file, document_name, document_description, department, version, last_updated):
        content = await file.read()
        file_hash = hashlib.md5(content).hexdigest()
        file_key = f"{file.filename}_{file_hash}"
        vector_store_path = os.path.join(self.vector_db_path, f"{file_key}.vectorstore")
        metadata_path = os.path.join(self.vector_db_path, f"{file_key}.metadata.json")
        metadata_string = self.prepare_metadata_string(document_name, document_description, department, version,
                                                       last_updated)

        # Skip re-processing when the same file (same name and content hash) was already ingested
        if os.path.exists(vector_store_path) and os.path.exists(metadata_path):
            with open(metadata_path, 'r') as md_file:
                metadata = json.load(md_file)
            return {'path': vector_store_path, 'metadata': metadata, 'status': 'skipped - duplicate'}

        # Tabular files are parsed with pandas; everything else goes through the text loaders
        if file.filename.endswith('.csv') or file.filename.endswith('.xlsx'):
            texts = self.load_and_split_table(content, file.filename, metadata_string)
        else:
            texts = await self.load_and_split_text(content, file.filename, metadata_string)

        vector_store = self.create_vector_store(texts)
        vector_store.save_local(vector_store_path)

        metadata = {
            'filename': file.filename,
            'document_name': document_name,
            'document_description': document_description,
            'department': department,
            'version': version,
            'last_updated': last_updated,
            'hash': file_hash,
            'upload_date': datetime.now().isoformat(),
            'file_path': vector_store_path,
            'file_size': len(content),
            'content_type': file.content_type
        }
        with open(metadata_path, 'w') as md_file:
            json.dump(metadata, md_file)
        return {"message": "File processed and vector store created successfully", "file_metadata": metadata}

    def summarize_text(self, text):
        # NOTE: the summarization pipeline is commented out in __init__, so this call raises
        # AttributeError and the except branch simply returns the original text.
        try:
            summary = self.summarizer(text, max_length=150, min_length=10, do_sample=False)
            logging.info("Text summarization successful")
            return summary[0]['summary_text']
        except Exception as e:
            logging.error(f"Error in summarization: {str(e)}")
            # Log error or handle exception
            return text  # Return original text if summarization is not possible

    def load_and_split_table(self, content, filename, metadata_string):
        # Handle CSV and Excel file reading
        if filename.endswith('.csv'):
            df = pd.read_csv(io.StringIO(content.decode('utf-8')))
        else:  # Excel
            df = pd.read_excel(io.BytesIO(content))
        text = df.to_string(index=False)  # Convert DataFrame to string
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_text(self, content, filename, metadata_string):
        with tempfile.NamedTemporaryFile(delete=False, mode='w+b', suffix=f"_{filename}") as temp_file:
            temp_file.write(content)
            temp_file.flush()
            temp_file_path = temp_file.name

        # Ensure the temp file is closed before reading from it
        if filename.endswith('.pdf'):
            texts = await self.load_and_split_pdf(temp_file_path, metadata_string)
        elif filename.endswith('.docx'):
            texts = await self.load_and_split_docx(temp_file_path, metadata_string)
        elif filename.endswith('.txt'):
            texts = await self.load_and_split_txt(temp_file_path, metadata_string)
        else:
            # Clean up and reject unsupported file types
            os.unlink(temp_file_path)
            raise ValueError(f"Unsupported file type: {filename}")
        # Apply summarization here to each text segment
        # summarized_texts = [self.summarize_text(text) for text in texts]
        # return summarized_texts
        os.unlink(temp_file_path)  # Explicitly remove the temporary file
        return texts

    async def load_and_split_pdf(self, pdf_path, metadata_string):
        reader = PdfReader(pdf_path)
        text = ''
        for page in reader.pages:
            text += page.extract_text() or ""
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_docx(self, docx_path, metadata_string):
        doc = Document(docx_path)
        text = '\n'.join([paragraph.text for paragraph in doc.paragraphs if paragraph.text])
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    async def load_and_split_txt(self, txt_path, metadata_string):
        with open(txt_path, 'r', encoding='utf-8') as file:
            text = file.read()
        text += metadata_string  # Append metadata to the text
        return self.split_text(text)

    def split_text(self, text):
        # Chunk the document into overlapping segments for embedding
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        return text_splitter.split_text(text)

    def create_vector_store(self, texts):
        # Embed the chunks and build an in-memory FAISS index
        return FAISS.from_texts(texts, self.embeddings)
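

# --- Usage sketch (illustrative addition, not part of the original handler) ---
# A minimal, hedged example of driving FileHandler outside a web framework: the
# _LocalUpload helper, the "vector_dbs" directory, and "sample.txt" are assumptions
# made only for this demo. In the real application an upload object from the web
# framework (exposing .filename, .content_type and an async .read()) would be passed
# instead. Requires OPENAI_API_KEY to be set, since embeddings use the OpenAI API.
if __name__ == "__main__":
    import asyncio

    class _LocalUpload:
        """Mimics the small subset of an upload object that handle_file_upload uses."""

        def __init__(self, path, content_type="text/plain"):
            self.filename = os.path.basename(path)  # hypothetical local file to ingest
            self.content_type = content_type
            self._path = path

        async def read(self):
            with open(self._path, "rb") as fh:
                return fh.read()

    async def _demo():
        os.makedirs("vector_dbs", exist_ok=True)  # assumed location for the vector stores
        handler = FileHandler(vector_db_path="vector_dbs")
        result = await handler.handle_file_upload(
            _LocalUpload("sample.txt"),
            document_name="Sample",
            document_description="Demo upload",
            department="Engineering",
            version="1.0",
            last_updated="2024-01-01",
        )
        print(result)

        # Load the saved store back and query it. Recent langchain_community releases
        # require allow_dangerous_deserialization=True when loading a local FAISS index.
        store = FAISS.load_local(result["file_metadata"]["file_path"], handler.embeddings,
                                 allow_dangerous_deserialization=True)
        print(store.similarity_search("What is this document about?", k=2)[0].page_content)

    asyncio.run(_demo())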