# idea2video_pipeline.py (22 KB)
  1. """
  2. idea2video 任务流
  3. 从idea到video的完整任务流
  4. 包括:
  5. 1. 从idea到story
  6. 2. 从story到script
  7. 3. 从script到video
  8. """
  9. import os
  10. import asyncio
  11. from pathlib import Path
  12. from typing import Dict, Optional
  13. from taskflow import TaskManager, FileIOHandler
  14. from ..mcps.story_create import develop_story, develop_story_base_on_story
  15. from ..mcps.character_extract import extract_characters
  16. from ..mcps.storyboard_create import create_storyboard
  17. from ..mcps.character_portrait import gen_single_character_portrait
  18. from ..mcps.camera_tree import create_camera_tree
  19. from ..mcps.refer_image import select_refer_image
  20. from ..mcps.concat_clip import concat_videos
  21. from ..utils.tools import download_image, efficient_sort
  22. from api_modules.ark_client_async import AsyncArkClient
  23. from api_modules.ark_image_client_async import AsyncArkImageClient
  24. from api_modules.ark_video_client import ArkVideoClient
  25. from api_modules.ark_video_client_async import AsyncArkVideoClient
  26. from taskflow import get_logger
# Module-level logger shared by every pipeline step below; obtained from
# taskflow's get_logger under this module's dotted name.
logger = get_logger("examples.video_create.pipeline.idea2video_pipeline")
  28. class Idea2VideoPipeline:
  29. """视频创作任务流"""
  30. def __init__(self, io_handler: FileIOHandler, output_dir: str, manager: TaskManager):
  31. """
  32. 初始化视频创作任务流
  33. Args:
  34. io_handler: 文件I/O处理器
  35. output_dir: 输出目录
  36. manager: 任务管理器
  37. """
  38. self.io_handler = io_handler
  39. self.output_dir = Path(output_dir)
  40. self.output_dir.mkdir(parents=True, exist_ok=True)
  41. self.manager = manager
  42. async def step1_develop_story(self, idea: str, user_requirement: Optional[str] = None) -> Dict:
  43. """步骤1:从idea到story"""
  44. async with AsyncArkClient() as client:
  45. story = await develop_story(client=client, idea=idea, user_requirement=user_requirement)
  46. output_file = str(self.output_dir / "step1_story.txt")
  47. await self.io_handler.write_text_async(story, output_file)
  48. return {
  49. "output_file": output_file,
  50. "data": story
  51. }
  52. async def step2_develop_script(self, user_requirement: Optional[str] = None) -> Dict:
  53. """步骤2:从story到script"""
  54. previous_output = self.manager.load_step_output("step1")
  55. if previous_output is None:
  56. raise ValueError("步骤1未完成,无法从story到script")
  57. story = previous_output["data"]
  58. async with AsyncArkClient() as client:
  59. script = await develop_story_base_on_story(client=client, story=story, user_requirement=user_requirement)
  60. output_file = str(self.output_dir / "step2_script.json")
  61. await self.io_handler.write_json_async(script, output_file)
  62. return {
  63. "output_file": output_file,
  64. "data": script
  65. }
  66. async def step3_extract_characters(self) -> Dict:
  67. """步骤3:从script到characters"""
  68. previous_output = self.manager.load_step_output("step1")
  69. if previous_output is None:
  70. raise ValueError("步骤2未完成,无法从story到characters")
  71. story = previous_output["data"]
  72. async with AsyncArkClient() as client:
  73. characters = await extract_characters(client=client, story=story)
  74. output_file = str(self.output_dir / "step3_characters.json")
  75. await self.io_handler.write_json_async(characters, output_file)
  76. return {
  77. "output_file": output_file,
  78. "data": characters
  79. }
  80. async def step4_create_storyboard(self, user_requirement: Optional[str] = None) -> Dict:
  81. """步骤4:从script到storyboard"""
  82. previous_script_output = self.manager.load_step_output("step2")
  83. if previous_script_output is None:
  84. raise ValueError("步骤2未完成,无法从script到storyboard")
  85. previous_characters_output = self.manager.load_step_output("step3")
  86. if previous_characters_output is None:
  87. raise ValueError("步骤3未完成,无法从script到storyboard")
  88. script = previous_script_output["data"]
  89. characters = previous_characters_output["data"]
  90. # 将 characters 转换为字符串格式(如果它是字典)
  91. if isinstance(characters, dict):
  92. # 将角色信息格式化为字符串
  93. characters_str = ""
  94. if "characters" in characters:
  95. for char in characters["characters"]:
  96. char_info = []
  97. if "identifier_in_scene" in char:
  98. char_info.append(f"标识符: {char['identifier_in_scene']}")
  99. if "static_features" in char:
  100. char_info.append(f"静态特征: {char['static_features']}")
  101. if "dynamic_features" in char:
  102. char_info.append(f"动态特征: {char['dynamic_features']}")
  103. characters_str += " | ".join(char_info) + "\n"
  104. else:
  105. characters_str = str(characters)
  106. else:
  107. characters_str = str(characters)
  108. storyboard = await create_storyboard(script=script, characters=characters_str, user_requirement=user_requirement)
  109. output_file = str(self.output_dir / "step4_storyboard.json")
  110. await self.io_handler.write_json_async(storyboard, output_file)
  111. return {
  112. "output_file": output_file,
  113. "data": storyboard
  114. }
  115. async def step5_generate_portrait(
  116. self,
  117. size: Optional[str] = "2048x2048",
  118. refer_image: Optional[list[str]] = None,
  119. refer_image_map: Optional[Dict[str, list[str]]] = None,
  120. style: Optional[str] = None
  121. ) -> Dict:
  122. """
  123. 步骤5:从characters到portrait(并行生成所有角色肖像)
  124. Args:
  125. size: 生成图片的尺寸,默认为 "2048x2048"
  126. refer_image: 全局参考图片列表(所有角色共享),优先级低于 refer_image_map
  127. refer_image_map: 角色标识符到参考图片列表的映射字典,格式为 {角色标识符: [图片路径1, 图片路径2, ...]}
  128. 例如: {"林小星": ["path/to/image1.jpg"], "阿凯": ["path/to/image2.jpg"]}
  129. 优先级最高,会覆盖 refer_image 和角色字段中的 refer_image
  130. style: 生成风格,例如 "写实"、"卡通" 等
  131. """
  132. previous_output = self.manager.load_step_output("step3")
  133. if previous_output is None:
  134. raise ValueError("步骤3未完成,无法从characters到portrait")
  135. characters = previous_output["data"]
  136. # 确保portraits目录存在
  137. portraits_dir = self.output_dir / "portraits"
  138. portraits_dir.mkdir(parents=True, exist_ok=True)
  139. async def process_single_character(client: AsyncArkImageClient, character: dict) -> None:
  140. """处理单个角色的肖像生成和下载"""
  141. character_id = character["identifier_in_scene"]
  142. # 确定该角色使用的参考图片,优先级从高到低:
  143. # 1. refer_image_map 中该角色的映射
  144. # 2. 全局 refer_image
  145. # 3. 角色字典中的 refer_image 字段(如果存在)
  146. # 4. None
  147. character_refer_image = None
  148. if refer_image_map is not None and character_id in refer_image_map:
  149. character_refer_image = refer_image_map[character_id]
  150. logger.info(f"角色 {character_id} 使用映射中的参考图片: {character_refer_image}")
  151. elif refer_image is not None:
  152. character_refer_image = refer_image
  153. logger.info(f"角色 {character_id} 使用全局参考图片: {character_refer_image}")
  154. elif "refer_image" in character and character["refer_image"] is not None:
  155. character_refer_image = character["refer_image"]
  156. # 确保是列表格式
  157. if isinstance(character_refer_image, str):
  158. character_refer_image = [character_refer_image]
  159. logger.info(f"角色 {character_id} 使用角色字段中的参考图片: {character_refer_image}")
  160. # 生成肖像图片URL
  161. image_url = await gen_single_character_portrait(
  162. client=client,
  163. character=character,
  164. size=size,
  165. refer_image=character_refer_image,
  166. style=style
  167. )
  168. # 下载图片(使用asyncio.to_thread在线程池中执行同步IO操作)
  169. image_path = str(portraits_dir / f"{character_id}_{self.output_dir.name}.jpg")
  170. await asyncio.to_thread(download_image, image_url, image_path)
  171. character["portrait_path"] = image_path
  172. async with AsyncArkImageClient() as client:
  173. # 并行处理所有角色
  174. tasks = [
  175. process_single_character(client, character)
  176. for character in characters["characters"]
  177. ]
  178. await asyncio.gather(*tasks)
  179. output_file = str(self.output_dir / "step5_portrait.json")
  180. await self.io_handler.write_json_async(characters, output_file)
  181. return {
  182. "output_file": output_file,
  183. "data": characters
  184. }
  185. async def step6_create_camera_tree(self) -> Dict:
  186. """步骤6:从storyboard到camera_tree"""
  187. previous_output = self.manager.load_step_output("step4")
  188. if previous_output is None:
  189. raise ValueError("步骤4未完成,无法从storyboard到camera_tree")
  190. storyboards = previous_output["data"]
  191. # 确保camera_tree目录存在
  192. camera_tree_dir = self.output_dir / "camera_tree"
  193. camera_tree_dir.mkdir(parents=True, exist_ok=True)
  194. async def process_single_storyboard(client: AsyncArkClient, storyboard: Dict, scene_idx: int) -> None:
  195. """处理单个storyboard的camera_tree生成和下载"""
  196. camera_tree = await create_camera_tree(client=client, storyboard=storyboard)
  197. camera_tree_path = str(camera_tree_dir / f"camera_tree_{scene_idx}.json")
  198. await self.io_handler.write_json_async(camera_tree, camera_tree_path)
  199. async with AsyncArkClient() as client:
  200. # 并行处理所有storyboard
  201. tasks = [
  202. process_single_storyboard(client, storyboard, idx)
  203. for idx, storyboard in enumerate(storyboards["storyboard"])
  204. ]
  205. await asyncio.gather(*tasks)
  206. output_file = str(self.output_dir / "step6_camera_tree.json")
  207. await self.io_handler.write_json_async(storyboards, output_file)
  208. return {
  209. "output_file": output_file,
  210. "data": storyboards
  211. }
  212. async def step7_generate_video_frames(self) -> Dict:
  213. """步骤7:从camera_tree到video_frames"""
  214. previous_storyboard_output = self.manager.load_step_output("step6")
  215. if previous_storyboard_output is None:
  216. raise ValueError("步骤6未完成,无法从camera_tree到video_frames")
  217. previous_portrait_output = self.manager.load_step_output("step5")
  218. if previous_portrait_output is None:
  219. raise ValueError("步骤5未完成,无法从characters到video_frames")
  220. storyboards = previous_storyboard_output["data"]
  221. characters = previous_portrait_output["data"]
  222. # 确保video_frames目录存在
  223. video_frames_dir = self.output_dir / "video_frames"
  224. video_frames_dir.mkdir(parents=True, exist_ok=True)
  225. async def process_single_storyboard(client: AsyncArkClient, image_client: AsyncArkImageClient, storyboard: Dict, scene_idx: int) -> Dict:
  226. """处理单个storyboard的video_frames生成和下载"""
  227. camera_tree = storyboard["camera_tree"]
  228. storyboard = storyboard["storyboard"]
  229. parent_shot_idxs = [0]
  230. active_shot_idxs = []
  231. for _, item in enumerate(camera_tree):
  232. if item["parent_shot_idx"] is not None:
  233. parent_shot_idxs.append(item["parent_shot_idx"])
  234. active_shot_idxs.append(item["active_shot_idxs"])
  235. process_order = efficient_sort(parent_shot_idxs, active_shot_idxs)
  236. process_order += [i for i in range(len(active_shot_idxs)) if i not in process_order]
  237. for cam_idx in process_order:
  238. logger.info(f"Processing scene {scene_idx} - camera {cam_idx}...")
  239. camera_item = camera_tree[cam_idx]
  240. prev_frame_path_and_text_pairs = []
  241. for _, shot_idx in enumerate(camera_item["active_shot_idxs"]):
  242. logger.info(f"Processing scene {scene_idx} - camera {cam_idx} - shot {shot_idx}...")
  243. frame_description = storyboard[shot_idx]["ff_desc"]
  244. vis_char_idxs = storyboard[shot_idx]["ff_vis_char_idxs"]
  245. image_path_and_text_pairs = []
  246. frame_save_path = str(video_frames_dir / f"scene{scene_idx}_camera{cam_idx}_shot{shot_idx}_{self.output_dir.name}.png")
  247. if os.path.exists(frame_save_path):
  248. logger.info(f"Frame for scene {scene_idx} - camera {cam_idx} - shot {shot_idx} already exists.")
  249. # 即使文件已存在,也需要设置 ff_path 和 prev_frame_path_and_text_pairs,以便后续引用
  250. storyboard[shot_idx]["ff_path"] = frame_save_path
  251. prev_frame_path_and_text_pairs.append((frame_save_path, frame_description))
  252. continue
  253. else:
  254. # 参考可见角色三视图
  255. for vis_char_idx in vis_char_idxs:
  256. logger.info(f"Referencing character {vis_char_idx} portrait...")
  257. image_path_and_text_pairs.append((characters["characters"][vis_char_idx]["portrait_path"], f'{characters["characters"][vis_char_idx]["identifier_in_scene"]}的三视图肖像'))
  258. # 参考前序帧
  259. image_path_and_text_pairs.extend(prev_frame_path_and_text_pairs)
  260. # 参考父帧
  261. if camera_item["parent_shot_idx"] is not None:
  262. parent_shot_idx = camera_item["parent_shot_idx"]
  263. parent_shot = storyboard[parent_shot_idx]
  264. # 检查父帧的 ff_path 是否存在
  265. parent_ff_path = parent_shot.get("ff_path")
  266. # 如果 ff_path 不存在,尝试从文件系统推断(基于 parent_cam_idx)
  267. if parent_ff_path is None and camera_item.get("parent_cam_idx") is not None:
  268. parent_cam_idx = camera_item["parent_cam_idx"]
  269. inferred_parent_path = str(video_frames_dir / f"scene{scene_idx}_camera{parent_cam_idx}_shot{parent_shot_idx}.png")
  270. if os.path.exists(inferred_parent_path):
  271. parent_ff_path = inferred_parent_path
  272. # 更新父帧的 ff_path,以便后续引用
  273. parent_shot["ff_path"] = parent_ff_path
  274. logger.info(f"Found parent frame at inferred path: {parent_ff_path}")
  275. # 如果找到了父帧路径,添加到参考列表
  276. if parent_ff_path is not None:
  277. image_path_and_text_pairs.append((parent_ff_path, parent_shot["ff_desc"]))
  278. else:
  279. logger.warning(f"Parent frame for shot {shot_idx} (parent_shot_idx={parent_shot_idx}) not found, skipping parent frame reference.")
  280. # 筛选参考图像,生成生图提示词
  281. info_for_gen_frame = await select_refer_image(
  282. client=client,
  283. frame_description=frame_description,
  284. image_text_pairs=image_path_and_text_pairs
  285. )
  286. # 生成序列帧
  287. frame_prompt = info_for_gen_frame["text_prompt"]
  288. image_urls = [item[0] for item in info_for_gen_frame["reference_image_path_and_text_pairs"]]
  289. logger.info(f"Frame prompt: {frame_prompt}")
  290. logger.info(f"Reference images: {image_urls}")
  291. response = await image_client.generate_image(
  292. prompt=frame_prompt,
  293. reference_image=image_urls
  294. )
  295. frame_url = image_client.get_image_url(response)
  296. await asyncio.to_thread(download_image, frame_url, frame_save_path)
  297. prev_frame_path_and_text_pairs.append((frame_save_path, frame_description))
  298. storyboard[shot_idx]["ff_path"] = frame_save_path
  299. async with AsyncArkClient() as client, AsyncArkImageClient() as image_client:
  300. # 并行处理所有storyboard
  301. tasks = [
  302. process_single_storyboard(client, image_client, storyboard, idx)
  303. for idx, storyboard in enumerate(storyboards["storyboard"])
  304. ]
  305. await asyncio.gather(*tasks, return_exceptions=True)
  306. output_file = str(self.output_dir / "step7_video_frames.json")
  307. await self.io_handler.write_json_async(storyboards, output_file)
  308. return {
  309. "output_file": output_file,
  310. "data": storyboards
  311. }
  312. async def step8_generate_video(self) -> Dict:
  313. """步骤8:从video_frames到video_clips"""
  314. previous_video_frames_output = self.manager.load_step_output("step7")
  315. if previous_video_frames_output is None:
  316. raise ValueError("步骤7未完成,无法从video_frames到video_clips")
  317. storyboards = previous_video_frames_output["data"]
  318. # 确保video_clips目录存在
  319. video_clip_dir = self.output_dir / "video_clips"
  320. video_clip_dir.mkdir(parents=True, exist_ok=True)
  321. async def process_single_storyboard(video_client: AsyncArkVideoClient, storyboard: Dict, scene_idx: int) -> list:
  322. """处理单个video_frame的video_clip生成和下载,返回所有后台任务"""
  323. background_tasks = []
  324. for shot in storyboard["storyboard"]:
  325. len_id = shot["idx"]
  326. motion_prompt = shot["motion_desc"]
  327. image_url = shot["ff_path"]
  328. lens_duration = 4
  329. video_save_path = str(video_clip_dir / f"scene{scene_idx}_len{len_id}.mp4")
  330. # 构建生成参数字符串
  331. gen_params = f" --dur {lens_duration}"
  332. task_id, background_task = await video_client.create_video_task_async(
  333. prompt=motion_prompt,
  334. image_url=image_url,
  335. gen_params=gen_params,
  336. output_path=video_save_path
  337. )
  338. if background_task is not None:
  339. background_tasks.append(background_task)
  340. shot["clip_path"] = video_save_path
  341. logger.info(f"已提交视频生成任务,scene {scene_idx}, shot {len_id}, task_id: {task_id}")
  342. else:
  343. logger.error(f"视频生成任务提交失败,scene {scene_idx}, shot {len_id}")
  344. return background_tasks
  345. async with AsyncArkVideoClient() as video_client:
  346. # 并行处理所有storyboard,收集所有后台任务
  347. storyboard_tasks = [
  348. process_single_storyboard(video_client, storyboard, idx)
  349. for idx, storyboard in enumerate(storyboards["storyboard"])
  350. ]
  351. storyboard_results = await asyncio.gather(*storyboard_tasks)
  352. # 展平所有后台任务
  353. all_background_tasks = []
  354. for task_list in storyboard_results:
  355. all_background_tasks.extend(task_list)
  356. # 等待所有视频生成和下载完成
  357. if all_background_tasks:
  358. logger.info(f"等待 {len(all_background_tasks)} 个视频生成任务完成...")
  359. await asyncio.gather(*all_background_tasks)
  360. logger.info("所有视频生成任务已完成!")
  361. else:
  362. logger.warning("没有提交任何视频生成任务")
  363. output_file = str(self.output_dir / "step8_video_clips.json")
  364. await self.io_handler.write_json_async(storyboards, output_file)
  365. return {
  366. "output_file": output_file,
  367. "data": storyboards
  368. }
  369. async def step9_concat_clip(self) -> Dict:
  370. """步骤9:拼接所有视频片段进行输出"""
  371. previous_output = self.manager.load_step_output("step8")
  372. if previous_output is None:
  373. raise ValueError("步骤8未完成,无法进行视频拼接!")
  374. storyboards = previous_output["data"]
  375. # 确保video_save目录存在
  376. video_save_dir = self.output_dir / "video_save"
  377. video_save_dir.mkdir(parents=True, exist_ok=True)
  378. clips_path = []
  379. for storyboard in storyboards["storyboard"]:
  380. for item in storyboard["storyboard"]:
  381. if os.path.exists(item["clip_path"]):
  382. clips_path.append(item["clip_path"])
  383. output_file = str(video_save_dir / "final_video.mp4")
  384. concat_videos(clips_path, output_file)
  385. return {
  386. "output_file": output_file,
  387. "data": "final video"
  388. }