# Copyright (c) 2021 Mobvoi Inc (Binbin Zhang, Di Wu) # 2022 Xingchen Song (sxc19@mails.tsinghua.edu.cn) # 2024 Alibaba Inc (Xiang Lyu) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # Modified from ESPnet(https://github.com/espnet/espnet) """Encoder definition.""" from typing import Tuple import torch from torch import nn import torch.utils.checkpoint as ckpt from torch.nn import functional as F from cosyvoice.transformer.convolution import ConvolutionModule from cosyvoice.transformer.encoder_layer import ConformerEncoderLayer from cosyvoice.transformer.positionwise_feed_forward import PositionwiseFeedForward from cosyvoice.utils.class_utils import ( COSYVOICE_EMB_CLASSES, COSYVOICE_SUBSAMPLE_CLASSES, COSYVOICE_ATTENTION_CLASSES, COSYVOICE_ACTIVATION_CLASSES, ) from cosyvoice.utils.mask import make_pad_mask from cosyvoice.utils.mask import add_optional_chunk_mask class Upsample1D(nn.Module): """A 1D upsampling layer with an optional convolution. Parameters: channels (`int`): number of channels in the inputs and outputs. use_conv (`bool`, default `False`): option to use a convolution. use_conv_transpose (`bool`, default `False`): option to use a convolution transpose. out_channels (`int`, optional): number of output channels. Defaults to `channels`. """ def __init__(self, channels: int, out_channels: int, stride: int=2): super().__init__() self.channels = channels self.out_channels = out_channels self.stride = stride # In this mode, first repeat interpolate, than conv with stride=1 self.conv = nn.Conv1d( self.channels, self.out_channels, stride*2+1, stride=1, padding=0, ) def forward(self, inputs: torch.Tensor, input_lengths: torch.Tensor): outputs = F.interpolate(inputs, scale_factor=float(self.stride), mode="nearest") outputs = F.pad(outputs, (self.stride * 2, 0), value=0.0) outputs = self.conv(outputs) return outputs, input_lengths * self.stride class PreLookaheadLayer(nn.Module): def __init__(self, channels: int, pre_lookahead_len: int = 1): super().__init__() self.channels = channels self.pre_lookahead_len = pre_lookahead_len self.conv1 = nn.Conv1d( channels, channels, kernel_size=pre_lookahead_len+1, stride=1, padding=0, ) self.conv2 = nn.Conv1d( channels, channels, kernel_size=3, stride=1, padding=0, ) def forward(self, inputs: torch.Tensor) -> torch.Tensor: """ inputs: (batch_size, seq_len, channels) """ outputs = inputs.transpose(1, 2).contiguous() # look ahead outputs = F.pad(outputs, (0, self.pre_lookahead_len), mode='constant', value=0.0) outputs = F.leaky_relu(self.conv1(outputs)) # outputs outputs = F.pad(outputs, (2, 0), mode='constant', value=0.0) outputs = self.conv2(outputs) outputs = outputs.transpose(1, 2).contiguous() # residual connection outputs = outputs + inputs return outputs class UpsampleConformerEncoder(torch.nn.Module): def __init__( self, input_size: int, output_size: int = 256, attention_heads: int = 4, linear_units: int = 2048, num_blocks: int = 6, dropout_rate: float = 0.1, positional_dropout_rate: float = 0.1, attention_dropout_rate: float = 0.0, input_layer: str = "conv2d", pos_enc_layer_type: str = "rel_pos", normalize_before: bool = True, static_chunk_size: int = 0, use_dynamic_chunk: bool = False, global_cmvn: torch.nn.Module = None, use_dynamic_left_chunk: bool = False, positionwise_conv_kernel_size: int = 1, macaron_style: bool = True, selfattention_layer_type: str = "rel_selfattn", activation_type: str = "swish", use_cnn_module: bool = True, cnn_module_kernel: int = 15, causal: bool = False, cnn_module_norm: str = "batch_norm", key_bias: bool = True, gradient_checkpointing: bool = False, ): """ Args: input_size (int): input dim output_size (int): dimension of attention attention_heads (int): the number of heads of multi head attention linear_units (int): the hidden units number of position-wise feed forward num_blocks (int): the number of decoder blocks dropout_rate (float): dropout rate attention_dropout_rate (float): dropout rate in attention positional_dropout_rate (float): dropout rate after adding positional encoding input_layer (str): input layer type. optional [linear, conv2d, conv2d6, conv2d8] pos_enc_layer_type (str): Encoder positional encoding layer type. opitonal [abs_pos, scaled_abs_pos, rel_pos, no_pos] normalize_before (bool): True: use layer_norm before each sub-block of a layer. False: use layer_norm after each sub-block of a layer. static_chunk_size (int): chunk size for static chunk training and decoding use_dynamic_chunk (bool): whether use dynamic chunk size for training or not, You can only use fixed chunk(chunk_size > 0) or dyanmic chunk size(use_dynamic_chunk = True) global_cmvn (Optional[torch.nn.Module]): Optional GlobalCMVN module use_dynamic_left_chunk (bool): whether use dynamic left chunk in dynamic chunk training key_bias: whether use bias in attention.linear_k, False for whisper models. gradient_checkpointing: rerunning a forward-pass segment for each checkpointed segment during backward. """ super().__init__() self._output_size = output_size self.global_cmvn = global_cmvn self.embed = COSYVOICE_SUBSAMPLE_CLASSES[input_layer]( input_size, output_size, dropout_rate, COSYVOICE_EMB_CLASSES[pos_enc_layer_type](output_size, positional_dropout_rate), ) self.normalize_before = normalize_before self.after_norm = torch.nn.LayerNorm(output_size, eps=1e-5) self.static_chunk_size = static_chunk_size self.use_dynamic_chunk = use_dynamic_chunk self.use_dynamic_left_chunk = use_dynamic_left_chunk self.gradient_checkpointing = gradient_checkpointing activation = COSYVOICE_ACTIVATION_CLASSES[activation_type]() # self-attention module definition encoder_selfattn_layer_args = ( attention_heads, output_size, attention_dropout_rate, key_bias, ) # feed-forward module definition positionwise_layer_args = ( output_size, linear_units, dropout_rate, activation, ) # convolution module definition convolution_layer_args = (output_size, cnn_module_kernel, activation, cnn_module_norm, causal) self.pre_lookahead_layer = PreLookaheadLayer(channels=512, pre_lookahead_len=3) self.encoders = torch.nn.ModuleList([ ConformerEncoderLayer( output_size, COSYVOICE_ATTENTION_CLASSES[selfattention_layer_type]( *encoder_selfattn_layer_args), PositionwiseFeedForward(*positionwise_layer_args), PositionwiseFeedForward( *positionwise_layer_args) if macaron_style else None, ConvolutionModule( *convolution_layer_args) if use_cnn_module else None, dropout_rate, normalize_before, ) for _ in range(num_blocks) ]) self.up_layer = Upsample1D(channels=512, out_channels=512, stride=2) self.up_embed = COSYVOICE_SUBSAMPLE_CLASSES[input_layer]( input_size, output_size, dropout_rate, COSYVOICE_EMB_CLASSES[pos_enc_layer_type](output_size, positional_dropout_rate), ) self.up_encoders = torch.nn.ModuleList([ ConformerEncoderLayer( output_size, COSYVOICE_ATTENTION_CLASSES[selfattention_layer_type]( *encoder_selfattn_layer_args), PositionwiseFeedForward(*positionwise_layer_args), PositionwiseFeedForward( *positionwise_layer_args) if macaron_style else None, ConvolutionModule( *convolution_layer_args) if use_cnn_module else None, dropout_rate, normalize_before, ) for _ in range(4) ]) def output_size(self) -> int: return self._output_size def forward( self, xs: torch.Tensor, xs_lens: torch.Tensor, decoding_chunk_size: int = 0, num_decoding_left_chunks: int = -1, ) -> Tuple[torch.Tensor, torch.Tensor]: """Embed positions in tensor. Args: xs: padded input tensor (B, T, D) xs_lens: input length (B) decoding_chunk_size: decoding chunk size for dynamic chunk 0: default for training, use random dynamic chunk. <0: for decoding, use full chunk. >0: for decoding, use fixed chunk size as set. num_decoding_left_chunks: number of left chunks, this is for decoding, the chunk size is decoding_chunk_size. >=0: use num_decoding_left_chunks <0: use all left chunks Returns: encoder output tensor xs, and subsampled masks xs: padded output tensor (B, T' ~= T/subsample_rate, D) masks: torch.Tensor batch padding mask after subsample (B, 1, T' ~= T/subsample_rate) NOTE(xcsong): We pass the `__call__` method of the modules instead of `forward` to the checkpointing API because `__call__` attaches all the hooks of the module. https://discuss.pytorch.org/t/any-different-between-model-input-and-model-forward-input/3690/2 """ T = xs.size(1) masks = ~make_pad_mask(xs_lens, T).unsqueeze(1) # (B, 1, T) if self.global_cmvn is not None: xs = self.global_cmvn(xs) xs, pos_emb, masks = self.embed(xs, masks) mask_pad = masks # (B, 1, T/subsample_rate) chunk_masks = add_optional_chunk_mask(xs, masks, self.use_dynamic_chunk, self.use_dynamic_left_chunk, decoding_chunk_size, self.static_chunk_size, num_decoding_left_chunks) # lookahead + conformer encoder xs = self.pre_lookahead_layer(xs) xs = self.forward_layers(xs, chunk_masks, pos_emb, mask_pad) # upsample + conformer encoder xs = xs.transpose(1, 2).contiguous() xs, xs_lens = self.up_layer(xs, xs_lens) xs = xs.transpose(1, 2).contiguous() T = xs.size(1) masks = ~make_pad_mask(xs_lens, T).unsqueeze(1) # (B, 1, T) xs, pos_emb, masks = self.up_embed(xs, masks) mask_pad = masks # (B, 1, T/subsample_rate) chunk_masks = add_optional_chunk_mask(xs, masks, self.use_dynamic_chunk, self.use_dynamic_left_chunk, decoding_chunk_size, self.static_chunk_size * self.up_layer.stride, num_decoding_left_chunks) xs = self.forward_up_layers(xs, chunk_masks, pos_emb, mask_pad) if self.normalize_before: xs = self.after_norm(xs) # Here we assume the mask is not changed in encoder layers, so just # return the masks before encoder layers, and the masks will be used # for cross attention with decoder later return xs, masks def forward_layers(self, xs: torch.Tensor, chunk_masks: torch.Tensor, pos_emb: torch.Tensor, mask_pad: torch.Tensor) -> torch.Tensor: for layer in self.encoders: xs, chunk_masks, _, _ = layer(xs, chunk_masks, pos_emb, mask_pad) return xs def forward_up_layers(self, xs: torch.Tensor, chunk_masks: torch.Tensor, pos_emb: torch.Tensor, mask_pad: torch.Tensor) -> torch.Tensor: for layer in self.up_encoders: xs, chunk_masks, _, _ = layer(xs, chunk_masks, pos_emb, mask_pad) return xs