import os
import time
import threading
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, List, Dict, Any, Callable, Union, Tuple

from tqdm import tqdm

from modules.media_understanding.media_captioner import media_captioner
from modules.media_process.media_processor import media_processor
from modules.media_generate.media_generator import (
    video_create,
    handle_video_result,
)
from utils.tools import (
    read_json_file,
    save_json_file,
)
from utils.logger_config import setup_logger

logger = setup_logger(__name__)


@dataclass
class VideoGenerationConfig:
    """Configuration for video clip generation."""

    # Paths
    output_base_dir: str = "./output"

    # Video generation parameters (rendered into the gen-params string)
    video_resolution: str = "1080p"
    video_ratio: str = "16:9"
    watermark_enabled: bool = False
    crop_frame: bool = False

    # Timeouts
    video_generation_timeout: int = 3600  # seconds
    polling_interval: int = 5  # seconds

    # Script generation parameters
    script_prompt_type: str = "script"
    script_scenario: str = "video"
    prompt_optimization_prefix: str = "待打磨优化的提示词:"

    # Value passed as ``--dur`` for every clip (previously hard-coded to 4).
    # Appended last so positional construction of this dataclass stays
    # backward-compatible.
    clip_duration: int = 4


class VideoClipGenerator:
    """Submits one async video-generation task per storyboard shot and waits
    for their completion callbacks.

    NOTE(review): the completion counter is an instance attribute that
    ``generate`` resets on every call, so concurrent ``generate`` calls on the
    same instance (e.g. the module-level ``video_generator``) would share it.
    """

    def __init__(self, config: VideoGenerationConfig):
        self.config = config
        self._completed_tasks = 0
        self._lock = threading.Lock()

    def generate(
        self, video_script_data: Dict[str, Any]
    ) -> Tuple[Dict[str, Any], bool]:
        """Generate a video clip for every shot in a script.

        Args:
            video_script_data: Parsed script dict. Shots are read from
                ``video_script_data["storyboards"][*]["storyboard"]``; each
                successfully submitted shot is updated in place with
                ``clip_path`` and ``task_id``.

        Returns:
            ``(video_script_data, all_done)`` where ``all_done`` is True only
            if every shot was submitted and every submitted task reported
            completion before the configured timeout.

        Raises:
            ValueError: If the script contains no ``storyboards``.
        """
        storyboards = video_script_data.get("storyboards", [])
        if not storyboards:
            raise ValueError("脚本中未找到分镜详情")

        # Flatten storyboards -> shots; a missing/None "storyboard" list
        # contributes no shots.
        lens_details = [
            lens_item
            for storyboard in storyboards
            for lens_item in (storyboard.get("storyboard") or [])
        ]

        total_tasks = len(lens_details)
        with self._lock:
            self._completed_tasks = 0
        logger.info(f"开始生成 {total_tasks} 个视频片段")

        # Only tasks that were actually submitted will ever fire a callback,
        # so only those may be waited for; otherwise a single skipped or
        # failed shot would block until the full timeout.
        submitted = 0
        for idx, lens_item in enumerate(tqdm(lens_details, desc="提交视频任务")):
            if self._create_video_task(lens_item, idx, total_tasks):
                submitted += 1

        all_done = self._wait_for_completion(submitted)

        if submitted < total_tasks:
            logger.error(
                f"{total_tasks - submitted}/{total_tasks} 个视频任务未成功提交"
            )
            return video_script_data, False
        return video_script_data, all_done

    def _create_video_task(
        self,
        lens_item: Dict[str, Any],
        task_index: int,
        total_tasks: int,
    ) -> bool:
        """Submit one asynchronous video-generation task for a single shot.

        Args:
            lens_item: Shot dict; reads ``idx``, ``motion_desc`` (prompt) and
                ``ff_path`` (first-frame image). On successful submission it is
                updated in place with ``clip_path`` and ``task_id``.
            task_index: Zero-based index of this shot (for logging/progress).
            total_tasks: Total number of shots (for logging/progress).

        Returns:
            True if the task was submitted (a task ID was returned); False if
            the shot was skipped, submission failed, or an error occurred.
            No completion callback should be expected for a False result.
        """
        lens_id = lens_item.get("idx")
        motion_prompt = lens_item.get("motion_desc")
        image_url = lens_item.get("ff_path")
        lens_duration = self.config.clip_duration

        if not all([motion_prompt, image_url]):
            logger.warning(f"分镜 {lens_id} 缺少必要信息,跳过")
            return False

        try:
            gen_params = self._build_gen_params(lens_duration)
            # Replace only the real extension: str.replace(".png", ".mp4")
            # left .jpg/.webp names untouched and also rewrote ".png"
            # occurring elsewhere in the name.
            stem = os.path.splitext(os.path.basename(image_url))[0]
            video_filename = f"{stem}.mp4"
            logger.info(f"正在生成视频片段 {lens_id}: {video_filename}")

            # Set by the wrapper once the callback has run.
            completion_event = threading.Event()
            wrapped_callback = self._create_callback_wrapper(
                handle_video_result,
                completion_event,
                task_index,
                total_tasks,
            )

            task_id = video_create.create_video_task_async(
                prompt=motion_prompt,
                image_url=image_url,
                gen_params=gen_params,
                filename=video_filename,
                callback=wrapped_callback,
            )
        except Exception as e:
            logger.exception(f"创建视频任务时出错: {e}")
            return False

        if not task_id:
            logger.error(f"视频任务 {task_index + 1}/{total_tasks} 提交失败")
            return False

        # NOTE(review): this directory is hard-coded and not taken from
        # config.output_base_dir; it must match wherever handle_video_result
        # writes the file — confirm before changing either side.
        lens_item["clip_path"] = f"./output/{video_filename}"
        lens_item["task_id"] = task_id
        logger.info(f"视频任务 {task_index + 1}/{total_tasks} 已提交: {task_id}")
        return True

    def _build_gen_params(self, duration: float) -> str:
        """Build the generation-parameter string for one clip.

        Args:
            duration: Value emitted as ``--dur``.

        Returns:
            A space-separated flag string (resolution, ratio, duration,
            watermark, crop-frame) built from the config.
        """
        return (
            f"--rs {self.config.video_resolution} "
            f"--rt {self.config.video_ratio} "
            f"--dur {duration} "
            f"--wm {'true' if self.config.watermark_enabled else 'false'} "
            f"--cf {'true' if self.config.crop_frame else 'false'}"
        )

    def _create_callback_wrapper(
        self,
        original_callback: Callable,
        event: threading.Event,
        task_index: int,
        total_tasks: int,
    ) -> Callable:
        """Wrap a completion callback so completion is always recorded.

        Args:
            original_callback: Callback to invoke with the original arguments
                (skipped if falsy).
            event: Event set once the callback has finished.
            task_index: Zero-based task index (for logging).
            total_tasks: Total number of shots (for logging).

        Returns:
            A wrapper that calls ``original_callback``, logs (rather than
            propagates) any exception it raises, then sets ``event`` and
            increments the completed-task counter under the lock.
        """

        def wrapper(*args, **kwargs):
            try:
                if original_callback:
                    original_callback(*args, **kwargs)
            except Exception as e:
                logger.exception(f"回调函数执行出错: {e}")
            finally:
                # Count the task as finished even if the callback failed, so
                # the waiter is not left hanging.
                event.set()
                with self._lock:
                    self._completed_tasks += 1
                    logger.info(
                        f"视频任务 {task_index + 1}/{total_tasks} 完成 "
                        f"({self._completed_tasks}/{total_tasks})"
                    )

        return wrapper

    def _wait_for_completion(self, total_tasks: int) -> bool:
        """Poll until ``total_tasks`` callbacks have completed or time runs out.

        Args:
            total_tasks: Number of completions to wait for.

        Returns:
            True if the count was reached, False on timeout.
        """
        logger.info(f"等待 {total_tasks} 个视频任务完成...")
        # monotonic() is immune to wall-clock adjustments during long waits.
        start_time = time.monotonic()
        timeout = self.config.video_generation_timeout

        while True:
            with self._lock:
                completed = self._completed_tasks
            if completed >= total_tasks:
                break

            elapsed = time.monotonic() - start_time
            if elapsed > timeout:
                logger.error(
                    f"视频生成超时({timeout}秒),"
                    f"已完成 {completed}/{total_tasks} 个任务"
                )
                return False

            logger.info(
                f"等待中... 剩余任务: {total_tasks - completed}, "
                f"已耗时: {int(elapsed)}秒"
            )
            time.sleep(self.config.polling_interval)

        logger.info("所有视频生成任务已完成")
        return True


video_generator = VideoClipGenerator(VideoGenerationConfig())