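"""
RAG demo app for a Hugging Face Space: loads articles from mini-llama-articles.csv,
splits and embeds them into a persistent Chroma vector store, answers queries with a
HuggingFace-hosted LLM through a RetrievalQA chain, and serves the result via a Gradio UI.

Expects a HuggingFace access token in the environment (or a .env file) under the key
"Mytoken", as read below.
"""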
import os
from getpass import getpass
import csv
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
#from langchain.schema import Document
from langchain_huggingface import HuggingFaceEmbeddings
import torch
from langchain_huggingface import HuggingFaceEndpoint
from langchain_community.cache import InMemoryCache
from langchain.globals import set_llm_cache
from langchain_chroma import Chroma
from langchain.chains import RetrievalQA
import numpy as np
import gradio
import sqlite3
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
#hfapi_key = getpass("Enter you HuggingFace access token:")
hfapi_key = os.getenv("Mytoken")
if not hfapi_key:
    raise ValueError("HuggingFace access token not found: set 'Mytoken' in the environment or .env file")
os.environ["HF_TOKEN"] = hfapi_key
os.environ["HUGGINGFACEHUB_API_TOKEN"] = hfapi_key
set_llm_cache(InMemoryCache())
persist_directory = 'docs/chroma/'
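# The Chroma index is written to (and later reloaded from) this directory, so the
# embedding step only runs on the first launch; see is_chroma_db_present()/getRetriever().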
####################################
def load_file_as_JSON():
    print("$$$$$ ENTER INTO load_file_as_JSON $$$$$")
    rows = []
    with open("mini-llama-articles.csv", mode="r", encoding="utf-8") as file:
        csv_reader = csv.reader(file)
        for idx, row in enumerate(csv_reader):
            if idx == 0:
                continue  # skip header row
            rows.append(row)
    print("@@@@@@ EXIT FROM load_file_as_JSON @@@@@")
    return rows
####################################
def get_documents():
    print("$$$$$ ENTER INTO get_documents $$$$$")
    documents = [
        Document(
            page_content=row[1], metadata={"title": row[0], "url": row[2], "source_name": row[3]}
        )
        for row in load_file_as_JSON()
    ]
    print("documents length is ", len(documents))
    print("first entry from documents ", documents[0])
    print("document metadata ", documents[0].metadata)
    print("@@@@@@ EXIT FROM get_documents @@@@@")
    return documents
####################################
def getDocSplitter():
    print("$$$$$ ENTER INTO getDocSplitter $$$$$")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=128
    )
    splits = text_splitter.split_documents(get_documents())
    print("Split length ", len(splits))
    print("Page content ", splits[0].page_content)
    print("@@@@@@ EXIT FROM getDocSplitter @@@@@")
    return splits
####################################
def getEmbeddings():
    print("$$$$$ ENTER INTO getEmbeddings $$$$$")
    modelPath = "mixedbread-ai/mxbai-embed-large-v1"
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # Model configuration options: use the GPU when available, otherwise fall back to the CPU
    model_kwargs = {'device': device}
    # Encoding options: leave the embeddings un-normalized
    encode_kwargs = {'normalize_embeddings': False}
    embedding = HuggingFaceEmbeddings(
        model_name=modelPath,         # pre-trained embedding model
        model_kwargs=model_kwargs,    # model configuration options
        encode_kwargs=encode_kwargs   # encoding options
    )
    print("Embedding ", embedding)
    print("@@@@@@ EXIT FROM getEmbeddings @@@@@")
    return embedding
####################################
def getLLM():
    print("$$$$$ ENTER INTO getLLM $$$$$")
    llm = HuggingFaceEndpoint(
        repo_id="HuggingFaceH4/zephyr-7b-beta",
        #repo_id="chsubhasis/ai-tutor-towardsai",
        task="text-generation",
        max_new_tokens=512,
        top_k=10,
        temperature=0.1,
        repetition_penalty=1.03,
    )
    print("llm ", llm)
    print("Who is the CEO of Apple? ", llm.invoke("Who is the CEO of Apple?"))  # smoke test
    print("@@@@@@ EXIT FROM getLLM @@@@@")
    return llm
####################################
def is_chroma_db_present(directory: str):
    """
    Check if the directory exists and contains any files.
    """
    return os.path.exists(directory) and len(os.listdir(directory)) > 0
####################################
def getRetriever():
    print("$$$$$ ENTER INTO getRetriever $$$$$")
    if is_chroma_db_present(persist_directory):
        print(f"Chroma vector DB found in '{persist_directory}' and will be loaded.")
        # Load the vector store from the local directory
        #vectordb = Chroma(persist_directory=persist_directory)
        vectordb = Chroma(
            persist_directory=persist_directory,
            embedding_function=getEmbeddings(),
            collection_name="ai_tutor")
    else:
        vectordb = Chroma.from_documents(
            collection_name="ai_tutor",
            documents=getDocSplitter(),           # splits we created earlier
            embedding=getEmbeddings(),
            persist_directory=persist_directory,  # save to this directory
        )
    print("vectordb collection count ", vectordb._collection.count())
    docs = vectordb.search("What is Artificial Intelligence", search_type="mmr", k=5)
    for i in range(len(docs)):
        print(docs[i].page_content)
    # Chroma metadata filters are exact-match on a metadata key; this only returns documents
    # whose "result" field equals "llama" (the documents above only carry title/url/source_name,
    # so the key should be adjusted to match the stored metadata).
    metadata_filter = {
        "result": "llama"
    }
    retriever = vectordb.as_retriever(search_type="mmr", search_kwargs={"k": 3, "fetch_k": 5, "filter": metadata_filter})
    print("retriever ", retriever)
    print("@@@@@@ EXIT FROM getRetriever @@@@@")
    return retriever
####################################
def get_rag_response(query):
    print("$$$$$ ENTER INTO get_rag_response $$$$$")
    qa_chain = RetrievalQA.from_chain_type(
        llm=getLLM(),
        chain_type="stuff",
        retriever=getRetriever(),
        return_source_documents=True
    )
    # RAG evaluation on a small sample dataset of questions and expected answers
    dataset = [
        {"question": "Who is the CEO of Meta?", "expected_answer": "Mark Zuckerberg"},
        {"question": "Who is the CEO of Apple?", "expected_answer": "Tiiiiiim Coooooook"},
    ]
    hit_rate, mrr = evaluate_rag(qa_chain, dataset)
    print(f"Hit Rate: {hit_rate:.2f}, Mean Reciprocal Rank (MRR): {mrr:.2f}")
    result = qa_chain.invoke({"query": query})
    print("Result ", result)
    print("@@@@@@ EXIT FROM get_rag_response @@@@@")
    return result["result"]
####################################
def evaluate_rag(qa, dataset):
    print("$$$$$ ENTER INTO evaluate_rag $$$$$")
    hits = 0
    reciprocal_ranks = []
    for entry in dataset:
        question = entry["question"]
        expected_answer = entry["expected_answer"]
        # Get the answer from the RAG system
        response = qa.invoke({"query": question})
        answer = response["result"]
        # Check if the expected answer appears in the generated answer
        if expected_answer.lower() in answer.lower():
            hits += 1
            reciprocal_ranks.append(1)  # hit at rank 1
        else:
            reciprocal_ranks.append(0)
    # Calculate Hit Rate and MRR (only the top answer is scored, so MRR equals the hit rate here)
    hit_rate = hits / len(dataset)
    mrr = np.mean(reciprocal_ranks)
    print("@@@@@@ EXIT FROM evaluate_rag @@@@@")
    return hit_rate, mrr
####################################
def launch_ui():
    print("$$$$$ ENTER INTO launch_ui $$$$$")
    # Input from user
    in_question = gradio.Textbox(lines=10, placeholder=None, value="query", label='Enter your query')
    # Output prediction
    out_response = gradio.Textbox(type="text", label='RAG Response')
    # Gradio interface to generate UI
    iface = gradio.Interface(fn=get_rag_response,
                             inputs=[in_question],
                             outputs=[out_response],
                             title="RAG Response",
                             description="Write the query and get the response from the RAG system",
                             allow_flagging='never')
    iface.launch(share=True)
####################################
if __name__ == "__main__":
    launch_ui()
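####################################
# Suggested (unpinned) dependencies, inferred from the imports above; adjust versions as needed:
#   langchain langchain-core langchain-community langchain-huggingface langchain-chroma
#   langchain-text-splitters gradio torch numpy python-dotenv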