# app.py
import os
import re
import torch
# import pdfplumber
from chromadb.utils import embedding_functions
from rerankers import Reranker
# from transformers import GPT2TokenizerFast
from groq import Groq
from chromadb import PersistentClient
import gradio as gr
# Retrieve the API key from environment variables (Hugging Face Secrets)
groq_api_key = os.environ.get('GROQ_API_KEY')
# Initialize the chat client with the API key
chat_client = Groq(api_key=groq_api_key)
model = "llama-3.2-90b-text-preview"
def edit_text(text):
    """Collapse consecutive duplicate citations (e.g. "[3] [3]") and rewrite
    each surviving citation "[N]" as "[Page N]"."""
    # Find all citations of the form [N] and their positions
citation_matches = list(re.finditer(r'\[(\d+)\]', text))
# List to store indices of citations to remove
indices_to_remove = []
prev_num = None
prev_index = None
# Identify consecutive duplicate citations
for i in range(len(citation_matches)):
current_citation = citation_matches[i]
current_num = current_citation.group(1)
if prev_num == current_num:
# Mark the previous citation for removal
indices_to_remove.append(prev_index)
prev_num = current_num
prev_index = i
# Reconstruct the text with modifications
output_parts = []
last_end = 0
for i in range(len(citation_matches)):
m = citation_matches[i]
start, end = m.span()
if i in indices_to_remove:
# Remove citation
output_parts.append(text[last_end:start])
else:
# Keep and modify citation
output_parts.append(text[last_end:start])
page_num = m.group(1)
new_citation = '[Page ' + page_num + ']'
output_parts.append(new_citation)
last_end = end
# Append any remaining text after the last citation
output_parts.append(text[last_end:])
modified_text = ''.join(output_parts)
return modified_text
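# Usage sketch (illustrative strings only): consecutive duplicates collapse
# and the survivors become page references, e.g.
#   edit_text("The fuse box [2][2] is under the dash [5].")
#   -> "The fuse box [Page 2] is under the dash [Page 5]."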
# def parse_pdf(pdf_path):
# texts = []
# with pdfplumber.open(pdf_path) as pdf:
# for page_num, page in enumerate(pdf.pages, start=1):
# text = page.extract_text()
# if text:
# texts.append({
# 'text': text,
# 'metadata': {
# 'page_number': page_num
# }
# })
# return texts
# def preprocess_text(text):
# # ... (same as your original function)
# text = re.sub(r'\s+', ' ', text)
# text = text.strip()
# return text
def call_Llama_api(query, context):
    """Send the user question and the retrieved manual excerpts to the Groq
    chat API and return the model's answer."""
chat_completion = chat_client.chat.completions.create(
messages=[
{
"role": "system",
"content": "You are a car technician. Given the user's question and relevant excerpts from different car manuals, answer the question by including direct quotes from the correct car manual. Be concise and to the point in your response."
},
{
"role": "user",
"content": "User Question: " + query + "\n\nRelevant Excerpt(s):\n\n" + context,
}
],
temperature=0.6,
max_tokens=200,
top_p=1,
stream=False,
stop=None,
model=model
)
response = chat_completion.choices[0].message.content
return response
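# Usage sketch (hypothetical query/context strings; assumes GROQ_API_KEY is set):
#   call_Llama_api("How do I reset the service reminder?",
#                  "Page 112:: To reset the service reminder, press and hold ...")
#   returns a short answer quoting the relevant excerpt.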
# def chunk_texts(texts, max_tokens=500, overlap_tokens=50):
# """
# Splits texts into chunks based on paragraphs with overlap to preserve context.
# """
# global tokenizer
# chunks = []
# for item in texts:
# text = preprocess_text(item['text'])
# if not text:
# continue
# metadata = item['metadata']
# # Split text into paragraphs
# paragraphs = text.split('\n\n')
# current_chunk = ''
# current_tokens = 0
# for i, paragraph in enumerate(paragraphs):
# paragraph = paragraph.strip()
# if not paragraph:
# continue
# paragraph_tokens = len(tokenizer.encode(paragraph))
# if current_tokens + paragraph_tokens <= max_tokens:
# current_chunk += paragraph + '\n\n'
# current_tokens += paragraph_tokens
# else:
# # Save the current chunk
# chunk = {
# 'text': current_chunk.strip(),
# 'metadata': metadata
# }
# chunks.append(chunk)
# # Start a new chunk with overlap
# overlap_text = ' '.join(current_chunk.split()[-overlap_tokens:])
# current_chunk = overlap_text + ' ' + paragraph + '\n\n'
# current_tokens = len(tokenizer.encode(current_chunk))
# if current_chunk:
# chunk = {
# 'text': current_chunk.strip(),
# 'metadata': metadata
# }
# chunks.append(chunk)
# return chunks
def is_car_model_available(query, available_models):
    """Return the first car model whose name appears (case-insensitively) in
    the query, or None if no known model is mentioned."""
    for car_model in available_models:
        if car_model.lower() in query.lower():
            return car_model
return None
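# The match is a case-insensitive substring test, so (illustrative):
#   is_car_model_available("Bluetooth pairing in my tiago?", ['TIAGO', 'Astor'])  # -> 'TIAGO'
#   is_car_model_available("How do I change a tyre?", ['TIAGO', 'Astor'])         # -> None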
# def extract_car_model(pdf_filename):
# base_name = os.path.basename(pdf_filename)
# match = re.search(r'manual_(.+)\.pdf', base_name)
# if match:
# model_name = match.group(1).replace('_', ' ').title()
# return model_name
# else:
# return 'Unknown Model'
def colbert_rerank(query, chunks):
    """Rerank retrieved chunks against the query with the ColBERT ranker and
    return their texts, most relevant first."""
    ranked = ranker.rank(query=query, docs=chunks)
    reranked_chunks = [ranked[i].text for i in range(len(chunks))]
return reranked_chunks
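# Usage sketch (hypothetical chunks): the input order does not matter; the
# ranker returns the same texts sorted by relevance to the query:
#   colbert_rerank("coolant capacity",
#                  ["Page 4:: Tyre pressures ...", "Page 9:: Coolant capacity is ..."])
#   -> ["Page 9:: Coolant capacity is ...", "Page 4:: Tyre pressures ..."]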
def process_query(query):
    # Use the globals set up in initialize()
    global available_car_models, collection
    print("Input query:", query)
car_model = is_car_model_available(query, available_car_models)
    if not car_model:
        return "The manual for the specified car model is not available."
# Initial retrieval from ChromaDB
results = collection.query(
query_texts=[query],
n_results=50,
where={"car_model": car_model},
include=['documents', 'metadatas']
)
    # query() returns one list per query text, so an empty result looks like
    # [[]]; check the inner list as well
    if not results['documents'] or not results['documents'][0]:
        return "No relevant information found in the manual."
# Extract chunks and metadata
pre_chunks = results['documents'][0]
metadatas = results['metadatas'][0]
    chunks = [f'Page {meta["page_number"]}:: {doc}' for doc, meta in zip(pre_chunks, metadatas)]
reranked_chunks = colbert_rerank(query, chunks)
final_context = " ".join(reranked_chunks[:10])
answer = call_Llama_api(query, final_context)
    # Trim any incomplete trailing sentence left by the token limit
    last_complete = answer.rfind('.')
if last_complete != -1:
answer = answer[:last_complete + 1].strip()
answer = edit_text(answer)
# Prepare citations
# citations = [
# f"Page {meta.get('page_number', 'N/A')}" for meta in metadatas[:5]
# ]
# citations_text = "Pages cited from:\n" + "\n".join(citations)
# return f"{answer}\n\n{citations_text}"
return answer
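# End-to-end sketch (assumes initialize() has run): retrieval -> rerank ->
# LLM answer -> citation cleanup, e.g.
#   process_query("What is the recommended engine oil for the TIAGO?")
#   returns a concise answer with [Page N] citations, or a fallback message
#   if the query names no known car model.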
# Initialize global variables
def initialize():
global collection, available_car_models, ranker
# Check for CUDA availability
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")
# tokenizer = GPT2TokenizerFast.from_pretrained("gpt2") # For token counting
# Initialize embedding model
embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name="all-MiniLM-L12-v2", device=device
)
client = PersistentClient(path="./chromadb")
    # Get the pre-built collection
    collection_name = "car_manuals5"
    collection = client.get_collection(
        name=collection_name,
        embedding_function=embedding_function
    )
    # Set available car models
    available_car_models = ['TIAGO', 'Astor']
    # To (re)build the collection from the PDFs instead, use:
    # collection = client.get_or_create_collection(
    #     name=collection_name,
    #     embedding_function=embedding_function
    # )
# pdf_files = ['./car_manuals/manual_Tiago.pdf', './car_manuals/manual_Astor.pdf']
# available_car_models = []
# for pdf_file in pdf_files:
# print(f"Parsing {pdf_file}...")
# pdf_texts = parse_pdf(pdf_file)
# car_model = extract_car_model(pdf_file)
# available_car_models.append(car_model)
# # Add car model to metadata
# for item in pdf_texts:
# item['metadata']['car_model'] = car_model
# # Chunk texts using the refined strategy
# chunks = chunk_texts(pdf_texts, max_tokens=500, overlap_tokens=50)
# # Prepare data for ChromaDB
# documents = [chunk['text'] for chunk in chunks]
# metadatas = [chunk['metadata'] for chunk in chunks]
# ids = [f"{car_model}_{i}" for i in range(len(documents))]
# # Add to ChromaDB collection
# collection.add(
# documents=documents,
# metadatas=metadatas,
# ids=ids
# )
# Initialize the ranker
ranker = Reranker("answerdotai/answerai-colbert-small-v1", model_type='colbert')
# Call initialize function
initialize()
# Set up the Gradio interface
iface = gr.Interface(
fn=process_query,
inputs=gr.Textbox(lines=2, placeholder='Enter your question here...'),
outputs='text',
title='Car Manual Assistant',
description='Ask a question about Tata Tiago or MG Astor.',
)
if __name__ == "__main__":
# iface.launch(server_name="0.0.0.0", server_port=7860)
iface.launch()