|
import logging
|
|
from langchain_core.documents import Document
|
|
from typing import List
|
|
from langchain_core.output_parsers import BaseOutputParser
|
|
from qdrant_client import QdrantClient, models
|
|
import requests
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_docs(docs):
    """Concatenate the text of several documents into one string.

    Args:
        docs: Iterable of objects exposing a ``page_content`` string.

    Returns:
        The ``page_content`` values in input order, separated by a blank
        line. An empty input yields an empty string.
    """
    separator = "\n\n"
    contents = [document.page_content for document in docs]
    return separator.join(contents)
|
|
|
|
|
|
class LineListOutputParser(BaseOutputParser[List[str]]):
    """Output parser that turns raw text into a list of non-blank lines."""

    def parse(self, text: str) -> List[str]:
        """Split *text* on newlines and return its non-blank lines.

        Every line is stripped of surrounding whitespace before the blank
        check, so whitespace-only lines are dropped and a trailing ``\\r``
        left over from CRLF line endings does not leak into the items.

        Args:
            text: Raw text, one item per line.

        Returns:
            The stripped, non-empty lines in their original order.
        """
        stripped_lines = (line.strip() for line in text.split("\n"))
        return [line for line in stripped_lines if line]
|
|
|
|
def extract_metadata(docs, headers=('Header_1', 'Header_2', 'Header_3')):
    """Collect the header values stored in each document's metadata.

    Args:
        docs: Iterable of objects exposing a ``metadata`` mapping.
        headers: Metadata keys to read, in the order they should appear in
            each per-document list.

    Returns:
        One list per document (same order as ``docs``) holding the values of
        the requested keys. Keys that are missing or whose value is falsy
        are skipped, so a document's list may be shorter than ``headers``.
    """
    return [
        [document.metadata[name] for name in headers if document.metadata.get(name)]
        for document in docs
    ]
|
|
|
|
def search_with_filter(query, vector_store, k, headers):
    """Run a similarity search restricted to documents under given headers.

    The i-th entry of ``headers`` is matched against the payload field
    ``metadata.Header_<i+1>``; all conditions are combined in the filter's
    ``must`` list. Only header lists of length 1, 2 or 3 produce
    conditions. Any other length (including an empty list) results in an
    empty ``must`` list being passed to the search.

    Args:
        query: Query passed through to ``vector_store.similarity_search``.
        vector_store: Object exposing ``similarity_search(query=, k=, filter=)``.
        k: Number of results requested from the vector store.
        headers: Sequence of header values, outermost header first.

    Returns:
        Whatever ``vector_store.similarity_search`` returns.
    """
    header_keys = ("metadata.Header_1", "metadata.Header_2", "metadata.Header_3")

    depth = len(headers)
    if 1 <= depth <= 3:
        conditions = [
            models.FieldCondition(
                key=header_keys[level],
                match=models.MatchValue(value=headers[level]),
            )
            for level in range(depth)
        ]
    else:
        # Unsupported header count: fall through with no conditions.
        conditions = []

    return vector_store.similarity_search(
        query=query,
        k=k,
        filter=models.Filter(must=conditions),
    )
|
|
|
|
def get_relevant_documents(documents: List[Document], limit: int) -> List[Document]:
    """Return the first ``limit`` documents with distinct ``page_content``.

    Documents are scanned in order; a document is kept only if no earlier
    kept document has the same ``page_content``. Scanning stops as soon as
    ``limit`` documents have been collected.

    Args:
        documents: Candidate documents, most relevant first.
        limit: Maximum number of documents to return. A value of zero or
            less yields an empty list. ``None`` is tolerated and means
            "no limit".

    Returns:
        The de-duplicated documents in their original order, at most
        ``limit`` of them.
    """
    # The in-loop check only fires after an append, so without this guard a
    # non-positive limit would never be hit and every document would be kept.
    if limit is not None and limit <= 0:
        return []

    unique_docs = []
    seen_contents = set()
    for doc in documents:
        content = doc.page_content
        if content in seen_contents:
            continue
        seen_contents.add(content)
        unique_docs.append(doc)
        if len(unique_docs) == limit:
            break
    return unique_docs
|
|
|
|
|
|
|
|
def translate(text: str, *, source_lang: str = "vi", target_lang: str = "en") -> str:
    """Translate *text* using the Google Translate ``translate_a/single`` endpoint.

    This is a best-effort helper: it never raises. On any failure the
    original text is returned with a ``[Translation failed: ...]`` marker
    appended, and the error is logged.

    Args:
        text: Text to translate. Empty or whitespace-only text is returned
            unchanged without contacting the service.
        source_lang: Language code sent as ``sl`` (defaults to Vietnamese).
        target_lang: Language code sent as ``tl`` (defaults to English).

    Returns:
        The translated text, or the original text followed by a failure
        marker if the request or response parsing failed.
    """
    # Nothing to translate: skip the network round-trip entirely.
    if not text or not text.strip():
        return text

    url = "https://translate.googleapis.com/translate_a/single"
    params = {
        "client": "gtx",
        "sl": source_lang,
        "tl": target_lang,
        "dt": "t",
        "q": text,
    }

    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        payload = response.json()
        # payload[0] is read as a list of segments whose first element is the
        # translated text; segments without text are skipped instead of
        # failing the whole translation.
        return "".join(
            segment[0] for segment in payload[0] if segment and segment[0]
        )
    except requests.exceptions.RequestException as e:
        error_msg = f"Translation API error: {str(e)}"
        logging.error(error_msg)
        return f"{text}\n\n[Translation failed: {error_msg}]"
    except Exception as e:
        # Deliberate catch-all: an unexpected response shape must not crash
        # the caller; it gets the untranslated text plus a marker instead.
        error_msg = f"Unexpected error during translation: {str(e)}"
        logging.exception(error_msg)
        return f"{text}\n\n[Translation failed: {error_msg}]"