songhieng commited on
Commit
820b323
·
verified ·
1 Parent(s): 23acbcb

Upload 2 files

Browse files
Files changed (2) hide show
  1. main.py +213 -0
  2. requirements.txt +13 -0
main.py ADDED
@@ -0,0 +1,213 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, File, UploadFile, HTTPException
2
+ from fastapi.responses import JSONResponse
3
+ import pickle
4
+ import numpy as np
5
+ import face_recognition
6
+ from PIL import Image
7
+ import io
8
+ from mtcnn import MTCNN
9
+ import cv2
10
+ import faiss
11
+ import os
12
+ import imgaug.augmenters as iaa
13
+ from deepface import DeepFace
14
+
15
+ app = FastAPI()
16
+
17
+ # Load encodings from the file
18
+ def load_encodings(file_path):
19
+ if not os.path.exists(file_path):
20
+ return np.array([]), []
21
+ with open(file_path, "rb") as file:
22
+ data = pickle.load(file)
23
+ return np.array(data["encodings"]), data["labels"]
24
+
25
+ # Save encodings to the file
26
+ def save_encodings(encodings, labels, file_path):
27
+ data = {"encodings": encodings, "labels": labels}
28
+ with open(file_path, "wb") as file:
29
+ pickle.dump(data, file)
30
+
31
+ # Detect and align face
32
+ def detect_and_align_face(image):
33
+ detector = MTCNN() # Initialize the MTCNN face detector
34
+ image_rgb = cv2.cvtColor(np.array(image), cv2.COLOR_BGR2RGB) # Convert the image to RGB format
35
+ detections = detector.detect_faces(image_rgb) # Detect faces in the image
36
+
37
+ if len(detections) == 0:
38
+ raise ValueError("No face detected in the image.")
39
+
40
+ detection = detections[0] # Assume the first detected face
41
+ x, y, width, height = detection['box'] # Get the bounding box of the face
42
+ keypoints = detection['keypoints'] # Get facial keypoints (eyes, nose, mouth)
43
+ face = image_rgb[y:y + height, x:x + width] # Extract the face from the image
44
+
45
+ # Calculate the angle to align the face based on eye positions
46
+ left_eye = keypoints['left_eye']
47
+ right_eye = keypoints['right_eye']
48
+ delta_x = right_eye[0] - left_eye[0]
49
+ delta_y = right_eye[1] - left_eye[1]
50
+ angle = np.arctan2(delta_y, delta_x) * (180.0 / np.pi)
51
+
52
+ # Compute the center of the face and create a rotation matrix
53
+ center = ((x + x + width) // 2, (y + y + height) // 2)
54
+ rot_matrix = cv2.getRotationMatrix2D(center, angle, scale=1.0)
55
+
56
+ # Rotate the image to align the face
57
+ aligned_image = cv2.warpAffine(image_rgb, rot_matrix, (image_rgb.shape[1], image_rgb.shape[0]))
58
+ aligned_face = aligned_image[y:y + height, x:x + width] # Extract the aligned face
59
+
60
+ return Image.fromarray(aligned_face) # Convert to PIL Image format and return
61
+
62
+ # Create FAISS index
63
+ def create_faiss_index(known_encodings):
64
+ dimension = known_encodings.shape[1] # Get the dimensionality of the encodings
65
+ index = faiss.IndexFlatL2(dimension) # Create a FAISS index using L2 distance
66
+ index.add(known_encodings) # Add known encodings to the index
67
+ return index # Return the FAISS index
68
+
69
+ # Augment image function
70
+ def augment_image(image, num_augmented=5):
71
+ """
72
+ Apply data augmentation to an image.
73
+
74
+ Parameters:
75
+ image (PIL.Image): The image to augment.
76
+ num_augmented (int): Number of augmented images to generate.
77
+
78
+ Returns:
79
+ List[PIL.Image]: List of augmented images.
80
+ """
81
+ image = np.array(image)
82
+
83
+ # Define a sequence of augmentation techniques
84
+ aug = iaa.Sequential([
85
+ iaa.Fliplr(0.5), # horizontal flips
86
+ iaa.Affine(rotate=(-25, 25)), # rotation
87
+ iaa.AdditiveGaussianNoise(scale=(0, 0.05*255)), # noise
88
+ iaa.Multiply((0.8, 1.2)), # brightness
89
+ iaa.GaussianBlur(sigma=(0.0, 1.0)) # blur
90
+ ])
91
+
92
+ # Generate augmented images
93
+ augmented_images = [Image.fromarray(aug(image=image)) for _ in range(num_augmented)]
94
+ return augmented_images
95
+
96
+ # Endpoint to process and save augmented encodings
97
+ @app.post("/create/")
98
+ async def preprocess_and_save_augmented_encodings(image: UploadFile = File(...), num_augmented: int = 5):
99
+ known_encodings = []
100
+ known_labels = []
101
+
102
+ # Load the uploaded image
103
+ image_bytes = await image.read()
104
+ original_image = Image.open(io.BytesIO(image_bytes)).convert("RGB") # Ensure the image is in RGB format
105
+
106
+ # Augment the image
107
+ augmented_images = augment_image(original_image, num_augmented=num_augmented)
108
+
109
+ # Include the original image in the list of images to encode
110
+ images_to_encode = [original_image] + augmented_images
111
+
112
+ for img in images_to_encode:
113
+ img_array = np.array(img)
114
+ # Encode the face
115
+ encodings = face_recognition.face_encodings(img_array)
116
+ if encodings:
117
+ encoding = encodings[0]
118
+ # Store the encoding and the corresponding label
119
+ known_encodings.append(encoding)
120
+ known_labels.append(image.filename) # Use the uploaded image filename as the label
121
+
122
+ # Save encodings and labels to a file
123
+ encodings_file = "face_encoding.pkl"
124
+ save_encodings(np.array(known_encodings), known_labels, encodings_file)
125
+
126
+ return JSONResponse(content={"status": "Success", "message": "Augmented encodings created and saved."})
127
+
128
+ @app.post("/encode/")
129
+ async def encode_face(image: UploadFile = File(...)):
130
+ # Load the image from the uploaded file
131
+ image_bytes = await image.read()
132
+ pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
133
+
134
+ # Align the face
135
+ try:
136
+ aligned_face = detect_and_align_face(pil_image)
137
+ except ValueError as e:
138
+ raise HTTPException(status_code=400, detail=str(e))
139
+
140
+ # Load existing encodings
141
+ encodings_file = "face_encoding.pkl"
142
+ known_encodings, known_labels = load_encodings(encodings_file)
143
+
144
+ # Encode the face
145
+ encodings = face_recognition.face_encodings(np.array(aligned_face))
146
+ if not encodings:
147
+ raise HTTPException(status_code=400, detail="No face encoding found.")
148
+
149
+ # Append the new encoding and label
150
+ known_encodings = list(known_encodings)
151
+ known_encodings.append(encodings[0])
152
+ known_labels.append(image.filename)
153
+
154
+ # Save the updated encodings
155
+ save_encodings(np.array(known_encodings), known_labels, encodings_file)
156
+
157
+ return JSONResponse(content={"status": "Success", "message": "Face encoded and saved."})
158
+
159
+ @app.post("/match/")
160
+ async def match_face(image: UploadFile = File(...), similarity_threshold: float = 70.0):
161
+ # Load the image from the uploaded file
162
+ image_bytes = await image.read()
163
+ pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
164
+
165
+ # Align the face
166
+ try:
167
+ aligned_face = detect_and_align_face(pil_image)
168
+ except ValueError as e:
169
+ raise HTTPException(status_code=400, detail=str(e))
170
+
171
+ # Load existing encodings
172
+ encodings_file = "face_encoding.pkl"
173
+ known_encodings, known_labels = load_encodings(encodings_file)
174
+
175
+ if len(known_encodings) == 0:
176
+ raise HTTPException(status_code=400, detail="No known faces in the database. Please add some faces first.")
177
+
178
+ # Encode the face
179
+ target_encodings = face_recognition.face_encodings(np.array(aligned_face))
180
+ if not target_encodings:
181
+ raise HTTPException(status_code=400, detail="No face encoding found.")
182
+
183
+ target_encoding = target_encodings[0].reshape(1, -1)
184
+
185
+ # Create FAISS index and search for the best match
186
+ index = create_faiss_index(np.array(known_encodings))
187
+ distances, indices = index.search(target_encoding, 1)
188
+
189
+ best_match_index = indices[0][0]
190
+ best_similarity_percentage = (1 - distances[0][0]) * 100
191
+
192
+ # Default to True (real) for spoof detection
193
+ is_real = True
194
+
195
+ # Perform face spoof detection using DeepFace
196
+ try:
197
+ result = DeepFace.extract_faces(img_path=image.filename, anti_spoofing=True)
198
+ if result and isinstance(result, list):
199
+ is_real = result[0].get('is_real', True)
200
+ except Exception as e:
201
+ # Log the exception if necessary, but do not interrupt the program
202
+ is_real = False # Conservative approach if spoof detection fails
203
+
204
+ return JSONResponse(content={
205
+ "status": "Success",
206
+ "similarity": f"{best_similarity_percentage:.2f}%",
207
+ "is_real": is_real,
208
+ "message": "Face matched successfully" if best_similarity_percentage >= similarity_threshold else "Face not matched"
209
+ })
210
+
211
+ if __name__ == '__main__':
212
+ import uvicorn
213
+ uvicorn.run(app, host='0.0.0.0', port=8000)
requirements.txt ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ face_recognition
2
+ opencv-python
3
+ pillow
4
+ imgaug
5
+ faiss-cpu
6
+ mtcnn
7
+ fastapi
8
+ uvicorn
9
+ tensorflow-cpu
10
+ python-multipart
11
+ numpy
12
+ imgaug
13
+ deepface