Spaces:
Running
Running
# Colab-style form flag (note the "#@param" marker); it is not read anywhere
# else in the visible part of this file.
enhanced_accessibility = False #@param {type:"boolean"}
#@markdown ---
#@markdown #### Please select your language:
# The language selector below is disabled (commented out); it is kept for reference.
#lang_select = "English" #@param ["English", "Spanish"]
#if lang_select == "English":
# lang = "en"
#elif lang_select == "Spanish":
# lang = "es"
#else:
# raise Exception("Language not supported.")
#@markdown ---
# Read by main(): False selects "CPUExecutionProvider", any other value selects
# the CUDA execution provider.
use_gpu = True #@param {type:"boolean"}
from fastapi import FastAPI, Request, Form | |
from fastapi.responses import HTMLResponse | |
from fastapi.responses import FileResponse | |
from fastapi.templating import Jinja2Templates | |
from fastapi.staticfiles import StaticFiles | |
# ... | |
# Application object plus static-file and template configuration.
import logging
# The FastAPI application; served by uvicorn from the __main__ guard.
app = FastAPI()
# Serve the local "static" directory (e.g., CSS and JavaScript) under /static.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 templates are loaded from the local "templates" directory.
templates = Jinja2Templates(directory="templates")
# Maps a generated file ID (UUID string) to the path of an exported MP3.
# Written by main() and read by download_file(); entries are never removed.
files = {}
# Configure logging
logging.basicConfig(level=logging.DEBUG)
# Values handed to the "interface.html" template as "data".
# NOTE(review): "es" is listed twice and "vi0vn-x-south" looks like a typo for
# "vi-vn-x-south" — confirm against the voices the phonemizer supports.
data = {
    "speaker_options": ["en","en-us","en-029","en-gb-x-gbclan","en-gb-x-rp","en-gb-scotland","en-gb-gbcwmd", "es", "de", "pl","ar","be","bn","bpy","bs","bg","ca","yue","hak","haw","cmn","hr","cs","da","nl","eo","et","fa","fa-latn","fi","fr-be","fr","ga","gd","ka","grc","el","kl","gn","gu","ht","he","hi","hu","id","io","it","ja","kn","kok","ko","ku","kk","ky","la","lb","ltg","lv","lfn","lt","jbo","mi","mk","ms","ml","mt","mr","nci","ne","nb","nog","or","om","pap","pt-br","pt","ro","ru","ru-lv","uk","sjn","sr","tn","sd","shn","si","sk","sl","es","es-419","sw","sv","ta","th","tk","tt","te","tr","ug","ur","uz","vi-vn-x-central","vi","vi0vn-x-south"],
    "default_speaker": "en",
}
# Parsed model config dictionaries keyed by config file path; filled by load_model_data().
model_configurations = {}
# Define global variables
onnx_models = [] # A list to store model names (rebound by load_model_data())
onnx_configs = []
# Placeholder speaker mapping passed to the template by read_root() and main().
speaker_id_map = {
    "speaker1": "Speaker 1 Name",
    "speaker2": "Speaker 2 Name",
    # Add more speaker IDs and names as needed
}
import logging | |
import math | |
import sys | |
from pathlib import Path | |
from enum import Enum | |
from typing import Iterable, List, Optional, Union | |
import numpy as np | |
import onnxruntime | |
import glob | |
#import ipywidgets as widgets | |
from pydub import AudioSegment | |
import tempfile | |
import uuid | |
import soundfile as sf | |
#from IPython.display import display, Audio, Markdown, clear_output | |
from piper_phonemize import phonemize_codepoints, phonemize_espeak, tashkeel_run | |
async def read_root(request: Request):
    """Render ``interface.html`` with the module-level form data.

    The template receives the speaker options (``data``), the detected model
    list (``onnx_models``) and the module-level ``speaker_id_map``.

    Args:
        request: The incoming request; the template response requires it.

    Returns:
        The rendered template response.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file — confirm it is registered with ``app``.
    context = {
        "request": request,
        "data": data,
        "model_names": onnx_models,
        "speaker_id_map": speaker_id_map,
    }
    return templates.TemplateResponse("interface.html", context)
import json
# Module logger (named "piper_train.infer_onnx"); used by load_onnx().
_LOGGER = logging.getLogger("piper_train.infer_onnx")
import os
# Value of the HF_TOKEN environment variable, or None when it is unset.
# It is not read anywhere else in the visible part of this file.
read_key = os.environ.get('HF_TOKEN', None)
#if not os.path.exists("./content/piper/src/python/lng"):
# import subprocess
# command = "cp -r ./content/piper/notebooks/lng ./content/piper/src/python/lng"
# subprocess.run(command, shell=True)
import sys
#sys.path.append('/content/piper/notebooks')
# Make modules under the bundled piper source tree importable.
sys.path.append('./content/piper/src/python')
import configparser
class Translator:
    """Look up UI strings in per-language INI files.

    Each language is read from ``<cwd>/lng/<language_name>.lang`` and cached.
    Translations are expected in a ``[Strings]`` section whose option names
    are the source strings.
    """

    def __init__(self):
        # language name -> parsed ConfigParser for that language
        self.configs = {}

    def load_language(self, language_name):
        """Parse and cache the language file for *language_name* if not cached.

        A missing file is not an error: ``ConfigParser.read`` skips files it
        cannot open, so the cached parser is simply empty.
        """
        if language_name not in self.configs:
            config = configparser.ConfigParser()
            config.read(os.path.join(os.getcwd(), "lng", f"{language_name}.lang"))
            self.configs[language_name] = config

    def translate(self, language_name, string):
        """Return the translation of *string* for *language_name*.

        Args:
            language_name: Language code; ``"en"`` returns *string* unchanged.
            string: The source text to translate.

        Returns:
            The translated text, or *string* itself when no translation exists.

        Raises:
            Exception: If no translation exists and *string* is empty.
        """
        if language_name == "en":
            return string
        if language_name not in self.configs:
            self.load_language(language_name)
        config = self.configs[language_name]
        try:
            return config.get("Strings", string)
        except (configparser.NoOptionError, configparser.NoSectionError):
            # Fall back to the untranslated text when there is one.
            if string:
                return string
            raise Exception("language engine error: This translation is corrupt!")
#from translator import *
# Module-level Translator instance; it is not referenced anywhere else in the
# visible part of this file.
lan = Translator()
def detect_onnx_models(path):
    """Find ``*.onnx`` models and ``*.json`` configs directly inside *path*.

    The return shape depends on how many models are found:

    * more than one model: ``(list_of_model_paths, list_of_json_paths)``
    * exactly one model: ``(model_path, first_json_path)`` as plain strings
    * no model: ``None``

    Raises:
        IndexError: If exactly one model exists but no ``*.json`` file does.
    """
    model_paths = glob.glob(f"{path}/*.onnx")
    config_paths = glob.glob(f"{path}/*.json")
    if not model_paths:
        return None
    if len(model_paths) == 1:
        return model_paths[0], config_paths[0]
    return model_paths, config_paths
# Define a dependency function to get the selected_model and selected_speaker_id on startup | |
#def get_initial_values(): | |
# You can set default values or load them from a configuration file here | |
# selected_model = onnx_models[0] if onnx_models else "default_model" | |
# selected_speaker_id = 0 # Default value | |
# Check if there are onnx models and load the speaker_id_map from the first model's config | |
# if onnx_models: | |
# first_model_config = model_configurations.get(onnx_models[0]) | |
# if first_model_config: | |
# speaker_id_map = first_model_config.get("speaker_id_map") | |
# if speaker_id_map: | |
# selected_speaker_id = next(iter(speaker_id_map)) # Get the first speaker_id | |
# else: | |
# selected_speaker_id = 0 | |
# return selected_model, selected_speaker_id | |
async def get_speaker_id_map(selected_model: str):
    """Return the speaker map stored in the config of *selected_model*.

    The config is looked up in ``model_configurations`` under the key
    ``selected_model + ".json"``.

    Returns:
        ``{"speaker_id_map": mapping}``. The mapping is the config's
        ``"speaker_id_map"`` entry, ``{"speaker1": "0"}`` when that entry is
        missing or empty, or ``{}`` when no config is loaded for the model.
    """
    config = model_configurations.get(selected_model + ".json")
    if not config:
        # No configuration is available for the selected model.
        return {"speaker_id_map": {}}
    # Substitute a single default speaker when the config defines none.
    mapping = config.get("speaker_id_map", {}) or {"speaker1": "0"}
    return {"speaker_id_map": mapping}
async def load_model_data():
    """Discover the ONNX models on disk and load their JSON configurations.

    Rebinds the module globals ``models_path``, ``onnx_models`` and
    ``config_names`` and fills ``model_configurations`` (keyed by config file
    path). After this call ``onnx_models`` and ``config_names`` are always
    lists, whether zero, one or several models were found.
    """
    global config_names, onnx_models, model_configurations, models_path
    models_path = "./content/piper/src/python"
    # Keep the piper sources importable without adding duplicate entries.
    if models_path not in sys.path:
        sys.path.append(models_path)
    logging.basicConfig(level=logging.DEBUG)

    # detect_onnx_models() returns None (no model), a pair of strings (one
    # model) or a pair of lists (several models); normalise all three shapes
    # to lists so the membership test in main() and the loop below are correct.
    detected = detect_onnx_models(models_path)
    if detected is None:
        _LOGGER.warning("No ONNX models found in %s", models_path)
        model_names, config_names = [], []
    else:
        model_names, config_names = detected
        if isinstance(model_names, str):
            model_names, config_names = [model_names], [config_names]
    onnx_models = model_names

    for config_name in config_names:
        # Load the configuration data for each model (including speaker_id_map).
        config = load_model_configuration(models_path, config_name)
        if config:
            model_configurations[config_name] = config
def load_model_configuration(models_path, config_name):
    """Load one model's JSON configuration file.

    Args:
        models_path: Unused; kept for call compatibility. ``config_name``
            is opened as given and is not joined with this directory.
        config_name: Path of the JSON configuration file.

    Returns:
        The parsed JSON data, or ``None`` when ``config_name`` does not exist
        or is a directory.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    try:
        # Read as UTF-8 explicitly so non-ASCII content does not depend on
        # the platform's default encoding.
        with open(config_name, "r", encoding="utf-8") as config_file:
            return json.load(config_file)
    except FileNotFoundError:
        # The configuration file does not exist.
        return None
    except IsADirectoryError:
        # config_name points at a directory, not a file.
        return None
# Define a dependency function to get the selected_model and selected_speaker_id on startup | |
#def get_initial_values() -> Tuple[str, str]: | |
# You can set default values or load them from a configuration file here | |
# selected_model = onnx_models[0] if onnx_models else "default_model" | |
# selected_speaker_id = "default_speaker_id" # Default value | |
# Check if there are onnx models and load the speaker_id_map from the first model's config | |
# if onnx_models: | |
# first_model_config = model_configurations.get(onnx_models[0]) | |
# if first_model_config: | |
# speaker_id_map = first_model_config.get("speaker_id_map") | |
# if speaker_id_map: | |
# selected_speaker_id = next(iter(speaker_id_map)) # Get the first speaker_id | |
#return selected_model, selected_speaker_id | |
async def main(
    request: Request,
    text_input: str = Form(default="1, 2, 3. This is a test. Enter some text to generate."),
    selected_model: str = Form(...),  # Selected model
    selected_speaker_id: str = Form(...),  # Selected speaker ID
    speaker: str = Form(...),
    speed_slider: float = Form(...),
    noise_scale_slider: float = Form(...),
    noise_scale_w_slider: float = Form(...),
    play: bool = Form(True),
    # initial_values: Tuple[str, str] = Depends(get_initial_values)  # Use the dependency here
):
    """Synthesize speech for the submitted form and re-render the interface.

    When ``selected_model`` is one of ``onnx_models`` the model is loaded, the
    text is synthesized, the audio is exported as MP3 into a fresh temporary
    directory, and the path is registered in ``files`` under a new UUID so it
    can be fetched through ``/download?fileId=<id>``. Otherwise an error
    snippet is rendered and no audio is produced.

    Args:
        request: The incoming request; the template response requires it.
        text_input: Text to synthesize.
        selected_model: Model path/name as listed in ``onnx_models``.
        selected_speaker_id: Speaker ID forwarded to ``inferencing``.
        speaker: Voice written to ``config["espeak"]["voice"]``.
        speed_slider: Forwarded to ``inferencing`` as ``length_scale``.
        noise_scale_slider: Forwarded to ``inferencing`` as ``noise_scale``.
        noise_scale_w_slider: Forwarded to ``inferencing`` as ``noise_scale_w``.
        play: Forwarded to ``inferencing`` as ``auto_play``.

    Returns:
        The rendered ``interface.html`` template response. ``file_url`` in the
        template context is ``None`` when no audio was generated.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file — confirm it is registered with ``app``.

    # Defined up front so the "model not found" branch can still render the
    # template instead of failing on an unbound name.
    file_url = None

    if selected_model in onnx_models:
        providers = [
            "CPUExecutionProvider"
            if use_gpu is False
            else ("CUDAExecutionProvider", {"cudnn_conv_algo_search": "DEFAULT"})
        ]
        print(onnxruntime.get_device())
        sess_options = onnxruntime.SessionOptions()
        model, config = load_onnx(selected_model, sess_options, providers)
        # The voice chosen in the form overrides the one stored in the config.
        config["espeak"]["voice"] = speaker
        print(text_input)
        print(speaker)
        auto_play = play
        audio = inferencing(
            model,
            config,
            selected_speaker_id,
            text_input,
            speed_slider,
            noise_scale_slider,
            noise_scale_w_slider,
            auto_play,
        )

        # NOTE(review): the temporary directory and the ``files`` entry are
        # never cleaned up, so both grow with every request.
        temp_dir = tempfile.mkdtemp()
        renamed_audio_file = os.path.join(temp_dir, "download.mp3")
        audio.export(renamed_audio_file, format="mp3")

        # Register the file under a unique ID and build its download URL.
        file_id = str(uuid.uuid4())
        files[file_id] = renamed_audio_file
        file_url = f'/download?fileId={file_id}'

        # Restore the form and report success.
        response_html = (
            "\n"
            "<script>\n"
            'document.getElementById("loading-message").innerText = "Audio generated successfully!";\n'
            'document.getElementById("synthesize_button").disabled = false;\n'
            "</script>\n"
        )
    else:
        # The selected model is not in the list of detected models.
        response_html = (
            "\n"
            '<div id="error-message">Selected model not found.</div>\n'
            "<script>\n"
            'document.getElementById("synthesize_button").disabled = true;\n'
            "</script>\n"
        )

    # Pass the necessary data to the HTML template, including speaker_id_map.
    return templates.TemplateResponse("interface.html", {
        "request": request,
        "file_url": file_url,
        "text_input": text_input,
        "data": data,
        "model_names": onnx_models,
        "selected_model": selected_model,
        "selected_speaker_id": selected_speaker_id,
        "speaker_id_map": speaker_id_map,
        "dynamic_content": response_html,
    })
async def download_file(fileId: str):
    """Serve a previously generated audio file as an attachment.

    Args:
        fileId: ID under which the file path was stored in ``files``.

    Returns:
        A ``FileResponse`` for the stored path, or
        ``{"error": "File not found"}`` when the ID is unknown.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file — confirm it is registered with ``app``.
    filepath = files.get(fileId)
    if not filepath:
        return {"error": "File not found"}
    return FileResponse(filepath, headers={"Content-Disposition": "attachment"})
def load_onnx(model, sess_options, providers):
    """Create an ONNX Runtime session for *model* and load its JSON config.

    Args:
        model: Path of the ``.onnx`` file; the config is read from
            ``"<model>.json"`` by ``load_config``.
        sess_options: ``onnxruntime.SessionOptions`` for the session.
        providers: Execution providers passed to the session.

    Returns:
        A ``(session, config)`` tuple.
    """
    _LOGGER.debug("Loading model from %s", model)
    config = load_config(model)
    # Keep the session in its own name so the log line below still reports
    # the model path rather than the session object.
    session = onnxruntime.InferenceSession(
        str(model),
        sess_options=sess_options,
        providers=providers,
    )
    _LOGGER.info("Loaded model from %s", model)
    return session, config
def load_config(model):
    """Load the JSON configuration stored next to a model as ``"<model>.json"``.

    Args:
        model: Path of the model file (without the ``.json`` suffix).

    Returns:
        The parsed JSON data.

    Raises:
        FileNotFoundError: If ``"<model>.json"`` does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Read as UTF-8 explicitly so non-ASCII content does not depend on the
    # platform's default encoding.
    with open(f"{model}.json", "r", encoding="utf-8") as file:
        return json.load(file)
# Special symbols looked up in a config's "phoneme_id_map" by phonemes_to_ids().
PAD = "_" # padding (0); appended after every phoneme
BOS = "^" # beginning of sentence; starts every id sequence
EOS = "$" # end of sentence; ends every id sequence
class PhonemeType(str, Enum):
    """Phonemization modes compared against a config's ``"phoneme_type"``.

    Members subclass ``str``, so they compare equal to the plain strings.
    """
    # phonemize() calls phonemize_espeak for this type.
    ESPEAK = "espeak"
    # phonemize() calls phonemize_codepoints for this type.
    TEXT = "text"
def phonemize(config, text: str) -> List[List[str]]:
    """Convert *text* to phonemes, grouped by sentence.

    Args:
        config: Model configuration; reads ``"phoneme_type"`` and, for the
            espeak type, ``config["espeak"]["voice"]``.
        text: The text to phonemize.

    Returns:
        One list of phonemes per sentence.

    Raises:
        ValueError: If ``config["phoneme_type"]`` is not a known type.
    """
    phoneme_type = config["phoneme_type"]

    if phoneme_type == PhonemeType.ESPEAK:
        voice = config["espeak"]["voice"]
        if voice == "ar":
            # Arabic diacritization
            # https://github.com/mush42/libtashkeel/
            text = tashkeel_run(text)
        return phonemize_espeak(text, voice)

    if phoneme_type == PhonemeType.TEXT:
        return phonemize_codepoints(text)

    raise ValueError(f'Unexpected phoneme type: {phoneme_type}')
def phonemes_to_ids(config, phonemes: List[str]) -> List[int]:
    """Map one sentence's phonemes to model input ids.

    The sequence starts with the ids of ``BOS``, then holds each known
    phoneme's ids followed by the ``PAD`` ids, and ends with the ``EOS`` ids.
    Phonemes missing from the id map are skipped with a warning.

    Args:
        config: Model configuration; reads ``config["phoneme_id_map"]``.
        phonemes: Phonemes of a single sentence.

    Returns:
        The flat list of ids.
    """
    id_map = config["phoneme_id_map"]
    ids: List[int] = list(id_map[BOS])
    for phoneme in phonemes:
        if phoneme not in id_map:
            # Log through the module logger so "%s" is actually interpolated.
            _LOGGER.warning("Missing phoneme from id map: %s", phoneme)
            continue
        ids.extend(id_map[phoneme])
        ids.extend(id_map[PAD])
    ids.extend(id_map[EOS])
    return ids
def audio_float_to_int16(
    audio: np.ndarray, max_wav_value: float = 32767.0
) -> np.ndarray:
    """Scale *audio* so its peak reaches ``max_wav_value`` and cast to int16.

    The peak used for scaling is floored at 0.01, which bounds the gain
    applied to near-silent input.
    """
    peak = max(0.01, np.max(np.abs(audio)))
    scaled = audio * (max_wav_value / peak)
    clipped = np.clip(scaled, -max_wav_value, max_wav_value)
    return clipped.astype("int16")
def inferencing(model, config, sid, line, length_scale, noise_scale, noise_scale_w, auto_play=True):
    """Synthesize *line* sentence by sentence and return the merged audio.

    Args:
        model: Inference session exposing ``run(output_names, input_feed)``.
        config: Model configuration; reads ``"phoneme_type"``,
            ``"num_speakers"`` and ``config["audio"]["sample_rate"]``. Its
            ``"phoneme_type"`` is set to ``"espeak"`` when missing or already
            espeak.
        sid: Speaker ID, convertible with ``int()``; ignored when
            ``config["num_speakers"] == 1``.
        line: Text to synthesize.
        length_scale: Second entry of the ``scales`` model input.
        noise_scale: First entry of the ``scales`` model input.
        noise_scale_w: Third entry of the ``scales`` model input.
        auto_play: Unused; kept for call compatibility.

    Returns:
        A ``pydub.AudioSegment`` holding the concatenated int16 audio.

    Raises:
        ValueError: If phonemization yields no sentences.
    """
    # Default a missing phoneme type to espeak before phonemizing.
    phoneme_type = config.get("phoneme_type", PhonemeType.ESPEAK.value)
    if phoneme_type == PhonemeType.ESPEAK.value:
        config["phoneme_type"] = "espeak"

    # Resolve the speaker input once, outside the loop, so later sentences
    # do not see a rebound ``sid`` from an earlier iteration.
    speaker_input = None
    if config["num_speakers"] != 1 and sid is not None:
        speaker_input = np.asarray([int(sid)], dtype=np.int64)  # 1D array

    # The scales do not change between sentences.
    scales = np.array(
        [noise_scale, length_scale, noise_scale_w],
        dtype=np.float32,
    )

    audios = []
    for phonemes in phonemize(config, line):
        phoneme_ids = phonemes_to_ids(config, phonemes)
        ids = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0)
        ids_lengths = np.array([ids.shape[1]], dtype=np.int64)
        feed = {
            "input": ids,
            "input_lengths": ids_lengths,
            "scales": scales,
        }
        # NOTE(review): "sid" is only fed when a speaker applies; assumes
        # single-speaker models expose no "sid" input — confirm.
        if speaker_input is not None:
            feed["sid"] = speaker_input
        audio = model.run(None, feed)[0].squeeze((0, 1))
        audios.append(audio_float_to_int16(audio.squeeze()))

    if not audios:
        raise ValueError("No audio was generated: the input text produced no sentences")

    merged_audio = np.concatenate(audios)
    sample_rate = config["audio"]["sample_rate"]

    # Use a unique temporary WAV file so concurrent requests cannot overwrite
    # each other's audio, and always remove it afterwards.
    fd, temp_audio_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        sf.write(temp_audio_path, merged_audio, sample_rate)
        # The file is WAV, so read it with the WAV loader.
        return AudioSegment.from_wav(temp_audio_path)
    finally:
        os.remove(temp_audio_path)
def denoise(
    audio: np.ndarray, bias_spec: np.ndarray, denoiser_strength: float
) -> np.ndarray:
    """Subtract a scaled bias spectrum from *audio* and resynthesize it.

    The audio is converted with ``transform``; ``bias_spec`` is repeated along
    its last axis and cut to the spectrogram's length, scaled by
    ``denoiser_strength`` and subtracted from the magnitudes. Magnitudes are
    floored at zero and passed, with the original phases, to ``inverse``.
    """
    magnitudes, phases = transform(audio)

    bias_len = bias_spec.shape[-1]
    spec_len = magnitudes.shape[-1]
    # Repeat the bias often enough to cover the spectrogram, then trim.
    n_repeats = max(1, math.ceil(spec_len / bias_len))
    tiled_bias = np.repeat(bias_spec, n_repeats, axis=-1)[..., :spec_len]

    cleaned = magnitudes - (tiled_bias * denoiser_strength)
    cleaned = np.clip(cleaned, a_min=0.0, a_max=None)
    return inverse(cleaned, phases)
def stft(x, fft_size, hopsamp):
    """Compute the STFT of the time domain signal x.

    Args:
        x (1-dim Numpy array): A time domain signal.
        fft_size (int): Frame length; each frame is multiplied by a Hann
            window of this size before the real FFT.
        hopsamp (int): Step between the starts of consecutive frames, in samples.
    Returns:
        The STFT. The rows are the time slices and columns are the frequency bins.
    """
    # The window is built from the argument as given, before the int casts.
    window = np.hanning(fft_size)
    frame_len = int(fft_size)
    hop = int(hopsamp)
    frames = []
    for start in range(0, len(x) - frame_len, hop):
        frames.append(np.fft.rfft(window * x[start : start + frame_len]))
    return np.array(frames)
def istft(X, fft_size, hopsamp):
    """Invert a STFT into a time domain signal by windowed overlap-add.

    Args:
        X (2-dim Numpy array): Input spectrogram. The rows are the time slices and columns are the frequency bins.
        fft_size (int): Frame length, in samples.
        hopsamp (int): The hop size, in samples.
    Returns:
        The inverse STFT, of length ``rows * hopsamp + fft_size``.
    """
    frame_len = int(fft_size)
    hop = int(hopsamp)
    window = np.hanning(frame_len)
    n_frames = X.shape[0]
    signal = np.zeros(int(n_frames * hop + frame_len))
    # Pair each frame start with its spectrogram row and overlap-add.
    starts = range(0, len(signal) - frame_len, hop)
    for start, spectrum in zip(starts, X):
        signal[start : start + frame_len] += window * np.real(np.fft.irfft(spectrum))
    return signal
def inverse(magnitude, phase):
    """Rebuild time domain signals from magnitude and phase spectrograms.

    The real part (``magnitude * cos(phase)``) and imaginary part
    (``magnitude * sin(phase)``) are joined along axis 1 and split again into
    a complex64 array. Each item along the first axis is then inverted with
    ``istft`` (fft_size=1024, hopsamp=256).

    Returns:
        The inverted signals stacked along axis 0, one row per item.
    """
    stacked = np.concatenate(
        [magnitude * np.cos(phase), magnitude * np.sin(phase)], axis=1
    )
    batch, doubled_bins, frames = stacked.shape  # pylint: disable=unpacking-non-sequence
    half = doubled_bins // 2

    spectrum = np.empty([batch, half, frames], dtype=np.complex64)
    spectrum.real = stacked[:, :half]
    spectrum.imag = stacked[:, half:]

    signals = [
        istft(item.T, fft_size=1024, hopsamp=256)[None, :] for item in spectrum
    ]
    return np.concatenate(signals, 0)
def transform(input_data):
    """Compute magnitude and phase spectrograms for each item of *input_data*.

    Every item is passed to ``stft`` (fft_size=1024, hopsamp=256) and the
    result is transposed; the per-item results are stacked along a new
    leading axis.

    Returns:
        A ``(magnitude, phase)`` tuple of arrays.
    """
    spectra = [stft(item, fft_size=1024, hopsamp=256).T for item in input_data]
    real_part = np.concatenate(
        [spec.real[None, :, :] for spec in spectra], 0  # pylint: disable=unsubscriptable-object
    )
    imag_part = np.concatenate(
        [spec.imag[None, :, :] for spec in spectra], 0  # pylint: disable=unsubscriptable-object
    )
    magnitude = np.sqrt(real_part**2 + imag_part**2)
    # ``.data`` is the array's buffer; arctan2 reads it back as an array.
    phase = np.arctan2(imag_part.data, real_part.data)
    return magnitude, phase
#@app.get("/") | |
#async def read_root(request: Request): | |
# return templates.TemplateResponse("interface.html", {"request": request}) | |
# Script entry point: serve the FastAPI app with uvicorn on all interfaces
# (0.0.0.0), port 7860.
if __name__ == "__main__":
    # main()
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
# main() | |
# pass | |
# app() | |
# Create an instance of the FastAPI class | |
#app = main() | |
# Define a route for the root endpoint | |
#def read_root(): | |
# return {"message": "Hello, World!"} |