chatwithpdf / app.py
rahgadda's picture
Initial Draft
526daeb verified
import streamlit as st
import os
import requests
import re
from langchain_community.document_loaders import PyPDFLoader
from langchain.docstore.document import Document
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores.faiss import FAISS
from langchain.prompts.prompt import PromptTemplate
from langchain_community.llms import LlamaCpp
from langchain.chains import RetrievalQA
from dotenv import load_dotenv
import google.generativeai as genai
# Loading Google Gemini
load_dotenv()
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
# Upload pdf file into 'pdf-data' folder if it does not exist
def fn_upload_pdf(mv_pdf_input_file, mv_processing_message):
"""Upload pdf file into 'pdf-data' folder if it does not exist"""
lv_file_name = mv_pdf_input_file.name
if not os.path.exists("pdf-data"):
os.makedirs("pdf-data")
lv_temp_file_path = os.path.join("pdf-data",lv_file_name)
if os.path.exists(lv_temp_file_path):
print("File already available")
fn_display_user_messages("File already available","Warning", mv_processing_message)
else:
with open(lv_temp_file_path,"wb") as lv_file:
lv_file.write(mv_pdf_input_file.getbuffer())
print("Step1: PDF uploaded successfully at -> " + lv_temp_file_path)
fn_display_user_messages("Step1: PDF uploaded successfully at -> " + lv_temp_file_path, "Info", mv_processing_message)
# Create Vector DB of uploaded PDF
def fn_create_vector_db(mv_pdf_input_file, mv_processing_message):
"""Create Vector DB of uploaded PDF"""
lv_file_name = mv_pdf_input_file.name[:-4] + ".vectorstore"
if not os.path.exists(os.path.join("vectordb","fiaas")):
os.makedirs(os.path.join("vectordb","fiaas"))
lv_temp_file_path = os.path.join(os.path.join("vectordb","fiaas"),lv_file_name)
lv_embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-mpnet-base-v2",
model_kwargs={'device': 'cpu'}
)
if os.path.exists(lv_temp_file_path):
print("VectorDB already available for uploaded file")
fn_display_user_messages("VectorDB already available for uploaded file","Warning", mv_processing_message)
lv_vector_store = FAISS.load_local(lv_temp_file_path, lv_embeddings,allow_dangerous_deserialization=True)
return lv_vector_store
else:
lv_temp_pdf_file_path = os.path.join("pdf-data",mv_pdf_input_file.name)
# -- Loading PDF Data
lv_pdf_loader = PyPDFLoader(lv_temp_pdf_file_path)
lv_pdf_content = lv_pdf_loader.load()
# -- Define patterns with flexibility
pattern1 = r"(\w+)-\n(\w+)" # Match hyphenated words separated by a line break
pattern2 = r"(?<!\n\s)\n(?!\s\n)" # Match line breaks not surrounded by whitespace
pattern3 = r"\n\s*\n" # Match multiple line breaks with optional whitespace
lv_pdf_formatted_content = []
for lv_page in lv_pdf_content:
# -- Apply substitutions with flexibility
lv_pdf_page_content = re.sub(pattern1, r"\1\2", lv_page.page_content)
lv_pdf_page_content = re.sub(pattern2, " ", lv_pdf_page_content.strip())
lv_pdf_page_content = re.sub(pattern3, " ", lv_pdf_page_content)
lv_pdf_page_content = re.sub("\n", " ", lv_pdf_page_content)
lv_pdf_formatted_content.append(Document( page_content= lv_pdf_page_content,
metadata= lv_page.metadata)
)
# print("Page Details of "+str(lv_page.metadata)+" is - "+lv_pdf_page_content)
print("Step2: PDF content extracted")
fn_display_user_messages("Step2: PDF content extracted", "Info", mv_processing_message)
# -- Chunking PDF Data
lv_text_splitter = CharacterTextSplitter(
separator="\n",
chunk_size=300,
chunk_overlap=30,
length_function=len
)
lv_pdf_chunk_documents = lv_text_splitter.split_documents(lv_pdf_formatted_content)
print("Step3: PDF content chucked and document object created")
fn_display_user_messages("Step3: PDF content chucked and document object created", "Info", mv_processing_message)
# -- Creating FIASS Vector Store
lv_vector_store = FAISS.from_documents(lv_pdf_chunk_documents, lv_embeddings)
print("Step4: Vector store created")
fn_display_user_messages("Step4: Vector store created", "Info", mv_processing_message)
lv_vector_store.save_local(lv_temp_file_path)
return lv_vector_store
# Display user Error, Warning or Success Message
def fn_display_user_messages(lv_text, lv_type, mv_processing_message):
"""Display user Info, Error, Warning or Success Message"""
if lv_type == "Success":
with mv_processing_message.container():
st.success(lv_text)
elif lv_type == "Error":
with mv_processing_message.container():
st.error(lv_text)
elif lv_type == "Warning":
with mv_processing_message.container():
st.warning(lv_text)
else:
with mv_processing_message.container():
st.info(lv_text)
# Download TheBloke Models
def fn_download_llm_models(mv_selected_model, mv_processing_message):
"""Download TheBloke Models"""
lv_download_url = ""
print("Downloading TheBloke of "+mv_selected_model)
fn_display_user_messages("Downloading TheBloke of "+mv_selected_model, "Info", mv_processing_message)
if mv_selected_model == 'microsoft/phi-2':
lv_download_url = "https://huggingface.co/TheBloke/phi-2-GGUF/resolve/main/phi-2.Q2_K.gguf"
elif mv_selected_model == 'google/gemma-2b':
lv_download_url = "https://huggingface.co/MaziyarPanahi/gemma-2b-it-GGUF/resolve/main/gemma-2b-it.Q2_K.gguf"
elif mv_selected_model == 'mistralai/Mistral-7B-Instruct-v0.2':
lv_download_url = "https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF/resolve/main/mistral-7b-instruct-v0.2.Q2_K.gguf"
if not os.path.exists("model"):
os.makedirs("model")
lv_filename = os.path.basename(lv_download_url)
lv_temp_file_path = os.path.join("model",lv_filename)
if os.path.exists(lv_temp_file_path):
print("Model already available")
fn_display_user_messages("Model already available","Warning", mv_processing_message)
else:
lv_response = requests.get(lv_download_url, stream=True)
if lv_response.status_code == 200:
with open(lv_temp_file_path, 'wb') as f:
for chunk in lv_response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
print("Download completed")
fn_display_user_messages("Model download completed","Info", mv_processing_message)
else:
print(f"Model download completed {response.status_code}")
fn_display_user_messages(f"Model download completed {response.status_code}","Error", mv_processing_message)
# Function return QA Response using Vector Store
def fn_generate_QnA_response(mv_selected_model, mv_user_question, lv_vector_store, mv_processing_message):
"""Returns QA Response using Vector Store"""
lv_model_path = ""
lv_model_type = ""
lv_template = """Instruction:
You are an AI assistant for answering questions about the provided context.
You are given the following extracted parts of a long document and a question. Provide a detailed answer.
If you don't know the answer, just say "Hmm, I'm not sure." Don't try to make up an answer.
=======
{context}
=======
Question: {question}
Output:\n"""
lv_qa_prompt = PromptTemplate(
template=lv_template,
input_variables=["question", "context"]
)
if mv_selected_model == 'microsoft/phi-2':
lv_model_path = "model/phi-2.Q2_K.gguf"
lv_model_type = "pi"
elif mv_selected_model == 'google/gemma-2b':
lv_model_path = "model/gemma-2b-it.Q2_K.gguf"
lv_model_type = "gemma"
elif mv_selected_model == 'mistralai/Mistral-7B-Instruct-v0.2':
lv_model_path = "model/mistral-7b-instruct-v0.2.Q2_K.gguf"
lv_model_type = "mistral"
print("Step4: Generating LLM response")
fn_display_user_messages("Step4: Generating LLM response","Info", mv_processing_message)
lv_model = LlamaCpp(
model_path=lv_model_path,
temperature=0.00,
max_tokens=2048,
top_p=1,
n_ctx=2048,
verbose=False
)
lv_vector_search_result = lv_vector_store.similarity_search(mv_user_question, k=2)
# print("Vector Search Result - ")
# print(lv_vector_search_result)
# -- Creating formatted document result
lv_document_context = ""
lv_count = 0
for lv_result in lv_vector_search_result:
print("Concatenating Result of page - " + str(lv_count) + " with content of document page no - "+str(lv_result.metadata["page"]))
lv_document_context += lv_result.page_content
lv_count += 1
# print("Formatted Document Search Result - ")
# print(lv_document_context)
lv_qa_formatted_prompt = lv_qa_prompt.format(
question=mv_user_question,
context=lv_document_context
)
print("Formatted Prompt - " + lv_qa_formatted_prompt)
lv_llm_response = lv_model(lv_qa_formatted_prompt)
# print("LLM Response" +lv_llm_response)
print("Step5: LLM response generated")
fn_display_user_messages("Step5: LLM response generated","Info", mv_processing_message)
return lv_llm_response
# Function return API based QA Response using Vector Store
def fn_generate_API_QnA_response(mv_selected_model, mv_user_question, lv_vector_store, mv_processing_message):
"""Returns QA Response using Vector Store"""
lv_template = """Instruction:
You are an AI assistant for answering questions about the provided context.
You are given the following extracted parts of a long document and a question. Provide a detailed answer.
If you don't know the answer, just say "Hmm, I'm not sure." Don't try to make up an answer.
=======
{context}
=======
Question: {question}
Output:\n"""
lv_qa_prompt = PromptTemplate(
template=lv_template,
input_variables=["question", "context"]
)
lv_vector_search_result = lv_vector_store.similarity_search(mv_user_question, k=2)
# print("Vector Search Result - ")
# print(lv_vector_search_result)
# -- Creating formatted document result
lv_document_context = ""
lv_count = 0
for lv_result in lv_vector_search_result:
# print("Concatenating Result of page - " + str(lv_count) + " with content of document page no - "+str(lv_result.metadata["page"]))
lv_document_context += lv_result.page_content
lv_count += 1
print("Formatted Document Search Result - ")
print(lv_document_context)
lv_qa_formatted_prompt = lv_qa_prompt.format(
question=mv_user_question,
context=lv_document_context
)
if mv_selected_model == 'Google Gemini-pro':
lv_model = genai.GenerativeModel('gemini-pro')
print("Step4: Generating LLM response")
fn_display_user_messages("Step4: Generating LLM response","Info", mv_processing_message)
lv_llm_response = lv_model.generate_content(lv_qa_formatted_prompt).text
print("Step5: LLM response generated")
fn_display_user_messages("Step5: LLM response generated","Info", mv_processing_message)
return lv_llm_response
# Main Function
def main():
# -- Streamlit Settings
st.set_page_config(layout='wide')
col1, col2, col3 = st.columns(3)
col2.title("Chat with PDF")
st.text("")
# -- Initialize chat history
if "messages" not in st.session_state:
st.session_state.messages = []
# -- Display Supported Models
col1, col2, col3 = st.columns(3)
mv_selected_model = col3.selectbox('Select Model',
[
'microsoft/phi-2',
'google/gemma-2b',
'mistralai/Mistral-7B-Instruct-v0.2',
'Google Gemini-pro'
]
)
# -- Display Supported Vector Stores
col1, col2, col3 = st.columns(3)
mv_selected_vector_db = col3.selectbox('Select Vector DB', ['FAISS'])
st.text("")
# -- Reading PDF File
col1, col2, col3 = st.columns(3)
mv_pdf_input_file = col2.file_uploader("Choose a PDF file:", type=["pdf"])
# -- Display Processing Details
st.text("")
col1, col2, col3 = st.columns(3)
mv_processing_message = col2.empty()
st.text("")
# -- Downloading Model Files
if mv_selected_model != "Google Gemini-pro":
fn_download_llm_models(mv_selected_model, mv_processing_message)
else:
print("Call Google API")
# -- Processing PDF
if (mv_pdf_input_file is not None):
# -- Upload PDF
fn_upload_pdf(mv_pdf_input_file, mv_processing_message)
# -- Create Vector Index
lv_vector_store = fn_create_vector_db(mv_pdf_input_file, mv_processing_message)
# -- Perform RAG
col1, col2, col3 = st.columns(3)
st.text("")
lv_chat_history = col2.chat_message
st.text("")
if mv_user_question := col2.chat_input("Chat on PDF Data"):
# -- Add user message to chat history
st.session_state.messages.append({"role": "user", "content": mv_user_question})
# -- Generating LLM response
if mv_selected_model != "Google Gemini-pro":
lv_response = fn_generate_QnA_response(mv_selected_model, mv_user_question, lv_vector_store, mv_processing_message)
else:
lv_response = fn_generate_API_QnA_response(mv_selected_model, mv_user_question, lv_vector_store, mv_processing_message)
# -- Adding assistant response to chat history
st.session_state.messages.append({"role": "assistant", "content": lv_response})
# -- Display chat messages from history on app rerun
for message in st.session_state.messages:
with lv_chat_history(message["role"]):
st.markdown(message["content"])
# -- Validate Data
# -- Get Web Response
# Calling Main Function
if __name__ == '__main__':
main()