# FV-V2/main.py
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import pickle
import numpy as np
import face_recognition
from PIL import Image
import io
from mtcnn import MTCNN
import cv2
import faiss
import os
import imgaug.augmenters as iaa
from deepface import DeepFace
app = FastAPI()
# Load encodings from the file; return empty structures if it does not exist
def load_encodings(file_path):
    if not os.path.exists(file_path):
        return np.array([]), []
    with open(file_path, "rb") as file:
        data = pickle.load(file)
    return np.array(data["encodings"]), data["labels"]
# Save encodings to the file
def save_encodings(encodings, labels, file_path):
    data = {"encodings": encodings, "labels": labels}
    with open(file_path, "wb") as file:
        pickle.dump(data, file)
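# A minimal round-trip sketch (hypothetical file name and dummy data) of how
# the two helpers above fit together; face_recognition produces
# 128-dimensional encodings, hence the (2, 128) shape:
#
#   save_encodings(np.zeros((2, 128)), ["alice.jpg", "bob.jpg"], "demo.pkl")
#   encodings, labels = load_encodings("demo.pkl")
#   assert encodings.shape == (2, 128) and labels == ["alice.jpg", "bob.jpg"]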
# Detect and align face
def detect_and_align_face(image):
    detector = MTCNN()  # Initialize the MTCNN face detector
    image_rgb = np.array(image)  # PIL images are already RGB; no BGR conversion needed
    detections = detector.detect_faces(image_rgb)  # Detect faces in the image
    if len(detections) == 0:
        raise ValueError("No face detected in the image.")
    detection = detections[0]  # Assume the first detected face
    x, y, width, height = detection['box']  # Bounding box of the face
    x, y = max(0, x), max(0, y)  # MTCNN can return slightly negative coordinates
    keypoints = detection['keypoints']  # Facial keypoints (eyes, nose, mouth)
    # Calculate the angle to align the face based on eye positions
    left_eye = keypoints['left_eye']
    right_eye = keypoints['right_eye']
    delta_x = right_eye[0] - left_eye[0]
    delta_y = right_eye[1] - left_eye[1]
    angle = np.arctan2(delta_y, delta_x) * (180.0 / np.pi)
    # Compute the center of the face and create a rotation matrix
    center = (x + width // 2, y + height // 2)
    rot_matrix = cv2.getRotationMatrix2D(center, angle, scale=1.0)
    # Rotate the image to align the face, then re-crop the face region
    aligned_image = cv2.warpAffine(image_rgb, rot_matrix, (image_rgb.shape[1], image_rgb.shape[0]))
    aligned_face = aligned_image[y:y + height, x:x + width]
    return Image.fromarray(aligned_face)  # Convert back to PIL Image and return
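# Worked example of the alignment angle above (hypothetical keypoints): with
# left_eye = (100, 120) and right_eye = (160, 140), delta_x = 60 and
# delta_y = 20, so angle = arctan2(20, 60) * 180 / pi ≈ 18.43 degrees;
# warpAffine then rotates the whole image by that angle around the face
# centre so the eye line ends up horizontal before the face is re-cropped.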
# Create FAISS index
def create_faiss_index(known_encodings):
    dimension = known_encodings.shape[1]  # Dimensionality of the encodings (128 for face_recognition)
    index = faiss.IndexFlatL2(dimension)  # Exact-search FAISS index using L2 distance
    index.add(known_encodings.astype(np.float32))  # FAISS requires float32 vectors
    return index  # Return the FAISS index
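# Quick usage sketch with random data (shapes are what matter; values are
# illustrative):
#
#   db = np.random.rand(10, 128).astype(np.float32)
#   index = create_faiss_index(db)
#   dists, ids = index.search(db[:1], 1)  # each row's nearest neighbour is itself
#   # ids[0][0] == 0 and dists[0][0] == 0.0; note IndexFlatL2 returns *squared* L2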
# Augment image function
def augment_image(image, num_augmented=5):
    """
    Apply data augmentation to an image.

    Parameters:
        image (PIL.Image): The image to augment.
        num_augmented (int): Number of augmented images to generate.

    Returns:
        List[PIL.Image]: List of augmented images.
    """
    image = np.array(image)
    # Define a sequence of augmentation techniques
    aug = iaa.Sequential([
        iaa.Fliplr(0.5),                                   # horizontal flips
        iaa.Affine(rotate=(-25, 25)),                      # rotation
        iaa.AdditiveGaussianNoise(scale=(0, 0.05 * 255)),  # noise
        iaa.Multiply((0.8, 1.2)),                          # brightness
        iaa.GaussianBlur(sigma=(0.0, 1.0))                 # blur
    ])
    # Generate augmented images (the augmenter re-samples its parameters on each call)
    augmented_images = [Image.fromarray(aug(image=image)) for _ in range(num_augmented)]
    return augmented_images
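# Usage sketch (hypothetical file names): generate and inspect a few variants
# of one photo before encoding them:
#
#   img = Image.open("face.jpg").convert("RGB")
#   for i, variant in enumerate(augment_image(img, num_augmented=3)):
#       variant.save(f"face_aug_{i}.png")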
# Endpoint to process and save augmented encodings
@app.post("/create/")
async def preprocess_and_save_augmented_encodings(image: UploadFile = File(...), num_augmented: int = 5):
    known_encodings = []
    known_labels = []
    # Load the uploaded image
    image_bytes = await image.read()
    original_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")  # Ensure the image is in RGB format
    # Augment the image
    augmented_images = augment_image(original_image, num_augmented=num_augmented)
    # Include the original image in the list of images to encode
    images_to_encode = [original_image] + augmented_images
    for img in images_to_encode:
        img_array = np.array(img)
        # Encode the face
        encodings = face_recognition.face_encodings(img_array)
        if encodings:
            # Store the encoding and the corresponding label
            known_encodings.append(encodings[0])
            known_labels.append(image.filename)  # Use the uploaded image filename as the label
    if not known_encodings:
        raise HTTPException(status_code=400, detail="No face found in the uploaded image or its augmentations.")
    # Save encodings and labels to a file (note: this overwrites any existing file)
    encodings_file = "face_encoding.pkl"
    save_encodings(np.array(known_encodings), known_labels, encodings_file)
    return JSONResponse(content={"status": "Success", "message": "Augmented encodings created and saved."})
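# Example request (hypothetical host and file), assuming the server from the
# __main__ block below. num_augmented is a scalar parameter alongside an
# UploadFile, so FastAPI reads it from the query string rather than the form:
#
#   curl -X POST "http://localhost:8000/create/?num_augmented=5" \
#        -F "image=@alice.jpg"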
@app.post("/encode/")
async def encode_face(image: UploadFile = File(...)):
    # Load the image from the uploaded file
    image_bytes = await image.read()
    pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    # Align the face
    try:
        aligned_face = detect_and_align_face(pil_image)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    # Load existing encodings
    encodings_file = "face_encoding.pkl"
    known_encodings, known_labels = load_encodings(encodings_file)
    # Encode the face
    encodings = face_recognition.face_encodings(np.array(aligned_face))
    if not encodings:
        raise HTTPException(status_code=400, detail="No face encoding found.")
    # Append the new encoding and label
    known_encodings = list(known_encodings)
    known_encodings.append(encodings[0])
    known_labels.append(image.filename)
    # Save the updated encodings
    save_encodings(np.array(known_encodings), known_labels, encodings_file)
    return JSONResponse(content={"status": "Success", "message": "Face encoded and saved."})
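# Example request (hypothetical host and file); unlike /create/, this appends
# a single encoding to the existing database instead of rebuilding it:
#
#   curl -X POST "http://localhost:8000/encode/" -F "image=@alice.jpg"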
@app.post("/match/")
async def match_face(image: UploadFile = File(...), similarity_threshold: float = 70.0):
    # Load the image from the uploaded file
    image_bytes = await image.read()
    pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    # Align the face
    try:
        aligned_face = detect_and_align_face(pil_image)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    # Load existing encodings
    encodings_file = "face_encoding.pkl"
    known_encodings, known_labels = load_encodings(encodings_file)
    if len(known_encodings) == 0:
        raise HTTPException(status_code=400, detail="No known faces in the database. Please add some faces first.")
    # Encode the face
    target_encodings = face_recognition.face_encodings(np.array(aligned_face))
    if not target_encodings:
        raise HTTPException(status_code=400, detail="No face encoding found.")
    target_encoding = target_encodings[0].astype(np.float32).reshape(1, -1)  # FAISS expects float32
    # Create FAISS index and search for the single best match
    index = create_faiss_index(np.array(known_encodings))
    distances, indices = index.search(target_encoding, 1)
    best_match_index = int(indices[0][0])
    # IndexFlatL2 returns *squared* L2 distances; map to a rough similarity percentage
    best_similarity_percentage = (1 - distances[0][0]) * 100
    # Perform face spoof detection using DeepFace; default to True (real)
    is_real = True
    try:
        # The upload is never written to disk, so pass the decoded image as a
        # BGR numpy array rather than image.filename (which is not a valid path)
        img_bgr = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
        result = DeepFace.extract_faces(img_path=img_bgr, anti_spoofing=True)
        if result and isinstance(result, list):
            is_real = result[0].get('is_real', True)
    except Exception:
        # Conservative approach if spoof detection fails
        is_real = False
    return JSONResponse(content={
        "status": "Success",
        "similarity": f"{best_similarity_percentage:.2f}%",
        "matched_label": known_labels[best_match_index],
        "is_real": is_real,
        "message": "Face matched successfully" if best_similarity_percentage >= similarity_threshold else "Face not matched"
    })
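# Example request and response shape (hypothetical host, file, and values;
# similarity_threshold is read from the query string):
#
#   curl -X POST "http://localhost:8000/match/?similarity_threshold=70" \
#        -F "image=@probe.jpg"
#
#   {"status": "Success", "similarity": "83.12%", "matched_label": "alice.jpg",
#    "is_real": true, "message": "Face matched successfully"}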
if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='0.0.0.0', port=8000)