Spaces:
Sleeping
Sleeping
import os | |
import datetime as dt | |
import streamlit as st | |
from streamlit.logger import get_logger | |
from pymongo.mongo_client import MongoClient | |
from pymongo.server_api import ServerApi | |
from app_config import DB_SCHEMA, DB_COMPLETIONS, DB_CONVOS, DB_BATTLES, DB_ERRORS, DB_CPC, DB_BP, DB_TA | |
# MongoDB connection settings. Each lookup raises KeyError at import time
# when the corresponding environment variable is not set.
DB_URL = os.environ["MONGO_URL"]
DB_USR = os.environ["MONGO_USR"]
DB_PWD = os.environ["MONGO_PWD"]

# Module-level logger obtained through Streamlit's logging helper.
logger = get_logger(__name__)
def get_db_client():
    """Create a MongoDB client and confirm the connection with a ping.

    The connection URI is assembled from the module-level ``DB_USR``,
    ``DB_PWD`` and ``DB_URL`` settings.

    Returns:
        The ``MongoClient`` when the ping succeeds; ``None`` when the ping
        fails (the failure is logged together with its traceback and the
        unusable client is closed).
    """
    # NOTE(review): the credentials are interpolated verbatim. PyMongo
    # expects reserved characters in them to be percent-escaped -- confirm
    # the environment values are already escaped.
    uri = f"mongodb+srv://{DB_USR}:{DB_PWD}@{DB_URL}/?retryWrites=true&w=majority"

    # Create a new client and connect to the server.
    client = MongoClient(uri, server_api=ServerApi('1'))

    # Send a ping to confirm a successful connection.
    try:
        client.admin.command('ping')
    except Exception:
        # Best-effort contract: callers get None instead of an exception,
        # but the traceback is kept in the log and the client is released.
        logger.exception("DBUTILS: Could not connect to MongoDB.")
        client.close()
        return None

    logger.debug("DBUTILS: Pinged your deployment. You successfully connected to MongoDB!")
    return client
def new_convo(client, issue, language, username, is_comparison, model_one, model_two=None):
    """Store a new conversation record and remember its id in the session.

    The document is stamped with the current timezone-aware UTC time and
    inserted into the ``DB_CONVOS`` collection of the ``DB_SCHEMA`` database.
    The generated id is written to ``st.session_state['convo_id']``.

    Args:
        client: MongoDB client used for the insert.
        issue: Stored under ``issue``.
        language: Stored under ``language``.
        username: Stored under ``username``.
        is_comparison: Stored under ``is_comparison``.
        model_one: Stored under ``model_one``.
        model_two: Stored under ``model_two``; defaults to ``None``.
    """
    record = dict(
        start_timestamp=dt.datetime.now(tz=dt.timezone.utc),
        issue=issue,
        language=language,
        username=username,
        is_comparison=is_comparison,
        model_one=model_one,
        model_two=model_two,
    )

    collection = client[DB_SCHEMA][DB_CONVOS]
    new_id = collection.insert_one(record).inserted_id
    logger.debug(f"DBUTILS: new convo id is {new_id}")

    st.session_state['convo_id'] = new_id
def new_comparison(client, prompt_timestamp, completion_timestamp,
                   chat_history, prompt, completionA, completionB,
                   source="webapp", subset=None):
    """Store a pair of completions for one prompt and remember the new id.

    The document is linked to the conversation whose id is currently held in
    ``st.session_state['convo_id']`` and inserted into the ``DB_COMPLETIONS``
    collection. The generated id is written to
    ``st.session_state['comparison_id']``.

    Args:
        client: MongoDB client used for the insert.
        prompt_timestamp: Stored under ``prompt_timestamp``.
        completion_timestamp: Stored under ``completion_timestamp``.
        chat_history: Stored under ``chat_history``.
        prompt: Stored under ``prompt``.
        completionA: Completion attributed to model one.
        completionB: Completion attributed to model two.
        source: Stored under ``source``; defaults to ``"webapp"``.
        subset: Stored under ``subset``; defaults to ``None``.
    """
    record = dict(
        prompt_timestamp=prompt_timestamp,
        completion_timestamp=completion_timestamp,
        source=source,
        subset=subset,
        # Both models are recorded with a fixed temperature of 0.8.
        model_one_args=dict(temperature=0.8),
        model_two_args=dict(temperature=0.8),
        convo_id=st.session_state['convo_id'],
        chat_history=chat_history,
        prompt=prompt,
        # The misspelled "compeltion" keys are the persisted field names.
        compeltion_model_one=completionA,
        compeltion_model_two=completionB,
    )

    collection = client[DB_SCHEMA][DB_COMPLETIONS]
    new_id = collection.insert_one(record).inserted_id
    logger.debug(f"DBUTILS: new comparison id is {new_id}")

    st.session_state['comparison_id'] = new_id
def new_battle_result(client, comparison_id, convo_id, username, model_one, model_two, winner):
    """Store the outcome of a battle between two models.

    The document is stamped with the current timezone-aware UTC time and
    inserted into the ``DB_BATTLES`` collection. The generated id is only
    logged, not returned.

    Args:
        client: MongoDB client used for the insert.
        comparison_id: Stored under ``comparison_id``.
        convo_id: Stored under ``convo_id``.
        username: Stored under ``username``.
        model_one: Stored under ``model_one``.
        model_two: Stored under ``model_two``.
        winner: Stored under ``winner``.
    """
    record = dict(
        battle_timestamp=dt.datetime.now(tz=dt.timezone.utc),
        comparison_id=comparison_id,
        convo_id=convo_id,
        username=username,
        model_one=model_one,
        model_two=model_two,
        winner=winner,
    )

    collection = client[DB_SCHEMA][DB_BATTLES]
    new_id = collection.insert_one(record).inserted_id
    logger.debug(f"DBUTILS: new battle id is {new_id}")
def new_completion_error(client, comparison_id, username, model):
    """Store a completion-error record for a model.

    The document is stamped with the current timezone-aware UTC time and
    inserted into the ``DB_ERRORS`` collection. The generated id is only
    logged, not returned.

    Args:
        client: MongoDB client used for the insert.
        comparison_id: Stored under ``comparison_id``.
        username: Stored under ``username``.
        model: Stored under ``model``.
    """
    record = dict(
        error_timestamp=dt.datetime.now(tz=dt.timezone.utc),
        comparison_id=comparison_id,
        username=username,
        model=model,
    )

    collection = client[DB_SCHEMA][DB_ERRORS]
    new_id = collection.insert_one(record).inserted_id
    logger.debug(f"DBUTILS: new error id is {new_id}")
def new_cpc_comparison(client, convo_id, model, context, last_message, ytrue, ypred):
    """Store a CPC comparison between a predicted and a manual phase.

    The document is stamped with the current timezone-aware UTC time and
    inserted into the ``DB_CPC`` collection. The generated id is only
    logged, not returned.

    Args:
        client: MongoDB client used for the insert.
        convo_id: Stored under ``conversation_id``.
        model: Stored under ``model``.
        context: Stored under ``context``.
        last_message: Stored under ``last_message``.
        ytrue: Manually assigned phase, stored under ``manual_phase``.
        ypred: Predicted phase, stored under ``predicted_phase``.
    """
    # NOTE(review): context presumably comes from
    # memory.load_memory_variables({})[memory.memory_key] -- confirm with caller.
    comp = {
        "CPC_timestamp": dt.datetime.now(tz=dt.timezone.utc),
        "conversation_id": convo_id,
        "model": model,
        "context": context,
        "last_message": last_message,
        "predicted_phase": ypred,
        "manual_phase": ytrue,
    }

    cpc_comps = client[DB_SCHEMA][DB_CPC]
    comparison_id = cpc_comps.insert_one(comp).inserted_id
    # The message names the CPC record so it is not mistaken for an entry
    # written by new_completion_error.
    logger.debug(f"DBUTILS: new CPC comparison id is {comparison_id}")
def new_bp_comparison(client, convo_id, model, context, last_message, ytrue, ypred):
    """Store a BP comparison between predicted and manual labels.

    The ``is_advice`` and ``is_personal_info`` entries are read from both
    ``ypred`` and ``ytrue``. The document is stamped with the current
    timezone-aware UTC time and inserted into the ``DB_BP`` collection.
    The generated id is only logged, not returned.

    Args:
        client: MongoDB client used for the insert.
        convo_id: Stored under ``conversation_id``.
        model: Stored under ``model``.
        context: Stored under ``context``.
        last_message: Stored under ``last_message``.
        ytrue: Mapping with the manual ``is_advice`` / ``is_personal_info``
            labels.
        ypred: Mapping with the predicted ``is_advice`` /
            ``is_personal_info`` labels.

    Raises:
        KeyError: If ``ytrue`` or ``ypred`` is a dict lacking one of the
            two keys.
    """
    # NOTE(review): context presumably comes from
    # memory.load_memory_variables({})[memory.memory_key] -- confirm with caller.
    record = dict(
        BP_timestamp=dt.datetime.now(tz=dt.timezone.utc),
        conversation_id=convo_id,
        model=model,
        context=context,
        last_message=last_message,
        is_advice=ypred["is_advice"],
        manual_is_advice=ytrue["is_advice"],
        is_pi=ypred["is_personal_info"],
        manual_is_pi=ytrue["is_personal_info"],
    )

    collection = client[DB_SCHEMA][DB_BP]
    new_id = collection.insert_one(record).inserted_id
    logger.debug(f"DBUTILS: new BP id is {new_id}")
def new_convo_scoring_comparison(client, convo_id, context, ytrue, ypred):
    """Store a conversation-scoring comparison (manual vs. model scoring).

    The document is stamped with the current timezone-aware UTC time and
    inserted into the ``DB_TA`` collection. The generated id is only
    logged, not returned.

    Args:
        client: MongoDB client used for the insert.
        convo_id: Stored under ``conversation_id``.
        context: Stored under ``context``.
        ytrue: Manual scoring, stored under ``manual_scoring``.
        ypred: Model scoring, stored under ``model_scoring``.
    """
    # NOTE(review): context presumably comes from
    # memory.load_memory_variables({})[memory.memory_key] -- confirm with caller.
    record = dict(
        scoring_timestamp=dt.datetime.now(tz=dt.timezone.utc),
        conversation_id=convo_id,
        context=context,
        manual_scoring=ytrue,
        model_scoring=ypred,
    )

    collection = client[DB_SCHEMA][DB_TA]
    new_id = collection.insert_one(record).inserted_id
    logger.debug(f"DBUTILS: new TA convo comparison id is {new_id}")
def get_non_assesed_comparison(client, username):
    """Fetch the next comparison that ``username`` has not battled yet.

    Runs an aggregation over the ``DB_COMPLETIONS`` collection that:

    1. joins the battles recorded by ``username`` for each comparison,
    2. joins the conversation the comparison belongs to (``convo_info``),
    3. keeps only comparisons with no battle by this user,
    4. ranks them by ``priority`` (one point when ``source`` is
       ``"manual"`` plus one point when ``subset`` is ``"eval"``), then by
       oldest ``prompt_timestamp``, then by ``convo_id``,
    5. returns the top-ranked document.

    Args:
        client: MongoDB client used for the query.
        username: User whose battles are excluded.

    Returns:
        A list holding at most one comparison document, extended with the
        ``battles``, ``convo_info``, ``is_manual``, ``is_eval`` and
        ``priority`` fields. The list is empty when nothing is left.
    """
    from bson.son import SON

    pipeline = [
        # Battles already recorded by this user for the comparison.
        {'$lookup': {
            'from': DB_BATTLES,
            'localField': '_id',
            'foreignField': 'comparison_id',
            "pipeline": [
                {"$match": {"username": username}},
            ],
            'as': 'battles'
        }},
        # Conversation the comparison belongs to.
        {'$lookup': {
            'from': DB_CONVOS,
            'localField': 'convo_id',
            'foreignField': '_id',
            'as': 'convo_info'
        }},
        # Keep only comparisons the user has not assessed.
        {"$match": {
            "battles": {"$size": 0},
        }},
        {"$addFields": {
            "is_manual": {
                "$cond": [
                    {"$eq": ["$source", "manual"]},
                    1,
                    0
                ]
            },
            "is_eval": {
                "$cond": [
                    {"$eq": ["$subset", "eval"]},
                    1,
                    0
                ]
            },
        }},
        # priority gets its own stage: field references need the "$" prefix
        # (a bare "is_manual" is a string literal that sums to 0), and the
        # flags added above are only readable from a later stage.
        {"$addFields": {
            "priority": {"$add": ["$is_manual", "$is_eval"]},
        }},
        {"$sort": SON([
            ("priority", -1),
            ("prompt_timestamp", 1),
            ("convo_id", 1),
        ])},
        {"$limit": 1},
    ]

    db = client[DB_SCHEMA]
    return list(db[DB_COMPLETIONS].aggregate(pipeline))