from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import os
import datetime
import json
import traceback

from semantic_search import SemanticSearch
from transaction_maps_search import TransactionMapsSearch
from llm.vllm_api import LlmParams
# Base path for log files
LOGS_BASE_PATH = os.getenv("LOGS_BASE_PATH", "logs")

# Logging is opt-in via the ENABLE_LOGS environment variable
ENABLE_LOGS = os.getenv("ENABLE_LOGS", "0") == "1"

# Create the logs directory up front so that log writes cannot fail
if ENABLE_LOGS:
    os.makedirs(LOGS_BASE_PATH, exist_ok=True)
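
# Example environment setup (illustrative values, not part of any deployment):
#   export LOGS_BASE_PATH=/var/log/semantic-search
#   export ENABLE_LOGS=1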
class Query(BaseModel):
    query: str = ''
    top: int = 10
    use_qe: bool = False
    use_olympic: bool = False
    find_transaction_maps_by_question: bool = False
    find_transaction_maps_by_operation: bool = False
    request_id: str = ''
    # Document category filters; keys are the Russian labels used by the
    # search backend (RF legal codes, federal laws, Ministry of Finance and
    # Federal Tax Service letters, FTS orders, government decrees, court
    # documents, internal and accounting documents).
    categories: dict = {'НКРФ': False,
                        'ГКРФ': False,
                        'ТКРФ': False,
                        'Федеральный закон': False,
                        'Письмо Минфина': False,
                        'Письмо ФНС': False,
                        'Приказ ФНС': False,
                        'Постановление Правительства': False,
                        'Судебный документ': False,
                        'ВНД': False,
                        'Бухгалтерский документ': False}
    llm_params: Optional[LlmParams] = None
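
# Example request body for POST /search (a minimal sketch with illustrative
# values; category keys must match the Russian keys on the Query model above):
#
# {
#     "query": "VAT rate on exported goods",
#     "top": 5,
#     "use_qe": true,
#     "categories": {"НКРФ": true}
# }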
search = SemanticSearch()
transaction_maps_search = TransactionMapsSearch()

app = FastAPI(
    title="multistep-semantic-search-app",
    description="multistep-semantic-search-app",
    version="0.1.0",
)
def log_query_result(query, top, request_id, result):
    """Persist one query/result pair as a JSON file (no-op unless ENABLE_LOGS)."""
    if not ENABLE_LOGS:
        return
    # Note: second-resolution timestamps can collide under concurrent requests.
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    log_file_path = os.path.join(LOGS_BASE_PATH, f"{timestamp}.json")
    log_data = {
        "timestamp": timestamp,
        "query": query,
        "top": top,
        "request_id": request_id,
        "result": result,
    }
    with open(log_file_path, 'w', encoding='utf-8') as log_file:
        json.dump(log_data, log_file, indent=2, ensure_ascii=False)
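
# A log file written by log_query_result looks roughly like this (values
# illustrative); /read_logs and /analyze_logs below consume this shape:
#
# {
#   "timestamp": "20240101120000",
#   "query": "VAT rate on exported goods",
#   "top": 10,
#   "request_id": "abc-123",
#   "result": {"query": "...", "results": [...]}
# }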
@app.post('/search')
async def search_route(query: Query) -> dict:
    try:
        # Pydantic guarantees every field exists, so direct attribute access
        # replaces the earlier getattr calls (whose fallbacks were dead code).
        question = query.query
        if not question:
            raise ValueError("Query parameter 'query' is required and cannot be empty.")
        top = query.top
        use_qe = query.use_qe
        request_id = query.request_id
        categories = query.categories
        use_olympic = query.use_olympic
        find_transaction_maps_by_question = query.find_transaction_maps_by_question
        find_transaction_maps_by_operation = query.find_transaction_maps_by_operation
        llm_params = query.llm_params

        if find_transaction_maps_by_question or find_transaction_maps_by_operation:
            # The accompanying answer is not used by this route.
            transaction_maps_results, _ = transaction_maps_search.search_transaction_map(
                query=question,
                find_transaction_maps_by_question=find_transaction_maps_by_question,
                k_neighbours=top)
            response = {'transaction_maps_results': transaction_maps_results}
        else:
            (modified_query, titles, concat_docs,
             relevant_consultations, predicted_explanation,
             llm_responses) = await search.search(
                question, use_qe, use_olympic, categories, llm_params)
            results = [{'title': str(title), 'text_for_llm': str(doc)}
                       for title, doc in zip(titles, concat_docs)]
            consultations = [{'title': key, 'text': value}
                             for key, value in relevant_consultations.items()]
            explanations = [{'title': key, 'text': value}
                            for key, value in predicted_explanation.items()]
            response = {'query': modified_query, 'results': results,
                        'consultations': consultations, 'explanations': explanations,
                        'llm_responses': llm_responses}
        log_query_result(question, top, request_id, response)
        return response
    except ValueError as ve:
        traceback.print_exception(type(ve), ve, ve.__traceback__)
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        traceback.print_exception(type(e), e, e.__traceback__)
        raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
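
# Example call (illustrative; adjust host and port to your deployment):
#
#   curl -X POST http://localhost:8000/search \
#        -H "Content-Type: application/json" \
#        -d '{"query": "VAT rate on exported goods", "top": 5}'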
@app.get('/health')
def health():
    return {"status": "ok"}
@app.get('/read_logs')
def read_logs():
    """Return the contents of every JSON log file as a list."""
    logs = []
    # Guard against the directory not existing when logging is disabled.
    if not os.path.isdir(LOGS_BASE_PATH):
        return logs
    for log_file in os.listdir(LOGS_BASE_PATH):
        if log_file.endswith(".json"):
            with open(os.path.join(LOGS_BASE_PATH, log_file), 'r', encoding='utf-8') as file:
                logs.append(json.load(file))
    return logs
@app.get('/analyze_logs')
def analyze_logs():
    """Return logs whose results disagree for the same (query, top) pair."""
    logs_by_query_top = {}
    if not os.path.isdir(LOGS_BASE_PATH):
        return []
    for log_file in os.listdir(LOGS_BASE_PATH):
        if log_file.endswith(".json"):
            with open(os.path.join(LOGS_BASE_PATH, log_file), 'r', encoding='utf-8') as file:
                log_data = json.load(file)
            query = log_data.get("query", "")
            top = log_data.get("top", "")
            # Group logs by query and top
            key = f"{query}_{top}"
            logs_by_query_top.setdefault(key, []).append(log_data)
    # Flag every group whose serialized results are not all identical
    invalid_logs = []
    for logs in logs_by_query_top.values():
        # sort_keys makes the comparison insensitive to dict key order
        if len(set(json.dumps(log['result'], sort_keys=True) for log in logs)) > 1:
            invalid_logs.extend(logs)
    return invalid_logs
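
# Minimal local-run sketch. Assumes this module is importable as `app` and
# that uvicorn is installed; both are assumptions, not part of the original
# deployment configuration.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)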