File size: 8,737 Bytes
1a10833 |
|
#!/usr/bin/env python3
#############################################################
# eval.sh contains the commands to run evaluation properly
############################################################
import argparse
import sys
import re
from typing import Dict
import torch
from datasets import Audio, Dataset, load_dataset, load_metric
from pyctcdecode import BeamSearchDecoderCTC
from transformers import AutoFeatureExtractor, AutoTokenizer, pipeline
import transformers
import Levenshtein
import hunspell
# Word list of the language model, one entry per line. Words found here are
# treated as correctly spelled by the typo-correction pass.
# The file is read as UTF-8 — NOTE(review): confirm the file's encoding.
# The handle is closed as soon as the set has been built.
with open('language_model/unigrams.txt', encoding='utf-8') as unigrams_file:
    dutch_unigrams = set(unigrams_file.read().splitlines())

# Hunspell spell checker for Dutch, used both to recognise known words and to
# propose replacements for unknown ones.
dutch_hobj = hunspell.HunSpell('dictionaries/nl.dic', 'dictionaries/nl.aff')

# Words tried as a split-off suffix when an unknown word looks like two words
# glued together (e.g. "<prefix><common word>" -> "<prefix> <common word>").
MOST_COMMON_WORDS = 'ik|je|het|de|is|dat|een|niet|en|wat|van|we|in|ze|op|te|hij|zijn|er|maar|me|die|heb|voor|met|als|ben|was|n|mijn|u|dit|aan|hier|om|naar|dan|jij|weet|ja|kan|geen|zo|nog|wil|wel|moet|goed|hem|hebben|nee|heeft|waar|nu|hoe|ga|t|kom|uit|gaan|bent|haar|doen|ook|mij|over|of|daar|zou|al|jullie|bij|ons|zal|gaat|hebt|meer|waarom|iets|laat|deze|had|doe|wie|jou|moeten|alles|denk|kunnen|eens|echt|man|weg|door|oké|toch|zien|alleen|s|nou'.split('|')
def log_results(result: Dataset, args: Dict[str, str]):
    """Compute WER and CER for the predictions, print them and write them to disk.

    The evaluation template marks this function "DO NOT CHANGE": the metric
    computation and the output file names are left exactly as they were. Only
    the check that decides whether per-example logs are written was corrected.

    Args:
        result: Dataset with a "target" and a "prediction" text column.
        args: Parsed command-line arguments. Reads ``dataset``, ``config``,
            ``split`` and ``log_outputs``.
    """
    log_outputs = args.log_outputs
    dataset_id = "_".join(args.dataset.split("/") + [args.config, args.split])

    # load metric
    wer = load_metric("wer")
    cer = load_metric("cer")

    # compute metrics
    wer_result = wer.compute(references=result["target"], predictions=result["prediction"])
    cer_result = cer.compute(references=result["target"], predictions=result["prediction"])

    # print & log results
    result_str = f"WER: {wer_result}\n" f"CER: {cer_result}"
    print(result_str)

    with open(f"{dataset_id}_eval_results.txt", "w") as f:
        f.write(result_str)

    # log all results in text file. Possibly interesting for analysis.
    # `--log_outputs` is a store_true flag, so it is False (never None) when
    # absent; test its truth value so the logs are only written on request.
    if log_outputs:
        pred_file = f"log_{dataset_id}_predictions.txt"
        target_file = f"log_{dataset_id}_targets.txt"

        with open(pred_file, "w") as p, open(target_file, "w") as t:

            # mapping function to write output: example index on one line,
            # the corresponding text on the next
            def write_to_file(batch, i):
                p.write(f"{i}" + "\n")
                p.write(batch["prediction"] + "\n")
                t.write(f"{i}" + "\n")
                t.write(batch["target"] + "\n")

            result.map(write_to_file, with_indices=True)
def normalize_text(text: str) -> str:
    """DO ADAPT FOR YOUR USE CASE. Normalize the target text before scoring.

    The text is lower-cased, the ignored punctuation characters are removed
    and every run of whitespace is collapsed into a single space. Leading or
    trailing whitespace is collapsed as well, but not stripped.

    Args:
        text: Reference transcription.

    Returns:
        The normalized transcription.
    """
    # IMPORTANT: this should correspond to the chars that were ignored during training.
    # Raw string: the pattern is unchanged, but `\-`, `\;` and `\:` are no longer
    # invalid escape sequences of a regular string literal.
    chars_to_ignore_regex = r'[,?.!\-\;\:"“%‘”�—’…–]'
    text = re.sub(chars_to_ignore_regex, "", text.lower())
    # newlines count as whitespace too, so multi-line targets end up on one line
    text = re.sub(r'[\n\s]+', ' ', text)
    return text
# Score adjustments used when re-ranking replacements for unknown words.
_KEEP_WORD_PENALTY = -2  # added to the score of leaving the unknown word as it is
_EDIT_PENALTY = -0.5  # multiplied by the edit distance of each candidate


def _strip_joiners(word):
    """Return *word* without hyphens and apostrophes (ignored when measuring edits)."""
    return word.replace("-", '').replace("'", '')


def _is_known_word(word):
    """Return a truthy value for empty tokens and for words accepted by the unigram list or hunspell."""
    return (len(word) == 0) or (word in dutch_unigrams) or (dutch_hobj.spell(word))


def _candidate_replacements(word):
    """Collect replacement candidates for the unknown *word*, in evaluation order."""
    # typos suggestions by hunspell
    candidates = list(dutch_hobj.suggest(word))
    # diphthongs flattened: a common failure mode of pyctcdecode for dutch
    if word.endswith('lik'):
        candidates.append(word[0:-3] + 'lijk')
    # words merged: a common failure mode of pyctcdecode for dutch
    # NOTE(review): when `word` equals a common word the prefix is empty and the
    # candidate is the same word with a leading space — confirm this is intended.
    for common_word in MOST_COMMON_WORDS:
        if word.endswith(common_word):
            candidates.append(word[0:-len(common_word)] + ' ' + common_word)
    return candidates


def _correct_unknown_words(text, language_model):
    """Replace unknown words of *text* by the candidate the language model scores best.

    Every word that is neither in the unigram list nor accepted by hunspell is
    scored together with its left and right neighbour. Each candidate is scored
    the same way, adjusted by its edit distance to the original word, and
    replaces the word only if it scores strictly higher than keeping the word.

    Args:
        text: Transcription whose words are separated by single spaces.
        language_model: Object exposing ``score(sentence)``.

    Returns:
        The transcription with the selected replacements applied.
    """
    words = text.split(' ')
    for index, curr_word in enumerate(words):
        if _is_known_word(curr_word):
            continue
        # neighbours give the language model some context; words already
        # corrected earlier in this loop are used in their corrected form
        prev_word = words[index - 1] if index > 0 else '<s>'
        next_word = words[index + 1] if index < len(words) - 1 else '</s>'
        curr_word_letters = _strip_joiners(curr_word)
        best_word = curr_word
        best_score = language_model.score(prev_word + ' ' + curr_word + ' ' + next_word) + _KEEP_WORD_PENALTY
        # look at all the suggestions and see if something looks better
        for candidate in _candidate_replacements(curr_word):
            candidate = candidate.lower()
            distance = Levenshtein.distance(curr_word_letters, _strip_joiners(candidate))
            # bonus for perfect match: a zero distance counts as -3 edits
            distance = distance if distance > 0 else -3
            score = language_model.score(prev_word + ' ' + candidate + ' ' + next_word) + _EDIT_PENALTY * distance
            if score > best_score:
                best_score = score
                best_word = candidate
        words[index] = best_word
    return " ".join(words)


def main(args):
    """Transcribe the dataset, post-correct unknown words and log the metrics.

    Args:
        args: Parsed command-line arguments. Reads ``dataset``, ``config``,
            ``split``, ``model_id``, ``device``, ``chunk_length_s`` and
            ``stride_length_s``. ``device`` is filled in when it is None.
    """
    # load dataset
    dataset = load_dataset(args.dataset, args.config, split=args.split, use_auth_token=True)

    # for a quick test run, restrict the dataset first, e.g.:
    # dataset = dataset.select(range(10))

    # load processor
    feature_extractor = AutoFeatureExtractor.from_pretrained(args.model_id)
    sampling_rate = feature_extractor.sampling_rate

    # resample audio to the rate reported by the feature extractor
    dataset = dataset.cast_column("audio", Audio(sampling_rate=sampling_rate))

    # load eval pipeline; pick the first GPU when available, CPU otherwise
    if args.device is None:
        args.device = 0 if torch.cuda.is_available() else -1
    config = transformers.PretrainedConfig.from_pretrained(args.model_id)
    model = transformers.Wav2Vec2ForCTC.from_pretrained(args.model_id)
    tokenizer = AutoTokenizer.from_pretrained(args.model_id)
    processor = transformers.AutoProcessor.from_pretrained(args.model_id)
    # the language model behind the processor's decoder is reused to score
    # replacement candidates (this relies on private pyctcdecode attributes)
    language_model = BeamSearchDecoderCTC.model_container[processor.decoder._model_key]._kenlm_model
    # the components are passed one by one so that the processor's decoder is used
    asr = pipeline(
        "automatic-speech-recognition",
        config=config,
        model=model,
        tokenizer=tokenizer,
        feature_extractor=feature_extractor,
        decoder=processor.decoder,
        device=args.device,
    )

    # map function to decode audio
    def map_to_pred(batch):
        prediction = asr(
            batch["audio"]["array"], chunk_length_s=args.chunk_length_s, stride_length_s=args.stride_length_s
        )
        batch["prediction"] = _correct_unknown_words(prediction["text"], language_model)
        batch["target"] = normalize_text(batch["sentence"])
        return batch

    # run inference on all examples
    result = dataset.map(map_to_pred, remove_columns=dataset.column_names)

    # compute and log_results
    # do not change function below
    log_results(result, args)
def build_arg_parser():
    """Build the command-line parser of the evaluation script.

    Returns:
        An ``argparse.ArgumentParser`` with the required ``--model_id``,
        ``--dataset``, ``--config`` and ``--split`` options and the optional
        ``--chunk_length_s``, ``--stride_length_s``, ``--log_outputs`` and
        ``--device`` options.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--model_id", type=str, required=True, help="Model identifier. Should be loadable with 🤗 Transformers"
    )
    parser.add_argument(
        "--dataset",
        type=str,
        required=True,
        help="Dataset name to evaluate the `model_id`. Should be loadable with 🤗 Datasets",
    )
    parser.add_argument(
        "--config", type=str, required=True, help="Config of the dataset. *E.g.* `'en'` for Common Voice"
    )
    parser.add_argument("--split", type=str, required=True, help="Split of the dataset. *E.g.* `'test'`")
    # The help texts below used to advertise defaults (5 s / 1 s / CPU) that the
    # parser does not actually apply; they now describe the real behaviour.
    parser.add_argument(
        "--chunk_length_s", type=float, default=None, help="Chunk length in seconds. Not set (None) by default."
    )
    parser.add_argument(
        "--stride_length_s",
        type=float,
        default=None,
        help="Stride of the audio chunks in seconds. Not set (None) by default.",
    )
    parser.add_argument(
        "--log_outputs", action="store_true", help="If defined, write outputs to log file for analysis."
    )
    parser.add_argument(
        "--device",
        type=int,
        default=None,
        help=(
            "The device to run the pipeline on. -1 for CPU, 0 for the first GPU and so on. "
            "Defaults to the first GPU when CUDA is available, otherwise CPU."
        ),
    )
    return parser


if __name__ == "__main__":
    args = build_arg_parser().parse_args()

    main(args)
|