|
import glob |
|
import os |
|
import random |
|
import requests |
|
from loguru import logger |
|
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip |
|
|
|
def check_file_exists(file_path): |
|
if not os.path.exists(file_path): |
|
raise FileNotFoundError(f"Tệp {file_path} không tồn tại.") |
|
|
|
|
|
bgm_file = "data/bg_music.mp3" |
|
output_path = "output/final_video.mp4" |
|
|
|
check_file_exists(bgm_file) |
|
|
|
def get_pexels_video(query): |
|
api_key = os.getenv('Pexels_API_KEY') |
|
if not api_key: |
|
raise ValueError("API key không được tìm thấy trong biến môi trường.") |
|
|
|
url = f"https://api.pexels.com/videos/search?query={query}&per_page=30" |
|
headers = {"Authorization": api_key} |
|
response = requests.get(url, headers=headers) |
|
if response.status_code == 200: |
|
data = response.json() |
|
if data['videos']: |
|
return random.choice(data['videos'])['video_files'][0]['link'] |
|
logger.error("Không tìm thấy video nào.") |
|
return None |
|
|
|
def download_video(url, save_path="downloaded_video.mp4"): |
|
try: |
|
response = requests.get(url, stream=True) |
|
response.raise_for_status() |
|
with open(save_path, 'wb') as f: |
|
for chunk in response.iter_content(chunk_size=1024): |
|
f.write(chunk) |
|
return save_path |
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"Failed to download video from {url}: {e}") |
|
return None |
|
|
|
def get_bgm_file(bgm_type: str = "random", bgm_file: str = ""): |
|
if not bgm_type: |
|
return "" |
|
if bgm_type == "random": |
|
suffix = "*.mp3" |
|
song_dir = "/data/bg-music" |
|
files = glob.glob(os.path.join(song_dir, suffix)) |
|
return random.choice(files) if files else "" |
|
|
|
if os.path.exists(bgm_file): |
|
return bgm_file |
|
|
|
return "" |
|
|
|
def add_background_music(video_path, bgm_file, output_path="video_with_bgm.mp4"): |
|
check_file_exists(video_path) |
|
check_file_exists(bgm_file) |
|
|
|
video = VideoFileClip(video_path) |
|
bgm = AudioFileClip(bgm_file).subclip(0, video.duration) |
|
final_audio = CompositeAudioClip([video.audio, bgm]) |
|
final_video = video.set_audio(final_audio) |
|
final_video.write_videofile(output_path, audio_codec="aac") |
|
return output_path |
|
|
|
def combine_videos(combined_video_path: str, |
|
video_paths: list[str], |
|
audio_file: str, |
|
max_clip_duration: int = 5, |
|
threads: int = 2, |
|
) -> str: |
|
check_file_exists(audio_file) |
|
|
|
audio_clip = AudioFileClip(audio_file) |
|
audio_duration = audio_clip.duration |
|
logger.info(f"Max duration of audio: {audio_duration} seconds") |
|
|
|
clips = [] |
|
video_duration = 0 |
|
|
|
while video_duration < audio_duration: |
|
random.shuffle(video_paths) |
|
|
|
for video_path in video_paths: |
|
check_file_exists(video_path) |
|
clip = VideoFileClip(video_path).without_audio() |
|
if (audio_duration - video_duration) < clip.duration: |
|
clip = clip.subclip(0, (audio_duration - video_duration)) |
|
elif max_clip_duration < clip.duration: |
|
clip = clip.subclip(0, max_clip_duration) |
|
clip = clip.set_fps(30) |
|
clips.append(clip) |
|
video_duration += clip.duration |
|
|
|
final_clip = concatenate_videoclips(clips) |
|
final_clip = final_clip.set_fps(30) |
|
logger.info(f"Writing combined video to {combined_video_path}") |
|
final_clip.write_videofile(combined_video_path, threads=threads) |
|
logger.success(f"Completed combining videos") |
|
return combined_video_path |
|
|
|
def generate_video(video_paths: list[str], |
|
audio_path: str, |
|
output_file: str, |
|
) -> str: |
|
logger.info(f"Start generating video") |
|
combined_video_path = "temp_combined_video.mp4" |
|
|
|
combine_videos(combined_video_path, video_paths, audio_path) |
|
|
|
final_video = VideoFileClip(combined_video_path) |
|
audio_clip = AudioFileClip(audio_path) |
|
final_video = final_video.set_audio(audio_clip) |
|
|
|
logger.info(f"Writing final video to {output_file}") |
|
final_video.write_videofile(output_file, audio_codec="aac") |
|
|
|
os.remove(combined_video_path) |
|
logger.success(f"Completed generating video: {output_file}") |
|
return output_file |