File size: 4,313 Bytes
06a4f42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
00ed2b4
06a4f42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import asyncio
import io
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Union, Tuple

import sounddevice as sd
import soundfile as sf
from elevenlabslib import ElevenLabsUser, ElevenLabsVoice

from .utils import timeit

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Module-level ElevenLabs client. It is left as ``None`` when the API key is
# not present in the environment so that importing this module does not fail;
# the voice helpers in this file check for ``None`` before using it.
try:
    USER = ElevenLabsUser(os.environ["ELEVENLABS_API_KEY"])
except KeyError:
    USER = None
    log.warning("ELEVENLABS_API_KEY not found in environment variables.")


@dataclass
class Speaker:
    """A participant whose lines can be synthesized to speech.

    Attributes:
        name: Name of the speaker.
        voice: ElevenLabs voice used to generate this speaker's audio.
        color: Color associated with the speaker (interpretation is up to
            the caller; this module does not read it).
        description: Optional free-form description; ``None`` when absent.
    """

    name: str
    voice: ElevenLabsVoice
    color: str
    # Explicitly optional: the default is None, so a bare ``str`` annotation
    # would be wrong for type checkers.
    description: Union[str, None] = None


async def text_to_speechbytes_async(text, speaker, loop):
    """Generate speech audio for ``text`` without blocking the event loop.

    The synchronous ``text_to_speechbytes`` call is run in a worker thread so
    that several requests can be awaited concurrently.

    Args:
        text: Text to synthesize.
        speaker: Object whose ``voice`` attribute is passed to
            ``text_to_speechbytes``.
        loop: Event loop used to schedule the blocking call.

    Returns:
        Whatever ``text_to_speechbytes`` returns for the given text and voice.
    """
    # Passing ``None`` selects the loop's shared default executor. This avoids
    # building and tearing down a fresh ThreadPoolExecutor on every call (whose
    # ``with`` exit performs a blocking shutdown on the event-loop thread).
    return await loop.run_in_executor(None, text_to_speechbytes, text, speaker.voice)


async def play_history(history: List[Tuple[Speaker, str]]):
    """Synthesize every line in ``history`` and play the clips in order.

    Audio for all lines is requested concurrently; playback starts only once
    every clip has been generated and then proceeds sequentially.

    Args:
        history: ``(speaker, text)`` pairs, in the order they should be played.
    """
    loop = asyncio.get_running_loop()

    # One synthesis task per line of the conversation.
    tasks = [
        text_to_speechbytes_async(text, speaker, loop)
        for speaker, text in history
    ]

    # gather() waits for ALL tasks and returns results in input order, so the
    # clips are played back in the same order as ``history``.
    for speech_bytes in await asyncio.gather(*tasks):
        # Decode inside a context manager so the SoundFile handle is released
        # before playback starts.
        with sf.SoundFile(io.BytesIO(speech_bytes)) as sound_file:
            samples = sound_file.read()
            samplerate = sound_file.samplerate
        # NOTE: blocking=True holds the event loop for the duration of the
        # clip; that is what keeps the clips from overlapping.
        sd.play(samples, samplerate=samplerate, blocking=True)


async def save_history(history: List[Tuple[Speaker, str]], audio_savepath: str):
    """Synthesize every line in ``history`` and save them as one audio file.

    Audio for all lines is requested concurrently. Each clip is then decoded
    on its own and its frames are appended to the output file, so the result
    contains the clips back to back in ``history`` order.

    Args:
        history: ``(speaker, text)`` pairs, in the order they should appear.
        audio_savepath: Destination path; it is handed to ``sf.SoundFile`` in
            write mode.

    Note:
        The output sample rate and channel count are taken from the first
        clip; the remaining clips are assumed to share them.
    """
    loop = asyncio.get_running_loop()

    # One synthesis task per line of the conversation.
    tasks = [
        text_to_speechbytes_async(text, speaker, loop)
        for speaker, text in history
    ]

    # gather() waits for ALL tasks and returns results in input order.
    all_speech_bytes = await asyncio.gather(*tasks)
    if not all_speech_bytes:
        # Nothing to decode: there is no clip to take the format from.
        log.warning("save_history called with an empty history; nothing written.")
        return

    # Decode clip by clip instead of byte-concatenating the encoded files:
    # every clip is a complete encoded file of its own, and gluing the raw
    # bytes together and decoding them as a single file is not reliable.
    output_file = None
    try:
        for speech_bytes in all_speech_bytes:
            with sf.SoundFile(io.BytesIO(speech_bytes), mode='r') as clip:
                if output_file is None:
                    # Open lazily so the format can be copied from clip #1.
                    output_file = sf.SoundFile(
                        audio_savepath, mode='w',
                        samplerate=clip.samplerate,
                        channels=clip.channels,
                    )
                output_file.write(clip.read())
    finally:
        if output_file is not None:
            output_file.close()


def check_voice_exists(voice: Union[ElevenLabsVoice, str]) -> Union[ElevenLabsVoice, None]:
    """Look up ``voice`` by name on the configured ElevenLabs account.

    Args:
        voice: Value passed to ``USER.get_voices_by_name``.

    Returns:
        The first matching voice, or ``None`` when no client is configured
        or the lookup returns nothing.
    """
    if USER is None:
        log.warning(
            "No ElevenLabsUser found, have you set the ELEVENLABS_API_KEY environment variable?")
        return None

    log.info(f"Getting voice {voice}...")
    matches = USER.get_voices_by_name(voice)
    if not matches:
        return None

    log.info(f"Voice {voice} already exists, found {matches}.")
    # Several voices may share the name; the first hit wins.
    return matches[0]


@timeit
def get_make_voice(voice: Union[ElevenLabsVoice, str], audio_path: List[str] = None) -> ElevenLabsVoice:
    """Return the voice named ``voice``, cloning it from samples if needed.

    Args:
        voice: Name of the voice to fetch or create.
        audio_path: Paths (``str`` or ``pathlib.Path``) of the audio samples
            used for cloning. Only required when the voice does not exist.

    Returns:
        The existing or newly cloned voice, or ``None`` when no ElevenLabs
        client is configured.

    Raises:
        ValueError: If the voice does not exist and the account cannot clone.
        AssertionError: If cloning is needed but ``audio_path`` is missing or
            is not a list.
    """
    # Local import: pathlib is only needed on the cloning path.
    from pathlib import Path

    if USER is None:
        log.warning(
            "No ElevenLabsUser found, have you set the ELEVENLABS_API_KEY environment variable?")
        return None

    existing_voice = check_voice_exists(voice)
    if existing_voice is not None:
        return existing_voice

    if not USER.get_voice_clone_available():
        raise ValueError(
            f"Voice {voice} does not exist and cloning is not available.")

    assert audio_path is not None, "audio_path must be provided"
    assert isinstance(audio_path, list), "audio_path must be a list"
    log.info(f"Cloning voice {voice}...")

    # Map each sample's file name to its raw bytes. Coercing through Path
    # accepts both str and Path entries, and read_bytes() closes the file
    # handle instead of leaking it.
    samples = {}
    for sample_path in map(Path, audio_path):
        samples[sample_path.name] = sample_path.read_bytes()

    return USER.clone_voice_bytes(voice, samples)


@timeit
def text_to_speech(text: str, voice: ElevenLabsVoice):
    """Generate audio for ``text`` with ``voice`` and play it in the foreground.

    Args:
        text: Text to synthesize.
        voice: Voice whose ``generate_and_play_audio`` method is called.

    Returns:
        Wall-clock seconds elapsed across the generate-and-play call.
    """
    log.info(f"Generating audio using voice {voice}...")
    started_at = time.time()
    # playInBackground=False: the call is made in foreground mode, so the
    # measured interval spans the whole call.
    voice.generate_and_play_audio(text, playInBackground=False)
    return time.time() - started_at


@timeit
def text_to_speechbytes(text: str, voice: ElevenLabsVoice):
    """Generate audio for ``text`` with ``voice`` and return it without playing.

    Args:
        text: Text to synthesize.
        voice: Voice whose ``generate_audio_bytes`` method is called.

    Returns:
        The value returned by ``voice.generate_audio_bytes(text)``.
    """
    log.info(f"Generating audio for voice {voice} text {text}...")
    return voice.generate_audio_bytes(text)