Spaces:
Runtime error
Runtime error
import gc | |
import os | |
import time | |
import re | |
import numpy as np | |
import torch | |
import bm25s | |
from langchain_community.document_loaders import PyMuPDFLoader | |
from langchain_core.documents import Document | |
from langchain_community.embeddings.sentence_transformer import ( | |
SentenceTransformerEmbeddings, | |
) | |
from langchain_community.vectorstores import Chroma | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
from langchain_core.runnables import RunnablePassthrough | |
from langchain_groq import ChatGroq | |
from langchain_openai import ChatOpenAI | |
from langchain_google_genai import ChatGoogleGenerativeAI | |
from langchain_anthropic import ChatAnthropic | |
from dotenv import load_dotenv | |
from langchain_core.output_parsers import XMLOutputParser | |
from langchain.prompts import ChatPromptTemplate | |
from langchain_community.cross_encoders import HuggingFaceCrossEncoder | |
from langchain.retrievers import ContextualCompressionRetriever | |
from langchain.retrievers.document_compressors import CrossEncoderReranker | |
from langchain_core.messages import HumanMessage | |
from langchain.retrievers import EnsembleRetriever | |
from langchain_community.retrievers import BM25Retriever | |
load_dotenv()

# Suppress gRPC and glog logs emitted when using Gemini.
os.environ["GRPC_VERBOSITY"] = "ERROR"
os.environ["GLOG_minloglevel"] = "2"

# RAG parameters
CHUNK_SIZE = 1024  # chunk size handed to the text splitter
CHUNK_OVERLAP = CHUNK_SIZE // 8  # overlap between consecutive chunks
K = 20  # number of chunks to retrieve from semantic search
FETCH_K = 50  # passed as ``fetch_k`` to the MMR retriever
N_BM25 = 20  # number of chunks to retrieve from keyword search
TOP_N = 10  # final number of chunks to keep


def _select_device() -> str:
    """Pick the torch device for the embedding and reranker models.

    Keeps the historical choice (``cuda:1``) when at least two CUDA devices
    are visible, and otherwise falls back to ``cuda:0`` or ``cpu`` so the
    module can still be imported on single-GPU or CPU-only hosts.
    """
    gpu_count = torch.cuda.device_count()
    if gpu_count > 1:
        return "cuda:1"
    if gpu_count == 1:
        return "cuda:0"
    return "cpu"


model_kwargs = {"device": _select_device()}

print("Loading embedding and reranker models...")
embedding_function = SentenceTransformerEmbeddings(
    model_name="mixedbread-ai/mxbai-embed-large-v1", model_kwargs=model_kwargs
)
# Embedding model names that have been tried here:
# "sentence-transformers/all-MiniLM-L6-v2"
# "mixedbread-ai/mxbai-embed-large-v1"
reranker = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-base", model_kwargs=model_kwargs)
compressor = CrossEncoderReranker(model=reranker, top_n=TOP_N)

# Display name -> provider model identifier.
llm_model_translation = {
    "LLaMA 3": "llama3-70b-8192",
    "OpenAI GPT 4o Mini": "gpt-4o-mini",
    "OpenAI GPT 4o": "gpt-4o",
    "OpenAI GPT 4": "gpt-4-turbo",
    "Gemini 1.5 Pro": "gemini-1.5-pro",
    "Claude Sonnet 3.5": "claude-3-5-sonnet-20240620",
}

# Provider model identifier -> LangChain chat model class used to instantiate it.
llm_classes = {
    "llama3-70b-8192": ChatGroq,
    "gpt-4o-mini": ChatOpenAI,
    "gpt-4o": ChatOpenAI,
    "gpt-4-turbo": ChatOpenAI,
    "gemini-1.5-pro": ChatGoogleGenerativeAI,
    "claude-3-5-sonnet-20240620": ChatAnthropic,
}
# System prompt for the cited-answer chain. ``{context}`` receives the formatted
# sources; the model is asked to answer as <cited_text>/<chunk> XML so that the
# reply can be parsed into text chunks with their citation ids.
xml_system = """You're a helpful AI assistant. Given a user prompt and some related sources, fulfill all the requirements \
of the prompt and provide citations. If a chunk of the generated text does not use any of the sources (for example, \
introductions or general text), don't put a citation for that chunk and just leave "citations" section empty. Otherwise, \
list all sources used for that chunk of the text. Remember, don't add inline citations in the text itself in any circumstant.
Add all citations to the separate citations section. Use explicit new lines in the text to show paragraph splits. For each chunk use this example format:
<chunk>
<text>This is a sample text chunk....</text>
<citations>
<citation>1</citation>
<citation>3</citation>
...
</citations>
</chunk>
If the prompt asks for a reference section, add it in a chunk without any citations
Return a citation for every quote across all articles that justify the text. Remember use the following format for your final output:
<cited_text>
<chunk>
<text></text>
<citations>
<citation><source_id></source_id></citation>
...
</citations>
</chunk>
<chunk>
<text></text>
<citations>
<citation><source_id></source_id></citation>
...
</citations>
</chunk>
...
</cited_text>
The entire text should be wrapped in one cited_text. For References section (if asked by prompt), don't add citations.
For source id, give a valid integer alone without a key.
Here are the sources:{context}"""

# Two-message chat prompt: the system instructions above, then the user's
# request, which is substituted for ``{input}``.
_xml_messages = [
    ("system", xml_system),
    ("human", "{input}"),
]
xml_prompt = ChatPromptTemplate.from_messages(_xml_messages)
def format_docs_xml(docs: list[Document]) -> str:
    """Render retrieved documents as a ``<sources>`` XML block for the prompt.

    Each document becomes a ``<source>`` element whose ``id`` attribute is its
    position in ``docs``; that position is the id the model is asked to cite.
    The element carries the document's ``metadata['source']`` as ``<path>``
    and its text as ``<article_snippet>``.
    """
    sources = [
        f"""\
<source id=\"{position}\">
<path>{document.metadata['source']}</path>
<article_snippet>{document.page_content}</article_snippet>
</source>"""
        for position, document in enumerate(docs)
    ]
    body = "\n".join(sources)
    return "\n\n<sources>" + body + "</sources>"
def get_doc_content(docs, id):
    """Return the ``page_content`` of the document at index ``id`` in ``docs``."""
    selected = docs[id]
    return selected.page_content
def remove_citations(text):
    """Strip every ``<N>`` citation marker (N being one or more digits) from ``text``."""
    marker = re.compile(r"<\d+>")
    return marker.sub("", text)
def _chunk_citation_ids(chunk):
    """Return the integer source ids cited by one parsed ``chunk`` list.

    ``chunk`` is expected to look like ``[{"text": ...}, {"citations": [...]}]``.
    Each citation entry is ``{"citation": "<int>"}`` or
    ``{"citation": {"source_id": "<int>"}}``. Anything else, including ids
    that are not valid integers, is ignored.
    """
    if len(chunk) < 2 or not isinstance(chunk[1], dict):
        return []
    ids = []
    for entry in chunk[1].get("citations") or []:
        if not isinstance(entry, dict) or "citation" not in entry:
            continue
        citation = entry["citation"]
        if isinstance(citation, dict) and "source_id" in citation:
            citation = citation["source_id"]
        if not isinstance(citation, str):
            continue
        try:
            ids.append(int(citation))
        except ValueError:
            # The model produced a non-numeric id; skip it.
            continue
    return ids


def display_cited_text(data):
    """Flatten a parsed ``cited_text`` structure into displayable text.

    For every chunk the text is emitted, followed (when the chunk has
    citations) by a space and one ``<N>`` marker per cited source id, and
    then a blank line.

    Args:
        data: Mapping with a ``"cited_text"`` list of ``{"chunk": [...]}``
            items. ``None`` or an empty mapping yields ``""``.

    Returns:
        The combined text with inline ``<N>`` citation markers.
    """
    if not data:
        return ""

    pieces = []
    for item in data.get("cited_text") or []:
        chunk = item.get("chunk") if isinstance(item, dict) else None
        if not chunk:
            continue

        head = chunk[0]
        chunk_text = head.get("text") if isinstance(head, dict) else None
        # An empty text element can come back as None; treat it as empty text
        # instead of failing on string concatenation.
        pieces.append(chunk_text or "")

        citation_ids = _chunk_citation_ids(chunk)
        if citation_ids:
            pieces.append(" " + "".join(f"<{cid}>" for cid in citation_ids))
        pieces.append("\n\n")
    return "".join(pieces)
def get_citations(data, docs):
    """Collect the unique sources cited in a parsed ``cited_text`` structure.

    Args:
        data: Mapping with a ``"cited_text"`` list of ``{"chunk": [...]}``
            items, where the second chunk element may hold ``"citations"``.
            ``None`` or an empty mapping yields an empty result.
        docs: The retrieved documents; a citation id is an index into this
            sequence.

    Returns:
        Dict mapping each cited id (in first-seen order) to
        ``{"source": <metadata source>, "content": <page content>}``. Ids that
        are not valid integers or fall outside ``docs`` are ignored.
    """
    citations = {}
    if not data:
        return citations

    for item in data.get("cited_text") or []:
        chunk = item.get("chunk") if isinstance(item, dict) else None
        if not chunk or len(chunk) < 2 or not isinstance(chunk[1], dict):
            continue
        for entry in chunk[1].get("citations") or []:
            if not isinstance(entry, dict) or "citation" not in entry:
                continue
            citation = entry["citation"]
            if isinstance(citation, dict) and "source_id" in citation:
                citation = citation["source_id"]
            if not isinstance(citation, str):
                continue
            try:
                citation_id = int(citation)
            except ValueError:
                # The model produced a non-numeric id; skip it.
                continue
            # Ids come from the model: skip ones that do not address a
            # document. A negative id would otherwise index from the end.
            if citation_id in citations or not 0 <= citation_id < len(docs):
                continue
            doc = docs[citation_id]
            citations[citation_id] = {
                "source": doc.metadata["source"],
                "content": doc.page_content,
            }
    return citations
def citations_to_html(citations):
    """Render the citation dictionary as an HTML list.

    Args:
        citations: Mapping of source id to ``{"source": ..., "content": ...}``
            (the shape returned by ``get_citations``). May be empty or ``None``.

    Returns:
        A small HTML document with one ``<li>`` per citation, or ``""`` when
        there is nothing to show.
    """
    from html import escape

    if not citations:
        return ""

    # Source paths and page content originate from uploaded or scraped
    # documents, so they are escaped before being embedded in markup.
    items = [
        f"<li><strong>Source ID:</strong> {escape(str(citation_id))}<br>"
        f"<strong>Path:</strong> {escape(str(citation_info['source']))}<br>"
        f"<strong>Page Content:</strong> {escape(str(citation_info['content']))}</li>"
        for citation_id, citation_info in citations.items()
    ]
    # Open the same elements that are closed at the end so the markup is balanced.
    return "<html><body><ul>" + "".join(items) + "</ul></body></html>"
def load_llm(model: str, api_key: str, temperature: float = 1.0, max_length: int = 2048):
    """Instantiate the chat model registered under the display name ``model``.

    The display name is translated through ``llm_model_translation`` and the
    resulting identifier selects a class from ``llm_classes``. ``api_key`` is
    accepted but not forwarded to the model class by this function.

    Returns:
        The constructed chat model, or ``None`` if construction raised.

    Raises:
        ValueError: If ``model`` does not map to a registered class.
    """
    resolved_name = llm_model_translation.get(model)
    factory = llm_classes.get(resolved_name)
    if not factory:
        raise ValueError(f"Model {model} not supported.")

    try:
        return factory(model_name=resolved_name, temperature=temperature, max_tokens=max_length)
    except Exception as e:
        # Construction problems are reported and signalled with None.
        print(f"An error occurred: {e}")
        return None
def create_db_with_langchain(path: list[str], url_content: dict, yt_content: dict, query: str):
    """Chunk all provided sources and build the semantic and keyword retrievers.

    Args:
        path: PDF file paths to load (may be empty or ``None``).
        url_content: Mapping of URL to scraped page text (may be empty or ``None``).
        yt_content: Mapping of YouTube URL to transcript text (may be empty or ``None``).
        query: Retrieval query; used to pre-filter the chunks with bm25s when
            there are more than 300 of them.

    Returns:
        ``(db, bm25_retriever)``: a Chroma vector store over the chunks and a
        ``BM25Retriever`` configured to return ``N_BM25`` chunks.

    Raises:
        ValueError: If no chunk could be produced from the inputs.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=[
            "\n\n",
            "\n",
            ".",
            "\uff0e",  # Fullwidth full stop
            "\u3002",  # Ideographic full stop
            "?",
            "!",
            ",",
            "\uff0c",  # Fullwidth comma
            "\u3001",  # Ideographic comma
            " ",
            "\u200B",  # Zero-width space
            "",
        ],
        keep_separator=True,
        is_separator_regex=False,
        length_function=len,
        add_start_index=False,
    )

    all_docs = []

    # Uploaded PDFs
    for file in path or []:
        data = PyMuPDFLoader(file).load()
        all_docs.extend(text_splitter.split_documents(data))

    # Internet search results and YouTube transcriptions have the same shape
    # ({source url: text}), so they are chunked by the same loop.
    for scraped in (url_content, yt_content):
        for source, content in (scraped or {}).items():
            if not content:
                # Nothing was scraped for this source; there is nothing to index.
                continue
            doc = Document(page_content=content, metadata={"source": source})
            all_docs.extend(text_splitter.split_documents([doc]))

    print(f"### Total number of documents before bm25s: {len(all_docs)}")

    # Validate before building any retriever, with a real exception so the
    # check is not stripped when Python runs with -O.
    if not all_docs:
        raise ValueError("No PDFs or scrapped data provided")

    # If the number of docs is too high, keep only the best keyword matches.
    num_max_docs = 300
    if len(all_docs) > num_max_docs:
        docs_raw = [doc.page_content for doc in all_docs]
        retriever = bm25s.BM25()
        retriever.index(bm25s.tokenize(docs_raw))
        # With no corpus attached, retrieve() returns corpus indices. Ask for
        # the top matches directly instead of re-deriving positions from the
        # returned score array, whose order follows the returned results
        # rather than the corpus.
        top_indices, _ = retriever.retrieve(bm25s.tokenize(query), k=num_max_docs)
        keep = sorted(int(i) for i in top_indices[0])  # keep original chunk order
        all_docs = [all_docs[i] for i in keep]

    # print docs
    for idx, doc in enumerate(all_docs):
        print(f"Doc: {idx} | Length = {len(doc.page_content)}")

    bm25_retriever = BM25Retriever.from_documents(all_docs)
    bm25_retriever.k = N_BM25

    db = Chroma.from_documents(all_docs, embedding_function)

    torch.cuda.empty_cache()
    gc.collect()
    return db, bm25_retriever
def pretty_print_docs(docs):
    """Print each document's text under a ``Document N:`` heading.

    Documents are numbered from 1 and separated by a line of 100 dashes.
    Nothing is returned.
    """
    divider = f"\n{'-' * 100}\n"
    sections = []
    for number, document in enumerate(docs, start=1):
        sections.append(f"Document {number}:\n\n" + document.page_content)
    print(divider.join(sections))
def generate_rag(
    prompt: str,
    input_role: str,
    topic: str,
    context: str,
    model: str,
    url_content: dict,
    path: list[str],
    temperature: float = 1.0,
    max_length: int = 2048,
    api_key: str = "",
    sys_message="",
    yt_content=None,
):
    """Answer ``prompt`` with retrieval-augmented generation and citations.

    A retrieval query is first generated from ``input_role``, ``topic`` and
    ``context``. The sources are chunked and indexed, then queried through an
    ensemble of BM25 and MMR vector search that is reranked by the
    cross-encoder. The selected chunks are handed to the LLM together with the
    XML citation prompt.

    Returns:
        ``(result, citations)`` where ``result`` is the parsed XML output and
        ``citations`` maps cited source ids to their source and content, or
        ``(None, None)`` if the LLM could not be loaded.
    """
    llm = load_llm(model, api_key, temperature, max_length)
    if llm is None:
        print("Failed to load LLM. Aborting operation.")
        # Same shape as the success path and as generate_base, so callers can
        # always unpack two values.
        return None, None

    query = llm_wrapper(input_role, topic, context, model="OpenAI GPT 4o", task_type="rag", temperature=0.7)
    print("### Query: ", query)

    db, bm25_retriever = create_db_with_langchain(path, url_content, yt_content, query)
    try:
        retriever = db.as_retriever(
            search_type="mmr", search_kwargs={"k": K, "fetch_k": FETCH_K, "lambda_mult": 0.75}
        )
        t0 = time.time()
        ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, retriever], weights=[0.4, 0.6])
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=compressor, base_retriever=ensemble_retriever
        )
        docs = compression_retriever.invoke(query)
        t1 = time.time()
        print(f"Time for retrieval : {t1 - t0:.2f}s")
        # pretty_print_docs prints by itself and returns None.
        pretty_print_docs(docs)

        formatted_docs = format_docs_xml(docs)
        rag_chain = RunnablePassthrough.assign(context=lambda _: formatted_docs) | xml_prompt | llm | XMLOutputParser()
        result = rag_chain.invoke({"input": prompt})
        citations = get_citations(result, docs)
    finally:
        # Important: otherwise the documents stay in memory. Runs even when
        # retrieval or generation fails.
        db.delete_collection()
        torch.cuda.empty_cache()
        gc.collect()
    return result, citations
def generate_base(
    prompt: str, topic: str, model: str, temperature: float, max_length: int, api_key: str, sys_message=""
):
    """Answer ``prompt`` with the LLM alone, without retrieval.

    The raw model output is wrapped in the same ``cited_text`` structure that
    the RAG path produces, as a single chunk with no citations.

    Returns:
        ``(output_dict, None)`` on success, ``(None, None)`` if the model could
        not be loaded or the call failed.
    """
    llm = load_llm(model, api_key, temperature, max_length)
    if llm is None:
        print("Failed to load LLM. Aborting operation.")
        return None, None

    try:
        answer = llm.invoke(prompt).content
    except Exception as e:
        print(f"An error occurred while running the model: {e}")
        return None, None

    single_chunk = [{"text": answer}, {"citations": None}]
    return {"cited_text": [{"chunk": single_chunk}]}, None
def generate(
    prompt: str,
    input_role: str,
    topic: str,
    context: str,
    model: str,
    url_content: dict,
    path: list[str],
    temperature: float = 1.0,
    max_length: int = 2048,
    api_key: str = "",
    sys_message="",
    yt_content=None,
):
    """Dispatch to RAG or plain generation depending on whether sources exist.

    When any of ``path``, ``url_content`` or ``yt_content`` is non-empty the
    request goes through ``generate_rag``; otherwise ``generate_base`` is used.
    """
    has_sources = bool(path or url_content or yt_content)
    if not has_sources:
        return generate_base(prompt, topic, model, temperature, max_length, api_key, sys_message)

    return generate_rag(
        prompt,
        input_role,
        topic,
        context,
        model,
        url_content,
        path,
        temperature,
        max_length,
        api_key,
        sys_message,
        yt_content,
    )
# Instruction templates for query generation, keyed by ``task_type``.
_QUERY_TEMPLATES = {
    "rag": """You are an AI assistant tasked with reformulating user inputs to improve retrieval query in a RAG system.
- Given the original user inputs, construct query to be more specific, detailed, and likely to retrieve relevant information.
- Generate the query as a complete sentence or question, not just as keywords, to ensure the retrieval process can find detailed and contextually relevant information.
- You may enhance the query by adding related and relevant terms, but do not introduce new facts, such as dates, numbers, or assumed information, that were not provided in the input.
**Inputs:**
- **User Role**: {iam}
- **Topic**: {topic}
- **Context**: {context}
**Only return the search query**.""",
    "internet": """You are an AI assistant tasked with generating an optimized Google search query to help retrieve relevant websites, news, articles, and other sources of information.
- You may enhance the query by adding related and relevant terms, but do not introduce new facts, such as dates, numbers, or assumed information, that were not provided in the input.
- The query should be **concise** and include important **keywords** while incorporating **short phrases** or context where it improves the search.
- Avoid the use of "site:" operators or narrowing search by specific websites.
**Inputs:**
- **User Role**: {iam}
- **Topic**: {topic}
- **Context**: {context}
**Only return the search query**.
""",
}


def llm_wrapper(
    iam=None,
    topic=None,
    context=None,
    temperature=1.0,
    max_length=512,
    api_key="",
    model="OpenAI GPT 4o Mini",
    task_type="internet",
):
    """Ask an LLM to write a search query from the user's role, topic and context.

    Args:
        iam: The user's role.
        topic: The topic to write about.
        context: Additional context supplied by the user.
        temperature: Sampling temperature for the query-writing model.
        max_length: Token limit for the query-writing model.
        api_key: Passed through to ``load_llm``.
        model: Display name of the model to use.
        task_type: ``"rag"`` for a retrieval query or ``"internet"`` for a
            Google search query.

    Returns:
        The generated query with surrounding quotes stripped.

    Raises:
        ValueError: If ``task_type`` is not ``"rag"`` or ``"internet"``.
        RuntimeError: If the LLM could not be loaded.
    """
    # Validate first so a bad task type fails fast, before any model client is built.
    template = _QUERY_TEMPLATES.get(task_type)
    if template is None:
        raise ValueError("Task type not recognized. Please specify 'rag' or 'internet'.")

    llm = load_llm(model, api_key, temperature, max_length)
    if llm is None:
        # load_llm reports construction errors by returning None; fail with a
        # clear message instead of an AttributeError on ``None.invoke``.
        raise RuntimeError(f"Failed to load LLM '{model}' for query generation.")

    human_message = HumanMessage(content=template.format(iam=iam, topic=topic, context=context))
    response = llm.invoke([human_message])
    return response.content.strip('"').strip("'")