File size: 4,716 Bytes
74b1bac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import logging
from langchain_core.documents import Document
from typing import List
from langchain_core.output_parsers import BaseOutputParser
from qdrant_client import QdrantClient, models
import requests
# from langchain_cohere import CohereRerank




# def format_docs(docs: List[Document]) -> str:
#     """Convert Documents to a single string."""
#     formatted = [
#         f"Article Content: {doc.metadata['Header 1']}"
#         + (f" - {doc.metadata['Header 2']}" if 'Header 2' in doc.metadata and doc.metadata['Header 2'] else "")
#         + (f" - {doc.metadata['Header 3']}" if 'Header 3' in doc.metadata and doc.metadata['Header 3'] else "")
#         + f"\n{doc.page_content}"
#         for doc in docs
#     ]
#     return "\n" + "\n".join(formatted)


def format_docs(docs):
    """Join the page contents of *docs* into one blank-line-separated string."""
    contents = [doc.page_content for doc in docs]
    return "\n\n".join(contents)


class LineListOutputParser(BaseOutputParser[List[str]]):
    """Parse raw LLM output into a list of non-empty lines."""

    def parse(self, text: str) -> List[str]:
        """Split *text* on newlines, dropping blank entries."""
        return [line for line in text.strip().split("\n") if line]

def extract_metadata(docs, headers=('Header_1', 'Header_2', 'Header_3')):
    """Collect the truthy metadata values for *headers* from each document.

    Returns one list per document, preserving the order of *headers* and
    skipping headers that are missing or falsy in that document's metadata.
    """
    return [
        [doc.metadata[header] for header in headers if doc.metadata.get(header)]
        for doc in docs
    ]

def search_with_filter(query, vector_store, k, headers):
    """Run a filtered similarity search against the Qdrant vector store.

    Args:
        query: The search query string.
        vector_store: A langchain vector store backed by Qdrant, exposing
            ``similarity_search``.
        k: Maximum number of results to return.
        headers: Sequence of header values; element i filters on the payload
            key ``metadata.Header_{i+1}`` (up to three levels).

    Returns:
        The documents returned by ``vector_store.similarity_search`` that
        match every header condition.
    """
    # Payload keys for the hierarchical headers, in nesting order.
    header_keys = ("metadata.Header_1", "metadata.Header_2", "metadata.Header_3")

    # zip pairs each supplied header with its payload key; it naturally
    # handles 0-3 headers (extra headers beyond three are ignored, since
    # there is no deeper payload key to match them against).
    conditions = [
        models.FieldCondition(key=key, match=models.MatchValue(value=value))
        for key, value in zip(header_keys, headers)
    ]

    return vector_store.similarity_search(
        query=query,
        k=k,
        filter=models.Filter(must=conditions),
    )

def get_relevant_documents(documents: "List[Document]", limit: int) -> "List[Document]":
    """Return up to *limit* documents, de-duplicated by ``page_content``.

    Keeps the first occurrence of each distinct page content, preserving
    input order. A non-positive *limit* yields an empty list (previously a
    limit of 0 or less returned every unique document, because the break
    condition used ``==`` and was only checked after an append).

    Args:
        documents: Candidate documents, possibly containing duplicates.
        limit: Maximum number of documents to return.

    Returns:
        At most *limit* documents with unique page contents.
    """
    result: "List[Document]" = []
    seen = set()
    for doc in documents:
        if len(result) >= limit:  # >= so limit<=0 short-circuits immediately
            break
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            result.append(doc)
    return result



def translate(text: str, source_lang: str = "vi", target_lang: str = "en") -> str:
    """Translate *text* via the unofficial Google Translate web endpoint.

    Args:
        text: The text to translate.
        source_lang: Source language code (defaults to Vietnamese, matching
            the original hard-coded behavior).
        target_lang: Target language code (defaults to English).

    Returns:
        The translated text on success; on any failure, the original text
        with an appended ``[Translation failed: ...]`` note so the caller
        still receives usable content.
    """
    url = "https://translate.googleapis.com/translate_a/single"
    params = {
        "client": "gtx",
        "sl": source_lang,
        "tl": target_lang,
        "dt": "t",
        "q": text,
    }

    try:
        # Timeout keeps a hung endpoint from blocking the caller indefinitely.
        r = requests.get(url, params=params, timeout=10)
        r.raise_for_status()
        result = r.json()
        # Response shape: result[0] is a list of [translated, original, ...]
        # chunks; concatenating the first element of each rebuilds the text.
        return "".join(sentence[0] for sentence in result[0])
    except requests.exceptions.RequestException as e:
        error_msg = f"Translation API error: {str(e)}"
        logging.error(error_msg)
        return f"{text}\n\n[Translation failed: {error_msg}]"
    except Exception as e:
        # Covers JSON decoding errors and unexpected response shapes.
        error_msg = f"Unexpected error during translation: {str(e)}"
        logging.exception(error_msg)  # Log traceback for unexpected errors
        return f"{text}\n\n[Translation failed: {error_msg}]"