|
|
@@ -0,0 +1,432 @@
|
|
|
+import os
|
|
|
+import json
|
|
|
+import time
|
|
|
+import asyncio
|
|
|
+from typing import Optional
|
|
|
+from utils.tools import (
|
|
|
+ string_to_json,
|
|
|
+ save_json_file,
|
|
|
+ setup_logger,
|
|
|
+ efficient_sort
|
|
|
+)
|
|
|
+
|
|
|
+from tools.banana_pro import generate_image_from_prompt_and_images
|
|
|
+from tools.text_generator import media_captioner
|
|
|
+from tools.image_generator import image_generator
|
|
|
+from tools.video_generator import video_generator
|
|
|
+from tools.video_composer import video_composer, concat_videos
|
|
|
+from mcps.story_create import story_creator
|
|
|
+from mcps.character_extract import character_extractor
|
|
|
+from mcps.character_portraits_generate import character_portraits_generator
|
|
|
+from mcps.storyboard_create import storyboard_creator
|
|
|
+from mcps.camera_tree import camera_tree_creator
|
|
|
+from mcps.reference_image_select import reference_image_selector
|
|
|
+
|
|
|
+logger = setup_logger(__name__)
|
|
|
+
|
|
|
class Script2VideoPipeline:
    """Drive the idea -> story -> script -> storyboard -> frames -> video flow.

    Every stage persists its result under ``output_dir`` and is skipped on a
    later run when that artifact already exists, so an interrupted run can be
    resumed simply by calling :meth:`video_create_pipeline` again.
    """

    # Class-level fallback so the private helpers still resolve a directory on
    # an instance whose ``__init__`` was bypassed.
    output_dir = "./output"

    def __init__(
        self,
        output_dir: str = "./output"
    ):
        """Remember the artifact directory and make sure it exists.

        Args:
            output_dir: Non-empty directory that receives every intermediate
                and final artifact. The default reproduces the previously
                hard-coded ``./output`` paths byte-for-byte.
        """
        self.output_dir = output_dir
        # Without this the first ``open(..., "w")`` on a fresh checkout fails
        # with FileNotFoundError.
        os.makedirs(self.output_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Small private helpers shared by all stages
    # ------------------------------------------------------------------

    def _path(self, name: str) -> str:
        """Return the location of artifact ``name`` inside the output directory."""
        return f"{self.output_dir}/{name}"

    @staticmethod
    def _read_json(path: str):
        """Load and return the JSON document stored at ``path``."""
        with open(path, "r", encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _write_json(path: str, data) -> None:
        """Write ``data`` to ``path`` as indented, non-ASCII-escaped JSON."""
        # Serialize first: if ``data`` is not JSON-serializable we fail before
        # touching the file, instead of leaving a truncated file that the next
        # run would mistake for a valid cache entry.
        text = json.dumps(data, ensure_ascii=False, indent=4)
        with open(path, "w", encoding='utf-8') as f:
            f.write(text)

    def _load_or_create_json(self, name: str, create):
        """Return the cached JSON artifact ``name``, building it on a miss.

        Args:
            name: File name of the artifact inside the output directory.
            create: Zero-argument callable; invoked only when the file does
                not exist. Its result is written to the file and returned.

        Returns:
            The parsed file content on a cache hit, otherwise the object
            returned by ``create``.
        """
        path = self._path(name)
        if os.path.exists(path):
            return self._read_json(path)
        data = create()
        self._write_json(path, data)
        return data

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    def video_create_pipeline(
        self,
        idea: str,
        user_requirement: Optional[str] = None,
        style: Optional[str] = None,
    ) -> None:
        """Run all nine stages, reusing any artifact that is already on disk.

        Args:
            idea: Seed idea handed to the story creator.
            user_requirement: Optional extra requirement forwarded to the
                story, script and storyboard stages.
            style: Optional visual style forwarded to portrait generation.
        """
        # 1. Create the story (plain text, so it is cached separately from
        #    the JSON artifacts below).
        logger.info("Creating story...")
        story_path = self._path("story.txt")
        if os.path.exists(story_path):
            with open(story_path, "r", encoding='utf-8') as f:
                story = f.read()
        else:
            story = story_creator.develop_story(
                idea=idea,
                user_requirement=user_requirement
            )
            with open(story_path, "w", encoding='utf-8') as f:
                f.write(story)

        # 2. Write the script, scene by scene
        logger.info("Writing script...")
        script = self._load_or_create_json(
            "script.json",
            lambda: story_creator.write_script_on_story(
                story=story,
                user_requirement=user_requirement
            ),
        )

        # 3. Extract the characters
        logger.info("Extracting characters...")
        characters = self._load_or_create_json(
            "characters.json",
            lambda: character_extractor.extract_characters(
                script=script
            ),
        )

        # 4. Design the character portraits
        # NOTE: on a cache miss this mutates ``characters`` in place (portrait
        # paths are merged into each character entry).
        logger.info("Designing character portraits...")
        character_portraits = self._load_or_create_json(
            "character_portraits.json",
            lambda: self._character_portraits_generator(
                characters=characters,
                style=style
            ),
        )

        # 5. Create a storyboard for every scene of the script
        logger.info("Creating storyboard...")
        storyboards = self._load_or_create_json(
            "storyboards.json",
            lambda: self._create_storyboard(
                script=script,
                characters=str(characters),
                user_requirement=user_requirement
            ),
        )

        # 6. Build the camera tree
        logger.info("Building camera tree...")
        storyboards_with_camera_tree = self._load_or_create_json(
            "storyboards_with_camera_tree.json",
            lambda: self._create_camera_tree(
                storyboards=storyboards
            ),
        )

        # 7. Generate the first frame of every shot
        logger.info("Generating video frames...")
        storyboards_with_frames = self._load_or_create_json(
            "storyboards_with_frames.json",
            lambda: self._generate_video_frames_for_scene(
                storyboards_with_camera_tree=storyboards_with_camera_tree,
                character_portraits=character_portraits
            ),
        )

        # 8. Generate the video segments
        # Only element [0] of the generator's result is persisted, as before.
        # NOTE(review): the return shape of video_generator.generate is not
        # visible here - presumably (annotated storyboards, ...); confirm.
        logger.info("Generating video segments...")
        self._load_or_create_json(
            "storyboards_with_segments.json",
            lambda: video_generator.generate(
                video_script_data=storyboards_with_frames
            )[0],
        )

        # 9. Concatenate the segments into the final video
        logger.info("Splicing video...")
        final_video_path = self._path("final_video.mp4")
        if not os.path.exists(final_video_path):
            concat_videos(self._path("storyboards_with_segments.json"), final_video_path)
        logger.info("Video spliced.")

    # ------------------------------------------------------------------
    # Stage implementations
    # ------------------------------------------------------------------

    def _create_storyboard(
        self,
        script: dict,
        characters: str,
        user_requirement: Optional[str] = None,
    ) -> dict:
        """Create (or reload) one storyboard per scene in ``script["script"]``.

        Args:
            script: Script document; its ``"script"`` entry is iterated as the
                list of scenes.
            characters: Character description passed through to the
                storyboard creator unchanged.
            user_requirement: Optional extra requirement passed through.

        Returns:
            ``{"storyboards": [...]}`` with one entry per scene, in scene order.
        """
        scene_storyboards = []
        for idx, scene_script in enumerate(script["script"]):
            logger.info(f"Creating storyboard for scene {idx}...")
            storyboard = self._load_or_create_json(
                f"storyboard_{idx}.json",
                lambda: storyboard_creator.create_storyboard(
                    script=scene_script,
                    characters=characters,
                    user_requirement=user_requirement
                ),
            )
            scene_storyboards.append(storyboard)
            logger.info(f"Storyboard for scene {idx} created.")

        return {
            "storyboards": scene_storyboards
        }

    def _create_camera_tree(
        self,
        storyboards: dict
    ) -> dict:
        """Merge a camera tree into every scene storyboard, in place.

        Args:
            storyboards: ``{"storyboards": [...]}``; each entry must expose its
                shot list under ``"storyboard"``.

        Returns:
            The same ``storyboards`` object, each scene updated with the keys
            of its camera-tree document.
        """
        for idx, storyboard in enumerate(storyboards["storyboards"]):
            logger.info(f"Creating camera tree for scene {idx}...")
            camera_tree = self._load_or_create_json(
                f"storyboard_{idx}_with_camera_tree.json",
                lambda: camera_tree_creator.create_camera_tree(
                    shot_descriptions=storyboard["storyboard"]
                ),
            )
            # Camera-tree keys win over same-named storyboard keys.
            storyboard.update(camera_tree)
            logger.info(f"Camera tree for scene {idx} created.")

        return storyboards

    def _character_portraits_generator(
        self,
        characters: dict,
        style: Optional[str]
    ) -> dict:
        """Attach front/side/back portrait paths to every character, in place.

        Args:
            characters: ``{"characters": [...]}``; each entry is a dict that is
                handed to the portrait generator and then updated.
            style: Visual style forwarded to the front-portrait generator.

        Returns:
            The same ``characters`` object, each entry extended with
            ``front_portrait``, ``side_portrait`` and ``back_portrait`` paths.
        """
        for idx, character in enumerate(characters["characters"]):
            logger.info(f"Designing portrait for character {idx}...")
            info_path = self._path(f"portraits_{idx}.json")

            if os.path.exists(info_path):
                logger.info(f"Portrait for character {idx} already exists.")
                portrait_info = self._read_json(info_path)
            else:
                front_image_path = self._path(f"front_portrait_{idx}.png")
                side_image_path = self._path(f"side_portrait_{idx}.png")
                back_image_path = self._path(f"back_portrait_{idx}.png")

                front_portrait = character_portraits_generator.generate_front_portrait(
                    character=character,
                    style=style
                )
                front_portrait.save(front_image_path)

                # The side and back views take the saved front view as their
                # reference (passed as a one-element list, as the generator
                # is called that way throughout).
                side_portrait = character_portraits_generator.generate_side_portrait(
                    character=character,
                    front_image_path=[front_image_path]
                )
                side_portrait.save(side_image_path)

                back_portrait = character_portraits_generator.generate_back_portrait(
                    character=character,
                    front_image_path=[front_image_path]
                )
                back_portrait.save(back_image_path)

                portrait_info = {
                    "front_portrait": front_image_path,
                    "side_portrait": side_image_path,
                    "back_portrait": back_image_path
                }
                self._write_json(info_path, portrait_info)

            character.update(portrait_info)
            logger.info(f"Portrait for character {idx} designed.")

        return characters

    @staticmethod
    def _portrait_reference_pairs(
        character_portraits: dict,
        vis_char_idxs
    ) -> list:
        """Return (image path, caption) pairs for the three views of each visible character."""
        pairs = []
        for vis_char_idx in vis_char_idxs:
            logger.info(f"Referencing character {vis_char_idx} portrait...")
            character = character_portraits["characters"][vis_char_idx]
            identifier = character['identifier_in_scene']
            pairs.append((character["front_portrait"], f"{identifier}的正面肖像"))
            pairs.append((character["side_portrait"], f"{identifier}的侧面肖像"))
            pairs.append((character["back_portrait"], f"{identifier}的背面肖像"))
        return pairs

    def _generate_video_frames_for_scene(
        self,
        storyboards_with_camera_tree: dict,
        character_portraits: dict
    ) -> dict:
        """Generate (or reuse) the first frame of every shot and record its path.

        For each scene the cameras are visited in the order returned by
        ``efficient_sort``; for each shot of a camera a frame image is
        generated from the character portraits, the earlier frames of the same
        camera and, when the camera has a parent shot, that parent's frame.

        A frame image that already exists on disk is reused rather than
        regenerated, but it is still recorded as ``ff_path`` and still added
        to the reference frames of later shots. Previously such shots were
        skipped entirely, so a resumed run lost their ``ff_path`` and failed
        with ``KeyError`` as soon as a child camera referenced them.

        Args:
            storyboards_with_camera_tree: ``{"storyboards": [...]}``; each scene
                must provide ``"storyboard"`` (shots with ``ff_desc`` and
                ``ff_vis_char_idxs``) and ``"camera_tree"`` (items with
                ``parent_shot_idx`` and ``active_shot_idxs``).
            character_portraits: ``{"characters": [...]}`` with portrait paths
                and ``identifier_in_scene`` per character.

        Returns:
            The same ``storyboards_with_camera_tree`` object, every processed
            shot extended with ``ff_path``. The result is also written to
            ``final_storyboards.json`` in the output directory.
        """
        shot_num = 0
        for scene_idx, scene in enumerate(storyboards_with_camera_tree["storyboards"]):
            logger.info(f"Generating video frames for scene {scene_idx}...")

            storyboard = scene["storyboard"]
            camera_tree = scene["camera_tree"]

            # NOTE(review): the leading 0 is kept from the original code -
            # presumably a placeholder parent for the root camera; confirm
            # against efficient_sort's contract.
            parent_shot_idxs = [0]
            active_shot_idxs = []
            for item in camera_tree:
                if item["parent_shot_idx"] is not None:
                    parent_shot_idxs.append(item["parent_shot_idx"])
                active_shot_idxs.append(item["active_shot_idxs"])

            # Presumably orders cameras so that a parent shot's frame exists
            # before a dependent camera needs it - helper is not visible here.
            process_order = efficient_sort(parent_shot_idxs, active_shot_idxs)

            for cam_idx in process_order:
                logger.info(f"Processing scene {scene_idx} - camera {cam_idx}...")
                camera_item = camera_tree[cam_idx]
                prev_frame_path_and_text_pairs = []

                for shot_idx in camera_item["active_shot_idxs"]:
                    logger.info(f"Processing scene {scene_idx} - camera {cam_idx} - shot {shot_idx}...")
                    shot = storyboard[shot_idx]
                    frame_description = shot["ff_desc"]
                    vis_char_idxs = shot["ff_vis_char_idxs"]

                    shot_num += 1
                    frame_save_path = self._path(
                        f"frame_scene{scene_idx}_camera{cam_idx}_shot{shot_idx}.png"
                    )

                    if os.path.exists(frame_save_path):
                        logger.info(f"Frame for scene {scene_idx} - camera {cam_idx} - shot {shot_idx} already exists.")
                    else:
                        # References: three-view portraits of visible characters
                        image_path_and_text_pairs = self._portrait_reference_pairs(
                            character_portraits, vis_char_idxs
                        )

                        # References: earlier frames of this camera
                        image_path_and_text_pairs.extend(prev_frame_path_and_text_pairs)

                        # Reference: the parent shot's frame
                        parent_shot_idx = camera_item["parent_shot_idx"]
                        if parent_shot_idx is not None:
                            parent_shot = storyboard[parent_shot_idx]
                            image_path_and_text_pairs.append((parent_shot["ff_path"], parent_shot["ff_desc"]))

                        # Filter the candidate references and build the
                        # image-generation prompt
                        info_for_gen_frame = reference_image_selector.select_reference_images_and_generate_prompt(
                            image_path_and_text_pairs=image_path_and_text_pairs,
                            frame_description=frame_description
                        )

                        logger.info(f"目标帧描述:\n{frame_description}")
                        logger.info(f"可参考帧:\n{image_path_and_text_pairs}")
                        logger.info(f"实际参考:\n{info_for_gen_frame}")

                        frame_prompt = info_for_gen_frame["text_prompt"]
                        image_urls = [
                            pair[0]
                            for pair in info_for_gen_frame["reference_image_path_and_text_pairs"]
                        ]

                        logger.info(f"Frame prompt: {frame_prompt}")
                        logger.info(f"Reference images: {image_urls}")

                        frame = generate_image_from_prompt_and_images(frame_prompt, image_paths=image_urls)
                        frame.save(frame_save_path)

                    # Record the frame whether it was just generated or reused
                    # from disk, so later shots and child cameras can
                    # reference it and the returned storyboard is complete.
                    prev_frame_path_and_text_pairs.append((frame_save_path, frame_description))
                    shot["ff_path"] = frame_save_path

        logger.info(f"Generated {shot_num} video frames.")
        self._write_json(self._path("final_storyboards.json"), storyboards_with_camera_tree)

        return storyboards_with_camera_tree
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
    # Demo invocation when the module is executed as a script.
    # idea: a fashionably dressed woman strolling down the street;
    # user_requirement: coherent plot, at most three scenes;
    # style: realistic.
    demo_request = {
        "idea": "身穿时尚服装的美女在街头漫步",
        "user_requirement": "剧情要连贯,最多三个场景",
        "style": "写实风格",
    }

    pipeline = Script2VideoPipeline()
    pipeline.video_create_pipeline(**demo_request)
|
|
|
+
|
|
|
+ # with open("./output/storyboards_with_camera_tree.json", "r") as f:
|
|
|
+ # storyboards = json.load(f)
|
|
|
+
|
|
|
+ # full_items = storyboards["storyboards"]
|
|
|
+
|
|
|
+ # for item in full_items:
|
|
|
+ # camera_tree = item["camera_tree"]
|
|
|
+
|
|
|
+ # for camera in camera_tree:
|
|
|
+ # active_shot_idxs = camera["active_shot_idxs"][0]
|
|
|
+ # camera["active_shot_idxs"] = active_shot_idxs
|
|
|
+
|
|
|
+
|
|
|
+ # with open("./output/storyboards_with_camera_treess.json", "w") as f:
|
|
|
+ # json.dump(storyboards, f, ensure_ascii=False, indent=4)
|
|
|
+
|
|
|
+
|
|
|
+ # Generate the three-view (front/side/back) character portraits
|
|
|
+ # with open("./output/characters.json", "r", encoding='utf-8') as f:
|
|
|
+ # characters = json.load(f)
|
|
|
+
|
|
|
+ # character_portraits = pipeline._character_portraits_generator(
|
|
|
+ # characters=characters,
|
|
|
+ # style="cartoon"
|
|
|
+ # )
|
|
|
+
|
|
|
+ # with open("./output/character_portraits.json", "w", encoding='utf-8') as f:
|
|
|
+ # json.dump(character_portraits, f, ensure_ascii=False, indent=4)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ # with open("./output/character_portraits.json", "r", encoding='utf-8') as f:
|
|
|
+ # character_portraits = json.load(f)
|
|
|
+
|
|
|
+ # with open("./output/storyboards_with_camera_tree.json", "r", encoding='utf-8') as f:
|
|
|
+ # storyboards_with_camera_tree = json.load(f)
|
|
|
+
|
|
|
+ # shot_num = 0
|
|
|
+ # for scene_idx, storyboard_with_camera_tree in enumerate(storyboards_with_camera_tree["storyboards"]):
|
|
|
+ # for shot_idx, shot in enumerate(storyboard_with_camera_tree["storyboard"]):
|
|
|
+ # ff_path = f"./output/frame_scene{scene_idx}_camera{shot['cam_idx']}_shot{shot_idx}.png"
|
|
|
+ # if os.path.exists(ff_path):
|
|
|
+ # shot["ff_path"] = ff_path
|
|
|
+ # shot_num += 1
|
|
|
+
|
|
|
+ # logger.info(f"Total shot number: {shot_num}")
|
|
|
+
|
|
|
+
|
|
|
+ # # Generate the video frames
|
|
|
+ # result = pipeline._generate_video_frames_for_scene(
|
|
|
+ # storyboards_with_camera_tree=storyboards_with_camera_tree,
|
|
|
+ # character_portraits=character_portraits
|
|
|
+ # )
|
|
|
+
|
|
|
+ # with open("./output/storyboards_with_frames.json", "w", encoding='utf-8') as f:
|
|
|
+ # json.dump(storyboards_with_camera_tree, f, ensure_ascii=False, indent=4)
|
|
|
+
|
|
|
+ # # Rename every frame_scene*.png file in the given directory to new_frame_scene*.png
|
|
|
+ # for file in os.listdir("./output"):
|
|
|
+ # if file.startswith("frame_scene") and file.endswith(".png"):
|
|
|
+ # new_file = file.replace("frame_scene", "new_frame_scene")
|
|
|
+ # os.rename(os.path.join("./output", file), os.path.join("./output", new_file))
|
|
|
+
|
|
|
+ # Generate the video segments
|
|
|
+ # with open("./output/storyboards_with_frames.json", "r", encoding='utf-8') as f:
|
|
|
+ # final_storyboards = json.load(f)
|
|
|
+
|
|
|
+ # storyboards_with_segments = video_generator.generate(
|
|
|
+ # video_script_data=final_storyboards
|
|
|
+ # )
|
|
|
+ # with open("./output/storyboards_with_segments.json", "w", encoding='utf-8') as f:
|
|
|
+ # json.dump(storyboards_with_segments, f, ensure_ascii=False, indent=4)
|
|
|
+
|
|
|
+
|
|
|
+ # concat_videos("./output/storyboards_with_segments.json")
|
|
|
+
|