|
import os |
|
import random |
|
import glob |
|
import shutil |
|
import tempfile |
|
from concurrent.futures import ThreadPoolExecutor |
|
from moviepy.editor import ( |
|
AudioFileClip, |
|
CompositeVideoClip, |
|
ImageClip, |
|
VideoFileClip, |
|
concatenate_videoclips, |
|
vfx, |
|
) |
|
from moviepy.video.tools.subtitles import SubtitlesClip |
|
import tqdm |
|
|
|
from sentence_transformers import SentenceTransformer, util |
|
|
|
|
|
model = SentenceTransformer('all-MiniLM-L6-v2') |
|
|
|
|
|
NUM_IMAGES = 30 |
|
|
|
def add_transitions(clips, transition_duration=1): |
|
""" |
|
Thêm hiệu ứng chuyển cảnh giữa các clip. |
|
""" |
|
final_clips = [] |
|
for i, clip in enumerate(clips): |
|
start_time = i * (clip.duration - transition_duration) |
|
end_time = start_time + clip.duration |
|
|
|
if i > 0: |
|
|
|
fade_in = clip.fx(vfx.fadeout, duration=transition_duration) |
|
fade_in = fade_in.set_start(start_time) |
|
final_clips.append(fade_in) |
|
|
|
if i < len(clips) - 1: |
|
|
|
fade_out = clip.fx(vfx.fadein, duration=transition_duration) |
|
fade_out = fade_out.set_end(end_time) |
|
final_clips.append(fade_out) |
|
|
|
|
|
final_clips.append(clip.set_start(start_time).set_end(end_time)) |
|
|
|
return CompositeVideoClip(final_clips) |
|
|
|
def create_video(sentences, audio_files, video_files, output_path="output_video.mp4"): |
|
""" |
|
Tạo video từ các câu, file âm thanh và file video. |
|
""" |
|
clips = [] |
|
for sentence, audio_path, video_path in tqdm.tqdm(zip(sentences, audio_files, video_files), desc="Tạo video"): |
|
audio = AudioFileClip(audio_path) |
|
video = VideoFileClip(video_path).set_duration(audio.duration) |
|
video = video.set_audio(audio) |
|
clips.append(video) |
|
|
|
final_video = concatenate_videoclips(clips, method="compose") |
|
final_video.write_videofile(output_path, fps=24) |
|
print(f"Đã tạo video: {output_path}") |
|
return output_path |
|
|
|
def process_images_parallel(image_patch, clip_duration): |
|
""" |
|
Xử lý song song các hình ảnh. |
|
""" |
|
with ThreadPoolExecutor() as executor: |
|
futures = [] |
|
for content, image_path in image_patch: |
|
if image_path: |
|
future = executor.submit(ImageClip, image_path) |
|
futures.append((future, clip_duration)) |
|
|
|
clips = [] |
|
for future, duration in futures: |
|
clip = future.result().set_duration(duration) |
|
clips.append(clip) |
|
|
|
return clips |
|
|
|
|
|
def extract_key_contents(script: str) -> list[str]: |
|
""" |
|
Hàm này dùng để trích xuất các ý chính từ một đoạn script. |
|
|
|
Tham số: |
|
- script (str): Đoạn văn bản cần xử lý để trích xuất các ý chính. |
|
|
|
Trả về: |
|
- list[str]: Danh sách các câu được tách ra từ đoạn script. |
|
|
|
Logic xử lý: |
|
- Đầu tiên, đoạn script được tách thành các câu dựa trên dấu chấm ('.'). |
|
- Mỗi câu được xem như một ý chính và được thêm vào danh sách kết quả. |
|
""" |
|
|
|
if not script: |
|
return [] |
|
|
|
|
|
sentences = script.split('.') |
|
|
|
|
|
sentences = [sentence.strip() for sentence in sentences if sentence.strip()] |
|
|
|
|
|
return sentences |
|
|
|
|
|
def process_script_for_video(script, dataset_path, use_dataset): |
|
""" |
|
Xử lý script để tạo video. |
|
""" |
|
sentences = extract_key_contents(script) |
|
return sentences |
|
|
|
def create_video_func(script, audio_path, dataset_path, use_dataset): |
|
""" |
|
Hàm chính để tạo video. |
|
""" |
|
try: |
|
sentences = process_script_for_video(script, dataset_path, use_dataset) |
|
|
|
|
|
temp_dir = tempfile.mkdtemp() |
|
|
|
|
|
audio_clips = split_audio(audio_path, len(sentences), temp_dir) |
|
|
|
|
|
video_files = glob.glob(os.path.join(dataset_path, "*.mp4")) if use_dataset else [] |
|
|
|
|
|
min_length = min(len(sentences), len(audio_clips), len(video_files)) |
|
sentences = sentences[:min_length] |
|
audio_clips = audio_clips[:min_length] |
|
video_files = video_files[:min_length] |
|
|
|
output_path = "output_video.mp4" |
|
create_video(sentences, audio_clips, video_files, output_path) |
|
|
|
return output_path |
|
except Exception as e: |
|
print(f"Lỗi khi tạo video: {e}") |
|
return None |
|
finally: |
|
|
|
shutil.rmtree(temp_dir) |
|
|
|
def split_audio(audio_path, num_segments, output_dir): |
|
""" |
|
Chia file âm thanh thành các đoạn nhỏ. |
|
""" |
|
audio = AudioFileClip(audio_path) |
|
duration = audio.duration |
|
segment_duration = duration / num_segments |
|
|
|
audio_clips = [] |
|
for i in range(num_segments): |
|
start = i * segment_duration |
|
end = (i + 1) * segment_duration |
|
segment = audio.subclip(start, end) |
|
output_path = os.path.join(output_dir, f"segment_{i}.mp3") |
|
segment.write_audiofile(output_path) |
|
audio_clips.append(output_path) |
|
|
|
return audio_clips |
|
|
|
def find_matching_image(prompt, dataset_path, threshold=0.5): |
|
""" |
|
Tìm kiếm hình ảnh phù hợp với prompt trong dataset. |
|
""" |
|
prompt_embedding = model.encode(prompt, convert_to_tensor=True) |
|
best_match = None |
|
best_score = -1 |
|
|
|
for filename in os.listdir(dataset_path): |
|
if filename.lower().endswith(('.png', '.jpg', '.jpeg')): |
|
image_path = os.path.join(dataset_path, filename) |
|
image_name = os.path.splitext(filename)[0].replace('_', ' ') |
|
image_embedding = model.encode(image_name, convert_to_tensor=True) |
|
cosine_score = util.pytorch_cos_sim(prompt_embedding, image_embedding).item() |
|
if cosine_score > best_score and cosine_score >= threshold: |
|
best_score = cosine_score |
|
best_match = image_path |
|
return best_match |