Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, HTTPException | |
from semantic_search import SemanticSearch | |
from transaction_maps_search import TransactionMapsSearch | |
from pydantic import BaseModel | |
import os | |
import datetime | |
import json | |
import traceback | |
from llm.vllm_api import LlmParams | |
# Set the path for log files | |
LOGS_BASE_PATH = os.getenv("LOGS_BASE_PATH", "logs") | |
# Create logs directory if it doesn't exist | |
# if not os.path.exists(LOGS_BASE_PATH): | |
# os.makedirs(LOGS_BASE_PATH) | |
# Check if logs are enabled | |
ENABLE_LOGS = os.getenv("ENABLE_LOGS", "0") == "1" | |
class Query(BaseModel): | |
query: str = '' | |
top: int = 10 | |
use_qe: bool = False | |
use_olympic: bool = False | |
find_transaction_maps_by_question: bool = False | |
find_transaction_maps_by_operation: bool = False | |
request_id: str = '' | |
categories: dict = {'НКРФ': False, | |
'ГКРФ': False, | |
'ТКРФ': False, | |
'Федеральный закон': False, | |
'Письмо Минфина': False, | |
'Письмо ФНС': False, | |
'Приказ ФНС': False, | |
'Постановление Правительства': False, | |
'Судебный документ': False, | |
'ВНД': False, | |
'Бухгалтерский документ': False} | |
llm_params: LlmParams = None | |
# search = SemanticSearch() | |
transaction_maps_search = TransactionMapsSearch() | |
app = FastAPI( | |
title="multistep-semantic-search-app", | |
description="multistep-semantic-search-app", | |
version="0.1.0", | |
) | |
def log_query_result(query, top, request_id, result): | |
if not ENABLE_LOGS: | |
return | |
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") | |
log_file_path = os.path.join(LOGS_BASE_PATH, f"{timestamp}.json") | |
log_data = { | |
"timestamp": timestamp, | |
"query": query, | |
"top": top, | |
"request_id": request_id, | |
"result": result | |
} | |
with open(log_file_path, 'w', encoding='utf-8') as log_file: | |
json.dump(log_data, log_file, indent=2, ensure_ascii=False) | |
async def search_route(query: Query) -> dict: | |
try: | |
question = getattr(query, "query", None) | |
if not question: | |
raise ValueError("Query parameter 'query' is required and cannot be empty.") | |
top = getattr(query, "top", 15) | |
use_qe = getattr(query, "use_qe", False) | |
request_id = getattr(query, "request_id", None) | |
categories = getattr(query, "categories", None) | |
use_olympic = getattr(query, "use_olympic", False) | |
find_transaction_maps_by_question = getattr(query, "find_transaction_maps_by_question", False) | |
find_transaction_maps_by_operation = getattr(query, "find_transaction_maps_by_operation", False) | |
llm_params = getattr(query, "llm_params", None) | |
if find_transaction_maps_by_question or find_transaction_maps_by_operation: | |
transaction_maps_results, answer = transaction_maps_search.search_transaction_map( | |
query=question, | |
find_transaction_maps_by_question=find_transaction_maps_by_question, | |
k_neighbours=top) | |
response = {'transaction_maps_results': transaction_maps_results} | |
else: | |
modified_query, titles, concat_docs, \ | |
relevant_consultations, predicted_explanation, \ | |
llm_responses = await search.search(question, use_qe, use_olympic, categories, query.llm_params) | |
results = [{'title': str(item1), 'text_for_llm': str(item2)} for item1, item2 in | |
zip(titles, concat_docs)] | |
consultations = [{'title': key, 'text': value} for key, value in relevant_consultations.items()] | |
explanations = [{'title': key, 'text': value} for key, value in predicted_explanation.items()] | |
response = {'query': modified_query, 'results': results, | |
'consultations': consultations, 'explanations': explanations, 'llm_responses': llm_responses} | |
log_query_result(question, top, request_id, response) | |
return response | |
except ValueError as ve: | |
traceback.print_exception(type(ve), ve, ve.__traceback__) | |
raise HTTPException(status_code=400, detail=str(ve)) | |
except Exception as e: | |
traceback.print_exception(type(e), e, e.__traceback__) | |
raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") | |
def health(): | |
return {"status": "ok"} | |
def read_logs(): | |
logs = [] | |
for log_file in os.listdir(LOGS_BASE_PATH): | |
if log_file.endswith(".json"): | |
with open(os.path.join(LOGS_BASE_PATH, log_file), 'r', encoding='utf-8') as file: | |
log_data = json.load(file) | |
logs.append(log_data) | |
return logs | |
def analyze_logs(): | |
logs_by_query_top = {} | |
for log_file in os.listdir(LOGS_BASE_PATH): | |
if log_file.endswith(".json"): | |
with open(os.path.join(LOGS_BASE_PATH, log_file), 'r', encoding='utf-8') as file: | |
log_data = json.load(file) | |
query = log_data.get("query", "") | |
top = log_data.get("top", "") | |
request_id = log_data.get("request_id", "") | |
# Group logs by query and top | |
key = f"{query}_{top}" | |
if key not in logs_by_query_top: | |
logs_by_query_top[key] = [] | |
logs_by_query_top[key].append(log_data) | |
# Analyze logs and filter out logs with different results for the same query and top | |
invalid_logs = [] | |
for key, logs in logs_by_query_top.items(): | |
if len(set(json.dumps(log['result']) for log in logs)) > 1: | |
invalid_logs.extend(logs) | |
return invalid_logs | |