File size: 8,309 Bytes
820b323
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import pickle
import numpy as np
import face_recognition
from PIL import Image
import io
from mtcnn import MTCNN
import cv2
import faiss
import os
import imgaug.augmenters as iaa
from deepface import DeepFace

# ASGI application object; the @app.post(...) decorators further down register
# their endpoints on it, and the __main__ guard at the bottom serves it.
app = FastAPI()

# Load encodings from the file
def load_encodings(file_path):
    """Read the pickled face database stored at ``file_path``.

    Parameters:
        file_path: Location of the pickle file to read.

    Returns:
        A ``(encodings, labels)`` pair: the stored ``"encodings"`` entry
        converted to a NumPy array, and the stored ``"labels"`` entry as-is.
        If the file does not exist, an empty array and an empty list.
    """
    if os.path.exists(file_path):
        # NOTE: unpickling can execute arbitrary code; only point this at
        # files the application wrote itself.
        with open(file_path, "rb") as handle:
            stored = pickle.load(handle)
        return np.array(stored["encodings"]), stored["labels"]

    # Nothing saved yet: hand back an empty database.
    return np.array([]), []

# Save encodings to the file
def save_encodings(encodings, labels, file_path):
    """Pickle the face database to ``file_path``, replacing any existing file.

    Parameters:
        encodings: Face encodings to store under the ``"encodings"`` key.
        labels: Labels to store under the ``"labels"`` key.
        file_path: Destination file; it is opened in binary write mode.
    """
    with open(file_path, "wb") as handle:
        pickle.dump({"encodings": encodings, "labels": labels}, handle)

# Detect and align face
_mtcnn_detector = None  # shared MTCNN instance, created on first use


def _get_face_detector():
    """Return the shared MTCNN detector, constructing it on the first call."""
    global _mtcnn_detector
    if _mtcnn_detector is None:
        _mtcnn_detector = MTCNN()
    return _mtcnn_detector


def detect_and_align_face(image):
    """Detect the first face in ``image``, level its eyes and return the crop.

    Parameters:
        image: A PIL image (any mode; it is converted to RGB), or an
            array-like in OpenCV's BGR channel order.

    Returns:
        PIL.Image: The region of the first detected face's bounding box, cut
        from a copy of the image rotated so that the line between the two
        eye keypoints is horizontal.

    Raises:
        ValueError: If no face is detected, or the detected box lies entirely
            outside the image.
    """
    if isinstance(image, Image.Image):
        # PIL already yields RGB channel order; running a BGR->RGB swap on it
        # would hand the detector (and the caller) a channel-swapped image.
        image_rgb = np.array(image.convert("RGB"))
    else:
        # Raw arrays are treated as BGR (OpenCV convention), as before.
        image_rgb = cv2.cvtColor(np.array(image), cv2.COLOR_BGR2RGB)

    detections = _get_face_detector().detect_faces(image_rgb)
    if not detections:
        raise ValueError("No face detected in the image.")

    detection = detections[0]  # Only the first detected face is used
    x, y, width, height = detection['box']
    keypoints = detection['keypoints']

    # Clamp the box to the image: a negative start index would wrap around in
    # NumPy slicing and produce a wrong or empty crop.
    img_height, img_width = image_rgb.shape[:2]
    left, top = max(int(x), 0), max(int(y), 0)
    right = min(int(x) + int(width), img_width)
    bottom = min(int(y) + int(height), img_height)
    if right <= left or bottom <= top:
        raise ValueError("No face detected in the image.")

    # Tilt of the line joining the eyes, in degrees
    left_eye = keypoints['left_eye']
    right_eye = keypoints['right_eye']
    delta_x = right_eye[0] - left_eye[0]
    delta_y = right_eye[1] - left_eye[1]
    angle = np.arctan2(delta_y, delta_x) * (180.0 / np.pi)

    # Rotate the whole frame about the centre of the detected box; plain
    # Python ints keep the centre tuple acceptable to OpenCV.
    center = (int(x + x + width) // 2, int(y + y + height) // 2)
    rot_matrix = cv2.getRotationMatrix2D(center, angle, scale=1.0)
    aligned_image = cv2.warpAffine(image_rgb, rot_matrix, (img_width, img_height))

    # Cut the (clamped) box out of the rotated frame
    aligned_face = aligned_image[top:bottom, left:right]
    return Image.fromarray(aligned_face)

# Create FAISS index
def create_faiss_index(known_encodings):
    """Build a flat L2 FAISS index holding ``known_encodings``.

    Parameters:
        known_encodings: 2-D array-like with one encoding per row.

    Returns:
        faiss.IndexFlatL2: An index whose dimensionality equals the number of
        columns of ``known_encodings`` and which contains every row.
    """
    # FAISS works on C-contiguous float32 matrices, whereas NumPy arrays built
    # from Python floats / face encodings default to float64; convert
    # explicitly instead of depending on the binding to do it.
    vectors = np.ascontiguousarray(known_encodings, dtype=np.float32)
    dimension = vectors.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(vectors)
    return index

# Augment image function
def augment_image(image, num_augmented=5):
    """
    Produce augmented variants of an image.

    Parameters:
    image (PIL.Image): The image to augment.
    num_augmented (int): Number of augmented images to generate.

    Returns:
    List[PIL.Image]: List of augmented images.
    """
    pixels = np.array(image)

    # Individual augmentation steps, combined into one pipeline below
    flip = iaa.Fliplr(0.5)  # horizontal flips
    rotate = iaa.Affine(rotate=(-25, 25))  # rotation
    noise = iaa.AdditiveGaussianNoise(scale=(0, 0.05*255))  # noise
    brightness = iaa.Multiply((0.8, 1.2))  # brightness
    blur = iaa.GaussianBlur(sigma=(0.0, 1.0))  # blur
    pipeline = iaa.Sequential([flip, rotate, noise, brightness, blur])

    # Run the pipeline once per requested variant
    variants = []
    for _ in range(num_augmented):
        variants.append(Image.fromarray(pipeline(image=pixels)))
    return variants

# Endpoint to process and save augmented encodings
@app.post("/create/")
async def preprocess_and_save_augmented_encodings(image: UploadFile = File(...), num_augmented: int = 5):
    """Encode an uploaded face plus augmented copies and write them to disk.

    The upload and ``num_augmented`` augmented variants of it are each passed
    to ``face_recognition.face_encodings``; the first encoding found in each
    image is kept and labelled with the upload's filename. The result
    replaces the contents of ``face_encoding.pkl``.

    Parameters:
        image: The uploaded image file.
        num_augmented: How many augmented variants to generate.

    Returns:
        JSONResponse: A success status and message.

    Raises:
        HTTPException: 400 if the upload cannot be decoded as an image, or if
            no face encoding could be produced from any of the images.
    """
    image_bytes = await image.read()
    try:
        # Decode the upload and normalise it to RGB
        original_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except (OSError, ValueError) as exc:
        # Empty, truncated or non-image uploads are a client error, not a 500.
        raise HTTPException(status_code=400, detail="Uploaded file is not a valid image.") from exc

    # The original image is encoded alongside its augmented variants
    images_to_encode = [original_image] + augment_image(original_image, num_augmented=num_augmented)

    known_encodings = []
    for img in images_to_encode:
        encodings = face_recognition.face_encodings(np.array(img))
        if encodings:
            known_encodings.append(encodings[0])

    if not known_encodings:
        # Do not overwrite the stored database with an empty one and then
        # report success.
        raise HTTPException(status_code=400, detail="No face encoding found.")

    # Every kept encoding carries the uploaded filename as its label
    known_labels = [image.filename] * len(known_encodings)

    encodings_file = "face_encoding.pkl"
    save_encodings(np.array(known_encodings), known_labels, encodings_file)

    return JSONResponse(content={"status": "Success", "message": "Augmented encodings created and saved."})

@app.post("/encode/")
async def encode_face(image: UploadFile = File(...)):
    """Append the encoding of an uploaded face to the stored database.

    The upload is aligned with ``detect_and_align_face`` and encoded with
    ``face_recognition.face_encodings``. The first encoding is appended to
    the contents of ``face_encoding.pkl``, labelled with the upload's
    filename, and the file is rewritten.

    Parameters:
        image: The uploaded image file.

    Returns:
        JSONResponse: A success status and message.

    Raises:
        HTTPException: 400 if the upload cannot be decoded as an image, if no
            face is detected, or if no encoding can be computed.
    """
    image_bytes = await image.read()
    try:
        pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except (OSError, ValueError) as exc:
        # Empty, truncated or non-image uploads are a client error, not a 500.
        raise HTTPException(status_code=400, detail="Uploaded file is not a valid image.") from exc

    # Align the face
    try:
        aligned_face = detect_and_align_face(pil_image)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Encode the face before touching the database, so a rejected request
    # never reads it.
    encodings = face_recognition.face_encodings(np.array(aligned_face))
    if not encodings:
        raise HTTPException(status_code=400, detail="No face encoding found.")

    # Load the existing entries and append the new one; copying into lists
    # keeps append() valid whatever container the file held.
    encodings_file = "face_encoding.pkl"
    known_encodings, known_labels = load_encodings(encodings_file)
    known_encodings = list(known_encodings)
    known_labels = list(known_labels)
    known_encodings.append(encodings[0])
    known_labels.append(image.filename)

    # Save the updated encodings
    save_encodings(np.array(known_encodings), known_labels, encodings_file)

    return JSONResponse(content={"status": "Success", "message": "Face encoded and saved."})

@app.post("/match/")
async def match_face(image: UploadFile = File(...), similarity_threshold: float = 70.0):
    """Compare an uploaded face with the stored encodings and run a spoof check.

    The upload is aligned and encoded, and its nearest neighbour among the
    encodings in ``face_encoding.pkl`` is looked up with a flat L2 FAISS
    index. Similarity is reported as ``(1 - distance) * 100``, where
    ``distance`` is the value FAISS returns for the nearest neighbour (the
    squared L2 distance for ``IndexFlatL2``), so it can be negative for
    distant faces.

    Parameters:
        image: The uploaded image file.
        similarity_threshold: Minimum similarity percentage for the response
            message to report a match.

    Returns:
        JSONResponse: ``status``, ``similarity`` (formatted percentage),
        ``is_real`` (anti-spoofing verdict; False if the check itself fails)
        and ``message``.

    Raises:
        HTTPException: 400 if the upload cannot be decoded as an image, if no
            face is detected or encoded, or if the database is empty.
    """
    import logging  # local import so this block brings its own dependency

    image_bytes = await image.read()
    try:
        pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except (OSError, ValueError) as exc:
        # Empty, truncated or non-image uploads are a client error, not a 500.
        raise HTTPException(status_code=400, detail="Uploaded file is not a valid image.") from exc

    # Align the face
    try:
        aligned_face = detect_and_align_face(pil_image)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Load existing encodings (labels are not used by this endpoint)
    encodings_file = "face_encoding.pkl"
    known_encodings, _known_labels = load_encodings(encodings_file)

    if len(known_encodings) == 0:
        raise HTTPException(status_code=400, detail="No known faces in the database. Please add some faces first.")

    # Encode the face
    target_encodings = face_recognition.face_encodings(np.array(aligned_face))
    if not target_encodings:
        raise HTTPException(status_code=400, detail="No face encoding found.")

    # FAISS operates on float32 matrices; the encodings arrive as float64.
    known_matrix = np.ascontiguousarray(known_encodings, dtype=np.float32)
    target_encoding = np.ascontiguousarray(target_encodings[0], dtype=np.float32).reshape(1, -1)

    # Nearest stored encoding to the uploaded face
    index = create_faiss_index(known_matrix)
    distances, _ = index.search(target_encoding, 1)
    best_similarity_percentage = float((1 - distances[0][0]) * 100)

    # Default to True (real) for spoof detection
    is_real = True

    try:
        # The upload's filename is a client-side name, not a file on this
        # server, so pass the decoded pixels instead. DeepFace expects arrays
        # in BGR channel order.
        bgr_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
        result = DeepFace.extract_faces(img_path=bgr_image, anti_spoofing=True)
        if result and isinstance(result, list):
            # bool() keeps the value JSON-serialisable if a NumPy bool comes back
            is_real = bool(result[0].get('is_real', True))
    except Exception:
        # Best effort: record why the check failed, then answer conservatively.
        logging.getLogger(__name__).exception("Face anti-spoofing check failed")
        is_real = False

    return JSONResponse(content={
        "status": "Success",
        "similarity": f"{best_similarity_percentage:.2f}%",
        "is_real": is_real,
        "message": "Face matched successfully" if best_similarity_percentage >= similarity_threshold else "Face not matched"
    })

if __name__ == "__main__":
    # uvicorn is only needed when this file is executed directly
    import uvicorn

    # Serve the app on every network interface, port 8000
    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)