import os
import json
import time
import asyncio
from functools import partial
from typing import Any, Callable, Optional

from utils.tools import (
    string_to_json,
    save_json_file,
    setup_logger,
    efficient_sort
)
from tools.banana_pro import generate_image_from_prompt_and_images
from tools.text_generator import media_captioner
from tools.image_generator import image_generator
from tools.video_generator import video_generator
from tools.video_composer import video_composer, concat_videos
from mcps.story_create import story_creator
from mcps.character_extract import character_extractor
from mcps.character_portraits_generate import character_portraits_generator
from mcps.storyboard_create import storyboard_creator
from mcps.camera_tree import camera_tree_creator
from mcps.reference_image_select import reference_image_selector

logger = setup_logger(__name__)

# Directory that receives every intermediate artifact and the final video.
OUTPUT_DIR = "./output"


def _ensure_output_dir() -> None:
    """Create ``OUTPUT_DIR`` if it is missing so later writes do not fail."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)


def _read_json(path: str) -> Any:
    """Return the parsed content of the UTF-8 JSON file at *path*."""
    with open(path, "r", encoding='utf-8') as f:
        return json.load(f)


def _write_json(path: str, data: Any) -> None:
    """Write *data* to *path* as indented, non-ASCII-escaped UTF-8 JSON."""
    with open(path, "w", encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)


def _load_or_create_json(path: str, create: Callable[[], Any]) -> Any:
    """Return the JSON cached at *path*, or build it with *create* and cache it.

    When *path* exists its parsed content is returned and *create* is never
    called. Otherwise the value returned by *create* is written to *path* and
    returned as-is (it is not re-read from disk).
    """
    if os.path.exists(path):
        return _read_json(path)
    data = create()
    _write_json(path, data)
    return data


class Script2VideoPipeline:
    """Resumable idea -> story -> script -> storyboard -> frames -> video pipeline.

    Every stage stores its result under ``OUTPUT_DIR``; a stage whose output
    file already exists is loaded from disk instead of being recomputed.
    """

    def __init__(
        self
    ):
        pass

    def video_create_pipeline(
        self,
        idea: str,
        user_requirement: Optional[str] = None,
        style: Optional[str] = None,
    ):
        """Run all pipeline stages in order, reusing cached stage outputs.

        Args:
            idea: Seed idea handed to ``story_creator.develop_story``.
            user_requirement: Optional requirement text forwarded to the
                story, script and storyboard creators.
            style: Optional style forwarded to the front-portrait generator.

        Returns:
            None. Results are written to files under ``OUTPUT_DIR``.
        """
        _ensure_output_dir()

        # 1. Create the story (plain text, so it is cached outside the JSON helper).
        logger.info("Creating story...")
        story_path = f"{OUTPUT_DIR}/story.txt"
        if os.path.exists(story_path):
            with open(story_path, "r", encoding='utf-8') as f:
                story = f.read()
        else:
            story = story_creator.develop_story(
                idea=idea,
                user_requirement=user_requirement
            )
            with open(story_path, "w", encoding='utf-8') as f:
                f.write(story)

        # 2. Write the script, scene by scene.
        logger.info("Writing script...")
        script = _load_or_create_json(
            f"{OUTPUT_DIR}/script.json",
            lambda: story_creator.write_script_on_story(
                story=story,
                user_requirement=user_requirement
            ),
        )

        # 3. Extract the characters.
        logger.info("Extracting characters...")
        characters = _load_or_create_json(
            f"{OUTPUT_DIR}/characters.json",
            lambda: character_extractor.extract_characters(
                script=script
            ),
        )

        # 4. Design the character portraits.
        logger.info("Designing character portraits...")
        character_portraits = _load_or_create_json(
            f"{OUTPUT_DIR}/character_portraits.json",
            lambda: self._character_portraits_generator(
                characters=characters,
                style=style
            ),
        )

        # 5. Create a storyboard for every scene of the script.
        logger.info("Creating storyboard...")
        storyboards = _load_or_create_json(
            f"{OUTPUT_DIR}/storyboards.json",
            lambda: self._create_storyboard(
                script=script,
                characters=str(characters),
                user_requirement=user_requirement
            ),
        )

        # 6. Build the camera tree.
        logger.info("Building camera tree...")
        storyboards_with_camera_tree = _load_or_create_json(
            f"{OUTPUT_DIR}/storyboards_with_camera_tree.json",
            lambda: self._create_camera_tree(
                storyboards=storyboards
            ),
        )

        # 7. Generate the video frames.
        logger.info("Generating video frames...")
        storyboards_with_frames = _load_or_create_json(
            f"{OUTPUT_DIR}/storyboards_with_frames.json",
            lambda: self._generate_video_frames_for_scene(
                storyboards_with_camera_tree=storyboards_with_camera_tree,
                character_portraits=character_portraits
            ),
        )

        # 8. Generate the video segments.
        logger.info("Generating video segments...")
        segments_path = f"{OUTPUT_DIR}/storyboards_with_segments.json"
        if os.path.exists(segments_path):
            storyboards_with_segments = _read_json(segments_path)
        else:
            storyboards_with_segments = video_generator.generate(
                video_script_data=storyboards_with_frames
            )
            # NOTE(review): only element 0 of the generator's return value is
            # persisted, so a resumed run loads that element rather than the
            # full return value. Kept as in the original - confirm against
            # video_generator.generate's return shape.
            _write_json(segments_path, storyboards_with_segments[0])

        # 9. Splice the segments into the final video (skipped if it already exists).
        logger.info("Splicing video...")
        final_video_path = f"{OUTPUT_DIR}/final_video.mp4"
        if not os.path.exists(final_video_path):
            concat_videos(segments_path, final_video_path)
        logger.info("Video spliced.")

    def _create_storyboard(
        self,
        script: dict,
        characters: str,
        user_requirement: Optional[str] = None,
    ):
        """Create (or load from cache) one storyboard per entry of ``script["script"]``.

        Args:
            script: Dict whose ``"script"`` entry is iterated scene by scene.
            characters: Character description string passed to the creator.
            user_requirement: Optional requirement text passed to the creator.

        Returns:
            ``{"storyboards": [...]}`` with one storyboard per scene, in order.
        """
        _ensure_output_dir()
        scene_storyboard = []
        for idx, scene_script in enumerate(script["script"]):
            logger.info(f"Creating storyboard for scene {idx}...")
            storyboard = _load_or_create_json(
                f"{OUTPUT_DIR}/storyboard_{idx}.json",
                partial(
                    storyboard_creator.create_storyboard,
                    script=scene_script,
                    characters=characters,
                    user_requirement=user_requirement,
                ),
            )
            scene_storyboard.append(storyboard)
            logger.info(f"Storyboard for scene {idx} created.")

        return {
            "storyboards": scene_storyboard
        }

    def _create_camera_tree(
        self,
        storyboards: dict
    ):
        """Merge a camera tree into every storyboard of *storyboards*, in place.

        For each scene the camera-tree dict is created from
        ``storyboard["storyboard"]`` (or loaded from its cache file) and merged
        into the scene dict with ``|=``.

        Returns:
            The same *storyboards* object, mutated.
        """
        _ensure_output_dir()
        for idx, storyboard in enumerate(storyboards["storyboards"]):
            logger.info(f"Creating camera tree for scene {idx}...")
            camera_tree = _load_or_create_json(
                f"{OUTPUT_DIR}/storyboard_{idx}_with_camera_tree.json",
                partial(
                    camera_tree_creator.create_camera_tree,
                    shot_descriptions=storyboard["storyboard"],
                ),
            )
            storyboard |= camera_tree
            logger.info(f"Camera tree for scene {idx} created.")

        return storyboards

    def _character_portraits_generator(
        self,
        characters: dict,
        style: Optional[str]
    ):
        """Generate front/side/back portraits for every character, in place.

        For each entry of ``characters["characters"]`` the three portrait
        images are generated and saved as PNG files (the side and back views
        receive the saved front image path as reference), unless a cached
        ``portraits_{idx}.json`` already lists them. The three paths are merged
        into the character dict.

        Args:
            characters: Dict whose ``"characters"`` entry is a list of dicts.
            style: Style forwarded to the front-portrait generator.

        Returns:
            The same *characters* object, mutated.
        """
        _ensure_output_dir()
        for idx, character in enumerate(characters["characters"]):
            logger.info(f"Designing portrait for character {idx}...")
            portrait_info_path = f"{OUTPUT_DIR}/portraits_{idx}.json"
            if os.path.exists(portrait_info_path):
                logger.info(f"Portrait for character {idx} already exists.")
                portrait_info = _read_json(portrait_info_path)
            else:
                front_image_path = f"{OUTPUT_DIR}/front_portrait_{idx}.png"
                side_image_path = f"{OUTPUT_DIR}/side_portrait_{idx}.png"
                back_image_path = f"{OUTPUT_DIR}/back_portrait_{idx}.png"

                front_portrait = character_portraits_generator.generate_front_portrait(
                    character=character,
                    style=style
                )
                # The front image must be on disk before the other views are
                # requested, because they receive its path as reference.
                front_portrait.save(front_image_path)

                side_portrait = character_portraits_generator.generate_side_portrait(
                    character=character,
                    front_image_path=[front_image_path]
                )
                side_portrait.save(side_image_path)

                back_portrait = character_portraits_generator.generate_back_portrait(
                    character=character,
                    front_image_path=[front_image_path]
                )
                back_portrait.save(back_image_path)

                portrait_info = {
                    "front_portrait": front_image_path,
                    "side_portrait": side_image_path,
                    "back_portrait": back_image_path
                }
                _write_json(portrait_info_path, portrait_info)

            character |= portrait_info
            logger.info(f"Portrait for character {idx} designed.")

        return characters

    def _generate_video_frames_for_scene(
        self,
        storyboards_with_camera_tree: dict,
        character_portraits: dict
    ):
        """Generate the first frame of every active shot and record its path.

        Cameras of each scene are visited in the order returned by
        ``efficient_sort``. For every active shot the candidate reference
        images are: the three portraits of each visible character, the frames
        already handled for the same camera, and the parent shot's frame when
        the camera has a parent. ``reference_image_selector`` picks the
        references and the prompt, the frame is generated and saved, and its
        path is stored in ``storyboard[shot_idx]["ff_path"]``.

        A frame file that already exists is not regenerated, but it is still
        registered (``ff_path`` and previous-frame list) so that resumed runs
        behave like uninterrupted ones.

        Args:
            storyboards_with_camera_tree: Dict whose ``"storyboards"`` entries
                each provide ``"storyboard"`` and ``"camera_tree"``.
            character_portraits: Dict whose ``"characters"`` entries provide
                the portrait paths and ``"identifier_in_scene"``.

        Returns:
            The same *storyboards_with_camera_tree* object, mutated; it is
            also written to ``final_storyboards.json``.
        """
        _ensure_output_dir()
        # (portrait key, caption suffix) for the three reference views.
        portrait_views = (
            ("front_portrait", "的正面肖像"),
            ("side_portrait", "的侧面肖像"),
            ("back_portrait", "的背面肖像"),
        )

        shot_num = 0
        for scene_idx, storyboard_with_camera_tree in enumerate(storyboards_with_camera_tree["storyboards"]):
            logger.info(f"Generating video frames for scene {scene_idx}...")
            storyboard = storyboard_with_camera_tree["storyboard"]
            camera_tree = storyboard_with_camera_tree["camera_tree"]

            # NOTE(review): the list is seeded with 0 and cameras without a
            # parent contribute nothing - presumably the root camera maps to
            # the seed; confirm against efficient_sort.
            parent_shot_idxs = [0]
            active_shot_idxs = []
            for item in camera_tree:
                if item["parent_shot_idx"] is not None:
                    parent_shot_idxs.append(item["parent_shot_idx"])
                active_shot_idxs.append(item["active_shot_idxs"])

            process_order = efficient_sort(parent_shot_idxs, active_shot_idxs)
            for cam_idx in process_order:
                logger.info(f"Processing scene {scene_idx} - camera {cam_idx}...")
                camera_item = camera_tree[cam_idx]
                prev_frame_path_and_text_pairs = []
                for shot_idx in camera_item["active_shot_idxs"]:
                    logger.info(f"Processing scene {scene_idx} - camera {cam_idx} - shot {shot_idx}...")
                    frame_description = storyboard[shot_idx]["ff_desc"]
                    vis_char_idxs = storyboard[shot_idx]["ff_vis_char_idxs"]
                    shot_num += 1

                    frame_save_path = f"{OUTPUT_DIR}/frame_scene{scene_idx}_camera{cam_idx}_shot{shot_idx}.png"
                    if os.path.exists(frame_save_path):
                        logger.info(f"Frame for scene {scene_idx} - camera {cam_idx} - shot {shot_idx} already exists.")
                        # Register the cached frame: child cameras read the
                        # parent's "ff_path", later shots use it as a previous
                        # frame, and the returned storyboard must list it.
                        prev_frame_path_and_text_pairs.append((frame_save_path, frame_description))
                        storyboard[shot_idx]["ff_path"] = frame_save_path
                        continue

                    image_path_and_text_pairs = []

                    # Reference the three portrait views of every visible character.
                    for vis_char_idx in vis_char_idxs:
                        logger.info(f"Referencing character {vis_char_idx} portrait...")
                        character = character_portraits["characters"][vis_char_idx]
                        for portrait_key, caption_suffix in portrait_views:
                            image_path_and_text_pairs.append(
                                (character[portrait_key], f"{character['identifier_in_scene']}{caption_suffix}")
                            )

                    # Reference the preceding frames of this camera.
                    image_path_and_text_pairs.extend(prev_frame_path_and_text_pairs)

                    # Reference the parent shot's frame.
                    if camera_item["parent_shot_idx"] is not None:
                        parent_shot = storyboard[camera_item["parent_shot_idx"]]
                        image_path_and_text_pairs.append((parent_shot["ff_path"], parent_shot["ff_desc"]))

                    # Select the reference images and build the image-generation prompt.
                    info_for_gen_frame = reference_image_selector.select_reference_images_and_generate_prompt(
                        image_path_and_text_pairs=image_path_and_text_pairs,
                        frame_description=frame_description
                    )
                    logger.info(f"目标帧描述:\n{frame_description}")
                    logger.info(f"可参考帧:\n{image_path_and_text_pairs}")
                    logger.info(f"实际参考:\n{info_for_gen_frame}")

                    # Generate the frame.
                    frame_prompt = info_for_gen_frame["text_prompt"]
                    image_urls = [item[0] for item in info_for_gen_frame["reference_image_path_and_text_pairs"]]
                    logger.info(f"Frame prompt: {frame_prompt}")
                    logger.info(f"Reference images: {image_urls}")

                    # Previous generation path, kept for reference:
                    # if len(image_urls) == 0:
                    #     frame = asyncio.run(image_generator.generate_without_refer(frame_prompt))
                    #     frame.save_url(frame_save_path)
                    # else:
                    #     frame = asyncio.run(image_generator.generate(frame_prompt, image_urls))
                    #     frame.save_url(frame_save_path)
                    frame = generate_image_from_prompt_and_images(frame_prompt, image_paths=image_urls)
                    frame.save(frame_save_path)

                    # Remember this frame as a reference for the following shots.
                    prev_frame_path_and_text_pairs.append((frame_save_path, frame_description))
                    # storyboard[shot_idx]["ff_url"] = frame.data
                    storyboard[shot_idx]["ff_path"] = frame_save_path

        # The counter includes shots whose frame file already existed.
        logger.info(f"Generated {shot_num} video frames.")

        _write_json(f"{OUTPUT_DIR}/final_storyboards.json", storyboards_with_camera_tree)

        return storyboards_with_camera_tree


if __name__ == "__main__":
    pipeline = Script2VideoPipeline()
    pipeline.video_create_pipeline(
        idea="身穿时尚服装的美女在街头漫步",
        user_requirement="剧情要连贯,最多三个场景",
        style="写实风格"
    )

    # with open("./output/storyboards_with_camera_tree.json", "r") as f:
    #     storyboards = json.load(f)
    # full_items = storyboards["storyboards"]
    # for item in full_items:
    #     camera_tree = item["camera_tree"]
    #     for camera in camera_tree:
    #         active_shot_idxs = camera["active_shot_idxs"][0]
    #         camera["active_shot_idxs"] = active_shot_idxs
    # with open("./output/storyboards_with_camera_treess.json", "w") as f:
    #     json.dump(storyboards, f, ensure_ascii=False, indent=4)

    # Generate the three-view character portraits
    # with open("./output/characters.json", "r", encoding='utf-8') as f:
    #     characters = json.load(f)
    # character_portraits = pipeline._character_portraits_generator(
    #     characters=characters,
    #     style="cartoon"
    # )
    # with open("./output/character_portraits.json", "w", encoding='utf-8') as f:
    #     json.dump(character_portraits, f, ensure_ascii=False, indent=4)

    # with open("./output/character_portraits.json", "r", encoding='utf-8') as f:
    #     character_portraits = json.load(f)
    # with open("./output/storyboards_with_camera_tree.json", "r", encoding='utf-8') as f:
    #     storyboards_with_camera_tree = json.load(f)
    # shot_num = 0
    # for scene_idx, storyboard_with_camera_tree in enumerate(storyboards_with_camera_tree["storyboards"]):
    #     for shot_idx, shot in enumerate(storyboard_with_camera_tree["storyboard"]):
    #         ff_path = f"./output/frame_scene{scene_idx}_camera{shot['cam_idx']}_shot{shot_idx}.png"
    #         if os.path.exists(ff_path):
    #             shot["ff_path"] = ff_path
    #             shot_num += 1
    # logger.info(f"Total shot number: {shot_num}")

    # # Generate the video frames
    # result = pipeline._generate_video_frames_for_scene(
    #     storyboards_with_camera_tree=storyboards_with_camera_tree,
    #     character_portraits=character_portraits
    # )
    # with open("./output/storyboards_with_frames.json", "w", encoding='utf-8') as f:
    #     json.dump(storyboards_with_camera_tree, f, ensure_ascii=False, indent=4)

    # # Rename every frame_scene*.png file in the directory to new_frame_scene*.png
    # for file in os.listdir("./output"):
    #     if file.startswith("frame_scene") and file.endswith(".png"):
    #         new_file = file.replace("frame_scene", "new_frame_scene")
    #         os.rename(os.path.join("./output", file), os.path.join("./output", new_file))

    # Generate the video segments
    # with open("./output/storyboards_with_frames.json", "r", encoding='utf-8') as f:
    #     final_storyboards = json.load(f)
    # storyboards_with_segments = video_generator.generate(
    #     video_script_data=final_storyboards
    # )
    # with open("./output/storyboards_with_segments.json", "w", encoding='utf-8') as f:
    #     json.dump(storyboards_with_segments, f, ensure_ascii=False, indent=4)
    # concat_videos("./output/storyboards_with_segments.json")