import os |
import random |
import glob |
import shutil |
import tempfile |
from concurrent.futures import ThreadPoolExecutor |
from moviepy.editor import ( |
AudioFileClip, |
CompositeVideoClip, |
ImageClip, |
VideoFileClip, |
concatenate_videoclips, |
vfx, |
) |
from moviepy.video.tools.subtitles import SubtitlesClip |
import tqdm |
from sentence_transformers import SentenceTransformer, util |
model = SentenceTransformer('all-MiniLM-L6-v2') |
def add_transitions(clips, transition_duration=1): |
""" |
Thêm hiệu ứng chuyển cảnh giữa các clip. |
""" |
final_clips = [] |
for i, clip in enumerate(clips): |
start_time = i * (clip.duration - transition_duration) |
end_time = start_time + clip.duration |
if i > 0: |
fade_in = clip.fx(vfx.fadeout, duration=transition_duration) |
fade_in = fade_in.set_start(start_time) |
final_clips.append(fade_in) |
if i < len(clips) - 1: |
fade_out = clip.fx(vfx.fadein, duration=transition_duration) |
fade_out = fade_out.set_end(end_time) |
final_clips.append(fade_out) |
final_clips.append(clip.set_start(start_time).set_end(end_time)) |
return CompositeVideoClip(final_clips) |
def create_video(sentences, audio_files, video_files, output_path="output_video.mp4"): |
""" |
Tạo video từ các câu, file âm thanh và file video. |
""" |
clips = [] |
for sentence, audio_path, video_path in tqdm.tqdm(zip(sentences, audio_files, video_files), desc="Tạo video"): |
audio = AudioFileClip(audio_path) |
video = VideoFileClip(video_path).set_duration(audio.duration) |
video = video.set_audio(audio) |
clips.append(video) |
final_video = concatenate_videoclips(clips, method="compose") |
final_video.write_videofile(output_path, fps=24) |
print(f"Đã tạo video: {output_path}") |
return output_path |
def process_images_parallel(image_patch, clip_duration): |
""" |
Xử lý song song các hình ảnh. |
""" |
with ThreadPoolExecutor() as executor: |
futures = [] |
for content, image_path in image_patch: |
if image_path: |
future = executor.submit(ImageClip, image_path) |
futures.append((future, clip_duration)) |
clips = [] |
for future, duration in futures: |
clip = future.result().set_duration(duration) |
clips.append(clip) |
return clips |
def extract_key_contents(script: str) -> list[str]: |
""" |
Hàm này dùng để trích xuất các ý chính từ một đoạn script. |
Tham số: |
- script (str): Đoạn văn bản cần xử lý để trích xuất các ý chính. |
Trả về: |
- list[str]: Danh sách các câu được tách ra từ đoạn script. |
Logic xử lý: |
- Đầu tiên, đoạn script được tách thành các câu dựa trên dấu chấm ('.'). |
- Mỗi câu được xem như một ý chính và được thêm vào danh sách kết quả. |
""" |
if not script: |
return [] |
sentences = script.split('.') |
sentences = [sentence.strip() for sentence in sentences if sentence.strip()] |
return sentences |
def process_script_for_video(script, dataset_path, use_dataset): |
""" |
Xử lý script để tạo video. |
""" |
sentences = extract_key_contents(script) |
return sentences |
def create_video_func(script, audio_path, dataset_path, use_dataset): |
""" |
Hàm chính để tạo video. |
""" |
try: |
sentences = process_script_for_video(script, dataset_path, use_dataset) |
temp_dir = tempfile.mkdtemp() |
audio_clips = split_audio(audio_path, len(sentences), temp_dir) |
video_files = glob.glob(os.path.join(dataset_path, "*.mp4")) if use_dataset else [] |
min_length = min(len(sentences), len(audio_clips), len(video_files)) |
sentences = sentences[:min_length] |
audio_clips = audio_clips[:min_length] |
video_files = video_files[:min_length] |
output_path = "output_video.mp4" |
create_video(sentences, audio_clips, video_files, output_path) |
return output_path |
except Exception as e: |
print(f"Lỗi khi tạo video: {e}") |
return None |
finally: |
shutil.rmtree(temp_dir) |
def split_audio(audio_path, num_segments, output_dir): |
""" |
Chia file âm thanh thành các đoạn nhỏ. |
""" |
audio = AudioFileClip(audio_path) |
duration = audio.duration |
segment_duration = duration / num_segments |
audio_clips = [] |
for i in range(num_segments): |
start = i * segment_duration |
end = (i + 1) * segment_duration |
segment = audio.subclip(start, end) |
output_path = os.path.join(output_dir, f"segment_{i}.mp3") |
segment.write_audiofile(output_path) |
audio_clips.append(output_path) |
return audio_clips |
def find_matching_image(prompt, dataset_path, threshold=0.5): |
""" |
Tìm kiếm hình ảnh phù hợp với prompt trong dataset. |
""" |
prompt_embedding = model.encode(prompt, convert_to_tensor=True) |
best_match = None |
best_score = -1 |
for filename in os.listdir(dataset_path): |
if filename.lower().endswith(('.png', '.jpg', '.jpeg')): |
image_path = os.path.join(dataset_path, filename) |
image_name = os.path.splitext(filename)[0].replace('_', ' ') |
image_embedding = model.encode(image_name, convert_to_tensor=True) |
cosine_score = util.pytorch_cos_sim(prompt_embedding, image_embedding).item() |
if cosine_score > best_score and cosine_score >= threshold: |
best_score = cosine_score |
best_match = image_path |
return best_match |