import gradio as gr
import os
import shutil
import yaml
import tempfile
import cv2
import huggingface_hub
import subprocess
import threading

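# Stream a subprocess pipe to stdout line by line so inference logs show up live.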
def stream_output(pipe):
    for line in iter(pipe.readline, ''):
        print(line, end='')
    pipe.close()

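# Authenticate with the token from the GATED_HF_TOKEN secret so the gated downloads below succeed.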
HF_TKN = os.environ.get("GATED_HF_TOKEN")
huggingface_hub.login(token=HF_TKN)

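# Fetch the DWPose ONNX models (person detector and pose estimator) and the MimicMotion checkpoint.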
huggingface_hub.hf_hub_download(
    repo_id='yzd-v/DWPose',
    filename='yolox_l.onnx',
    local_dir='./models/DWPose'
)

huggingface_hub.hf_hub_download(
    repo_id='yzd-v/DWPose',
    filename='dw-ll_ucoco_384.onnx',
    local_dir='./models/DWPose'
)

huggingface_hub.hf_hub_download(
    repo_id='ixaac/MimicMotion',
    filename='MimicMotion_1.pth',
    local_dir='./models'
)

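# Recursively print a directory tree (used below to inspect ./outputs after inference).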
def print_directory_contents(path):
    for root, dirs, files in os.walk(path):
        level = root.replace(path, '').count(os.sep)
        indent = ' ' * 4 * level
        print(f"{indent}{os.path.basename(root)}/")
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print(f"{subindent}{f}")

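# Empty the outputs folder (files, symlinks, and subdirectories) before a new run.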
def check_outputs_folder(folder_path):
    if os.path.exists(folder_path) and os.path.isdir(folder_path):
        for filename in os.listdir(folder_path):
            file_path = os.path.join(folder_path, filename)
            try:
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
            except Exception as e:
                print(f'Failed to delete {file_path}. Reason: {e}')
    else:
        print(f'The folder {folder_path} does not exist.')

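# Return the path of the first .mp4 found in ./outputs, or None if there is none.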
def check_for_mp4_in_outputs():
    outputs_folder = './outputs'

    if not os.path.exists(outputs_folder):
        return None

    mp4_files = [f for f in os.listdir(outputs_folder) if f.endswith('.mp4')]

    if mp4_files:
        return os.path.join(outputs_folder, mp4_files[0])
    else:
        return None

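# Read a video's frame rate with OpenCV.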
def get_video_fps(video_path):
    video_capture = cv2.VideoCapture(video_path)

    if not video_capture.isOpened():
        raise ValueError("Error opening video file")

    fps = video_capture.get(cv2.CAP_PROP_FPS)
    video_capture.release()

    return fps

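# Run MimicMotion on a reference image and a motion reference video, returning the generated .mp4 path.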
def infer(ref_image_in, ref_video_in):
    check_outputs_folder('./outputs')

    with tempfile.TemporaryDirectory() as temp_dir:
        print("Temporary directory created:", temp_dir)

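        # Fixed inference settings; only the reference image and video come from the UI.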
        ref_video_path = ref_video_in
        ref_image_path = ref_image_in
        num_frames = 16
        resolution = 576
        frames_overlap = 6
        num_inference_steps = 25
        noise_aug_strength = 0
        guidance_scale = 2.0
        sample_stride = 2
        fps = 24
        seed = 42

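        # Configuration consumed by inference.py: base model, checkpoint, and a single test case.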
        data = {
            'base_model_path': 'stabilityai/stable-video-diffusion-img2vid-xt-1-1',
            'ckpt_path': 'models/MimicMotion_1.pth',
            'test_case': [
                {
                    'ref_video_path': ref_video_path,
                    'ref_image_path': ref_image_path,
                    'num_frames': num_frames,
                    'resolution': resolution,
                    'frames_overlap': frames_overlap,
                    'num_inference_steps': num_inference_steps,
                    'noise_aug_strength': noise_aug_strength,
                    'guidance_scale': guidance_scale,
                    'sample_stride': sample_stride,
                    'fps': fps,
                    'seed': seed
                }
            ]
        }

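        # Write the config to a temporary YAML file.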
        file_path = os.path.join(temp_dir, 'config.yaml')

        with open(file_path, 'w') as file:
            yaml.dump(data, file, default_flow_style=False)

        print("YAML file 'config.yaml' created successfully at", file_path)

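        # Launch the inference script as a subprocess and stream its stdout/stderr via stream_output.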
        command = ['python', 'inference.py', '--inference_config', file_path]
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1)

        stdout_thread = threading.Thread(target=stream_output, args=(process.stdout,))
        stderr_thread = threading.Thread(target=stream_output, args=(process.stderr,))

        stdout_thread.start()
        stderr_thread.start()

        process.wait()
        stdout_thread.join()
        stderr_thread.join()

        print("Inference script finished with return code:", process.returncode)

        print_directory_contents('./outputs')

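        # Pick up the generated video (if any) and hand it to the Gradio output component.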
        mp4_file_path = check_for_mp4_in_outputs()
        print(mp4_file_path)

        return mp4_file_path

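# Gradio UI: a reference image and a motion video in, the generated video out.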
with gr.Blocks() as demo:
    with gr.Column():
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    ref_image_in = gr.Image(type="filepath")
                    ref_video_in = gr.Video()
                submit_btn = gr.Button("Submit")
            output_video = gr.Video()

    submit_btn.click(
        fn=infer,
        inputs=[ref_image_in, ref_video_in],
        outputs=[output_video]
    )

demo.launch()