"""Helpers for fetching stock footage and assembling it into a final video.

The module downloads clips from the Pexels API, stitches local video files
together to match the length of a narration/audio track, and mixes in
background music. All media handling is delegated to moviepy.
"""

import glob
import os
import random
from contextlib import closing
from typing import Optional

import requests
from loguru import logger
from moviepy.editor import (
    VideoFileClip,
    concatenate_videoclips,
    AudioFileClip,
    CompositeAudioClip,
)

# Seconds to wait for the remote server before giving up on an HTTP request.
REQUEST_TIMEOUT_SECONDS = 30

# Size of the chunks written to disk while streaming a download.
DOWNLOAD_CHUNK_SIZE = 64 * 1024


def check_file_exists(file_path):
    """Raise ``FileNotFoundError`` if *file_path* does not exist."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Tệp {file_path} không tồn tại.")


# Full paths to the files
bgm_file = "data/bg_music.mp3"
output_path = "output/final_video.mp4"

# NOTE: runs at import time, so importing this module fails when the default
# background-music file is missing.
check_file_exists(bgm_file)


def get_pexels_video(query):
    """Return the download link of a random Pexels video matching *query*.

    Args:
        query: Free-text search term sent to the Pexels video search API.

    Returns:
        The ``link`` of the first file of a randomly chosen result, or
        ``None`` when the request does not return status 200 or no usable
        video is found.

    Raises:
        ValueError: If the ``Pexels_API_KEY`` environment variable is unset.
    """
    api_key = os.getenv('Pexels_API_KEY')
    if not api_key:
        raise ValueError("API key không được tìm thấy trong biến môi trường.")

    # Pass the query through ``params`` so requests URL-encodes it; spaces,
    # ``&`` or ``#`` in the search term would otherwise corrupt the URL.
    response = requests.get(
        "https://api.pexels.com/videos/search",
        params={"query": query, "per_page": 30},
        headers={"Authorization": api_key},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code == 200:
        videos = response.json().get('videos') or []
        # Skip results without any downloadable file so indexing cannot fail.
        candidates = [video for video in videos if video.get('video_files')]
        if candidates:
            return random.choice(candidates)['video_files'][0]['link']

    logger.error("Không tìm thấy video nào.")
    return None


def download_video(url, save_path="downloaded_video.mp4"):
    """Stream the file at *url* to *save_path*.

    Args:
        url: Location of the video to download.
        save_path: Destination file; it is overwritten if it already exists.

    Returns:
        *save_path* on success, or ``None`` when the HTTP request fails (the
        failure is logged rather than raised).
    """
    try:
        # The context manager releases the connection even if writing fails.
        with requests.get(
            url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS
        ) as response:
            response.raise_for_status()
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(
                    chunk_size=DOWNLOAD_CHUNK_SIZE
                ):
                    f.write(chunk)
        return save_path
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download video from {url}: {e}")
        return None


def get_bgm_file(
    bgm_type: str = "random",
    bgm_file: str = "",
    song_dir: str = "/data/bg-music",
):
    """Resolve which background-music file to use.

    Args:
        bgm_type: ``"random"`` picks a random ``*.mp3`` from *song_dir*; an
            empty value disables background music; any other value makes the
            function fall back to *bgm_file*.
        bgm_file: Explicit music file, used when *bgm_type* is not
            ``"random"``.
        song_dir: Directory searched for ``*.mp3`` files in random mode.

    Returns:
        The chosen file path, or ``""`` when no music is requested or no
        suitable file exists.
    """
    if not bgm_type:
        return ""

    if bgm_type == "random":
        files = glob.glob(os.path.join(song_dir, "*.mp3"))
        return random.choice(files) if files else ""

    if os.path.exists(bgm_file):
        return bgm_file

    return ""


def add_background_music(video_path, bgm_file, output_path="video_with_bgm.mp4"):
    """Mix *bgm_file* into the audio of *video_path* and write the result.

    Args:
        video_path: Source video file.
        bgm_file: Background-music file.
        output_path: Where the mixed video is written.

    Returns:
        *output_path*.

    Raises:
        FileNotFoundError: If *video_path* or *bgm_file* does not exist.
    """
    check_file_exists(video_path)
    check_file_exists(bgm_file)

    # ``closing`` releases the underlying readers once the file is written.
    with closing(VideoFileClip(video_path)) as video, \
            closing(AudioFileClip(bgm_file)) as bgm_source:
        # Never request more music than the track actually contains.
        bgm = bgm_source.subclip(0, min(bgm_source.duration, video.duration))
        # ``video.audio`` is None for silent videos; mixing None would crash.
        tracks = [track for track in (video.audio, bgm) if track is not None]
        final_audio = CompositeAudioClip(tracks)
        final_video = video.set_audio(final_audio)
        final_video.write_videofile(output_path, audio_codec="aac")

    return output_path


def combine_videos(combined_video_path: str,
                   video_paths: list[str],
                   audio_file: str,
                   max_clip_duration: int = 5,
                   threads: int = 2,
                   ) -> str:
    """Concatenate silent clips until they cover the length of *audio_file*.

    The videos are visited in random order, repeatedly if necessary. Each
    clip contributes at most *max_clip_duration* seconds, and the last one is
    trimmed so the total matches the audio duration.

    Args:
        combined_video_path: Output file for the concatenated video.
        video_paths: Video files to draw clips from; the list is not modified.
        audio_file: Audio file whose duration defines the target length.
        max_clip_duration: Maximum number of seconds taken from one clip.
        threads: Thread count forwarded to ``write_videofile``.

    Returns:
        *combined_video_path*.

    Raises:
        ValueError: If *video_paths* is empty, *max_clip_duration* is not
            positive, or the clips have no usable duration.
        FileNotFoundError: If *audio_file* or one of the videos is missing.
    """
    if not video_paths:
        raise ValueError("video_paths must contain at least one video file.")
    if max_clip_duration <= 0:
        raise ValueError("max_clip_duration must be greater than zero.")

    check_file_exists(audio_file)
    # The audio is only needed for its duration, so release it immediately.
    with closing(AudioFileClip(audio_file)) as audio_clip:
        audio_duration = audio_clip.duration
    logger.info(f"Max duration of audio: {audio_duration} seconds")

    # Shuffle a private copy so the caller's list keeps its order.
    shuffled_paths = list(video_paths)
    source_clips = []
    clips = []
    video_duration = 0

    try:
        while video_duration < audio_duration:
            duration_before_pass = video_duration
            random.shuffle(shuffled_paths)
            for video_path in shuffled_paths:
                remaining = audio_duration - video_duration
                if remaining <= 0:
                    # Target reached: do not open more files or append
                    # zero-length clips.
                    break

                check_file_exists(video_path)
                source = VideoFileClip(video_path)
                source_clips.append(source)
                clip = source.without_audio()

                target_duration = min(remaining, max_clip_duration)
                if target_duration < clip.duration:
                    clip = clip.subclip(0, target_duration)
                clip = clip.set_fps(30)
                clips.append(clip)
                video_duration += clip.duration

            if video_duration <= duration_before_pass:
                # A whole pass added nothing; without this guard the loop
                # would never terminate.
                raise ValueError(
                    "The provided videos have no usable duration."
                )

        final_clip = concatenate_videoclips(clips)
        final_clip = final_clip.set_fps(30)
        logger.info(f"Writing combined video to {combined_video_path}")
        final_clip.write_videofile(combined_video_path, threads=threads)
    finally:
        # Derived clips share the reader of their source clip, so closing
        # the sources releases every open file.
        for source in source_clips:
            source.close()

    logger.success("Completed combining videos")
    return combined_video_path


def generate_video(video_paths: list[str],
                   audio_path: str,
                   output_file: str,
                   ) -> str:
    """Build the final video: combined clips with *audio_path* as soundtrack.

    Args:
        video_paths: Video files to draw clips from.
        audio_path: Audio file used as the soundtrack; its duration also
            defines the length of the video.
        output_file: Destination of the finished video.

    Returns:
        *output_file*.
    """
    logger.info("Start generating video")
    combined_video_path = "temp_combined_video.mp4"

    try:
        combine_videos(combined_video_path, video_paths, audio_path)

        with closing(VideoFileClip(combined_video_path)) as combined_clip, \
                closing(AudioFileClip(audio_path)) as audio_clip:
            final_video = combined_clip.set_audio(audio_clip)
            logger.info(f"Writing final video to {output_file}")
            final_video.write_videofile(output_file, audio_codec="aac")
    finally:
        # The reader is closed by now, so the temporary file can be deleted
        # on every platform, including after a failure.
        if os.path.exists(combined_video_path):
            os.remove(combined_video_path)

    logger.success(f"Completed generating video: {output_file}")
    return output_file