import asyncio import os import re from pathlib import Path from uuid import uuid4 import random from langchain_community.callbacks import get_openai_callback from pydub import AudioSegment from src.lc_callbacks import LCMessageLoggerAsync from src.tts import tts_astream_consumed, sound_generation_consumed from src.utils import consume_aiter from src.emotions.generation import ( EffectGeneratorAsync, TextPreparationForTTSTaskOutput, ) from src.emotions.utils import add_overlay_for_audio from src.config import ELEVENLABS_MAX_PARALLEL, logger, OPENAI_MAX_PARALLEL from src.text_split_chain import SplitTextOutput class AudioGeneratorSimple: async def generate_audio( self, text_split: SplitTextOutput, character_to_voice: dict[str, str], ) -> Path: semaphore = asyncio.Semaphore(ELEVENLABS_MAX_PARALLEL) async def tts_astream_with_semaphore(voice_id: str, text: str): async with semaphore: bytes_ = await tts_astream_consumed(voice_id=voice_id, text=text) # bytes_ = await consume_aiter(iter_) return bytes_ tasks = [] for character_phrase in text_split.phrases: voice_id = character_to_voice[character_phrase.character] task = tts_astream_with_semaphore( voice_id=voice_id, text=character_phrase.text ) tasks.append(task) results = await asyncio.gather(*tasks) save_dir = Path("data") / "books" save_dir.mkdir(exist_ok=True) audio_combined_fp = save_dir / f"{uuid4()}.wav"'saving generated audio book to: "{audio_combined_fp}"') with open(audio_combined_fp, "wb") as ab: for result in results: for chunk in result: ab.write(chunk) return audio_combined_fp class AudioGeneratorWithEffects: def __init__(self): self.effect_generator = EffectGeneratorAsync() self.semaphore = asyncio.Semaphore(ELEVENLABS_MAX_PARALLEL) self.temp_files = [] async def generate_audio( self, text_split: SplitTextOutput, character_to_voice: dict[str, str], out_path: Path | None = None, *, generate_effects: bool = True, ) -> Path: """Main method to generate the audiobook with TTS, emotion, and sound effects.""" num_lines = len(text_split.phrases) lines_for_sound_effect = self._select_lines_for_sound_effect( num_lines, fraction=float(0.2 * generate_effects), )"{generate_effects = }, {lines_for_sound_effect = }") data_for_tts, data_for_sound_effects = await self._prepare_text_for_tts( text_split, lines_for_sound_effect ) tts_results, self.temp_files = await self._generate_tts_audio( text_split, data_for_tts, character_to_voice ) audio_chunks = await self._add_sound_effects( tts_results, lines_for_sound_effect, data_for_sound_effects, self.temp_files ) normalized_audio_chunks = self._normalize_audio_chunks( audio_chunks, self.temp_files ) final_output = self._merge_audio_files( normalized_audio_chunks, save_path=out_path ) self._cleanup_temp_files(self.temp_files) return final_output def _select_lines_for_sound_effect( self, num_lines: int, fraction: float ) -> list[int]: """Select % of the lines randomly for sound effect generation.""" return random.sample(range(num_lines), k=int(fraction * num_lines)) async def _prepare_text_for_tts( self, text_split: SplitTextOutput, lines_for_sound_effect: list[int] ) -> tuple[list[dict], list[dict]]: semaphore = asyncio.Semaphore(OPENAI_MAX_PARALLEL) async def run_task_with_semaphore(func, **params): async with semaphore: outputs = await func(**params) return outputs task_emotion_code = "add_emotion" task_effects_code = "add_effects" tasks = [] for idx, character_phrase in enumerate(text_split.phrases): character_text = character_phrase.text.strip().lower() tasks.append( run_task_with_semaphore( func=self.effect_generator.add_emotion_to_text, text=character_text, ) ) # If this line needs sound effects, generate parameters if idx in lines_for_sound_effect: tasks.append( run_task_with_semaphore( func=self.effect_generator.generate_parameters_for_sound_effect, text=character_text, ) ) tasks_results: list[TextPreparationForTTSTaskOutput] = [] tasks_results = await asyncio.gather(*tasks) emotion_tasks_results = [ x.output for x in tasks_results if x.task == task_emotion_code ] effects_tasks_results = [ x.output for x in tasks_results if x.task == task_effects_code ] return emotion_tasks_results, effects_tasks_results async def _generate_tts_audio( self, text_split: SplitTextOutput, data_for_tts: list[dict], character_to_voice: dict[str, str], ) -> tuple[list[str], list[str]]: """Generate TTS audio for modified text.""" tasks_for_tts = [] temp_files = [] async def tts_astream_with_semaphore(voice_id: str, text: str, params: dict): async with self.semaphore: bytes_ = await tts_astream_consumed( voice_id=voice_id, text=text, params=params ) # bytes_ = await consume_aiter(iter_) return bytes_ for idx, (data_item, character_phrase) in enumerate( zip(data_for_tts, text_split.phrases) ): voice_id = character_to_voice[character_phrase.character] task = tts_astream_with_semaphore( voice_id=voice_id, text=data_item["modified_text"], params=data_item["params"], ) tasks_for_tts.append(task) tts_results = await asyncio.gather(*tasks_for_tts) # Save the results to temporary files tts_audio_files = [] for idx, tts_result in enumerate(tts_results): tts_filename = f"tts_output_{idx}.wav" with open(tts_filename, "wb") as ab: for chunk in tts_result: ab.write(chunk) tts_audio_files.append(tts_filename) temp_files.append(tts_filename) return tts_audio_files, temp_files async def _add_sound_effects( self, tts_audio_files: list[str], lines_for_sound_effect: list[int], data_for_sound_effects: list[dict], temp_files: list[str], ) -> list[str]: """Add sound effects to the selected lines.""" semaphore = asyncio.Semaphore(ELEVENLABS_MAX_PARALLEL) async def _process_single_phrase( tts_filename: str, sound_effect_data: dict | None, sound_effect_filename: str, ): if sound_effect_data is None: return (tts_filename, []) async with semaphore: sound_result = await sound_generation_consumed(sound_effect_data) # save to file with open(sound_effect_filename, "wb") as ab: for chunk in sound_result: ab.write(chunk) # overlay sound effect on TTS audio tts_with_effects_filename = add_overlay_for_audio( main_audio_filename=tts_filename, sound_effect_filename=sound_effect_filename, cycling_effect=True, decrease_effect_volume=5, ) tmp_files = [sound_effect_filename, tts_with_effects_filename] return (tts_with_effects_filename, tmp_files) tasks = [] for idx, tts_filename in enumerate(tts_audio_files): sound_effect_filename = f"sound_effect_{idx}.wav" if idx not in lines_for_sound_effect: tasks.append( _process_single_phrase( tts_filename=tts_filename, sound_effect_data=None, sound_effect_filename=sound_effect_filename, ) ) else: sound_effect_data = data_for_sound_effects.pop(0) tasks.append( _process_single_phrase( tts_filename=tts_filename, sound_effect_data=sound_effect_data, sound_effect_filename=sound_effect_filename, ) ) outputs = await asyncio.gather(*tasks) audio_chunks = [x[0] for x in outputs] tmp_files_to_add = [item for x in outputs for item in x[1]] temp_files.extend(tmp_files_to_add) return audio_chunks def _normalize_audio( self, audio_segment: AudioSegment, target_dBFS: float = -20.0 ) -> AudioSegment: """Normalize an audio segment to the target dBFS level.""" change_in_dBFS = target_dBFS - audio_segment.dBFS return audio_segment.apply_gain(change_in_dBFS) def _normalize_audio_chunks( self, audio_filenames: list[str], temp_files, target_dBFS: float = -20.0 ) -> list[str]: """Normalize all audio chunks to the target volume level.""" normalized_files = [] for audio_file in audio_filenames: audio_segment = AudioSegment.from_file(audio_file) normalized_audio = self._normalize_audio(audio_segment, target_dBFS) normalized_filename = f"normalized_{Path(audio_file).stem}.wav" normalized_audio.export(normalized_filename, format="wav") normalized_files.append(normalized_filename) temp_files.append(normalized_filename) return normalized_files def _merge_audio_files( self, audio_filenames: list[str], save_path: Path | None = None ) -> Path: """Helper function to merge multiple audio files into one.""" combined = AudioSegment.from_file(audio_filenames[0]) for filename in audio_filenames[1:]: next_audio = AudioSegment.from_file(filename) combined += next_audio # Concatenate the audio if save_path is None: save_dir = Path("data") / "books" save_dir.mkdir(exist_ok=True) save_path = save_dir / f"{uuid4()}.wav" combined.export(save_path, format="wav") return Path(save_path) def _cleanup_temp_files(self, temp_files: list[str]) -> None: """Helper function to delete all temporary files.""" for temp_file in temp_files: try: os.remove(temp_file) except FileNotFoundError: continue