""" refer_video_create 任务流 基于参考视频创建视频的完整任务流 包括: 1. 从参考视频创建脚本 2. 优化脚本提示词(生图提示词和生视频提示词) 3. 基于image_prompt生成分镜 4. 基于video_prompt和分镜生成视频 5. 拼接所有视频片段 """ import os import asyncio from pathlib import Path from typing import Dict, Optional from taskflow import TaskManager, FileIOHandler from ..mcps.script_create import create_script_refer_video from ..mcps.script_optimate import optimate_script from ..mcps.script_check import check_image_script, check_video_script from api_modules.ark_client_async import AsyncArkClient from api_modules.ark_image_client_async import AsyncArkImageClient from api_modules.ark_video_client_async import AsyncArkVideoClient from examples.video_create.mcps.concat_clip import concat_videos from examples.video_create.utils.tools import string_to_json, download_image, upload_file_to_tos from taskflow import get_logger logger = get_logger("examples.refer_video_create.pipeline.refer_video_create_pipeline") class ReferVideoCreatePipeline: """基于参考视频的视频创作任务流""" def __init__(self, io_handler: FileIOHandler, output_dir: str, manager: TaskManager): """ 初始化视频创作任务流 Args: io_handler: 文件I/O处理器 output_dir: 输出目录 manager: 任务管理器 """ self.io_handler = io_handler self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) self.manager = manager async def step1_create_script(self, video_url: str, user_prompt: Optional[str] = None) -> Dict: """步骤1:从参考视频创建初始脚本""" if user_prompt is None: user_prompt = "请开始执行你的任务" async with AsyncArkClient() as client: script_text = await create_script_refer_video( client=client, video_url=video_url, user_prompt=user_prompt ) # 解析JSON字符串 script = self.io_handler.string_to_json(script_text) output_file = str(self.output_dir / "step1_script.json") await self.io_handler.write_json_async(script, output_file) return { "output_file": output_file, "data": script } async def step2_optimize_prompts(self) -> Dict: """步骤2:优化脚本提示词(并行优化生图提示词和生视频提示词)""" previous_output = self.manager.load_step_output("step1") if previous_output is None: raise ValueError("步骤1未完成,无法优化提示词") script = previous_output["data"] async def optimize_single_lens(client: AsyncArkClient, lens: Dict) -> None: """优化单个镜头的提示词""" lens_id = lens.get("lens_id") lens_params = lens.get("lens_params", "") core_vision = lens.get("core_vision", "") # 构建优化提示词:lens_params + core_vision prompt_text = f"{lens_params} {core_vision}".strip() # 并行优化生图提示词和生视频提示词 async def optimize_image_prompt(): optimized = await optimate_script( client=client, user_prompt=prompt_text, prompt_type="image" ) lens["image_prompt"] = optimized.strip() logger.info(f"镜头 {lens_id} 的生图提示词优化完成") async def optimize_video_prompt(): optimized = await optimate_script( client=client, user_prompt=prompt_text, prompt_type="video" ) lens["video_prompt"] = optimized.strip() logger.info(f"镜头 {lens_id} 的生视频提示词优化完成") # 并行执行两个优化任务 await asyncio.gather(optimize_image_prompt(), optimize_video_prompt()) async with AsyncArkClient() as client: # 并行处理所有镜头 tasks = [ optimize_single_lens(client, lens) for lens in script["lens_details"] ] await asyncio.gather(*tasks) output_file = str(self.output_dir / "step2_optimized_script.json") await self.io_handler.write_json_async(script, output_file) return { "output_file": output_file, "data": script } async def step3_generate_storyboard( self, size: Optional[str] = "1440x2560", refer_image: Optional[list[str]] = None ) -> Dict: """ 步骤3:基于image_prompt和用户指定的参考图片生成分镜图片 Args: size: 生成图片的尺寸,默认为 "1440x2560" refer_image: 参考图片路径(可选),所有分镜都会参考这张图片生成 如果为None,则不使用参考图片 """ previous_output = self.manager.load_step_output("step2") if previous_output is None: raise ValueError("步骤2未完成,无法生成分镜") script = previous_output["data"] # 确保storyboard目录存在 storyboard_dir = self.output_dir / "storyboard" storyboard_dir.mkdir(parents=True, exist_ok=True) # 准备参考图片列表(如果提供了参考图片) reference_images = None if refer_image is not None and isinstance(refer_image, str): # 确保是列表格式 reference_images = [refer_image] logger.info(f"所有分镜将参考图片: {refer_image}") elif refer_image is not None and isinstance(refer_image, list): reference_images = refer_image logger.info(f"所有分镜将参考图片: {refer_image}") else: logger.info("不使用参考图片") async def generate_single_storyboard( image_client: AsyncArkImageClient, ark_client: AsyncArkClient, lens: Dict ) -> None: """生成单个镜头的分镜图片(带审查和重试机制)""" lens_id = lens.get("lens_id") image_prompt = lens.get("image_prompt") if not image_prompt: raise ValueError(f"镜头 {lens_id} 缺少 image_prompt 字段") image_save_path = str(storyboard_dir / f"lens_{lens_id}_{self.output_dir.name}.png") # 如果文件已存在,跳过生成 if os.path.exists(image_save_path): logger.info(f"分镜图片已存在,跳过生成: lens {lens_id}") lens["storyboard_url"] = image_save_path return # 最大重试次数 max_retries = 5 attempt_count = 0 review_passed = False temp_image_path = str(storyboard_dir / f"lens_{lens_id}_{self.output_dir.name}_temp.png") last_image_url = None while attempt_count <= max_retries and not review_passed: try: attempt_count += 1 logger.info(f"镜头 {lens_id} 开始第 {attempt_count} 次生成...") # 生成分镜图片(使用参考图片) response = await image_client.generate_image( prompt=image_prompt, size=size, reference_image=reference_images ) image_url = image_client.get_image_url(response) if not image_url: raise ValueError(f"镜头 {lens_id} 生成分镜图片失败,未获取到图片URL") last_image_url = image_url # 下载图片到临时路径(用于审查) await asyncio.to_thread(download_image, image_url, temp_image_path) # 上传图片到TOS获取URL(用于审查) image_url_for_check = await asyncio.to_thread(upload_file_to_tos, temp_image_path) logger.info(f"镜头 {lens_id} 第 {attempt_count} 次生成完成,图片已上传: {image_url_for_check}") # 调用审查函数 check_result_text = await check_image_script( client=ark_client, image_prompt=image_prompt, image_url=image_url_for_check ) # 解析审查结果 check_result = self.io_handler.string_to_json(check_result_text) review_result = check_result.get("review_result", False) result_reason = check_result.get("result_reason", "") if review_result: # 审查不通过(review_result为true表示有问题,需要重新生成) if attempt_count > max_retries: # 超过最大重试次数,使用最后一次生成的图片 logger.error( f"镜头 {lens_id} 已达到最大重试次数 ({max_retries})," f"审查结果: {result_reason},使用最后一次生成的图片" ) # 将临时文件重命名为最终文件 if os.path.exists(temp_image_path): os.rename(temp_image_path, image_save_path) else: # 如果临时文件不存在,重新下载 await asyncio.to_thread(download_image, last_image_url, image_save_path) review_passed = True else: # 继续重试 logger.warning( f"镜头 {lens_id} 第 {attempt_count} 次生成审查不通过: {result_reason}," f"将重新生成(剩余重试次数: {max_retries - attempt_count})" ) # 保留临时文件,下次生成时会覆盖 else: # 审查通过(review_result为false表示通过) review_passed = True logger.info( f"镜头 {lens_id} 第 {attempt_count} 次生成审查通过: {result_reason}" ) # 将临时文件重命名为最终文件 if os.path.exists(temp_image_path): os.rename(temp_image_path, image_save_path) else: # 如果临时文件不存在,重新下载 await asyncio.to_thread(download_image, last_image_url, image_save_path) except Exception as e: logger.error(f"镜头 {lens_id} 第 {attempt_count} 次生成出错: {e}") if attempt_count > max_retries: logger.error(f"镜头 {lens_id} 已达到最大重试次数,停止重试") # 如果还有临时文件,尝试使用它 if os.path.exists(temp_image_path): os.rename(temp_image_path, image_save_path) raise # 继续重试 continue lens["storyboard_url"] = image_save_path logger.info(f"镜头 {lens_id} 分镜图片最终生成完成: {image_save_path}(共尝试 {attempt_count} 次)") async with AsyncArkImageClient() as image_client, AsyncArkClient() as ark_client: # 并行处理所有镜头 tasks = [ generate_single_storyboard(image_client, ark_client, lens) for lens in script["lens_details"] ] await asyncio.gather(*tasks, return_exceptions=True) output_file = str(self.output_dir / "step3_storyboard.json") await self.io_handler.write_json_async(script, output_file) return { "output_file": output_file, "data": script } async def step4_generate_video_clips(self) -> Dict: """步骤4:基于video_prompt和分镜图片生成视频片段""" previous_output = self.manager.load_step_output("step3") if previous_output is None: raise ValueError("步骤3未完成,无法生成视频片段") script = previous_output["data"] # 确保video_clips目录存在 video_clip_dir = self.output_dir / "video_clips" video_clip_dir.mkdir(parents=True, exist_ok=True) async def process_single_lens(video_client: AsyncArkVideoClient, lens: Dict) -> Optional[asyncio.Task]: """处理单个镜头的视频生成,返回后台任务""" lens_id = lens.get("lens_id") video_prompt = lens.get("video_prompt") storyboard_url = lens.get("storyboard_url") lens_duration = lens.get("lens_duration", 4) if not video_prompt: raise ValueError(f"镜头 {lens_id} 缺少 video_prompt 字段") if not storyboard_url: raise ValueError(f"镜头 {lens_id} 缺少 storyboard_url 字段") video_save_path = str(video_clip_dir / f"lens_{lens_id}_{self.output_dir.name}.mp4") # 如果文件已存在,跳过生成 if os.path.exists(video_save_path): logger.info(f"视频片段已存在,跳过生成: lens {lens_id}") lens["clip_url"] = video_save_path return None # 如果storyboard_url是本地路径,需要上传到TOS获取URL image_url = storyboard_url if not storyboard_url.startswith(("http://", "https://")): # 上传到TOS获取URL from examples.video_create.utils.tools import upload_file_to_tos image_url = await asyncio.to_thread(upload_file_to_tos, storyboard_url) logger.info(f"镜头 {lens_id} 分镜图片已上传到TOS: {image_url}") # 构建生成参数字符串 gen_params = f" --dur {lens_duration}" # 创建视频生成任务 task_id, background_task = await video_client.create_video_task_async( prompt=video_prompt, image_url=image_url, gen_params=gen_params, output_path=video_save_path ) if background_task is not None: lens["clip_url"] = video_save_path logger.info(f"已提交视频生成任务,lens {lens_id}, task_id: {task_id}") return background_task else: logger.error(f"视频生成任务提交失败,lens {lens_id}") return None async with AsyncArkVideoClient() as video_client: # 并行提交所有视频生成任务,收集后台任务 lens_tasks = [ process_single_lens(video_client, lens) for lens in script["lens_details"] ] lens_results = await asyncio.gather(*lens_tasks, return_exceptions=True) # 展平所有后台任务 all_background_tasks = [] for task in lens_results: if isinstance(task, Exception): logger.error(f"处理镜头时出错: {task}") elif task is not None: all_background_tasks.append(task) # 等待所有视频生成和下载完成 if all_background_tasks: logger.info(f"等待 {len(all_background_tasks)} 个视频生成任务完成...") await asyncio.gather(*all_background_tasks, return_exceptions=True) logger.info("所有视频生成任务已完成!") else: logger.warning("没有提交任何视频生成任务") output_file = str(self.output_dir / "step4_video_clips.json") await self.io_handler.write_json_async(script, output_file) return { "output_file": output_file, "data": script } async def step5_concat_clips(self) -> Dict: """步骤5:拼接所有视频片段""" previous_output = self.manager.load_step_output("step4") if previous_output is None: raise ValueError("步骤4未完成,无法进行视频拼接") script = previous_output["data"] # 确保video_save目录存在 video_save_dir = self.output_dir / "video_save" video_save_dir.mkdir(parents=True, exist_ok=True) # 收集所有视频片段路径(按lens_id排序) clips_path = [] for lens in sorted(script["lens_details"], key=lambda x: x.get("lens_id", 0)): clip_url = lens.get("clip_url") if clip_url and os.path.exists(clip_url): clips_path.append(clip_url) else: logger.warning(f"镜头 {lens.get('lens_id')} 的视频片段不存在,跳过") if not clips_path: raise ValueError("没有可用的视频片段进行拼接") output_file = str(video_save_dir / "final_video.mp4") # 拼接视频(使用线程池执行同步操作) await asyncio.to_thread(concat_videos, clips_path, output_file) logger.info(f"视频拼接完成: {output_file}") return { "output_file": output_file, "data": "final video" }