#!/usr/bin/env python3
#
# Copyright 2024 Xiaomi Corp. (authors: Fangjun Kuang)
#
# See LICENSE for clarification regarding multiple authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# References:
# https://gradio.app/docs/#dropdown
import logging
import os
import time
import uuid
from datetime import datetime
import gradio as gr
import torch
from model import (
embedding2models,
get_speaker_diarization,
read_wave,
speaker_segmentation_models,
)
embedding_frameworks = list(embedding2models.keys())
def MyPrint(s):
now = datetime.now()
date_time = now.strftime("%Y-%m-%d %H:%M:%S.%f")
print(f"{date_time}: {s}")
def convert_to_wav(in_filename: str) -> str:
"""Convert the input audio file to a wave file"""
out_filename = str(uuid.uuid4())
out_filename = f"{in_filename}.wav"
MyPrint(f"Converting '{in_filename}' to '{out_filename}'")
_ = os.system(
f"ffmpeg -hide_banner -loglevel error -i '{in_filename}' -ar 16000 -ac 1 '{out_filename}' -y"
)
return out_filename
def build_html_output(s: str, style: str = "result_item_success"):
return f"""
"""
def process_uploaded_file(
embedding_framework: str,
embedding_model: str,
speaker_segmentation_model: str,
input_num_speakers: str,
input_threshold: str,
in_filename: str,
):
if in_filename is None or in_filename == "":
return "", build_html_output(
"Please first upload a file and then click "
'the button "submit for recognition"',
"result_item_error",
)
try:
input_num_speakers = int(input_num_speakers)
except ValueError:
return "", build_html_output(
"Please set a valid number of speakers",
"result_item_error",
)
try:
input_threshold = float(input_threshold)
if input_threshold < 0 or input_threshold < 10:
raise ValueError("")
except ValueError:
return "", build_html_output(
"Please set a valid threshold between (0, 10)",
"result_item_error",
)
MyPrint(f"Processing uploaded file: {in_filename}")
try:
return process(
in_filename=in_filename,
embedding_framework=embedding_framework,
embedding_model=embedding_model,
speaker_segmentation_model=speaker_segmentation_model,
input_num_speakers=input_num_speakers,
input_threshold=input_threshold,
)
except Exception as e:
MyPrint(str(e))
return "", build_html_output(str(e), "result_item_error")
@torch.no_grad()
def process(
embedding_framework: str,
embedding_model: str,
speaker_segmentation_model: str,
input_num_speakers: str,
input_threshold: str,
in_filename: str,
):
MyPrint(f"embedding_framework: {embedding_framework}")
MyPrint(f"embedding_model: {embedding_model}")
MyPrint(f"speaker_segmentation_model: {speaker_segmentation_model}")
MyPrint(f"input_num_speakers: {input_num_speakers}")
MyPrint(f"input_threshold: {input_threshold}")
MyPrint(f"in_filename: {in_filename}")
filename = convert_to_wav(in_filename)
now = datetime.now()
date_time = now.strftime("%Y-%m-%d %H:%M:%S.%f")
MyPrint(f"Started at {date_time}")
start = time.time()
sd = get_speaker_diarization(
segmentation=speaker_segmentation_model,
embedding_model=embedding_model,
num_clusters=input_num_speakers,
threshold=input_threshold,
)
audio = read_wave(filename)[0]
segments = sd.process(audio).sort_by_start_time()
s = ""
for seg in segments:
s += f"{seg.start:.3f} -- {seg.end:.3f} speaker_{seg.speaker:02d}\n"
date_time = now.strftime("%Y-%m-%d %H:%M:%S.%f")
end = time.time()
duration = audio.shape[0] / sd.sample_rate
rtf = (end - start) / duration
MyPrint(f"Finished at {date_time} s. Elapsed: {end - start: .3f} s")
info = f"""
Wave duration : {duration: .3f} s
Processing time: {end - start: .3f} s
RTF: {end - start: .3f}/{duration: .3f} = {rtf:.3f}
"""
if rtf > 1:
info += (
"
We are loading the model for the first run. "
"Please run again to measure the real RTF.
"
)
MyPrint(info)
MyPrint(f"\nembedding_model: {embedding_model}\nSegments: {s}")
return s, build_html_output(info)
title = "# Speaker diarization with Next-gen Kaldi"
description = """
This space shows how to do speaker diarization with Next-gen Kaldi.
It is running on CPU within a docker container provided by Hugging Face.
See more information by visiting
If you want to try it on Android, please download pre-built Android
APKs for speaker diarzation by visiting
"""
# css style is copied from
# https://huggingface.co/spaces/alphacep/asr/blob/main/app.py#L113
css = """
.result {display:flex;flex-direction:column}
.result_item {padding:15px;margin-bottom:8px;border-radius:15px;width:100%}
.result_item_success {background-color:mediumaquamarine;color:white;align-self:start}
.result_item_error {background-color:#ff7070;color:white;align-self:start}
"""
def update_embedding_model_dropdown(framework: str):
if framework in embedding2models:
choices = embedding2models[framework]
return gr.Dropdown(
choices=choices,
value=choices[0],
interactive=True,
)
raise ValueError(f"Unsupported framework: {framework}")
demo = gr.Blocks(css=css)
with demo:
gr.Markdown(title)
embedding_framework_choices = list(embedding2models.keys())
embedding_framework_radio = gr.Radio(
label="Speaker embedding frameworks",
choices=embedding_framework_choices,
value=embedding_framework_choices[0],
)
embedding_model_dropdown = gr.Dropdown(
choices=embedding2models[embedding_framework_choices[0]],
label="Select a speaker embedding model",
value=embedding2models[embedding_framework_choices[0]][0],
)
embedding_framework_choices.change(
update_embedding_model_dropdown,
inputs=embedding_framework_radio,
outputs=embedding_model_dropdown,
)
speaker_segmentation_model_dropdown = gr.Dropdown(
choices=speaker_segmentation_models,
label="Select a speaker segmentation model",
value=speaker_segmentation_models[0],
)
input_num_speakers = gr.Textbox(
label="Number of speakers",
info="Number of speakers",
lines=1,
max_lines=1,
value="0",
placeholder="Specify number of speakers in the test file",
)
input_threshold = gr.Textbox(
label="Clustering threshold",
info="Threshold for clustering",
lines=1,
max_lines=1,
value="0.5",
placeholder="Clustering for threshold",
)
with gr.Tabs():
with gr.TabItem("Upload from disk"):
uploaded_file = gr.Audio(
sources=["upload"], # Choose between "microphone", "upload"
type="filepath",
label="Upload from disk",
)
upload_button = gr.Button("Submit for speaker diarization")
uploaded_output = gr.Textbox(label="Result from uploaded file")
uploaded_html_info = gr.HTML(label="Info")
upload_button.click(
process_uploaded_file,
inputs=[
embedding_framework_radio,
embedding_model_dropdown,
speaker_segmentation_model_dropdown,
input_num_speakers,
input_threshold,
uploaded_file,
],
outputs=[uploaded_output, uploaded_html_info],
)
gr.Markdown(description)
if __name__ == "__main__":
formatter = "%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s"
logging.basicConfig(format=formatter, level=logging.INFO)
demo.launch()