convosim-ui-dev / utils /mongo_utils.py
ivnban27-ctl's picture
training-adherence-features (#1)
f3e0ba5 verified
raw
history blame
6.45 kB
import os
import datetime as dt
import streamlit as st
from streamlit.logger import get_logger
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from app_config import DB_SCHEMA, DB_COMPLETIONS, DB_CONVOS, DB_BATTLES, DB_ERRORS, DB_CPC, DB_BP, DB_TA
DB_URL = os.environ['MONGO_URL']
DB_USR = os.environ['MONGO_USR']
DB_PWD = os.environ['MONGO_PWD']
logger = get_logger(__name__)
def get_db_client():
uri = f"mongodb+srv://{DB_USR}:{DB_PWD}@{DB_URL}/?retryWrites=true&w=majority"
# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))
# Send a ping to confirm a successful connection
try:
client.admin.command('ping')
logger.debug(f"DBUTILS: Pinged your deployment. You successfully connected to MongoDB!")
return client
except Exception as e:
logger.error(e)
def new_convo(client, issue, language, username, is_comparison, model_one, model_two=None):
convo = {
"start_timestamp": dt.datetime.now(tz=dt.timezone.utc),
"issue": issue,
"language": language,
"username": username,
"is_comparison": is_comparison,
"model_one": model_one,
"model_two": model_two,
}
db = client[DB_SCHEMA]
convos = db[DB_CONVOS]
convo_id = convos.insert_one(convo).inserted_id
logger.debug(f"DBUTILS: new convo id is {convo_id}")
st.session_state['convo_id'] = convo_id
def new_comparison(client, prompt_timestamp, completion_timestamp,
chat_history, prompt, completionA, completionB,
source="webapp", subset=None
):
comparison = {
"prompt_timestamp": prompt_timestamp,
"completion_timestamp": completion_timestamp,
"source": source,
"subset": subset,
"model_one_args": {
'temperature':0.8
},
"model_two_args": {
'temperature':0.8
},
"convo_id": st.session_state['convo_id'],
"chat_history": chat_history,
"prompt": prompt,
"compeltion_model_one": completionA,
"compeltion_model_two": completionB,
}
db = client[DB_SCHEMA]
comparisons = db[DB_COMPLETIONS]
comparison_id = comparisons.insert_one(comparison).inserted_id
logger.debug(f"DBUTILS: new comparison id is {comparison_id}")
st.session_state['comparison_id'] = comparison_id
def new_battle_result(client, comparison_id, convo_id, username, model_one, model_two, winner):
battle = {
"battle_timestamp": dt.datetime.now(tz=dt.timezone.utc),
"comparison_id": comparison_id,
"convo_id": convo_id,
"username": username,
"model_one": model_one,
"model_two": model_two,
"winner": winner,
}
db = client[DB_SCHEMA]
battles = db[DB_BATTLES]
battle_id = battles.insert_one(battle).inserted_id
logger.debug(f"DBUTILS: new battle id is {battle_id}")
def new_completion_error(client, comparison_id, username, model):
error = {
"error_timestamp": dt.datetime.now(tz=dt.timezone.utc),
"comparison_id": comparison_id,
"username": username,
"model": model,
}
db = client[DB_SCHEMA]
errors = db[DB_ERRORS]
error_id = errors.insert_one(error).inserted_id
logger.debug(f"DBUTILS: new error id is {error_id}")
def new_cpc_comparison(client, convo_id, model, context, last_message, ytrue, ypred):
# context = memory.load_memory_variables({})[memory.memory_key]
comp = {
"CPC_timestamp": dt.datetime.now(tz=dt.timezone.utc),
"conversation_id": convo_id,
"model": model,
"context": context,
"last_message": last_message,
"predicted_phase": ypred,
"manual_phase": ytrue,
}
db = client[DB_SCHEMA]
cpc_comps = db[DB_CPC]
comarison_id = cpc_comps.insert_one(comp).inserted_id
logger.debug(f"DBUTILS: new error id is {comarison_id}")
def new_bp_comparison(client, convo_id, model, context, last_message, ytrue, ypred):
# context = memory.load_memory_variables({})[memory.memory_key]
comp = {
"BP_timestamp": dt.datetime.now(tz=dt.timezone.utc),
"conversation_id": convo_id,
"model": model,
"context": context,
"last_message": last_message,
"is_advice": ypred["is_advice"],
"manual_is_advice": ytrue["is_advice"],
"is_pi": ypred["is_personal_info"],
"manual_is_pi": ytrue["is_personal_info"],
}
db = client[DB_SCHEMA]
bp_comps = db[DB_BP]
comarison_id = bp_comps.insert_one(comp).inserted_id
logger.debug(f"DBUTILS: new BP id is {comarison_id}")
def new_convo_scoring_comparison(client, convo_id, context, ytrue, ypred):
# context = memory.load_memory_variables({})[memory.memory_key]
comp = {
"scoring_timestamp": dt.datetime.now(tz=dt.timezone.utc),
"conversation_id": convo_id,
"context": context,
"manual_scoring": ytrue,
"model_scoring": ypred,
}
db = client[DB_SCHEMA]
ta_comps = db[DB_TA]
comarison_id = ta_comps.insert_one(comp).inserted_id
logger.debug(f"DBUTILS: new TA convo comparison id is {comarison_id}")
def get_non_assesed_comparison(client, username):
from bson.son import SON
pipeline = [
{'$lookup': {
'from': DB_BATTLES,
'localField': '_id',
'foreignField': 'comparison_id',
"pipeline": [
{"$match": {"username":username}},
],
'as': 'battles'
}},
{'$lookup': {
'from': DB_CONVOS,
'localField': 'convo_id',
'foreignField': '_id',
'as': 'convo_info'
}},
{"$match":{
"battles": {"$size":0},
}},
{"$addFields": {
"is_manual": {
"$cond":[
{"$eq": ["$source","manual"]},
1,
0
]
},
"is_eval":{
"$cond":[
{"$eq": ["$subset","eval"]},
1,
0
]
},
"priority": {"$sum": ["is_manual","is_eval"]}
}},
{"$sort": SON([
("priority", -1),
("prompt_timestamp", 1),
("convo_id", 1),
])
},
{"$limit": 1}
]
db = client[DB_SCHEMA]
return list(db[DB_COMPLETIONS].aggregate(pipeline))