"""
idea2video task flow.

Complete task flow from an idea to a video, consisting of:
1. idea -> story
2. story -> script
3. script -> video
"""

import os
import asyncio
from pathlib import Path
from typing import Dict, Optional

from taskflow import TaskManager, FileIOHandler
from ..mcps.story_create import develop_story, develop_story_base_on_story
from ..mcps.character_extract import extract_characters
from ..mcps.storyboard_create import create_storyboard
from ..mcps.character_portrait import gen_single_character_portrait
from ..mcps.camera_tree import create_camera_tree
from ..mcps.refer_image import select_refer_image
from ..mcps.concat_clip import concat_videos
from ..utils.tools import download_image, efficient_sort
from api_modules.ark_client_async import AsyncArkClient
from api_modules.ark_image_client_async import AsyncArkImageClient
from api_modules.ark_video_client import ArkVideoClient
from api_modules.ark_video_client_async import AsyncArkVideoClient
from taskflow import get_logger

logger = get_logger("examples.video_create.pipeline.idea2video_pipeline")


class Idea2VideoPipeline:
    """Video creation task flow.

    Each ``stepN_*`` coroutine reads the output of earlier steps through
    ``manager.load_step_output`` and returns a dict with two keys:
    ``output_file`` (path of the artifact written by the step) and ``data``
    (the in-memory result).
    """

    def __init__(self, io_handler: FileIOHandler, output_dir: str, manager: TaskManager):
        """
        Initialise the video creation task flow.

        Args:
            io_handler: File I/O handler used to persist step outputs.
            output_dir: Output directory; created (with parents) if missing.
            manager: Task manager used to load the outputs of earlier steps.
        """
        self.io_handler = io_handler
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.manager = manager

    async def step1_develop_story(self, idea: str, user_requirement: Optional[str] = None) -> Dict:
        """Step 1: develop a story from an idea and save it as text.

        Args:
            idea: The idea passed to ``develop_story``.
            user_requirement: Optional extra requirement passed through.

        Returns:
            ``{"output_file": <path of step1_story.txt>, "data": <story>}``.
        """
        async with AsyncArkClient() as client:
            story = await develop_story(client=client, idea=idea, user_requirement=user_requirement)

        output_file = str(self.output_dir / "step1_story.txt")
        await self.io_handler.write_text_async(story, output_file)
        return {
            "output_file": output_file,
            "data": story
        }

    async def step2_develop_script(self, user_requirement: Optional[str] = None) -> Dict:
        """Step 2: develop a script from the step 1 story and save it as JSON.

        Args:
            user_requirement: Optional extra requirement passed through.

        Returns:
            ``{"output_file": <path of step2_script.json>, "data": <script>}``.

        Raises:
            ValueError: If the step 1 output is not available.
        """
        previous_output = self.manager.load_step_output("step1")
        if previous_output is None:
            raise ValueError("步骤1未完成,无法从story到script")
        story = previous_output["data"]

        async with AsyncArkClient() as client:
            script = await develop_story_base_on_story(client=client, story=story, user_requirement=user_requirement)

        output_file = str(self.output_dir / "step2_script.json")
        await self.io_handler.write_json_async(script, output_file)
        return {
            "output_file": output_file,
            "data": script
        }

    async def step3_extract_characters(self) -> Dict:
        """Step 3: extract characters from the step 1 story and save them as JSON.

        Returns:
            ``{"output_file": <path of step3_characters.json>, "data": <characters>}``.

        Raises:
            ValueError: If the step 1 output is not available.
        """
        previous_output = self.manager.load_step_output("step1")
        if previous_output is None:
            # The dependency loaded above is step 1, so the message names step 1.
            raise ValueError("步骤1未完成,无法从story到characters")
        story = previous_output["data"]

        async with AsyncArkClient() as client:
            characters = await extract_characters(client=client, story=story)

        output_file = str(self.output_dir / "step3_characters.json")
        await self.io_handler.write_json_async(characters, output_file)
        return {
            "output_file": output_file,
            "data": characters
        }

    @staticmethod
    def _format_characters(characters) -> str:
        """Render the step 3 characters payload as one text line per character.

        A dict with a ``"characters"`` list yields one line per entry, made of
        whichever of the three known fields are present, joined by ``" | "``
        and terminated by a newline. Anything else is converted with ``str()``.
        """
        if not isinstance(characters, dict) or "characters" not in characters:
            return str(characters)

        labelled_fields = (
            ("identifier_in_scene", "标识符"),
            ("static_features", "静态特征"),
            ("dynamic_features", "动态特征"),
        )
        lines = []
        for char in characters["characters"]:
            char_info = [f"{label}: {char[key]}" for key, label in labelled_fields if key in char]
            lines.append(" | ".join(char_info) + "\n")
        return "".join(lines)

    async def step4_create_storyboard(self, user_requirement: Optional[str] = None) -> Dict:
        """Step 4: create a storyboard from the script and the characters.

        Args:
            user_requirement: Optional extra requirement passed through.

        Returns:
            ``{"output_file": <path of step4_storyboard.json>, "data": <storyboard>}``.

        Raises:
            ValueError: If the step 2 or step 3 output is not available.
        """
        previous_script_output = self.manager.load_step_output("step2")
        if previous_script_output is None:
            raise ValueError("步骤2未完成,无法从script到storyboard")
        previous_characters_output = self.manager.load_step_output("step3")
        if previous_characters_output is None:
            raise ValueError("步骤3未完成,无法从script到storyboard")

        script = previous_script_output["data"]
        # create_storyboard is given the characters as a string.
        characters_str = self._format_characters(previous_characters_output["data"])

        storyboard = await create_storyboard(script=script, characters=characters_str, user_requirement=user_requirement)

        output_file = str(self.output_dir / "step4_storyboard.json")
        await self.io_handler.write_json_async(storyboard, output_file)
        return {
            "output_file": output_file,
            "data": storyboard
        }

    async def step5_generate_portrait(
        self,
        size: Optional[str] = "2048x2048",
        refer_image: Optional[list[str]] = None,
        refer_image_map: Optional[Dict[str, list[str]]] = None,
        style: Optional[str] = None
    ) -> Dict:
        """
        Step 5: generate a portrait for every character (all characters in parallel).

        Args:
            size: Size of the generated image, "2048x2048" by default.
            refer_image: Global reference image list shared by all characters;
                lower priority than ``refer_image_map``.
            refer_image_map: Mapping from character identifier to a list of
                reference image paths, e.g.
                ``{"林小星": ["path/to/image1.jpg"], "阿凯": ["path/to/image2.jpg"]}``.
                Highest priority: overrides ``refer_image`` and the character's
                own ``refer_image`` field.
            style: Generation style passed to the portrait generator.

        Returns:
            ``{"output_file": <path of step5_portrait.json>, "data": <characters>}``
            where every character dict has gained a ``portrait_path`` entry.

        Raises:
            ValueError: If the step 3 output is not available.
        """
        previous_output = self.manager.load_step_output("step3")
        if previous_output is None:
            raise ValueError("步骤3未完成,无法从characters到portrait")
        characters = previous_output["data"]

        # Make sure the portraits directory exists.
        portraits_dir = self.output_dir / "portraits"
        portraits_dir.mkdir(parents=True, exist_ok=True)

        async def process_single_character(client: AsyncArkImageClient, character: dict) -> None:
            """Generate and download the portrait of a single character."""
            character_id = character["identifier_in_scene"]

            # Reference images, highest priority first:
            #   1. this character's entry in refer_image_map
            #   2. the global refer_image
            #   3. the character dict's own refer_image field (if present)
            #   4. None
            character_refer_image = None
            if refer_image_map is not None and character_id in refer_image_map:
                character_refer_image = refer_image_map[character_id]
                logger.info(f"角色 {character_id} 使用映射中的参考图片: {character_refer_image}")
            elif refer_image is not None:
                character_refer_image = refer_image
                logger.info(f"角色 {character_id} 使用全局参考图片: {character_refer_image}")
            elif "refer_image" in character and character["refer_image"] is not None:
                character_refer_image = character["refer_image"]
                # Normalise a single path to a list.
                if isinstance(character_refer_image, str):
                    character_refer_image = [character_refer_image]
                logger.info(f"角色 {character_id} 使用角色字段中的参考图片: {character_refer_image}")

            # Generate the portrait image URL.
            image_url = await gen_single_character_portrait(
                client=client,
                character=character,
                size=size,
                refer_image=character_refer_image,
                style=style
            )

            # download_image is synchronous, so it runs in a worker thread.
            image_path = str(portraits_dir / f"{character_id}_{self.output_dir.name}.jpg")
            await asyncio.to_thread(download_image, image_url, image_path)
            character["portrait_path"] = image_path

        async with AsyncArkImageClient() as client:
            # Process all characters in parallel.
            tasks = [
                process_single_character(client, character)
                for character in characters["characters"]
            ]
            await asyncio.gather(*tasks)

        output_file = str(self.output_dir / "step5_portrait.json")
        await self.io_handler.write_json_async(characters, output_file)
        return {
            "output_file": output_file,
            "data": characters
        }

    async def step6_create_camera_tree(self) -> Dict:
        """Step 6: create a camera tree for every storyboard (in parallel).

        Each tree is written to ``camera_tree/camera_tree_<scene_idx>.json``.

        Returns:
            ``{"output_file": <path of step6_camera_tree.json>, "data": <storyboards>}``.

        Raises:
            ValueError: If the step 4 output is not available.
        """
        previous_output = self.manager.load_step_output("step4")
        if previous_output is None:
            raise ValueError("步骤4未完成,无法从storyboard到camera_tree")
        storyboards = previous_output["data"]

        # Make sure the camera_tree directory exists.
        camera_tree_dir = self.output_dir / "camera_tree"
        camera_tree_dir.mkdir(parents=True, exist_ok=True)

        async def process_single_storyboard(client: AsyncArkClient, storyboard: Dict, scene_idx: int) -> None:
            """Create the camera tree of a single storyboard and save it."""
            # NOTE(review): the returned tree is only written to its own file
            # here, while step7 reads storyboard["camera_tree"]; presumably
            # create_camera_tree attaches it to the storyboard in place — confirm.
            camera_tree = await create_camera_tree(client=client, storyboard=storyboard)
            camera_tree_path = str(camera_tree_dir / f"camera_tree_{scene_idx}.json")
            await self.io_handler.write_json_async(camera_tree, camera_tree_path)

        async with AsyncArkClient() as client:
            # Process all storyboards in parallel.
            tasks = [
                process_single_storyboard(client, storyboard, idx)
                for idx, storyboard in enumerate(storyboards["storyboard"])
            ]
            await asyncio.gather(*tasks)

        output_file = str(self.output_dir / "step6_camera_tree.json")
        await self.io_handler.write_json_async(storyboards, output_file)
        return {
            "output_file": output_file,
            "data": storyboards
        }

    async def step7_generate_video_frames(self) -> Dict:
        """Step 7: generate the first frame of every shot.

        Scenes are processed in parallel and on a best-effort basis: a scene
        that raises is logged and does not prevent the other scenes from being
        processed or the step output from being written. Frames that already
        exist on disk are reused instead of being regenerated.

        Returns:
            ``{"output_file": <path of step7_video_frames.json>, "data": <storyboards>}``
            where every processed shot has gained an ``ff_path`` entry.

        Raises:
            ValueError: If the step 5 or step 6 output is not available.
        """
        previous_storyboard_output = self.manager.load_step_output("step6")
        if previous_storyboard_output is None:
            raise ValueError("步骤6未完成,无法从camera_tree到video_frames")
        previous_portrait_output = self.manager.load_step_output("step5")
        if previous_portrait_output is None:
            raise ValueError("步骤5未完成,无法从characters到video_frames")
        storyboards = previous_storyboard_output["data"]
        characters = previous_portrait_output["data"]

        # Make sure the video_frames directory exists.
        video_frames_dir = self.output_dir / "video_frames"
        video_frames_dir.mkdir(parents=True, exist_ok=True)

        run_tag = self.output_dir.name

        def frame_path(scene_idx: int, cam_idx: int, shot_idx: int) -> str:
            """Return the path under which a shot's first frame is saved."""
            return str(video_frames_dir / f"scene{scene_idx}_camera{cam_idx}_shot{shot_idx}_{run_tag}.png")

        async def process_single_storyboard(
            client: AsyncArkClient,
            image_client: AsyncArkImageClient,
            storyboard: Dict,
            scene_idx: int
        ) -> None:
            """Generate and download the first frames of a single storyboard."""
            camera_tree = storyboard["camera_tree"]
            shots = storyboard["storyboard"]

            parent_shot_idxs = [0]
            active_shot_idxs = []
            for item in camera_tree:
                if item["parent_shot_idx"] is not None:
                    parent_shot_idxs.append(item["parent_shot_idx"])
                active_shot_idxs.append(item["active_shot_idxs"])

            process_order = efficient_sort(parent_shot_idxs, active_shot_idxs)
            # Append every camera the sort did not return so none is skipped.
            process_order += [i for i in range(len(active_shot_idxs)) if i not in process_order]

            for cam_idx in process_order:
                logger.info(f"Processing scene {scene_idx} - camera {cam_idx}...")
                camera_item = camera_tree[cam_idx]
                prev_frame_path_and_text_pairs = []

                for shot_idx in camera_item["active_shot_idxs"]:
                    logger.info(f"Processing scene {scene_idx} - camera {cam_idx} - shot {shot_idx}...")
                    frame_description = shots[shot_idx]["ff_desc"]
                    vis_char_idxs = shots[shot_idx]["ff_vis_char_idxs"]
                    frame_save_path = frame_path(scene_idx, cam_idx, shot_idx)

                    if os.path.exists(frame_save_path):
                        logger.info(f"Frame for scene {scene_idx} - camera {cam_idx} - shot {shot_idx} already exists.")
                        # Even for a reused frame, ff_path and the previous-frame
                        # list must be set so that later shots can reference it.
                        shots[shot_idx]["ff_path"] = frame_save_path
                        prev_frame_path_and_text_pairs.append((frame_save_path, frame_description))
                        continue

                    image_path_and_text_pairs = []

                    # Reference the portraits of the visible characters.
                    for vis_char_idx in vis_char_idxs:
                        logger.info(f"Referencing character {vis_char_idx} portrait...")
                        visible_character = characters["characters"][vis_char_idx]
                        image_path_and_text_pairs.append((
                            visible_character["portrait_path"],
                            f'{visible_character["identifier_in_scene"]}的三视图肖像'
                        ))

                    # Reference the frames already produced for this camera.
                    image_path_and_text_pairs.extend(prev_frame_path_and_text_pairs)

                    # Reference the parent frame.
                    if camera_item["parent_shot_idx"] is not None:
                        parent_shot_idx = camera_item["parent_shot_idx"]
                        parent_shot = shots[parent_shot_idx]
                        parent_ff_path = parent_shot.get("ff_path")

                        # If ff_path is not recorded, try to infer it from the
                        # file system using parent_cam_idx. The suffixed name is
                        # the one this step writes; the unsuffixed name is kept
                        # as a fallback for frames saved under the older scheme.
                        if parent_ff_path is None and camera_item.get("parent_cam_idx") is not None:
                            parent_cam_idx = camera_item["parent_cam_idx"]
                            candidate_paths = (
                                frame_path(scene_idx, parent_cam_idx, parent_shot_idx),
                                str(video_frames_dir / f"scene{scene_idx}_camera{parent_cam_idx}_shot{parent_shot_idx}.png"),
                            )
                            for inferred_parent_path in candidate_paths:
                                if os.path.exists(inferred_parent_path):
                                    parent_ff_path = inferred_parent_path
                                    # Record it so later shots can reference it.
                                    parent_shot["ff_path"] = parent_ff_path
                                    logger.info(f"Found parent frame at inferred path: {parent_ff_path}")
                                    break

                        if parent_ff_path is not None:
                            image_path_and_text_pairs.append((parent_ff_path, parent_shot["ff_desc"]))
                        else:
                            logger.warning(f"Parent frame for shot {shot_idx} (parent_shot_idx={parent_shot_idx}) not found, skipping parent frame reference.")

                    # Select the reference images and build the image prompt.
                    info_for_gen_frame = await select_refer_image(
                        client=client,
                        frame_description=frame_description,
                        image_text_pairs=image_path_and_text_pairs
                    )

                    # Generate the frame.
                    frame_prompt = info_for_gen_frame["text_prompt"]
                    image_urls = [item[0] for item in info_for_gen_frame["reference_image_path_and_text_pairs"]]
                    logger.info(f"Frame prompt: {frame_prompt}")
                    logger.info(f"Reference images: {image_urls}")

                    response = await image_client.generate_image(
                        prompt=frame_prompt,
                        reference_image=image_urls
                    )
                    frame_url = image_client.get_image_url(response)
                    await asyncio.to_thread(download_image, frame_url, frame_save_path)
                    prev_frame_path_and_text_pairs.append((frame_save_path, frame_description))
                    shots[shot_idx]["ff_path"] = frame_save_path

        async with AsyncArkClient() as client, AsyncArkImageClient() as image_client:
            # Process all storyboards in parallel.
            tasks = [
                process_single_storyboard(client, image_client, storyboard, idx)
                for idx, storyboard in enumerate(storyboards["storyboard"])
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

        # The step stays best-effort, but failures must not vanish silently.
        for scene_idx, result in enumerate(results):
            if isinstance(result, BaseException):
                logger.error(f"Frame generation failed for scene {scene_idx}: {result!r}")

        output_file = str(self.output_dir / "step7_video_frames.json")
        await self.io_handler.write_json_async(storyboards, output_file)
        return {
            "output_file": output_file,
            "data": storyboards
        }

    async def step8_generate_video(self, lens_duration: int = 4) -> Dict:
        """Step 8: generate one video clip per shot from its first frame.

        Shots without a first frame (no ``ff_path``) and shots whose task
        submission fails are logged and skipped; they get no ``clip_path``.

        Args:
            lens_duration: Value passed as ``--dur`` in the generation
                parameters of every clip; 4 by default.

        Returns:
            ``{"output_file": <path of step8_video_clips.json>, "data": <storyboards>}``
            where every successfully submitted shot has gained a ``clip_path``.

        Raises:
            ValueError: If the step 7 output is not available.
        """
        previous_video_frames_output = self.manager.load_step_output("step7")
        if previous_video_frames_output is None:
            raise ValueError("步骤7未完成,无法从video_frames到video_clips")
        storyboards = previous_video_frames_output["data"]

        # Make sure the video_clips directory exists.
        video_clip_dir = self.output_dir / "video_clips"
        video_clip_dir.mkdir(parents=True, exist_ok=True)

        # Generation parameter string shared by every clip.
        gen_params = f" --dur {lens_duration}"

        async def process_single_storyboard(video_client: AsyncArkVideoClient, storyboard: Dict, scene_idx: int) -> list:
            """Submit the clip tasks of one storyboard; return the background tasks."""
            background_tasks = []
            for shot in storyboard["storyboard"]:
                len_id = shot["idx"]
                motion_prompt = shot["motion_desc"]

                # step7 is best-effort, so a shot may have no first frame.
                image_url = shot.get("ff_path")
                if image_url is None:
                    logger.error(f"缺少首帧图片,跳过视频生成,scene {scene_idx}, shot {len_id}")
                    continue

                video_save_path = str(video_clip_dir / f"scene{scene_idx}_len{len_id}.mp4")

                task_id, background_task = await video_client.create_video_task_async(
                    prompt=motion_prompt,
                    image_url=image_url,
                    gen_params=gen_params,
                    output_path=video_save_path
                )

                if background_task is not None:
                    background_tasks.append(background_task)
                    shot["clip_path"] = video_save_path
                    logger.info(f"已提交视频生成任务,scene {scene_idx}, shot {len_id}, task_id: {task_id}")
                else:
                    logger.error(f"视频生成任务提交失败,scene {scene_idx}, shot {len_id}")

            return background_tasks

        async with AsyncArkVideoClient() as video_client:
            # Process all storyboards in parallel and collect their background tasks.
            storyboard_tasks = [
                process_single_storyboard(video_client, storyboard, idx)
                for idx, storyboard in enumerate(storyboards["storyboard"])
            ]
            storyboard_results = await asyncio.gather(*storyboard_tasks)

            # Flatten the per-storyboard task lists.
            all_background_tasks = [task for task_list in storyboard_results for task in task_list]

            # Wait until every clip has been generated and downloaded.
            if all_background_tasks:
                logger.info(f"等待 {len(all_background_tasks)} 个视频生成任务完成...")
                await asyncio.gather(*all_background_tasks)
                logger.info("所有视频生成任务已完成!")
            else:
                logger.warning("没有提交任何视频生成任务")

        output_file = str(self.output_dir / "step8_video_clips.json")
        await self.io_handler.write_json_async(storyboards, output_file)
        return {
            "output_file": output_file,
            "data": storyboards
        }

    async def step9_concat_clip(self) -> Dict:
        """Step 9: concatenate all existing video clips into the final video.

        Shots without a recorded ``clip_path``, or whose clip file does not
        exist, are logged and left out of the final video.

        Returns:
            ``{"output_file": <path of video_save/final_video.mp4>, "data": "final video"}``.

        Raises:
            ValueError: If the step 8 output is not available.
        """
        previous_output = self.manager.load_step_output("step8")
        if previous_output is None:
            raise ValueError("步骤8未完成,无法进行视频拼接!")
        storyboards = previous_output["data"]

        # Make sure the video_save directory exists.
        video_save_dir = self.output_dir / "video_save"
        video_save_dir.mkdir(parents=True, exist_ok=True)

        clips_path = []
        for scene_idx, storyboard in enumerate(storyboards["storyboard"]):
            for item in storyboard["storyboard"]:
                # clip_path is only set by step8 when the task was submitted.
                clip_path = item.get("clip_path")
                if clip_path is not None and os.path.exists(clip_path):
                    clips_path.append(clip_path)
                else:
                    logger.warning(f"Clip missing for scene {scene_idx}, shot {item.get('idx')}; skipping it.")

        output_file = str(video_save_dir / "final_video.mp4")
        concat_videos(clips_path, output_file)
        return {
            "output_file": output_file,
            "data": "final video"
        }