| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413 |
- """
- refer_video_create 任务流
- 基于参考视频创建视频的完整任务流
- 包括:
- 1. 从参考视频创建脚本
- 2. 优化脚本提示词(生图提示词和生视频提示词)
- 3. 基于image_prompt生成分镜
- 4. 基于video_prompt和分镜生成视频
- 5. 拼接所有视频片段
- """
- import os
- import asyncio
- from pathlib import Path
- from typing import Dict, Optional
- from taskflow import TaskManager, FileIOHandler
- from ..mcps.script_create import create_script_refer_video
- from ..mcps.script_optimate import optimate_script
- from ..mcps.script_check import check_image_script, check_video_script
- from api_modules.ark_client_async import AsyncArkClient
- from api_modules.ark_image_client_async import AsyncArkImageClient
- from api_modules.ark_video_client_async import AsyncArkVideoClient
- from examples.video_create.mcps.concat_clip import concat_videos
- from examples.video_create.utils.tools import string_to_json, download_image, upload_file_to_tos
- from taskflow import get_logger
# Logger name for this pipeline module.
# NOTE(review): the name is hard-coded rather than derived from __name__;
# keep it in sync if the module is moved or renamed.
_LOGGER_NAME = "examples.refer_video_create.pipeline.refer_video_create_pipeline"
logger = get_logger(_LOGGER_NAME)
class ReferVideoCreatePipeline:
    """Video-creation task flow driven by a reference video.

    Steps (each persists its result as JSON under ``output_dir`` and returns
    ``{"output_file": ..., "data": ...}``):

    1. Create a script from the reference video.
    2. Optimize every lens's image prompt and video prompt.
    3. Generate one storyboard image per lens from ``image_prompt``
       (each image is reviewed and regenerated on failure).
    4. Generate one video clip per lens from ``video_prompt`` + storyboard image.
    5. Concatenate all clips into the final video.

    Steps 2-5 read the previous step's output through
    ``manager.load_step_output`` and raise ``ValueError`` when it is missing.
    """

    def __init__(self, io_handler: FileIOHandler, output_dir: str, manager: TaskManager):
        """
        Initialize the video-creation task flow.

        Args:
            io_handler: File I/O handler used to parse model output and write JSON.
            output_dir: Output directory; created (with parents) if it does not exist.
            manager: Task manager used to load the outputs of earlier steps.
        """
        self.io_handler = io_handler
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.manager = manager

    async def step1_create_script(self, video_url: str, user_prompt: Optional[str] = None) -> Dict:
        """Step 1: create the initial script from the reference video.

        Args:
            video_url: URL of the reference video.
            user_prompt: Extra instruction for the model; a generic
                "start your task" prompt is used when omitted.

        Returns:
            ``{"output_file": <path to step1_script.json>, "data": <parsed script>}``.
        """
        if user_prompt is None:
            user_prompt = "请开始执行你的任务"

        async with AsyncArkClient() as client:
            script_text = await create_script_refer_video(
                client=client,
                video_url=video_url,
                user_prompt=user_prompt
            )

        # The script comes back as a JSON string; parse it into a dict.
        script = self.io_handler.string_to_json(script_text)

        output_file = str(self.output_dir / "step1_script.json")
        await self.io_handler.write_json_async(script, output_file)

        return {
            "output_file": output_file,
            "data": script
        }

    async def step2_optimize_prompts(self) -> Dict:
        """Step 2: optimize each lens's prompts.

        For every entry of ``script["lens_details"]`` the image prompt and the
        video prompt are optimized concurrently (and all lenses run
        concurrently), writing ``image_prompt`` / ``video_prompt`` into the lens.

        Raises:
            ValueError: if step 1 has not produced output yet.
        """
        previous_output = self.manager.load_step_output("step1")
        if previous_output is None:
            raise ValueError("步骤1未完成,无法优化提示词")

        script = previous_output["data"]

        async def optimize_single_lens(client: AsyncArkClient, lens: Dict) -> None:
            """Optimize both prompts of a single lens in place."""
            lens_id = lens.get("lens_id")
            lens_params = lens.get("lens_params", "")
            core_vision = lens.get("core_vision", "")

            # Source text for optimization: lens_params + core_vision.
            prompt_text = f"{lens_params} {core_vision}".strip()

            async def optimize_image_prompt():
                optimized = await optimate_script(
                    client=client,
                    user_prompt=prompt_text,
                    prompt_type="image"
                )
                lens["image_prompt"] = optimized.strip()
                logger.info(f"镜头 {lens_id} 的生图提示词优化完成")

            async def optimize_video_prompt():
                optimized = await optimate_script(
                    client=client,
                    user_prompt=prompt_text,
                    prompt_type="video"
                )
                lens["video_prompt"] = optimized.strip()
                logger.info(f"镜头 {lens_id} 的生视频提示词优化完成")

            # Both optimizations are independent, so run them concurrently.
            await asyncio.gather(optimize_image_prompt(), optimize_video_prompt())

        async with AsyncArkClient() as client:
            # Process all lenses concurrently; any failure propagates.
            tasks = [
                optimize_single_lens(client, lens)
                for lens in script["lens_details"]
            ]
            await asyncio.gather(*tasks)

        output_file = str(self.output_dir / "step2_optimized_script.json")
        await self.io_handler.write_json_async(script, output_file)

        return {
            "output_file": output_file,
            "data": script
        }

    @staticmethod
    async def _promote_temp_image(temp_path: str, final_path: str, fallback_url: Optional[str]) -> None:
        """Move a generated temp image to its final path.

        If the temp file is gone, the image is re-downloaded from
        ``fallback_url`` straight to ``final_path``. ``os.replace`` is used so
        an existing destination is overwritten on every platform
        (``os.rename`` raises on Windows in that case).
        """
        if os.path.exists(temp_path):
            os.replace(temp_path, final_path)
        else:
            await asyncio.to_thread(download_image, fallback_url, final_path)

    async def step3_generate_storyboard(
        self,
        size: Optional[str] = "1440x2560",
        refer_image: Optional[list[str]] = None
    ) -> Dict:
        """
        Step 3: generate one storyboard image per lens from its image_prompt.

        Each image is uploaded and reviewed by ``check_image_script``; a failed
        review triggers regeneration (up to ``max_retries`` retries after the
        first attempt). When retries run out, the last generated image is kept.
        Lenses whose image already exists on disk are skipped.

        Args:
            size: Size of the generated images, "1440x2560" by default.
            refer_image: Optional reference image(s) applied to every lens.
                A single string is also accepted and wrapped in a list.
                When None (or any other type), no reference image is used.

        Raises:
            ValueError: if step 2 has not produced output yet.
        """
        previous_output = self.manager.load_step_output("step2")
        if previous_output is None:
            raise ValueError("步骤2未完成,无法生成分镜")

        script = previous_output["data"]

        storyboard_dir = self.output_dir / "storyboard"
        storyboard_dir.mkdir(parents=True, exist_ok=True)

        # Normalize the reference image(s) to a list, or None when unused.
        if isinstance(refer_image, str):
            reference_images = [refer_image]
            logger.info(f"所有分镜将参考图片: {refer_image}")
        elif isinstance(refer_image, list):
            reference_images = refer_image
            logger.info(f"所有分镜将参考图片: {refer_image}")
        else:
            reference_images = None
            logger.info("不使用参考图片")

        async def generate_single_storyboard(
            image_client: AsyncArkImageClient,
            ark_client: AsyncArkClient,
            lens: Dict
        ) -> None:
            """Generate, review and save the storyboard image of one lens."""
            lens_id = lens.get("lens_id")
            image_prompt = lens.get("image_prompt")

            if not image_prompt:
                raise ValueError(f"镜头 {lens_id} 缺少 image_prompt 字段")

            image_save_path = str(storyboard_dir / f"lens_{lens_id}_{self.output_dir.name}.png")

            # Reuse an image produced by an earlier run.
            if os.path.exists(image_save_path):
                logger.info(f"分镜图片已存在,跳过生成: lens {lens_id}")
                lens["storyboard_url"] = image_save_path
                return

            # One initial attempt plus up to ``max_retries`` retries.
            max_retries = 5
            attempt_count = 0
            review_passed = False
            temp_image_path = str(storyboard_dir / f"lens_{lens_id}_{self.output_dir.name}_temp.png")
            last_image_url = None

            while attempt_count <= max_retries and not review_passed:
                try:
                    attempt_count += 1
                    logger.info(f"镜头 {lens_id} 开始第 {attempt_count} 次生成...")

                    response = await image_client.generate_image(
                        prompt=image_prompt,
                        size=size,
                        reference_image=reference_images
                    )

                    image_url = image_client.get_image_url(response)
                    if not image_url:
                        raise ValueError(f"镜头 {lens_id} 生成分镜图片失败,未获取到图片URL")

                    last_image_url = image_url

                    # Download to a temp path and re-upload to TOS so the
                    # reviewer gets a URL for the image.
                    await asyncio.to_thread(download_image, image_url, temp_image_path)
                    image_url_for_check = await asyncio.to_thread(upload_file_to_tos, temp_image_path)
                    logger.info(f"镜头 {lens_id} 第 {attempt_count} 次生成完成,图片已上传: {image_url_for_check}")

                    check_result_text = await check_image_script(
                        client=ark_client,
                        image_prompt=image_prompt,
                        image_url=image_url_for_check
                    )

                    check_result = self.io_handler.string_to_json(check_result_text)
                    review_result = check_result.get("review_result", False)
                    result_reason = check_result.get("result_reason", "")

                    if review_result:
                        # review_result == true means the image has problems
                        # and must be regenerated.
                        if attempt_count > max_retries:
                            # Out of retries: keep the last generated image.
                            logger.error(
                                f"镜头 {lens_id} 已达到最大重试次数 ({max_retries}),"
                                f"审查结果: {result_reason},使用最后一次生成的图片"
                            )
                            await self._promote_temp_image(temp_image_path, image_save_path, last_image_url)
                            review_passed = True
                        else:
                            # Attempts left after this one: max_retries + 1 - attempt_count.
                            logger.warning(
                                f"镜头 {lens_id} 第 {attempt_count} 次生成审查不通过: {result_reason},"
                                f"将重新生成(剩余重试次数: {max_retries + 1 - attempt_count})"
                            )
                            # The temp file is simply overwritten by the next attempt.
                    else:
                        # review_result == false means the review passed.
                        logger.info(
                            f"镜头 {lens_id} 第 {attempt_count} 次生成审查通过: {result_reason}"
                        )
                        await self._promote_temp_image(temp_image_path, image_save_path, last_image_url)
                        # Only mark success once the final file exists; if the
                        # promotion raised, the loop must keep retrying.
                        review_passed = True

                except Exception as e:
                    logger.error(f"镜头 {lens_id} 第 {attempt_count} 次生成出错: {e}")
                    if attempt_count > max_retries:
                        logger.error(f"镜头 {lens_id} 已达到最大重试次数,停止重试")
                        # Keep whatever image we have on disk; a later run will
                        # pick it up through the "already exists" check above.
                        if os.path.exists(temp_image_path):
                            os.replace(temp_image_path, image_save_path)
                        raise
                    continue

            lens["storyboard_url"] = image_save_path
            logger.info(f"镜头 {lens_id} 分镜图片最终生成完成: {image_save_path}(共尝试 {attempt_count} 次)")

        async with AsyncArkImageClient() as image_client, AsyncArkClient() as ark_client:
            lens_details = script["lens_details"]
            # return_exceptions=True keeps one failing lens from aborting the rest.
            results = await asyncio.gather(
                *(generate_single_storyboard(image_client, ark_client, lens) for lens in lens_details),
                return_exceptions=True
            )

        # Surface per-lens failures instead of silently dropping them.
        for lens, result in zip(lens_details, results):
            if isinstance(result, BaseException):
                logger.error(f"镜头 {lens.get('lens_id')} 分镜生成失败: {result}")

        output_file = str(self.output_dir / "step3_storyboard.json")
        await self.io_handler.write_json_async(script, output_file)

        return {
            "output_file": output_file,
            "data": script
        }

    async def step4_generate_video_clips(self) -> Dict:
        """Step 4: generate one video clip per lens from video_prompt + storyboard image.

        All generation tasks are submitted first, then awaited together.
        Lenses whose clip already exists on disk are skipped. Per-lens and
        per-task failures are logged rather than raised.

        Raises:
            ValueError: if step 3 has not produced output yet.
        """
        previous_output = self.manager.load_step_output("step3")
        if previous_output is None:
            raise ValueError("步骤3未完成,无法生成视频片段")

        script = previous_output["data"]

        video_clip_dir = self.output_dir / "video_clips"
        video_clip_dir.mkdir(parents=True, exist_ok=True)

        async def process_single_lens(video_client: AsyncArkVideoClient, lens: Dict) -> Optional[asyncio.Task]:
            """Submit the video task of one lens; return its background task (or None)."""
            lens_id = lens.get("lens_id")
            video_prompt = lens.get("video_prompt")
            storyboard_url = lens.get("storyboard_url")
            lens_duration = lens.get("lens_duration", 4)

            if not video_prompt:
                raise ValueError(f"镜头 {lens_id} 缺少 video_prompt 字段")
            if not storyboard_url:
                raise ValueError(f"镜头 {lens_id} 缺少 storyboard_url 字段")

            video_save_path = str(video_clip_dir / f"lens_{lens_id}_{self.output_dir.name}.mp4")

            # Reuse a clip produced by an earlier run.
            if os.path.exists(video_save_path):
                logger.info(f"视频片段已存在,跳过生成: lens {lens_id}")
                lens["clip_url"] = video_save_path
                return None

            # A local storyboard path has to be uploaded to TOS to get a URL.
            image_url = storyboard_url
            if not storyboard_url.startswith(("http://", "https://")):
                image_url = await asyncio.to_thread(upload_file_to_tos, storyboard_url)
                logger.info(f"镜头 {lens_id} 分镜图片已上传到TOS: {image_url}")

            gen_params = f" --dur {lens_duration}"

            task_id, background_task = await video_client.create_video_task_async(
                prompt=video_prompt,
                image_url=image_url,
                gen_params=gen_params,
                output_path=video_save_path
            )

            if background_task is not None:
                lens["clip_url"] = video_save_path
                logger.info(f"已提交视频生成任务,lens {lens_id}, task_id: {task_id}")
                return background_task
            else:
                logger.error(f"视频生成任务提交失败,lens {lens_id}")
                return None

        async with AsyncArkVideoClient() as video_client:
            # Submit every lens concurrently and collect the background tasks.
            lens_tasks = [
                process_single_lens(video_client, lens)
                for lens in script["lens_details"]
            ]
            lens_results = await asyncio.gather(*lens_tasks, return_exceptions=True)

            all_background_tasks = []
            for task in lens_results:
                # BaseException also covers a returned CancelledError, which
                # must not be mistaken for a task.
                if isinstance(task, BaseException):
                    logger.error(f"处理镜头时出错: {task}")
                elif task is not None:
                    all_background_tasks.append(task)

            # Wait for generation + download while the client is still open.
            if all_background_tasks:
                logger.info(f"等待 {len(all_background_tasks)} 个视频生成任务完成...")
                task_results = await asyncio.gather(*all_background_tasks, return_exceptions=True)
                for result in task_results:
                    if isinstance(result, BaseException):
                        logger.error(f"视频生成任务执行失败: {result}")
                logger.info("所有视频生成任务已完成!")
            else:
                logger.warning("没有提交任何视频生成任务")

        output_file = str(self.output_dir / "step4_video_clips.json")
        await self.io_handler.write_json_async(script, output_file)

        return {
            "output_file": output_file,
            "data": script
        }

    async def step5_concat_clips(self) -> Dict:
        """Step 5: concatenate all existing clips, ordered by lens_id.

        Lenses without a clip on disk are skipped with a warning.

        Raises:
            ValueError: if step 4 has not produced output yet, or no clip exists.
        """
        previous_output = self.manager.load_step_output("step4")
        if previous_output is None:
            raise ValueError("步骤4未完成,无法进行视频拼接")

        script = previous_output["data"]

        video_save_dir = self.output_dir / "video_save"
        video_save_dir.mkdir(parents=True, exist_ok=True)

        clips_path = []
        for lens in sorted(script["lens_details"], key=lambda x: x.get("lens_id", 0)):
            clip_url = lens.get("clip_url")
            if clip_url and os.path.exists(clip_url):
                clips_path.append(clip_url)
            else:
                logger.warning(f"镜头 {lens.get('lens_id')} 的视频片段不存在,跳过")

        if not clips_path:
            raise ValueError("没有可用的视频片段进行拼接")

        output_file = str(video_save_dir / "final_video.mp4")

        # concat_videos is synchronous; run it in a worker thread.
        await asyncio.to_thread(concat_videos, clips_path, output_file)

        logger.info(f"视频拼接完成: {output_file}")

        return {
            "output_file": output_file,
            "data": "final video"
        }
|