# Streamlit app: audio transcription with speaker diarization via Deepgram.
# (Removed hosting-page residue — "Spaces:" / "Runtime error" — that was
# scraped into the file and is not part of the program.)
import json
import os
import re
import tempfile

import streamlit as st
from deepgram import DeepgramClient, PrerecordedOptions, FileSource
from dotenv import load_dotenv

# Load environment variables from the .env file so the API key is not
# hard-coded in the source.
load_dotenv()

# Deepgram API key; None when DG_KEY is not set in the environment.
DG_KEY = os.getenv("DG_KEY")

# Single shared Deepgram client used by all transcription calls below.
deepgram = DeepgramClient(DG_KEY)
def transcribe_audio_file(audio_file_path):
    """Send the audio at *audio_file_path* to Deepgram and return the raw response.

    The request uses the nova-2 model with smart formatting, Hindi language,
    and speaker diarization enabled.
    """
    # Load the raw audio bytes up front.
    with open(audio_file_path, "rb") as audio_file:
        buffer_data = audio_file.read()

    # Options for the prerecorded-transcription endpoint.
    options = {
        "model": "nova-2",
        "smart_format": True,
        "language": "hi",  # alternatively 'en'
        "diarize": True,
        "profanity_filter": False,
    }
    payload = {"buffer": buffer_data}

    # Hand the buffered audio and options to Deepgram; return its response.
    return deepgram.listen.prerecorded.v("1").transcribe_file(payload, options)
def process_diarized_transcript(res):
    """Group diarized words from a Deepgram response into per-speaker sentences.

    Args:
        res: Deepgram prerecorded response (dict-like) whose words live at
            res['results']['channels'][0]['alternatives'][0]['words'], each
            carrying 'speaker' and 'punctuated_word' keys.

    Returns:
        A list of (speaker, sentence) tuples in utterance order; empty list
        when there are no words.
    """
    # NOTE: the original also bound the whole alternative to an unused local
    # ('transcript'); that dead code has been removed.
    words = res['results']['channels'][0]['alternatives'][0]['words']

    current_speaker = None
    current_sentence = []
    output = []
    for word in words:
        # A speaker change flushes whatever sentence is in progress.
        if current_speaker != word['speaker']:
            if current_sentence:
                output.append((current_speaker, ' '.join(current_sentence)))
                current_sentence = []
            current_speaker = word['speaker']
        current_sentence.append(word['punctuated_word'])
        # Sentence-ending punctuation also flushes the sentence.
        if word['punctuated_word'].endswith(('.', '?', '!')):
            output.append((current_speaker, ' '.join(current_sentence)))
            current_sentence = []
    # Flush any trailing words that never hit sentence-ending punctuation.
    if current_sentence:
        output.append((current_speaker, ' '.join(current_sentence)))
    return output
def format_speaker(speaker_num):
    """Return a human-readable label for a diarization speaker index."""
    return "speaker {}".format(speaker_num)
def transcribe_and_process_audio(audio_file_path):
    """Transcribe an audio file and return a speaker-labelled transcript.

    Args:
        audio_file_path: Path to a local audio file.

    Returns:
        The formatted transcript, one "speaker N: sentence" per line, or a
        fallback message when nothing could be transcribed.

    Side effects:
        Also writes the transcript to a temporary .txt file that is kept on
        disk (delete=False), matching the original behavior.
    """
    # Transcribe, then fold the diarized words into (speaker, sentence) pairs.
    res = transcribe_audio_file(audio_file_path)
    diarized_result = process_diarized_transcript(res)

    if not diarized_result:
        return "No transcription available. The audio might still be too low quality or silent."

    # Build every line once and join, instead of quadratic += concatenation.
    lines = [
        f"{format_speaker(speaker)}: {sentence}\n"
        for speaker, sentence in diarized_result
    ]
    transcription = "".join(lines)

    # Persist a copy to a temp file. NOTE(review): the file's path is never
    # surfaced to the caller, so the file is orphaned on disk — consider
    # returning the path or dropping this write entirely.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as temp_file:
        temp_file.write(transcription.encode('utf-8'))

    return transcription
# Streamlit interface: upload an audio file, transcribe it, show the result.
st.title("Audio Transcription and Diarization")

uploaded_file = st.file_uploader("Choose an audio file", type=["mp3", "wav", "m4a"])

if uploaded_file is not None:
    # Preserve the upload's extension on the temp file — the original wrote
    # an extension-less file, discarding the container-format hint.
    suffix = os.path.splitext(uploaded_file.name)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_audio_file:
        temp_audio_file.write(uploaded_file.read())
        temp_audio_file_path = temp_audio_file.name

    st.write("Transcribing audio...")
    transcription = transcribe_and_process_audio(temp_audio_file_path)
    st.write("Transcription:")
    st.text(transcription)