""" Specifies the inference interfaces for Automatic speech Recognition (ASR) modules. Authors: * Aku Rouhe 2021 * Peter Plantinga 2021 * Loren Lugosch 2020 * Mirco Ravanelli 2020 * Titouan Parcollet 2021 * Abdel Heba 2021 * Andreas Nautsch 2022, 2023 * Pooneh Mousavi 2023 * Sylvain de Langen 2023, 2024 * Adel Moumen 2023, 2024 * Pradnya Kandarkar 2023 """ import functools import itertools from dataclasses import dataclass from typing import Any, List, Optional, Tuple import sentencepiece import torch import torchaudio from tqdm import tqdm import speechbrain from speechbrain.inference.interfaces import Pretrained from speechbrain.utils.data_utils import split_path from speechbrain.utils.dynamic_chunk_training import DynChunkTrainConfig from speechbrain.utils.fetching import fetch from speechbrain.utils.streaming import split_fixed_chunks class EncoderDecoderASR(Pretrained): """A ready-to-use Encoder-Decoder ASR model The class can be used either to run only the encoder (encode()) to extract features or to run the entire encoder-decoder model (transcribe()) to transcribe speech. The given YAML must contain the fields specified in the *_NEEDED[] lists. Arguments --------- *args : tuple **kwargs : dict Arguments are forwarded to ``Pretrained`` parent class. Example ------- >>> from speechbrain.inference.ASR import EncoderDecoderASR >>> tmpdir = getfixture("tmpdir") >>> asr_model = EncoderDecoderASR.from_hparams( ... source="speechbrain/asr-crdnn-rnnlm-librispeech", ... savedir=tmpdir, ... ) # doctest: +SKIP >>> asr_model.transcribe_file("tests/samples/single-mic/example2.flac") # doctest: +SKIP "MY FATHER HAS REVEALED THE CULPRIT'S NAME" """ HPARAMS_NEEDED = ["tokenizer"] MODULES_NEEDED = ["encoder", "decoder"] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.tokenizer = self.hparams.tokenizer self.transducer_beam_search = False self.transformer_beam_search = False if hasattr(self.hparams, "transducer_beam_search"): self.transducer_beam_search = self.hparams.transducer_beam_search if hasattr(self.hparams, "transformer_beam_search"): self.transformer_beam_search = self.hparams.transformer_beam_search def transcribe_file(self, path, **kwargs): """Transcribes the given audiofile into a sequence of words. Arguments --------- path : str Path to audio file which to transcribe. **kwargs : dict Arguments forwarded to ``load_audio``. Returns ------- str The audiofile transcription produced by this ASR system. """ waveform = self.load_audio(path, **kwargs) # Fake a batch: batch = waveform.unsqueeze(0) rel_length = torch.tensor([1.0]) predicted_words, predicted_tokens = self.transcribe_batch( batch, rel_length ) return predicted_words[0] def encode_batch(self, wavs, wav_lens): """Encodes the input audio into a sequence of hidden states The waveforms should already be in the model's desired format. You can call: ``normalized = EncoderDecoderASR.normalizer(signal, sample_rate)`` to get a correctly converted signal in most cases. Arguments --------- wavs : torch.Tensor Batch of waveforms [batch, time, channels] or [batch, time] depending on the model. wav_lens : torch.Tensor Lengths of the waveforms relative to the longest one in the batch, tensor of shape [batch]. The longest one should have relative length 1.0 and others len(waveform) / max_length. Used for ignoring padding. Returns ------- torch.Tensor The encoded batch """ wavs = wavs.float() wavs, wav_lens = wavs.to(self.device), wav_lens.to(self.device) encoder_out = self.mods.encoder(wavs, wav_lens) if self.transformer_beam_search: encoder_out = self.mods.transformer.encode(encoder_out, wav_lens) return encoder_out def transcribe_batch(self, wavs, wav_lens): """Transcribes the input audio into a sequence of words The waveforms should already be in the model's desired format. You can call: ``normalized = EncoderDecoderASR.normalizer(signal, sample_rate)`` to get a correctly converted signal in most cases. Arguments --------- wavs : torch.Tensor Batch of waveforms [batch, time, channels] or [batch, time] depending on the model. wav_lens : torch.Tensor Lengths of the waveforms relative to the longest one in the batch, tensor of shape [batch]. The longest one should have relative length 1.0 and others len(waveform) / max_length. Used for ignoring padding. Returns ------- list Each waveform in the batch transcribed. tensor Each predicted token id. """ with torch.no_grad(): wav_lens = wav_lens.to(self.device) encoder_out = self.encode_batch(wavs, wav_lens) if self.transducer_beam_search: inputs = [encoder_out] else: inputs = [encoder_out, wav_lens] predicted_tokens, _, _, _ = self.mods.decoder(*inputs) predicted_words = [ self.tokenizer.decode_ids(token_seq) for token_seq in predicted_tokens ] return predicted_words, predicted_tokens def forward(self, wavs, wav_lens): """Runs full transcription - note: no gradients through decoding""" return self.transcribe_batch(wavs, wav_lens) class EncoderASR(Pretrained): """A ready-to-use Encoder ASR model The class can be used either to run only the encoder (encode()) to extract features or to run the entire encoder + decoder function model (transcribe()) to transcribe speech. The given YAML must contain the fields specified in the *_NEEDED[] lists. Arguments --------- *args : tuple **kwargs : dict Arguments are forwarded to ``Pretrained`` parent class. Example ------- >>> from speechbrain.inference.ASR import EncoderASR >>> tmpdir = getfixture("tmpdir") >>> asr_model = EncoderASR.from_hparams( ... source="speechbrain/asr-wav2vec2-commonvoice-fr", ... savedir=tmpdir, ... ) # doctest: +SKIP >>> asr_model.transcribe_file("samples/audio_samples/example_fr.wav") # doctest: +SKIP """ HPARAMS_NEEDED = ["tokenizer", "decoding_function"] MODULES_NEEDED = ["encoder"] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.tokenizer = self.hparams.tokenizer self.set_decoding_function() def set_decoding_function(self): """Set the decoding function based on the parameters defined in the hyperparameter file. The decoding function is determined by the `decoding_function` specified in the hyperparameter file. It can be either a functools.partial object representing a decoding function or an instance of `speechbrain.decoders.ctc.CTCBaseSearcher` for beam search decoding. Raises: ValueError: If the decoding function is neither a functools.partial nor an instance of speechbrain.decoders.ctc.CTCBaseSearcher. Note: - For greedy decoding (functools.partial), the provided `decoding_function` is assigned directly. - For CTCBeamSearcher decoding, an instance of the specified `decoding_function` is created, and additional parameters are added based on the tokenizer type. """ # Greedy Decoding case if isinstance(self.hparams.decoding_function, functools.partial): self.decoding_function = self.hparams.decoding_function # CTCBeamSearcher case else: # 1. check if the decoding function is an instance of speechbrain.decoders.CTCBaseSearcher if issubclass( self.hparams.decoding_function, speechbrain.decoders.ctc.CTCBaseSearcher, ): # If so, we need to retrieve the vocab list from the tokenizer. # We also need to check if the tokenizer is a sentencepiece or a CTCTextEncoder. if isinstance( self.tokenizer, speechbrain.dataio.encoder.CTCTextEncoder ): ind2lab = self.tokenizer.ind2lab vocab_list = [ind2lab[x] for x in range(len(ind2lab))] elif isinstance( self.tokenizer, sentencepiece.SentencePieceProcessor ): vocab_list = [ self.tokenizer.id_to_piece(i) for i in range(self.tokenizer.vocab_size()) ] else: raise ValueError( "The tokenizer must be sentencepiece or CTCTextEncoder" ) # We can now instantiate the decoding class and add all the parameters if hasattr(self.hparams, "test_beam_search"): opt_beam_search_params = self.hparams.test_beam_search # check if the kenlm_model_path is provided and fetch it if necessary if "kenlm_model_path" in opt_beam_search_params: source, fl = split_path( opt_beam_search_params["kenlm_model_path"] ) kenlm_model_path = str( fetch( fl, source=source, savedir=self.hparams.savedir ) ) # we need to update the kenlm_model_path in the opt_beam_search_params opt_beam_search_params["kenlm_model_path"] = ( kenlm_model_path ) else: opt_beam_search_params = {} self.decoding_function = self.hparams.decoding_function( **opt_beam_search_params, vocab_list=vocab_list ) else: raise ValueError( "The decoding function must be an instance of speechbrain.decoders.CTCBaseSearcher" ) def transcribe_file(self, path, **kwargs): """Transcribes the given audiofile into a sequence of words. Arguments --------- path : str Path to audio file which to transcribe. **kwargs : dict Arguments forwarded to ``load_audio``. Returns ------- str The audiofile transcription produced by this ASR system. """ waveform = self.load_audio(path, **kwargs) # Fake a batch: batch = waveform.unsqueeze(0) rel_length = torch.tensor([1.0]) predicted_words, predicted_tokens = self.transcribe_batch( batch, rel_length ) return str(predicted_words[0]) def encode_batch(self, wavs, wav_lens): """Encodes the input audio into a sequence of hidden states The waveforms should already be in the model's desired format. You can call: ``normalized = EncoderASR.normalizer(signal, sample_rate)`` to get a correctly converted signal in most cases. Arguments --------- wavs : torch.Tensor Batch of waveforms [batch, time, channels] or [batch, time] depending on the model. wav_lens : torch.Tensor Lengths of the waveforms relative to the longest one in the batch, tensor of shape [batch]. The longest one should have relative length 1.0 and others len(waveform) / max_length. Used for ignoring padding. Returns ------- torch.Tensor The encoded batch """ wavs = wavs.float() wavs, wav_lens = wavs.to(self.device), wav_lens.to(self.device) encoder_out = self.mods.wav2vec(wavs, wav_lens) x = self.mods.dec(encoder_out) logits = self.mods.output_lin(x) p_ctc = self.hparams.softmax(logits) return p_ctc def transcribe_batch(self, wavs, wav_lens): """Transcribes the input audio into a sequence of words The waveforms should already be in the model's desired format. You can call: ``normalized = EncoderASR.normalizer(signal, sample_rate)`` to get a correctly converted signal in most cases. Arguments --------- wavs : torch.Tensor Batch of waveforms [batch, time, channels] or [batch, time] depending on the model. wav_lens : torch.Tensor Lengths of the waveforms relative to the longest one in the batch, tensor of shape [batch]. The longest one should have relative length 1.0 and others len(waveform) / max_length. Used for ignoring padding. Returns ------- list Each waveform in the batch transcribed. tensor Each predicted token id. """ with torch.no_grad(): wav_lens = wav_lens.to(self.device) encoder_out = self.encode_batch(wavs, wav_lens) predictions = self.decoding_function(encoder_out, wav_lens) print(predictions) is_ctc_text_encoder_tokenizer = isinstance( self.tokenizer, speechbrain.dataio.encoder.CTCTextEncoder ) self.tokenizer.load('sample_data/SLU/labelencoder.txt') if isinstance(self.hparams.decoding_function, functools.partial): if is_ctc_text_encoder_tokenizer: predicted_words = [ "".join(self.tokenizer.decode_ndim(token_seq)) for token_seq in predictions ] else: predicted_words = [ self.tokenizer.decode_ids(token_seq) for token_seq in predictions ] else: predicted_words = [hyp[0].text for hyp in predictions] return predicted_words, predictions def forward(self, wavs, wav_lens): """Runs the encoder""" return self.encode_batch(wavs, wav_lens) @dataclass class ASRWhisperSegment: """A single chunk of audio for Whisper ASR streaming. This object is intended to be mutated as streaming progresses and passed across calls to the lower-level APIs such as `encode_chunk`, `decode_chunk`, etc. Attributes ---------- start : float The start time of the audio chunk. end : float The end time of the audio chunk. chunk : torch.Tensor The audio chunk, shape [time, channels]. lang_id : str The language identifier associated with the audio chunk. words : str The predicted words for the audio chunk. tokens : List[int] The predicted tokens for the audio chunk. prompt : List[str] The prompt associated with the audio chunk. avg_log_probs : float The average log probability associated with the prediction. no_speech_prob : float The probability of no speech in the audio chunk. """ start: float end: float chunk: torch.Tensor lang_id: Optional[str] = None words: Optional[str] = None tokens: Optional[List[str]] = None prompt: Optional[List[str]] = None avg_log_probs: Optional[float] = None no_speech_prob: Optional[float] = None class WhisperASR(Pretrained): """A ready-to-use Whisper ASR model. The class can be used to run the entire encoder-decoder whisper model. The set of tasks supported are: ``transcribe``, ``translate``, and ``lang_id``. The given YAML must contains the fields specified in the *_NEEDED[] lists. Arguments --------- *args : tuple **kwargs : dict Arguments are forwarded to ``Pretrained`` parent class. Example ------- >>> from speechbrain.inference.ASR import WhisperASR >>> tmpdir = getfixture("tmpdir") >>> asr_model = WhisperASR.from_hparams(source="speechbrain/asr-whisper-medium-commonvoice-it", savedir=tmpdir,) # doctest: +SKIP >>> hyp = asr_model.transcribe_file("speechbrain/asr-whisper-medium-commonvoice-it/example-it.wav") # doctest: +SKIP >>> hyp # doctest: +SKIP buongiorno a tutti e benvenuti a bordo >>> _, probs = asr_model.detect_language_file("speechbrain/asr-whisper-medium-commonvoice-it/example-it.wav") # doctest: +SKIP >>> print(f"Detected language: {max(probs[0], key=probs[0].get)}") # doctest: +SKIP Detected language: it """ HPARAMS_NEEDED = ["language", "sample_rate"] MODULES_NEEDED = ["whisper", "decoder"] TASKS = ["transcribe", "translate", "lang_id"] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.tokenizer = self.hparams.whisper.tokenizer @torch.no_grad() def detect_language_file(self, path: str): """Detects the language of the given audiofile. This method only works on input_file of 30 seconds or less. Arguments --------- path : str Path to audio file which to transcribe. Returns ------- language_tokens : torch.Tensor The detected language tokens. language_probs : dict The probabilities of the detected language tokens. Raises ------ ValueError If the model doesn't have language tokens. """ wavs = self.load_audio(path).float().to(self.device).unsqueeze(0) mel = self.mods.whisper._get_mel(wavs) language_tokens, language_probs = self.mods.whisper.detect_language(mel) return language_tokens, language_probs @torch.no_grad() def detect_language_batch(self, wav: torch.Tensor): """Detects the language of the given wav Tensor. This method only works on wav files of 30 seconds or less. Arguments --------- wav : torch.tensor Batch of waveforms [batch, time, channels]. Returns ------- language_tokens : torch.Tensor of shape (batch_size,) ids of the most probable language tokens, which appears after the startoftranscript token. language_probs : List[Dict[str, float]] list of dictionaries containing the probability distribution over all languages. Raises ------ ValueError If the model doesn't have language tokens. Example ------- >>> from speechbrain.inference.ASR import WhisperASR >>> import torchaudio >>> tmpdir = getfixture("tmpdir") >>> asr_model = WhisperASR.from_hparams( ... source="speechbrain/asr-whisper-medium-commonvoice-it", ... savedir=tmpdir, ... ) # doctest: +SKIP >>> wav, _ = torchaudio.load("your_audio") # doctest: +SKIP >>> language_tokens, language_probs = asr_model.detect_language(wav) # doctest: +SKIP """ mel = self.mods.whisper._get_mel(wav) language_tokens, language_probs = self.mods.whisper.detect_language(mel) return language_tokens, language_probs @torch.no_grad() def _detect_language(self, mel: torch.Tensor, task: str): """Detects the language of the given mel spectrogram. Arguments --------- mel : torch.tensor Batch of mel spectrograms [batch, time, channels]. task : str The task to perform. Returns ------- language_tokens : Tensor, shape = (n_audio,) ids of the most probable language tokens, which appears after the startoftranscript token. language_probs : List[Dict[str, float]], length = n_audio list of dictionaries containing the probability distribution over all languages. """ languages = [self.mods.whisper.language] * mel.shape[0] lang_probs = None if self.mods.whisper.language is None or task == "lang_id": lang_tokens, lang_probs = self.mods.whisper.detect_language(mel) languages = [max(probs, key=probs.get) for probs in lang_probs] self.mods.decoder.set_lang_tokens(lang_tokens) return languages, lang_probs def _get_audio_stream( self, streamer: "torchaudio.io.StreamReader", frames_per_chunk: int ): """From a :class:`torchaudio.io.StreamReader`, identifies the audio stream and returns an iterable stream of chunks (after resampling and downmixing to mono). Arguments --------- streamer : torchaudio.io.StreamReader The stream object. Must hold exactly one source stream of an audio type. frames_per_chunk : int The number of frames per chunk. For a streaming model, this should be determined from the DynChunkTrain configuration. Yields ------ chunks from streamer """ stream_infos = [ streamer.get_src_stream_info(i) for i in range(streamer.num_src_streams) ] audio_stream_infos = [ (i, stream_info) for i, stream_info in enumerate(stream_infos) if stream_info.media_type == "audio" ] if len(audio_stream_infos) != 1: raise ValueError( f"Expected stream to have only 1 stream (with any number of channels), got {len(audio_stream_infos)} (with streams: {stream_infos})" ) # find the index of the first (and only) audio stream audio_stream_index = audio_stream_infos[0][0] # output stream #0 streamer.add_basic_audio_stream( frames_per_chunk=frames_per_chunk, stream_index=audio_stream_index, sample_rate=self.audio_normalizer.sample_rate, format="fltp", # torch.float32 num_channels=1, ) for (chunk,) in streamer.stream(): chunk = chunk.squeeze(-1) # we deal with mono, remove that dim chunk = chunk.unsqueeze(0) # create a fake batch dim yield chunk @torch.no_grad() def transcribe_file_streaming( self, path: str, task: Optional[str] = None, initial_prompt: Optional[str] = None, logprob_threshold: Optional[float] = -1.0, no_speech_threshold=0.6, condition_on_previous_text: bool = False, verbose: bool = False, use_torchaudio_streaming: bool = False, chunk_size: Optional[int] = 30, **kwargs, ): """Transcribes the given audiofile into a sequence of words. This method supports the following tasks: ``transcribe``, ``translate``, and ``lang_id``. It can process an input audio file longer than 30 seconds by splitting it into chunk_size-second segments. Arguments --------- path : str URI/path to the audio to transcribe. When ``use_torchaudio_streaming`` is ``False``, uses SB fetching to allow fetching from HF or a local file. When ``True``, resolves the URI through ffmpeg, as documented in :class:`torchaudio.io.StreamReader`. task : Optional[str] The task to perform. If None, the default task is the one passed in the Whisper model. initial_prompt : Optional[str] The initial prompt to condition the model on. logprob_threshold : Optional[float] The log probability threshold to continue decoding the current segment. no_speech_threshold : float The threshold to skip decoding segment if the no_speech_prob is higher than this value. condition_on_previous_text : bool If True, the model will be condition on the last 224 tokens. verbose : bool If True, print the transcription of each segment. use_torchaudio_streaming : bool Whether the audio file can be loaded in a streaming fashion. If not, transcription is still performed through chunks of audio, but the entire audio file is fetched and loaded at once. This skips the usual fetching method and instead resolves the URI using torchaudio (via ffmpeg). chunk_size : Optional[int] The size of the chunks to split the audio into. The default chunk size is 30 seconds which corresponds to the maximal length that the model can process in one go. **kwargs : dict Arguments forwarded to ``load_audio`` Yields ------ ASRWhisperSegment A new ASRWhisperSegment instance initialized with the provided parameters. """ if task is not None: if task in self.TASKS: if task != "lang_id": self.mods.decoder.set_task(task) else: raise ValueError( f"Task {task} not supported. Supported tasks are {self.TASKS}" ) # create chunks of chunk_size seconds num_frames_per_chunk = chunk_size * self.hparams.sample_rate if use_torchaudio_streaming: streamer = torchaudio.io.StreamReader(path) segments = self._get_audio_stream(streamer, num_frames_per_chunk) else: waveform = self.load_audio(path, **kwargs) batch = waveform.unsqueeze(0) segments = split_fixed_chunks(batch, num_frames_per_chunk) rel_length = torch.tensor([1.0]) all_tokens = [] prompt_reset_since = 0 if initial_prompt is not None: initial_prompt_tokens = self.whisper.tokenizer.encode( " " + initial_prompt.strip() ) all_tokens.extend(initial_prompt_tokens) else: initial_prompt_tokens = [] for i, segment in enumerate(tqdm(segments, disable=verbose)): # move the segment on the device segment = segment.to(self.device) # extract mel spectrogram mel_segment = self.mods.whisper._get_mel(segment) start = i * chunk_size end = (i + 1) * chunk_size encoder_out = self.mods.whisper.forward_encoder(mel_segment) languages, _ = self._detect_language(mel_segment, task) if task == "lang_id": yield ASRWhisperSegment( start=start, end=end, chunk=segment, lang_id=languages[0], ) continue prompt = all_tokens[prompt_reset_since:] self.mods.decoder.set_prompt(prompt) predicted_tokens, _, scores, _ = self.mods.decoder( encoder_out, rel_length ) avg_log_probs = scores.sum() / (len(predicted_tokens[0]) + 1) if no_speech_threshold is not None: should_skip = ( self.mods.decoder.no_speech_probs[0] > no_speech_threshold ) if ( logprob_threshold is not None and avg_log_probs > logprob_threshold ): # don't skip if the logprob is high enough, despite the no_speech_prob should_skip = False if should_skip: yield ASRWhisperSegment( start=start, end=end, chunk=segment, lang_id=languages[0], words="", tokens=[], prompt=prompt, avg_log_probs=avg_log_probs.item(), no_speech_prob=self.mods.decoder.no_speech_probs[0], ) continue predicted_words = [ self.tokenizer.decode(t, skip_special_tokens=True).strip() for t in predicted_tokens ] yield ASRWhisperSegment( start=start, end=end, chunk=segment, lang_id=languages[0], words=predicted_words[0], tokens=predicted_tokens[0], prompt=prompt, avg_log_probs=avg_log_probs.item(), no_speech_prob=self.mods.decoder.no_speech_probs[0], ) all_tokens.extend(predicted_tokens[0]) if ( not condition_on_previous_text or self.mods.decoder.temperature > 0.5 ): prompt_reset_since = len(all_tokens) def transcribe_file( self, path: str, task: Optional[str] = None, initial_prompt: Optional[str] = None, logprob_threshold: Optional[float] = -1.0, no_speech_threshold=0.6, condition_on_previous_text: bool = False, verbose: bool = False, use_torchaudio_streaming: bool = False, chunk_size: Optional[int] = 30, **kwargs, ) -> List[ASRWhisperSegment]: """Run the Whisper model using the specified task on the given audio file and return the ``ASRWhisperSegment`` objects for each segment. This method supports the following tasks: ``transcribe``, ``translate``, and ``lang_id``. It can process an input audio file longer than 30 seconds by splitting it into chunk_size-second segments. Arguments --------- path : str URI/path to the audio to transcribe. When ``use_torchaudio_streaming`` is ``False``, uses SB fetching to allow fetching from HF or a local file. When ``True``, resolves the URI through ffmpeg, as documented in :class:`torchaudio.io.StreamReader`. task : Optional[str] The task to perform. If None, the default task is the one passed in the Whisper model. It can be one of the following: ``transcribe``, ``translate``, ``lang_id``. initial_prompt : Optional[str] The initial prompt to condition the model on. logprob_threshold : Optional[float] The log probability threshold to continue decoding the current segment. no_speech_threshold : float The threshold to skip decoding segment if the no_speech_prob is higher than this value. condition_on_previous_text : bool If True, the model will be condition on the last 224 tokens. verbose : bool If True, print the details of each segment. use_torchaudio_streaming : bool Whether the audio file can be loaded in a streaming fashion. If not, transcription is still performed through chunks of audio, but the entire audio file is fetched and loaded at once. This skips the usual fetching method and instead resolves the URI using torchaudio (via ffmpeg). chunk_size : Optional[int] The size of the chunks to split the audio into. The default chunk size is 30 seconds which corresponds to the maximal length that the model can process in one go. **kwargs : dict Arguments forwarded to ``load_audio`` Returns ------- results : list A list of ``WhisperASRChunk`` objects, each containing the task result. """ results = [] for whisper_segment in self.transcribe_file_streaming( path, task=task, initial_prompt=initial_prompt, logprob_threshold=logprob_threshold, no_speech_threshold=no_speech_threshold, condition_on_previous_text=condition_on_previous_text, verbose=verbose, use_torchaudio_streaming=use_torchaudio_streaming, chunk_size=chunk_size, **kwargs, ): results.append(whisper_segment) if verbose: pred = ( whisper_segment.words if task != "lang_id" else whisper_segment.lang_id ) print( f"[{whisper_segment.start}s --> {whisper_segment.end}s] {pred}" ) return results def encode_batch(self, wavs, wav_lens): """Encodes the input audio into a sequence of hidden states The waveforms should already be in the model's desired format. You can call: ``normalized = EncoderDecoderASR.normalizer(signal, sample_rate)`` to get a correctly converted signal in most cases. Arguments --------- wavs : torch.tensor Batch of waveforms [batch, time, channels]. wav_lens : torch.tensor Lengths of the waveforms relative to the longest one in the batch, tensor of shape [batch]. The longest one should have relative length 1.0 and others len(waveform) / max_length. Used for ignoring padding. Returns ------- torch.tensor The encoded batch """ wavs = wavs.to(device=self.device, dtype=torch.float32) mel = self.mods.whisper._get_mel(wavs) encoder_out = self.mods.whisper.forward_encoder(mel) return encoder_out @torch.no_grad() def transcribe_batch(self, wavs, wav_lens): """Transcribes the input audio into a sequence of words The waveforms should already be in the model's desired format. You can call: ``normalized = EncoderDecoderASR.normalizer(signal, sample_rate)`` to get a correctly converted signal in most cases. Arguments --------- wavs : torch.tensor Batch of waveforms [batch, time, channels]. wav_lens : torch.tensor Lengths of the waveforms relative to the longest one in the batch, tensor of shape [batch]. The longest one should have relative length 1.0 and others len(waveform) / max_length. Used for ignoring padding. Returns ------- list Each waveform in the batch transcribed. tensor Each predicted token id. """ wav_lens = wav_lens.float().to(self.device) encoder_out = self.encode_batch(wavs, wav_lens) predicted_tokens, _, _, _ = self.mods.decoder(encoder_out, wav_lens) predicted_words = [ self.tokenizer.decode(t, skip_special_tokens=True).strip() for t in predicted_tokens ] if self.hparams.normalized_transcripts: predicted_words = [ self.tokenizer.normalize(text).split(" ") for text in predicted_words ] return predicted_words, predicted_tokens def forward(self, wavs, wav_lens): """Runs full transcription - note: no gradients through decoding""" return self.transcribe_batch(wavs, wav_lens) @dataclass class ASRStreamingContext: """Streaming metadata, initialized by :meth:`~StreamingASR.make_streaming_context` (see there for details on initialization of fields here). This object is intended to be mutate: the same object should be passed across calls as streaming progresses (namely when using the lower-level :meth:`~StreamingASR.encode_chunk`, etc. APIs). Holds some references to opaque streaming contexts, so the context is model-agnostic to an extent.""" config: DynChunkTrainConfig """Dynamic chunk training configuration used to initialize the streaming context. Cannot be modified on the fly.""" fea_extractor_context: Any """Opaque feature extractor streaming context.""" encoder_context: Any """Opaque encoder streaming context.""" decoder_context: Any """Opaque decoder streaming context.""" tokenizer_context: Optional[List[Any]] """Opaque streaming context for the tokenizer. Initially `None`. Initialized to a list of tokenizer contexts once batch size can be determined.""" class StreamingASR(Pretrained): """A ready-to-use, streaming-capable ASR model. Arguments --------- *args : tuple **kwargs : dict Arguments are forwarded to ``Pretrained`` parent class. Example ------- >>> from speechbrain.inference.ASR import StreamingASR >>> from speechbrain.utils.dynamic_chunk_training import DynChunkTrainConfig >>> tmpdir = getfixture("tmpdir") >>> asr_model = StreamingASR.from_hparams(source="speechbrain/asr-conformer-streaming-librispeech", savedir=tmpdir,) # doctest: +SKIP >>> asr_model.transcribe_file("speechbrain/asr-conformer-streaming-librispeech/test-en.wav", DynChunkTrainConfig(24, 8)) # doctest: +SKIP """ HPARAMS_NEEDED = [ "fea_streaming_extractor", "make_decoder_streaming_context", "decoding_function", "make_tokenizer_streaming_context", "tokenizer_decode_streaming", ] MODULES_NEEDED = ["enc", "proj_enc"] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.filter_props = self.hparams.fea_streaming_extractor.properties def _get_audio_stream( self, streamer: "torchaudio.io.StreamReader", frames_per_chunk: int ): """From a :class:`torchaudio.io.StreamReader`, identifies the audio stream and returns an iterable stream of chunks (after resampling and downmixing to mono). Arguments --------- streamer : torchaudio.io.StreamReader The stream object. Must hold exactly one source stream of an audio type. frames_per_chunk : int The number of frames per chunk. For a streaming model, this should be determined from the DynChunkTrain configuration. Yields ------ chunks from streamer """ stream_infos = [ streamer.get_src_stream_info(i) for i in range(streamer.num_src_streams) ] audio_stream_infos = [ (i, stream_info) for i, stream_info in enumerate(stream_infos) if stream_info.media_type == "audio" ] if len(audio_stream_infos) != 1: raise ValueError( f"Expected stream to have only 1 stream (with any number of channels), got {len(audio_stream_infos)} (with streams: {stream_infos})" ) # find the index of the first (and only) audio stream audio_stream_index = audio_stream_infos[0][0] # output stream #0 streamer.add_basic_audio_stream( frames_per_chunk=frames_per_chunk, stream_index=audio_stream_index, sample_rate=self.audio_normalizer.sample_rate, format="fltp", # torch.float32 num_channels=1, ) for (chunk,) in streamer.stream(): chunk = chunk.squeeze(-1) # we deal with mono, remove that dim chunk = chunk.unsqueeze(0) # create a fake batch dim yield chunk def transcribe_file_streaming( self, path, dynchunktrain_config: DynChunkTrainConfig, use_torchaudio_streaming: bool = True, **kwargs, ): """Transcribes the given audio file into a sequence of words, in a streaming fashion, meaning that text is being yield from this generator, in the form of strings to concatenate. Arguments --------- path : str URI/path to the audio to transcribe. When ``use_torchaudio_streaming`` is ``False``, uses SB fetching to allow fetching from HF or a local file. When ``True``, resolves the URI through ffmpeg, as documented in :class:`torchaudio.io.StreamReader`. dynchunktrain_config : DynChunkTrainConfig Streaming configuration. Sane values and how much time chunks actually represent is model-dependent. use_torchaudio_streaming : bool Whether the audio file can be loaded in a streaming fashion. If not, transcription is still performed through chunks of audio, but the entire audio file is fetched and loaded at once. This skips the usual fetching method and instead resolves the URI using torchaudio (via ffmpeg). **kwargs : dict Arguments forwarded to ``load_audio`` Yields ------ generator of str An iterator yielding transcribed chunks (strings). There is a yield for every chunk, even if the transcribed string for that chunk is an empty string. """ chunk_size = self.get_chunk_size_frames(dynchunktrain_config) if use_torchaudio_streaming: streamer = torchaudio.io.StreamReader(path) chunks = self._get_audio_stream(streamer, chunk_size) else: waveform = self.load_audio(path, **kwargs) batch = waveform.unsqueeze(0) # create batch dim chunks = split_fixed_chunks(batch, chunk_size) rel_length = torch.tensor([1.0]) context = self.make_streaming_context(dynchunktrain_config) final_chunks = [ torch.zeros((1, chunk_size), device=self.device) ] * self.hparams.fea_streaming_extractor.get_recommended_final_chunk_count( chunk_size ) for chunk in itertools.chain(chunks, final_chunks): predicted_words = self.transcribe_chunk(context, chunk, rel_length) yield predicted_words[0] def transcribe_file( self, path, dynchunktrain_config: DynChunkTrainConfig, use_torchaudio_streaming: bool = True, ): """Transcribes the given audio file into a sequence of words. Arguments --------- path : str URI/path to the audio to transcribe. When ``use_torchaudio_streaming`` is ``False``, uses SB fetching to allow fetching from HF or a local file. When ``True``, resolves the URI through ffmpeg, as documented in :class:`torchaudio.io.StreamReader`. dynchunktrain_config : DynChunkTrainConfig Streaming configuration. Sane values and how much time chunks actually represent is model-dependent. use_torchaudio_streaming : bool Whether the audio file can be loaded in a streaming fashion. If not, transcription is still performed through chunks of audio, but the entire audio file is fetched and loaded at once. This skips the usual fetching method and instead resolves the URI using torchaudio (via ffmpeg). Returns ------- str The audio file transcription produced by this ASR system. """ pred = "" for text_chunk in self.transcribe_file_streaming( path, dynchunktrain_config, use_torchaudio_streaming ): pred += text_chunk return pred def make_streaming_context(self, dynchunktrain_config: DynChunkTrainConfig): """Create a blank streaming context to be passed around for chunk encoding/transcription. Arguments --------- dynchunktrain_config : DynChunkTrainConfig Streaming configuration. Sane values and how much time chunks actually represent is model-dependent. Returns ------- ASRStreamingContext """ return ASRStreamingContext( config=dynchunktrain_config, fea_extractor_context=self.hparams.fea_streaming_extractor.make_streaming_context(), encoder_context=self.mods.enc.make_streaming_context( dynchunktrain_config ), decoder_context=self.hparams.make_decoder_streaming_context(), tokenizer_context=None, ) def get_chunk_size_frames( self, dynchunktrain_config: DynChunkTrainConfig ) -> int: """Returns the chunk size in actual audio samples, i.e. the exact expected length along the time dimension of an input chunk tensor (as passed to :meth:`~StreamingASR.encode_chunk` and similar low-level streaming functions). Arguments --------- dynchunktrain_config : DynChunkTrainConfig The streaming configuration to determine the chunk frame count of. Returns ------- chunk size """ return (self.filter_props.stride - 1) * dynchunktrain_config.chunk_size @torch.no_grad() def encode_chunk( self, context: ASRStreamingContext, chunk: torch.Tensor, chunk_len: Optional[torch.Tensor] = None, ): """Encoding of a batch of audio chunks into a batch of encoded sequences. For full speech-to-text offline transcription, use `transcribe_batch` or `transcribe_file`. Must be called over a given context in the correct order of chunks over time. Arguments --------- context : ASRStreamingContext Mutable streaming context object, which must be specified and reused across calls when streaming. You can obtain an initial context by calling `asr.make_streaming_context(config)`. chunk : torch.Tensor The tensor for an audio chunk of shape `[batch size, time]`. The time dimension must strictly match `asr.get_chunk_size_frames(config)`. The waveform is expected to be in the model's expected format (i.e. the sampling rate must be correct). chunk_len : torch.Tensor, optional The relative chunk length tensor of shape `[batch size]`. This is to be used when the audio in one of the chunks of the batch is ending within this chunk. If unspecified, equivalent to `torch.ones((batch_size,))`. Returns ------- torch.Tensor Encoded output, of a model-dependent shape.""" if chunk_len is None: chunk_len = torch.ones((chunk.size(0),)) chunk = chunk.float() chunk, chunk_len = chunk.to(self.device), chunk_len.to(self.device) assert chunk.shape[-1] <= self.get_chunk_size_frames(context.config) x = self.hparams.fea_streaming_extractor( chunk, context=context.fea_extractor_context, lengths=chunk_len ) x = self.mods.enc.forward_streaming(x, context.encoder_context) x = self.mods.proj_enc(x) return x @torch.no_grad() def decode_chunk( self, context: ASRStreamingContext, x: torch.Tensor ) -> Tuple[List[str], List[List[int]]]: """Decodes the output of the encoder into tokens and the associated transcription. Must be called over a given context in the correct order of chunks over time. Arguments --------- context : ASRStreamingContext Mutable streaming context object, which should be the same object that was passed to `encode_chunk`. x : torch.Tensor The output of `encode_chunk` for a given chunk. Returns ------- list of str Decoded tokens of length `batch_size`. The decoded strings can be of 0-length. list of list of output token hypotheses List of length `batch_size`, each holding a list of tokens of any length `>=0`. """ tokens = self.hparams.decoding_function(x, context.decoder_context) # initialize token context for real now that we know the batch size if context.tokenizer_context is None: context.tokenizer_context = [ self.hparams.make_tokenizer_streaming_context() for _ in range(len(tokens)) ] words = [ self.hparams.tokenizer_decode_streaming( self.hparams.tokenizer, cur_tokens, context.tokenizer_context[i] ) for i, cur_tokens in enumerate(tokens) ] return words, tokens def transcribe_chunk( self, context: ASRStreamingContext, chunk: torch.Tensor, chunk_len: Optional[torch.Tensor] = None, ): """Transcription of a batch of audio chunks into transcribed text. Must be called over a given context in the correct order of chunks over time. Arguments --------- context : ASRStreamingContext Mutable streaming context object, which must be specified and reused across calls when streaming. You can obtain an initial context by calling `asr.make_streaming_context(config)`. chunk : torch.Tensor The tensor for an audio chunk of shape `[batch size, time]`. The time dimension must strictly match `asr.get_chunk_size_frames(config)`. The waveform is expected to be in the model's expected format (i.e. the sampling rate must be correct). chunk_len : torch.Tensor, optional The relative chunk length tensor of shape `[batch size]`. This is to be used when the audio in one of the chunks of the batch is ending within this chunk. If unspecified, equivalent to `torch.ones((batch_size,))`. Returns ------- str Transcribed string for this chunk, might be of length zero. """ if chunk_len is None: chunk_len = torch.ones((chunk.size(0),)) chunk = chunk.float() chunk, chunk_len = chunk.to(self.device), chunk_len.to(self.device) x = self.encode_chunk(context, chunk, chunk_len) words, _ = self.decode_chunk(context, x) return words