LipCoordNet / lip_coordinate_extraction /lips_coords_extractor.py
ffeew's picture
init
ca51874
import cv2
import dlib
import json
import glob
import os
from multiprocessing import Pool
LIP_COORDINATES_DIRECTORY = "lip_coordinates"
ERROR_DIRECTORY = "error_videos"
# path to the original GRID dataset whose videos are converted to frames
GRID_IMAGES_DIRECTORY = "lip/GRID_imgs"
train_unseen_list = "data/unseen_val.txt"
train_overlap_list = "data/overlap_train.txt"
test_unseen_list = "data/unseen_val.txt"
test_overlap_list = "data/overlap_val.txt"
def load_data_list(data_path, dictionary):
with open(data_path, "r") as f:
for line in f.readlines():
line = line.strip()
speaker = line.split("/")[-4]
vid = line.split("/")[-1]
dictionary[f"{speaker}/{vid}"] = 1
return dictionary
def extract_lip_coordinates(detector, predictor, img_path):
# used to preprocess the original image frames in the GRID dataset to extract the lip coordinates
image = cv2.imread(img_path)
image = cv2.resize(image, (600, 500))
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
rects = detector(gray)
assert len(rects) == 1
for rect in rects:
# extract the coordinates of the bounding box
x1 = rect.left()
y1 = rect.top()
x2 = rect.right()
y2 = rect.bottom()
# apply the shape predictor to the face ROI
shape = predictor(gray, rect)
x = []
y = []
for n in range(48, 68):
x.append(shape.part(n).x)
y.append(shape.part(n).y)
return [x, y]
def log_error_video(video_path):
print("Error: ", video_path)
with open(ERROR_DIRECTORY + "/error_videos.txt", "a") as f:
f.write(video_path + "\n")
data_dict = {}
data_dict = load_data_list(train_unseen_list, data_dict)
data_dict = load_data_list(train_overlap_list, data_dict)
data_dict = load_data_list(test_unseen_list, data_dict)
data_dict = load_data_list(test_overlap_list, data_dict)
speakers = glob.glob(GRID_IMAGES_DIRECTORY + "/*")
print(speakers[0])
def generate_lip_coordinates(speakers):
file_path_sep = "\\"
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(
"lip_coordinate_extraction/shape_predictor_68_face_landmarks_GTX.dat"
)
for speaker in speakers:
print(speaker)
videos = glob.glob(speaker + "/*")
for video in videos:
print(video)
frames = glob.glob(video + "/*.jpg")
if len(frames) < 50: # filter out bad videos
continue
vid = {}
try:
frames = sorted(
frames,
key=lambda x: int(x.split(file_path_sep)[-1].split(".")[0]),
)
for frame in frames:
retry = 3
while retry > 0:
try:
coords = extract_lip_coordinates(detector, predictor, frame)
break
except Exception as e:
retry -= 1
print("Error: ", video)
print(e)
print("retrying...")
vid[frame.split(file_path_sep)[-1].split(".")[0]] = coords
vid_path = video.split(file_path_sep)
save_path = (
LIP_COORDINATES_DIRECTORY
+ "/"
+ vid_path[-2]
+ "/"
+ vid_path[-1]
+ ".json"
)
if not os.path.exists(LIP_COORDINATES_DIRECTORY + "/" + vid_path[-2]):
os.makedirs(LIP_COORDINATES_DIRECTORY + "/" + vid_path[-2])
with open(
save_path,
"w",
) as f:
json.dump(vid, f)
except Exception as e:
print(e)
log_error_video(video)
def generate_lip_coordinates(speakers):
file_path_sep = "\\"
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(
"lip_coordinate_extraction/shape_predictor_68_face_landmarks_GTX.dat"
)
for speaker in speakers:
print(speaker)
videos = glob.glob(speaker + "/*")
for video in videos:
# if (
# video.split(file_path_sep)[-2] + "/" + video.split(file_path_sep)[-1]
# not in data_dict
# ):
# continue
print(video)
frames = glob.glob(video + "/*.jpg")
if len(frames) < 50: # filter out bad videos
continue
vid = {}
try:
frames = sorted(
frames,
key=lambda x: int(x.split(file_path_sep)[-1].split(".")[0]),
)
for frame in frames:
retry = 3
while retry > 0:
try:
coords = extract_lip_coordinates(detector, predictor, frame)
break
except Exception as e:
retry -= 1
print("Error: ", video)
print(e)
print("retrying...")
vid[frame.split(file_path_sep)[-1].split(".")[0]] = coords
vid_path = video.split(file_path_sep)
save_path = (
LIP_COORDINATES_DIRECTORY
+ "/"
+ vid_path[-2]
+ "/"
+ vid_path[-1]
+ ".json"
)
if not os.path.exists(LIP_COORDINATES_DIRECTORY + "/" + vid_path[-2]):
os.makedirs(LIP_COORDINATES_DIRECTORY + "/" + vid_path[-2])
with open(
save_path,
"w",
) as f:
json.dump(vid, f)
except Exception as e:
print(e)
log_error_video(video)
num_processes = 8
speaker_groups = []
speaker_interval = len(speakers) // num_processes
for i in range(num_processes):
if i == 4:
speaker_groups.append(speakers[i * speaker_interval :])
else:
speaker_groups.append(
speakers[i * speaker_interval : (i + 1) * speaker_interval]
)
if __name__ == "__main__":
with Pool(num_processes) as p:
p.map(generate_lip_coordinates, speaker_groups)