import nltk |
nltk.download('punkt_tab') |
import os |
from dotenv import load_dotenv |
import asyncio |
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect |
from fastapi.responses import HTMLResponse |
from fastapi.templating import Jinja2Templates |
from fastapi.middleware.cors import CORSMiddleware |
from langchain.chains import create_history_aware_retriever, create_retrieval_chain |
from langchain.chains.combine_documents import create_stuff_documents_chain |
from langchain_community.chat_message_histories import ChatMessageHistory |
from langchain_core.chat_history import BaseChatMessageHistory |
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder |
from langchain_core.runnables.history import RunnableWithMessageHistory |
from pinecone import Pinecone |
from pinecone_text.sparse import BM25Encoder |
from langchain_huggingface import HuggingFaceEmbeddings |
from langchain_community.retrievers import PineconeHybridSearchRetriever |
from langchain.retrievers import ContextualCompressionRetriever |
from langchain_community.chat_models import ChatPerplexity |
from langchain.retrievers.document_compressors import CrossEncoderReranker |
from langchain_community.cross_encoders import HuggingFaceCrossEncoder |
from langchain_core.prompts import PromptTemplate |
import re |
# --- Environment and application bootstrap ---------------------------------
# Pull settings from the local ".env" file into the process environment.
load_dotenv(".env")

USER_AGENT = os.getenv("USER_AGENT")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
SECRET_KEY = os.getenv("SECRET_KEY")

# os.environ only accepts str values: assigning None (an unset variable)
# raises "TypeError: str expected, not NoneType" at import time. Re-export a
# value only when it is actually set, and let the consumers of the key report
# a missing configuration themselves.
if USER_AGENT is not None:
    os.environ["USER_AGENT"] = USER_AGENT
if GROQ_API_KEY is not None:
    os.environ["GROQ_API_KEY"] = GROQ_API_KEY
os.environ["TOKENIZERS_PARALLELISM"] = "true"

app = FastAPI()

# NOTE(review): wildcard origins combined with allow_credentials=True is a very
# permissive CORS policy - confirm this is intended outside development.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# HTML templates are looked up in the "templates" directory.
templates = Jinja2Templates(directory="templates")
def initialize_pinecone(index_name: str):
    """Create a Pinecone client and return a handle to ``index_name``.

    The API key is read from the ``PINECONE_API_KEY`` environment variable.
    The previous implementation referenced a module-level ``PINECONE_API_KEY``
    name that is not defined anywhere in the visible module, so every call
    failed with ``NameError``.

    Args:
        index_name: Name of the Pinecone index to open.

    Returns:
        The object returned by ``Pinecone.Index(index_name)``.

    Raises:
        Exception: Any error raised while creating the client or opening the
            index is printed and then re-raised unchanged.
    """
    try:
        pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
        return pc.Index(index_name)
    except Exception as e:
        print(f"Error initializing Pinecone: {e}")
        raise
# --- Retrieval stack --------------------------------------------------------
# Handle to the Pinecone index queried by the hybrid retriever.
pinecone_index = initialize_pinecone("updated-adib-bank")

# Sparse (BM25) encoder restored from its saved JSON parameters.
bm25 = BM25Encoder().load("./updated-adib-bank.json")

# Dense embedding model; trust_remote_code is passed through to the loader.
embed_model = HuggingFaceEmbeddings(
    model_name="jinaai/jina-embeddings-v3",
    model_kwargs={"trust_remote_code": True},
)

# Hybrid retriever combining the dense embeddings and the sparse encoder.
retriever = PineconeHybridSearchRetriever(
    index=pinecone_index,
    embeddings=embed_model,
    sparse_encoder=bm25,
    alpha=0.5,
    top_k=20,
)

# Chat model used both for question reformulation and for answering.
# NOTE(review): the Perplexity client is given GROQ_API_KEY - confirm that the
# variable really holds a Perplexity key.
llm = ChatPerplexity(
    model="llama-3.1-sonar-large-128k-chat",
    pplx_api_key=GROQ_API_KEY,
    temperature=0,
    max_tokens=512,
    max_retries=2,
)

# Cross-encoder reranker that keeps the 10 best of the retrieved documents.
model = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-base")
compressor = CrossEncoderReranker(model=model, top_n=10)
compression_retriever = ContextualCompressionRetriever(
    base_retriever=retriever,
    base_compressor=compressor,
)
# --- Question reformulation -------------------------------------------------
# System instruction asking the model to rewrite a follow-up question into a
# standalone one. Built from adjacent literals; the resulting text is the same
# single line (ending in a newline) as before.
contextualize_q_system_prompt = (
    "Given a chat history and the latest user question "
    "which might reference context in the chat history, formulate a standalone question "
    "which can be understood without the chat history. Do NOT answer the question, "
    "just reformulate it if needed and otherwise return it as is.\n"
)

_contextualize_messages = [
    ("system", contextualize_q_system_prompt),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
]
contextualize_q_prompt = ChatPromptTemplate.from_messages(_contextualize_messages)

# Retriever that first reformulates the question using the chat history and
# then queries the reranking retriever.
history_aware_retriever = create_history_aware_retriever(
    llm,
    compression_retriever,
    contextualize_q_prompt,
)
# --- Answer generation ------------------------------------------------------
# System prompt for the answering step. "{language}" and "{context}" are
# template variables filled in when the chain runs. The text is runtime data
# and is kept exactly as it was.
qa_system_prompt = """ You are a highly skilled information retrieval assistant. Use the following context to answer questions effectively.
If you don't know the answer, simply state that you don't know.
Your answer should be in {language} language.
When responding to queries, follow these guidelines:
1. Provide Clear Answers:
- Based on the language of the question, you have to answer in that language. E.g., if the question is in English, then answer in English; if the question is in Arabic, you should answer in Arabic.
- Ensure the response directly addresses the query with accurate and relevant information.
- Do not give long answers. Provide detailed but concise responses.
2. Formatting for Readability:
- Provide the entire response in proper markdown format.
- Use structured Maekdown elements such as headings, subheading, lists, tables, and links.
- Use emaphsis on headings, important texts and phrases.
3. Proper Citations:
- ALWAYS USE INLINE CITATIONS with embed source URLs where users can verify information or explore further.
- The inline citations should be in the format [1], [2], etc.
- DO not inlcude references at the end of response.
{context}
"""

_qa_messages = [
    ("system", qa_system_prompt),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
]
qa_prompt = ChatPromptTemplate.from_messages(_qa_messages)

# Each retrieved document is rendered as its text followed by its source.
document_prompt = PromptTemplate(
    template="{page_content} \n\n Source: {source}",
    input_variables=["page_content", "source"],
)

# Stuff the rendered documents into the QA prompt, then wire retrieval and
# answering together.
question_answer_chain = create_stuff_documents_chain(
    llm,
    qa_prompt,
    document_prompt=document_prompt,
)
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
# In-memory chat histories, keyed by session id.
store = {}


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the chat history for ``session_id``, creating it on first use.

    Args:
        session_id: Key identifying the conversation in ``store``.

    Returns:
        The history object stored under ``session_id``; a new empty
        ``ChatMessageHistory`` is stored and returned when none exists yet.
    """
    try:
        return store[session_id]
    except KeyError:
        history = ChatMessageHistory()
        store[session_id] = history
        return history
# RAG chain wrapped with per-session message history: the history returned by
# get_session_history is exposed to the prompts as "chat_history", the user
# text is read from "input", and the "answer" output is recorded.
# NOTE(review): "language_message_key" is passed through as-is; it is not
# obviously a recognised option of RunnableWithMessageHistory - confirm that
# it has any effect.
conversational_rag_chain = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    history_messages_key="chat_history",
    input_messages_key="input",
    output_messages_key="answer",
    language_message_key="language",
)
def _extract_cited_sources(answer_text, documents):
    """Map the citation markers used in ``answer_text`` to document sources.

    Args:
        answer_text: Complete answer text, possibly containing markers such
            as ``[1]`` or ``[2]``.
        documents: Retrieved documents in order; the document at position
            ``i`` (1-based) corresponds to marker ``[i]``. Each must expose
            ``metadata["source"]``.

    Returns:
        A dict such as ``{"[1]": <source>, ...}`` containing only the markers
        that appear in the answer and have a matching document.
    """
    cited_numbers = {int(number) for number in re.findall(r'\[(\d+)\]', answer_text)}
    return {
        f"[{position}]": doc.metadata["source"]
        for position, doc in enumerate(documents, start=1)
        if position in cited_numbers
    }


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Serve the chat over a WebSocket, streaming answers chunk by chunk.

    Each incoming JSON message is read for the keys ``question``,
    ``language`` and ``session_id``. For every message the endpoint streams
    ``{"response": <chunk>}`` frames as the chain produces answer chunks and,
    when context documents were returned, finishes with one
    ``{"sources": {...}}`` frame for the citations used in the answer.

    When the handler exits, the history stored for the most recently used
    session id is removed from ``store``.
    """
    import uuid  # local import: only needed for the fallback session id

    await websocket.accept()
    print(f"Client connected: {websocket.client}")

    # SESSION_ID_DEFAULT is not defined in the visible module, so the old
    # lookup raised NameError on every message. Honour it if it exists,
    # otherwise use an id unique to this connection so that clients without a
    # session id do not share one history.
    fallback_session_id = globals().get("SESSION_ID_DEFAULT") or f"ws-{uuid.uuid4().hex}"
    session_id = None
    try:
        while True:
            data = await websocket.receive_json()
            question = data.get('question')

            # A missing or non-string language used to raise TypeError on the
            # "in" test; it now falls into the same branch as any non-"en" value.
            requested_language = data.get('language')
            if isinstance(requested_language, str) and "en" in requested_language:
                language = "English"
            else:
                language = "Arabic"

            session_id = data.get('session_id') or fallback_session_id

            try:
                answer_parts = []
                documents = []
                async for chunk in conversational_rag_chain.astream(
                    {"input": question, 'language': language},
                    config={"configurable": {"session_id": session_id}},
                ):
                    if "context" in chunk:
                        documents = chunk['context']
                    if "answer" in chunk:
                        answer_parts.append(chunk['answer'])
                        await websocket.send_json({'response': chunk['answer']})
                if documents:
                    sources = _extract_cited_sources("".join(answer_parts), documents)
                    await websocket.send_json({'sources': sources})
            except WebSocketDisconnect:
                # The client went away mid-stream: do not try to send an
                # error frame on a closed socket, let the outer handler run.
                raise
            except Exception as e:
                print(f"Error during message handling: {e}")
                await websocket.send_json({'response': "Something went wrong, Please try again.."})
    except WebSocketDisconnect:
        print(f"Client disconnected: {websocket.client}")
    finally:
        # Release the stored history on every exit path, not only on a clean
        # disconnect, so failed connections do not leak entries in ``store``.
        if session_id:
            store.pop(session_id, None)
@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
    """Render the chat page ("chat.html") for the site root."""
    template_context = {"request": request}
    return templates.TemplateResponse("chat.html", template_context)