Spaces:
Sleeping
Sleeping
import streamlit as st | |
import os | |
import requests | |
import re | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain.docstore.document import Document | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain.vectorstores.faiss import FAISS | |
from langchain.prompts.prompt import PromptTemplate | |
from langchain_community.llms import LlamaCpp | |
from langchain.chains import RetrievalQA | |
from dotenv import load_dotenv | |
import google.generativeai as genai | |
# Loading Google Gemini | |
load_dotenv() | |
genai.configure(api_key=os.getenv("GOOGLE_API_KEY")) | |
# Upload pdf file into 'pdf-data' folder if it does not exist | |
def fn_upload_pdf(mv_pdf_input_file, mv_processing_message): | |
"""Upload pdf file into 'pdf-data' folder if it does not exist""" | |
lv_file_name = mv_pdf_input_file.name | |
if not os.path.exists("pdf-data"): | |
os.makedirs("pdf-data") | |
lv_temp_file_path = os.path.join("pdf-data",lv_file_name) | |
if os.path.exists(lv_temp_file_path): | |
print("File already available") | |
fn_display_user_messages("File already available","Warning", mv_processing_message) | |
else: | |
with open(lv_temp_file_path,"wb") as lv_file: | |
lv_file.write(mv_pdf_input_file.getbuffer()) | |
print("Step1: PDF uploaded successfully at -> " + lv_temp_file_path) | |
fn_display_user_messages("Step1: PDF uploaded successfully at -> " + lv_temp_file_path, "Info", mv_processing_message) | |
# Create Vector DB of uploaded PDF | |
def fn_create_vector_db(mv_pdf_input_file, mv_processing_message): | |
"""Create Vector DB of uploaded PDF""" | |
lv_file_name = mv_pdf_input_file.name[:-4] + ".vectorstore" | |
if not os.path.exists(os.path.join("vectordb","fiaas")): | |
os.makedirs(os.path.join("vectordb","fiaas")) | |
lv_temp_file_path = os.path.join(os.path.join("vectordb","fiaas"),lv_file_name) | |
lv_embeddings = HuggingFaceEmbeddings( | |
model_name="sentence-transformers/all-mpnet-base-v2", | |
model_kwargs={'device': 'cpu'} | |
) | |
if os.path.exists(lv_temp_file_path): | |
print("VectorDB already available for uploaded file") | |
fn_display_user_messages("VectorDB already available for uploaded file","Warning", mv_processing_message) | |
lv_vector_store = FAISS.load_local(lv_temp_file_path, lv_embeddings,allow_dangerous_deserialization=True) | |
return lv_vector_store | |
else: | |
lv_temp_pdf_file_path = os.path.join("pdf-data",mv_pdf_input_file.name) | |
# -- Loading PDF Data | |
lv_pdf_loader = PyPDFLoader(lv_temp_pdf_file_path) | |
lv_pdf_content = lv_pdf_loader.load() | |
# -- Define patterns with flexibility | |
pattern1 = r"(\w+)-\n(\w+)" # Match hyphenated words separated by a line break | |
pattern2 = r"(?<!\n\s)\n(?!\s\n)" # Match line breaks not surrounded by whitespace | |
pattern3 = r"\n\s*\n" # Match multiple line breaks with optional whitespace | |
lv_pdf_formatted_content = [] | |
for lv_page in lv_pdf_content: | |
# -- Apply substitutions with flexibility | |
lv_pdf_page_content = re.sub(pattern1, r"\1\2", lv_page.page_content) | |
lv_pdf_page_content = re.sub(pattern2, " ", lv_pdf_page_content.strip()) | |
lv_pdf_page_content = re.sub(pattern3, " ", lv_pdf_page_content) | |
lv_pdf_page_content = re.sub("\n", " ", lv_pdf_page_content) | |
lv_pdf_formatted_content.append(Document( page_content= lv_pdf_page_content, | |
metadata= lv_page.metadata) | |
) | |
# print("Page Details of "+str(lv_page.metadata)+" is - "+lv_pdf_page_content) | |
print("Step2: PDF content extracted") | |
fn_display_user_messages("Step2: PDF content extracted", "Info", mv_processing_message) | |
# -- Chunking PDF Data | |
lv_text_splitter = CharacterTextSplitter( | |
separator="\n", | |
chunk_size=300, | |
chunk_overlap=30, | |
length_function=len | |
) | |
lv_pdf_chunk_documents = lv_text_splitter.split_documents(lv_pdf_formatted_content) | |
print("Step3: PDF content chucked and document object created") | |
fn_display_user_messages("Step3: PDF content chucked and document object created", "Info", mv_processing_message) | |
# -- Creating FIASS Vector Store | |
lv_vector_store = FAISS.from_documents(lv_pdf_chunk_documents, lv_embeddings) | |
print("Step4: Vector store created") | |
fn_display_user_messages("Step4: Vector store created", "Info", mv_processing_message) | |
lv_vector_store.save_local(lv_temp_file_path) | |
return lv_vector_store | |
# Display user Error, Warning or Success Message | |
def fn_display_user_messages(lv_text, lv_type, mv_processing_message): | |
"""Display user Info, Error, Warning or Success Message""" | |
if lv_type == "Success": | |
with mv_processing_message.container(): | |
st.success(lv_text) | |
elif lv_type == "Error": | |
with mv_processing_message.container(): | |
st.error(lv_text) | |
elif lv_type == "Warning": | |
with mv_processing_message.container(): | |
st.warning(lv_text) | |
else: | |
with mv_processing_message.container(): | |
st.info(lv_text) | |
# Download TheBloke Models | |
def fn_download_llm_models(mv_selected_model, mv_processing_message): | |
"""Download TheBloke Models""" | |
lv_download_url = "" | |
print("Downloading TheBloke of "+mv_selected_model) | |
fn_display_user_messages("Downloading TheBloke of "+mv_selected_model, "Info", mv_processing_message) | |
if mv_selected_model == 'microsoft/phi-2': | |
lv_download_url = "https://huggingface.co/TheBloke/phi-2-GGUF/resolve/main/phi-2.Q2_K.gguf" | |
elif mv_selected_model == 'google/gemma-2b': | |
lv_download_url = "https://huggingface.co/MaziyarPanahi/gemma-2b-it-GGUF/resolve/main/gemma-2b-it.Q2_K.gguf" | |
elif mv_selected_model == 'mistralai/Mistral-7B-Instruct-v0.2': | |
lv_download_url = "https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF/resolve/main/mistral-7b-instruct-v0.2.Q2_K.gguf" | |
if not os.path.exists("model"): | |
os.makedirs("model") | |
lv_filename = os.path.basename(lv_download_url) | |
lv_temp_file_path = os.path.join("model",lv_filename) | |
if os.path.exists(lv_temp_file_path): | |
print("Model already available") | |
fn_display_user_messages("Model already available","Warning", mv_processing_message) | |
else: | |
lv_response = requests.get(lv_download_url, stream=True) | |
if lv_response.status_code == 200: | |
with open(lv_temp_file_path, 'wb') as f: | |
for chunk in lv_response.iter_content(chunk_size=1024): | |
if chunk: | |
f.write(chunk) | |
print("Download completed") | |
fn_display_user_messages("Model download completed","Info", mv_processing_message) | |
else: | |
print(f"Model download completed {response.status_code}") | |
fn_display_user_messages(f"Model download completed {response.status_code}","Error", mv_processing_message) | |
# Function return QA Response using Vector Store | |
def fn_generate_QnA_response(mv_selected_model, mv_user_question, lv_vector_store, mv_processing_message): | |
"""Returns QA Response using Vector Store""" | |
lv_model_path = "" | |
lv_model_type = "" | |
lv_template = """Instruction: | |
You are an AI assistant for answering questions about the provided context. | |
You are given the following extracted parts of a long document and a question. Provide a detailed answer. | |
If you don't know the answer, just say "Hmm, I'm not sure." Don't try to make up an answer. | |
======= | |
{context} | |
======= | |
Question: {question} | |
Output:\n""" | |
lv_qa_prompt = PromptTemplate( | |
template=lv_template, | |
input_variables=["question", "context"] | |
) | |
if mv_selected_model == 'microsoft/phi-2': | |
lv_model_path = "model/phi-2.Q2_K.gguf" | |
lv_model_type = "pi" | |
elif mv_selected_model == 'google/gemma-2b': | |
lv_model_path = "model/gemma-2b-it.Q2_K.gguf" | |
lv_model_type = "gemma" | |
elif mv_selected_model == 'mistralai/Mistral-7B-Instruct-v0.2': | |
lv_model_path = "model/mistral-7b-instruct-v0.2.Q2_K.gguf" | |
lv_model_type = "mistral" | |
print("Step4: Generating LLM response") | |
fn_display_user_messages("Step4: Generating LLM response","Info", mv_processing_message) | |
lv_model = LlamaCpp( | |
model_path=lv_model_path, | |
temperature=0.00, | |
max_tokens=2048, | |
top_p=1, | |
n_ctx=2048, | |
verbose=False | |
) | |
lv_vector_search_result = lv_vector_store.similarity_search(mv_user_question, k=2) | |
# print("Vector Search Result - ") | |
# print(lv_vector_search_result) | |
# -- Creating formatted document result | |
lv_document_context = "" | |
lv_count = 0 | |
for lv_result in lv_vector_search_result: | |
print("Concatenating Result of page - " + str(lv_count) + " with content of document page no - "+str(lv_result.metadata["page"])) | |
lv_document_context += lv_result.page_content | |
lv_count += 1 | |
# print("Formatted Document Search Result - ") | |
# print(lv_document_context) | |
lv_qa_formatted_prompt = lv_qa_prompt.format( | |
question=mv_user_question, | |
context=lv_document_context | |
) | |
print("Formatted Prompt - " + lv_qa_formatted_prompt) | |
lv_llm_response = lv_model(lv_qa_formatted_prompt) | |
# print("LLM Response" +lv_llm_response) | |
print("Step5: LLM response generated") | |
fn_display_user_messages("Step5: LLM response generated","Info", mv_processing_message) | |
return lv_llm_response | |
# Function return API based QA Response using Vector Store | |
def fn_generate_API_QnA_response(mv_selected_model, mv_user_question, lv_vector_store, mv_processing_message): | |
"""Returns QA Response using Vector Store""" | |
lv_template = """Instruction: | |
You are an AI assistant for answering questions about the provided context. | |
You are given the following extracted parts of a long document and a question. Provide a detailed answer. | |
If you don't know the answer, just say "Hmm, I'm not sure." Don't try to make up an answer. | |
======= | |
{context} | |
======= | |
Question: {question} | |
Output:\n""" | |
lv_qa_prompt = PromptTemplate( | |
template=lv_template, | |
input_variables=["question", "context"] | |
) | |
lv_vector_search_result = lv_vector_store.similarity_search(mv_user_question, k=2) | |
# print("Vector Search Result - ") | |
# print(lv_vector_search_result) | |
# -- Creating formatted document result | |
lv_document_context = "" | |
lv_count = 0 | |
for lv_result in lv_vector_search_result: | |
# print("Concatenating Result of page - " + str(lv_count) + " with content of document page no - "+str(lv_result.metadata["page"])) | |
lv_document_context += lv_result.page_content | |
lv_count += 1 | |
print("Formatted Document Search Result - ") | |
print(lv_document_context) | |
lv_qa_formatted_prompt = lv_qa_prompt.format( | |
question=mv_user_question, | |
context=lv_document_context | |
) | |
if mv_selected_model == 'Google Gemini-pro': | |
lv_model = genai.GenerativeModel('gemini-pro') | |
print("Step4: Generating LLM response") | |
fn_display_user_messages("Step4: Generating LLM response","Info", mv_processing_message) | |
lv_llm_response = lv_model.generate_content(lv_qa_formatted_prompt).text | |
print("Step5: LLM response generated") | |
fn_display_user_messages("Step5: LLM response generated","Info", mv_processing_message) | |
return lv_llm_response | |
# Main Function | |
def main(): | |
# -- Streamlit Settings | |
st.set_page_config(layout='wide') | |
col1, col2, col3 = st.columns(3) | |
col2.title("Chat with PDF") | |
st.text("") | |
# -- Initialize chat history | |
if "messages" not in st.session_state: | |
st.session_state.messages = [] | |
# -- Display Supported Models | |
col1, col2, col3 = st.columns(3) | |
mv_selected_model = col3.selectbox('Select Model', | |
[ | |
'microsoft/phi-2', | |
'google/gemma-2b', | |
'mistralai/Mistral-7B-Instruct-v0.2', | |
'Google Gemini-pro' | |
] | |
) | |
# -- Display Supported Vector Stores | |
col1, col2, col3 = st.columns(3) | |
mv_selected_vector_db = col3.selectbox('Select Vector DB', ['FAISS']) | |
st.text("") | |
# -- Reading PDF File | |
col1, col2, col3 = st.columns(3) | |
mv_pdf_input_file = col2.file_uploader("Choose a PDF file:", type=["pdf"]) | |
# -- Display Processing Details | |
st.text("") | |
col1, col2, col3 = st.columns(3) | |
mv_processing_message = col2.empty() | |
st.text("") | |
# -- Downloading Model Files | |
if mv_selected_model != "Google Gemini-pro": | |
fn_download_llm_models(mv_selected_model, mv_processing_message) | |
else: | |
print("Call Google API") | |
# -- Processing PDF | |
if (mv_pdf_input_file is not None): | |
# -- Upload PDF | |
fn_upload_pdf(mv_pdf_input_file, mv_processing_message) | |
# -- Create Vector Index | |
lv_vector_store = fn_create_vector_db(mv_pdf_input_file, mv_processing_message) | |
# -- Perform RAG | |
col1, col2, col3 = st.columns(3) | |
st.text("") | |
lv_chat_history = col2.chat_message | |
st.text("") | |
if mv_user_question := col2.chat_input("Chat on PDF Data"): | |
# -- Add user message to chat history | |
st.session_state.messages.append({"role": "user", "content": mv_user_question}) | |
# -- Generating LLM response | |
if mv_selected_model != "Google Gemini-pro": | |
lv_response = fn_generate_QnA_response(mv_selected_model, mv_user_question, lv_vector_store, mv_processing_message) | |
else: | |
lv_response = fn_generate_API_QnA_response(mv_selected_model, mv_user_question, lv_vector_store, mv_processing_message) | |
# -- Adding assistant response to chat history | |
st.session_state.messages.append({"role": "assistant", "content": lv_response}) | |
# -- Display chat messages from history on app rerun | |
for message in st.session_state.messages: | |
with lv_chat_history(message["role"]): | |
st.markdown(message["content"]) | |
# -- Validate Data | |
# -- Get Web Response | |
# Calling Main Function | |
if __name__ == '__main__': | |
main() |