# File: ai-audio-books/src/audio_generators.py (11.3 kB)
# Last commit 9281119 by navalnica:
#   async sound effects generation; add text samples; improve UI; add input len check
import asyncio
import os
import re
from pathlib import Path
from uuid import uuid4
import random
from langchain_community.callbacks import get_openai_callback
from pydub import AudioSegment
from src.lc_callbacks import LCMessageLoggerAsync
from src.tts import tts_astream_consumed, sound_generation_consumed
from src.utils import consume_aiter
from src.emotions.generation import (
EffectGeneratorAsync,
TextPreparationForTTSTaskOutput,
)
from src.emotions.utils import add_overlay_for_audio
from src.config import ELEVENLABS_MAX_PARALLEL, logger, OPENAI_MAX_PARALLEL
from src.text_split_chain import SplitTextOutput
class AudioGeneratorSimple:
    """Generate an audiobook by voicing every phrase and concatenating the audio.

    No emotion markup, sound effects or volume normalization are applied.
    """

    async def generate_audio(
        self,
        text_split: SplitTextOutput,
        character_to_voice: dict[str, str],
    ) -> Path:
        """Voice all phrases concurrently and write them to a single file.

        Args:
            text_split: Split text; each item of ``text_split.phrases`` must
                expose ``character`` and ``text`` attributes.
            character_to_voice: Mapping from character name to the voice id
                passed to ``tts_astream_consumed``.

        Returns:
            Path of the written file, ``data/books/<uuid4>.wav``.

        Raises:
            KeyError: If a phrase's character has no entry in
                ``character_to_voice``.
        """
        # Bounds the number of TTS requests in flight at the same time.
        semaphore = asyncio.Semaphore(ELEVENLABS_MAX_PARALLEL)

        async def tts_astream_with_semaphore(voice_id: str, text: str):
            async with semaphore:
                return await tts_astream_consumed(voice_id=voice_id, text=text)

        # Resolve every voice before creating any coroutine, so that an unknown
        # character raises KeyError without leaving un-awaited coroutines behind.
        voiced_phrases = [
            (character_to_voice[phrase.character], phrase.text)
            for phrase in text_split.phrases
        ]
        tasks = [
            tts_astream_with_semaphore(voice_id=voice_id, text=text)
            for voice_id, text in voiced_phrases
        ]
        # gather() returns results in task order, i.e. in phrase order.
        results = await asyncio.gather(*tasks)

        save_dir = Path("data") / "books"
        # parents=True: "data" itself may not exist yet (e.g. fresh checkout).
        save_dir.mkdir(parents=True, exist_ok=True)
        audio_combined_fp = save_dir / f"{uuid4()}.wav"

        logger.info(f'saving generated audio book to: "{audio_combined_fp}"')
        # Each result is iterated as a sequence of byte chunks; the chunks are
        # written back-to-back exactly as received (no re-encoding).
        with open(audio_combined_fp, "wb") as ab:
            for result in results:
                for chunk in result:
                    ab.write(chunk)

        return audio_combined_fp
class AudioGeneratorWithEffects:
    """Generate an audiobook with emotion-adjusted TTS and overlaid sound effects.

    Intermediate audio is written to uniquely named files in the current
    working directory and removed when ``generate_audio`` finishes, whether it
    succeeds or fails.
    """

    def __init__(self):
        self.effect_generator = EffectGeneratorAsync()
        # Bounds the number of concurrent TTS requests issued via this instance.
        self.semaphore = asyncio.Semaphore(ELEVENLABS_MAX_PARALLEL)
        # Temporary files of the most recent generate_audio() call. Kept for
        # inspection only; cleanup works on a per-call list.
        self.temp_files = []

    async def generate_audio(
        self,
        text_split: SplitTextOutput,
        character_to_voice: dict[str, str],
        out_path: Path | None = None,
        *,
        generate_effects: bool = True,
    ) -> Path:
        """Main method to generate the audiobook with TTS, emotion, and sound effects.

        Args:
            text_split: Split text; each item of ``text_split.phrases`` must
                expose ``character`` and ``text`` attributes.
            character_to_voice: Mapping from character name to TTS voice id.
            out_path: Destination file. When ``None`` a file named
                ``data/books/<uuid4>.wav`` is created.
            generate_effects: When false, no line is selected for sound effects.

        Returns:
            Path of the merged audio file.

        Raises:
            KeyError: If a phrase's character has no voice assigned.
            ValueError: If there are no phrases to merge, or the number of
                prepared texts differs from the number of phrases.
        """
        num_lines = len(text_split.phrases)
        lines_for_sound_effect = self._select_lines_for_sound_effect(
            num_lines,
            # bool * float: 0.2 when effects are enabled, 0.0 otherwise.
            fraction=float(0.2 * generate_effects),
        )
        logger.info(f"{generate_effects = }, {lines_for_sound_effect = }")

        # Track temp files in a per-call list: concurrent calls on one instance
        # must never delete each other's files through a shared attribute.
        temp_files: list[str] = []
        self.temp_files = temp_files
        try:
            data_for_tts, data_for_sound_effects = await self._prepare_text_for_tts(
                text_split, lines_for_sound_effect
            )
            tts_results, tts_temp_files = await self._generate_tts_audio(
                text_split, data_for_tts, character_to_voice
            )
            temp_files.extend(tts_temp_files)

            audio_chunks = await self._add_sound_effects(
                tts_results, lines_for_sound_effect, data_for_sound_effects, temp_files
            )
            normalized_audio_chunks = self._normalize_audio_chunks(
                audio_chunks, temp_files
            )
            return self._merge_audio_files(
                normalized_audio_chunks, save_path=out_path
            )
        finally:
            # Runs on failure too, so intermediate files are not left behind.
            self._cleanup_temp_files(temp_files)

    def _select_lines_for_sound_effect(
        self, num_lines: int, fraction: float
    ) -> list[int]:
        """Select % of the lines randomly for sound effect generation.

        Returns ``int(fraction * num_lines)`` distinct indices from
        ``range(num_lines)`` in random order. The count is clamped to
        ``[0, num_lines]`` so an out-of-range fraction cannot make
        ``random.sample`` raise.
        """
        sample_size = max(0, min(num_lines, int(fraction * num_lines)))
        return random.sample(range(num_lines), k=sample_size)

    async def _prepare_text_for_tts(
        self, text_split: SplitTextOutput, lines_for_sound_effect: list[int]
    ) -> tuple[list[dict], list[dict]]:
        """Request emotion data for every phrase and effect data for selected ones.

        Args:
            text_split: Split text whose phrases are processed in order.
            lines_for_sound_effect: Phrase indices that also need sound-effect
                parameters.

        Returns:
            ``(emotion_outputs, effects_outputs)``: the ``output`` of every task
            result tagged ``"add_emotion"`` and ``"add_effects"`` respectively,
            each in phrase order.
        """
        # Bounds the number of concurrent effect-generator calls.
        semaphore = asyncio.Semaphore(OPENAI_MAX_PARALLEL)

        async def run_task_with_semaphore(func, **params):
            async with semaphore:
                return await func(**params)

        task_emotion_code = "add_emotion"
        task_effects_code = "add_effects"
        # Set membership is O(1) per phrase instead of scanning the list.
        effect_lines = set(lines_for_sound_effect)

        tasks = []
        for idx, character_phrase in enumerate(text_split.phrases):
            character_text = character_phrase.text.strip().lower()

            tasks.append(
                run_task_with_semaphore(
                    func=self.effect_generator.add_emotion_to_text,
                    text=character_text,
                )
            )

            # If this line needs sound effects, generate parameters
            if idx in effect_lines:
                tasks.append(
                    run_task_with_semaphore(
                        func=self.effect_generator.generate_parameters_for_sound_effect,
                        text=character_text,
                    )
                )

        # gather() keeps task order, so both filtered lists stay in phrase order.
        tasks_results: list[TextPreparationForTTSTaskOutput] = await asyncio.gather(
            *tasks
        )

        emotion_tasks_results = [
            x.output for x in tasks_results if x.task == task_emotion_code
        ]
        effects_tasks_results = [
            x.output for x in tasks_results if x.task == task_effects_code
        ]
        return emotion_tasks_results, effects_tasks_results

    async def _generate_tts_audio(
        self,
        text_split: SplitTextOutput,
        data_for_tts: list[dict],
        character_to_voice: dict[str, str],
    ) -> tuple[list[str], list[str]]:
        """Generate TTS audio for modified text.

        Args:
            text_split: Split text; supplies the character of every phrase.
            data_for_tts: One dict per phrase with ``"modified_text"`` and
                ``"params"`` keys.
            character_to_voice: Mapping from character name to TTS voice id.

        Returns:
            ``(tts_audio_files, temp_files)``: two lists holding the same
            filenames, one per phrase, in phrase order.

        Raises:
            KeyError: If a phrase's character has no voice assigned.
            ValueError: If ``data_for_tts`` and the phrases differ in length.
        """

        async def tts_astream_with_semaphore(voice_id: str, text: str, params: dict):
            async with self.semaphore:
                return await tts_astream_consumed(
                    voice_id=voice_id, text=text, params=params
                )

        # strict=True: a length mismatch would otherwise silently drop phrases.
        # All inputs are resolved before any coroutine is created, so a failure
        # here leaves no un-awaited coroutines behind.
        requests = [
            (
                character_to_voice[character_phrase.character],
                data_item["modified_text"],
                data_item["params"],
            )
            for data_item, character_phrase in zip(
                data_for_tts, text_split.phrases, strict=True
            )
        ]
        tasks_for_tts = [
            tts_astream_with_semaphore(voice_id=voice_id, text=text, params=params)
            for voice_id, text, params in requests
        ]
        tts_results = await asyncio.gather(*tasks_for_tts)

        # Save the results to temporary files. The per-call tag keeps filenames
        # unique, so concurrent runs cannot overwrite each other's audio.
        run_id = uuid4().hex
        tts_audio_files = []
        temp_files = []
        for idx, tts_result in enumerate(tts_results):
            tts_filename = f"tts_output_{run_id}_{idx}.wav"
            with open(tts_filename, "wb") as ab:
                for chunk in tts_result:
                    ab.write(chunk)
            tts_audio_files.append(tts_filename)
            temp_files.append(tts_filename)

        return tts_audio_files, temp_files

    async def _add_sound_effects(
        self,
        tts_audio_files: list[str],
        lines_for_sound_effect: list[int],
        data_for_sound_effects: list[dict],
        temp_files: list[str],
    ) -> list[str]:
        """Add sound effects to the selected lines.

        Args:
            tts_audio_files: TTS audio filenames, one per phrase.
            lines_for_sound_effect: Phrase indices that get a sound effect.
            data_for_sound_effects: Effect parameters for the selected lines,
                ordered by ascending phrase index. Not modified.
            temp_files: List extended in place with every file created here.

        Returns:
            One filename per phrase: the overlaid file for selected lines, the
            original TTS file otherwise.

        Raises:
            IndexError: If ``data_for_sound_effects`` has fewer entries than
                there are selected lines.
        """
        # Bounds the number of concurrent sound-generation requests.
        semaphore = asyncio.Semaphore(ELEVENLABS_MAX_PARALLEL)

        async def _process_single_phrase(
            tts_filename: str,
            sound_effect_data: dict | None,
            sound_effect_filename: str,
        ):
            if sound_effect_data is None:
                return (tts_filename, [])

            # Only the remote call is throttled; file work happens afterwards.
            async with semaphore:
                sound_result = await sound_generation_consumed(sound_effect_data)

            # save to file
            with open(sound_effect_filename, "wb") as ab:
                for chunk in sound_result:
                    ab.write(chunk)

            # overlay sound effect on TTS audio
            tts_with_effects_filename = add_overlay_for_audio(
                main_audio_filename=tts_filename,
                sound_effect_filename=sound_effect_filename,
                cycling_effect=True,
                decrease_effect_volume=5,
            )
            tmp_files = [sound_effect_filename, tts_with_effects_filename]
            return (tts_with_effects_filename, tmp_files)

        effect_lines = set(lines_for_sound_effect)
        # Per-call tag keeps effect filenames unique across concurrent runs.
        run_id = uuid4().hex
        # Cursor into data_for_sound_effects: avoids mutating the caller's list.
        next_effect = 0

        tasks = []
        for idx, tts_filename in enumerate(tts_audio_files):
            sound_effect_data = None
            if idx in effect_lines:
                sound_effect_data = data_for_sound_effects[next_effect]
                next_effect += 1
            tasks.append(
                _process_single_phrase(
                    tts_filename=tts_filename,
                    sound_effect_data=sound_effect_data,
                    sound_effect_filename=f"sound_effect_{run_id}_{idx}.wav",
                )
            )

        outputs = await asyncio.gather(*tasks)
        audio_chunks = [x[0] for x in outputs]
        tmp_files_to_add = [item for x in outputs for item in x[1]]
        temp_files.extend(tmp_files_to_add)

        return audio_chunks

    def _normalize_audio(
        self, audio_segment: AudioSegment, target_dBFS: float = -20.0
    ) -> AudioSegment:
        """Normalize an audio segment to the target dBFS level.

        A segment whose ``dBFS`` is negative infinity (pure silence) is returned
        unchanged, because the required gain would be infinite.
        """
        current_dBFS = audio_segment.dBFS
        if current_dBFS == float("-inf"):
            return audio_segment
        change_in_dBFS = target_dBFS - current_dBFS
        return audio_segment.apply_gain(change_in_dBFS)

    def _normalize_audio_chunks(
        self, audio_filenames: list[str], temp_files: list[str], target_dBFS: float = -20.0
    ) -> list[str]:
        """Normalize all audio chunks to the target volume level.

        Each input is loaded, normalized and exported as
        ``normalized_<input stem>.wav`` in the current working directory. The
        new filenames are appended to ``temp_files`` and returned in input order.
        """
        normalized_files = []
        for audio_file in audio_filenames:
            audio_segment = AudioSegment.from_file(audio_file)
            normalized_audio = self._normalize_audio(audio_segment, target_dBFS)

            normalized_filename = f"normalized_{Path(audio_file).stem}.wav"
            normalized_audio.export(normalized_filename, format="wav")
            normalized_files.append(normalized_filename)
            temp_files.append(normalized_filename)

        return normalized_files

    def _merge_audio_files(
        self, audio_filenames: list[str], save_path: Path | None = None
    ) -> Path:
        """Helper function to merge multiple audio files into one.

        Args:
            audio_filenames: Files to concatenate, in playback order.
            save_path: Destination file. When ``None`` a file named
                ``data/books/<uuid4>.wav`` is used.

        Returns:
            Path of the exported WAV file.

        Raises:
            ValueError: If ``audio_filenames`` is empty.
        """
        if not audio_filenames:
            raise ValueError("no audio files to merge")

        combined = AudioSegment.from_file(audio_filenames[0])
        for filename in audio_filenames[1:]:
            combined += AudioSegment.from_file(filename)  # Concatenate the audio

        if save_path is None:
            save_path = Path("data") / "books" / f"{uuid4()}.wav"
        save_path = Path(save_path)
        # parents=True: the target directory tree may not exist yet.
        save_path.parent.mkdir(parents=True, exist_ok=True)

        combined.export(save_path, format="wav")
        return save_path

    def _cleanup_temp_files(self, temp_files: list[str]) -> None:
        """Helper function to delete all temporary files.

        Cleanup is best-effort: files that are already gone are skipped, and
        any other OS error is logged instead of raised so that it cannot mask
        an exception from the generation itself.
        """
        for temp_file in temp_files:
            try:
                os.remove(temp_file)
            except FileNotFoundError:
                continue
            except OSError as exc:
                logger.warning(f'failed to remove temp file "{temp_file}": {exc}')