import numpy as np |
import librosa |
import os |
import re |
import random |
from sklearn.preprocessing import StandardScaler |
from sklearn.model_selection import train_test_split, KFold |
from sklearn.metrics import roc_curve |
import matplotlib.pyplot as plt |
import torch |
import torch.nn as nn |
import torch.nn.functional as F |
from torch.utils.data import Dataset, DataLoader, Subset |
def extract_mfcc_and_pitch(audio_path, sr=16000, n_mfcc=40):
    """Extract standardized MFCC and pitch features from an audio file.

    The audio is loaded with ``librosa.load`` at the requested sample rate.
    The MFCC matrix is standardized with one global mean/std over all of its
    values; the YIN pitch track is standardized the same way and stacked
    underneath the MFCC rows.

    Args:
        audio_path: Path of the audio file to load.
        sr: Target sample rate passed to ``librosa.load``.
        n_mfcc: Number of MFCC coefficients to compute.

    Returns:
        A 2-D NumPy array with ``n_mfcc + 1`` rows: the MFCC rows followed by
        a single pitch row.
    """

    def _standardize(values):
        # A constant signal (e.g. digital silence) has zero variance; dividing
        # by it would fill the features with NaN/inf and poison training, so
        # such a signal is only centred.
        centered = values - np.mean(values)
        spread = np.std(values)
        return centered / spread if spread > 0 else centered

    audio, sr = librosa.load(audio_path, sr=sr)

    mfcc = _standardize(librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc))

    pitch = librosa.yin(
        audio,
        fmin=librosa.note_to_hz('C2'),
        fmax=librosa.note_to_hz('C6'),
    )
    # Missing pitch values are replaced by the mean of the valid ones; when
    # there are no valid values at all the mean itself would be NaN, so fall
    # back to 0.
    fill_value = 0.0 if np.all(np.isnan(pitch)) else np.nanmean(pitch)
    pitch = np.nan_to_num(pitch, nan=fill_value)
    pitch = _standardize(pitch).reshape(1, -1)

    return np.vstack([mfcc, pitch])
class XVectorNet(nn.Module):
    """Convolutional network with statistics pooling and a 2-way output.

    Five 1-D convolutions operate on the frame axis, their output is pooled
    by ``StatsPooling`` into a fixed-size vector (1500 channels -> 3000
    values), and two fully connected layers feed the final 2-unit layer.
    Dropout follows every hidden layer except ``layer5``.

    Args:
        input_dim: Number of input channels expected by the first convolution.
        dropout_rate: Dropout probability shared by all dropout layers.
    """

    def __init__(self, input_dim=41, dropout_rate=0.45):
        super().__init__()
        hidden = 512

        # Frame-level layers; paddings keep the number of frames unchanged.
        self.layer1 = nn.Conv1d(input_dim, hidden, 5, padding=2)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.layer2 = nn.Conv1d(hidden, hidden, 3, padding=1)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.layer3 = nn.Conv1d(hidden, hidden, 3, padding=1)
        self.dropout3 = nn.Dropout(dropout_rate)
        self.layer4 = nn.Conv1d(hidden, hidden, 1)
        self.dropout4 = nn.Dropout(dropout_rate)
        self.layer5 = nn.Conv1d(hidden, 1500, 1)

        # Pooling concatenates mean and std, doubling the 1500 channels.
        self.stats_pooling = StatsPooling()

        # Segment-level layers.
        self.layer6 = nn.Linear(3000, hidden)
        self.dropout6 = nn.Dropout(dropout_rate)
        self.layer7 = nn.Linear(hidden, hidden)
        self.dropout7 = nn.Dropout(dropout_rate)
        self.output = nn.Linear(hidden, 2)

    def forward(self, x):
        """Return the raw 2-class logits for a batch of feature sequences."""
        frame_stages = (
            (self.layer1, self.dropout1),
            (self.layer2, self.dropout2),
            (self.layer3, self.dropout3),
            (self.layer4, self.dropout4),
        )
        for conv, drop in frame_stages:
            x = drop(F.relu(conv(x)))

        # layer5 has no dropout before the pooling step.
        x = self.stats_pooling(F.relu(self.layer5(x)))

        segment_stages = (
            (self.layer6, self.dropout6),
            (self.layer7, self.dropout7),
        )
        for dense, drop in segment_stages:
            x = drop(F.relu(dense(x)))

        return self.output(x)
class StatsPooling(nn.Module):
    """Pool a sequence into its per-channel mean and standard deviation.

    ``forward`` reduces over dimension 2 and concatenates mean and standard
    deviation along dimension 1, so an input of shape
    ``(batch, channels, frames)`` becomes ``(batch, 2 * channels)``.
    """

    def forward(self, x):
        """Return ``cat(mean, std)`` of ``x`` computed over dimension 2."""
        mean = torch.mean(x, dim=2)
        if x.size(2) > 1:
            std = torch.std(x, dim=2)
        else:
            # torch.std applies Bessel's correction, so a single frame gives
            # 0/0 = NaN, which would propagate through every later layer.
            # One frame has no spread, so report a standard deviation of 0.
            std = torch.zeros_like(mean)
        return torch.cat((mean, std), dim=1)
def compute_eer(y_true, y_scores):
    """Compute the Equal Error Rate (EER) from predicted scores.

    Args:
        y_true: Ground-truth labels.
        y_scores: Model probability scores for the positive class.

    Returns:
        A tuple ``(eer, threshold)``: the false-positive rate at the ROC
        point where the false-negative and false-positive rates are closest,
        and the score threshold of that point.
    """
    false_pos_rate, true_pos_rate, cutoffs = roc_curve(y_true, y_scores)
    false_neg_rate = 1 - true_pos_rate

    # The EER operating point is where the two error rates (nearly) coincide.
    rate_gap = np.absolute(false_neg_rate - false_pos_rate)
    eer_index = np.nanargmin(rate_gap)

    return false_pos_rate[eer_index], cutoffs[eer_index]
def evaluate_model(model, data_loader, device):
    """Evaluate the model on a data loader and compute its EER.

    The model is switched to evaluation mode (and left in it). The softmax
    probability of class 1 is collected for every sample and passed, together
    with the labels, to ``compute_eer``.

    Args:
        model: Network returning 2-class logits.
        data_loader: Iterable of ``(data, target)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        A tuple ``(eer, threshold)`` as produced by ``compute_eer``.
    """
    model.eval()
    positive_scores = []
    true_labels = []

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            # Column 1 of the softmax is the probability of the positive class.
            class_one_prob = F.softmax(model(inputs), dim=1)[:, 1]
            positive_scores += list(class_one_prob.cpu().numpy())
            true_labels += list(labels.cpu().numpy())

    eer, threshold = compute_eer(np.array(true_labels), np.array(positive_scores))
    return eer, threshold
def extract_number(file_name):
    """Return the integer following the first ``segment_`` in a file name.

    Used as a sort key so that files order numerically rather than lexically.
    Returns -1 when the name contains no ``segment_<digits>`` part.
    """
    found = re.search(r'segment_(\d+)', file_name)
    return -1 if found is None else int(found.group(1))
def get_sorted_files(directory):
    """Return the ``.wav`` file names in ``directory`` in a deterministic order.

    Files are ordered by the number extracted by ``extract_number`` (so
    ``segment_2`` sorts before ``segment_10``). Names that share a number,
    including all names without one, are ordered by file name.

    Args:
        directory: Directory to list.

    Returns:
        A list of file names (not full paths).
    """
    wav_names = [f for f in os.listdir(directory) if f.endswith('.wav')]
    # os.listdir returns entries in arbitrary order, so ties on the segment
    # number are broken by name to keep sample selection reproducible.
    return sorted(wav_names, key=lambda name: (extract_number(name), name))
class SpeakerDataset(Dataset):
    """Binary dataset of audio files: target speaker (1) versus others (0).

    ``data_dir`` is expected to hold one sub-directory of ``.wav`` files per
    speaker. Every file of ``target_speaker`` becomes a positive sample.
    Negative samples are drawn from every other speaker directory: with ``N``
    other speakers (sorted by name), speaker ``k`` contributes files from the
    ``k``-th of ``N`` contiguous chunks of its own sorted file list, truncated
    to ``num_positive // N`` files when that quotient is positive.

    Args:
        data_dir: Root directory containing the per-speaker directories.
        target_speaker: Name of the positive speaker's directory.

    Raises:
        ValueError: If ``data_dir`` contains no other speaker directory.
    """

    def __init__(self, data_dir, target_speaker):
        self.data = []
        self.labels = []

        # Positive samples: every file of the target speaker.
        pos_dir = os.path.join(data_dir, target_speaker)
        for name in get_sorted_files(pos_dir):
            self.data.append(os.path.join(pos_dir, name))
            self.labels.append(1)
        num_positive = len(self.labels)
        print(f"Target Speaker Directory: {pos_dir}")
        print(f"Total Positive Samples (Class 1): {num_positive}")

        # Only directories can be speakers; stray files in data_dir (e.g.
        # OS metadata files) would otherwise crash the listing below.
        speakers = sorted(
            s for s in os.listdir(data_dir)
            if s != target_speaker and os.path.isdir(os.path.join(data_dir, s))
        )
        print(f"Other speakers: {speakers}")
        if not speakers:
            raise ValueError(
                f"No speaker directories other than {target_speaker!r} "
                f"found in {data_dir!r}; cannot build negative samples"
            )

        samples_per_negative = num_positive // len(speakers)
        print(f"Samples per negative speaker: {samples_per_negative}")

        for speaker_idx, speaker in enumerate(speakers):
            neg_dir = os.path.join(data_dir, speaker)
            wav_files = get_sorted_files(neg_dir)
            chunks = self._chunk_indices(len(wav_files), len(speakers))
            indices = chunks[speaker_idx]
            # A quotient of 0 (fewer positives than speakers) keeps the
            # whole chunk instead of selecting nothing.
            if samples_per_negative > 0:
                indices = indices[:samples_per_negative]
            print(f"Speaker: {speaker}, using indices: {indices}")
            print(f"Files selected for {speaker}:")
            for idx in indices:
                path = os.path.join(neg_dir, wav_files[idx])
                self.data.append(path)
                self.labels.append(0)
                print(f"Negative sample added: {path}")

        print(f"\nFinal Dataset Statistics:")
        print(f"Total Positive Samples (Class 1): {self.labels.count(1)}")
        print(f"Total Negative Samples (Class 0): {self.labels.count(0)}")

    @staticmethod
    def _chunk_indices(total_files, num_chunks):
        """Split ``range(total_files)`` into ``num_chunks`` contiguous chunks.

        Every chunk has ``total_files // num_chunks`` indices except the last,
        which also takes the remainder. Works for a single chunk as well.
        """
        chunk_size = total_files // num_chunks
        chunks = []
        for i in range(num_chunks):
            start = i * chunk_size
            is_last = i == num_chunks - 1
            stop = total_files if is_last else start + chunk_size
            chunks.append(list(range(start, stop)))
        return chunks

    def __len__(self):
        """Return the total number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for sample ``idx``.

        Features are computed from the audio file on every access; the label
        is returned as a ``LongTensor`` of shape ``(1,)``.
        """
        audio_path = self.data[idx]
        features = extract_mfcc_and_pitch(audio_path)
        label = self.labels[idx]
        return torch.FloatTensor(features), torch.LongTensor([label])
class EarlyStopping:
    """Signal when the validation loss has stopped improving.

    Call the instance once per epoch with the current validation loss; it
    returns ``True`` once ``patience`` calls without improvement have been
    counted since the last improvement.
    """

    def __init__(self, patience=5, delta=0):
        """
        :param patience: Number of non-improving calls after which stopping is signalled.
        :param delta: Amount by which the loss must drop below the best loss to count as an improvement.
        """
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.stop_training = False

    def __call__(self, val_loss):
        """Record ``val_loss`` and return whether training should stop."""
        # The very first loss only establishes the baseline.
        if self.best_loss is None:
            self.best_loss = val_loss
            return self.stop_training

        improved = val_loss < self.best_loss - self.delta
        if improved:
            self.best_loss = val_loss
            self.counter = 0
            return self.stop_training

        self.counter += 1
        if self.counter >= self.patience:
            self.stop_training = True
        return self.stop_training
def _train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader`` and return the mean batch loss."""
    model.train()
    total_loss = 0
    for data, target in loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        # reshape(-1) flattens (batch, 1) labels to (batch,). squeeze() would
        # also collapse a final batch of one sample to a 0-d tensor, which the
        # loss rejects for (1, classes) logits.
        loss = criterion(output, target.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)


def _validate(model, loader, criterion, device):
    """Return ``(mean batch loss, number of correct predictions)`` on ``loader``."""
    model.eval()
    total_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            total_loss += criterion(output, target.reshape(-1)).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    return total_loss / len(loader), correct


def train_with_kfold(dataset, model_class, num_folds=0, num_epochs=0, batch_size=0):
    """Train a fresh model on every fold of a shuffled K-fold split.

    For each fold a new ``model_class()`` is trained with Adam and
    cross-entropy loss for at most ``num_epochs`` epochs. Training of a fold
    stops early when the mean training loss drops to 0.001 or below, or when
    the validation loss has not improved for 5 epochs. After the last epoch
    of a fold the EER on the validation split is computed and the fold's
    final metrics are recorded. The fold's weights are written to
    ``output/best_model_fold_<fold>.pth``; the weights of the fold with the
    highest final validation accuracy are written to
    ``output/best_overall_model.pth``.

    The zero defaults are placeholders only; pass explicit positive values.

    Args:
        dataset: Dataset yielding ``(features, label)`` pairs.
        model_class: Callable returning a new, untrained model.
        num_folds: Number of folds for ``KFold``.
        num_epochs: Maximum number of epochs per fold (at least 1).
        batch_size: Batch size for both data loaders.

    Returns:
        A dict with the keys ``'train_losses'``, ``'val_losses'``,
        ``'val_accuracies'`` and ``'eers'``, each a list with one entry per
        fold.

    Raises:
        ValueError: If ``num_epochs`` is less than 1.
    """
    # Without at least one epoch there are no metrics to record for a fold.
    if num_epochs < 1:
        raise ValueError("num_epochs must be at least 1")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    kfold = KFold(n_splits=num_folds, shuffle=True, random_state=42)

    # Checkpoints are written below 'output/'; torch.save does not create
    # missing directories.
    os.makedirs('output', exist_ok=True)

    fold_results = {
        'train_losses': [],
        'val_losses': [],
        'val_accuracies': [],
        'eers': []
    }
    best_model = None
    best_accuracy = 0.0

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset), 1):
        model = model_class().to(device)
        train_subdata = Subset(dataset, train_idx)
        val_subdata = Subset(dataset, val_idx)
        train_loader = DataLoader(train_subdata, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_subdata, batch_size=batch_size)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters())
        early_stopping = EarlyStopping(patience=5, delta=0)

        print(f"\nFold {fold}")
        best_fold_accuracy = 0.0

        for epoch in range(num_epochs):
            avg_train_loss = _train_one_epoch(
                model, train_loader, criterion, optimizer, device
            )
            avg_val_loss, correct = _validate(model, val_loader, criterion, device)
            val_accuracy = correct / len(val_subdata)

            print(f'Epoch: {epoch+1}')
            print(f'Training Loss: {avg_train_loss:.4f}')
            print(f'Validation Loss: {avg_val_loss:.4f}')
            print(f'Validation Accuracy: {val_accuracy:.4f}')

            should_stop_training = False
            if avg_train_loss <= 0.001:
                print(f"Training loss is 0 at epoch {epoch+1}. Stopping training for this fold.")
                should_stop_training = True
            # The early-stopping tracker is updated every epoch, even when the
            # training-loss criterion has already fired.
            if early_stopping(avg_val_loss):
                print(f"Early stopping triggered at epoch {epoch+1}")
                should_stop_training = True
            if should_stop_training:
                break

        eer, threshold = evaluate_model(model, val_loader, device)
        print(f'EER: {eer:.4f} at threshold: {threshold:.4f}')
        fold_results['eers'].append(eer)

        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            # state_dict() returns references to the live parameters; clone
            # them so the kept snapshot cannot change afterwards.
            best_model = {
                key: value.detach().clone()
                for key, value in model.state_dict().items()
            }
        if val_accuracy > best_fold_accuracy:
            best_fold_accuracy = val_accuracy
            torch.save(model.state_dict(), f'output/best_model_fold_{fold}.pth')

        fold_results['train_losses'].append(avg_train_loss)
        fold_results['val_losses'].append(avg_val_loss)
        fold_results['val_accuracies'].append(val_accuracy)

    print("\nK-Fold Cross-Validation Summary:")
    print(f"Average Validation Accuracy: {np.mean(fold_results['val_accuracies']):.4f} ± {np.std(fold_results['val_accuracies']):.4f}")
    print(f"Average Validation Loss: {np.mean(fold_results['val_losses']):.4f} ± {np.std(fold_results['val_losses']):.4f}")
    print(f"Average EER: {np.mean(fold_results['eers']):.4f} ± {np.std(fold_results['eers']):.4f}")

    if best_model is not None:
        torch.save(best_model, 'output/best_overall_model.pth')
        print(f"\nBest overall model saved with accuracy: {best_accuracy:.4f}")

    return fold_results
def save_training_results(results, output_dir='output10'):
    """Save plots of the training results to image files.

    Writes ``training_validation_metrics.png`` (loss curves next to the
    validation accuracy) and ``eer_metrics.png`` into ``output_dir``,
    creating the directory when it does not exist.

    Args:
        results: Dictionary holding the training metrics under the keys
            'train_losses', 'val_losses', 'val_accuracies' and 'eers'.
        output_dir: Directory in which the plots are stored.
    """
    import os

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Each panel: the (results key, legend label) series it draws, its title
    # and its y-axis label.
    panels = (
        (
            (('train_losses', 'Training Loss'), ('val_losses', 'Validation Loss')),
            'Training and Validation Loss',
            'Loss',
        ),
        (
            (('val_accuracies', 'Validation Accuracy'),),
            'Validation Accuracy',
            'Accuracy',
        ),
    )

    plt.figure(figsize=(12, 5))
    for position, (series, title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for key, legend_label in series:
            plt.plot(results[key], label=legend_label)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()
    plt.tight_layout()
    metrics_path = os.path.join(output_dir, 'training_validation_metrics.png')
    plt.savefig(metrics_path)
    plt.close()

    plt.figure(figsize=(6, 5))
    plt.plot(results['eers'], label='EER')
    plt.title('Equal Error Rate (EER)')
    plt.xlabel('Fold')
    plt.ylabel('EER')
    plt.legend()
    eer_path = os.path.join(output_dir, 'eer_metrics.png')
    plt.savefig(eer_path)
    plt.close()
def main():
    """Build the speaker dataset, run K-fold training and save the plots."""
    batch_size = 16
    num_epochs = 30
    num_folds = 5
    # train_with_kfold writes its checkpoints below 'output/' and the plots
    # are saved there too, so this is the directory that has to exist before
    # training starts.
    output_dir = 'output'

    dataset = SpeakerDataset(
        data_dir='/path/to/dataset',
        target_speaker='target speaker',
    )

    os.makedirs(output_dir, exist_ok=True)

    results = train_with_kfold(
        dataset,
        model_class=XVectorNet,
        num_folds=num_folds,
        num_epochs=num_epochs,
        batch_size=batch_size
    )
    save_training_results(results, output_dir=output_dir)
# Run the training pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()