Spaces:
Running
Running
import os | |
from getpass import getpass | |
import csv | |
from langchain_core.documents import Document | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
#from langchain.schema import Document | |
from langchain_huggingface import HuggingFaceEmbeddings | |
import torch | |
from langchain_huggingface import HuggingFaceEndpoint | |
from langchain_community.cache import InMemoryCache | |
from langchain.globals import set_llm_cache | |
from langchain_chroma import Chroma | |
from langchain.chains import RetrievalQA | |
import numpy as np | |
import gradio | |
import sqlite3 | |
from dotenv import load_dotenv | |
# Load environment variables | |
load_dotenv() | |
#hfapi_key = getpass("Enter you HuggingFace access token:") | |
hfapi_key = os.getenv("Mytoken") | |
if not hfapi_key: | |
raise ValueError("HUGGINGFACE_API_KEY not found in environment variables") | |
os.environ["HF_TOKEN"] = hfapi_key | |
os.environ["HUGGINGFACEHUB_API_TOKEN"] = hfapi_key | |
set_llm_cache(InMemoryCache()) | |
persist_directory = 'docs/chroma/' | |
#################################### | |
def load_file_as_JSON(): | |
print("$$$$$ ENTER INTO load_file_as_JSON $$$$$") | |
rows = [] | |
with open("mini-llama-articles.csv", mode="r", encoding="utf-8") as file: | |
csv_reader = csv.reader(file) | |
for idx, row in enumerate(csv_reader): | |
if idx == 0: | |
continue | |
# Skip header row | |
rows.append(row) | |
print("@@@@@@ EXIT FROM load_file_as_JSON @@@@@") | |
return rows | |
#################################### | |
def get_documents(): | |
print("$$$$$ ENTER INTO get_documents $$$$$") | |
documents = [ | |
Document( | |
page_content=row[1], metadata={"title": row[0], "url": row[2], "source_name": row[3]} | |
) | |
for row in load_file_as_JSON() | |
] | |
print("documents lenght is ", len(documents)) | |
print("first entry from documents ", documents[0]) | |
print("document metadata ", documents[0].metadata) | |
print("@@@@@@ EXIT FROM get_documents @@@@@") | |
return documents | |
#################################### | |
def getDocSplitter(): | |
print("$$$$$ ENTER INTO getDocSplitter $$$$$") | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size = 512, | |
chunk_overlap = 128 | |
) | |
splits = text_splitter.split_documents(get_documents()) | |
print("Split length ", len(splits)) | |
print("Page content ", splits[0].page_content) | |
print("@@@@@@ EXIT FROM getDocSplitter @@@@@") | |
return splits | |
#################################### | |
def getEmbeddings(): | |
print("$$$$$ ENTER INTO getEmbeddings $$$$$") | |
modelPath="mixedbread-ai/mxbai-embed-large-v1" | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
# Create a dictionary with model configuration options, specifying to use the CPU for computations | |
model_kwargs = {'device': device} # cuda/cpu | |
# Create a dictionary with encoding options, specifically setting 'normalize_embeddings' to False | |
encode_kwargs = {'normalize_embeddings': False} | |
embedding = HuggingFaceEmbeddings( | |
model_name=modelPath, # Provide the pre-trained model's path | |
model_kwargs=model_kwargs, # Pass the model configuration options | |
encode_kwargs=encode_kwargs # Pass the encoding options | |
) | |
print("Embedding ", embedding) | |
print("@@@@@@ EXIT FROM getEmbeddings @@@@@") | |
return embedding | |
#################################### | |
def getLLM(): | |
print("$$$$$ ENTER INTO getLLM $$$$$") | |
llm = HuggingFaceEndpoint( | |
repo_id="HuggingFaceH4/zephyr-7b-beta", | |
#repo_id="chsubhasis/ai-tutor-towardsai", | |
task="text-generation", | |
max_new_tokens = 512, | |
top_k = 10, | |
temperature = 0.1, | |
repetition_penalty = 1.03, | |
) | |
print("llm ", llm) | |
print("Who is the CEO of Apple? ", llm.invoke("Who is the CEO of Apple?")) #test | |
print("@@@@@@ EXIT FROM getLLM @@@@@") | |
return llm | |
#################################### | |
def is_chroma_db_present(directory: str): | |
""" | |
Check if the directory exists and contains any files. | |
""" | |
return os.path.exists(directory) and len(os.listdir(directory)) > 0 | |
#################################### | |
def getRetiriver(): | |
print("$$$$$ ENTER INTO getRetiriver $$$$$") | |
if is_chroma_db_present(persist_directory): | |
print(f"Chroma vector DB found in '{persist_directory}' and will be loaded.") | |
# Load vector store from the local directory | |
#vectordb = Chroma(persist_directory=persist_directory) | |
vectordb = Chroma( | |
persist_directory=persist_directory, | |
embedding_function=getEmbeddings(), | |
collection_name="ai_tutor") | |
else: | |
vectordb = Chroma.from_documents( | |
collection_name="ai_tutor", | |
documents=getDocSplitter(), # splits we created earlier | |
embedding=getEmbeddings(), | |
persist_directory=persist_directory, # save the directory | |
) | |
print("vectordb collection count ", vectordb._collection.count()) | |
docs = vectordb.search("What is Artificial Intelligence", search_type="mmr", k=5) | |
for i in range(len(docs)): | |
print(docs[i].page_content) | |
metadata_filter = { | |
"result": "llama" # ChromaDB will perform a substring search | |
} | |
retriever = vectordb.as_retriever(search_type="mmr", search_kwargs={"k": 3, "fetch_k":5, "filter": metadata_filter}) | |
print("retriever ", retriever) | |
print("@@@@@@ EXIT FROM getRetiriver @@@@@") | |
return retriever | |
#################################### | |
def get_rag_response(query): | |
print("$$$$$ ENTER INTO get_rag_response $$$$$") | |
qa_chain = RetrievalQA.from_chain_type( | |
llm=getLLM(), | |
chain_type="stuff", | |
retriever=getRetiriver(), | |
return_source_documents=True | |
) | |
#RAG Evaluation | |
# Sample dataset of questions and expected answers | |
dataset = [ | |
{"question": "Who is the CEO of Meta?", "expected_answer": "Mark Zuckerberg"}, | |
{"question": "Who is the CEO of Apple?", "expected_answer": "Tiiiiiim Coooooook"}, | |
] | |
hit_rate, mrr = evaluate_rag(qa_chain, dataset) | |
print(f"Hit Rate: {hit_rate:.2f}, Mean Reciprocal Rank (MRR): {mrr:.2f}") | |
result = qa_chain({"query": query}) | |
print("Result ",result) | |
print("@@@@@@ EXIT FROM get_rag_response @@@@@") | |
return result["result"] | |
#################################### | |
def evaluate_rag(qa, dataset): | |
print("$$$$$ ENTER INTO evaluate_rag $$$$$") | |
hits = 0 | |
reciprocal_ranks = [] | |
for entry in dataset: | |
question = entry["question"] | |
expected_answer = entry["expected_answer"] | |
# Get the answer from the RAG system | |
response = qa({"query": question}) | |
answer = response["result"] | |
# Check if the answer matches the expected answer | |
if expected_answer.lower() in answer.lower(): | |
hits += 1 | |
reciprocal_ranks.append(1) # Hit at rank 1 | |
else: | |
reciprocal_ranks.append(0) | |
# Calculate Hit Rate and MRR | |
hit_rate = hits / len(dataset) | |
mrr = np.mean(reciprocal_ranks) | |
print("@@@@@@ EXIT FROM evaluate_rag @@@@@") | |
return hit_rate, mrr | |
#################################### | |
def launch_ui(): | |
print("$$$$$ ENTER INTO launch_ui $$$$$") | |
# Input from user | |
in_question = gradio.Textbox(lines=10, placeholder=None, value="query", label='Enter your query') | |
# Output prediction | |
out_response = gradio.Textbox(type="text", label='RAG Response') | |
# Gradio interface to generate UI | |
iface = gradio.Interface(fn = get_rag_response, | |
inputs = [in_question], | |
outputs = [out_response], | |
title = "RAG Response", | |
description = "Write the query and get the response from the RAG system", | |
allow_flagging = 'never') | |
iface.launch(share = True) | |
#################################### | |
if __name__ == "__main__": | |
launch_ui() |