Spaces:
Running
Running
import glob
import json
import logging
import math
import sys
from enum import Enum
from pathlib import Path
from typing import Iterable, List, Optional, Union

import ipywidgets as widgets
import numpy as np
import onnxruntime
from fastapi import FastAPI
from IPython.display import display, Audio, Markdown, clear_output
from piper_phonemize import phonemize_codepoints, phonemize_espeak, tashkeel_run
#_LOGGER = logging.getLogger("piper_train.infer_onnx") | |
def detect_onnx_models(path):
    """Find the ``*.onnx`` files located directly inside ``path``.

    The shape of the result depends on how many files match, and callers
    branch on it:

    * no match      -> ``None``
    * exactly one   -> that file's path as a ``str``
    * more than one -> a ``list`` of path strings, sorted so the order is
      stable between runs (``glob`` itself returns them in arbitrary order)

    Args:
        path: Directory to search. The search is not recursive.

    Returns:
        ``None``, a single path string, or a sorted list of path strings.
    """
    onnx_models = sorted(glob.glob(f"{path}/*.onnx"))
    if not onnx_models:
        return None
    if len(onnx_models) == 1:
        return onnx_models[0]
    return onnx_models
def main():
    """Build and display the notebook text-to-speech interface.

    Searches ``models_path`` for ``*.onnx`` voice models and then:

    * none found: raises;
    * exactly one: loads it immediately;
    * several: shows a drop-down plus a "load" button that loads the chosen
      model when clicked.

    Afterwards it displays the speaker selector, the synthesis controls
    (rate, noise scales, auto-play, text box) and the synthesize / exit
    buttons, and wires the buttons to their click handlers.

    The function reads ``use_gpu``, ``lan``, ``lang``,
    ``enhanced_accessibility`` and ``playaudio``. None of them is defined in
    this file, so they must exist at module level before ``main`` is called.

    Returns:
        None. Everything is presented through widgets and ``display``.

    Raises:
        RuntimeError: If no voice model is found in ``models_path``. The
            synthesize click handler raises the same type when no model has
            been loaded yet.
    """
    models_path = "/content/piper/src/python"
    logging.basicConfig(level=logging.DEBUG)
    providers = [
        "CPUExecutionProvider"
        if use_gpu is False
        else ("CUDAExecutionProvider", {"cudnn_conv_algo_search": "DEFAULT"})
    ]
    sess_options = onnxruntime.SessionOptions()
    model = None
    onnx_models = detect_onnx_models(models_path)
    # Hidden until a loaded model turns out to have more than one speaker.
    speaker_selection = widgets.Dropdown(
        options=[],
        description=f'{lan.translate(lang, "Select speaker")}:',
        layout={'visibility': 'hidden'}
    )

    if onnx_models is None:
        if enhanced_accessibility:
            playaudio("novoices")
        raise RuntimeError(lan.translate(lang, "No downloaded voice packages!"))
    elif isinstance(onnx_models, str):
        # Exactly one model on disk: load it right away.
        onnx_model = onnx_models
        model, config = load_onnx(onnx_model, sess_options, providers)
        if config["num_speakers"] > 1:
            speaker_selection.options = config["speaker_id_map"].values()
            speaker_selection.layout.visibility = 'visible'
            preview_sid = 0
            if enhanced_accessibility:
                playaudio("multispeaker")
        else:
            speaker_selection.layout.visibility = 'hidden'
            preview_sid = None

        if enhanced_accessibility:
            # Speak a welcome message with the freshly loaded voice.
            inferencing(
                model,
                config,
                preview_sid,
                lan.translate(
                    config["espeak"]["voice"][:2],
                    "Interface openned. Write your texts, configure the different synthesis options or download all the voices you want. Enjoy!"
                )
            )
    else:
        # Several models: offer them by file name. Taking the last path
        # component works for any ``models_path`` depth, unlike a fixed
        # index into ``split("/")``.
        voice_model_names = [Path(current).name for current in onnx_models]
        if enhanced_accessibility:
            playaudio("selectmodel")
        selection = widgets.Dropdown(
            options=voice_model_names,
            description=f'{lan.translate(lang, "Select voice package")}:',
        )
        load_btn = widgets.Button(
            description=lan.translate(lang, "Load it!")
        )
        config = None

        def load_model(button):
            """Load the voice chosen in the drop-down (button click handler)."""
            nonlocal config
            nonlocal model
            # Kept as a module-level global, as in the original code.
            global onnx_model
            selected_voice = selection.value
            onnx_model = f"{models_path}/{selected_voice}"
            model, config = load_onnx(onnx_model, sess_options, providers)
            if enhanced_accessibility:
                playaudio("loaded")
            if config["num_speakers"] > 1:
                speaker_selection.options = config["speaker_id_map"].values()
                speaker_selection.layout.visibility = 'visible'
                if enhanced_accessibility:
                    playaudio("multispeaker")
            else:
                speaker_selection.layout.visibility = 'hidden'

        load_btn.on_click(load_model)
        display(selection, load_btn)

    display(speaker_selection)
    speed_slider = widgets.FloatSlider(
        value=1,
        min=0.25,
        max=4,
        step=0.1,
        description=lan.translate(lang, "Rate scale"),
        orientation='horizontal',
    )
    noise_scale_slider = widgets.FloatSlider(
        value=0.667,
        min=0.25,
        max=4,
        step=0.1,
        description=lan.translate(lang, "Phoneme noise scale"),
        orientation='horizontal',
    )
    noise_scale_w_slider = widgets.FloatSlider(
        value=1,
        min=0.25,
        max=4,
        step=0.1,
        description=lan.translate(lang, "Phoneme stressing scale"),
        orientation='horizontal',
    )
    play = widgets.Checkbox(
        value=True,
        description=lan.translate(lang, "Auto-play"),
        disabled=False
    )
    text_input = widgets.Text(
        value='',
        placeholder=f'{lan.translate(lang, "Enter your text here")}:',
        description=lan.translate(lang, "Text to synthesize"),
        layout=widgets.Layout(width='80%')
    )
    synthesize_button = widgets.Button(
        description=lan.translate(lang, "Synthesize"),
        button_style='success',  # 'success', 'info', 'warning', 'danger' or ''
        tooltip=lan.translate(lang, "Click here to synthesize the text."),
        icon='check'
    )
    close_button = widgets.Button(
        description=lan.translate(lang, "Exit"),
        tooltip=lan.translate(lang, "Closes this GUI."),
        icon='check'
    )

    def on_synthesize_button_clicked(b):
        """Synthesize the text box content with the current widget settings."""
        if model is None:
            if enhanced_accessibility:
                playaudio("nomodel")
            raise RuntimeError(lan.translate(lang, "You have not loaded any model from the list!"))
        text = text_input.value
        # The speaker drop-down is only meaningful for multi-speaker models.
        if config["num_speakers"] > 1:
            sid = speaker_selection.value
        else:
            sid = None
        rate = speed_slider.value
        noise_scale = noise_scale_slider.value
        noise_scale_w = noise_scale_w_slider.value
        auto_play = play.value
        inferencing(model, config, sid, text, rate, noise_scale, noise_scale_w, auto_play)

    def on_close_button_clicked(b):
        """Clear the cell output (exit button click handler)."""
        clear_output()
        if enhanced_accessibility:
            playaudio("exit")

    synthesize_button.on_click(on_synthesize_button_clicked)
    close_button.on_click(on_close_button_clicked)
    display(text_input)
    display(speed_slider)
    display(noise_scale_slider)
    display(noise_scale_w_slider)
    display(play)
    display(synthesize_button)
    display(close_button)
def load_onnx(model, sess_options, providers=None):
    """Create an ONNX Runtime session for a voice model and load its config.

    Args:
        model: Path of the ``.onnx`` file. The config is read by
            ``load_config`` from ``<model>.json``.
        sess_options: Forwarded as ``sess_options`` to
            ``onnxruntime.InferenceSession``.
        providers: Execution providers forwarded to the session. Defaults to
            ``["CPUExecutionProvider"]`` when omitted.

    Returns:
        A ``(session, config)`` tuple.
    """
    # The module-level ``_LOGGER`` is commented out in this file, so the
    # logger is looked up here under the same name.
    logger = logging.getLogger("piper_train.infer_onnx")
    if providers is None:
        providers = ["CPUExecutionProvider"]

    # Keep the path in its own variable so the second log line reports the
    # path and not the session object.
    model_path = str(model)
    logger.debug("Loading model from %s", model_path)
    config = load_config(model)
    session = onnxruntime.InferenceSession(
        model_path,
        sess_options=sess_options,
        providers=providers,
    )
    logger.info("Loaded model from %s", model_path)
    return session, config
def load_config(model):
    """Read the JSON config stored next to a model file.

    Args:
        model: Path of the model file; the config is read from
            ``<model>.json``.

    Returns:
        The parsed JSON document.
    """
    # Explicit UTF-8 so the result does not depend on the platform's default
    # text encoding.
    with open(f"{model}.json", "r", encoding="utf-8") as file:
        return json.load(file)
# Special symbols that phonemes_to_ids looks up in a config's "phoneme_id_map".
PAD = "_"  # padding (0)
BOS = "^"  # beginning of sentence
EOS = "$"  # end of sentence
class PhonemeType(str, Enum):
    """Phonemizer kinds compared against a config's ``"phoneme_type"`` value.

    Because the enum also subclasses ``str``, each member compares equal to
    its plain string value (``"espeak" == PhonemeType.ESPEAK`` is true).
    """

    ESPEAK = "espeak"  # phonemize() routes this to phonemize_espeak
    TEXT = "text"  # phonemize() routes this to phonemize_codepoints
def phonemize(config, text: str) -> List[List[str]]:
    """Text to phonemes grouped by sentence.

    Args:
        config: Voice config. ``config["phoneme_type"]`` selects the
            phonemizer; for the espeak type ``config["espeak"]["voice"]`` is
            passed on to ``phonemize_espeak``.
        text: Text to convert.

    Returns:
        The result of ``phonemize_espeak`` or ``phonemize_codepoints``.

    Raises:
        ValueError: If ``config["phoneme_type"]`` is neither espeak nor text.
    """
    phoneme_type = config["phoneme_type"]

    if phoneme_type == PhonemeType.ESPEAK:
        voice = config["espeak"]["voice"]
        if voice == "ar":
            # Arabic diacritization
            # https://github.com/mush42/libtashkeel/
            text = tashkeel_run(text)
        return phonemize_espeak(text, voice)

    if phoneme_type == PhonemeType.TEXT:
        return phonemize_codepoints(text)

    raise ValueError(f'Unexpected phoneme type: {phoneme_type}')
def phonemes_to_ids(config, phonemes: List[str]) -> List[int]:
    """Phonemes to ids.

    The result starts with the ids of ``BOS``, then for every known phoneme
    its ids followed by the ids of ``PAD``, and ends with the ids of ``EOS``.
    Phonemes missing from the map are reported on stdout and skipped.

    Args:
        config: Voice config; ``config["phoneme_id_map"]`` maps each symbol
            to a sequence of ids.
        phonemes: Phonemes of one sentence.

    Returns:
        The flat list of ids.
    """
    id_map = config["phoneme_id_map"]
    ids: List[int] = list(id_map[BOS])
    for phoneme in phonemes:
        if phoneme not in id_map:
            # Interpolate the phoneme: print() does not apply %-formatting
            # the way a logger call would.
            print(f"Missing phoneme from id map: {phoneme}")
            continue
        ids.extend(id_map[phoneme])
        ids.extend(id_map[PAD])
    ids.extend(id_map[EOS])
    return ids
def inferencing(model, config, sid, line, length_scale = 1, noise_scale = 0.667, noise_scale_w = 0.8, auto_play=True):
    """Synthesize ``line`` sentence by sentence and display the audio.

    Args:
        model: Object whose ``run(output_names, inputs)`` is called once per
            sentence (an ONNX Runtime session when created by ``load_onnx``).
        config: Voice config. Reads ``"phoneme_type"``, ``"num_speakers"``
            and ``["audio"]["sample_rate"]``. A ``"phoneme_type"`` of
            ``"PhonemeType.ESPEAK"`` is rewritten in place to ``"espeak"``.
        sid: Speaker id. Ignored when ``config["num_speakers"]`` is 1.
        line: Text to synthesize; also displayed as Markdown.
        length_scale: Second entry of the model's ``"scales"`` input.
        noise_scale: First entry of the model's ``"scales"`` input.
        noise_scale_w: Third entry of the model's ``"scales"`` input.
        auto_play: Passed as ``autoplay`` to the displayed ``Audio`` widget.

    Returns:
        None. The text and the merged audio are shown with ``display``.

    Note:
        ``audio_float_to_int16`` is called here but is not defined in the
        visible part of this file; it must be available at module level.
    """
    if config["phoneme_type"] == "PhonemeType.ESPEAK":
        config["phoneme_type"] = "espeak"
    sentences = phonemize(config, line)

    # These inputs are the same for every sentence, so build them once. The
    # speaker tensor has its own name and is created outside the loop: the
    # previous code reassigned the ``sid`` parameter inside the loop, so from
    # the second sentence on the speaker id was wrapped in an extra array
    # level.
    speaker_id = None if config["num_speakers"] == 1 else sid
    sid_input = None
    if speaker_id is not None:
        sid_input = np.array([speaker_id], dtype=np.int64)
    scales = np.array(
        [noise_scale, length_scale, noise_scale_w],
        dtype=np.float32,
    )

    audios = []
    for phonemes in sentences:
        phoneme_ids = phonemes_to_ids(config, phonemes)
        # Add a leading axis of size 1 in front of the id sequence.
        input_ids = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0)
        input_lengths = np.array([input_ids.shape[1]], dtype=np.int64)
        audio = model.run(
            None,
            {
                "input": input_ids,
                "input_lengths": input_lengths,
                "scales": scales,
                "sid": sid_input,
            },
        )[0].squeeze((0, 1))  # drop the two leading size-1 axes of the first output
        audios.append(audio_float_to_int16(audio.squeeze()))

    merged_audio = np.concatenate(audios)
    sample_rate = config["audio"]["sample_rate"]
    display(Markdown(f"{line}"))
    display(Audio(merged_audio, rate=sample_rate, autoplay=auto_play))
def denoise(
    audio: np.ndarray, bias_spec: np.ndarray, denoiser_strength: float
) -> np.ndarray:
    """Subtract a scaled bias spectrum from the audio and resynthesize it.

    The audio is converted to magnitude and phase with ``transform``, the
    bias magnitude times ``denoiser_strength`` is subtracted, negative
    magnitudes are clipped to zero, and ``inverse`` rebuilds the signal with
    the original phase.

    Args:
        audio: Input passed to ``transform`` (iterated along its first axis).
        bias_spec: Bias magnitudes; stretched along the last axis to the
            length of the audio spectrum.
        denoiser_strength: Multiplier applied to the bias before subtraction.

    Returns:
        The output of ``inverse`` for the denoised spectrum.
    """
    audio_spec, audio_angles = transform(audio)

    bias_len = bias_spec.shape[-1]
    audio_len = audio_spec.shape[-1]
    repeats = max(1, math.ceil(audio_len / bias_len))
    # NOTE(review): np.repeat duplicates each entry in place (a a b b), which
    # equals tiling only when the bias has a single entry on its last axis.
    # Confirm that is the expected input.
    bias_spec_repeat = np.repeat(bias_spec, repeats, axis=-1)[..., :audio_len]

    audio_spec_denoised = audio_spec - (bias_spec_repeat * denoiser_strength)
    audio_spec_denoised = np.clip(audio_spec_denoised, a_min=0.0, a_max=None)
    return inverse(audio_spec_denoised, audio_angles)
def stft(x, fft_size, hopsamp):
    """Compute and return the STFT of the supplied time domain signal x.

    Each frame is multiplied by a Hann window of ``fft_size`` samples before
    the real FFT. Frame starts are ``range(0, len(x) - fft_size, hopsamp)``,
    so a start exactly at ``len(x) - fft_size`` is not included.

    Args:
        x (1-dim Numpy array): A time domain signal.
        fft_size (int): FFT size. Should be a power of 2, otherwise DFT will be used.
        hopsamp (int): The hop size, in samples.

    Returns:
        The STFT. The rows are the time slices and columns are the frequency bins.
    """
    # Coerce first (as istft does) so the window length always matches the
    # integer frame length.
    fft_size = int(fft_size)
    hopsamp = int(hopsamp)
    window = np.hanning(fft_size)
    frame_starts = range(0, len(x) - fft_size, hopsamp)
    return np.array(
        [
            np.fft.rfft(window * x[start : start + fft_size])
            for start in frame_starts
        ]
    )
def istft(X, fft_size, hopsamp):
    """Invert a STFT into a time domain signal.

    Each row of ``X`` is inverse-transformed to ``fft_size`` samples,
    multiplied by a Hann window and overlap-added at multiples of
    ``hopsamp``.

    Args:
        X (2-dim Numpy array): Input spectrogram. The rows are the time slices
            and columns are the frequency bins (``fft_size // 2 + 1`` of them).
        fft_size (int): Number of samples per frame.
        hopsamp (int): The hop size, in samples.

    Returns:
        The inverse STFT, ``time_slices * hopsamp + fft_size`` samples long.
    """
    fft_size = int(fft_size)
    hopsamp = int(hopsamp)
    window = np.hanning(fft_size)
    time_slices = X.shape[0]
    x = np.zeros(int(time_slices * hopsamp + fft_size))
    for n in range(time_slices):
        start = n * hopsamp
        # Pass n explicitly: without it irfft assumes an even frame length,
        # which breaks the overlap-add for odd fft_size.
        x[start : start + fft_size] += window * np.fft.irfft(X[n], n=fft_size)
    return x
def inverse(magnitude, phase):
    """Rebuild time-domain signals from magnitude and phase spectrograms.

    Args:
        magnitude: 3-dim array indexed (item, frequency bin, time slice), the
            layout ``transform`` returns.
        phase: Array of phases with the same layout.

    Returns:
        2-dim array with one row per item, each row being
        ``istft(..., fft_size=1024, hopsamp=256)`` of that item's spectrum.
    """
    real_part = magnitude * np.cos(phase)
    imag_part = magnitude * np.sin(phase)

    # Build the complex spectrum directly (complex64, as before) instead of
    # concatenating the two halves and splitting them again.
    n_b, n_f, n_t = real_part.shape  # pylint: disable=unpacking-non-sequence
    spectrum = np.empty([n_b, n_f, n_t], dtype=np.complex64)
    spectrum.real = real_part
    spectrum.imag = imag_part

    # istft expects rows = time slices, hence the transpose.
    signals = [
        istft(item.T, fft_size=1024, hopsamp=256)[None, :] for item in spectrum
    ]
    return np.concatenate(signals, 0)
def transform(input_data):
    """Compute magnitude and phase spectrograms for a batch of signals.

    Every item of ``input_data`` is passed to
    ``stft(..., fft_size=1024, hopsamp=256)``; the transposed results are
    stacked along a new leading axis.

    Args:
        input_data: Iterable of 1-dim signals (e.g. a 2-dim array).

    Returns:
        ``(magnitude, phase)``, each indexed (item, frequency bin, time slice).
    """
    spectra = np.concatenate(
        [
            stft(signal, fft_size=1024, hopsamp=256).T[None, :, :]
            for signal in input_data
        ],
        0,
    )
    real_part = spectra.real
    imag_part = spectra.imag

    magnitude = np.sqrt(real_part**2 + imag_part**2)
    # Pass the arrays themselves; ``ndarray.data`` is a raw buffer object and
    # only worked because NumPy converted it back to an array.
    phase = np.arctan2(imag_part, real_part)
    return magnitude, phase
# Run the UI entry point when the module is imported.
# NOTE(review): main() has no return statement, so ``app`` is bound to None
# and not to a FastAPI instance -- confirm whether ``app = FastAPI()`` was
# intended here.
app = main()
# Handler meant for the root endpoint.
# NOTE(review): no route decorator is applied, so nothing in this file
# registers the function with a web framework -- confirm.
def read_root():
    """Return the static greeting payload."""
    return {"message": "Hello, World!"}