import os import time import threading from pathlib import Path from typing import Optional, List, Dict, Any, Callable, Union from enum import Enum from dataclasses import dataclass, field from tqdm import tqdm from moviepy.editor import VideoFileClip, concatenate_videoclips import os import json from modules.media_process.media_processor import media_processor from utils.tools import ( read_json_file, string_to_json, save_json_file, convert_webp_to_png ) from utils.upload import upload_file_to_tos from utils.logger_config import setup_logger logger = setup_logger(__name__) class VideoComposer: """视频合成器""" def compose(self, script_path: str) -> str: """ 将视频片段合成为完整视频 Args: script_path: 脚本文件路径 Returns: 最终视频文件名 Raises: FileNotFoundError: 当脚本文件不存在时 ValueError: 当脚本数据无效或视频片段不存在时 """ if not os.path.exists(script_path): raise FileNotFoundError(f"脚本文件不存在: {script_path}") # 读取脚本 video_script_data = read_json_file(script_path) if not video_script_data: raise ValueError("无法读取脚本文件或文件为空") storyboards = video_script_data[0].get("storyboards", []) if not storyboards: raise ValueError("脚本中未找到分镜详情") lens_details = [] for storyboard in storyboards: clip_path = storyboard.get("storyboard") lens_details.append(clip_path) lens_details = [item for sublist in lens_details for item in sublist] # 收集所有视频片段路径 clips = [] for lens_item in lens_details: clip_path = lens_item.get("clip_path") if not clip_path: logger.warning(f"分镜 {lens_item.get('lens_id')} 缺少视频片段路径") continue if not os.path.exists(clip_path): logger.warning(f"视频片段不存在: {clip_path}") continue clips.append(clip_path) if not clips: raise ValueError("未找到有效的视频片段") logger.info(f"开始拼接 {len(clips)} 个视频片段") logger.debug(f"视频片段列表: {clips}") # 生成最终视频文件名 film_path = f"./output/final_video.mp4" try: media_processor.concat_videos(clips, film_path) logger.info(f"视频合成完成: {film_path}") return film_path except Exception as e: logger.error(f"视频合成失败: {e}") raise video_composer = VideoComposer() def concat_videos(json_path, output_path): """ 拼接多个视频文件为一个视频。 :param video_paths: 视频文件路径列表,例如 ['1.mp4', '2.mp4'] :param output_path: 输出视频文件路径,例如 'output.mp4' """ with open(json_path, "r", encoding='utf-8') as f: final_storyboards = json.load(f)["storyboards"] segments = [] for storyboard in final_storyboards: storyboard_path = storyboard["storyboard"] for item in storyboard_path: clip_path = item["clip_path"] segments.append(clip_path) if not segments: raise ValueError("视频路径列表不能为空") clips = [] for path in segments: if not os.path.isfile(path): raise FileNotFoundError(f"视频文件不存在: {path}") clips.append(VideoFileClip(path)) # 拼接所有视频片段 final_clip = concatenate_videoclips(clips, method="compose") # 使用 compose 可处理不同尺寸 # 写入输出文件 final_clip.write_videofile( output_path, codec='libx264', audio_codec='aac', temp_audiofile='temp-audio.m4a', remove_temp=True ) # 关闭所有 clip 以释放资源 for clip in clips: clip.close() final_clip.close() # 示例用法 if __name__ == "__main__": # 生成视频片段 with open("./output/storyboards_with_segments.json", "r", encoding='utf-8') as f: final_storyboards = json.load(f)[0]["storyboards"] segments = [] for storyboard in final_storyboards: storyboard_path = storyboard["storyboard"] for item in storyboard_path: clip_path = item["clip_path"] segments.append(clip_path) concat_videos("./output/storyboards_with_segments.json", "output_combined_4.mp4")