GV-a / utils.py
TDN-M's picture
Upload 5 files
2b12a37 verified
raw
history blame
4.4 kB
import glob
import os
import random
import requests
from loguru import logger
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip
def check_file_exists(file_path):
    """Ensure that *file_path* exists on disk.

    Args:
        file_path: Path to verify.

    Raises:
        FileNotFoundError: If nothing exists at *file_path*.
    """
    # Guard clause: the happy path returns immediately.
    if os.path.exists(file_path):
        return
    raise FileNotFoundError(f"Tệp {file_path} không tồn tại.")
# Paths to the background-music track and to the rendered output video.
# NOTE: both are relative, so they resolve against the current working directory.
bgm_file = "data/bg_music.mp3"
output_path = "output/final_video.mp4"
# Runs at import time: raises FileNotFoundError if the music track is missing.
check_file_exists(bgm_file)
def get_pexels_video(query):
    """Search Pexels for videos matching *query* and return one download link.

    The API key is read from the ``Pexels_API_KEY`` environment variable. A
    video is picked at random from the first page of results (up to 30) and
    the link of its first listed file is returned.

    Args:
        query: Free-text search term. It is URL-encoded by ``requests``.

    Returns:
        The link of the first file of a randomly chosen video, or ``None``
        when the request fails or no usable video is returned.

    Raises:
        ValueError: If the ``Pexels_API_KEY`` environment variable is unset.
        requests.exceptions.RequestException: On network errors or timeouts.
    """
    api_key = os.getenv('Pexels_API_KEY')
    if not api_key:
        raise ValueError("API key không được tìm thấy trong biến môi trường.")

    # Passing the query through `params` lets requests percent-encode it, so
    # spaces, '&' or non-ASCII characters cannot corrupt the URL.
    response = requests.get(
        "https://api.pexels.com/videos/search",
        params={"query": query, "per_page": 30},
        headers={"Authorization": api_key},
        timeout=30,  # never hang forever on an unresponsive server
    )
    if response.status_code != 200:
        logger.error(f"Pexels search failed with HTTP status {response.status_code}.")
        return None

    # Keep only videos that actually expose a first file with a link, so a
    # malformed entry cannot raise KeyError/IndexError.
    links = []
    for video in response.json().get('videos') or []:
        files = video.get('video_files') or []
        if files and files[0].get('link'):
            links.append(files[0]['link'])

    if links:
        return random.choice(links)
    logger.error("Không tìm thấy video nào.")
    return None
def download_video(url, save_path="downloaded_video.mp4"):
    """Download the file at *url* to *save_path*.

    The body is streamed into ``<save_path>.part`` and moved into place only
    after the transfer completes, so a failed download never leaves a
    truncated file at *save_path*.

    Args:
        url: Address of the file to download.
        save_path: Destination path of the downloaded file.

    Returns:
        *save_path* on success, or ``None`` if the HTTP request failed.

    Raises:
        OSError: If the destination cannot be written (not caught here).
    """
    partial_path = f"{save_path}.part"
    try:
        # The context manager releases the connection even on errors; the
        # timeout prevents an unresponsive server from blocking forever.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        os.replace(partial_path, save_path)
        return save_path
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download video from {url}: {e}")
        # Drop whatever was written before the transfer broke.
        if os.path.exists(partial_path):
            os.remove(partial_path)
        return None
def get_bgm_file(bgm_type: str = "random", bgm_file: str = "",
                 song_dir: str = "/data/bg-music"):
    """Select a background-music file.

    Args:
        bgm_type: ``"random"`` picks a random ``*.mp3`` from *song_dir*; any
            other non-empty value selects *bgm_file*; an empty value
            disables background music.
        bgm_file: Explicit music file, used when *bgm_type* is not
            ``"random"``.
        song_dir: Directory searched for ``*.mp3`` files in random mode.

    Returns:
        The chosen file path, or ``""`` when music is disabled, no track is
        found, or *bgm_file* is not an existing regular file.
    """
    if not bgm_type:
        return ""

    if bgm_type == "random":
        files = glob.glob(os.path.join(song_dir, "*.mp3"))
        return random.choice(files) if files else ""

    # isfile (rather than exists) so that a directory is never returned as a
    # music track.
    if bgm_file and os.path.isfile(bgm_file):
        return bgm_file
    return ""
def add_background_music(video_path, bgm_file, output_path="video_with_bgm.mp4"):
    """Mix a background-music track into a video and write the result.

    The music is cut to the video's duration (or to its own duration when it
    is shorter) and layered over the video's existing audio, if any.

    Args:
        video_path: Path of the source video.
        bgm_file: Path of the background-music file.
        output_path: Where the mixed video is written.

    Returns:
        *output_path*.

    Raises:
        FileNotFoundError: If *video_path* or *bgm_file* does not exist.
    """
    check_file_exists(video_path)
    check_file_exists(bgm_file)

    video = None
    bgm_source = None
    try:
        video = VideoFileClip(video_path)
        bgm_source = AudioFileClip(bgm_file)
        # Never cut past the end of the music itself.
        bgm = bgm_source.subclip(0, min(bgm_source.duration, video.duration))
        # video.audio is None for a silent video; skip it instead of handing
        # None to CompositeAudioClip.
        tracks = [track for track in (video.audio, bgm) if track is not None]
        final_video = video.set_audio(CompositeAudioClip(tracks))
        final_video.write_videofile(output_path, audio_codec="aac")
    finally:
        # Release the underlying ffmpeg readers even when writing fails.
        for clip in (bgm_source, video):
            if clip is not None:
                clip.close()
    return output_path
def combine_videos(combined_video_path: str,
                   video_paths: list[str],
                   audio_file: str,
                   max_clip_duration: int = 5,
                   threads: int = 2,
                   ) -> str:
    """Concatenate muted clips until they cover the duration of *audio_file*.

    The videos are visited in random order, repeatedly if needed. Each
    contributes at most *max_clip_duration* seconds, and the last clip is
    trimmed so the total matches the audio duration. The result is written
    at 30 fps without an audio track.

    Args:
        combined_video_path: Where the combined video is written.
        video_paths: Source video files. The caller's list is not modified.
        audio_file: Audio file whose duration defines the target length.
        max_clip_duration: Maximum seconds taken from a single clip.
        threads: Thread count passed to ``write_videofile``.

    Returns:
        *combined_video_path*.

    Raises:
        ValueError: If *video_paths* is empty, or if no video can contribute
            any footage (which would otherwise loop forever).
        FileNotFoundError: If the audio file or any video file is missing.
    """
    if not video_paths:
        raise ValueError("video_paths must contain at least one video.")

    check_file_exists(audio_file)
    audio_clip = AudioFileClip(audio_file)
    try:
        audio_duration = audio_clip.duration
    finally:
        # Only the duration is needed; release the reader right away.
        audio_clip.close()
    logger.info(f"Max duration of audio: {audio_duration} seconds")

    # Shuffle a private copy so the caller's list keeps its order.
    shuffled_paths = list(video_paths)
    sources = []
    clips = []
    video_duration = 0
    try:
        while video_duration < audio_duration:
            random.shuffle(shuffled_paths)
            added_this_pass = 0
            for video_path in shuffled_paths:
                remaining = audio_duration - video_duration
                if remaining <= 0:
                    # Target reached mid-pass: do not append empty clips.
                    break
                check_file_exists(video_path)
                source = VideoFileClip(video_path)
                sources.append(source)
                clip = source.without_audio()
                # Respect both the per-clip cap and the time still missing.
                clip_length = min(clip.duration, max_clip_duration, remaining)
                if clip_length <= 0:
                    continue
                if clip_length < clip.duration:
                    clip = clip.subclip(0, clip_length)
                clips.append(clip.set_fps(30))
                video_duration += clip_length
                added_this_pass += clip_length
            if added_this_pass <= 0 and video_duration < audio_duration:
                # No progress is possible; bail out instead of spinning.
                raise ValueError("None of the input videos provides usable footage.")

        final_clip = concatenate_videoclips(clips)
        final_clip = final_clip.set_fps(30)
        logger.info(f"Writing combined video to {combined_video_path}")
        final_clip.write_videofile(combined_video_path, threads=threads)
    finally:
        # Close every opened reader, whether or not writing succeeded.
        for source in sources:
            source.close()
    logger.success("Completed combining videos")
    return combined_video_path
def generate_video(video_paths: list[str],
                   audio_path: str,
                   output_file: str,
                   ) -> str:
    """Build a video from *video_paths* with *audio_path* as its soundtrack.

    The clips are first combined into the intermediate file
    ``temp_combined_video.mp4`` in the current working directory. That file
    is then re-encoded with the audio attached and finally removed.

    Args:
        video_paths: Source video files to combine.
        audio_path: Audio file used as the soundtrack.
        output_file: Where the final video is written.

    Returns:
        *output_file*.
    """
    logger.info("Start generating video")
    combined_video_path = "temp_combined_video.mp4"
    final_video = None
    audio_clip = None
    try:
        combine_videos(combined_video_path, video_paths, audio_path)
        final_video = VideoFileClip(combined_video_path)
        audio_clip = AudioFileClip(audio_path)
        logger.info(f"Writing final video to {output_file}")
        final_video.set_audio(audio_clip).write_videofile(output_file, audio_codec="aac")
    finally:
        # Close the readers before deleting the intermediate file (an open
        # file cannot be removed on some platforms), and clean up even when
        # a step above raised.
        for clip in (audio_clip, final_video):
            if clip is not None:
                clip.close()
        if os.path.exists(combined_video_path):
            os.remove(combined_video_path)
    logger.success(f"Completed generating video: {output_file}")
    return output_file