| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473 |
- """
- idea2video 任务流
- 从idea到video的完整任务流
- 包括:
- 1. 从idea到story
- 2. 从story到script
- 3. 从script到video
- """
- import os
- import asyncio
- from pathlib import Path
- from typing import Dict, Optional
- from taskflow import TaskManager, FileIOHandler
- from ..mcps.story_create import develop_story, develop_story_base_on_story
- from ..mcps.character_extract import extract_characters
- from ..mcps.storyboard_create import create_storyboard
- from ..mcps.character_portrait import gen_single_character_portrait
- from ..mcps.camera_tree import create_camera_tree
- from ..mcps.refer_image import select_refer_image
- from ..mcps.concat_clip import concat_videos
- from ..utils.tools import download_image, efficient_sort
- from api_modules.ark_client_async import AsyncArkClient
- from api_modules.ark_image_client_async import AsyncArkImageClient
- from api_modules.ark_video_client import ArkVideoClient
- from api_modules.ark_video_client_async import AsyncArkVideoClient
- from taskflow import get_logger
# Logger shared by every step of the pipeline defined below.
logger = get_logger(
    "examples.video_create.pipeline.idea2video_pipeline"
)
class Idea2VideoPipeline:
    """Video-creation pipeline: idea -> story -> script -> ... -> final video.

    Every ``stepN_*`` coroutine loads the outputs of earlier steps through
    ``self.manager.load_step_output("stepK")``, persists its own result under
    ``self.output_dir`` through ``self.io_handler`` and returns a dict of the
    form ``{"output_file": <path written>, "data": <payload>}``.
    """

    def __init__(self, io_handler: FileIOHandler, output_dir: str, manager: TaskManager):
        """Initialise the pipeline.

        Args:
            io_handler: file I/O handler used to write each step's output.
            output_dir: output directory; created (including parents) if missing.
            manager: task manager used to load the outputs of earlier steps.
        """
        self.io_handler = io_handler
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.manager = manager

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _require_step_output(self, step: str, error_message: str) -> Dict:
        """Return the stored output of ``step``; raise ``ValueError(error_message)`` if it is missing."""
        output = self.manager.load_step_output(step)
        if output is None:
            raise ValueError(error_message)
        return output

    def _frame_path(self, video_frames_dir: Path, scene_idx: int, cam_idx: int, shot_idx: int) -> str:
        """Path under which step 7 saves the first frame of (scene, camera, shot)."""
        return str(
            video_frames_dir
            / f"scene{scene_idx}_camera{cam_idx}_shot{shot_idx}_{self.output_dir.name}.png"
        )

    def _find_existing_frame(
        self, video_frames_dir: Path, scene_idx: int, cam_idx: int, shot_idx: int
    ) -> Optional[str]:
        """Return the path of an already-rendered frame for (scene, camera, shot), or None.

        The name produced by ``_frame_path`` (the one step 7 actually saves under) is
        tried first. The older name without the ``_{output_dir.name}`` suffix is kept
        as a fallback so frames saved under that name are still found.
        """
        candidates = (
            self._frame_path(video_frames_dir, scene_idx, cam_idx, shot_idx),
            str(video_frames_dir / f"scene{scene_idx}_camera{cam_idx}_shot{shot_idx}.png"),
        )
        for candidate in candidates:
            if os.path.exists(candidate):
                return candidate
        return None

    @staticmethod
    def _format_characters(characters) -> str:
        """Render step-3 character data as text for the storyboard prompt.

        A dict with a ``"characters"`` list becomes one line per character. Each line
        joins whichever identifier / static / dynamic features are present with " | ".
        Anything else falls back to ``str(characters)``.
        """
        if not isinstance(characters, dict) or "characters" not in characters:
            return str(characters)
        labelled_keys = (
            ("identifier_in_scene", "标识符"),
            ("static_features", "静态特征"),
            ("dynamic_features", "动态特征"),
        )
        lines = []
        for char in characters["characters"]:
            parts = [f"{label}: {char[key]}" for key, label in labelled_keys if key in char]
            lines.append(" | ".join(parts) + "\n")
        return "".join(lines)

    # ------------------------------------------------------------------
    # Pipeline steps
    # ------------------------------------------------------------------

    async def step1_develop_story(self, idea: str, user_requirement: Optional[str] = None) -> Dict:
        """Step 1: develop a story from the idea and save it as ``step1_story.txt``."""
        async with AsyncArkClient() as client:
            story = await develop_story(client=client, idea=idea, user_requirement=user_requirement)
        output_file = str(self.output_dir / "step1_story.txt")
        await self.io_handler.write_text_async(story, output_file)
        return {
            "output_file": output_file,
            "data": story
        }

    async def step2_develop_script(self, user_requirement: Optional[str] = None) -> Dict:
        """Step 2: turn the step-1 story into a script and save it as ``step2_script.json``.

        Raises:
            ValueError: if step 1 has no stored output.
        """
        story = self._require_step_output("step1", "步骤1未完成,无法从story到script")["data"]
        async with AsyncArkClient() as client:
            script = await develop_story_base_on_story(client=client, story=story, user_requirement=user_requirement)
        output_file = str(self.output_dir / "step2_script.json")
        await self.io_handler.write_json_async(script, output_file)
        return {
            "output_file": output_file,
            "data": script
        }

    async def step3_extract_characters(self) -> Dict:
        """Step 3: extract characters from the step-1 story; save ``step3_characters.json``.

        Raises:
            ValueError: if step 1 (the step actually read here) has no stored output.
        """
        # The characters are extracted from the *story* (step 1), so the error names step 1.
        story = self._require_step_output("step1", "步骤1未完成,无法从story到characters")["data"]
        async with AsyncArkClient() as client:
            characters = await extract_characters(client=client, story=story)
        output_file = str(self.output_dir / "step3_characters.json")
        await self.io_handler.write_json_async(characters, output_file)
        return {
            "output_file": output_file,
            "data": characters
        }

    async def step4_create_storyboard(self, user_requirement: Optional[str] = None) -> Dict:
        """Step 4: build the storyboard from the step-2 script and step-3 characters.

        Raises:
            ValueError: if step 2 or step 3 has no stored output (checked in that order).
        """
        script = self._require_step_output("step2", "步骤2未完成,无法从script到storyboard")["data"]
        characters = self._require_step_output("step3", "步骤3未完成,无法从script到storyboard")["data"]

        # create_storyboard receives the characters as a text block.
        characters_str = self._format_characters(characters)

        storyboard = await create_storyboard(script=script, characters=characters_str, user_requirement=user_requirement)
        output_file = str(self.output_dir / "step4_storyboard.json")
        await self.io_handler.write_json_async(storyboard, output_file)
        return {
            "output_file": output_file,
            "data": storyboard
        }

    async def step5_generate_portrait(
        self,
        size: Optional[str] = "2048x2048",
        refer_image: Optional[list[str]] = None,
        refer_image_map: Optional[Dict[str, list[str]]] = None,
        style: Optional[str] = None
    ) -> Dict:
        """Step 5: generate a portrait for every step-3 character, concurrently.

        Each character dict gains a ``"portrait_path"`` entry pointing at the
        downloaded image under ``<output_dir>/portraits``.

        Args:
            size: size of the generated image, "2048x2048" by default.
            refer_image: global reference images shared by all characters; lower
                priority than ``refer_image_map``.
            refer_image_map: mapping of character identifier -> list of reference
                image paths, e.g. {"林小星": ["path/to/image1.jpg"]}. Highest priority;
                overrides ``refer_image`` and any ``refer_image`` field on the character.
            style: generation style, e.g. "写实" (realistic) or "卡通" (cartoon).

        Raises:
            ValueError: if step 3 has no stored output.
        """
        characters = self._require_step_output("step3", "步骤3未完成,无法从characters到portrait")["data"]

        portraits_dir = self.output_dir / "portraits"
        portraits_dir.mkdir(parents=True, exist_ok=True)

        async def process_single_character(client: AsyncArkImageClient, character: dict) -> None:
            """Generate and download one character's portrait, recording its path on the dict."""
            character_id = character["identifier_in_scene"]

            # Reference-image priority (highest first):
            #   1. this character's entry in refer_image_map
            #   2. the global refer_image
            #   3. the character's own "refer_image" field (a str is wrapped in a list)
            #   4. None
            character_refer_image = None
            if refer_image_map is not None and character_id in refer_image_map:
                character_refer_image = refer_image_map[character_id]
                logger.info(f"角色 {character_id} 使用映射中的参考图片: {character_refer_image}")
            elif refer_image is not None:
                character_refer_image = refer_image
                logger.info(f"角色 {character_id} 使用全局参考图片: {character_refer_image}")
            elif "refer_image" in character and character["refer_image"] is not None:
                character_refer_image = character["refer_image"]
                if isinstance(character_refer_image, str):
                    character_refer_image = [character_refer_image]
                logger.info(f"角色 {character_id} 使用角色字段中的参考图片: {character_refer_image}")

            image_url = await gen_single_character_portrait(
                client=client,
                character=character,
                size=size,
                refer_image=character_refer_image,
                style=style
            )
            # download_image is synchronous; run it in a worker thread to keep the loop free.
            image_path = str(portraits_dir / f"{character_id}_{self.output_dir.name}.jpg")
            await asyncio.to_thread(download_image, image_url, image_path)
            character["portrait_path"] = image_path

        async with AsyncArkImageClient() as client:
            await asyncio.gather(*(
                process_single_character(client, character)
                for character in characters["characters"]
            ))
        output_file = str(self.output_dir / "step5_portrait.json")
        await self.io_handler.write_json_async(characters, output_file)
        return {
            "output_file": output_file,
            "data": characters
        }

    async def step6_create_camera_tree(self) -> Dict:
        """Step 6: create a camera tree for every storyboard scene, concurrently.

        Each scene's tree is written to ``<output_dir>/camera_tree/camera_tree_<idx>.json``.
        The step-4 data is then saved as ``step6_camera_tree.json`` and returned.

        Raises:
            ValueError: if step 4 has no stored output.
        """
        storyboards = self._require_step_output("step4", "步骤4未完成,无法从storyboard到camera_tree")["data"]
        camera_tree_dir = self.output_dir / "camera_tree"
        camera_tree_dir.mkdir(parents=True, exist_ok=True)

        async def process_single_storyboard(client: AsyncArkClient, storyboard: Dict, scene_idx: int) -> None:
            """Create one scene's camera tree and write it to its own JSON file."""
            camera_tree = await create_camera_tree(client=client, storyboard=storyboard)
            # NOTE(review): the tree is only written to disk here. Step 7 reads
            # scene["camera_tree"] from this step's returned data, which presumably relies
            # on create_camera_tree populating the storyboard dict in place — confirm.
            camera_tree_path = str(camera_tree_dir / f"camera_tree_{scene_idx}.json")
            await self.io_handler.write_json_async(camera_tree, camera_tree_path)

        async with AsyncArkClient() as client:
            await asyncio.gather(*(
                process_single_storyboard(client, storyboard, idx)
                for idx, storyboard in enumerate(storyboards["storyboard"])
            ))
        output_file = str(self.output_dir / "step6_camera_tree.json")
        await self.io_handler.write_json_async(storyboards, output_file)
        return {
            "output_file": output_file,
            "data": storyboards
        }

    async def step7_generate_video_frames(self) -> Dict:
        """Step 7: render the first frame of every shot, scenes processed concurrently.

        For each shot, a frame already on disk is reused. Otherwise the reference
        images are collected and ``select_refer_image`` turns them into a prompt.
        The references are the visible characters' portraits, the frames already
        rendered for the same camera, and the parent shot's frame. The image client
        then renders the frame, which is downloaded. The frame path is recorded on
        the shot as ``"ff_path"``.

        The step is best-effort: a scene that raises is logged and skipped. The
        (partial) data is still saved as ``step7_video_frames.json``.

        Raises:
            ValueError: if step 6 or step 5 has no stored output (checked in that order).
        """
        storyboards = self._require_step_output("step6", "步骤6未完成,无法从camera_tree到video_frames")["data"]
        characters = self._require_step_output("step5", "步骤5未完成,无法从characters到video_frames")["data"]
        video_frames_dir = self.output_dir / "video_frames"
        video_frames_dir.mkdir(parents=True, exist_ok=True)

        async def process_single_storyboard(
            client: AsyncArkClient, image_client: AsyncArkImageClient, scene: Dict, scene_idx: int
        ) -> None:
            """Render (or reuse) the first frame of every shot of one scene."""
            camera_tree = scene["camera_tree"]
            shots = scene["storyboard"]

            parent_shot_idxs = [0]
            active_shot_idxs = []
            for item in camera_tree:
                if item["parent_shot_idx"] is not None:
                    parent_shot_idxs.append(item["parent_shot_idx"])
                active_shot_idxs.append(item["active_shot_idxs"])

            # Cameras ordered by efficient_sort first, then any camera it left out.
            process_order = efficient_sort(parent_shot_idxs, active_shot_idxs)
            already_ordered = set(process_order)
            process_order += [i for i in range(len(active_shot_idxs)) if i not in already_ordered]

            for cam_idx in process_order:
                logger.info(f"Processing scene {scene_idx} - camera {cam_idx}...")
                camera_item = camera_tree[cam_idx]
                # Frames already produced for this camera, used as references for later shots.
                prev_frame_path_and_text_pairs = []
                for shot_idx in camera_item["active_shot_idxs"]:
                    logger.info(f"Processing scene {scene_idx} - camera {cam_idx} - shot {shot_idx}...")
                    shot = shots[shot_idx]
                    frame_description = shot["ff_desc"]
                    vis_char_idxs = shot["ff_vis_char_idxs"]
                    frame_save_path = self._frame_path(video_frames_dir, scene_idx, cam_idx, shot_idx)

                    if os.path.exists(frame_save_path):
                        logger.info(f"Frame for scene {scene_idx} - camera {cam_idx} - shot {shot_idx} already exists.")
                        # Still record the path so later shots and steps can reference this frame.
                        shot["ff_path"] = frame_save_path
                        prev_frame_path_and_text_pairs.append((frame_save_path, frame_description))
                        continue

                    image_path_and_text_pairs = []
                    # Reference the portraits (三视图) of the characters visible in this frame.
                    for vis_char_idx in vis_char_idxs:
                        logger.info(f"Referencing character {vis_char_idx} portrait...")
                        portrait = characters["characters"][vis_char_idx]
                        image_path_and_text_pairs.append(
                            (portrait["portrait_path"], f'{portrait["identifier_in_scene"]}的三视图肖像')
                        )

                    # Reference earlier frames of the same camera.
                    image_path_and_text_pairs.extend(prev_frame_path_and_text_pairs)

                    # Reference the parent shot's frame, if it has been rendered.
                    parent_shot_idx = camera_item["parent_shot_idx"]
                    if parent_shot_idx is not None:
                        parent_shot = shots[parent_shot_idx]
                        parent_ff_path = parent_shot.get("ff_path")
                        parent_cam_idx = camera_item.get("parent_cam_idx")
                        # Not recorded in memory: look for a frame on disk rendered by the parent
                        # camera, using the same naming scheme frames are saved with.
                        if parent_ff_path is None and parent_cam_idx is not None:
                            parent_ff_path = self._find_existing_frame(
                                video_frames_dir, scene_idx, parent_cam_idx, parent_shot_idx
                            )
                            if parent_ff_path is not None:
                                parent_shot["ff_path"] = parent_ff_path
                                logger.info(f"Found parent frame at inferred path: {parent_ff_path}")

                        if parent_ff_path is not None:
                            image_path_and_text_pairs.append((parent_ff_path, parent_shot["ff_desc"]))
                        else:
                            logger.warning(f"Parent frame for shot {shot_idx} (parent_shot_idx={parent_shot_idx}) not found, skipping parent frame reference.")

                    # Let the model pick the relevant references and write the image prompt.
                    info_for_gen_frame = await select_refer_image(
                        client=client,
                        frame_description=frame_description,
                        image_text_pairs=image_path_and_text_pairs
                    )
                    frame_prompt = info_for_gen_frame["text_prompt"]
                    image_urls = [pair[0] for pair in info_for_gen_frame["reference_image_path_and_text_pairs"]]
                    logger.info(f"Frame prompt: {frame_prompt}")
                    logger.info(f"Reference images: {image_urls}")
                    response = await image_client.generate_image(
                        prompt=frame_prompt,
                        reference_image=image_urls
                    )
                    frame_url = image_client.get_image_url(response)
                    await asyncio.to_thread(download_image, frame_url, frame_save_path)
                    prev_frame_path_and_text_pairs.append((frame_save_path, frame_description))
                    shot["ff_path"] = frame_save_path

        async with AsyncArkClient() as client, AsyncArkImageClient() as image_client:
            results = await asyncio.gather(
                *(
                    process_single_storyboard(client, image_client, scene, idx)
                    for idx, scene in enumerate(storyboards["storyboard"])
                ),
                return_exceptions=True,
            )
        # Best-effort: a failed scene does not abort the step, but it must not vanish silently.
        for scene_idx, result in enumerate(results):
            if isinstance(result, BaseException):
                logger.error(f"场景 {scene_idx} 的视频帧生成失败: {result!r}")

        output_file = str(self.output_dir / "step7_video_frames.json")
        await self.io_handler.write_json_async(storyboards, output_file)
        return {
            "output_file": output_file,
            "data": storyboards
        }

    async def step8_generate_video(self, lens_duration: int = 4) -> Dict:
        """Step 8: submit one image-to-video task per shot and wait for all of them.

        Each successfully submitted shot gains a ``"clip_path"`` entry
        (``<output_dir>/video_clips/scene<i>_len<idx>.mp4``). Shots without a first
        frame (``"ff_path"``, e.g. because step 7 failed for them) are logged and skipped.

        Args:
            lens_duration: clip duration passed to the video client as ``--dur``
                (defaults to 4, the previously hard-coded value).

        Raises:
            ValueError: if step 7 has no stored output.
        """
        storyboards = self._require_step_output("step7", "步骤7未完成,无法从video_frames到video_clips")["data"]
        video_clip_dir = self.output_dir / "video_clips"
        video_clip_dir.mkdir(parents=True, exist_ok=True)
        gen_params = f" --dur {lens_duration}"

        async def process_single_storyboard(video_client: AsyncArkVideoClient, scene: Dict, scene_idx: int) -> list:
            """Submit video tasks for one scene; return the background tasks to await."""
            background_tasks = []
            for shot in scene["storyboard"]:
                len_id = shot["idx"]
                image_url = shot.get("ff_path")
                if image_url is None:
                    logger.error(f"shot 缺少 ff_path(首帧未生成),跳过视频生成,scene {scene_idx}, shot {len_id}")
                    continue
                video_save_path = str(video_clip_dir / f"scene{scene_idx}_len{len_id}.mp4")

                task_id, background_task = await video_client.create_video_task_async(
                    prompt=shot["motion_desc"],
                    image_url=image_url,
                    gen_params=gen_params,
                    output_path=video_save_path
                )

                if background_task is not None:
                    background_tasks.append(background_task)
                    shot["clip_path"] = video_save_path
                    logger.info(f"已提交视频生成任务,scene {scene_idx}, shot {len_id}, task_id: {task_id}")
                else:
                    logger.error(f"视频生成任务提交失败,scene {scene_idx}, shot {len_id}")

            return background_tasks

        async with AsyncArkVideoClient() as video_client:
            storyboard_results = await asyncio.gather(*(
                process_single_storyboard(video_client, scene, idx)
                for idx, scene in enumerate(storyboards["storyboard"])
            ))
            all_background_tasks = [task for task_list in storyboard_results for task in task_list]

            # Wait for every generation + download to finish before the client closes.
            if all_background_tasks:
                logger.info(f"等待 {len(all_background_tasks)} 个视频生成任务完成...")
                await asyncio.gather(*all_background_tasks)
                logger.info("所有视频生成任务已完成!")
            else:
                logger.warning("没有提交任何视频生成任务")
        output_file = str(self.output_dir / "step8_video_clips.json")
        await self.io_handler.write_json_async(storyboards, output_file)
        return {
            "output_file": output_file,
            "data": storyboards
        }

    async def step9_concat_clip(self) -> Dict:
        """Step 9: concatenate every existing clip into ``video_save/final_video.mp4``.

        Shots with no ``"clip_path"`` (submission failed in step 8) or whose clip file
        does not exist are skipped.

        Raises:
            ValueError: if step 8 has no stored output, or if no clip is available.
        """
        storyboards = self._require_step_output("step8", "步骤8未完成,无法进行视频拼接!")["data"]
        video_save_dir = self.output_dir / "video_save"
        video_save_dir.mkdir(parents=True, exist_ok=True)

        clips_path = [
            clip_path
            for scene in storyboards["storyboard"]
            for shot in scene["storyboard"]
            if (clip_path := shot.get("clip_path")) is not None and os.path.exists(clip_path)
        ]
        if not clips_path:
            raise ValueError("没有可拼接的视频片段,请检查步骤8的输出")

        output_file = str(video_save_dir / "final_video.mp4")
        # concat_videos is called synchronously; run it in a worker thread (like
        # download_image above) so the event loop is not blocked while encoding.
        await asyncio.to_thread(concat_videos, clips_path, output_file)

        return {
            "output_file": output_file,
            "data": "final video"
        }
|