|
|
|
from fastapi import FastAPI |
|
|
|
app = FastAPI() |
|
from fastapi.middleware.cors import CORSMiddleware |
|
origins = [ |
|
"*" |
|
] |
|
app.add_middleware( |
|
CORSMiddleware, |
|
allow_origins=origins, |
|
allow_credentials=True, |
|
allow_methods=["*"], |
|
allow_headers=["*"], |
|
) |
|
|
|
@app.get("/") |
|
def greet_json(): |
|
return {"Hello": "World!"} |
|
|
|
|
|
|
|
|
|
import os |
|
import gdown |
|
|
|
file_id = "1zhisRgRi2qBFX73VFhzh-Ho93MORQqVa" |
|
output_dir = "./downloads" |
|
output_file = "file.h5" |
|
|
|
if not os.path.exists(output_dir): |
|
os.makedirs(output_dir) |
|
|
|
output_path = os.path.join(output_dir, output_file) |
|
|
|
url = f"https://drive.google.com/uc?id={file_id}" |
|
|
|
try: |
|
gdown.download(url, output_path, quiet=False) |
|
print(f"File downloaded successfully to: {output_path}") |
|
except Exception as e: |
|
print(f"Error downloading file: {e}") |
|
|
|
output_file = "file.h5" |
|
file_path = os.path.join(output_dir, output_file) |
|
|
|
|
|
|
|
|
|
file_id = "1wIaycDFGTF3e0PpAHKk-GLnxk4cMehOU" |
|
output_dir = "./downloads" |
|
output_file = "file2.h5" |
|
|
|
if not os.path.exists(output_dir): |
|
os.makedirs(output_dir) |
|
|
|
output_path = os.path.join(output_dir, output_file) |
|
|
|
url = f"https://drive.google.com/uc?id={file_id}" |
|
|
|
try: |
|
gdown.download(url, output_path, quiet=False) |
|
print(f"File downloaded successfully to: {output_path}") |
|
except Exception as e: |
|
print(f"Error downloading file: {e}") |
|
|
|
output_file = "file2.h5" |
|
file_path = os.path.join(output_dir, output_file) |
|
|
|
|
|
if os.path.exists(file_path): |
|
print(f"The file '{output_file}' exists at '{file_path}'.") |
|
else: |
|
print(f"The file '{output_file}' does not exist at '{file_path}'.") |
|
|
|
|
|
import os |
|
import numpy as np |
|
import tensorflow as tf |
|
import tensorflow |
|
import librosa |
|
import matplotlib.pyplot as plt |
|
|
|
|
|
import os |
|
os.environ["TORCH_HOME"] = "/tmp/torch_cache" |
|
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0" |
|
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib_config" |
|
os.environ["FONTCONFIG_PATH"] = "/tmp/fontconfig" |
|
os.environ["HF_HOME"] = "/tmp/huggingface_cache" |
|
|
|
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input |
|
from tensorflow.keras.models import Model |
|
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout |
|
from tensorflow.keras.optimizers import Adam |
|
from transformers import pipeline |
|
|
|
class UnifiedDeepfakeDetector: |
|
def __init__(self): |
|
self.input_shape = (224, 224, 3) |
|
self.vgg_model = self.build_vgg16_model() |
|
self.dense_model = tf.keras.models.load_model('downloads/file2.h5') |
|
self.cnn_model = tf.keras.models.load_model('downloads/file.h5') |
|
self.melody_machine = pipeline(model="MelodyMachine/Deepfake-audio-detection-V2") |
|
|
|
def build_vgg16_model(self): |
|
base_model = VGG16(weights='imagenet', include_top=False, input_shape=self.input_shape) |
|
for layer in base_model.layers: |
|
layer.trainable = False |
|
|
|
x = base_model.output |
|
x = GlobalAveragePooling2D()(x) |
|
x = Dense(512, activation='relu')(x) |
|
x = Dropout(0.5)(x) |
|
x = Dense(256, activation='relu')(x) |
|
x = Dropout(0.3)(x) |
|
output = Dense(1, activation='sigmoid')(x) |
|
|
|
model = Model(inputs=base_model.input, outputs=output) |
|
model.compile(optimizer=Adam(learning_rate=0.0001), |
|
loss='binary_crossentropy', |
|
metrics=['accuracy']) |
|
return model |
|
|
|
def audio_to_spectrogram(self, file_path, plot=False): |
|
try: |
|
audio, sr = librosa.load(file_path, duration=5.0, sr=22050) |
|
spectrogram = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=224, fmax=8000) |
|
spectrogram_db = librosa.power_to_db(spectrogram, ref=np.max) |
|
|
|
if plot: |
|
plt.figure(figsize=(12, 6)) |
|
librosa.display.specshow(spectrogram_db, y_axis='mel', x_axis='time', cmap='viridis') |
|
plt.colorbar(format='%+2.0f dB') |
|
plt.title('Mel Spectrogram Analysis') |
|
plot_path = 'spectrogram_plot.png' |
|
plt.savefig(plot_path, dpi=300, bbox_inches='tight') |
|
plt.close() |
|
return plot_path |
|
|
|
spectrogram_norm = (spectrogram_db - spectrogram_db.min()) / (spectrogram_db.max() - spectrogram_db.min()) |
|
spectrogram_rgb = np.stack([spectrogram_norm]*3, axis=-1) |
|
spectrogram_resized = tf.image.resize(spectrogram_rgb, (224, 224)) |
|
return preprocess_input(spectrogram_resized * 255) |
|
|
|
except Exception as e: |
|
print(f"Spectrogram error: {e}") |
|
return None |
|
|
|
def analyze_audio_rf(self, audio_path, model_choice="all"): |
|
results = {} |
|
plots = {} |
|
r = [] |
|
audio_features = {} |
|
|
|
try: |
|
|
|
audio, sr = librosa.load(audio_path, res_type="kaiser_fast") |
|
audio_features = { |
|
"sample_rate": sr, |
|
"duration": librosa.get_duration(y=audio, sr=sr), |
|
"rms_energy": float(np.mean(librosa.feature.rms(y=audio))), |
|
"zero_crossing_rate": float(np.mean(librosa.feature.zero_crossing_rate(y=audio))) |
|
} |
|
|
|
|
|
if model_choice in ["VGG16", "all"]: |
|
spec = self.audio_to_spectrogram(audio_path) |
|
if spec is not None: |
|
pred = self.vgg_model.predict(np.expand_dims(spec, axis=0))[0][0] |
|
results["VGG16"] = { |
|
"prediction": "FAKE" if pred > 0.5 else "REAL", |
|
"confidence": float(pred if pred > 0.5 else 1 - pred), |
|
"raw_score": float(pred) |
|
} |
|
plots["spectrogram"] = self.audio_to_spectrogram(audio_path, plot=True) |
|
r.append("FAKE" if pred > 0.5 else "REAL") |
|
|
|
|
|
if model_choice in ["Dense", "all"]: |
|
mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=40) |
|
mfcc_scaled = np.mean(mfcc.T, axis=0).reshape(1, -1) |
|
pred = self.dense_model.predict(mfcc_scaled) |
|
results["Dense"] = { |
|
"prediction": "FAKE" if np.argmax(pred[0]) == 0 else "REAL", |
|
"confidence": float(np.max(pred[0])), |
|
"raw_scores": pred[0].tolist() |
|
} |
|
r.append("FAKE" if np.argmax(pred[0]) == 0 else "REAL") |
|
|
|
|
|
if model_choice in ["CNN", "all"]: |
|
mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=40) |
|
mfcc_scaled = np.mean(mfcc.T, axis=0).reshape(None, 40, 1, 1) |
|
pred = self.cnn_model.predict(mfcc_scaled) |
|
results["CNN"] = { |
|
"prediction": "FAKE" if np.argmax(pred[0]) == 0 else "REAL", |
|
"confidence": float(np.max(pred[0])), |
|
"raw_scores": pred[0].tolist() |
|
} |
|
r.append("FAKE" if np.argmax(pred[0]) == 0 else "REAL") |
|
|
|
|
|
if model_choice in ["MelodyMachine", "all"]: |
|
result = self.melody_machine(audio_path) |
|
best_pred = max(result, key=lambda x: x['score']) |
|
results["MelodyMachine"] = { |
|
"prediction": best_pred['label'].upper(), |
|
"confidence": float(best_pred['score']), |
|
"all_predictions": result |
|
} |
|
r.append(best_pred['label'].upper()) |
|
|
|
return r |
|
|
|
except Exception as e: |
|
print(f"Analysis error: {e}") |
|
return None, None, None |
|
|
|
|
|
|
|
import torchaudio |
|
import torch |
|
import numpy as np |
|
from scipy.stats import skew, kurtosis, median_abs_deviation |
|
import os |
|
import torch.nn.functional as F |
|
|
|
|
|
import os |
|
os.environ["TORCH_HOME"] = "/tmp/torch_cache" |
|
|
|
|
|
|
|
from torchaudio.pipelines import WAV2VEC2_BASE |
|
bundle = WAV2VEC2_BASE |
|
|
|
model = bundle.get_model() |
|
print("Model downloaded successfully!") |
|
|
|
|
|
def extract_features(file_path): |
|
if os.path.exists(file_path): |
|
print(f"File successfully written: {file_path}") |
|
else: |
|
print("File writing failed.") |
|
waveform, sample_rate = torchaudio.load(file_path) |
|
if sample_rate != bundle.sample_rate: |
|
waveform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=bundle.sample_rate)(waveform) |
|
|
|
with torch.inference_mode(): |
|
features, _ = model.extract_features(waveform) |
|
|
|
pooled_features = [] |
|
for f in features: |
|
if f.dim() == 3: |
|
f = f.permute(0, 2, 1) |
|
pooled_f = F.adaptive_avg_pool1d(f[0].unsqueeze(0), 1).squeeze(0) |
|
pooled_features.append(pooled_f) |
|
|
|
final_features = torch.cat(pooled_features, dim=0).numpy() |
|
final_features = (final_features - np.mean(final_features)) / (np.std(final_features) + 1e-10) |
|
|
|
return final_features |
|
|
|
def additional_features(features): |
|
mad = median_abs_deviation(features) |
|
features_clipped = np.clip(features, 1e-10, None) |
|
entropy = -np.sum(features_clipped * np.log(features_clipped)) |
|
return mad, entropy |
|
|
|
def classify_audio(features): |
|
|
|
_, entropy = additional_features(features) |
|
print(entropy) |
|
|
|
if entropy > 150: |
|
return True, entropy |
|
else: |
|
return False, entropy |
|
|
|
|
|
from fastapi import FastAPI, File, UploadFile, Form |
|
from fastapi.responses import JSONResponse |
|
import torch |
|
from scipy.stats import skew, kurtosis, median_abs_deviation |
|
import shutil |
|
import subprocess |
|
import os |
|
import librosa |
|
|
|
|
|
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" |
|
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" |
|
os.environ["FONTCONFIG_PATH"] = "/tmp/fontconfig" |
|
os.environ["TF_ENABLE_ONEDNN_OPTS"]="0" |
|
os.environ["HF_HOME"] = "/tmp/huggingface_cache" |
|
|
|
os.makedirs("/tmp/matplotlib", exist_ok=True) |
|
os.makedirs("/tmp/fontconfig", exist_ok=True) |
|
os.makedirs("/tmp/huggingface_cache", exist_ok=True) |
|
|
|
SAVE_DIR = './audio' |
|
os.makedirs(SAVE_DIR, exist_ok=True) |
|
|
|
os.system('apt-get update && apt-get install -y ffmpeg') |
|
|
|
|
|
def reencode_audio(input_path, output_path): |
|
command = [ |
|
'/usr/bin/ffmpeg', '-i', input_path, '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', output_path |
|
] |
|
subprocess.run(command, check=True) |
|
|
|
|
|
from collections import Counter |
|
from datetime import datetime |
|
import base64 |
|
|
|
@app.post("/upload") |
|
async def upload_file(file: UploadFile = File(...)): |
|
print(f"Received file: {file.filename}") |
|
|
|
original_filename = file.filename.rsplit('.', 1)[0] |
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
wav_filename = os.path.join(SAVE_DIR, f"{timestamp}.wav") |
|
reencoded_filename = os.path.join(SAVE_DIR, f"{timestamp}_reencoded.wav") |
|
|
|
|
|
with open(wav_filename, "wb") as buffer: |
|
shutil.copyfileobj(file.file, buffer) |
|
|
|
reencode_audio(wav_filename, reencoded_filename) |
|
os.remove(wav_filename) |
|
print(f"File successfully re-encoded as: {reencoded_filename}") |
|
|
|
try: |
|
audio, sr = librosa.load(reencoded_filename, sr=None) |
|
print("Loaded successfully with librosa") |
|
except Exception as e: |
|
print(f"Error loading re-encoded file: {e}") |
|
new_features = extract_features(reencoded_filename) |
|
prediction, entropy = classify_audio(new_features) |
|
with open(reencoded_filename, "rb") as audio_file: |
|
audio_data = audio_file.read() |
|
|
|
|
|
os.remove(reencoded_filename) |
|
return JSONResponse(content={ |
|
"prediction": bool(prediction), |
|
"entropy": float(entropy), |
|
}) |
|
|
|
|
|
@app.post("/upload_audio") |
|
async def upload_file(file: UploadFile = File(...)): |
|
print(f"Received file: {file.filename}") |
|
|
|
original_filename = file.filename.rsplit('.', 1)[0] |
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
wav_filename = os.path.join(SAVE_DIR, f"{timestamp}.wav") |
|
reencoded_filename = os.path.join(SAVE_DIR, f"{timestamp}_reencoded.wav") |
|
|
|
|
|
with open(wav_filename, "wb") as buffer: |
|
shutil.copyfileobj(file.file, buffer) |
|
|
|
reencode_audio(wav_filename, reencoded_filename) |
|
|
|
os.remove(wav_filename) |
|
print(f"File successfully re-encoded as: {reencoded_filename}") |
|
|
|
try: |
|
audio, sr = librosa.load(reencoded_filename, sr=None) |
|
print("Loaded successfully with librosa") |
|
except Exception as e: |
|
print(f"Error loading re-encoded file: {e}") |
|
new_features = extract_features(reencoded_filename) |
|
detector = UnifiedDeepfakeDetector() |
|
print(reencoded_filename) |
|
result = detector.analyze_audio_rf(reencoded_filename, model_choice="all") |
|
prediction, entropy = classify_audio(new_features) |
|
with open(reencoded_filename, "rb") as audio_file: |
|
audio_data = audio_file.read() |
|
result = list(result) |
|
result.append("FAKE" if float(entropy) < 150 else "REAL") |
|
print(result) |
|
r_normalized = [x.upper() for x in result if x is not None] |
|
counter = Counter(r_normalized) |
|
|
|
most_common_element, _ = counter.most_common(1)[0] |
|
|
|
print(f"The most frequent element is: {most_common_element}") |
|
|
|
|
|
audio_base64 = base64.b64encode(audio_data).decode('utf-8') |
|
print(f"Audio Data Length: {len(audio_data)}") |
|
|
|
os.remove(reencoded_filename) |
|
return JSONResponse(content={ |
|
"filename": file.filename, |
|
"prediction": most_common_element.upper(), |
|
"entropy": float(entropy), |
|
"audio": audio_base64, |
|
"content_type": "audio/wav" |
|
}) |
|
|
|
|