| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- import os
- import json
- import time
- import asyncio
- from typing import Optional
- from utils.tools import (
- string_to_json,
- save_json_file,
- setup_logger,
- efficient_sort
- )
- from tools.banana_pro import generate_image_from_prompt_and_images
- from tools.text_generator import media_captioner
- from tools.image_generator import image_generator
- from tools.video_generator import video_generator
- from tools.video_composer import video_composer, concat_videos
- from mcps.story_create import story_creator
- from mcps.character_extract import character_extractor
- from mcps.character_portraits_generate import character_portraits_generator
- from mcps.storyboard_create import storyboard_creator
- from mcps.camera_tree import camera_tree_creator
- from mcps.reference_image_select import reference_image_selector
- logger = setup_logger(__name__)
class Script2VideoPipeline:
    """Pipeline that turns a short idea into a spliced video.

    The stages are: story, script, characters, character portraits,
    storyboards, camera trees, first frames, video segments and the final
    concatenated video. Every stage caches its result as a file inside the
    output directory and is skipped when that file already exists, so an
    interrupted run can be resumed by calling the pipeline again.
    """

    def __init__(self, output_dir: str = "./output"):
        """Create a pipeline.

        Args:
            output_dir: Directory that holds every intermediate and final
                artifact. The default matches the location that used to be
                hard-coded.
        """
        self.output_dir = output_dir

    def _path(self, name: str) -> str:
        """Return the path of artifact ``name`` inside the output directory."""
        # Joined with "/" rather than os.path.join so the default produces
        # exactly the historical "./output/<name>" strings, which are also
        # persisted inside the JSON artifacts (e.g. "ff_path").
        return f"{self.output_dir}/{name}"

    @staticmethod
    def _load_or_create_json(path: str, create):
        """Return the JSON cached at ``path``, computing and saving it if absent.

        Args:
            path: Location of the JSON cache file.
            create: Zero-argument callable producing the JSON-serializable
                value; only called when ``path`` does not exist.

        Returns:
            The value loaded from ``path`` or the freshly created value.
        """
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        data = create()
        # The cache directory may not exist yet on a first run.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
        return data

    def video_create_pipeline(
        self,
        idea: str,
        user_requirement: Optional[str] = None,
        style: Optional[str] = None,
    ):
        """Run all stages and write the final video into the output directory.

        Args:
            idea: Seed idea handed to the story creator.
            user_requirement: Optional extra requirement forwarded to the
                story, script and storyboard stages.
            style: Optional visual style forwarded to the portrait stage.
        """
        # Without this the very first open(..., "w") fails on a fresh checkout.
        os.makedirs(self.output_dir, exist_ok=True)

        # 1. Create the story (plain text, so it is cached without the JSON helper).
        logger.info("Creating story...")
        story_path = self._path("story.txt")
        if os.path.exists(story_path):
            with open(story_path, "r", encoding="utf-8") as f:
                story = f.read()
        else:
            story = story_creator.develop_story(
                idea=idea,
                user_requirement=user_requirement,
            )
            with open(story_path, "w", encoding="utf-8") as f:
                f.write(story)

        # 2. Write the script, scene by scene.
        logger.info("Writing script...")
        script = self._load_or_create_json(
            self._path("script.json"),
            lambda: story_creator.write_script_on_story(
                story=story,
                user_requirement=user_requirement,
            ),
        )

        # 3. Extract the characters.
        logger.info("Extracting characters...")
        characters = self._load_or_create_json(
            self._path("characters.json"),
            lambda: character_extractor.extract_characters(script=script),
        )

        # 4. Design the character portraits.
        logger.info("Designing character portraits...")
        character_portraits = self._load_or_create_json(
            self._path("character_portraits.json"),
            lambda: self._character_portraits_generator(
                characters=characters,
                style=style,
            ),
        )

        # 5. Create a storyboard for every scene of the script.
        logger.info("Creating storyboard...")
        storyboards = self._load_or_create_json(
            self._path("storyboards.json"),
            lambda: self._create_storyboard(
                script=script,
                characters=str(characters),
                user_requirement=user_requirement,
            ),
        )

        # 6. Build the camera tree.
        logger.info("Building camera tree...")
        storyboards_with_camera_tree = self._load_or_create_json(
            self._path("storyboards_with_camera_tree.json"),
            lambda: self._create_camera_tree(storyboards=storyboards),
        )

        # 7. Generate the video frames.
        logger.info("Generating video frames...")
        storyboards_with_frames = self._load_or_create_json(
            self._path("storyboards_with_frames.json"),
            lambda: self._generate_video_frames_for_scene(
                storyboards_with_camera_tree=storyboards_with_camera_tree,
                character_portraits=character_portraits,
            ),
        )

        # 8. Generate the video segments. Only element [0] of the generator's
        # result is persisted, exactly as before; the splice step reads it
        # back from disk.
        # NOTE(review): presumably generate() returns a sequence whose first
        # item is the storyboard data - confirm against video_generator.
        logger.info("Generating video segments...")
        segments_path = self._path("storyboards_with_segments.json")
        self._load_or_create_json(
            segments_path,
            lambda: video_generator.generate(
                video_script_data=storyboards_with_frames,
            )[0],
        )

        # 9. Splice the segments into the final video.
        logger.info("Splicing video...")
        final_video_path = self._path("final_video.mp4")
        if not os.path.exists(final_video_path):
            concat_videos(segments_path, final_video_path)
        logger.info("Video spliced.")

    def _create_storyboard(
        self,
        script: dict,
        characters: str,
        user_requirement: Optional[str] = None,
    ):
        """Create (or load from cache) one storyboard per scene.

        Args:
            script: Script data; its "script" entry is iterated scene by scene.
            characters: Character description passed through to the
                storyboard creator.
            user_requirement: Optional requirement passed through as well.

        Returns:
            ``{"storyboards": [...]}`` with one entry per scene, in order.
        """
        scene_storyboards = []
        for idx, scene_script in enumerate(script["script"]):
            logger.info(f"Creating storyboard for scene {idx}...")
            storyboard = self._load_or_create_json(
                self._path(f"storyboard_{idx}.json"),
                lambda: storyboard_creator.create_storyboard(
                    script=scene_script,
                    characters=characters,
                    user_requirement=user_requirement,
                ),
            )
            scene_storyboards.append(storyboard)
            logger.info(f"Storyboard for scene {idx} created.")
        return {"storyboards": scene_storyboards}

    def _create_camera_tree(
        self,
        storyboards: dict,
    ):
        """Merge a camera tree into every scene storyboard, in place.

        Args:
            storyboards: Mapping whose "storyboards" list holds the scene
                storyboards; each one is updated with the keys of its
                (cached or newly created) camera-tree dict.

        Returns:
            The same ``storyboards`` object, now carrying the camera trees.
        """
        for idx, storyboard in enumerate(storyboards["storyboards"]):
            logger.info(f"Creating camera tree for scene {idx}...")
            camera_tree = self._load_or_create_json(
                self._path(f"storyboard_{idx}_with_camera_tree.json"),
                lambda: camera_tree_creator.create_camera_tree(
                    shot_descriptions=storyboard["storyboard"],
                ),
            )
            storyboard |= camera_tree
            logger.info(f"Camera tree for scene {idx} created.")
        return storyboards

    def _render_portrait_views(self, character: dict, style: Optional[str], idx: int) -> dict:
        """Generate and save the front, side and back portrait of one character."""
        front_image_path = self._path(f"front_portrait_{idx}.png")
        side_image_path = self._path(f"side_portrait_{idx}.png")
        back_image_path = self._path(f"back_portrait_{idx}.png")

        front_portrait = character_portraits_generator.generate_front_portrait(
            character=character,
            style=style,
        )
        front_portrait.save(front_image_path)

        # The side and back views are generated from the saved front view.
        side_portrait = character_portraits_generator.generate_side_portrait(
            character=character,
            front_image_path=[front_image_path],
        )
        side_portrait.save(side_image_path)

        back_portrait = character_portraits_generator.generate_back_portrait(
            character=character,
            front_image_path=[front_image_path],
        )
        back_portrait.save(back_image_path)

        return {
            "front_portrait": front_image_path,
            "side_portrait": side_image_path,
            "back_portrait": back_image_path,
        }

    def _character_portraits_generator(
        self,
        characters: dict,
        style: Optional[str],
    ):
        """Attach portrait image paths to every character, in place.

        Args:
            characters: Mapping whose "characters" list is iterated; each
                character dict gains "front_portrait", "side_portrait" and
                "back_portrait" path entries.
            style: Visual style forwarded to the front-portrait generator.

        Returns:
            The same ``characters`` object with the portrait paths merged in.
        """
        for idx, character in enumerate(characters["characters"]):
            logger.info(f"Designing portrait for character {idx}...")
            info_path = self._path(f"portraits_{idx}.json")
            if os.path.exists(info_path):
                logger.info(f"Portrait for character {idx} already exists.")
            portrait_info = self._load_or_create_json(
                info_path,
                lambda: self._render_portrait_views(character, style, idx),
            )
            character |= portrait_info
            logger.info(f"Portrait for character {idx} designed.")
        return characters

    def _generate_video_frames_for_scene(
        self,
        storyboards_with_camera_tree: dict,
        character_portraits: dict,
    ):
        """Generate the first frame of every shot and record its path.

        For each scene the cameras are visited in the order returned by
        ``efficient_sort``. Each shot's frame is generated from the portraits
        of its visible characters, the frames already available for the same
        camera and, if the camera has a parent shot, that parent's frame.

        Frames that already exist on disk are not regenerated, but they are
        still recorded as ``ff_path`` and offered as references to later
        shots, so a resumed run yields the same data as an uninterrupted one.

        Args:
            storyboards_with_camera_tree: Mapping whose "storyboards" list
                holds dicts with "storyboard" and "camera_tree" entries.
            character_portraits: Mapping whose "characters" list holds the
                portrait paths and "identifier_in_scene" of every character.

        Returns:
            ``storyboards_with_camera_tree`` with "ff_path" set on every
            processed shot; it is also dumped to ``final_storyboards.json``.
        """
        shot_num = 0
        for scene_idx, scene in enumerate(storyboards_with_camera_tree["storyboards"]):
            logger.info(f"Generating video frames for scene {scene_idx}...")
            storyboard = scene["storyboard"]
            camera_tree = scene["camera_tree"]

            # Inputs for efficient_sort: a leading 0 stands in for cameras
            # without a parent shot (kept exactly as in the original logic).
            parent_shot_idxs = [0]
            active_shot_idxs = []
            for item in camera_tree:
                if item["parent_shot_idx"] is not None:
                    parent_shot_idxs.append(item["parent_shot_idx"])
                active_shot_idxs.append(item["active_shot_idxs"])
            process_order = efficient_sort(parent_shot_idxs, active_shot_idxs)

            for cam_idx in process_order:
                logger.info(f"Processing scene {scene_idx} - camera {cam_idx}...")
                camera_item = camera_tree[cam_idx]
                prev_frame_path_and_text_pairs = []
                for shot_idx in camera_item["active_shot_idxs"]:
                    logger.info(f"Processing scene {scene_idx} - camera {cam_idx} - shot {shot_idx}...")
                    frame_description = storyboard[shot_idx]["ff_desc"]
                    vis_char_idxs = storyboard[shot_idx]["ff_vis_char_idxs"]
                    shot_num += 1
                    frame_save_path = self._path(
                        f"frame_scene{scene_idx}_camera{cam_idx}_shot{shot_idx}.png"
                    )

                    if os.path.exists(frame_save_path):
                        logger.info(f"Frame for scene {scene_idx} - camera {cam_idx} - shot {shot_idx} already exists.")
                    else:
                        image_path_and_text_pairs = []
                        # Reference the three portrait views of every visible character.
                        for vis_char_idx in vis_char_idxs:
                            logger.info(f"Referencing character {vis_char_idx} portrait...")
                            portrait = character_portraits["characters"][vis_char_idx]
                            identifier = portrait["identifier_in_scene"]
                            image_path_and_text_pairs.append((portrait["front_portrait"], f"{identifier}的正面肖像"))
                            image_path_and_text_pairs.append((portrait["side_portrait"], f"{identifier}的侧面肖像"))
                            image_path_and_text_pairs.append((portrait["back_portrait"], f"{identifier}的背面肖像"))
                        # Reference the earlier frames of this camera.
                        image_path_and_text_pairs.extend(prev_frame_path_and_text_pairs)
                        # Reference the frame of the parent shot, if any.
                        parent_shot_idx = camera_item["parent_shot_idx"]
                        if parent_shot_idx is not None:
                            parent_shot = storyboard[parent_shot_idx]
                            image_path_and_text_pairs.append((parent_shot["ff_path"], parent_shot["ff_desc"]))

                        # Let the selector pick the references and write the image prompt.
                        info_for_gen_frame = reference_image_selector.select_reference_images_and_generate_prompt(
                            image_path_and_text_pairs=image_path_and_text_pairs,
                            frame_description=frame_description,
                        )
                        logger.info(f"目标帧描述:\n{frame_description}")
                        logger.info(f"可参考帧:\n{image_path_and_text_pairs}")
                        logger.info(f"实际参考:\n{info_for_gen_frame}")

                        frame_prompt = info_for_gen_frame["text_prompt"]
                        image_urls = [
                            pair[0]
                            for pair in info_for_gen_frame["reference_image_path_and_text_pairs"]
                        ]
                        logger.info(f"Frame prompt: {frame_prompt}")
                        logger.info(f"Reference images: {image_urls}")

                        frame = generate_image_from_prompt_and_images(frame_prompt, image_paths=image_urls)
                        frame.save(frame_save_path)

                    # Record the frame whether it was just generated or found on
                    # disk. Skipping this for existing frames used to leave
                    # "ff_path" unset, which broke parent-frame lookups and the
                    # saved storyboard after a resumed run.
                    prev_frame_path_and_text_pairs.append((frame_save_path, frame_description))
                    storyboard[shot_idx]["ff_path"] = frame_save_path

        logger.info(f"Generated {shot_num} video frames.")
        with open(self._path("final_storyboards.json"), "w", encoding="utf-8") as f:
            json.dump(storyboards_with_camera_tree, f, ensure_ascii=False, indent=4)
        return storyboards_with_camera_tree
if __name__ == "__main__":
    # Demo run: build one pipeline and feed it a sample request. The
    # `pipeline` name is kept because the scratch snippets below refer to it.
    demo_request = {
        "idea": "身穿时尚服装的美女在街头漫步",
        "user_requirement": "剧情要连贯,最多三个场景",
        "style": "写实风格",
    }
    pipeline = Script2VideoPipeline()
    pipeline.video_create_pipeline(**demo_request)
- # with open("./output/storyboards_with_camera_tree.json", "r") as f:
- # storyboards = json.load(f)
- # full_items = storyboards["storyboards"]
- # for item in full_items:
- # camera_tree = item["camera_tree"]
-
- # for camera in camera_tree:
- # active_shot_idxs = camera["active_shot_idxs"][0]
- # camera["active_shot_idxs"] = active_shot_idxs
-
- # with open("./output/storyboards_with_camera_treess.json", "w") as f:
- # json.dump(storyboards, f, ensure_ascii=False, indent=4)
- # 生成角色肖像三视图
- # with open("./output/characters.json", "r", encoding='utf-8') as f:
- # characters = json.load(f)
- # character_portraits = pipeline._character_portraits_generator(
- # characters=characters,
- # style="cartoon"
- # )
- # with open("./output/character_portraits.json", "w", encoding='utf-8') as f:
- # json.dump(character_portraits, f, ensure_ascii=False, indent=4)
- # with open("./output/character_portraits.json", "r", encoding='utf-8') as f:
- # character_portraits = json.load(f)
- # with open("./output/storyboards_with_camera_tree.json", "r", encoding='utf-8') as f:
- # storyboards_with_camera_tree = json.load(f)
- # shot_num = 0
- # for scene_idx, storyboard_with_camera_tree in enumerate(storyboards_with_camera_tree["storyboards"]):
- # for shot_idx, shot in enumerate(storyboard_with_camera_tree["storyboard"]):
- # ff_path = f"./output/frame_scene{scene_idx}_camera{shot['cam_idx']}_shot{shot_idx}.png"
- # if os.path.exists(ff_path):
- # shot["ff_path"] = ff_path
- # shot_num += 1
- # logger.info(f"Total shot number: {shot_num}")
-
- # # 生成视频帧
- # result = pipeline._generate_video_frames_for_scene(
- # storyboards_with_camera_tree=storyboards_with_camera_tree,
- # character_portraits=character_portraits
- # )
- # with open("./output/storyboards_with_frames.json", "w", encoding='utf-8') as f:
- # json.dump(storyboards_with_camera_tree, f, ensure_ascii=False, indent=4)
- # # 将指定目录下的所有frame_scene*.png文件重命名为new_frame_scene*.png
- # for file in os.listdir("./output"):
- # if file.startswith("frame_scene") and file.endswith(".png"):
- # new_file = file.replace("frame_scene", "new_frame_scene")
- # os.rename(os.path.join("./output", file), os.path.join("./output", new_file))
- # 生成视频片段
- # with open("./output/storyboards_with_frames.json", "r", encoding='utf-8') as f:
- # final_storyboards = json.load(f)
- # storyboards_with_segments = video_generator.generate(
- # video_script_data=final_storyboards
- # )
- # with open("./output/storyboards_with_segments.json", "w", encoding='utf-8') as f:
- # json.dump(storyboards_with_segments, f, ensure_ascii=False, indent=4)
- # concat_videos("./output/storyboards_with_segments.json")
-
|