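"""Streamlit app for the Ori Speech-To-Text Arena.

Users record an audio clip or pick a random one, two randomly chosen models
transcribe it, and the user votes for the transcript they prefer. Votes are
queued locally and forwarded to a task API; supporting files (results CSV,
Elo scores, email list) live on the shared filesystem exposed by `fs`.
"""
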
import base64
import io
import json
import os
import random
import tempfile
import threading
import time
from queue import Queue

import librosa
import numpy as np
import pandas as pd
import requests
import streamlit as st
import torchaudio
from audio_recorder_streamlit import audio_recorder

from logger import logger
from utils import fs
from enums import SAVE_PATH, ELO_JSON_PATH, ELO_CSV_PATH, EMAIL_PATH, TEMP_DIR, NEW_TASK_URL, ARENA_PATH

result_queue = Queue()

random_df = pd.read_csv("random_audios.csv")
random_paths = random_df["path"].tolist()


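# Background consumer: drains result_queue and persists each queued vote
# through a single ResultWriter, so Streamlit callbacks never block on I/O.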
def result_writer_thread():
    result_writer = ResultWriter(SAVE_PATH)
    while True:
        result_input = result_queue.get()
        result_writer.write_result(**result_input)
        result_queue.task_done()


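# One-time bootstrap of the backing files on the shared filesystem: results
# CSV, Elo JSON/CSV (every model starts at 1000), and the email list.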
def create_files():
    if not fs.exists(SAVE_PATH):
        logger.info("Creating save file")
        with fs.open(SAVE_PATH, 'wb') as f:
            headers = [
                'email',
                'path',
                'Ori Apex_score', 'Ori Apex XT_score', 'deepgram_score', 'Ori Swift_score', 'Ori Prime_score',
                'Ori Apex_appearance', 'Ori Apex XT_appearance', 'deepgram_appearance', 'Ori Swift_appearance', 'Ori Prime_appearance',
                'Ori Apex_duration', 'Ori Apex XT_duration', 'deepgram_duration', 'Ori Swift_duration', 'Ori Prime_duration',
                'azure_score', 'azure_appearance', 'azure_duration'
            ]
            df = pd.DataFrame(columns=headers)
            df.to_csv(f, index=False)

    if not fs.exists(ELO_JSON_PATH):
        logger.info("Creating Elo json file")
        with fs.open(ELO_JSON_PATH, 'w') as f:
            models = ['Ori Apex', 'Ori Apex XT', 'deepgram', 'Ori Swift', 'Ori Prime', 'azure']
            elo_scores = {model: 1000 for model in models}
            json.dump(elo_scores, f)

    if not fs.exists(ELO_CSV_PATH):
        logger.info("Creating Elo csv file")
        with fs.open(ELO_CSV_PATH, 'wb') as f:
            models = ['Ori Apex', 'Ori Apex XT', 'deepgram', 'Ori Swift', 'Ori Prime', 'azure']
            elo_scores = {model: 1000 for model in models}
            df = pd.DataFrame(elo_scores, index=[0])
            df.to_csv(f, index=False)

    if not fs.exists(EMAIL_PATH):
        logger.info("Creating email file")
        with fs.open(EMAIL_PATH, 'wb') as f:
            f.write(b'')


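# Append an email address to the email file via a full read-modify-write.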
def write_email(email):
    if fs.exists(EMAIL_PATH):
        with fs.open(EMAIL_PATH, 'rb') as f:
            existing_content = f.read().decode('utf-8')
    else:
        existing_content = ''

    new_content = existing_content + email + '\n'

    with fs.open(EMAIL_PATH, 'wb') as f:
        f.write(new_content.encode('utf-8'))


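# ResultWriter mirrors the CSV schema created in create_files(); write_result()
# does not write the CSV directly but forwards each vote to the task API.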
class ResultWriter:
    def __init__(self, save_path):
        self.save_path = save_path
        self.headers = [
            'email',
            'path',
            'Ori Apex_score', 'Ori Apex XT_score', 'deepgram_score', 'Ori Swift_score', 'Ori Prime_score',
            'Ori Apex_appearance', 'Ori Apex XT_appearance', 'deepgram_appearance', 'Ori Swift_appearance', 'Ori Prime_appearance',
            'Ori Apex_duration', 'Ori Apex XT_duration', 'deepgram_duration', 'Ori Swift_duration', 'Ori Prime_duration',
            'azure_score', 'azure_appearance', 'azure_duration'
        ]
        self.models = ['Ori Apex', 'Ori Apex XT', 'deepgram', 'Ori Swift', 'Ori Prime', 'azure']

        if not fs.exists(save_path):
            logger.info("CSV file not found in S3 bucket, creating a new one: %s", save_path)
            with fs.open(save_path, 'wb') as f:
                df = pd.DataFrame(columns=self.headers)
                df.to_csv(f, index=False)

    def write_result(self,
                     user_email,
                     audio_path,
                     option_1_duration_info,
                     option_2_duration_info,
                     winner_model=None,
                     loser_model=None,
                     both_preferred=False,
                     none_preferred=False):
        payload = {
            "task": "write_result",
            "payload": {
                "winner_model": winner_model,
                "loser_model": loser_model,
                "both_preferred": both_preferred,
                "none_preferred": none_preferred,
                "user_email": user_email,
                "audio_path": audio_path,
                "option_1_duration_info": option_1_duration_info,
                "option_2_duration_info": option_2_duration_info
            }
        }

        send_task(payload)


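# decode_audio_array / encode_audio_array round-trip a numpy array through
# np.save bytes and base64 so audio can travel inside JSON payloads.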
def decode_audio_array(base64_string):
    bytes_data = base64.b64decode(base64_string)

    buffer = io.BytesIO(bytes_data)
    audio_array = np.load(buffer)

    return audio_array


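# Post a task to the backend at NEW_TASK_URL, authenticated with a bearer
# token from the CREATE_TASK_API_KEY environment variable. Transcription
# tasks return the recognised text; other tasks return nothing.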
def send_task(payload):
    header = {
        "Authorization": f"Bearer {os.getenv('CREATE_TASK_API_KEY')}"
    }
    if payload["task"] in ["fetch_audio", "write_result"]:
        response = requests.post(NEW_TASK_URL, json=payload, headers=header, timeout=300)
    else:
        response = requests.post(NEW_TASK_URL, json=payload, headers=header, timeout=300, stream=True)
    try:
        response = response.json()
    except Exception as e:
        logger.error("Error while sending task %s", e)
        logger.error("response received %s", response.text)
        if response.status_code == 413:
            return "Recording too long, please try again"
        return "error please try again"

    if payload["task"] == "transcribe_with_fastapi":
        return response["text"]


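# Pick a random pre-uploaded clip listed in random_audios.csv and load it
# from ARENA_PATH as a (channels, samples) numpy array plus its sample rate.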
def fetch_audio():
    filepath = random.choice(random_paths)
    with fs.open(f"{ARENA_PATH}/{filepath}", 'rb') as f:
        audio, sr = torchaudio.load(f)
    audio = audio.numpy()
    return audio, sr, filepath


def encode_audio_array(audio_array):
    buffer = io.BytesIO()
    np.save(buffer, audio_array)
    buffer.seek(0)

    base64_bytes = base64.b64encode(buffer.read())
    base64_string = base64_bytes.decode('utf-8')

    return base64_string


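# Build and send a transcription payload for one model. Recorded audio is
# reloaded from the temp file at 22050 Hz mono; random audio is resampled to
# 22050 Hz if needed before being base64-encoded.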
def call_function(model_name):
    if st.session_state.current_audio_type == "recorded":
        y, _ = librosa.load(st.session_state.audio_path, sr=22050, mono=True)
        encoded_array = encode_audio_array(y)
        payload = {
            "task": "transcribe_with_fastapi",
            "payload": {
                "file_path": encoded_array,
                "model_name": model_name,
                "audio_b64": True
            }
        }
    else:
        sr = st.session_state.audio['sample_rate']
        array = st.session_state.audio['data']
        if sr != 22050:
            array = librosa.resample(y=array, orig_sr=sr, target_sr=22050)
        encoded_array = encode_audio_array(array)
        payload = {
            "task": "transcribe_with_fastapi",
            "payload": {
                "file_path": encoded_array,
                "model_name": model_name,
                "audio_b64": True
            }
        }

    transcript = send_task(payload)
    return transcript


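# Pick two distinct models at random, transcribe the current audio with both,
# and record each model's response time in session state for the duration columns.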
def transcribe_audio():
    models_list = ["Ori Apex", "Ori Apex XT", "deepgram", "Ori Swift", "Ori Prime", "azure"]
    model1_name, model2_name = random.sample(models_list, 2)

    st.session_state.option_1_model_name = model1_name
    st.session_state.option_2_model_name = model2_name

    time_1 = time.time()
    transcript1 = call_function(model1_name)
    time_2 = time.time()
    transcript2 = call_function(model2_name)
    time_3 = time.time()

    st.session_state.option_2_response_time = round(time_3 - time_2, 3)
    st.session_state.option_1_response_time = round(time_2 - time_1, 3)

    if transcript1 == "nan":
        transcript1 = ""
    if transcript2 == "nan":
        transcript2 = ""

    return transcript1, transcript2


def reset_state():
    st.session_state.audio = None
    st.session_state.current_audio_type = None
    st.session_state.audio_path = ""
    st.session_state.option_selected = False
    st.session_state.transcribed = False
    st.session_state.option_2_model_name = ""
    st.session_state.option_1_model_name = ""
    st.session_state.option_1 = ""
    st.session_state.option_2 = ""
    st.session_state.option_1_model_name_state = ""
    st.session_state.option_2_model_name_state = ""
    st.session_state.has_audio = False


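# Vote callbacks: each one reveals the model names, records the choice, and
# enqueues a result for the writer thread. "Both" and "None" keep the same
# winner/loser ordering but set the corresponding preference flag.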
def on_option_1_click():
    if st.session_state.transcribed and not st.session_state.option_selected:
        with st.spinner("💾 Saving and loading results... please wait"):
            st.session_state.option_1_model_name_state = st.session_state.option_1_model_name
            st.session_state.option_2_model_name_state = st.session_state.option_2_model_name
            st.session_state.choice = f"You chose Option 1. Option 1 was {st.session_state.option_1_model_name}, Option 2 was {st.session_state.option_2_model_name}."
            result_queue.put(
                {
                    "user_email": st.session_state.user_email,
                    "audio_path": st.session_state.audio_path,
                    "winner_model": st.session_state.option_1_model_name,
                    "loser_model": st.session_state.option_2_model_name,
                    "option_1_duration_info": [(f"{st.session_state.option_1_model_name}_duration", st.session_state.option_1_response_time)],
                    "option_2_duration_info": [(f"{st.session_state.option_2_model_name}_duration", st.session_state.option_2_response_time)]
                }
            )
            st.session_state.option_selected = True
            st.session_state.disable_voting = True


def on_option_2_click():
    if st.session_state.transcribed and not st.session_state.option_selected:
        with st.spinner("💾 Saving and loading results... please wait"):
            st.session_state.option_1_model_name_state = st.session_state.option_1_model_name
            st.session_state.option_2_model_name_state = st.session_state.option_2_model_name
            st.session_state.choice = f"You chose Option 2. Option 1 was {st.session_state.option_1_model_name}, Option 2 was {st.session_state.option_2_model_name}."
            result_queue.put(
                {
                    "user_email": st.session_state.user_email,
                    "audio_path": st.session_state.audio_path,
                    "winner_model": st.session_state.option_2_model_name,
                    "loser_model": st.session_state.option_1_model_name,
                    "option_1_duration_info": [(f"{st.session_state.option_1_model_name}_duration", st.session_state.option_1_response_time)],
                    "option_2_duration_info": [(f"{st.session_state.option_2_model_name}_duration", st.session_state.option_2_response_time)]
                }
            )
            st.session_state.option_selected = True
            st.session_state.disable_voting = True


def on_option_both_click():
    if st.session_state.transcribed and not st.session_state.option_selected:
        with st.spinner("💾 Saving and loading results... please wait"):
            st.session_state.option_1_model_name_state = st.session_state.option_1_model_name
            st.session_state.option_2_model_name_state = st.session_state.option_2_model_name
            st.session_state.choice = f"You chose Prefer Both. Option 1 was {st.session_state.option_1_model_name}, Option 2 was {st.session_state.option_2_model_name}."
            result_queue.put(
                {
                    "user_email": st.session_state.user_email,
                    "audio_path": st.session_state.audio_path,
                    "winner_model": st.session_state.option_1_model_name,
                    "loser_model": st.session_state.option_2_model_name,
                    "option_1_duration_info": [(f"{st.session_state.option_1_model_name}_duration", st.session_state.option_1_response_time)],
                    "option_2_duration_info": [(f"{st.session_state.option_2_model_name}_duration", st.session_state.option_2_response_time)],
                    "both_preferred": True
                }
            )
            st.session_state.option_selected = True
            st.session_state.disable_voting = True


def on_option_none_click():
    if st.session_state.transcribed and not st.session_state.option_selected:
        with st.spinner("💾 Saving and loading results... please wait"):
            st.session_state.option_1_model_name_state = st.session_state.option_1_model_name
            st.session_state.option_2_model_name_state = st.session_state.option_2_model_name
            st.session_state.choice = f"You chose neither option. Option 1 was {st.session_state.option_1_model_name}, Option 2 was {st.session_state.option_2_model_name}."
            result_queue.put(
                {
                    "user_email": st.session_state.user_email,
                    "audio_path": st.session_state.audio_path,
                    "winner_model": st.session_state.option_1_model_name,
                    "loser_model": st.session_state.option_2_model_name,
                    "option_1_duration_info": [(f"{st.session_state.option_1_model_name}_duration", st.session_state.option_1_response_time)],
                    "option_2_duration_info": [(f"{st.session_state.option_2_model_name}_duration", st.session_state.option_2_response_time)],
                    "none_preferred": True
                }
            )
            st.session_state.option_selected = True
            st.session_state.disable_voting = True


def on_click_transcribe():
    if st.session_state.has_audio:
        with st.spinner("Transcribing audio... this may take up to 30 seconds"):
            option_1_text, option_2_text = transcribe_audio()
            st.session_state.option_1 = option_1_text if option_1_text else "* inaudible *"
            st.session_state.option_2 = option_2_text if option_2_text else "* inaudible *"
            st.session_state.transcribed = True
            st.session_state.option_1_model_name_state = ""
            st.session_state.option_2_model_name_state = ""
            st.session_state.option_selected = None
            st.session_state.recording = True
            st.session_state.disable_voting = False


def on_random_click():
    reset_state()
    with st.spinner("Fetching random audio... please wait"):
        array, sampling_rate, filepath = fetch_audio()
        st.session_state.audio = {"data": array, "sample_rate": sampling_rate, "format": "audio/wav"}
        st.session_state.has_audio = True
        st.session_state.current_audio_type = "random"
        st.session_state.audio_path = filepath
        st.session_state.option_selected = None


def on_reset_click():
    reset_state()


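# Start the background writer thread as a daemon so it does not keep the
# process alive after Streamlit shuts down.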
writer_thread = threading.Thread(target=result_writer_thread, daemon=True)
writer_thread.start()


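# main() renders the whole page: recording / random-audio inputs, the
# transcription button, the two anonymous transcripts, and the voting row.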
def main():
    st.title("⚔️ Ori Speech-To-Text Arena ⚔️")

    session_defaults = {
        "has_audio": False,
        "audio": None,
        "audio_path": "",
        "option_1": "",
        "option_2": "",
        "transcribed": False,
        "option_1_model_name_state": "",
        "option_1_model_name": "",
        "option_2_model_name": "",
        "option_2_model_name_state": "",
        "user_email": "",
        "recording": True,
        "disable_voting": True,
    }
    for key, default in session_defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default

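    # Two ways to provide audio: record it in the browser (left column) or
    # load a random sample from ARENA_PATH (right column).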
    col1, col2 = st.columns([1, 1])

    with col1:
        st.markdown("### Record Audio")
        with st.container():
            audio_bytes = audio_recorder(
                text="Click microphone to start/stop recording",
                pause_threshold=3,
                icon_size="2x",
                key="audio_recorder",
                sample_rate=16_000
            )
            if audio_bytes and audio_bytes != st.session_state.get('last_recorded_audio'):
                reset_state()
                st.session_state.last_recorded_audio = audio_bytes
                st.session_state.audio = {"data": audio_bytes, "format": "audio/wav"}
                st.session_state.current_audio_type = "recorded"
                st.session_state.has_audio = True
                os.makedirs(TEMP_DIR, exist_ok=True)
                with tempfile.NamedTemporaryFile(delete=False, suffix='.wav', dir=TEMP_DIR) as tmp_file:
                    tmp_file.write(audio_bytes)
                st.session_state.audio_path = tmp_file.name
                st.session_state.option_selected = None
                st.toast("Audio recorded successfully", icon="🎤")
                st.session_state.recording = False

    with col2:
        st.markdown("### Random Audio Example")
        with st.container():
            st.button("🎲 Select Random Audio", on_click=on_random_click, key="random_btn")
            st.session_state.recording = False

    if st.session_state.has_audio:
        st.audio(**st.session_state.audio)

    with st.container():
        st.button("Transcribe Audio", on_click=on_click_transcribe, use_container_width=True, key="transcribe_btn", disabled=st.session_state.recording)

    text_containers = st.columns([1, 1])
    name_containers = st.columns([1, 1])

    with text_containers[0]:
        st.text_area("Option 1", value=st.session_state.option_1, height=300)

    with text_containers[1]:
        st.text_area("Option 2", value=st.session_state.option_2, height=300)

    with name_containers[0]:
        if st.session_state.option_1_model_name_state:
            st.markdown(f"<div style='text-align: center'>{st.session_state.option_1_model_name_state}</div>", unsafe_allow_html=True)

    with name_containers[1]:
        if st.session_state.option_2_model_name_state:
            st.markdown(f"<div style='text-align: center'>{st.session_state.option_2_model_name_state}</div>", unsafe_allow_html=True)

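    # Voting row: buttons stay disabled until a transcription has finished
    # (disable_voting is cleared in on_click_transcribe).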
    c1, c2, c3, c4 = st.columns(4)

    with c1:
        st.button("Prefer Option 1", on_click=on_option_1_click, key="option1_btn", disabled=st.session_state.disable_voting)

    with c2:
        st.button("Prefer Option 2", on_click=on_option_2_click, key="option2_btn", disabled=st.session_state.disable_voting)

    with c3:
        st.button("Prefer Both", on_click=on_option_both_click, key="both_btn", disabled=st.session_state.disable_voting)

    with c4:
        st.button("Prefer None", on_click=on_option_none_click, key="none_btn", disabled=st.session_state.disable_voting)

    with st.container():
        st.button("New Match", on_click=on_reset_click, key="reset_btn", use_container_width=True)

INSTR = """ |
|
## Instructions: |
|
* Record audio to recognise speech (or press π² for random Audio). |
|
* Click on transcribe audio button to commence the transcription process. |
|
* Read the two options one after the other while listening to the audio. |
|
* Vote on which transcript you prefer. |
|
* Note: |
|
* Model names are revealed after the vote is cast. |
|
* Currently Hindi and English are supported, and |
|
the results for Hindi will be in Hinglish (Hindi in Latin script) |
|
* It may take up to 30 seconds for speech recognition in some cases. |
|
""".strip() |
|
|
|
st.markdown(INSTR) |
|
|
|
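# Module-level entry point: Streamlit re-executes this script top to bottom on
# each interaction, so ensure the backing files exist, then render the page.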
create_files()
main()