Aleph-Weo-Webeta / soni_translate /text_to_speech.py
r3gm's picture
v0.5.0
b152010
raw
history blame
51.4 kB
from gtts import gTTS
import edge_tts, asyncio, json, glob # noqa
from tqdm import tqdm
import librosa, os, re, torch, gc, subprocess # noqa
from .language_configuration import (
fix_code_language,
BARK_VOICES_LIST,
VITS_VOICES_LIST,
)
from .utils import (
download_manager,
create_directories,
copy_files,
rename_file,
remove_directory_contents,
remove_files,
run_command,
)
import numpy as np
from typing import Any, Dict
from pathlib import Path
import soundfile as sf
import platform
import logging
import traceback
from .logging_setup import logger
class TTS_OperationError(Exception):
def __init__(self, message="The operation did not complete successfully."):
self.message = message
super().__init__(self.message)
def verify_saved_file_and_size(filename):
if not os.path.exists(filename):
raise TTS_OperationError(f"File '{filename}' was not saved.")
if os.path.getsize(filename) == 0:
raise TTS_OperationError(
f"File '{filename}' has a zero size. "
"Related to incorrect TTS for the target language"
)
def error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename):
traceback.print_exc()
logger.error(f"Error: {str(error)}")
try:
from tempfile import TemporaryFile
tts = gTTS(segment["text"], lang=fix_code_language(TRANSLATE_AUDIO_TO))
# tts.save(filename)
f = TemporaryFile()
tts.write_to_fp(f)
# Reset the file pointer to the beginning of the file
f.seek(0)
# Read audio data from the TemporaryFile using soundfile
audio_data, samplerate = sf.read(f)
f.close() # Close the TemporaryFile
sf.write(
filename, audio_data, samplerate, format="ogg", subtype="vorbis"
)
logger.warning(
'TTS auxiliary will be utilized '
f'rather than TTS: {segment["tts_name"]}'
)
verify_saved_file_and_size(filename)
except Exception as error:
logger.critical(f"Error: {str(error)}")
sample_rate_aux = 22050
duration = float(segment["end"]) - float(segment["start"])
data = np.zeros(int(sample_rate_aux * duration)).astype(np.float32)
sf.write(
filename, data, sample_rate_aux, format="ogg", subtype="vorbis"
)
logger.error("Audio will be replaced -> [silent audio].")
verify_saved_file_and_size(filename)
def pad_array(array, sr):
if isinstance(array, list):
array = np.array(array)
if not array.shape[0]:
raise ValueError("The generated audio does not contain any data")
valid_indices = np.where(np.abs(array) > 0.001)[0]
if len(valid_indices) == 0:
logger.debug(f"No valid indices: {array}")
return array
try:
pad_indice = int(0.1 * sr)
start_pad = max(0, valid_indices[0] - pad_indice)
end_pad = min(len(array), valid_indices[-1] + 1 + pad_indice)
padded_array = array[start_pad:end_pad]
return padded_array
except Exception as error:
logger.error(str(error))
return array
# =====================================
# EDGE TTS
# =====================================
def edge_tts_voices_list():
try:
completed_process = subprocess.run(
["edge-tts", "--list-voices"], capture_output=True, text=True
)
lines = completed_process.stdout.strip().split("\n")
except Exception as error:
logger.debug(str(error))
lines = []
voices = []
for line in lines:
if line.startswith("Name: "):
voice_entry = {}
voice_entry["Name"] = line.split(": ")[1]
elif line.startswith("Gender: "):
voice_entry["Gender"] = line.split(": ")[1]
voices.append(voice_entry)
formatted_voices = [
f"{entry['Name']}-{entry['Gender']}" for entry in voices
]
if not formatted_voices:
logger.warning(
"The list of Edge TTS voices could not be obtained, "
"switching to an alternative method"
)
tts_voice_list = asyncio.new_event_loop().run_until_complete(
edge_tts.list_voices()
)
formatted_voices = sorted(
[f"{v['ShortName']}-{v['Gender']}" for v in tts_voice_list]
)
if not formatted_voices:
logger.error("Can't get EDGE TTS - list voices")
return formatted_voices
def segments_egde_tts(filtered_edge_segments, TRANSLATE_AUDIO_TO, is_gui):
for segment in tqdm(filtered_edge_segments["segments"]):
speaker = segment["speaker"] # noqa
text = segment["text"]
start = segment["start"]
tts_name = segment["tts_name"]
# make the tts audio
filename = f"audio/{start}.ogg"
temp_file = filename[:-3] + "mp3"
logger.info(f"{text} >> {filename}")
try:
if is_gui:
asyncio.run(
edge_tts.Communicate(
text, "-".join(tts_name.split("-")[:-1])
).save(temp_file)
)
else:
# nest_asyncio.apply() if not is_gui else None
command = f'edge-tts -t "{text}" -v "{tts_name.replace("-Male", "").replace("-Female", "")}" --write-media "{temp_file}"'
run_command(command)
verify_saved_file_and_size(temp_file)
data, sample_rate = sf.read(temp_file)
data = pad_array(data, sample_rate)
# os.remove(temp_file)
# Save file
sf.write(
file=filename,
samplerate=sample_rate,
data=data,
format="ogg",
subtype="vorbis",
)
verify_saved_file_and_size(filename)
except Exception as error:
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
# =====================================
# BARK TTS
# =====================================
def segments_bark_tts(
filtered_bark_segments, TRANSLATE_AUDIO_TO, model_id_bark="suno/bark-small"
):
from transformers import AutoProcessor, BarkModel
from optimum.bettertransformer import BetterTransformer
device = os.environ.get("SONITR_DEVICE")
torch_dtype_env = torch.float16 if device == "cuda" else torch.float32
# load model bark
model = BarkModel.from_pretrained(
model_id_bark, torch_dtype=torch_dtype_env
).to(device)
model = model.to(device)
processor = AutoProcessor.from_pretrained(
model_id_bark, return_tensors="pt"
) # , padding=True
if device == "cuda":
# convert to bettertransformer
model = BetterTransformer.transform(model, keep_original_model=False)
# enable CPU offload
# model.enable_cpu_offload()
sampling_rate = model.generation_config.sample_rate
# filtered_segments = filtered_bark_segments['segments']
# Sorting the segments by 'tts_name'
# sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name'])
# logger.debug(sorted_segments)
for segment in tqdm(filtered_bark_segments["segments"]):
speaker = segment["speaker"] # noqa
text = segment["text"]
start = segment["start"]
tts_name = segment["tts_name"]
inputs = processor(text, voice_preset=BARK_VOICES_LIST[tts_name]).to(
device
)
# make the tts audio
filename = f"audio/{start}.ogg"
logger.info(f"{text} >> {filename}")
try:
# Infer
with torch.inference_mode():
speech_output = model.generate(
**inputs,
do_sample=True,
fine_temperature=0.4,
coarse_temperature=0.8,
pad_token_id=processor.tokenizer.pad_token_id,
)
# Save file
data_tts = pad_array(
speech_output.cpu().numpy().squeeze().astype(np.float32),
sampling_rate,
)
sf.write(
file=filename,
samplerate=sampling_rate,
data=data_tts,
format="ogg",
subtype="vorbis",
)
verify_saved_file_and_size(filename)
except Exception as error:
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
gc.collect()
torch.cuda.empty_cache()
try:
del processor
del model
gc.collect()
torch.cuda.empty_cache()
except Exception as error:
logger.error(str(error))
gc.collect()
torch.cuda.empty_cache()
# =====================================
# VITS TTS
# =====================================
def uromanize(input_string):
"""Convert non-Roman strings to Roman using the `uroman` perl package."""
# script_path = os.path.join(uroman_path, "bin", "uroman.pl")
if not os.path.exists("./uroman"):
logger.info(
"Clonning repository uroman https://github.com/isi-nlp/uroman.git"
" for romanize the text"
)
process = subprocess.Popen(
["git", "clone", "https://github.com/isi-nlp/uroman.git"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = process.communicate()
script_path = os.path.join("./uroman", "bin", "uroman.pl")
command = ["perl", script_path]
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Execute the perl command
stdout, stderr = process.communicate(input=input_string.encode())
if process.returncode != 0:
raise ValueError(f"Error {process.returncode}: {stderr.decode()}")
# Return the output as a string and skip the new-line character at the end
return stdout.decode()[:-1]
def segments_vits_tts(filtered_vits_segments, TRANSLATE_AUDIO_TO):
from transformers import VitsModel, AutoTokenizer
filtered_segments = filtered_vits_segments["segments"]
# Sorting the segments by 'tts_name'
sorted_segments = sorted(filtered_segments, key=lambda x: x["tts_name"])
logger.debug(sorted_segments)
model_name_key = None
for segment in tqdm(sorted_segments):
speaker = segment["speaker"] # noqa
text = segment["text"]
start = segment["start"]
tts_name = segment["tts_name"]
if tts_name != model_name_key:
model_name_key = tts_name
model = VitsModel.from_pretrained(VITS_VOICES_LIST[tts_name])
tokenizer = AutoTokenizer.from_pretrained(
VITS_VOICES_LIST[tts_name]
)
sampling_rate = model.config.sampling_rate
if tokenizer.is_uroman:
romanize_text = uromanize(text)
logger.debug(f"Romanize text: {romanize_text}")
inputs = tokenizer(romanize_text, return_tensors="pt")
else:
inputs = tokenizer(text, return_tensors="pt")
# make the tts audio
filename = f"audio/{start}.ogg"
logger.info(f"{text} >> {filename}")
try:
# Infer
with torch.no_grad():
speech_output = model(**inputs).waveform
data_tts = pad_array(
speech_output.cpu().numpy().squeeze().astype(np.float32),
sampling_rate,
)
# Save file
sf.write(
file=filename,
samplerate=sampling_rate,
data=data_tts,
format="ogg",
subtype="vorbis",
)
verify_saved_file_and_size(filename)
except Exception as error:
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
gc.collect()
torch.cuda.empty_cache()
try:
del tokenizer
del model
gc.collect()
torch.cuda.empty_cache()
except Exception as error:
logger.error(str(error))
gc.collect()
torch.cuda.empty_cache()
# =====================================
# Coqui XTTS
# =====================================
def coqui_xtts_voices_list():
main_folder = "_XTTS_"
pattern_coqui = re.compile(r".+\.(wav|mp3|ogg|m4a)$")
pattern_automatic_speaker = re.compile(r"AUTOMATIC_SPEAKER_\d+\.wav$")
# List only files in the directory matching the pattern but not matching
# AUTOMATIC_SPEAKER_00.wav, AUTOMATIC_SPEAKER_01.wav, etc.
wav_voices = [
"_XTTS_/" + f
for f in os.listdir(main_folder)
if os.path.isfile(os.path.join(main_folder, f))
and pattern_coqui.match(f)
and not pattern_automatic_speaker.match(f)
]
return ["_XTTS_/AUTOMATIC.wav"] + wav_voices
def seconds_to_hhmmss_ms(seconds):
hours = seconds // 3600
minutes = (seconds % 3600) // 60
seconds = seconds % 60
milliseconds = int((seconds - int(seconds)) * 1000)
return "%02d:%02d:%02d.%03d" % (hours, minutes, int(seconds), milliseconds)
def audio_trimming(audio_path, destination, start, end):
if isinstance(start, (int, float)):
start = seconds_to_hhmmss_ms(start)
if isinstance(end, (int, float)):
end = seconds_to_hhmmss_ms(end)
if destination:
file_directory = destination
else:
file_directory = os.path.dirname(audio_path)
file_name = os.path.splitext(os.path.basename(audio_path))[0]
file_ = f"{file_name}_trim.wav"
# file_ = f'{os.path.splitext(audio_path)[0]}_trim.wav'
output_path = os.path.join(file_directory, file_)
# -t (duration from -ss) | -to (time stop) | -af silenceremove=1:0:-50dB (remove silence)
command = f'ffmpeg -y -loglevel error -i "{audio_path}" -ss {start} -to {end} -acodec pcm_s16le -f wav "{output_path}"'
run_command(command)
return output_path
def convert_to_xtts_good_sample(audio_path: str = "", destination: str = ""):
if destination:
file_directory = destination
else:
file_directory = os.path.dirname(audio_path)
file_name = os.path.splitext(os.path.basename(audio_path))[0]
file_ = f"{file_name}_good_sample.wav"
# file_ = f'{os.path.splitext(audio_path)[0]}_good_sample.wav'
mono_path = os.path.join(file_directory, file_) # get root
command = f'ffmpeg -y -loglevel error -i "{audio_path}" -ac 1 -ar 22050 -sample_fmt s16 -f wav "{mono_path}"'
run_command(command)
return mono_path
def sanitize_file_name(file_name):
import unicodedata
# Normalize the string to NFKD form to separate combined characters into
# base characters and diacritics
normalized_name = unicodedata.normalize("NFKD", file_name)
# Replace any non-ASCII characters or special symbols with an underscore
sanitized_name = re.sub(r"[^\w\s.-]", "_", normalized_name)
return sanitized_name
def create_wav_file_vc(
sample_name="", # name final file
audio_wav="", # path
start=None, # trim start
end=None, # trim end
output_final_path="_XTTS_",
get_vocals_dereverb=True,
):
sample_name = sample_name if sample_name else "default_name"
sample_name = sanitize_file_name(sample_name)
audio_wav = audio_wav if isinstance(audio_wav, str) else audio_wav.name
BASE_DIR = (
"." # os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
output_dir = os.path.join(BASE_DIR, "clean_song_output") # remove content
# remove_directory_contents(output_dir)
if start or end:
# Cut file
audio_segment = audio_trimming(audio_wav, output_dir, start, end)
else:
# Complete file
audio_segment = audio_wav
from .mdx_net import process_uvr_task
try:
_, _, _, _, audio_segment = process_uvr_task(
orig_song_path=audio_segment,
main_vocals=True,
dereverb=get_vocals_dereverb,
)
except Exception as error:
logger.error(str(error))
sample = convert_to_xtts_good_sample(audio_segment)
sample_name = f"{sample_name}.wav"
sample_rename = rename_file(sample, sample_name)
copy_files(sample_rename, output_final_path)
final_sample = os.path.join(output_final_path, sample_name)
if os.path.exists(final_sample):
logger.info(final_sample)
return final_sample
else:
raise Exception(f"Error wav: {final_sample}")
def create_new_files_for_vc(
speakers_coqui,
segments_base,
dereverb_automatic=True
):
# before function delete automatic delete_previous_automatic
output_dir = os.path.join(".", "clean_song_output") # remove content
remove_directory_contents(output_dir)
for speaker in speakers_coqui:
filtered_speaker = [
segment
for segment in segments_base
if segment["speaker"] == speaker
]
if len(filtered_speaker) > 4:
filtered_speaker = filtered_speaker[1:]
if filtered_speaker[0]["tts_name"] == "_XTTS_/AUTOMATIC.wav":
name_automatic_wav = f"AUTOMATIC_{speaker}"
if os.path.exists(f"_XTTS_/{name_automatic_wav}.wav"):
logger.info(f"WAV automatic {speaker} exists")
# path_wav = path_automatic_wav
pass
else:
# create wav
wav_ok = False
for seg in filtered_speaker:
duration = float(seg["end"]) - float(seg["start"])
if duration > 7.0 and duration < 12.0:
logger.info(
f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {duration}, {seg["text"]}'
)
create_wav_file_vc(
sample_name=name_automatic_wav,
audio_wav="audio.wav",
start=(float(seg["start"]) + 1.0),
end=(float(seg["end"]) - 1.0),
get_vocals_dereverb=dereverb_automatic,
)
wav_ok = True
break
if not wav_ok:
logger.info("Taking the first segment")
seg = filtered_speaker[0]
logger.info(
f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {seg["text"]}'
)
max_duration = float(seg["end"]) - float(seg["start"])
max_duration = max(2.0, min(max_duration, 9.0))
create_wav_file_vc(
sample_name=name_automatic_wav,
audio_wav="audio.wav",
start=(float(seg["start"])),
end=(float(seg["start"]) + max_duration),
get_vocals_dereverb=dereverb_automatic,
)
def segments_coqui_tts(
filtered_coqui_segments,
TRANSLATE_AUDIO_TO,
model_id_coqui="tts_models/multilingual/multi-dataset/xtts_v2",
speakers_coqui=None,
delete_previous_automatic=True,
dereverb_automatic=True,
emotion=None,
):
"""XTTS
Install:
pip install -q TTS==0.21.1
pip install -q numpy==1.23.5
Notes:
- tts_name is the wav|mp3|ogg|m4a file for VC
"""
from TTS.api import TTS
TRANSLATE_AUDIO_TO = fix_code_language(TRANSLATE_AUDIO_TO, syntax="coqui")
supported_lang_coqui = [
"zh-cn",
"en",
"fr",
"de",
"it",
"pt",
"pl",
"tr",
"ru",
"nl",
"cs",
"ar",
"es",
"hu",
"ko",
"ja",
]
if TRANSLATE_AUDIO_TO not in supported_lang_coqui:
raise TTS_OperationError(
f"'{TRANSLATE_AUDIO_TO}' is not a supported language for Coqui XTTS"
)
# Emotion and speed can only be used with Coqui Studio models. discontinued
# emotions = ["Neutral", "Happy", "Sad", "Angry", "Dull"]
if delete_previous_automatic:
for spk in speakers_coqui:
remove_files(f"_XTTS_/AUTOMATIC_{spk}.wav")
directory_audios_vc = "_XTTS_"
create_directories(directory_audios_vc)
create_new_files_for_vc(
speakers_coqui,
filtered_coqui_segments["segments"],
dereverb_automatic,
)
# Init TTS
device = os.environ.get("SONITR_DEVICE")
model = TTS(model_id_coqui).to(device)
sampling_rate = 24000
# filtered_segments = filtered_coqui_segments['segments']
# Sorting the segments by 'tts_name'
# sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name'])
# logger.debug(sorted_segments)
for segment in tqdm(filtered_coqui_segments["segments"]):
speaker = segment["speaker"]
text = segment["text"]
start = segment["start"]
tts_name = segment["tts_name"]
if tts_name == "_XTTS_/AUTOMATIC.wav":
tts_name = f"_XTTS_/AUTOMATIC_{speaker}.wav"
# make the tts audio
filename = f"audio/{start}.ogg"
logger.info(f"{text} >> {filename}")
try:
# Infer
wav = model.tts(
text=text, speaker_wav=tts_name, language=TRANSLATE_AUDIO_TO
)
data_tts = pad_array(
wav,
sampling_rate,
)
# Save file
sf.write(
file=filename,
samplerate=sampling_rate,
data=data_tts,
format="ogg",
subtype="vorbis",
)
verify_saved_file_and_size(filename)
except Exception as error:
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
gc.collect()
torch.cuda.empty_cache()
try:
del model
gc.collect()
torch.cuda.empty_cache()
except Exception as error:
logger.error(str(error))
gc.collect()
torch.cuda.empty_cache()
# =====================================
# PIPER TTS
# =====================================
def piper_tts_voices_list():
file_path = download_manager(
url="https://huggingface.co/rhasspy/piper-voices/resolve/main/voices.json",
path="./PIPER_MODELS",
)
with open(file_path, "r", encoding="utf8") as file:
data = json.load(file)
piper_id_models = [key + " VITS-onnx" for key in data.keys()]
return piper_id_models
def replace_text_in_json(file_path, key_to_replace, new_text, condition=None):
# Read the JSON file
with open(file_path, "r", encoding="utf-8") as file:
data = json.load(file)
# Modify the specified key's value with the new text
if key_to_replace in data:
if condition:
value_condition = condition
else:
value_condition = data[key_to_replace]
if data[key_to_replace] == value_condition:
data[key_to_replace] = new_text
# Write the modified content back to the JSON file
with open(file_path, "w") as file:
json.dump(
data, file, indent=2
) # Write the modified data back to the file with indentation for readability
def load_piper_model(
model: str,
data_dir: list,
download_dir: str = "",
update_voices: bool = False,
):
from piper import PiperVoice
from piper.download import ensure_voice_exists, find_voice, get_voices
try:
import onnxruntime as rt
if rt.get_device() == "GPU" and os.environ.get("SONITR_DEVICE") == "cuda":
logger.debug("onnxruntime device > GPU")
cuda = True
else:
logger.info(
"onnxruntime device > CPU"
) # try pip install onnxruntime-gpu
cuda = False
except Exception as error:
raise TTS_OperationError(f"onnxruntime error: {str(error)}")
# Disable CUDA in Windows
if platform.system() == "Windows":
logger.info("Employing CPU exclusivity with Piper TTS")
cuda = False
if not download_dir:
# Download to first data directory by default
download_dir = data_dir[0]
else:
data_dir = [os.path.join(data_dir[0], download_dir)]
# Download voice if file doesn't exist
model_path = Path(model)
if not model_path.exists():
# Load voice info
voices_info = get_voices(download_dir, update_voices=update_voices)
# Resolve aliases for backwards compatibility with old voice names
aliases_info: Dict[str, Any] = {}
for voice_info in voices_info.values():
for voice_alias in voice_info.get("aliases", []):
aliases_info[voice_alias] = {"_is_alias": True, **voice_info}
voices_info.update(aliases_info)
ensure_voice_exists(model, data_dir, download_dir, voices_info)
model, config = find_voice(model, data_dir)
replace_text_in_json(
config, "phoneme_type", "espeak", "PhonemeType.ESPEAK"
)
# Load voice
voice = PiperVoice.load(model, config_path=config, use_cuda=cuda)
return voice
def synthesize_text_to_audio_np_array(voice, text, synthesize_args):
audio_stream = voice.synthesize_stream_raw(text, **synthesize_args)
# Collect the audio bytes into a single NumPy array
audio_data = b""
for audio_bytes in audio_stream:
audio_data += audio_bytes
# Ensure correct data type and convert audio bytes to NumPy array
audio_np = np.frombuffer(audio_data, dtype=np.int16)
return audio_np
def segments_vits_onnx_tts(filtered_onnx_vits_segments, TRANSLATE_AUDIO_TO):
"""
Install:
pip install -q piper-tts==1.2.0 onnxruntime-gpu # for cuda118
"""
data_dir = [
str(Path.cwd())
] # "Data directory to check for downloaded models (default: current directory)"
download_dir = "PIPER_MODELS"
# model_name = "en_US-lessac-medium" tts_name in a dict like VITS
update_voices = True # "Download latest voices.json during startup",
synthesize_args = {
"speaker_id": None,
"length_scale": 1.0,
"noise_scale": 0.667,
"noise_w": 0.8,
"sentence_silence": 0.0,
}
filtered_segments = filtered_onnx_vits_segments["segments"]
# Sorting the segments by 'tts_name'
sorted_segments = sorted(filtered_segments, key=lambda x: x["tts_name"])
logger.debug(sorted_segments)
model_name_key = None
for segment in tqdm(sorted_segments):
speaker = segment["speaker"] # noqa
text = segment["text"]
start = segment["start"]
tts_name = segment["tts_name"].replace(" VITS-onnx", "")
if tts_name != model_name_key:
model_name_key = tts_name
model = load_piper_model(
tts_name, data_dir, download_dir, update_voices
)
sampling_rate = model.config.sample_rate
# make the tts audio
filename = f"audio/{start}.ogg"
logger.info(f"{text} >> {filename}")
try:
# Infer
speech_output = synthesize_text_to_audio_np_array(
model, text, synthesize_args
)
data_tts = pad_array(
speech_output, # .cpu().numpy().squeeze().astype(np.float32),
sampling_rate,
)
# Save file
sf.write(
file=filename,
samplerate=sampling_rate,
data=data_tts,
format="ogg",
subtype="vorbis",
)
verify_saved_file_and_size(filename)
except Exception as error:
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
gc.collect()
torch.cuda.empty_cache()
try:
del model
gc.collect()
torch.cuda.empty_cache()
except Exception as error:
logger.error(str(error))
gc.collect()
torch.cuda.empty_cache()
# =====================================
# CLOSEAI TTS
# =====================================
def segments_openai_tts(
filtered_openai_tts_segments, TRANSLATE_AUDIO_TO
):
from openai import OpenAI
client = OpenAI()
sampling_rate = 24000
# filtered_segments = filtered_openai_tts_segments['segments']
# Sorting the segments by 'tts_name'
# sorted_segments = sorted(filtered_segments, key=lambda x: x['tts_name'])
for segment in tqdm(filtered_openai_tts_segments["segments"]):
speaker = segment["speaker"] # noqa
text = segment["text"].strip()
start = segment["start"]
tts_name = segment["tts_name"]
# make the tts audio
filename = f"audio/{start}.ogg"
logger.info(f"{text} >> {filename}")
try:
# Request
response = client.audio.speech.create(
model="tts-1-hd" if "HD" in tts_name else "tts-1",
voice=tts_name.split()[0][1:],
response_format="wav",
input=text
)
audio_bytes = b''
for data in response.iter_bytes(chunk_size=4096):
audio_bytes += data
speech_output = np.frombuffer(audio_bytes, dtype=np.int16)
# Save file
data_tts = pad_array(
speech_output[240:],
sampling_rate,
)
sf.write(
file=filename,
samplerate=sampling_rate,
data=data_tts,
format="ogg",
subtype="vorbis",
)
verify_saved_file_and_size(filename)
except Exception as error:
error_handling_in_tts(error, segment, TRANSLATE_AUDIO_TO, filename)
# =====================================
# Select task TTS
# =====================================
def find_spkr(pattern, speaker_to_voice, segments):
return [
speaker
for speaker, voice in speaker_to_voice.items()
if pattern.match(voice) and any(
segment["speaker"] == speaker for segment in segments
)
]
def filter_by_speaker(speakers, segments):
return {
"segments": [
segment
for segment in segments
if segment["speaker"] in speakers
]
}
def audio_segmentation_to_voice(
result_diarize,
TRANSLATE_AUDIO_TO,
is_gui,
tts_voice00,
tts_voice01="",
tts_voice02="",
tts_voice03="",
tts_voice04="",
tts_voice05="",
tts_voice06="",
tts_voice07="",
tts_voice08="",
tts_voice09="",
tts_voice10="",
tts_voice11="",
dereverb_automatic=True,
model_id_bark="suno/bark-small",
model_id_coqui="tts_models/multilingual/multi-dataset/xtts_v2",
delete_previous_automatic=True,
):
remove_directory_contents("audio")
# Mapping speakers to voice variables
speaker_to_voice = {
"SPEAKER_00": tts_voice00,
"SPEAKER_01": tts_voice01,
"SPEAKER_02": tts_voice02,
"SPEAKER_03": tts_voice03,
"SPEAKER_04": tts_voice04,
"SPEAKER_05": tts_voice05,
"SPEAKER_06": tts_voice06,
"SPEAKER_07": tts_voice07,
"SPEAKER_08": tts_voice08,
"SPEAKER_09": tts_voice09,
"SPEAKER_10": tts_voice10,
"SPEAKER_11": tts_voice11,
}
# Assign 'SPEAKER_00' to segments without a 'speaker' key
for segment in result_diarize["segments"]:
if "speaker" not in segment:
segment["speaker"] = "SPEAKER_00"
logger.warning(
"NO SPEAKER DETECT IN SEGMENT: First TTS will be used in the"
f" segment time {segment['start'], segment['text']}"
)
# Assign the TTS name
segment["tts_name"] = speaker_to_voice[segment["speaker"]]
# Find TTS method
pattern_edge = re.compile(r".*-(Male|Female)$")
pattern_bark = re.compile(r".* BARK$")
pattern_vits = re.compile(r".* VITS$")
pattern_coqui = re.compile(r".+\.(wav|mp3|ogg|m4a)$")
pattern_vits_onnx = re.compile(r".* VITS-onnx$")
pattern_openai_tts = re.compile(r".* OpenAI-TTS$")
all_segments = result_diarize["segments"]
speakers_edge = find_spkr(pattern_edge, speaker_to_voice, all_segments)
speakers_bark = find_spkr(pattern_bark, speaker_to_voice, all_segments)
speakers_vits = find_spkr(pattern_vits, speaker_to_voice, all_segments)
speakers_coqui = find_spkr(pattern_coqui, speaker_to_voice, all_segments)
speakers_vits_onnx = find_spkr(
pattern_vits_onnx, speaker_to_voice, all_segments
)
speakers_openai_tts = find_spkr(
pattern_openai_tts, speaker_to_voice, all_segments
)
# Filter method in segments
filtered_edge = filter_by_speaker(speakers_edge, all_segments)
filtered_bark = filter_by_speaker(speakers_bark, all_segments)
filtered_vits = filter_by_speaker(speakers_vits, all_segments)
filtered_coqui = filter_by_speaker(speakers_coqui, all_segments)
filtered_vits_onnx = filter_by_speaker(speakers_vits_onnx, all_segments)
filtered_openai_tts = filter_by_speaker(speakers_openai_tts, all_segments)
# Infer
if filtered_edge["segments"]:
logger.info(f"EDGE TTS: {speakers_edge}")
segments_egde_tts(filtered_edge, TRANSLATE_AUDIO_TO, is_gui) # mp3
if filtered_bark["segments"]:
logger.info(f"BARK TTS: {speakers_bark}")
segments_bark_tts(
filtered_bark, TRANSLATE_AUDIO_TO, model_id_bark
) # wav
if filtered_vits["segments"]:
logger.info(f"VITS TTS: {speakers_vits}")
segments_vits_tts(filtered_vits, TRANSLATE_AUDIO_TO) # wav
if filtered_coqui["segments"]:
logger.info(f"Coqui TTS: {speakers_coqui}")
segments_coqui_tts(
filtered_coqui,
TRANSLATE_AUDIO_TO,
model_id_coqui,
speakers_coqui,
delete_previous_automatic,
dereverb_automatic,
) # wav
if filtered_vits_onnx["segments"]:
logger.info(f"PIPER TTS: {speakers_vits_onnx}")
segments_vits_onnx_tts(filtered_vits_onnx, TRANSLATE_AUDIO_TO) # wav
if filtered_openai_tts["segments"]:
logger.info(f"OpenAI TTS: {speakers_openai_tts}")
segments_openai_tts(filtered_openai_tts, TRANSLATE_AUDIO_TO) # wav
[result.pop("tts_name", None) for result in result_diarize["segments"]]
return [
speakers_edge,
speakers_bark,
speakers_vits,
speakers_coqui,
speakers_vits_onnx,
speakers_openai_tts
]
def accelerate_segments(
result_diarize,
max_accelerate_audio,
valid_speakers,
acceleration_rate_regulation=False,
folder_output="audio2",
):
logger.info("Apply acceleration")
(
speakers_edge,
speakers_bark,
speakers_vits,
speakers_coqui,
speakers_vits_onnx,
speakers_openai_tts
) = valid_speakers
create_directories(f"{folder_output}/audio/")
remove_directory_contents(f"{folder_output}/audio/")
audio_files = []
speakers_list = []
max_count_segments_idx = len(result_diarize["segments"]) - 1
for i, segment in tqdm(enumerate(result_diarize["segments"])):
text = segment["text"] # noqa
start = segment["start"]
end = segment["end"]
speaker = segment["speaker"]
# find name audio
# if speaker in speakers_edge:
filename = f"audio/{start}.ogg"
# elif speaker in speakers_bark + speakers_vits + speakers_coqui + speakers_vits_onnx:
# filename = f"audio/{start}.wav" # wav
# duration
duration_true = end - start
duration_tts = librosa.get_duration(filename=filename)
# Accelerate percentage
acc_percentage = duration_tts / duration_true
# Smoth
if acceleration_rate_regulation and acc_percentage >= 1.3:
try:
next_segment = result_diarize["segments"][
min(max_count_segments_idx, i + 1)
]
next_start = next_segment["start"]
next_speaker = next_segment["speaker"]
duration_with_next_start = next_start - start
if duration_with_next_start > duration_true:
extra_time = duration_with_next_start - duration_true
if speaker == next_speaker:
# half
smoth_duration = duration_true + (extra_time * 0.5)
else:
# 7/10
smoth_duration = duration_true + (extra_time * 0.7)
logger.debug(
f"Base acc: {acc_percentage}, "
f"smoth acc: {duration_tts / smoth_duration}"
)
acc_percentage = max(1.2, (duration_tts / smoth_duration))
except Exception as error:
logger.error(str(error))
if acc_percentage > max_accelerate_audio:
acc_percentage = max_accelerate_audio
elif acc_percentage <= 1.15 and acc_percentage >= 0.8:
acc_percentage = 1.0
elif acc_percentage <= 0.79:
acc_percentage = 0.8
# Round
acc_percentage = round(acc_percentage + 0.0, 1)
# Format read if need
if speaker in speakers_edge:
info_enc = sf.info(filename).format
else:
info_enc = "OGG"
# Apply aceleration or opposite to the audio file in folder_output folder
if acc_percentage == 1.0 and info_enc == "OGG":
copy_files(filename, f"{folder_output}{os.sep}audio")
else:
os.system(
f"ffmpeg -y -loglevel panic -i {filename} -filter:a atempo={acc_percentage} {folder_output}/{filename}"
)
if logger.isEnabledFor(logging.DEBUG):
duration_create = librosa.get_duration(
filename=f"{folder_output}/{filename}"
)
logger.debug(
f"acc_percen is {acc_percentage}, tts duration "
f"is {duration_tts}, new duration is {duration_create}"
f", for {filename}"
)
audio_files.append(f"{folder_output}/{filename}")
speaker = "TTS Speaker {:02d}".format(int(speaker[-2:]) + 1)
speakers_list.append(speaker)
return audio_files, speakers_list
# =====================================
# Tone color converter
# =====================================
def se_process_audio_segments(
source_seg, tone_color_converter, device, remove_previous_processed=True
):
# list wav seg
source_audio_segs = glob.glob(f"{source_seg}/*.wav")
if not source_audio_segs:
raise ValueError(
f"No audio segments found in {str(source_audio_segs)}"
)
source_se_path = os.path.join(source_seg, "se.pth")
# if exist not create wav
if os.path.isfile(source_se_path):
se = torch.load(source_se_path).to(device)
logger.debug(f"Previous created {source_se_path}")
else:
se = tone_color_converter.extract_se(source_audio_segs, source_se_path)
return se
def create_wav_vc(
valid_speakers,
segments_base,
audio_name,
max_segments=10,
target_dir="processed",
get_vocals_dereverb=False,
):
# valid_speakers = list({item['speaker'] for item in segments_base})
# Before function delete automatic delete_previous_automatic
output_dir = os.path.join(".", target_dir) # remove content
# remove_directory_contents(output_dir)
path_source_segments = []
path_target_segments = []
for speaker in valid_speakers:
filtered_speaker = [
segment
for segment in segments_base
if segment["speaker"] == speaker
]
if len(filtered_speaker) > 4:
filtered_speaker = filtered_speaker[1:]
dir_name_speaker = speaker + audio_name
dir_name_speaker_tts = "tts" + speaker + audio_name
dir_path_speaker = os.path.join(output_dir, dir_name_speaker)
dir_path_speaker_tts = os.path.join(output_dir, dir_name_speaker_tts)
create_directories([dir_path_speaker, dir_path_speaker_tts])
path_target_segments.append(dir_path_speaker)
path_source_segments.append(dir_path_speaker_tts)
# create wav
max_segments_count = 0
for seg in filtered_speaker:
duration = float(seg["end"]) - float(seg["start"])
if duration > 3.0 and duration < 18.0:
logger.info(
f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {duration}, {seg["text"]}'
)
name_new_wav = str(seg["start"])
check_segment_audio_target_file = os.path.join(
dir_path_speaker, f"{name_new_wav}.wav"
)
if os.path.exists(check_segment_audio_target_file):
logger.debug(
"Segment vc source exists: "
f"{check_segment_audio_target_file}"
)
pass
else:
create_wav_file_vc(
sample_name=name_new_wav,
audio_wav="audio.wav",
start=(float(seg["start"]) + 1.0),
end=(float(seg["end"]) - 1.0),
output_final_path=dir_path_speaker,
get_vocals_dereverb=get_vocals_dereverb,
)
file_name_tts = f"audio2/audio/{str(seg['start'])}.ogg"
# copy_files(file_name_tts, os.path.join(output_dir, dir_name_speaker_tts)
convert_to_xtts_good_sample(
file_name_tts, dir_path_speaker_tts
)
max_segments_count += 1
if max_segments_count == max_segments:
break
if max_segments_count == 0:
logger.info("Taking the first segment")
seg = filtered_speaker[0]
logger.info(
f'Processing segment: {seg["start"]}, {seg["end"]}, {seg["speaker"]}, {seg["text"]}'
)
max_duration = float(seg["end"]) - float(seg["start"])
max_duration = max(1.0, min(max_duration, 18.0))
name_new_wav = str(seg["start"])
create_wav_file_vc(
sample_name=name_new_wav,
audio_wav="audio.wav",
start=(float(seg["start"])),
end=(float(seg["start"]) + max_duration),
output_final_path=dir_path_speaker,
get_vocals_dereverb=get_vocals_dereverb,
)
file_name_tts = f"audio2/audio/{str(seg['start'])}.ogg"
# copy_files(file_name_tts, os.path.join(output_dir, dir_name_speaker_tts)
convert_to_xtts_good_sample(file_name_tts, dir_path_speaker_tts)
logger.debug(f"Base: {str(path_source_segments)}")
logger.debug(f"Target: {str(path_target_segments)}")
return path_source_segments, path_target_segments
def toneconverter_openvoice(
result_diarize,
preprocessor_max_segments,
remove_previous_process=True,
get_vocals_dereverb=False,
model="openvoice",
):
audio_path = "audio.wav"
# se_path = "se.pth"
target_dir = "processed"
create_directories(target_dir)
from openvoice import se_extractor
from openvoice.api import ToneColorConverter
audio_name = f"{os.path.basename(audio_path).rsplit('.', 1)[0]}_{se_extractor.hash_numpy_array(audio_path)}"
# se_path = os.path.join(target_dir, audio_name, 'se.pth')
# create wav seg original and target
valid_speakers = list(
{item["speaker"] for item in result_diarize["segments"]}
)
logger.info("Openvoice preprocessor...")
if remove_previous_process:
remove_directory_contents(target_dir)
path_source_segments, path_target_segments = create_wav_vc(
valid_speakers,
result_diarize["segments"],
audio_name,
max_segments=preprocessor_max_segments,
get_vocals_dereverb=get_vocals_dereverb,
)
logger.info("Openvoice loading model...")
model_path_openvoice = "./OPENVOICE_MODELS"
url_model_openvoice = "https://huggingface.co/myshell-ai/OpenVoice/resolve/main/checkpoints/converter"
if "v2" in model:
model_path = os.path.join(model_path_openvoice, "v2")
url_model_openvoice = url_model_openvoice.replace(
"OpenVoice", "OpenVoiceV2"
).replace("checkpoints/", "")
else:
model_path = os.path.join(model_path_openvoice, "v1")
create_directories(model_path)
config_url = f"{url_model_openvoice}/config.json"
checkpoint_url = f"{url_model_openvoice}/checkpoint.pth"
config_path = download_manager(url=config_url, path=model_path)
checkpoint_path = download_manager(
url=checkpoint_url, path=model_path
)
device = os.environ.get("SONITR_DEVICE")
tone_color_converter = ToneColorConverter(config_path, device=device)
tone_color_converter.load_ckpt(checkpoint_path)
logger.info("Openvoice tone color converter:")
global_progress_bar = tqdm(total=len(result_diarize["segments"]), desc="Progress")
for source_seg, target_seg, speaker in zip(
path_source_segments, path_target_segments, valid_speakers
):
# source_se_path = os.path.join(source_seg, 'se.pth')
source_se = se_process_audio_segments(source_seg, tone_color_converter, device)
# target_se_path = os.path.join(target_seg, 'se.pth')
target_se = se_process_audio_segments(target_seg, tone_color_converter, device)
# Iterate throw segments
encode_message = "@MyShell"
filtered_speaker = [
segment
for segment in result_diarize["segments"]
if segment["speaker"] == speaker
]
for seg in filtered_speaker:
src_path = (
save_path
) = f"audio2/audio/{str(seg['start'])}.ogg" # overwrite
logger.debug(f"{src_path}")
tone_color_converter.convert(
audio_src_path=src_path,
src_se=source_se,
tgt_se=target_se,
output_path=save_path,
message=encode_message,
)
global_progress_bar.update(1)
global_progress_bar.close()
try:
del tone_color_converter
gc.collect()
torch.cuda.empty_cache()
except Exception as error:
logger.error(str(error))
gc.collect()
torch.cuda.empty_cache()
def toneconverter_freevc(
result_diarize,
remove_previous_process=True,
get_vocals_dereverb=False,
):
audio_path = "audio.wav"
target_dir = "processed"
create_directories(target_dir)
from openvoice import se_extractor
audio_name = f"{os.path.basename(audio_path).rsplit('.', 1)[0]}_{se_extractor.hash_numpy_array(audio_path)}"
# create wav seg; original is target and dubbing is source
valid_speakers = list(
{item["speaker"] for item in result_diarize["segments"]}
)
logger.info("FreeVC preprocessor...")
if remove_previous_process:
remove_directory_contents(target_dir)
path_source_segments, path_target_segments = create_wav_vc(
valid_speakers,
result_diarize["segments"],
audio_name,
max_segments=1,
get_vocals_dereverb=get_vocals_dereverb,
)
logger.info("FreeVC loading model...")
device_id = os.environ.get("SONITR_DEVICE")
device = None if device_id == "cpu" else device_id
try:
from TTS.api import TTS
tts = TTS(
model_name="voice_conversion_models/multilingual/vctk/freevc24",
progress_bar=False
).to(device)
except Exception as error:
logger.error(str(error))
logger.error("Error loading the FreeVC model.")
return
logger.info("FreeVC process:")
global_progress_bar = tqdm(total=len(result_diarize["segments"]), desc="Progress")
for source_seg, target_seg, speaker in zip(
path_source_segments, path_target_segments, valid_speakers
):
filtered_speaker = [
segment
for segment in result_diarize["segments"]
if segment["speaker"] == speaker
]
files_and_directories = os.listdir(target_seg)
wav_files = [file for file in files_and_directories if file.endswith(".wav")]
original_wav_audio_segment = os.path.join(target_seg, wav_files[0])
for seg in filtered_speaker:
src_path = (
save_path
) = f"audio2/audio/{str(seg['start'])}.ogg" # overwrite
logger.debug(f"{src_path} - {original_wav_audio_segment}")
wav = tts.voice_conversion(
source_wav=src_path,
target_wav=original_wav_audio_segment,
)
sf.write(
file=save_path,
samplerate=tts.voice_converter.vc_config.audio.output_sample_rate,
data=wav,
format="ogg",
subtype="vorbis",
)
global_progress_bar.update(1)
global_progress_bar.close()
try:
del tts
gc.collect()
torch.cuda.empty_cache()
except Exception as error:
logger.error(str(error))
gc.collect()
torch.cuda.empty_cache()
def toneconverter(
result_diarize,
preprocessor_max_segments,
remove_previous_process=True,
get_vocals_dereverb=False,
method_vc="freevc"
):
if method_vc == "freevc":
if preprocessor_max_segments > 1:
logger.info("FreeVC only uses one segment.")
return toneconverter_freevc(
result_diarize,
remove_previous_process=remove_previous_process,
get_vocals_dereverb=get_vocals_dereverb,
)
elif "openvoice" in method_vc:
return toneconverter_openvoice(
result_diarize,
preprocessor_max_segments,
remove_previous_process=remove_previous_process,
get_vocals_dereverb=get_vocals_dereverb,
model=method_vc,
)
if __name__ == "__main__":
from segments import result_diarize
audio_segmentation_to_voice(
result_diarize,
TRANSLATE_AUDIO_TO="en",
max_accelerate_audio=2.1,
is_gui=True,
tts_voice00="en-facebook-mms VITS",
tts_voice01="en-CA-ClaraNeural-Female",
tts_voice02="en-GB-ThomasNeural-Male",
tts_voice03="en-GB-SoniaNeural-Female",
tts_voice04="en-NZ-MitchellNeural-Male",
tts_voice05="en-GB-MaisieNeural-Female",
)