import os
import time
import unicodedata

from dotenv import load_dotenv
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec

from .ConnectorStrategy import ConnectorStrategy

class PineconeConnector(ConnectorStrategy):
    def __init__(self):
        # Load Pinecone credentials and the target index/namespace from the environment.
        load_dotenv()
        pinecone_api_key = os.environ.get("PINECONE_API_KEY")
        self.index_name = os.environ.get("PINECONE_INDEX_NAME")
        self.namespace = os.environ.get("PINECONE_NAMESPACE")

        pc = Pinecone(api_key=pinecone_api_key)

        # Create the serverless index on first use and wait until it is ready.
        existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]
        if self.index_name not in existing_indexes:
            pc.create_index(
                name=self.index_name,
                dimension=3072,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            while not pc.describe_index(self.index_name).status["ready"]:
                time.sleep(1)

        self.index = pc.Index(self.index_name)
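
    # Example .env entries consumed by __init__ above. The variable names come
    # from this file; the values are illustrative placeholders, not taken from
    # the original repository:
    #   PINECONE_API_KEY=your-pinecone-api-key
    #   PINECONE_INDEX_NAME=my-index
    #   PINECONE_NAMESPACE=default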
    def getDocs(self):
        # List every vector id in the namespace and recover the distinct
        # document names (ids are formatted as "<name>_<chunk_index>").
        docs_names = []
        for ids in self.index.list(namespace=self.namespace):
            for vector_id in ids:
                name_doc = "_".join(vector_id.split("_")[:-1])
                if name_doc not in docs_names:
                    docs_names.append(name_doc)
        return docs_names
    def addDoc(self, filename, text_chunks, embedding):
        try:
            vector_store = PineconeVectorStore(
                index=self.index, embedding=embedding, namespace=self.namespace
            )
            # Normalise the filename into a safe id prefix: drop the extension,
            # replace separators with underscores, then strip non-ASCII characters.
            file_name = (
                filename.split(".")[0]
                .replace(" ", "_")
                .replace("-", "_")
                .replace("/", "_")
                .replace("\\", "_")
                .strip()
            )
            clean_filename = remove_non_standard_ascii(file_name)

            documents = []
            uuids = []
            for i, chunk in enumerate(text_chunks):
                uuid = f"{clean_filename}_{i}"
                document = Document(
                    page_content=chunk,
                    metadata={"filename": filename, "chunk_id": uuid},
                )
                uuids.append(uuid)
                documents.append(document)

            vector_store.add_documents(documents=documents, ids=uuids)
            return {"filename_id": clean_filename}
        except Exception as e:
            print(e)
            return False
    def retriever(self, query, embedding):
        vector_store = PineconeVectorStore(
            index=self.index, embedding=embedding, namespace=self.namespace
        )
        # Return up to 5 chunks whose relevance score meets the 0.6 threshold.
        retriever = vector_store.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"k": 5, "score_threshold": 0.6},
        )
        return retriever.invoke(query)
def remove_non_standard_ascii(input_string: str) -> str:
    # Keep letters, digits, underscores, and basic punctuation; drop accents and
    # other non-ASCII characters after Unicode normalisation. Underscores are
    # preserved so chunk ids keep the "<name>_<index>" shape used by getDocs.
    normalized_string = unicodedata.normalize("NFKD", input_string)
    return "".join(
        char
        for char in normalized_string
        if "a" <= char <= "z"
        or "A" <= char <= "Z"
        or char.isdigit()
        or char in "_ .,!?"
    )
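

# Minimal usage sketch, not part of the original connector. It assumes an
# OPENAI_API_KEY is available and that "text-embedding-3-large" (3072-dim,
# matching the index created above) is the embedding model in use; run it from
# within the package so the relative ConnectorStrategy import resolves.
if __name__ == "__main__":
    embedding = OpenAIEmbeddings(model="text-embedding-3-large")
    connector = PineconeConnector()

    # Index two illustrative chunks under a hypothetical filename.
    connector.addDoc("example.txt", ["First chunk.", "Second chunk."], embedding)
    print(connector.getDocs())

    # Retrieve the chunks most relevant to a query.
    for doc in connector.retriever("What does the first chunk say?", embedding):
        print(doc.page_content)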