| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- import os
- import time
- import threading
- from pathlib import Path
- from typing import Optional, List, Dict, Any, Callable, Union
- from tqdm import tqdm
- from dataclasses import dataclass, field
- from modules.media_understanding.media_captioner import media_captioner
- from modules.media_process.media_processor import media_processor
- from modules.media_generate.media_generator import (
- video_create,
- handle_video_result
- )
- from utils.tools import (
- read_json_file,
- save_json_file,
- )
- from utils.logger_config import setup_logger
- logger = setup_logger(__name__)
- @dataclass
- class VideoGenerationConfig:
- """视频生成配置类"""
- # 路径配置
- output_base_dir: str = "./output"
-
- # 视频生成参数
- video_resolution: str = "1080p"
- video_ratio: str = "16:9"
- watermark_enabled: bool = False
- crop_frame: bool = False
-
- # 超时配置
- video_generation_timeout: int = 3600 # 秒
- polling_interval: int = 5 # 秒
-
- # 脚本生成参数
- script_prompt_type: str = "script"
- script_scenario: str = "video"
- prompt_optimization_prefix: str = "待打磨优化的提示词:"
- class VideoClipGenerator:
- """视频片段生成器"""
-
- def __init__(self, config: VideoGenerationConfig):
- self.config = config
- self._completed_tasks = 0
- self._lock = threading.Lock()
-
- def generate(self, video_script_data: dict) -> bool:
- """
- 生成所有视频片段
-
- Args:
- script_path: 脚本文件路径
-
- Returns:
- 是否全部成功完成
-
- Raises:
- FileNotFoundError: 当脚本文件不存在时
- ValueError: 当脚本数据无效时
- """
- # if not os.path.exists(script_path):
- # raise FileNotFoundError(f"脚本文件不存在: {script_path}")
-
- # # 读取脚本
- # video_script_data = read_json_file(script_path)
- # if not video_script_data:
- # raise ValueError("无法读取脚本文件或文件为空")
-
- lens_details = []
- storyboards = video_script_data.get("storyboards", [])
- for storyboard in storyboards:
- item_info = storyboard.get("storyboard", [])
- lens_details.append(item_info)
- lens_details = [item for sublist in lens_details for item in sublist]
- if not storyboards:
- raise ValueError("脚本中未找到分镜详情")
-
- total_tasks = len(lens_details)
- self._completed_tasks = 0
-
- logger.info(f"开始生成 {total_tasks} 个视频片段")
-
- # 创建所有视频生成任务
- for idx, lens_item in enumerate(tqdm(lens_details, desc="提交视频任务")):
- self._create_video_task(lens_item, idx, total_tasks)
-
- # 保存脚本(包含任务ID)
- # save_json_file(video_script_data, script_path)
-
- # 等待所有任务完成
- return video_script_data, self._wait_for_completion(total_tasks)
-
- def _create_video_task(
- self,
- lens_item: Dict[str, Any],
- task_index: int,
- total_tasks: int
- ) -> None:
- """
- 创建单个视频生成任务
-
- Args:
- lens_item: 分镜详情字典
- script_path: 脚本文件路径
- task_index: 任务索引
- total_tasks: 总任务数
- """
- lens_id = lens_item.get("idx")
- motion_prompt = lens_item.get("motion_desc")
- image_url = lens_item.get("ff_path")
- lens_duration = 4
- if not all([motion_prompt, image_url, lens_duration]):
- logger.warning(f"分镜 {lens_id} 缺少必要信息,跳过")
- return
-
- try:
- # 构建生成参数
- gen_params = self._build_gen_params(lens_duration)
- video_filename = os.path.basename(lens_item["ff_path"]).replace(".png", ".mp4")
- logger.info(f"正在生成视频片段 {lens_id}: {video_filename}")
- # 创建完成事件
- completion_event = threading.Event()
-
- # 包装回调函数
- wrapped_callback = self._create_callback_wrapper(
- handle_video_result,
- completion_event,
- task_index,
- total_tasks
- )
-
- # 提交异步任务
- task_id = video_create.create_video_task_async(
- prompt=motion_prompt,
- image_url=image_url,
- gen_params=gen_params,
- filename=video_filename,
- callback=wrapped_callback
- )
-
- if task_id:
- lens_item["clip_path"] = f"./output/{video_filename}"
- lens_item["task_id"] = task_id
- logger.info(f"视频任务 {task_index + 1}/{total_tasks} 已提交: {task_id}")
- else:
- logger.error(f"视频任务 {task_index + 1}/{total_tasks} 提交失败")
-
- except Exception as e:
- logger.error(f"创建视频任务时出错: {e}")
-
- def _build_gen_params(self, duration: float) -> str:
- """
- 构建视频生成参数字符串
-
- Args:
- duration: 视频时长
-
- Returns:
- 参数字符串
- """
- return (
- f"--rs {self.config.video_resolution} "
- f"--rt {self.config.video_ratio} "
- f"--dur {duration} "
- f"--wm {'true' if self.config.watermark_enabled else 'false'} "
- f"--cf {'true' if self.config.crop_frame else 'false'}"
- )
-
- def _create_callback_wrapper(
- self,
- original_callback: Callable,
- event: threading.Event,
- task_index: int,
- total_tasks: int
- ) -> Callable:
- """
- 创建回调函数包装器
-
- Args:
- original_callback: 原始回调函数
- event: 完成事件
- task_index: 任务索引
- total_tasks: 总任务数
-
- Returns:
- 包装后的回调函数
- """
- def wrapper(*args, **kwargs):
- try:
- # 调用原始回调
- if original_callback:
- original_callback(*args, **kwargs)
- except Exception as e:
- logger.error(f"回调函数执行出错: {e}")
- finally:
- # 标记任务完成
- event.set()
- with self._lock:
- self._completed_tasks += 1
- logger.info(f"视频任务 {task_index + 1}/{total_tasks} 完成 "
- f"({self._completed_tasks}/{total_tasks})")
-
- return wrapper
-
- def _wait_for_completion(self, total_tasks: int) -> bool:
- """
- 等待所有任务完成
-
- Args:
- total_tasks: 总任务数
-
- Returns:
- 是否全部成功完成
- """
- logger.info(f"等待 {total_tasks} 个视频任务完成...")
-
- start_time = time.time()
- timeout = self.config.video_generation_timeout
-
- while self._completed_tasks < total_tasks:
- elapsed = time.time() - start_time
-
- if elapsed > timeout:
- logger.error(f"视频生成超时({timeout}秒),"
- f"已完成 {self._completed_tasks}/{total_tasks} 个任务")
- return False
-
- remaining = total_tasks - self._completed_tasks
- if remaining > 0:
- logger.info(f"等待中... 剩余任务: {remaining}, "
- f"已耗时: {int(elapsed)}秒")
-
- time.sleep(self.config.polling_interval)
-
- logger.info("所有视频生成任务已完成")
- return True
- video_generator = VideoClipGenerator(VideoGenerationConfig())
|