# NOTE(review): removed non-Python page residue (Hugging Face Spaces header,
# runtime-error banners, commit hashes and a line-number gutter) that was
# pasted above the module and made the file unparseable.
import datetime
import json
import os
import traceback
from typing import Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from llm.common import LlmParams, LlmPredictParams
from llm.deepinfra_api import DeepInfraApi
from semantic_search import SemanticSearch
from transaction_maps_search import TransactionMapsSearch
# Toggle request/result logging via the ENABLE_LOGS env var ("1" enables it).
ENABLE_LOGS = os.getenv("ENABLE_LOGS", "0") == "1"
# Directory where per-request JSON log files are written.
LOGS_BASE_PATH = os.getenv("LOGS_BASE_PATH", "logs")
# Create the log directory up front. exist_ok=True closes the race between
# the old os.path.exists() check and makedirs() when several workers start
# simultaneously (the check-then-create pattern could raise FileExistsError).
if ENABLE_LOGS:
    os.makedirs(LOGS_BASE_PATH, exist_ok=True)
# LLM backend configuration (DeepInfra-compatible endpoint and credentials).
LLM_API_URL = os.getenv("LLM_API_URL", "")
LLM_API_KEY = os.getenv("LLM_API_KEY", "")
# When "1", requests are routed through the DeepInfra API client.
LLM_USE_DEEPINFRA = os.getenv("LLM_USE_DEEPINFRA", "") == "1"
class Query(BaseModel):
    """Request body for the /search endpoint."""

    # Free-text search query; must be non-empty (validated in the route).
    query: str = ''
    # Number of results / neighbours to return.
    top: int = 10
    # Enable query expansion.
    use_qe: bool = False
    use_olympic: bool = False
    # Transaction-map search modes; either flag switches the route to the
    # transaction-map branch.
    find_transaction_maps_by_question: bool = False
    find_transaction_maps_by_operation: bool = False
    # Caller-supplied correlation id, echoed into the log files.
    request_id: str = ''
    # Per-category document filters. Pydantic deep-copies field defaults per
    # instance, so this mutable default is safe here (unlike a plain function
    # default). Keys are Russian document-source names used by the search core.
    categories: dict = {'НКРФ': False,
                        'ГКРФ': False,
                        'ТКРФ': False,
                        'Федеральный закон': False,
                        'Письмо Минфина': False,
                        'Письмо ФНС': False,
                        'Приказ ФНС': False,
                        'Постановление Правительства': False,
                        'Судебный документ': False,
                        'ВНД': False,
                        'Бухгалтерский документ': False}
    # Optional override for LLM parameters; None means "use server defaults".
    # Fix: the annotation was `LlmParams = None`, which is type-incorrect —
    # a None default requires an explicit Optional annotation.
    llm_params: Optional[LlmParams] = None
# Module-level singletons: both search engines are constructed once at import
# time and shared by every request handler below. NOTE(review): construction
# presumably loads models/indices — confirm startup cost is acceptable here.
search = SemanticSearch()
transaction_maps_search = TransactionMapsSearch()
# FastAPI application instance; routes are registered via decorators below.
app = FastAPI(
    title="multistep-semantic-search-app",
    description="multistep-semantic-search-app",
    version="0.1.0",
)
def log_query_result(query, top, request_id, result):
    """Persist one request/response pair as a standalone JSON file.

    No-op when logging is disabled via ENABLE_LOGS. Each call writes a
    separate file under LOGS_BASE_PATH named after the current time.

    Args:
        query: The user's search query string.
        top: Number of requested results.
        request_id: Caller-supplied correlation id (may be empty).
        result: JSON-serializable response payload.
    """
    if not ENABLE_LOGS:
        return
    # Fix: include microseconds (%f) in the filename. The original
    # second-resolution name caused two requests arriving within the same
    # second to overwrite each other's log file.
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
    log_file_path = os.path.join(LOGS_BASE_PATH, f"{timestamp}.json")
    log_data = {
        "timestamp": timestamp,
        "query": query,
        "top": top,
        "request_id": request_id,
        "result": result
    }
    # ensure_ascii=False keeps the (largely Cyrillic) content human-readable.
    with open(log_file_path, 'w', encoding='utf-8') as log_file:
        json.dump(log_data, log_file, indent=2, ensure_ascii=False)
@app.post('/search')
async def search_route(query: Query) -> dict:
    """Dispatch a search request to transaction-map or multistep search.

    Args:
        query: Validated request body (see the Query model).

    Returns:
        For transaction-map modes: {'transaction_maps_results': ...}.
        Otherwise: the rewritten query, ranked results, consultations,
        explanations and raw LLM responses.

    Raises:
        HTTPException: 400 when 'query' is empty, 500 on unexpected errors.
    """
    # Server-side LLM defaults. NOTE(review): client-supplied llm_params are
    # currently ignored on purpose (the override was commented out upstream);
    # switch to `query.llm_params or default_llm_params` to honour them.
    default_llm_params = LlmParams(
        url=LLM_API_URL,
        api_key=LLM_API_KEY,
        model="meta-llama/Llama-3.3-70B-Instruct",
        predict_params=LlmPredictParams(
            temperature=0.15, top_p=0.95, min_p=0.05, seed=42,
            repetition_penalty=1.2, presence_penalty=1.1, max_tokens=6000,
        ),
    )
    try:
        # Pydantic guarantees every field exists, so the defensive getattr()
        # calls of the original were dead code; plain attribute access is used.
        question = query.query
        if not question:
            raise ValueError("Query parameter 'query' is required and cannot be empty.")
        top = query.top
        use_qe = query.use_qe
        request_id = query.request_id
        categories = query.categories
        use_olympic = query.use_olympic
        find_transaction_maps_by_question = query.find_transaction_maps_by_question
        find_transaction_maps_by_operation = query.find_transaction_maps_by_operation
        llm_params = default_llm_params
        # Bug fix: llm_api was only assigned inside `if LLM_USE_DEEPINFRA:`,
        # so the transaction-map branch raised UnboundLocalError (surfacing as
        # a 500) whenever the flag was off. Initialize it explicitly.
        llm_api = DeepInfraApi(llm_params) if LLM_USE_DEEPINFRA else None
        if find_transaction_maps_by_question or find_transaction_maps_by_operation:
            transaction_maps_results, answer = await transaction_maps_search.search_transaction_map(
                query=question,
                find_transaction_maps_by_question=find_transaction_maps_by_question,
                k_neighbours=top,
                llm_api=llm_api)
            response = {'transaction_maps_results': transaction_maps_results}
        else:
            modified_query, titles, concat_docs, \
                relevant_consultations, predicted_explanation, \
                llm_responses = await search.search(question, use_qe, use_olympic, categories, llm_params)
            results = [{'title': str(title), 'text_for_llm': str(doc)}
                       for title, doc in zip(titles, concat_docs)]
            consultations = [{'title': k, 'text': v} for k, v in relevant_consultations.items()]
            explanations = [{'title': k, 'text': v} for k, v in predicted_explanation.items()]
            response = {'query': modified_query, 'results': results,
                        'consultations': consultations, 'explanations': explanations,
                        'llm_responses': llm_responses}
        log_query_result(question, top, request_id, response)
        return response
    except ValueError as ve:
        traceback.print_exception(type(ve), ve, ve.__traceback__)
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        traceback.print_exception(type(e), e, e.__traceback__)
        raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
@app.get('/health')
def health():
    """Liveness probe: always reports a healthy service."""
    return {"status": "ok"}
@app.get('/read_logs')
def read_logs():
    """Return every stored JSON log entry as a list.

    Robustness fix: when logging is disabled the log directory may not exist;
    return an empty list instead of letting os.listdir raise
    FileNotFoundError (which surfaced as an unhandled 500).
    """
    if not os.path.isdir(LOGS_BASE_PATH):
        return []
    logs = []
    # sorted() yields a deterministic — and, since filenames are timestamps,
    # chronological — order regardless of filesystem listing order.
    for log_file in sorted(os.listdir(LOGS_BASE_PATH)):
        if log_file.endswith(".json"):
            with open(os.path.join(LOGS_BASE_PATH, log_file), 'r', encoding='utf-8') as file:
                logs.append(json.load(file))
    return logs
@app.get('/analyze_logs')
def analyze_logs():
    """Find log entries whose results diverge for the same (query, top) pair.

    Groups all stored logs by query+top and returns every entry belonging to
    a group whose members produced more than one distinct result — i.e.
    evidence of non-deterministic search output.
    """
    # Robustness fix: the directory does not exist when logging is disabled.
    if not os.path.isdir(LOGS_BASE_PATH):
        return []
    logs_by_query_top = {}
    for log_file in os.listdir(LOGS_BASE_PATH):
        if not log_file.endswith(".json"):
            continue
        with open(os.path.join(LOGS_BASE_PATH, log_file), 'r', encoding='utf-8') as file:
            log_data = json.load(file)
        # Group by query and top. (The unused request_id read was dropped.)
        key = f"{log_data.get('query', '')}_{log_data.get('top', '')}"
        logs_by_query_top.setdefault(key, []).append(log_data)
    # Collect groups with more than one distinct result for the same key.
    invalid_logs = []
    for logs in logs_by_query_top.values():
        # Fix: sort_keys=True canonicalizes the serialized form so that two
        # equal results whose dict keys arrive in different insertion order
        # are no longer miscounted as distinct.
        distinct_results = {json.dumps(log['result'], sort_keys=True) for log in logs}
        if len(distinct_results) > 1:
            invalid_logs.extend(logs)
    return invalid_logs