import streamlit as st
import os
import re
import tempfile
from audio_recorder_streamlit import audio_recorder
import numpy as np
import time
import requests
import io
import base64
import random
import librosa
import fsspec
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import boto3
import json
from plotly.subplots import make_subplots
from logger import logger

# S3 filesystem handle (fsspec) and boto3 client, both authenticated from env vars
fs = fsspec.filesystem(
    's3',
    key=os.getenv("AWS_ACCESS_KEY"),
    secret=os.getenv("AWS_SECRET_KEY")
)

s3_client = boto3.client(
    's3',
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("AWS_SECRET_KEY")
)

# S3 locations for results, Elo ratings, collected emails and audio files
SAVE_PATH = f"s3://{os.getenv('AWS_BUCKET_NAME')}/{os.getenv('RESULTS_KEY')}"
ELO_JSON_PATH = f"s3://{os.getenv('AWS_BUCKET_NAME')}/{os.getenv('ELO_JSON_PATH')}"
ELO_CSV_PATH = f"s3://{os.getenv('AWS_BUCKET_NAME')}/{os.getenv('ELO_CSV_KEY')}"
EMAIL_PATH = f"s3://{os.getenv('AWS_BUCKET_NAME')}/{os.getenv('EMAILS_KEY')}"
TEMP_DIR = f"s3://{os.getenv('AWS_BUCKET_NAME')}/{os.getenv('AUDIOS_KEY')}"
CREATE_TASK_URL = os.getenv("CREATE_TASK_URL")


def create_files():
    """Create the results CSV, Elo JSON/CSV and email files in S3 if they do not exist yet."""
    if not fs.exists(SAVE_PATH):
        logger.info("Creating save file")
        with fs.open(SAVE_PATH, 'wb') as f:
            headers = [
                'email', 'path',
                'Ori Apex_score', 'Ori Apex XT_score', 'deepgram_score', 'Ori Swift_score', 'Ori Prime_score',
                'Ori Apex_appearance', 'Ori Apex XT_appearance', 'deepgram_appearance', 'Ori Swift_appearance', 'Ori Prime_appearance',
                'Ori Apex_duration', 'Ori Apex XT_duration', 'deepgram_duration', 'Ori Swift_duration', 'Ori Prime_duration',
                'azure_score', 'azure_appearance', 'azure_duration'
            ]
            df = pd.DataFrame(columns=headers)
            df.to_csv(f, index=False)

    if not fs.exists(ELO_JSON_PATH):
        logger.info("Creating Elo json file")
        with fs.open(ELO_JSON_PATH, 'w') as f:
            models = ['Ori Apex', 'Ori Apex XT', 'deepgram', 'Ori Swift', 'Ori Prime', 'azure']
            # every model starts at an Elo rating of 1000
            ratings = {model: 1000 for model in models}
            json.dump(ratings, f)

    if not fs.exists(ELO_CSV_PATH):
        logger.info("Creating Elo csv file")
        with fs.open(ELO_CSV_PATH, 'wb') as f:
            models = ['Ori Apex', 'Ori Apex XT', 'deepgram', 'Ori Swift', 'Ori Prime', 'azure']
            ratings = {model: 1000 for model in models}
            df = pd.DataFrame(ratings, index=[0])
            df.to_csv(f, index=False)

    if not fs.exists(EMAIL_PATH):
        logger.info("Creating email file")
        with fs.open(EMAIL_PATH, 'wb') as f:
            f.write(''.encode('utf-8'))


def write_email(email):
    """Append an email address to the email file stored in S3."""
    if fs.exists(EMAIL_PATH):
        with fs.open(EMAIL_PATH, 'rb') as f:
            existing_content = f.read().decode('utf-8')
    else:
        existing_content = ''
    new_content = existing_content + email + '\n'
    with fs.open(EMAIL_PATH, 'wb') as f:
        f.write(new_content.encode('utf-8'))


class ResultWriter:
    def __init__(self, save_path):
        self.save_path = save_path
        self.headers = [
            'email', 'path',
            'Ori Apex_score', 'Ori Apex XT_score', 'deepgram_score', 'Ori Swift_score', 'Ori Prime_score',
            'Ori Apex_appearance', 'Ori Apex XT_appearance', 'deepgram_appearance', 'Ori Swift_appearance', 'Ori Prime_appearance',
            'Ori Apex_duration', 'Ori Apex XT_duration', 'deepgram_duration', 'Ori Swift_duration', 'Ori Prime_duration',
            'azure_score', 'azure_appearance', 'azure_duration'
        ]
        self.models = ['Ori Apex', 'Ori Apex XT', 'deepgram', 'Ori Swift', 'Ori Prime', 'azure']

        if not fs.exists(save_path):
            logger.info("CSV file not found in S3 bucket, creating a new one at %s", save_path)
            with fs.open(save_path, 'wb') as f:
                df = pd.DataFrame(columns=self.headers)
                df.to_csv(f, index=False)

    def write_result(self, user_email, audio_path,
                     option_1_duration_info, option_2_duration_info,
                     winner_model=None, loser_model=None,
                     both_preferred=False, none_preferred=False):
        payload = {
            "task": "write_result",
            "payload": {
                "winner_model": winner_model,
                "loser_model": loser_model,
                "both_preferred": both_preferred,
                "none_preferred": none_preferred,
                "user_email": user_email,
                "audio_path": audio_path,
                "option_1_duration_info": option_1_duration_info,
                "option_2_duration_info": option_2_duration_info
            }
        }
        send_task(payload)


def decode_audio_array(base64_string):
    """Decode a base64 string back into the numpy array it was encoded from."""
    bytes_data = base64.b64decode(base64_string)
    buffer = io.BytesIO(bytes_data)
    audio_array = np.load(buffer)
    return audio_array


def send_task(payload):
    """POST a task to the backend and return the response fields relevant to the task type."""
    header = {
        "Authorization": f"Bearer {os.getenv('CREATE_TASK_API_KEY')}"
    }
    response = requests.post(CREATE_TASK_URL, json=payload, headers=header)
    try:
        response = response.json()
    except Exception as e:
        logger.error("Error while sending task %s", e)
        logger.debug("Payload which caused the error %s", payload)
        return "error please try again"

    if payload["task"] == "transcribe_with_fastapi":
        return response["text"]
    elif payload["task"] == "fetch_audio":
        array = decode_audio_array(response["array"])
        sampling_rate = response["sample_rate"]
        filepath = response["filepath"]
        return array, sampling_rate, filepath


def encode_audio_array(audio_array):
    """Serialize a numpy audio array to a base64 string for JSON transport."""
    buffer = io.BytesIO()
    np.save(buffer, audio_array)
    buffer.seek(0)
    base64_bytes = base64.b64encode(buffer.read())
    base64_string = base64_bytes.decode('utf-8')
    return base64_string


def call_function(model_name):
    """Send a transcription task for the given model, base64-encoding recorded audio first."""
    if st.session_state.current_audio_type == "recorded":
        y, _ = librosa.load(st.session_state.audio_path, sr=22050, mono=True)
        encoded_array = encode_audio_array(y)
        payload = {
            "task": "transcribe_with_fastapi",
            "payload": {
                "file_path": encoded_array,
                "model_name": model_name,
                "audio_b64": True
            }
        }
    else:
        payload = {
            "task": "transcribe_with_fastapi",
            "payload": {
                "file_path": st.session_state.audio_path,
                "model_name": model_name,
                "audio_b64": False
            }
        }
    transcript = send_task(payload)
    return transcript


def transcribe_audio():
    """Pick two models at random and transcribe the current audio with both, timing each call."""
    with st.spinner("🎯 Transcribing audio... this may take up to 30 seconds"):
        models_list = ["Ori Apex", "Ori Apex XT", "deepgram", "Ori Swift", "Ori Prime", "azure"]
        model1_name, model2_name = random.sample(models_list, 2)

        st.session_state.option_1_model_name = model1_name
        st.session_state.option_2_model_name = model2_name

        time_1 = time.time()
        transcript1 = call_function(model1_name)
        time_2 = time.time()
        transcript2 = call_function(model2_name)
        time_3 = time.time()

        st.session_state.option_1_response_time = round(time_2 - time_1, 3)
        st.session_state.option_2_response_time = round(time_3 - time_2, 3)

    return transcript1, transcript2


def reset_state():
    st.session_state.audio = None
    st.session_state.current_audio_type = None
    st.session_state.audio_path = ""
    st.session_state.option_selected = False
    st.session_state.transcribed = False
    st.session_state.option_2_model_name = ""
    st.session_state.option_1_model_name = ""
    st.session_state.option_1 = ""
    st.session_state.option_2 = ""
    st.session_state.option_1_model_name_state = ""
    st.session_state.option_2_model_name_state = ""


def on_option_1_click():
    if st.session_state.transcribed and not st.session_state.option_selected:
        st.session_state.option_1_model_name_state = f"👑 {st.session_state.option_1_model_name} 👑"
        st.session_state.option_2_model_name_state = f"👎 {st.session_state.option_2_model_name} 👎"
        st.session_state.choice = (
            f"You chose Option 1. Option 1 was {st.session_state.option_1_model_name} "
            f"and Option 2 was {st.session_state.option_2_model_name}"
        )
        result_writer.write_result(
            st.session_state.user_email,
            st.session_state.audio_path,
            winner_model=st.session_state.option_1_model_name,
            loser_model=st.session_state.option_2_model_name,
            option_1_duration_info=[(f"{st.session_state.option_1_model_name}_duration", st.session_state.option_1_response_time)],
            option_2_duration_info=[(f"{st.session_state.option_2_model_name}_duration", st.session_state.option_2_response_time)]
        )
        st.session_state.option_selected = True


def on_option_2_click():
    if st.session_state.transcribed and not st.session_state.option_selected:
        st.session_state.option_2_model_name_state = f"👑 {st.session_state.option_2_model_name} 👑"
        st.session_state.option_1_model_name_state = f"👎 {st.session_state.option_1_model_name} 👎"
        st.session_state.choice = (
            f"You chose Option 2. Option 1 was {st.session_state.option_1_model_name} "
            f"and Option 2 was {st.session_state.option_2_model_name}"
        )
        result_writer.write_result(
            st.session_state.user_email,
            st.session_state.audio_path,
            winner_model=st.session_state.option_2_model_name,
            loser_model=st.session_state.option_1_model_name,
            option_1_duration_info=[(f"{st.session_state.option_1_model_name}_duration", st.session_state.option_1_response_time)],
            option_2_duration_info=[(f"{st.session_state.option_2_model_name}_duration", st.session_state.option_2_response_time)]
        )
        st.session_state.option_selected = True


def on_option_both_click():
    if st.session_state.transcribed and not st.session_state.option_selected:
        st.session_state.option_2_model_name_state = f"👑 {st.session_state.option_2_model_name} 👑"
        st.session_state.option_1_model_name_state = f"👑 {st.session_state.option_1_model_name} 👑"
        st.session_state.choice = (
            f"You chose to prefer both. Option 1 was {st.session_state.option_1_model_name} "
            f"and Option 2 was {st.session_state.option_2_model_name}"
        )
        result_writer.write_result(
            st.session_state.user_email,
            st.session_state.audio_path,
            winner_model=st.session_state.option_1_model_name,
            loser_model=st.session_state.option_2_model_name,
            option_1_duration_info=[(f"{st.session_state.option_1_model_name}_duration", st.session_state.option_1_response_time)],
            option_2_duration_info=[(f"{st.session_state.option_2_model_name}_duration", st.session_state.option_2_response_time)],
            both_preferred=True
        )
        st.session_state.option_selected = True


def on_option_none_click():
    if st.session_state.transcribed and not st.session_state.option_selected:
        st.session_state.option_1_model_name_state = f"👎 {st.session_state.option_1_model_name} 👎"
        st.session_state.option_2_model_name_state = f"👎 {st.session_state.option_2_model_name} 👎"
        st.session_state.choice = (
            f"You chose neither option. Option 1 was {st.session_state.option_1_model_name} "
            f"and Option 2 was {st.session_state.option_2_model_name}"
        )
        result_writer.write_result(
            st.session_state.user_email,
            st.session_state.audio_path,
            winner_model=st.session_state.option_1_model_name,
            loser_model=st.session_state.option_2_model_name,
            option_1_duration_info=[(f"{st.session_state.option_1_model_name}_duration", st.session_state.option_1_response_time)],
            option_2_duration_info=[(f"{st.session_state.option_2_model_name}_duration", st.session_state.option_2_response_time)],
            none_preferred=True
        )
        st.session_state.option_selected = True


def on_click_transcribe():
    if st.session_state.has_audio:
        option_1_text, option_2_text = transcribe_audio()
        st.session_state.option_1 = option_1_text
        st.session_state.option_2 = option_2_text
        st.session_state.transcribed = True
        st.session_state.option_1_model_name_state = ""
        st.session_state.option_2_model_name_state = ""
        st.session_state.option_selected = None


def on_random_click():
    reset_state()
    fetch_audio_payload = {"task": "fetch_audio"}
    array, sampling_rate, filepath = send_task(fetch_audio_payload)
    st.session_state.audio = {"data": array, "sample_rate": sampling_rate, "format": "audio/wav"}
    st.session_state.has_audio = True
    st.session_state.current_audio_type = "random"
    st.session_state.audio_path = filepath
    st.session_state.option_selected = None


result_writer = ResultWriter(SAVE_PATH)


def validate_email(email):
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return re.match(pattern, email) is not None


def get_model_abbreviation(model_name):
    abbrev_map = {
        'Ori Apex': 'Ori Apex',
        'Ori Apex XT': 'Ori Apex XT',
        'deepgram': 'DG',
        'Ori Swift': 'Ori Swift',
        'Ori Prime': 'Ori Prime',
        'azure': 'Azure'
    }
    return abbrev_map.get(model_name, model_name)


def calculate_metrics(df):
    """Compute appearances, wins, win rate and response-time stats per model from the results CSV."""
    models = ['Ori Apex', 'Ori Apex XT', 'deepgram', 'Ori Swift', 'Ori Prime', 'azure']
    metrics = {}

    for model in models:
        appearances = df[f'{model}_appearance'].sum()
        wins = df[f'{model}_score'].sum()
        durations = df[df[f'{model}_appearance'] == 1][f'{model}_duration']

        if appearances > 0:
            win_rate = (wins / appearances) * 100
            avg_duration = durations.mean()
            duration_std = durations.std()
        else:
            win_rate = 0
            avg_duration = 0
            duration_std = 0

        metrics[model] = {
            'appearances': appearances,
            'wins': wins,
            'win_rate': win_rate,
            'avg_response_time': avg_duration,
            'response_time_std': duration_std
        }

    return metrics


def create_win_rate_chart(metrics):
    models = list(metrics.keys())
    win_rates = [metrics[model]['win_rate'] for model in models]

    fig = go.Figure(data=[
        go.Bar(
            x=[get_model_abbreviation(model) for model in models],
            y=win_rates,
            text=[f'{rate:.1f}%' for rate in win_rates],
            textposition='auto',
            hovertext=models
        )
    ])
    fig.update_layout(
        title='Win Rate by Model',
        xaxis_title='Model',
        yaxis_title='Win Rate (%)',
        yaxis_range=[0, 100]
    )
    return fig


def create_appearance_chart(metrics):
    models = list(metrics.keys())
    appearances = [metrics[model]['appearances'] for model in models]

    fig = px.pie(
        values=appearances,
        names=[get_model_abbreviation(model) for model in models],
        title='Model Appearances Distribution',
        hover_data=[models]
    )
    return fig


def create_head_to_head_matrix(df):
    """Build a heatmap of pairwise win rates for every pair of models that appeared together."""
    models = ['Ori Apex', 'Ori Apex XT', 'deepgram', 'Ori Swift', 'Ori Prime', 'azure']
    matrix = np.zeros((len(models), len(models)))

    for i, model1 in enumerate(models):
        for j, model2 in enumerate(models):
            if i != j:
                matches = df[
                    (df[f'{model1}_appearance'] == 1) &
                    (df[f'{model2}_appearance'] == 1)
                ]
                if len(matches) > 0:
                    win_rate = (matches[f'{model1}_score'].sum() / len(matches)) * 100
                    matrix[i][j] = win_rate

    fig = go.Figure(data=go.Heatmap(
        z=matrix,
        x=[get_model_abbreviation(model) for model in models],
        y=[get_model_abbreviation(model) for model in models],
        text=[[f'{val:.1f}%' if val > 0 else '' for val in row] for row in matrix],
        texttemplate='%{text}',
        colorscale='RdYlBu',
        zmin=0,
        zmax=100
    ))
    fig.update_layout(
        title='Head-to-Head Win Rates',
        xaxis_title='Opponent Model',
        yaxis_title='Model'
    )
    return fig


def create_elo_chart(df):
    """Plot each model's Elo rating over successive matches."""
    fig = make_subplots(rows=1, cols=1,
                        subplot_titles=('ELO Rating Progression',),
                        row_heights=[0.7])

    for column in df.columns:
        fig.add_trace(
            go.Scatter(
                x=list(range(len(df))),
                y=df[column],
                name=column,
                mode='lines+markers'
            ),
            row=1, col=1
        )

    fig.update_layout(
        title='Model ELO Ratings Analysis',
        showlegend=True,
        hovermode='x unified'
    )
    fig.update_xaxes(title_text='Match Number', row=1, col=1)

    return fig


def create_metric_container(label, value, full_name=None):
    container = st.container()
    with container:
        st.markdown(f"**{label}**")
        if full_name:
            st.markdown(f"{description}")