|
import numpy as np |
|
import librosa |
|
import os |
|
import re |
|
import random |
|
from sklearn.preprocessing import StandardScaler |
|
from sklearn.model_selection import train_test_split, KFold |
|
from sklearn.metrics import roc_curve |
|
import matplotlib.pyplot as plt |
|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
from torch.utils.data import Dataset, DataLoader, Subset |
|
|
|
def _zscore(values, eps=1e-8):
    """Standardize *values* to zero mean / unit variance over all entries.

    A (near-)constant array has a standard deviation of ~0; dividing by it
    would fill the features with NaN/inf, so in that case only the mean is
    removed and an all-zero array is returned.
    """
    centered = values - np.mean(values)
    spread = np.std(values)
    if spread < eps:
        return centered
    return centered / spread


def extract_mfcc_and_pitch(audio_path, sr=16000, n_mfcc=40):
    """Extract MFCC and pitch features from an audio file.

    Args:
        audio_path: Path of the audio file, passed to ``librosa.load``.
        sr: Sampling rate the audio is resampled to when loaded.
        n_mfcc: Number of MFCC coefficients to compute.

    Returns:
        A 2-D array with ``n_mfcc`` standardized MFCC rows stacked on top of
        one standardized pitch row (``n_mfcc + 1`` rows in total, one column
        per analysis frame).
    """
    audio, sr = librosa.load(audio_path, sr=sr)

    # MFCCs, standardized globally (one mean/std for the whole matrix).
    mfcc = _zscore(librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc))

    # Pitch track. `sr` must be passed explicitly: yin otherwise assumes its
    # own default sampling rate, which misplaces the fmin/fmax search range
    # for audio loaded at a different rate.
    pitch = librosa.yin(
        audio,
        fmin=librosa.note_to_hz('C2'),
        fmax=librosa.note_to_hz('C6'),
        sr=sr,
    )
    # Replace any undefined frames with the mean of the defined ones.
    pitch = np.nan_to_num(pitch, nan=np.nanmean(pitch))
    pitch = _zscore(pitch)

    # Turn the pitch track into a single row so it can be stacked under the
    # MFCC matrix (both must have the same number of frames).
    pitch = pitch.reshape(1, -1)

    combined_features = np.vstack([mfcc, pitch])

    return combined_features
|
|
|
|
|
class XVectorNet(nn.Module):
    """X-vector style network for two-class speaker classification.

    Frame-level 1-D convolutions are followed by statistics pooling over the
    time axis and a small fully connected head that outputs two logits.

    Args:
        input_dim: Number of input channels (feature rows per frame).
        dropout_rate: Dropout probability used after every hidden layer.
    """

    def __init__(self, input_dim=41, dropout_rate=0.45):
        super().__init__()

        hidden_dim = 512
        pooled_channels = 1500

        # Frame-level layers (operate on every time step).
        self.layer1 = nn.Conv1d(input_dim, hidden_dim, kernel_size=5, padding=2)
        self.dropout1 = nn.Dropout(p=dropout_rate)
        self.layer2 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=3, padding=1)
        self.dropout2 = nn.Dropout(p=dropout_rate)
        self.layer3 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=3, padding=1)
        self.dropout3 = nn.Dropout(p=dropout_rate)
        self.layer4 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=1)
        self.dropout4 = nn.Dropout(p=dropout_rate)
        self.layer5 = nn.Conv1d(hidden_dim, pooled_channels, kernel_size=1)

        # Collapses the time axis into per-channel mean and std.
        self.stats_pooling = StatsPooling()

        # Segment-level layers; the pooled vector is mean + std, hence 2x.
        self.layer6 = nn.Linear(2 * pooled_channels, hidden_dim)
        self.dropout6 = nn.Dropout(p=dropout_rate)
        self.layer7 = nn.Linear(hidden_dim, hidden_dim)
        self.dropout7 = nn.Dropout(p=dropout_rate)
        self.output = nn.Linear(hidden_dim, 2)

    def forward(self, x):
        """Map a batch of feature sequences to two-class logits.

        Args:
            x: Input tensor laid out as expected by ``nn.Conv1d``
                (batch, channels, frames).

        Returns:
            Tensor of raw (un-normalized) logits with two columns.
        """
        frame_blocks = (
            (self.layer1, self.dropout1),
            (self.layer2, self.dropout2),
            (self.layer3, self.dropout3),
            (self.layer4, self.dropout4),
        )
        for conv, drop in frame_blocks:
            x = drop(F.relu(conv(x)))

        # The last frame-level layer has no dropout before pooling.
        x = self.stats_pooling(F.relu(self.layer5(x)))

        segment_blocks = (
            (self.layer6, self.dropout6),
            (self.layer7, self.dropout7),
        )
        for dense, drop in segment_blocks:
            x = drop(F.relu(dense(x)))

        return self.output(x)
|
|
|
class StatsPooling(nn.Module):
    """Pool a sequence into its per-channel mean and standard deviation."""

    def forward(self, x):
        """Concatenate the mean and std of ``x`` taken over dimension 2.

        Args:
            x: Tensor with at least three dimensions; statistics are computed
                along dimension 2.

        Returns:
            Tensor whose dimension 1 is twice that of ``x``: the means
            followed by the standard deviations.
        """
        mean = torch.mean(x, dim=2)
        if x.size(2) > 1:
            std = torch.std(x, dim=2)
        else:
            # The sample standard deviation of a single frame is undefined
            # (NaN); report zero spread so downstream layers stay finite.
            std = torch.zeros_like(mean)
        return torch.cat((mean, std), dim=1)
|
|
|
def compute_eer(y_true, y_scores):
    """Compute the Equal Error Rate (EER) from predicted scores.

    Args:
        y_true: Ground-truth labels.
        y_scores: Model scores for the positive class.

    Returns:
        Tuple ``(eer, threshold)``: the false-positive rate at the ROC point
        where false-negative and false-positive rates are closest, and the
        score threshold of that point.
    """
    fpr, tpr, thresholds = roc_curve(y_true, y_scores)

    # Distance between miss rate (1 - TPR) and false-alarm rate per threshold.
    rate_gap = np.absolute((1 - tpr) - fpr)
    eer_index = np.nanargmin(rate_gap)

    return fpr[eer_index], thresholds[eer_index]
|
|
|
def evaluate_model(model, data_loader, device):
    """Evaluate the model on a data loader and compute its EER.

    Args:
        model: Network producing two-class logits.
        data_loader: Iterable yielding ``(data, target)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(eer, threshold)`` as returned by ``compute_eer``.
    """
    model.eval()
    collected_scores, collected_labels = [], []

    with torch.no_grad():
        for batch, target in data_loader:
            batch = batch.to(device)
            target = target.to(device)

            # Probability assigned to class 1 (the positive class).
            positive_prob = F.softmax(model(batch), dim=1)[:, 1]

            collected_scores.extend(positive_prob.cpu().numpy())
            collected_labels.extend(target.cpu().numpy())

    labels = np.array(collected_labels)
    scores = np.array(collected_scores)
    return compute_eer(labels, scores)
|
|
|
def extract_number(file_name):
    """Return the integer following ``segment_`` in *file_name*, or -1 if absent.

    Used as a sort key so that files order numerically rather than lexically.
    """
    found = re.search(r'segment_(\d+)', file_name)
    return -1 if found is None else int(found.group(1))
|
|
|
def get_sorted_files(directory):
    """List the ``.wav`` files of *directory*, ordered by segment number.

    The ordering key is ``extract_number``; names without a segment number
    get key -1 and therefore come first.
    """
    wav_names = []
    for entry in os.listdir(directory):
        if entry.endswith('.wav'):
            wav_names.append(entry)
    wav_names.sort(key=extract_number)
    return wav_names
|
|
|
|
|
class SpeakerDataset(Dataset):
    """Binary speaker dataset built from per-speaker folders of ``.wav`` files.

    Every ``.wav`` file in ``data_dir/target_speaker`` becomes a positive
    sample (label 1). Each other sub-directory of ``data_dir`` is treated as
    a negative speaker and contributes a slice of its files (label 0), so
    that the negatives roughly balance the positives.

    Args:
        data_dir: Directory containing one sub-directory per speaker.
        target_speaker: Name of the sub-directory holding the positive class.

    Raises:
        ValueError: If ``data_dir`` has no speaker directory besides the target.
    """

    def __init__(self, data_dir, target_speaker):
        self.data = []    # audio file paths
        self.labels = []  # 1 = target speaker, 0 = any other speaker

        # Positive samples: every file of the target speaker.
        pos_dir = os.path.join(data_dir, target_speaker)
        for name in get_sorted_files(pos_dir):
            self.data.append(os.path.join(pos_dir, name))
            self.labels.append(1)
        num_positive = len(self.labels)
        print(f"Target Speaker Directory: {pos_dir}")
        print(f"Total Positive Samples (Class 1): {num_positive}")

        # Negative speakers: every other *directory*. Stray files in data_dir
        # are skipped because listing them as a folder would fail.
        speakers = sorted(
            s for s in os.listdir(data_dir)
            if s != target_speaker and os.path.isdir(os.path.join(data_dir, s))
        )
        print(f"Other speakers: {speakers}")
        if not speakers:
            raise ValueError(
                f"No negative speaker directories found in {data_dir!r}"
            )

        samples_per_negative = num_positive // len(speakers)
        print(f"Samples per negative speaker: {samples_per_negative}")

        for speaker_idx, speaker in enumerate(speakers):
            neg_dir = os.path.join(data_dir, speaker)
            wav_files = get_sorted_files(neg_dir)

            # Each negative speaker uses a different chunk of its own file
            # list, selected by the speaker's position.
            speaker_indices = self._generate_speaker_indices(
                wav_files, len(speakers) + 1
            )
            indices = speaker_indices[speaker_idx]

            # Cap the chunk; a quota of 0 (more speakers than positives)
            # leaves the chunk untouched.
            if samples_per_negative > 0:
                indices = indices[:samples_per_negative]

            print(f"Speaker: {speaker}, using indices: {indices}")
            print(f"Files selected for {speaker}:")

            for idx in indices:
                neg_path = os.path.join(neg_dir, wav_files[idx])
                self.data.append(neg_path)
                self.labels.append(0)
                print(f"Negative sample added: {neg_path}")

        print(f"\nFinal Dataset Statistics:")
        print(f"Total Positive Samples (Class 1): {self.labels.count(1)}")
        print(f"Total Negative Samples (Class 0): {self.labels.count(0)}")

    @staticmethod
    def _generate_speaker_indices(wav_files, num_speakers):
        """Split the index range of *wav_files* into ``num_speakers - 1`` chunks.

        All chunks have ``len(wav_files) // (num_speakers - 1)`` consecutive
        indices, except the last one, which also takes the remainder.

        Args:
            wav_files: Sequence whose indices are partitioned.
            num_speakers: Total number of speakers including the target.

        Returns:
            List of ``num_speakers - 1`` lists of consecutive indices.

        Raises:
            ValueError: If ``num_speakers`` is smaller than 2.
        """
        num_chunks = num_speakers - 1
        if num_chunks < 1:
            raise ValueError("num_speakers must be at least 2")

        total_files = len(wav_files)
        files_per_speaker = total_files // num_chunks

        speaker_indices = []
        for chunk in range(num_chunks):
            start = chunk * files_per_speaker
            is_last = chunk == num_chunks - 1
            end = total_files if is_last else start + files_per_speaker
            speaker_indices.append(list(range(start, end)))
        return speaker_indices

    def __len__(self):
        """Return the total number of samples (positive and negative)."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample *idx* and return ``(features, label)`` tensors.

        Features are computed from the audio file on every access; the label
        is a one-element ``LongTensor``.
        """
        features = extract_mfcc_and_pitch(self.data[idx])
        label = self.labels[idx]
        return torch.FloatTensor(features), torch.LongTensor([label])
|
|
|
class EarlyStopping:
    """Signal that training should stop once validation loss stops improving."""

    def __init__(self, patience=5, delta=0):
        """Set up the stopping criterion.

        :param patience: Number of non-improving calls after which stopping is signalled.
        :param delta: Minimum decrease of the loss that counts as an improvement.
        """
        self.patience, self.delta = patience, delta
        self.best_loss = None
        self.counter = 0
        self.stop_training = False

    def __call__(self, val_loss):
        """Record *val_loss* and return whether training should stop."""
        # The first observation only establishes the baseline.
        if self.best_loss is None:
            self.best_loss = val_loss
            return self.stop_training

        # An improvement by more than `delta` resets the patience counter.
        if val_loss < self.best_loss - self.delta:
            self.best_loss, self.counter = val_loss, 0
            return self.stop_training

        self.counter += 1
        if self.counter >= self.patience:
            self.stop_training = True
        return self.stop_training
|
|
|
|
|
def _train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one optimization pass over *loader*; return the summed batch loss."""
    model.train()
    total_loss = 0
    for data, target in loader:
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        output = model(data)
        # reshape(-1) keeps the batch dimension even for a batch holding a
        # single sample; squeeze() would produce a 0-dim target there and
        # make the loss fail.
        loss = criterion(output, target.reshape(-1))

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    return total_loss


def _validate(model, loader, criterion, device):
    """Return ``(summed batch loss, number of correct predictions)`` on *loader*."""
    model.eval()
    total_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            total_loss += criterion(output, target.reshape(-1)).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    return total_loss, correct


def train_with_kfold(dataset, model_class, num_folds=0, num_epochs=0, batch_size=0,
                     output_dir='output'):
    """Train a fresh model per fold with K-fold cross-validation.

    For every fold a new ``model_class()`` is trained with Adam and
    cross-entropy loss until ``num_epochs`` is reached, the average training
    loss drops to 0.001 or below, or early stopping on the validation loss
    triggers. After the last epoch of a fold the EER is computed on the
    validation split, the fold's weights are saved, and the final-epoch
    metrics are recorded.

    Args:
        dataset: Dataset yielding ``(features, label)`` pairs.
        model_class: Callable without arguments that builds the model.
        num_folds: Number of cross-validation folds.
        num_epochs: Maximum number of epochs per fold (at least 1).
        batch_size: Batch size for both training and validation loaders.
        output_dir: Directory the model checkpoints are written to; created
            if it does not exist.

    Returns:
        Dict with the per-fold lists ``'train_losses'``, ``'val_losses'``,
        ``'val_accuracies'`` and ``'eers'``.

    Raises:
        ValueError: If ``num_epochs`` is smaller than 1.
    """
    if num_epochs < 1:
        # Without at least one epoch there are no metrics to record.
        raise ValueError("num_epochs must be at least 1")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # The checkpoints below are written here; torch.save does not create
    # missing parent directories.
    os.makedirs(output_dir, exist_ok=True)

    kfold = KFold(n_splits=num_folds, shuffle=True, random_state=42)

    fold_results = {
        'train_losses': [],
        'val_losses': [],
        'val_accuracies': [],
        'eers': []
    }

    # Weights and accuracy of the best fold seen so far.
    best_model = None
    best_accuracy = 0.0

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset), 1):
        model = model_class()
        model = model.to(device)

        train_subdata = Subset(dataset, train_idx)
        val_subdata = Subset(dataset, val_idx)

        train_loader = DataLoader(train_subdata, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_subdata, batch_size=batch_size)

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters())

        early_stopping = EarlyStopping(patience=5, delta=0)
        should_stop_training = False

        print(f"\nFold {fold}")

        for epoch in range(num_epochs):
            train_loss = _train_one_epoch(model, train_loader, criterion, optimizer, device)
            avg_train_loss = train_loss / len(train_loader)

            val_loss, correct = _validate(model, val_loader, criterion, device)
            avg_val_loss = val_loss / len(val_loader)
            val_accuracy = correct / len(val_subdata)

            print(f'Epoch: {epoch+1}')
            print(f'Training Loss: {avg_train_loss:.4f}')
            print(f'Validation Loss: {avg_val_loss:.4f}')
            print(f'Validation Accuracy: {val_accuracy:.4f}')

            # Stop once the training loss has practically vanished.
            if avg_train_loss <= 0.001:
                print(f"Training loss is 0 at epoch {epoch+1}. Stopping training for this fold.")
                should_stop_training = True

            # The stopper is fed every epoch so its counter stays consistent.
            if early_stopping(avg_val_loss):
                print(f"Early stopping triggered at epoch {epoch+1}")
                should_stop_training = True

            if should_stop_training:
                break

        # Per-fold evaluation, using the model as left by the last epoch.
        eer, threshold = evaluate_model(model, val_loader, device)
        print(f'EER: {eer:.4f} at threshold: {threshold:.4f}')

        fold_results['eers'].append(eer)

        # Every fold builds its own model object, so keeping a reference to
        # this state dict is safe: later folds do not modify it.
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            best_model = model.state_dict()

        torch.save(
            model.state_dict(),
            os.path.join(output_dir, f'best_model_fold_{fold}.pth'),
        )

        fold_results['train_losses'].append(avg_train_loss)
        fold_results['val_losses'].append(avg_val_loss)
        fold_results['val_accuracies'].append(val_accuracy)

    print("\nK-Fold Cross-Validation Summary:")
    print(f"Average Validation Accuracy: {np.mean(fold_results['val_accuracies']):.4f} ± {np.std(fold_results['val_accuracies']):.4f}")
    print(f"Average Validation Loss: {np.mean(fold_results['val_losses']):.4f} ± {np.std(fold_results['val_losses']):.4f}")
    print(f"Average EER: {np.mean(fold_results['eers']):.4f} ± {np.std(fold_results['eers']):.4f}")

    if best_model is not None:
        torch.save(best_model, os.path.join(output_dir, 'best_overall_model.pth'))
        print(f"\nBest overall model saved with accuracy: {best_accuracy:.4f}")

    return fold_results
|
|
|
def save_training_results(results, output_dir='output10'):
    """Save plots of the training results to image files.

    Writes ``training_validation_metrics.png`` (losses and validation
    accuracy side by side) and ``eer_metrics.png`` into ``output_dir``,
    creating the directory if it does not exist.

    Args:
        results: Dictionary holding the training metrics under the keys
            ``'train_losses'``, ``'val_losses'``, ``'val_accuracies'`` and
            ``'eers'``.
        output_dir: Directory in which the plots are stored.
    """
    import os

    def _draw(curves, title, xlabel, ylabel):
        # Plot each (key, legend label) pair on the current axes and decorate.
        for key, legend_label in curves:
            plt.plot(results[key], label=legend_label)
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.legend()

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Figure 1: losses (left) and validation accuracy (right).
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    _draw(
        [('train_losses', 'Training Loss'), ('val_losses', 'Validation Loss')],
        'Training and Validation Loss',
        'Epoch',
        'Loss',
    )

    plt.subplot(1, 2, 2)
    _draw(
        [('val_accuracies', 'Validation Accuracy')],
        'Validation Accuracy',
        'Epoch',
        'Accuracy',
    )

    plt.tight_layout()
    metrics_path = os.path.join(output_dir, 'training_validation_metrics.png')
    plt.savefig(metrics_path)
    plt.close()

    # Figure 2: equal error rate.
    plt.figure(figsize=(6, 5))
    _draw([('eers', 'EER')], 'Equal Error Rate (EER)', 'Fold', 'EER')
    eer_path = os.path.join(output_dir, 'eer_metrics.png')
    plt.savefig(eer_path)
    plt.close()
|
|
|
|
|
def main():
    """Build the dataset, run K-fold training and save the result plots."""
    batch_size = 16
    num_epochs = 30
    num_folds = 5

    # Model checkpoints and plots are all written to this directory, so it
    # has to exist before training starts.
    output_dir = 'output'

    dataset = SpeakerDataset(
        data_dir='/path/to/dataset',
        target_speaker='target speaker',
    )

    os.makedirs(output_dir, exist_ok=True)

    results = train_with_kfold(
        dataset,
        model_class=XVectorNet,
        num_folds=num_folds,
        num_epochs=num_epochs,
        batch_size=batch_size
    )

    save_training_results(results, output_dir=output_dir)


if __name__ == "__main__":
    main()
|
|