Spaces:
Running
Running
import asyncio | |
import os | |
import re | |
from pathlib import Path | |
from uuid import uuid4 | |
import random | |
from langchain_community.callbacks import get_openai_callback | |
from pydub import AudioSegment | |
from src.lc_callbacks import LCMessageLoggerAsync | |
from src.tts import tts_astream, sound_generation_astream | |
from src.utils import consume_aiter | |
from src.emotions.generation import EffectGeneratorAsync | |
from src.emotions.utils import add_overlay_for_audio | |
from src.config import AI_ML_API_KEY, ELEVENLABS_MAX_PARALLEL, logger | |
from src.text_split_chain import SplitTextOutput | |
class AudioGeneratorSimple: | |
async def generate_audio( | |
self, | |
text_split: SplitTextOutput, | |
character_to_voice: dict[str, str], | |
) -> Path: | |
semaphore = asyncio.Semaphore(ELEVENLABS_MAX_PARALLEL) | |
async def tts_astream_with_semaphore(voice_id: str, text: str): | |
async with semaphore: | |
iter_ = tts_astream(voice_id=voice_id, text=text) | |
bytes_ = await consume_aiter(iter_) | |
return bytes_ | |
tasks = [] | |
for character_phrase in text_split.phrases: | |
voice_id = character_to_voice[character_phrase.character] | |
task = tts_astream_with_semaphore( | |
voice_id=voice_id, text=character_phrase.text | |
) | |
tasks.append(task) | |
results = await asyncio.gather(*tasks) | |
save_dir = Path("data") / "books" | |
save_dir.mkdir(exist_ok=True) | |
audio_combined_fp = save_dir / f"{uuid4()}.wav" | |
logger.info(f'saving generated audio book to: "{audio_combined_fp}"') | |
with open(audio_combined_fp, "wb") as ab: | |
for result in results: | |
for chunk in result: | |
ab.write(chunk) | |
return audio_combined_fp | |
class AudioGeneratorWithEffects: | |
def __init__(self): | |
self.effect_generator = EffectGeneratorAsync(AI_ML_API_KEY) | |
self.semaphore = asyncio.Semaphore(ELEVENLABS_MAX_PARALLEL) | |
self.temp_files = [] | |
async def generate_audio( | |
self, | |
text_split: SplitTextOutput, | |
character_to_voice: dict[str, str], | |
) -> Path: | |
"""Main method to generate the audiobook with TTS, emotion, and sound effects.""" | |
num_lines = len(text_split.phrases) | |
lines_for_sound_effect = self._select_lines_for_sound_effect(num_lines) | |
# Step 1: Process and modify text | |
modified_texts, sound_emotion_results = await self._process_and_modify_text( | |
text_split, lines_for_sound_effect | |
) | |
# Step 2: Generate TTS audio for modified text | |
tts_results, self.temp_files = await self._generate_tts_audio( | |
text_split, modified_texts, character_to_voice | |
) | |
# Step 3: Add sound effects to selected lines | |
audio_chunks = await self._add_sound_effects( | |
tts_results, lines_for_sound_effect, sound_emotion_results, self.temp_files | |
) | |
# Step 4: Merge audio files | |
normalized_audio_chunks = self._normalize_audio_chunks(audio_chunks, self.temp_files) | |
final_output = self._merge_audio_files(normalized_audio_chunks) | |
# Clean up temporary files | |
self._cleanup_temp_files(self.temp_files) | |
return final_output | |
def _select_lines_for_sound_effect(self, num_lines: int) -> list[int]: | |
"""Select 20% of the lines randomly for sound effect generation.""" | |
return random.sample(range(num_lines), k=int(0.0 * num_lines)) | |
async def _process_and_modify_text( | |
self, text_split: SplitTextOutput, lines_for_sound_effect: list[int] | |
) -> tuple[list[dict], list[dict]]: | |
"""Process the text by modifying it and generating tasks for sound effects.""" | |
tasks_for_text_modification = [] | |
sound_emotion_tasks = [] | |
for idx, character_phrase in enumerate(text_split.phrases): | |
character_text = character_phrase.text.strip().lower() | |
# Add text emotion modification task | |
tasks_for_text_modification.append( | |
self.effect_generator.add_emotion_to_text(character_text) | |
) | |
# If this line needs sound effects, generate parameters | |
if idx in lines_for_sound_effect: | |
sound_emotion_tasks.append( | |
self.effect_generator.generate_parameters_for_sound_effect( | |
character_text | |
) | |
) | |
# Await tasks for text modification and sound effects | |
modified_texts = await asyncio.gather(*tasks_for_text_modification) | |
sound_emotion_results = await asyncio.gather(*sound_emotion_tasks) | |
return modified_texts, sound_emotion_results | |
async def _generate_tts_audio( | |
self, | |
text_split: SplitTextOutput, | |
modified_texts: list[dict], | |
character_to_voice: dict[str, str], | |
) -> tuple[list[str], list[str]]: | |
"""Generate TTS audio for modified text.""" | |
tasks_for_tts = [] | |
temp_files = [] | |
async def tts_astream_with_semaphore(voice_id: str, text: str, params: dict): | |
async with self.semaphore: | |
iter_ = tts_astream(voice_id=voice_id, text=text, params=params) | |
bytes_ = await consume_aiter(iter_) | |
return bytes_ | |
for idx, (modified_text, character_phrase) in enumerate( | |
zip(modified_texts, text_split.phrases) | |
): | |
voice_id = character_to_voice[character_phrase.character] | |
# Use the semaphore-protected TTS function | |
task = tts_astream_with_semaphore( | |
voice_id=voice_id, | |
text=modified_text["modified_text"], | |
params=modified_text["params"], | |
) | |
tasks_for_tts.append(task) | |
# Gather all TTS results | |
tts_results = await asyncio.gather(*tasks_for_tts) | |
# Save the results to temporary files | |
tts_audio_files = [] | |
for idx, tts_result in enumerate(tts_results): | |
tts_filename = f"tts_output_{idx}.wav" | |
with open(tts_filename, "wb") as ab: | |
for chunk in tts_result: | |
ab.write(chunk) | |
tts_audio_files.append(tts_filename) | |
temp_files.append(tts_filename) | |
return tts_audio_files, temp_files | |
async def _add_sound_effects( | |
self, | |
tts_audio_files: list[str], | |
lines_for_sound_effect: list[int], | |
sound_emotion_results: list[dict], | |
temp_files: list[str], | |
) -> list[str]: | |
"""Add sound effects to the selected lines.""" | |
audio_chunks = [] | |
for idx, tts_filename in enumerate(tts_audio_files): | |
# If the line has sound emotion data, generate sound effect and overlay | |
if idx in lines_for_sound_effect: | |
sound_effect_data = sound_emotion_results.pop(0) # Get next sound effect data | |
sound_effect_filename = f"sound_effect_{idx}.wav" | |
# Generate sound effect asynchronously | |
sound_result = await consume_aiter(sound_generation_astream(sound_effect_data)) | |
with open(sound_effect_filename, "wb") as ab: | |
for chunk in sound_result: | |
ab.write(chunk) | |
# Add sound effect overlay | |
output_filename = add_overlay_for_audio( | |
main_audio_filename=tts_filename, | |
sound_effect_filename=sound_effect_filename, | |
cycling_effect=True, | |
decrease_effect_volume=5, | |
) | |
audio_chunks.append(output_filename) | |
temp_files.append(sound_effect_filename) # Track temp files | |
temp_files.append(output_filename) | |
else: | |
audio_chunks.append(tts_filename) | |
return audio_chunks | |
def _normalize_audio(self, audio_segment: AudioSegment, target_dBFS: float = -20.0) -> AudioSegment: | |
"""Normalize an audio segment to the target dBFS level.""" | |
change_in_dBFS = target_dBFS - audio_segment.dBFS | |
return audio_segment.apply_gain(change_in_dBFS) | |
def _normalize_audio_chunks(self, audio_filenames: list[str], temp_files, target_dBFS: float = -20.0) -> list[str]: | |
"""Normalize all audio chunks to the target volume level.""" | |
normalized_files = [] | |
for audio_file in audio_filenames: | |
audio_segment = AudioSegment.from_file(audio_file) | |
normalized_audio = self._normalize_audio(audio_segment, target_dBFS) | |
normalized_filename = f"normalized_{Path(audio_file).stem}.wav" | |
normalized_audio.export(normalized_filename, format="wav") | |
normalized_files.append(normalized_filename) | |
temp_files.append(normalized_filename) | |
return normalized_files | |
def _merge_audio_files(self, audio_filenames: list[str]) -> Path: | |
"""Helper function to merge multiple audio files into one.""" | |
combined = AudioSegment.from_file(audio_filenames[0]) | |
for filename in audio_filenames[1:]: | |
next_audio = AudioSegment.from_file(filename) | |
combined += next_audio # Concatenate the audio | |
save_dir = Path("data") / "books" | |
save_dir.mkdir(exist_ok=True) | |
save_path = save_dir / f"{uuid4()}.wav" | |
combined.export(save_path, format="wav") | |
return Path(save_path) | |
def _cleanup_temp_files(self, temp_files: list[str]) -> None: | |
"""Helper function to delete all temporary files.""" | |
for temp_file in temp_files: | |
try: | |
os.remove(temp_file) | |
except FileNotFoundError: | |
continue |