File size: 6,577 Bytes
256e1f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
Synthesize a given text using the trained DiT models.
"""

import json
import os

os.environ["NLTK_DATA"] = "nltk_data"
import torch
import yaml
from g2p_en import G2p
import soundfile as sf
from vocos import Vocos
from sample import sample


def synthesize(
    text,
    duration_model_config,
    duration_model_checkpoint,
    acoustic_model_config,
    acoustic_model_checkpoint,
    speaker_id,
    cfg_scale=4.0,
    num_sampling_steps=1000,
):
    """
    Synthesize speech from text using trained DiT models.

    Args:
        text (str): Input text to synthesize
        duration_model_config (str): Path to duration model config file
        duration_model_checkpoint (str): Path to duration model checkpoint
        acoustic_model_config (str): Path to acoustic model config file
        acoustic_model_checkpoint (str): Path to acoustic model checkpoint
        speaker_id (str): Speaker ID to use for synthesis
        cfg_scale (float): Classifier-free guidance scale (default: 4.0)
        num_sampling_steps (int): Number of sampling steps for diffusion (default: 1000)

    Returns:
        numpy.ndarray: Audio waveform array
        int: Sample rate (24000)
    """

    print("Text:", text)

    # Read duration model config
    with open(duration_model_config, "r") as f:
        duration_config = yaml.safe_load(f)

    # Get data directory from data_path
    data_dir = os.path.dirname(duration_config["data"]["data_path"])

    # Read maps.json from same directory
    with open(os.path.join(data_dir, "maps.json"), "r") as f:
        maps = json.load(f)
    phone_to_idx = maps["phone_to_idx"]
    phone_kind_to_idx = maps["phone_kind_to_idx"]
    speaker_id_to_idx = maps["speaker_id_to_idx"]

    # Step 1: Text to phonemes
    def text_to_phonemes(text, insert_empty=True):
        g2p = G2p()
        phonemes = g2p(text)
        words = []
        word = []
        for p in phonemes:
            if p == " ":
                if len(word) > 0:
                    words.append(word)
                word = []
            else:
                word.append(p)
        if len(word) > 0:
            words.append(word)

        phones = []
        phone_kinds = []
        for word in words:
            for i, p in enumerate(word):
                if p in [",", ".", "!", "?", ";", ":"]:
                    p = "EMPTY"
                elif p in phone_to_idx:
                    pass
                else:
                    continue

                if p == "EMPTY":
                    phone_kind = "EMPTY"
                elif len(word) == 1:
                    phone_kind = "WORD"
                elif i == 0:
                    phone_kind = "START"
                elif i == len(word) - 1:
                    phone_kind = "END"
                else:
                    phone_kind = "MIDDLE"

                phones.append(p)
                phone_kinds.append(phone_kind)

        if insert_empty:
            if phones[0] != "EMPTY":
                phones.insert(0, "EMPTY")
                phone_kinds.insert(0, "EMPTY")
            if phones[-1] != "EMPTY":
                phones.append("EMPTY")
                phone_kinds.append("EMPTY")

        return phones, phone_kinds

    phonemes, phone_kinds = text_to_phonemes(text)
    # Convert phonemes to indices
    phoneme_indices = [phone_to_idx[p] for p in phonemes]
    phone_kind_indices = [phone_kind_to_idx[p] for p in phone_kinds]
    print("Phonemes:", phonemes)

    # Step 2: Duration prediction
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    torch_phoneme_indices = torch.tensor(phoneme_indices)[None, :].long().to(device)
    torch_speaker_id = torch.full_like(torch_phoneme_indices, int(speaker_id))
    torch_phone_kind_indices = (
        torch.tensor(phone_kind_indices)[None, :].long().to(device)
    )

    samples = sample(
        duration_model_config,
        duration_model_checkpoint,
        cfg_scale=cfg_scale,
        num_sampling_steps=num_sampling_steps,
        seed=0,
        speaker_id=torch_speaker_id,
        phone=torch_phoneme_indices,
        phone_kind=torch_phone_kind_indices,
    )
    phoneme_durations = samples[-1][0, 0]

    # Step 3: Acoustic prediction
    # First, we need to convert phoneme durations to number of frames per phoneme (min 1 frame)
    SAMPLE_RATE = 24000
    HOP_LENGTH = 256
    N_FFT = 1024
    N_MELS = 100
    time_per_frame = HOP_LENGTH / SAMPLE_RATE
    # convert predicted durations to raw durations using data mean and std in the config
    if duration_config["data"]["normalize"]:
        mean = duration_config["data"]["data_mean"]
        std = duration_config["data"]["data_std"]
        raw_durations = phoneme_durations * std + mean
    else:
        raw_durations = phoneme_durations

    raw_durations = raw_durations.clamp(min=time_per_frame, max=1.0)
    end_time = torch.cumsum(raw_durations, dim=0)
    end_frame = end_time / time_per_frame
    int_end_frame = end_frame.floor().int()
    repeated_phoneme_indices = []
    repeated_phone_kind_indices = []
    for i in range(len(phonemes)):
        repeated_phoneme_indices.extend(
            [phoneme_indices[i]] * (int_end_frame[i] - len(repeated_phoneme_indices))
        )
        repeated_phone_kind_indices.extend(
            [phone_kind_indices[i]]
            * (int_end_frame[i] - len(repeated_phone_kind_indices))
        )

    torch_phoneme_indices = (
        torch.tensor(repeated_phoneme_indices)[None, :].long().to(device)
    )
    torch_speaker_id = torch.full_like(torch_phoneme_indices, int(speaker_id))
    torch_phone_kind_indices = (
        torch.tensor(repeated_phone_kind_indices)[None, :].long().to(device)
    )

    samples = sample(
        acoustic_model_config,
        acoustic_model_checkpoint,
        cfg_scale=cfg_scale,
        num_sampling_steps=num_sampling_steps,
        seed=0,
        speaker_id=torch_speaker_id,
        phone=torch_phoneme_indices,
        phone_kind=torch_phone_kind_indices,
    )
    mel = samples[-1][0]
    # compute raw mel if acoustic model normalize is true
    acoustic_config = yaml.safe_load(open(acoustic_model_config, "r"))
    if acoustic_config["data"]["normalize"]:
        mean = acoustic_config["data"]["data_mean"]
        std = acoustic_config["data"]["data_std"]
        raw_mel = mel * std + mean
    else:
        raw_mel = mel

    # Step 4: Vocoder
    vocos = Vocos.from_pretrained("charactr/vocos-mel-24khz")
    audio = vocos.decode(raw_mel.cpu()[None, :, :]).squeeze().cpu().numpy()

    return audio, SAMPLE_RATE