import os import sys os.system('git clone https://github.com/facebookresearch/av_hubert.git') os.chdir('/home/user/app/av_hubert') os.system('git submodule init') os.system('git submodule update') os.chdir('/home/user/app/av_hubert/fairseq') os.system('pip install ./') os.system('pip install scipy') os.system('pip install sentencepiece') os.system('pip install python_speech_features') os.system('pip install scikit-video') os.system('pip install transformers') os.system('pip install gradio==3.12') os.system('pip install numpy==1.23.3') os.chdir('/home/user/app') os.makedirs("./result", exist_ok = True) os.makedirs("./video/và/test", exist_ok = True) # sys.path.append('/home/user/app/av_hubert') sys.path.append('/home/user/app/av_hubert/avhubert') print(sys.path) print(os.listdir()) print(sys.argv, type(sys.argv)) sys.argv.append('dummy') import dlib, cv2, os import numpy as np import skvideo import skvideo.io from tqdm import tqdm from preparation.align_mouth import landmarks_interpolate, crop_patch, write_video_ffmpeg from base64 import b64encode import torch import cv2 import tempfile from argparse import Namespace import fairseq from fairseq import checkpoint_utils, options, tasks, utils from fairseq.dataclass.configs import GenerationConfig from huggingface_hub import hf_hub_download import gradio as gr from pytube import YouTube # os.chdir('/home/user/app/av_hubert/avhubert') user_dir = "/home/user/app/av_hubert/avhubert" utils.import_user_module(Namespace(user_dir=user_dir)) data_dir = "/home/user/app/video" # ckpt_path = hf_hub_download('vumichien/AV-HuBERT', 'model.pt') face_detector_path = "/home/user/app/mmod_human_face_detector.dat" face_predictor_path = "/home/user/app/shape_predictor_68_face_landmarks.dat" mean_face_path = "/home/user/app/20words_mean_face.npy" mouth_roi_path = "/home/user/app/roi.mp4" output_video_path = "/home/user/app/video/và/test" modalities = ["video"] gen_subset = "test" gen_cfg = GenerationConfig(beam=20) models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task([ckpt_path]) models = [model.eval().cuda() if torch.cuda.is_available() else model.eval() for model in models] saved_cfg.task.modalities = modalities saved_cfg.task.data = data_dir saved_cfg.task.label_dir = data_dir task = tasks.setup_task(saved_cfg.task) generator = task.build_generator(models, gen_cfg) def get_youtube(video_url): yt = YouTube(video_url) abs_video_path = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first().download() print("Success download video") print(abs_video_path) return abs_video_path import dlib, cv2, os import numpy as np import skvideo import skvideo.io from tqdm import tqdm from preparation.align_mouth import landmarks_interpolate, crop_patch, write_video_ffmpeg from IPython.display import HTML from base64 import b64encode import numpy as np def convert_bgr2gray(data): # np.stack(배열_1, 배열_2, axis=0): 지정한 axis를 완전히 새로운 axis로 생각 return np.stack([cv2.cvtColor(_, cv2.COLOR_BGR2GRAY) for _ in data], axis=0) def save2npz(filename, data=None): """save2npz. :param filename: str, the fileanme where the data will be saved. :param data: ndarray, arrays to save to the file. """ assert data is not None, "data is {}".format(data) if not os.path.exists(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) np.savez_compressed(filename, data=data) def detect_landmark(image, detector, predictor): gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) face_locations = detector(gray, 1) coords = None for (_, face_location) in enumerate(face_locations): if torch.cuda.is_available(): rect = face_location.rect else: rect = face_location shape = predictor(gray, rect) coords = np.zeros((68, 2), dtype=np.int32) for i in range(0, 68): coords[i] = (shape.part(i).x, shape.part(i).y) return coords def preprocess_video(input_video_path): if torch.cuda.is_available(): detector = dlib.cnn_face_detection_model_v1(face_detector_path) else: detector = dlib.get_frontal_face_detector() predictor = dlib.shape_predictor(face_predictor_path) STD_SIZE = (256, 256) mean_face_landmarks = np.load(mean_face_path) stablePntsIDs = [33, 36, 39, 42, 45] videogen = skvideo.io.vread(input_video_path) frames = np.array([frame for frame in videogen]) landmarks = [] for frame in tqdm(frames): landmark = detect_landmark(frame, detector, predictor) landmarks.append(landmark) preprocessed_landmarks = landmarks_interpolate(landmarks) rois = crop_patch(input_video_path, preprocessed_landmarks, mean_face_landmarks, stablePntsIDs, STD_SIZE, window_margin=12, start_idx=48, stop_idx=68, crop_height=96, crop_width=96) rois_gray=convert_bgr2gray(rois) save2npz(output_video_path, data=rois_gray) write_video_ffmpeg(rois, mouth_roi_path, "/usr/bin/ffmpeg") return mouth_roi_path def predict(process_video): os.chdir('/home/user/app') return os.system('bash TestVisual.sh') # ---- Gradio Layout ----- youtube_url_in = gr.Textbox(label="Youtube url", lines=1, interactive=True) video_in = gr.Video(label="Input Video", mirror_webcam=False, interactive=True) video_out = gr.Video(label="Audio Visual Video", mirror_webcam=False, interactive=True) demo = gr.Blocks() demo.encrypt = False text_output = gr.Textbox() with demo: # gr.Markdown(''' #