|
import cv2 |
|
import dlib |
|
import json |
|
import glob |
|
import os |
|
from multiprocessing import Pool |
|
|
|
LIP_COORDINATES_DIRECTORY = "lip_coordinates" |
|
ERROR_DIRECTORY = "error_videos" |
|
|
|
|
|
GRID_IMAGES_DIRECTORY = "lip/GRID_imgs" |
|
train_unseen_list = "data/unseen_val.txt" |
|
train_overlap_list = "data/overlap_train.txt" |
|
test_unseen_list = "data/unseen_val.txt" |
|
test_overlap_list = "data/overlap_val.txt" |
|
|
|
|
|
def load_data_list(data_path, dictionary): |
|
with open(data_path, "r") as f: |
|
for line in f.readlines(): |
|
line = line.strip() |
|
speaker = line.split("/")[-4] |
|
vid = line.split("/")[-1] |
|
dictionary[f"{speaker}/{vid}"] = 1 |
|
return dictionary |
|
|
|
|
|
def extract_lip_coordinates(detector, predictor, img_path): |
|
|
|
image = cv2.imread(img_path) |
|
image = cv2.resize(image, (600, 500)) |
|
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) |
|
|
|
rects = detector(gray) |
|
assert len(rects) == 1 |
|
for rect in rects: |
|
|
|
x1 = rect.left() |
|
y1 = rect.top() |
|
x2 = rect.right() |
|
y2 = rect.bottom() |
|
|
|
|
|
shape = predictor(gray, rect) |
|
x = [] |
|
y = [] |
|
for n in range(48, 68): |
|
x.append(shape.part(n).x) |
|
y.append(shape.part(n).y) |
|
return [x, y] |
|
|
|
|
|
def log_error_video(video_path): |
|
print("Error: ", video_path) |
|
with open(ERROR_DIRECTORY + "/error_videos.txt", "a") as f: |
|
f.write(video_path + "\n") |
|
|
|
|
|
data_dict = {} |
|
data_dict = load_data_list(train_unseen_list, data_dict) |
|
data_dict = load_data_list(train_overlap_list, data_dict) |
|
data_dict = load_data_list(test_unseen_list, data_dict) |
|
data_dict = load_data_list(test_overlap_list, data_dict) |
|
|
|
|
|
speakers = glob.glob(GRID_IMAGES_DIRECTORY + "/*") |
|
print(speakers[0]) |
|
|
|
|
|
def generate_lip_coordinates(speakers): |
|
file_path_sep = "\\" |
|
detector = dlib.get_frontal_face_detector() |
|
predictor = dlib.shape_predictor( |
|
"lip_coordinate_extraction/shape_predictor_68_face_landmarks_GTX.dat" |
|
) |
|
for speaker in speakers: |
|
print(speaker) |
|
videos = glob.glob(speaker + "/*") |
|
for video in videos: |
|
print(video) |
|
frames = glob.glob(video + "/*.jpg") |
|
if len(frames) < 50: |
|
continue |
|
vid = {} |
|
try: |
|
frames = sorted( |
|
frames, |
|
key=lambda x: int(x.split(file_path_sep)[-1].split(".")[0]), |
|
) |
|
for frame in frames: |
|
retry = 3 |
|
while retry > 0: |
|
try: |
|
coords = extract_lip_coordinates(detector, predictor, frame) |
|
break |
|
except Exception as e: |
|
retry -= 1 |
|
print("Error: ", video) |
|
print(e) |
|
print("retrying...") |
|
|
|
vid[frame.split(file_path_sep)[-1].split(".")[0]] = coords |
|
vid_path = video.split(file_path_sep) |
|
save_path = ( |
|
LIP_COORDINATES_DIRECTORY |
|
+ "/" |
|
+ vid_path[-2] |
|
+ "/" |
|
+ vid_path[-1] |
|
+ ".json" |
|
) |
|
|
|
if not os.path.exists(LIP_COORDINATES_DIRECTORY + "/" + vid_path[-2]): |
|
os.makedirs(LIP_COORDINATES_DIRECTORY + "/" + vid_path[-2]) |
|
|
|
with open( |
|
save_path, |
|
"w", |
|
) as f: |
|
json.dump(vid, f) |
|
except Exception as e: |
|
print(e) |
|
log_error_video(video) |
|
|
|
|
|
def generate_lip_coordinates(speakers): |
|
file_path_sep = "\\" |
|
detector = dlib.get_frontal_face_detector() |
|
predictor = dlib.shape_predictor( |
|
"lip_coordinate_extraction/shape_predictor_68_face_landmarks_GTX.dat" |
|
) |
|
for speaker in speakers: |
|
print(speaker) |
|
videos = glob.glob(speaker + "/*") |
|
for video in videos: |
|
|
|
|
|
|
|
|
|
|
|
print(video) |
|
frames = glob.glob(video + "/*.jpg") |
|
if len(frames) < 50: |
|
continue |
|
vid = {} |
|
try: |
|
frames = sorted( |
|
frames, |
|
key=lambda x: int(x.split(file_path_sep)[-1].split(".")[0]), |
|
) |
|
for frame in frames: |
|
retry = 3 |
|
while retry > 0: |
|
try: |
|
coords = extract_lip_coordinates(detector, predictor, frame) |
|
break |
|
except Exception as e: |
|
retry -= 1 |
|
print("Error: ", video) |
|
print(e) |
|
print("retrying...") |
|
|
|
vid[frame.split(file_path_sep)[-1].split(".")[0]] = coords |
|
vid_path = video.split(file_path_sep) |
|
save_path = ( |
|
LIP_COORDINATES_DIRECTORY |
|
+ "/" |
|
+ vid_path[-2] |
|
+ "/" |
|
+ vid_path[-1] |
|
+ ".json" |
|
) |
|
|
|
if not os.path.exists(LIP_COORDINATES_DIRECTORY + "/" + vid_path[-2]): |
|
os.makedirs(LIP_COORDINATES_DIRECTORY + "/" + vid_path[-2]) |
|
|
|
with open( |
|
save_path, |
|
"w", |
|
) as f: |
|
json.dump(vid, f) |
|
except Exception as e: |
|
print(e) |
|
log_error_video(video) |
|
|
|
|
|
num_processes = 8 |
|
|
|
speaker_groups = [] |
|
speaker_interval = len(speakers) // num_processes |
|
for i in range(num_processes): |
|
if i == 4: |
|
speaker_groups.append(speakers[i * speaker_interval :]) |
|
else: |
|
speaker_groups.append( |
|
speakers[i * speaker_interval : (i + 1) * speaker_interval] |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
with Pool(num_processes) as p: |
|
p.map(generate_lip_coordinates, speaker_groups) |
|
|