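"""Pinecone-backed implementation of the ConnectorStrategy interface.

Creates the index on first use, ingests pre-chunked documents, lists the
documents stored in a namespace, and retrieves chunks by similarity search.
"""
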
import os
import time
import unicodedata

from dotenv import load_dotenv

from pinecone import Pinecone, ServerlessSpec
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_core.documents import Document

from .ConnectorStrategy import ConnectorStrategy


class PineconeConnector(ConnectorStrategy):
    def __init__(self):
        # Load Pinecone settings from the environment (or a .env file).
        load_dotenv()

        pinecone_api_key = os.environ.get("PINECONE_API_KEY")
        self.index_name = os.environ.get("PINECONE_INDEX_NAME")
        self.namespace = os.environ.get("PINECONE_NAMESPACE")

        pc = Pinecone(api_key=pinecone_api_key)

        # Create the serverless index if it does not exist yet. The dimension
        # (3072) must match the embedding model used, e.g. OpenAI's
        # text-embedding-3-large.
        existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]
        if self.index_name not in existing_indexes:
            pc.create_index(
                name=self.index_name,
                dimension=3072,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            # Wait until the new index is ready to accept requests.
            while not pc.describe_index(self.index_name).status["ready"]:
                time.sleep(1)

        self.index = pc.Index(self.index_name)

    
    def getDocs(self):
        # List the distinct document names stored in the namespace. Vector IDs
        # have the form "<document_name>_<chunk_index>" (see addDoc), so the
        # document name is everything before the last underscore.
        docs_names = []
        for ids in self.index.list(namespace=self.namespace):
            for vector_id in ids:
                doc_name = "_".join(vector_id.split("_")[:-1])
                if doc_name not in docs_names:
                    docs_names.append(doc_name)

        return docs_names
    
    
    def addDoc(self, filename, text_chunks, embedding):
        # Embed the given text chunks and upsert them into the index. Each chunk
        # gets a deterministic ID of the form "<clean_filename>_<chunk_index>".
        try:
            vector_store = PineconeVectorStore(
                index=self.index, embedding=embedding, namespace=self.namespace
            )

            # Build an underscore-separated identifier from the filename: keep the
            # part before the first dot (drops the extension) and replace path and
            # spacing characters with underscores.
            file_name = (
                filename.split(".")[0]
                .replace(" ", "_")
                .replace("-", "_")
                .replace("/", "_")
                .replace("\\", "_")
                .strip()
            )
            # Strip non-ASCII characters once, outside the loop, so the ID is also
            # available for the return value when text_chunks is empty.
            clean_filename = remove_non_standard_ascii(file_name)

            documents = []
            uuids = []
            for i, chunk in enumerate(text_chunks):
                uuid = f"{clean_filename}_{i}"
                documents.append(
                    Document(
                        page_content=chunk,
                        metadata={"filename": filename, "chunk_id": uuid},
                    )
                )
                uuids.append(uuid)

            vector_store.add_documents(documents=documents, ids=uuids)

            return {"filename_id": clean_filename}

        except Exception as e:
            print(e)
            return False
    
    def retriever(self, query, embedding):
        # Return the chunks most similar to the query, keeping at most 3 results
        # with a similarity score of at least 0.6.
        vector_store = PineconeVectorStore(
            index=self.index, embedding=embedding, namespace=self.namespace
        )

        retriever = vector_store.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"k": 3, "score_threshold": 0.6},
        )

        return retriever.invoke(query)
    

def remove_non_standard_ascii(input_string: str) -> str:
    """Normalize the string and keep only ASCII letters, digits and ' .,!?'."""
    normalized_string = unicodedata.normalize("NFKD", input_string)
    return "".join(
        char
        for char in normalized_string
        if "a" <= char <= "z" or "A" <= char <= "Z" or char.isdigit() or char in " .,!?"
    )
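

# --- Example usage (illustrative sketch, not part of the connector itself) ---
# Assumes PINECONE_API_KEY, PINECONE_INDEX_NAME and PINECONE_NAMESPACE are set in
# the environment (or a .env file), and that an OpenAI API key is available for
# OpenAIEmbeddings. "text-embedding-3-large" is assumed here because it matches
# the 3072-dimensional index created above; swap in whatever model the app uses.
# Because of the relative import at the top, run this module with
# `python -m <package>.PineconeConnector` rather than as a standalone script.
if __name__ == "__main__":
    embedding = OpenAIEmbeddings(model="text-embedding-3-large")
    connector = PineconeConnector()

    # Index a document that has already been split into chunks elsewhere.
    result = connector.addDoc(
        "example notes.txt",
        ["First chunk of text.", "Second chunk of text."],
        embedding,
    )
    print(result)  # e.g. {"filename_id": "example_notes"}

    # List the document names currently stored in the namespace.
    print(connector.getDocs())

    # Retrieve the chunks most similar to a query.
    for doc in connector.retriever("What does the first chunk say?", embedding):
        print(doc.metadata["chunk_id"], doc.page_content)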