Spaces:
Sleeping
Sleeping
File size: 15,006 Bytes
327b68f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2020 Imperial College London (Pingchuan Ma)
# Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
""" Crop Mouth ROIs from videos for lipreading"""
# from msilib.schema import File
from ast import Pass
import os
import cv2 # OpenCV 라이브러리
import glob # 리눅스식 경로 표기법을 사용하여 원하는 폴더/파일 리스트 얻음
import argparse # 명령행 인자를 파싱해주는 모듈
import numpy as np
from collections import deque # collections 모듈에 있는 데크 불러오기 # 데크: 스택과 큐를 합친 자료구조
from utils import * # utils.py 모듈에 있는 모든 함수 불러오기
from transform import * # transform.py 모듈에 있는 모든 함수 불러오기
import dlib # face landmark 찾는 라이브러리
import face_alignment # face landmark 찾는 라이브러리
from PIL import Image
# 인자값을 받아서 처리하는 함수
def load_args(default_config=None):
# 인자값을 받아서 처리하는 함수
parser = argparse.ArgumentParser(description='Lipreading Pre-processing')
# 입력받을 인자값 등록
# -- utils
parser.add_argument('--video-direc', default=None, help='raw video directory')
parser.add_argument('--video-format', default='.mp4', help='raw video format')
parser.add_argument('--landmark-direc', default=None, help='landmark directory')
parser.add_argument('--filename-path', default='./vietnamese_detected_face_30.csv', help='list of detected video and its subject ID')
parser.add_argument('--save-direc', default=None, help='the directory of saving mouth ROIs')
# -- mean face utils
parser.add_argument('--mean-face', default='./20words_mean_face.npy', help='mean face pathname')
# -- mouthROIs utils
parser.add_argument('--crop-width', default=96, type=int, help='the width of mouth ROIs')
parser.add_argument('--crop-height', default=96, type=int, help='the height of mouth ROIs')
parser.add_argument('--start-idx', default=48, type=int, help='the start of landmark index')
parser.add_argument('--stop-idx', default=68, type=int, help='the end of landmark index')
parser.add_argument('--window-margin', default=12, type=int, help='window margin for smoothed_landmarks')
# -- convert to gray scale
parser.add_argument('--convert-gray', default=False, action='store_true', help='convert2grayscale')
# -- test set only
parser.add_argument('--testset-only', default=False, action='store_true', help='process testing set only')
# 입력받은 인자값을 args에 저장 (type: namespace)
args = parser.parse_args()
return args
args = load_args() # args 파싱 및 로드
# -- mean face utils
STD_SIZE = (256, 256)
mean_face_landmarks = np.load(args.mean_face) # 20words_mean_face.npy
stablePntsIDs = [33, 36, 39, 42, 45]
# 영상에서 랜드마크 받아서 입술 잘라내기
def crop_patch( video_pathname, landmarks):
"""Crop mouth patch
:param str video_pathname: pathname for the video_dieo # 영상 위치
:param list landmarks: interpolated landmarks # 보간된 랜드마크
"""
frame_idx = 0 # 프레임 인덱스 번호 0 으로 초기화
frame_gen = read_video(video_pathname) # 비디오 불러오기
# 무한 반복
while True:
try:
frame = frame_gen.__next__() ## -- BGR # 이미지 프레임 하나씩 불러오기
except StopIteration: # 더 이상 next 요소가 없으면 StopIterraion Exception 발생
break # while 빠져나가기
if frame_idx == 0: # 프레임 인덱스 번호가 0일 경우
q_frame, q_landmarks = deque(), deque() # 데크 생성
sequence = []
q_landmarks.append(landmarks[frame_idx]) # 프레임 인덱스 번호에 맞는 랜드마크 정보 추가
q_frame.append(frame) # 프레임 정보 추가
if len(q_frame) == args.window_margin:
smoothed_landmarks = np.mean(q_landmarks, axis=0) # 각 그룹의 같은 원소끼리 평균
cur_landmarks = q_landmarks.popleft() # 데크 제일 왼쪽 값 꺼내기
cur_frame = q_frame.popleft() # 데크 제일 왼쪽 값 꺼내기
# -- affine transformation # 아핀 변환
trans_frame, trans = warp_img( smoothed_landmarks[stablePntsIDs, :],
mean_face_landmarks[stablePntsIDs, :],
cur_frame,
STD_SIZE)
trans_landmarks = trans(cur_landmarks)
# -- crop mouth patch # 입술 잘라내기
sequence.append( cut_patch( trans_frame,
trans_landmarks[args.start_idx:args.stop_idx],
args.crop_height//2,
args.crop_width//2,))
if frame_idx == len(landmarks)-1:
while q_frame:
cur_frame = q_frame.popleft() # 데크 제일 왼쪽 값 꺼내기
# -- transform frame # 프레임 변환
trans_frame = apply_transform( trans, cur_frame, STD_SIZE)
# -- transform landmarks # 랜드마크 변환
trans_landmarks = trans(q_landmarks.popleft())
# -- crop mouth patch # 입술 잘라내기
sequence.append( cut_patch( trans_frame,
trans_landmarks[args.start_idx:args.stop_idx],
args.crop_height//2,
args.crop_width//2,))
return np.array(sequence) # 입술 numpy 반환
frame_idx += 1 # 프레임 인덱스 번호 증가
return None
# 랜드마크 보간
def landmarks_interpolate(landmarks):
"""Interpolate landmarks
param list landmarks: landmarks detected in raw videos # 원본 영상 데이터에서 검출한 랜드마크
"""
valid_frames_idx = [idx for idx, _ in enumerate(landmarks) if _ is not None] # 랜드마크 번호 list 생성
# 랜드마크 번호 list 가 비어있다면
if not valid_frames_idx:
return None
# 1부터 (랜드마크 번호 list 개수-1)만큼 for 문 반복
for idx in range(1, len(valid_frames_idx)):
if valid_frames_idx[idx] - valid_frames_idx[idx-1] == 1: # 현재 랜드마크 번호 - 이전 랜드마크 번호 == 1 일 경우
continue # 코드 실행 건너뛰기
else: # 아니라면
landmarks = linear_interpolate(landmarks, valid_frames_idx[idx-1], valid_frames_idx[idx]) # 랜드마크 업데이트(보간)
valid_frames_idx = [idx for idx, _ in enumerate(landmarks) if _ is not None] # 랜드마크 번호 list 생성
# -- Corner case: keep frames at the beginning or at the end failed to be detected. # 시작 또는 끝 프레임을 보관하지 못함
if valid_frames_idx:
landmarks[:valid_frames_idx[0]] = [landmarks[valid_frames_idx[0]]] * valid_frames_idx[0] # 랜드마크 첫번째 프레임 정보 저장
landmarks[valid_frames_idx[-1]:] = [landmarks[valid_frames_idx[-1]]] * (len(landmarks) - valid_frames_idx[-1]) # 랜드마크 마지막 프레임 정보 저장
valid_frames_idx = [idx for idx, _ in enumerate(landmarks) if _ is not None] # 랜드마크 번호 list 생성
# 랜드마크 번호 list 개수 == 보간한 랜드마크 개수 확인, 아니면 AssertionError 메시지를 띄움
assert len(valid_frames_idx) == len(landmarks), "not every frame has landmark" # 원하는 조건의 변수값을 보증하기 위해 사용
return landmarks # 랜드마크 반환
def get_yield(output_video):
for frame in output_video:
yield frame
lines = open(args.filename_path).read().splitlines() # 문자열을 '\n' 기준으로 쪼갠 후 list 생성
lines = list(filter(lambda x: 'test' == x.split('/')[-2], lines)) if args.testset_only else lines # args.testset_only 값이 있다면 test 폴더 속 파일명만 불러와서 list 생성, 아니라면 원래 lines 그대로 값 유지
# lines 개수만큼 반복문 실행
for filename_idx, line in enumerate(lines):
# 파일명, 사람id
filename, person_id = line.split(',')
print('idx: {} \tProcessing.\t{}'.format(filename_idx, filename)) # 파일 인덱스번호, 파일명 출력
video_pathname = os.path.join(args.video_direc, filename+args.video_format) # 영상디렉토리 + 파일명.비디오포맷/
landmarks_pathname = os.path.join(args.landmark_direc, filename+'.npz') # 저장디렉토리 + 랜드마크 파일명.npz
dst_pathname = os.path.join( args.save_direc, filename+'.npz') # 저장디렉토리 + 결과영상 파일명.npz
# 파일이 있는지 확인, 없으면 AssertionError 메시지를 띄움
assert os.path.isfile(video_pathname), "File does not exist. Path input: {}".format(video_pathname) # 원하는 조건의 변수값을 보증하기 위해 사용
# video 에 대한 face landmark npz 파일이 없고 영상 확장자 avi 인 경우 dlib 으로 직접 npz 파일 생성
if not os.path.exists(landmarks_pathname) and video_pathname.split('.')[-1] == 'mp4':
# dlib 사용해서 face landmark 찾기
def get_face_landmark(img):
detector_hog = dlib.get_frontal_face_detector()
dlib_rects = detector_hog(img, 1)
model_path = os.path.dirname(os.path.abspath(__file__)) + '/shape_predictor_68_face_landmarks.dat'
landmark_predictor = dlib.shape_predictor(model_path)
# dlib 으로 face landmark 찾기
list_landmarks = []
for dlib_rect in dlib_rects:
points = landmark_predictor(img, dlib_rect)
list_points = list(map(lambda p: (p.x, p.y), points.parts()))
list_landmarks.append(list_points)
input_width, input_height = img.shape
output_width, output_height = (256, 256)
width_rate = input_width / output_width
height_rate = input_height / output_height
img_rate = [(width_rate, height_rate)]*68
face_rate = np.array(img_rate)
eye_rate = np.array(img_rate[36:48])
# face landmark list 가 비어있지 않은 경우
if list_landmarks:
for dlib_rect, landmark in zip(dlib_rects, list_landmarks):
face_landmark = np.array(landmark) # face landmark
eye_landmark = np.array(landmark[36:48]) # eye landmark
return face_landmark, eye_landmark
# face landmark list 가 비어있는 경우
else:
landmark = [(0.0, 0.0)] * 68
face_landmark = np.array(landmark) # face landmark
eye_landmark = np.array(landmark[36:48]) # eye landmark
return face_landmark, eye_landmark
target_frames = 29 # 원하는 프레임 개수
video = videoToArray(video_pathname, is_gray=args.convert_gray) # 영상 정보 앞에 영상 프레임 개수를 추가한 numpy
output_video = frameAdjust(video, target_frames) # frame sampling (프레임 개수 맞추기)
multi_sub_landmarks = []
person_landmarks = []
frame_landmarks = []
for frame_idx, frame in enumerate(get_yield(output_video)):
print(f'\n ------------frame {frame_idx}------------ ')
facial_landmarks, eye_landmarks = get_face_landmark(frame) # dlib 사용해서 face landmark 찾기
person_landmarks = {
'id': 0,
'most_recent_fitting_scores': np.array([2.0,2.0,2.0]),
'facial_landmarks': facial_landmarks,
'roll': 7,
'yaw': 3.5,
'eye_landmarks': eye_landmarks,
'fitting_scores_updated': True,
'pitch': -0.05
}
frame_landmarks.append(person_landmarks)
multi_sub_landmarks.append(np.array(frame_landmarks.copy(), dtype=object))
multi_sub_landmarks = np.array(multi_sub_landmarks) # list to numpy
save2npz(landmarks_pathname, data=multi_sub_landmarks) # face landmark npz 저장
print('\n ------------ save npz ------------ \n')
# video 에 대한 face landmark npz 파일이 있는 경우
else:
# 파일이 있는지 확인, 없으면 AssertionError 메시지를 띄움
assert os.path.isfile(landmarks_pathname), "File does not exist. Path input: {}".format(landmarks_pathname) # 원하는 조건의 변수값을 보증하기 위해 사용
# 파일이 존재할 경우
if os.path.exists(dst_pathname):
continue # 코드 실행 건너뛰기
multi_sub_landmarks = np.load( landmarks_pathname, allow_pickle=True)['data'] # numpy 파일 열기
landmarks = [None] * len( multi_sub_landmarks) # 랜드마크 변수 초기화
for frame_idx in range(len(landmarks)):
try:
landmarks[frame_idx] = multi_sub_landmarks[frame_idx][int(person_id)]['facial_landmarks'].astype(np.float64) # 프레임 인덱스 번호에서 사람id의 얼굴 랜드마크 정보 가져오기
except IndexError: # 해당 인덱스 번호에 깂이 없으면 IndexError 발생
continue # 코드 실행 건너뛰기
# face landmark 가 [(0,0)]*68 이 아니면 랜드마크 보간 후 npz 파일 생성
landmarks_empty_list = []
landmarks_empty = [(0, 0)]*68
landmarks_empty = np.array(landmarks_empty, dtype=object)
for i in range(len(landmarks_empty)):
landmarks_empty_list.append(landmarks_empty.copy())
condition = landmarks != landmarks_empty_list
if condition:
# -- pre-process landmarks: interpolate frames not being detected.
preprocessed_landmarks = landmarks_interpolate(landmarks) # 랜드마크 보간
# 변수가 비어있지 않다면
if not preprocessed_landmarks:
continue # 코드 실행 건너뛰기
# -- crop
sequence = crop_patch(video_pathname, preprocessed_landmarks) # 영상에서 랜드마크 받아서 입술 잘라내기
# sequence가 비어있는지 확인, 비어있으면 AssertionError 메시지를 띄움
assert sequence is not None, "cannot crop from {}.".format(filename) # 원하는 조건의 변수값을 보증하기 위해 사용
# -- save
data = convert_bgr2gray(sequence) if args.convert_gray else sequence[...,::-1] # gray 변환
save2npz(dst_pathname, data=data) # 데이터를 npz 형식으로 저장
print('Done.') |