import os
import re
import warnings

import gradio as gr
import librosa
import noisereduce as nr
import numpy as np
import pandas as pd
import pyloudnorm as pyln
import soundfile as sf
import torch

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Wav2Vec2ForCTC,
    Wav2Vec2Tokenizer,
    pipeline,
)

warnings.filterwarnings("ignore")

device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

# Language identification (LID) with Meta's MMS model covering 126 languages
lid_model_id = "facebook/mms-lid-126"
lid_pipeline = pipeline("audio-classification", model=lid_model_id, device=device)

# Map the ISO 639-3 codes returned by the LID model to the language names
# used in the ai4bharat/indicwav2vec model ids; unmapped languages are dropped.
language_mapping = {
    "hin": "hindi",
    "ben": "bengali",
    "eng": "english",
    "guj": "gujarati",
}

def detect_language_for_audio_file(audio_file_path, lid_pipeline, target_sampling_rate=16000):
    """
    Detect the language of a given audio file and return a DataFrame.

    Parameters:
    - audio_file_path (str): Path to the audio file.
    - lid_pipeline: The language-identification pipeline.
    - target_sampling_rate (int): Target sampling rate for the audio. Default is 16000.

    Returns:
    - df (pd.DataFrame): Detected language, file path, and ASR model name.
    """
    detected_languages = []
    audio_filenames = []

    filename = os.path.basename(audio_file_path)
    waveform, original_sampling_rate = librosa.load(audio_file_path, sr=None)

    # Downmix to mono and resample to the 16 kHz rate the LID model expects
    if len(waveform.shape) > 1:
        waveform = librosa.to_mono(waveform)
    if original_sampling_rate != target_sampling_rate:
        waveform = librosa.resample(waveform, orig_sr=original_sampling_rate, target_sr=target_sampling_rate)

    lid_result = lid_pipeline({"raw": waveform, "sampling_rate": target_sampling_rate})
    detected_language = lid_result[0]['label'].split('_')[0]
    print(f"Detected language for {filename}: {detected_language}")

    detected_languages.append(detected_language)
    # Store the full path so downstream loading works regardless of the
    # current working directory
    audio_filenames.append(audio_file_path)

    df = pd.DataFrame({
        "Detected_Language": detected_languages,
        "Audio_Filename": audio_filenames,
    })

    # Map ISO codes to language names and drop rows for unsupported languages
    df['Detected_Language'] = df['Detected_Language'].map(language_mapping)
    df.dropna(axis=0, inplace=True)

    # Attach the matching IndicWav2Vec ASR model name for each detected language
    model_names = []
    for index, row in df.iterrows():
        detected_language = row['Detected_Language']
        model_name = "ai4bharat/indicwav2vec_v1_" + detected_language
        model_names.append(model_name)
    df['Model_Name'] = model_names

    return df
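
# Example usage (a minimal sketch; "sample.wav" is a hypothetical local file):
#   df = detect_language_for_audio_file("sample.wav", lid_pipeline)
#   print(df[["Audio_Filename", "Detected_Language", "Model_Name"]])
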
# Cache holding at most one ASR model at a time, to limit memory use
loaded_models = {}
current_loaded_model = None


def load_model_and_tokenizer(standardized_language):
    global current_loaded_model

    # Reuse the model if it is already loaded
    if standardized_language in loaded_models:
        return loaded_models[standardized_language]

    # Evict the previously loaded model and free its GPU memory
    if current_loaded_model is not None:
        del loaded_models[current_loaded_model]
        torch.cuda.empty_cache()
        current_loaded_model = None

    # Pick the checkpoint for the detected language
    if standardized_language == 'hindi':
        model_name = "ai4bharat/indicwav2vec-hindi"
    elif standardized_language == 'odia':
        model_name = "ai4bharat/indicwav2vec-odia"
    elif standardized_language == 'english':
        model_name = "facebook/wav2vec2-large-960h-lv60-self"
    else:
        model_name = "ai4bharat/indicwav2vec_v1_" + standardized_language

    model = Wav2Vec2ForCTC.from_pretrained(model_name)
    tokenizer = Wav2Vec2Tokenizer.from_pretrained(model_name)

    loaded_models[standardized_language] = (model, tokenizer)
    current_loaded_model = standardized_language

    return model, tokenizer
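
# Example usage (sketch): the first call downloads and caches the checkpoint;
# repeated calls for the same language return the cached (model, tokenizer) pair.
#   model, tokenizer = load_model_and_tokenizer("hindi")
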
def perform_transcription(df):
    transcriptions = []

    for index, row in df.iterrows():
        audio_file_path = row['Audio_Filename']
        detected_language = row['Detected_Language']

        standardized_language = language_mapping.get(detected_language, detected_language)
        model, tokenizer = load_model_and_tokenizer(standardized_language)

        # Wav2Vec2 models expect 16 kHz mono input
        input_audio, _ = librosa.load(audio_file_path, sr=16000)
        input_values = tokenizer(input_audio, return_tensors="pt").input_values

        with torch.no_grad():
            logits = model(input_values).logits

        # Greedy CTC decoding: take the most likely token at every frame
        predicted_ids = torch.argmax(logits, dim=-1)
        text = tokenizer.batch_decode(predicted_ids)[0]

        transcriptions.append(text)

    df['Transcription'] = transcriptions
    return df
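
# Example usage (sketch, on the DataFrame from detect_language_for_audio_file):
#   df = perform_transcription(df)
#   print(df["Transcription"].iloc[0])
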
# Chat model for response generation: Pragna-1B with its instruction-tuned
# adapter, pinned to a specific revision (requires HF_TOKEN in the environment)
tokenizer = AutoTokenizer.from_pretrained("soketlabs/pragna-1b", token=os.environ.get('HF_TOKEN'))
model = AutoModelForCausalLM.from_pretrained(
    "soketlabs/pragna-1b",
    token=os.environ.get('HF_TOKEN'),
    revision='3c5b8b1309f7d89710331ba2f164570608af0de7',
)
model.load_adapter('soketlabs/pragna-1b-it-v0.1', token=os.environ.get('HF_TOKEN'))
model = model.to(device)
def generate_response(transcription):
    try:
        messages = [
            {"role": "system", "content": "You are a friendly bot to help the user."},
            {"role": "user", "content": transcription},
        ]
        tokenized_chat = tokenizer.apply_chat_template(messages, tokenize=True, add_generation_prompt=True, return_tensors="pt")
        input_ids = tokenized_chat[0].to(device)
        if len(input_ids.shape) == 1:
            input_ids = input_ids.unsqueeze(0)
        with torch.no_grad():
            output = model.generate(
                input_ids,
                max_new_tokens=300,
                do_sample=True,
                top_k=5,
                num_beams=1,
                use_cache=False,
                temperature=0.2,
                repetition_penalty=1.1,
            )
        generated_text = tokenizer.decode(output[0], skip_special_tokens=True)
        # Trim any trailing incomplete sentence from the sampled output
        return find_last_sentence(generated_text)
    except Exception as e:
        print("Error during response generation:", e)
        return "Response generation error: " + str(e)


def find_last_sentence(text):
    # Cut the text at the last sentence-ending mark: Devanagari danda (।),
    # question mark, or exclamation mark; return the text unchanged otherwise
    sentence_endings = re.finditer(r'[।?!]', text)
    end_positions = [ending.end() for ending in sentence_endings]
    if end_positions:
        return text[:end_positions[-1]]
    return text
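
# Example (sketch): find_last_sentence("नमस्ते। कैसे हैं आप") returns "नमस्ते।",
# because the danda after "नमस्ते" is the last sentence-ending mark present.
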
def generate_text_and_display_audio(row, model, tokenizer):
    transcription = row['Transcription']

    # generate_response already trims the output to its last complete sentence
    generated_text = generate_response(transcription)

    return transcription, generated_text
def spectral_subtraction(audio_data, sample_rate):
    # Short-time Fourier transform of the (already noise-reduced) signal
    stft = librosa.stft(audio_data)
    power_spec = np.abs(stft)**2

    # Estimate the noise floor as the per-frequency median power across frames
    noise_power = np.median(power_spec, axis=1)

    # Over-subtract the noise estimate (alpha > 1) and clamp at zero
    alpha = 2.0
    denoised_spec = np.maximum(power_spec - alpha * noise_power[:, np.newaxis], 0)

    # Resynthesize from the denoised magnitude and the original phase
    denoised_audio = librosa.istft(np.sqrt(denoised_spec) * np.exp(1j * np.angle(stft)))

    return denoised_audio


def apply_compression(audio_data, sample_rate):
    # Measure integrated loudness (ITU-R BS.1770) via pyloudnorm
    meter = pyln.Meter(sample_rate)
    loudness = meter.integrated_loudness(audio_data)

    # Normalize the programme loudness to -24 LUFS
    loud_norm = pyln.normalize.loudness(audio_data, loudness, -24.0)

    return loud_norm
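
# Note: apply_compression normalizes integrated loudness to a -24 LUFS target,
# which evens out level differences across recordings rather than applying
# per-sample dynamic-range compression.
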
def process_audio(audio_file_path):
    try:
        audio_data, sample_rate = librosa.load(audio_file_path)
        print(f"Read audio data: {audio_file_path}, Sample Rate: {sample_rate}")

        # Stage 1: statistical noise reduction
        reduced_noise = nr.reduce_noise(y=audio_data, sr=sample_rate)
        print("Noise reduction applied")

        # Stage 2: spectral subtraction against the estimated noise floor
        denoised_audio = spectral_subtraction(reduced_noise, sample_rate)
        print("Spectral subtraction applied")

        # Stage 3: loudness normalization
        compressed_audio = apply_compression(denoised_audio, sample_rate)
        print("Dynamic range compression applied")

        # Stage 4: trim leading and trailing silence
        final_audio = librosa.effects.trim(compressed_audio)[0]
        print("Silences trimmed")

        processed_file_path = 'processed_audio.wav'
        sf.write(processed_file_path, final_audio, sample_rate)
        print(f"Processed audio saved to: {processed_file_path}")

        if not os.path.isfile(processed_file_path):
            raise FileNotFoundError(f"Processed file not found: {processed_file_path}")

        # Language identification, transcription, and response generation
        df = detect_language_for_audio_file(processed_file_path, lid_pipeline)
        print(df)
        df_transcription = perform_transcription(df)
        print(df_transcription)

        # Default to empty outputs in case the detected language is unsupported
        # and the DataFrame is empty after dropna
        transcription, response = "", ""
        for index, row in df_transcription.iterrows():
            print(index, row)
            transcription, response = generate_text_and_display_audio(row, model, tokenizer)

        return processed_file_path, transcription, response
    except Exception as e:
        print("Error during audio processing:", e)
        # Return three values to match the three Gradio output components
        return None, "Error during audio processing: " + str(e), ""
iface = gr.Interface(
    fn=process_audio,
    inputs=gr.Audio(label="Record Audio", type="filepath"),
    outputs=[
        gr.Audio(label="Processed Audio"),
        gr.Textbox(label="Transcription"),
        gr.Textbox(label="Response"),
    ],
)

iface.launch(share=True)
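
# share=True serves the app locally and also creates a temporary public
# *.gradio.live link, which is convenient for demos from a notebook or server.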