|
import io
import logging
import os
import pickle

from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import numpy as np
import face_recognition
from PIL import Image
from mtcnn import MTCNN
import cv2
import faiss
import imgaug.augmenters as iaa
from deepface import DeepFace
|
|
|
|
# ASGI application object; the @app.post decorators in this file register their routes on it.
app = FastAPI()
|
|
|
|
|
|
def load_encodings(file_path):
    """Read the pickled face database stored at ``file_path``.

    The file is expected to hold a dict with the keys ``"encodings"`` and
    ``"labels"``.

    Parameters:
        file_path (str): Location of the pickle file.

    Returns:
        tuple: ``(encodings, labels)`` where ``encodings`` is a NumPy array
        built from the stored encodings and ``labels`` is the stored labels
        object, returned as-is. When the file does not exist the result is
        an empty array and an empty list.

    Note:
        ``pickle.load`` can execute arbitrary code, so ``file_path`` must
        only ever point at a file this application wrote itself.
    """
    if os.path.exists(file_path):
        with open(file_path, "rb") as handle:
            stored = pickle.load(handle)
        return np.array(stored["encodings"]), stored["labels"]

    # Nothing has been saved yet: report an empty database.
    return np.array([]), []
|
|
|
|
|
|
def save_encodings(encodings, labels, file_path):
    """Pickle the face database to ``file_path``, replacing any existing file.

    Parameters:
        encodings: Face encodings to store (stored unchanged).
        labels: Labels to store alongside the encodings (stored unchanged).
        file_path (str): Destination of the pickle file.

    Returns:
        None
    """
    with open(file_path, "wb") as handle:
        pickle.dump({"encodings": encodings, "labels": labels}, handle)
|
|
|
|
|
|
# Shared MTCNN detector, created lazily so the model is loaded once instead of
# on every request.
_FACE_DETECTOR = None


def _get_face_detector():
    """Return the shared MTCNN detector, creating it on first use."""
    global _FACE_DETECTOR
    if _FACE_DETECTOR is None:
        _FACE_DETECTOR = MTCNN()
    return _FACE_DETECTOR


def detect_and_align_face(image):
    """Detect a face, rotate the image so the eyes are level, and crop the face.

    Parameters:
        image (PIL.Image.Image or array-like): Picture to search. A PIL image
            is converted to RGB directly. Any other input is treated as a
            BGR pixel array and converted to RGB, as this function has
            always done for array input.

    Returns:
        PIL.Image.Image: RGB crop of the first detected face, taken from the
        rotated image.

    Raises:
        ValueError: If no face is detected, or the reported face box lies
            entirely outside the image.
    """
    if isinstance(image, Image.Image):
        # PIL pixels are already RGB after convert(); running a BGR->RGB
        # swap on them would hand swapped channels to the detector.
        image_rgb = np.array(image.convert("RGB"))
    else:
        image_rgb = cv2.cvtColor(np.array(image), cv2.COLOR_BGR2RGB)

    detections = _get_face_detector().detect_faces(image_rgb)
    if not detections:
        raise ValueError("No face detected in the image.")

    detection = detections[0]
    x, y, width, height = detection['box']
    keypoints = detection['keypoints']

    # The reported box may extend past the image edge (even to negative
    # coordinates); clamp it so slicing cannot wrap around or come out empty.
    img_height, img_width = image_rgb.shape[:2]
    x1, y1 = max(0, x), max(0, y)
    x2, y2 = min(img_width, x + width), min(img_height, y + height)
    if x2 <= x1 or y2 <= y1:
        raise ValueError("No face detected in the image.")

    # Angle of the line through both eyes, in degrees.
    left_eye = keypoints['left_eye']
    right_eye = keypoints['right_eye']
    delta_x = right_eye[0] - left_eye[0]
    delta_y = right_eye[1] - left_eye[1]
    angle = float(np.degrees(np.arctan2(delta_y, delta_x)))

    # Rotate the whole image about the centre of the face box; plain Python
    # floats keep OpenCV's argument parsing happy.
    center = ((x1 + x2) / 2.0, (y1 + y2) / 2.0)
    rot_matrix = cv2.getRotationMatrix2D(center, angle, scale=1.0)
    aligned_image = cv2.warpAffine(image_rgb, rot_matrix, (img_width, img_height))

    aligned_face = aligned_image[y1:y2, x1:x2]
    return Image.fromarray(aligned_face)
|
|
|
|
|
|
def create_faiss_index(known_encodings):
    """Build an exact (brute-force) L2 FAISS index over the known encodings.

    Parameters:
        known_encodings (array-like): 2-D collection of shape
            ``(n_faces, dimension)``.

    Returns:
        faiss.IndexFlatL2: Index containing every row of ``known_encodings``.

    Raises:
        ValueError: If ``known_encodings`` is not two-dimensional (for
            example the empty 1-D array of an empty database).
    """
    # FAISS works on C-contiguous float32 data; encodings usually arrive as
    # float64, so convert explicitly instead of relying on the wrapper.
    vectors = np.ascontiguousarray(known_encodings, dtype=np.float32)
    if vectors.ndim != 2:
        raise ValueError(
            "known_encodings must be a 2-D array of shape (n_faces, dimension)."
        )

    index = faiss.IndexFlatL2(vectors.shape[1])
    index.add(vectors)
    return index
|
|
|
|
|
|
def augment_image(image, num_augmented=5):
    """
    Produce randomly augmented copies of an image.

    Every copy is generated by one pass through the same imgaug pipeline:
    Fliplr, Affine rotation, AdditiveGaussianNoise, Multiply and
    GaussianBlur, applied in that order.

    Parameters:
        image (PIL.Image): The image to augment.
        num_augmented (int): Number of augmented images to generate.

    Returns:
        List[PIL.Image]: List of augmented images.
    """
    pixels = np.array(image)

    steps = [
        iaa.Fliplr(0.5),
        iaa.Affine(rotate=(-25, 25)),
        iaa.AdditiveGaussianNoise(scale=(0, 0.05*255)),
        iaa.Multiply((0.8, 1.2)),
        iaa.GaussianBlur(sigma=(0.0, 1.0)),
    ]
    pipeline = iaa.Sequential(steps)

    variants = []
    for _ in range(num_augmented):
        # Each call re-samples the random parameters of the pipeline.
        variants.append(Image.fromarray(pipeline(image=pixels)))
    return variants
|
|
|
|
|
|
@app.post("/create/")
async def preprocess_and_save_augmented_encodings(image: UploadFile = File(...), num_augmented: int = 5):
    """Create the face database from one uploaded image and its augmentations.

    The upload and ``num_augmented`` augmented copies are each encoded with
    ``face_recognition``; every encoding found is stored under the upload's
    filename in ``face_encoding.pkl``.

    Parameters:
        image (UploadFile): Uploaded picture containing the face.
        num_augmented (int): Number of augmented copies to generate.

    Returns:
        JSONResponse: Success status and message.

    Raises:
        HTTPException: 400 if the upload cannot be decoded as an image, or
            if no face encoding is found in the image or any augmentation.
    """
    image_bytes = await image.read()
    try:
        original_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except OSError as exc:
        # Pillow signals undecodable or truncated uploads with OSError subclasses.
        raise HTTPException(status_code=400, detail="Uploaded file is not a valid image.") from exc

    augmented_images = augment_image(original_image, num_augmented=num_augmented)
    images_to_encode = [original_image] + augmented_images

    known_encodings = []
    for img in images_to_encode:
        encodings = face_recognition.face_encodings(np.array(img))
        if encodings:
            known_encodings.append(encodings[0])

    # Refuse to write an empty database and report success when no face was found.
    if not known_encodings:
        raise HTTPException(status_code=400, detail="No face encoding found.")

    known_labels = [image.filename] * len(known_encodings)

    # NOTE(review): this replaces the whole encodings file rather than
    # appending to it (unlike /encode/) - confirm that is intended.
    encodings_file = "face_encoding.pkl"
    save_encodings(np.array(known_encodings), known_labels, encodings_file)

    return JSONResponse(content={"status": "Success", "message": "Augmented encodings created and saved."})
|
|
|
|
@app.post("/encode/")
async def encode_face(image: UploadFile = File(...)):
    """Encode the face in an uploaded image and append it to the database.

    The face is detected and aligned, encoded with ``face_recognition``, and
    appended to ``face_encoding.pkl`` under the upload's filename.

    Parameters:
        image (UploadFile): Uploaded picture containing the face.

    Returns:
        JSONResponse: Success status and message.

    Raises:
        HTTPException: 400 if the upload cannot be decoded as an image, no
            face is detected, or no encoding can be computed.
    """
    image_bytes = await image.read()
    try:
        pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except OSError as exc:
        # Pillow signals undecodable or truncated uploads with OSError subclasses.
        raise HTTPException(status_code=400, detail="Uploaded file is not a valid image.") from exc

    try:
        aligned_face = detect_and_align_face(pil_image)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    encodings = face_recognition.face_encodings(np.array(aligned_face))
    if not encodings:
        raise HTTPException(status_code=400, detail="No face encoding found.")

    # Read the database only once the request is known to be valid.
    encodings_file = "face_encoding.pkl"
    known_encodings, known_labels = load_encodings(encodings_file)

    # Work on list copies so appending works whatever sequence type was stored.
    known_encodings = list(known_encodings)
    known_labels = list(known_labels)
    known_encodings.append(encodings[0])
    known_labels.append(image.filename)

    save_encodings(np.array(known_encodings), known_labels, encodings_file)

    return JSONResponse(content={"status": "Success", "message": "Face encoded and saved."})
|
|
|
|
@app.post("/match/")
async def match_face(image: UploadFile = File(...), similarity_threshold: float = 70.0):
    """Match the face in an uploaded image against the stored encodings.

    The face is detected, aligned and encoded, then compared with the
    nearest stored encoding through a FAISS L2 index. An anti-spoofing
    check is run on the uploaded picture as a best-effort extra.

    Parameters:
        image (UploadFile): Uploaded picture containing the face to match.
        similarity_threshold (float): Minimum similarity, in percent, for
            the response message to report a match.

    Returns:
        JSONResponse: ``status``, formatted ``similarity``, ``is_real`` and
        a ``message`` saying whether the face matched.

    Raises:
        HTTPException: 400 if the upload cannot be decoded as an image, no
            face is detected, the database is empty, or no encoding can be
            computed.
    """
    image_bytes = await image.read()
    try:
        pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except OSError as exc:
        # Pillow signals undecodable or truncated uploads with OSError subclasses.
        raise HTTPException(status_code=400, detail="Uploaded file is not a valid image.") from exc

    try:
        aligned_face = detect_and_align_face(pil_image)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    encodings_file = "face_encoding.pkl"
    known_encodings, _ = load_encodings(encodings_file)
    if len(known_encodings) == 0:
        raise HTTPException(status_code=400, detail="No known faces in the database. Please add some faces first.")

    target_encodings = face_recognition.face_encodings(np.array(aligned_face))
    if not target_encodings:
        raise HTTPException(status_code=400, detail="No face encoding found.")

    # FAISS operates on C-contiguous float32 data; encodings are float64.
    known_matrix = np.ascontiguousarray(known_encodings, dtype=np.float32)
    target_encoding = np.ascontiguousarray(target_encodings[0], dtype=np.float32).reshape(1, -1)

    index = create_faiss_index(known_matrix)
    distances, _ = index.search(target_encoding, 1)

    # IndexFlatL2 reports *squared* L2 distances; the percentage below is a
    # heuristic built on that value.
    # NOTE(review): confirm the default threshold was tuned for squared distances.
    best_similarity_percentage = (1 - float(distances[0][0])) * 100

    is_real = True
    try:
        # Pass the decoded pixels (BGR, as DeepFace expects for arrays). The
        # client-supplied filename is not a path on this server.
        bgr_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
        result = DeepFace.extract_faces(img_path=bgr_image, anti_spoofing=True)
        if result and isinstance(result, list):
            # bool() guards against NumPy booleans, which are not JSON-serializable.
            is_real = bool(result[0].get('is_real', True))
    except Exception:
        # Best-effort check: any failure is treated as "not real", but is
        # logged so a broken spoof check does not go unnoticed.
        logging.getLogger(__name__).exception("Anti-spoofing check failed; treating the face as not real.")
        is_real = False

    matched = best_similarity_percentage >= similarity_threshold
    return JSONResponse(content={
        "status": "Success",
        "similarity": f"{best_similarity_percentage:.2f}%",
        "is_real": is_real,
        "message": "Face matched successfully" if matched else "Face not matched",
    })
|
|
|
|
# Script entry point: serve the application with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )
|
|
|