Pipertts / app.py
Gregniuki's picture
Update app.py
58187c4
raw
history blame
12.1 kB
from fastapi import FastAPI
# Application object that owns the HTTP routes declared in this module.
app = FastAPI()


@app.get("/")
def read_root():
    """Answer GET requests on "/" with a fixed greeting payload."""
    greeting = {"message": "Hello, World!"}
    return greeting
def detect_onnx_models(path):
    """Find the ``*.onnx`` voice models stored directly inside *path*.

    The shape of the result depends on how many models are found, and callers
    branch on it:

    * no model      -> ``None``
    * exactly one   -> that model's path as a ``str``
    * more than one -> a ``list`` of paths in alphabetical order

    Args:
        path: Directory to search. Sub-directories are not searched.

    Returns:
        ``None``, a single path string, or a sorted list of path strings.
    """
    # Escape the directory so glob metacharacters in its name ("[", "*", "?")
    # are matched literally, and sort because glob's order depends on the
    # filesystem.
    onnx_models = sorted(glob.glob(glob.escape(path) + '/*.onnx'))
    if not onnx_models:
        return None
    if len(onnx_models) == 1:
        return onnx_models[0]
    return onnx_models
def main():
    """Main entry point: build and display the synthesis GUI.

    Looks for ``.onnx`` voice models in a fixed directory and then:

    * no model: raises;
    * exactly one model: loads it immediately;
    * several models: shows a dropdown plus a "load" button so the user picks
      which one to load.

    Afterwards the synthesis controls (text box, three scale sliders,
    auto-play checkbox, synthesize and exit buttons) are displayed and wired
    to their click handlers.

    Relies on names defined outside this function (``use_gpu``,
    ``enhanced_accessibility``, ``lang``, ``lan``, ``widgets``, ``display``,
    ``clear_output``, ``playaudio``), presumably provided by the surrounding
    notebook environment -- verify against the rest of the file.

    Raises:
        Exception: If no voice model is found in the models directory.
    """
    models_path = "/content/piper/src/python"
    logging.basicConfig(level=logging.DEBUG)
    providers = [
        "CPUExecutionProvider"
        if use_gpu is False
        else ("CUDAExecutionProvider", {"cudnn_conv_algo_search": "DEFAULT"})
    ]
    sess_options = onnxruntime.SessionOptions()
    model = None
    onnx_models = detect_onnx_models(models_path)
    # Created hidden; only made visible once a multi-speaker model is loaded.
    speaker_selection = widgets.Dropdown(
        options=[],
        description=f'{lan.translate(lang, "Select speaker")}:',
        layout={'visibility': 'hidden'}
    )
    if onnx_models is None:
        if enhanced_accessibility:
            playaudio("novoices")
        raise Exception(lan.translate(lang, "No downloaded voice packages!"))
    elif isinstance(onnx_models, str):
        # Exactly one voice available: load it straight away.
        onnx_model = onnx_models
        model, config = load_onnx(onnx_model, sess_options, providers)
        if config["num_speakers"] > 1:
            speaker_selection.options = config["speaker_id_map"].values()
            speaker_selection.layout.visibility = 'visible'
            preview_sid = 0
            if enhanced_accessibility:
                playaudio("multispeaker")
        else:
            speaker_selection.layout.visibility = 'hidden'
            preview_sid = None

        if enhanced_accessibility:
            # Speak a welcome message with the freshly loaded voice. The text
            # is passed to the translator verbatim, so it is left untouched.
            inferencing(
                model,
                config,
                preview_sid,
                lan.translate(
                    config["espeak"]["voice"][:2],
                    "Interface openned. Write your texts, configure the different synthesis options or download all the voices you want. Enjoy!"
                )
            )
    else:
        # Several voices available: let the user choose one by file name.
        # Take the last path component rather than a fixed index so this does
        # not depend on how deep ``models_path`` is.
        voice_model_names = [
            current.rsplit("/", 1)[-1] for current in onnx_models
        ]
        if enhanced_accessibility:
            playaudio("selectmodel")
        selection = widgets.Dropdown(
            options=voice_model_names,
            description=f'{lan.translate(lang, "Select voice package")}:',
        )
        load_btn = widgets.Button(
            description=lan.translate(lang, "Load it!")
        )
        config = None

        def load_model(button):
            """Load the voice chosen in the dropdown and refresh the speaker list."""
            nonlocal model, config
            # Kept module-global as in the original code; other parts of the
            # file may read it.
            global onnx_model
            selected_voice = selection.value
            onnx_model = f"{models_path}/{selected_voice}"
            model, config = load_onnx(onnx_model, sess_options, providers)
            if enhanced_accessibility:
                playaudio("loaded")
            if config["num_speakers"] > 1:
                speaker_selection.options = config["speaker_id_map"].values()
                speaker_selection.layout.visibility = 'visible'
                if enhanced_accessibility:
                    playaudio("multispeaker")
            else:
                speaker_selection.layout.visibility = 'hidden'

        load_btn.on_click(load_model)
        display(selection, load_btn)
    display(speaker_selection)
    speed_slider = widgets.FloatSlider(
        value=1,
        min=0.25,
        max=4,
        step=0.1,
        description=lan.translate(lang, "Rate scale"),
        orientation='horizontal',
    )
    noise_scale_slider = widgets.FloatSlider(
        value=0.667,
        min=0.25,
        max=4,
        step=0.1,
        description=lan.translate(lang, "Phoneme noise scale"),
        orientation='horizontal',
    )
    noise_scale_w_slider = widgets.FloatSlider(
        value=1,
        min=0.25,
        max=4,
        step=0.1,
        description=lan.translate(lang, "Phoneme stressing scale"),
        orientation='horizontal',
    )
    play = widgets.Checkbox(
        value=True,
        description=lan.translate(lang, "Auto-play"),
        disabled=False
    )
    text_input = widgets.Text(
        value='',
        placeholder=f'{lan.translate(lang, "Enter your text here")}:',
        description=lan.translate(lang, "Text to synthesize"),
        layout=widgets.Layout(width='80%')
    )
    synthesize_button = widgets.Button(
        description=lan.translate(lang, "Synthesize"),
        button_style='success',  # 'success', 'info', 'warning', 'danger' or ''
        tooltip=lan.translate(lang, "Click here to synthesize the text."),
        icon='check'
    )
    close_button = widgets.Button(
        description=lan.translate(lang, "Exit"),
        tooltip=lan.translate(lang, "Closes this GUI."),
        icon='check'
    )

    def on_synthesize_button_clicked(b):
        """Synthesize the entered text using the current widget settings."""
        if model is None:
            if enhanced_accessibility:
                playaudio("nomodel")
            raise Exception(lan.translate(lang, "You have not loaded any model from the list!"))
        text = text_input.value
        if config["num_speakers"] > 1:
            sid = speaker_selection.value
        else:
            sid = None
        rate = speed_slider.value
        noise_scale = noise_scale_slider.value
        noise_scale_w = noise_scale_w_slider.value
        auto_play = play.value
        inferencing(model, config, sid, text, rate, noise_scale, noise_scale_w, auto_play)

    def on_close_button_clicked(b):
        """Clear the displayed output (and announce the exit if enabled)."""
        clear_output()
        if enhanced_accessibility:
            playaudio("exit")

    synthesize_button.on_click(on_synthesize_button_clicked)
    close_button.on_click(on_close_button_clicked)
    display(text_input)
    display(speed_slider)
    display(noise_scale_slider)
    display(noise_scale_w_slider)
    display(play)
    display(synthesize_button)
    display(close_button)
def load_onnx(model, sess_options, providers=None):
    """Create an ONNX Runtime inference session for a voice model.

    Args:
        model: Path of the ``.onnx`` file. Its configuration is read through
            ``load_config`` using the same path.
        sess_options: Session options handed to ``onnxruntime.InferenceSession``.
        providers: Execution providers for the session. ``None`` (the default)
            means ``["CPUExecutionProvider"]``.

    Returns:
        A ``(session, config)`` tuple.
    """
    if providers is None:
        # Build a fresh list per call instead of sharing one mutable default.
        providers = ["CPUExecutionProvider"]
    _LOGGER.debug("Loading model from %s", model)
    config = load_config(model)
    # Bind the session to its own name so ``model`` still holds the path for
    # the log message below (previously the session object was logged).
    session = onnxruntime.InferenceSession(
        str(model),
        sess_options=sess_options,
        providers=providers,
    )
    _LOGGER.info("Loaded model from %s", model)
    return session, config
def load_config(model):
    """Load the JSON configuration stored next to a voice model.

    Args:
        model: Path of the model file; the configuration is read from
            ``f"{model}.json"``.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the configuration file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 so non-ASCII keys (e.g. phoneme symbols) do
    # not depend on the platform's default locale encoding.
    with open(f"{model}.json", "r", encoding="utf-8") as file:
        return json.load(file)
# Special symbols used as keys into config["phoneme_id_map"] when a phoneme
# sequence is converted to ids (see phonemes_to_ids).
PAD = "_" # padding (0); appended after every phoneme
BOS = "^" # beginning of sentence; opens every id sequence
EOS = "$" # end of sentence; closes every id sequence
class PhonemeType(str, Enum):
    """Values accepted for the ``"phoneme_type"`` entry of a voice config.

    Members are ``str`` instances as well, so they compare equal to the plain
    strings found in a configuration.
    """

    # Phonemes are produced by the espeak phonemizer.
    ESPEAK = "espeak"

    # The text itself is split into code points.
    TEXT = "text"
def phonemize(config, text: str) -> List[List[str]]:
    """Convert text into phonemes, one inner list per sentence.

    The phonemizer is chosen by ``config["phoneme_type"]``:

    * ``"text"``   -> ``phonemize_codepoints``
    * ``"espeak"`` -> ``phonemize_espeak`` with the configured espeak voice;
      for the ``"ar"`` voice the text first goes through ``tashkeel_run``.

    Raises:
        ValueError: If the phoneme type is neither of the above.
    """
    kind = config["phoneme_type"]

    if kind == PhonemeType.TEXT:
        return phonemize_codepoints(text)

    if kind != PhonemeType.ESPEAK:
        raise ValueError(f'Unexpected phoneme type: {config["phoneme_type"]}')

    voice = config["espeak"]["voice"]
    if voice == "ar":
        # Arabic diacritization
        # https://github.com/mush42/libtashkeel/
        text = tashkeel_run(text)
    return phonemize_espeak(text, voice)
def phonemes_to_ids(config, phonemes: List[str]) -> List[int]:
    """Map a phoneme sequence to the model's integer ids.

    The result starts with the ids of ``BOS``, then holds each known phoneme's
    ids followed by the ids of ``PAD``, and ends with the ids of ``EOS``.
    Phonemes missing from the id map are reported on stdout and skipped.

    Args:
        config: Voice configuration; ``config["phoneme_id_map"]`` maps each
            symbol to a sequence of ids.
        phonemes: Phonemes of a single sentence.

    Returns:
        The flat list of ids.
    """
    id_map = config["phoneme_id_map"]
    ids: List[int] = list(id_map[BOS])
    for phoneme in phonemes:
        if phoneme not in id_map:
            # print() does not do %-interpolation, so format the message here
            # (the old call printed a literal "%s").
            print(f"Missing phoneme from id map: {phoneme}")
            continue
        ids.extend(id_map[phoneme])
        ids.extend(id_map[PAD])
    ids.extend(id_map[EOS])
    return ids
def inferencing(model, config, sid, line, length_scale = 1, noise_scale = 0.667, noise_scale_w = 0.8, auto_play=True):
    """Synthesize *line* sentence by sentence and display the resulting audio.

    Args:
        model: Loaded inference session; called through ``model.run``.
        config: Voice configuration. Its ``"phoneme_type"`` entry is rewritten
            in place when it holds the string ``"PhonemeType.ESPEAK"``.
        sid: Speaker id to use; ignored when ``config["num_speakers"]`` is 1.
        line: Text to synthesize.
        length_scale: Second entry of the ``scales`` model input.
        noise_scale: First entry of the ``scales`` model input.
        noise_scale_w: Third entry of the ``scales`` model input.
        auto_play: Forwarded as ``autoplay`` to the displayed audio widget.

    Returns:
        None. The text and the concatenated audio are shown with ``display``.
    """
    # Accept the stringified enum member as an alias for the "espeak" value.
    if config["phoneme_type"] == "PhonemeType.ESPEAK":
        config["phoneme_type"] = "espeak"

    # Resolve the speaker input once, before the loop. The previous code
    # overwrote the ``sid`` parameter inside the loop, so from the second
    # sentence on the speaker id was an array wrapped in another array.
    speaker_id = None if config["num_speakers"] == 1 else sid
    speaker_input = None
    if speaker_id is not None:
        speaker_input = np.array([speaker_id], dtype=np.int64)

    scales = np.array(
        [noise_scale, length_scale, noise_scale_w],
        dtype=np.float32,
    )

    audios = []
    for phonemes in phonemize(config, line):
        phoneme_ids = phonemes_to_ids(config, phonemes)
        # Add a leading batch axis of size 1 for the model input.
        text = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0)
        text_lengths = np.array([text.shape[1]], dtype=np.int64)
        audio = model.run(
            None,
            {
                "input": text,
                "input_lengths": text_lengths,
                "scales": scales,
                "sid": speaker_input,
            },
        )[0].squeeze((0, 1))
        audio = audio_float_to_int16(audio.squeeze())
        audios.append(audio)

    merged_audio = np.concatenate(audios)
    sample_rate = config["audio"]["sample_rate"]
    display(Markdown(f"{line}"))
    display(Audio(merged_audio, rate=sample_rate, autoplay=auto_play))
def denoise(
    audio: np.ndarray, bias_spec: np.ndarray, denoiser_strength: float
) -> np.ndarray:
    """Subtract a scaled bias spectrum from the magnitude spectrum of *audio*.

    The audio is analysed with ``transform``, the bias spectrum (repeated
    along its last axis and cut to the audio's frame count) is subtracted
    after scaling by *denoiser_strength*, negative magnitudes are clipped to
    zero, and the signal is rebuilt with ``inverse`` using the original phase.
    """
    magnitudes, angles = transform(audio)

    bias_frames = bias_spec.shape[-1]
    n_frames = magnitudes.shape[-1]

    # Repeat the bias along the last axis until it covers every frame, then
    # trim the surplus.
    copies = max(1, math.ceil(n_frames / bias_frames))
    bias = np.repeat(bias_spec, copies, axis=-1)[..., :n_frames]

    cleaned = np.clip(
        magnitudes - (bias * denoiser_strength), a_min=0.0, a_max=None
    )
    return inverse(cleaned, angles)
def stft(x, fft_size, hopsamp):
    """Compute the short-time Fourier transform of a time-domain signal.

    Each frame is multiplied by a Hann window and transformed with
    ``np.fft.rfft``. Frames start at ``0, hopsamp, 2 * hopsamp, ...`` and stop
    before ``len(x) - fft_size``.

    Args:
        x (1-dim Numpy array): A time domain signal.
        fft_size (int): Frame length in samples.
        hopsamp (int): Distance in samples between the starts of two
            consecutive frames.

    Returns:
        The STFT. The rows are the time slices and columns are the frequency
        bins.
    """
    # The window is built from fft_size as given, before the int conversion.
    window = np.hanning(fft_size)
    frame_len = int(fft_size)
    hop = int(hopsamp)

    frames = []
    for start in range(0, len(x) - frame_len, hop):
        segment = x[start : start + frame_len]
        frames.append(np.fft.rfft(window * segment))
    return np.array(frames)
def istft(X, fft_size, hopsamp):
    """Rebuild a time-domain signal from an STFT by windowed overlap-add.

    Args:
        X (2-dim Numpy array): Input spectrogram. The rows are the time slices
            and columns are the frequency bins.
        fft_size (int): Frame length in samples.
        hopsamp (int): The hop size, in samples.

    Returns:
        The inverse STFT, of length ``rows * hopsamp + fft_size``.
    """
    frame_len = int(fft_size)
    hop = int(hopsamp)
    window = np.hanning(frame_len)

    n_frames = X.shape[0]
    signal = np.zeros(int(n_frames * hop + frame_len))

    # One start offset per time slice; each inverted frame is windowed and
    # added onto the output at its offset.
    starts = range(0, len(signal) - frame_len, hop)
    for start, spectrum in zip(starts, X):
        frame = np.real(np.fft.irfft(spectrum))
        signal[start : start + frame_len] += window * frame
    return signal
def inverse(magnitude, phase):
    """Turn magnitude/phase spectrograms back into time-domain signals.

    Real and imaginary parts are rebuilt from *magnitude* and *phase*, stored
    as ``complex64``, and each item along the first axis is inverted with
    ``istft`` (frame length 1024, hop 256).

    Returns:
        An array with one reconstructed signal per item of the first axis.
    """
    # Real parts fill the first half of axis 1, imaginary parts the second.
    stacked = np.concatenate(
        [magnitude * np.cos(phase), magnitude * np.sin(phase)], axis=1
    )
    n_b, n_f, n_t = stacked.shape  # pylint: disable=unpacking-non-sequence
    half = n_f // 2

    spectrum = np.empty([n_b, half, n_t], dtype=np.complex64)
    spectrum.real = stacked[:, :half]
    spectrum.imag = stacked[:, half:]

    # istft expects time slices as rows, hence the transpose.
    signals = [
        istft(item.T, fft_size=1024, hopsamp=256)[None, :] for item in spectrum
    ]
    return np.concatenate(signals, 0)
def transform(input_data):
    """Compute magnitude and phase spectrograms for a batch of signals.

    Each item of *input_data* is analysed with ``stft`` (frame length 1024,
    hop 256) and transposed so the STFT's frequency bins come before its time
    slices.

    Args:
        input_data: Iterable of 1-dim signals.

    Returns:
        A ``(magnitude, phase)`` tuple of arrays, with one entry per input
        signal along the first axis.
    """
    real_part = []
    imag_part = []
    for signal in input_data:
        spectrum = stft(signal, fft_size=1024, hopsamp=256).T
        real_part.append(spectrum.real[None, :, :])  # pylint: disable=unsubscriptable-object
        imag_part.append(spectrum.imag[None, :, :])  # pylint: disable=unsubscriptable-object
    real_part = np.concatenate(real_part, 0)
    imag_part = np.concatenate(imag_part, 0)

    magnitude = np.sqrt(real_part**2 + imag_part**2)
    # Pass the arrays themselves: ``ndarray.data`` is a raw memoryview buffer,
    # which only worked here through numpy's implicit buffer conversion.
    phase = np.arctan2(imag_part, real_part)
    return magnitude, phase