| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- import os
- import time
- import threading
- from pathlib import Path
- from typing import Optional, List, Dict, Any, Callable, Union
- from enum import Enum
- from dataclasses import dataclass, field
- from tqdm import tqdm
- from moviepy.editor import VideoFileClip, concatenate_videoclips
- import os
- import json
- from modules.media_process.media_processor import media_processor
- from utils.tools import (
- read_json_file,
- string_to_json,
- save_json_file,
- convert_webp_to_png
- )
- from utils.upload import upload_file_to_tos
- from utils.logger_config import setup_logger
- logger = setup_logger(__name__)
- class VideoComposer:
- """视频合成器"""
-
- def compose(self, script_path: str) -> str:
- """
- 将视频片段合成为完整视频
-
- Args:
- script_path: 脚本文件路径
-
- Returns:
- 最终视频文件名
-
- Raises:
- FileNotFoundError: 当脚本文件不存在时
- ValueError: 当脚本数据无效或视频片段不存在时
- """
- if not os.path.exists(script_path):
- raise FileNotFoundError(f"脚本文件不存在: {script_path}")
-
- # 读取脚本
- video_script_data = read_json_file(script_path)
- if not video_script_data:
- raise ValueError("无法读取脚本文件或文件为空")
-
- storyboards = video_script_data[0].get("storyboards", [])
- if not storyboards:
- raise ValueError("脚本中未找到分镜详情")
- lens_details = []
- for storyboard in storyboards:
- clip_path = storyboard.get("storyboard")
- lens_details.append(clip_path)
- lens_details = [item for sublist in lens_details for item in sublist]
-
- # 收集所有视频片段路径
- clips = []
- for lens_item in lens_details:
- clip_path = lens_item.get("clip_path")
- if not clip_path:
- logger.warning(f"分镜 {lens_item.get('lens_id')} 缺少视频片段路径")
- continue
-
- if not os.path.exists(clip_path):
- logger.warning(f"视频片段不存在: {clip_path}")
- continue
-
- clips.append(clip_path)
-
- if not clips:
- raise ValueError("未找到有效的视频片段")
-
- logger.info(f"开始拼接 {len(clips)} 个视频片段")
- logger.debug(f"视频片段列表: {clips}")
-
- # 生成最终视频文件名
- film_path = f"./output/final_video.mp4"
-
- try:
- media_processor.concat_videos(clips, film_path)
- logger.info(f"视频合成完成: {film_path}")
- return film_path
- except Exception as e:
- logger.error(f"视频合成失败: {e}")
- raise
- video_composer = VideoComposer()
- def concat_videos(json_path, output_path):
- """
- 拼接多个视频文件为一个视频。
- :param video_paths: 视频文件路径列表,例如 ['1.mp4', '2.mp4']
- :param output_path: 输出视频文件路径,例如 'output.mp4'
- """
- with open(json_path, "r", encoding='utf-8') as f:
- final_storyboards = json.load(f)["storyboards"]
- segments = []
- for storyboard in final_storyboards:
- storyboard_path = storyboard["storyboard"]
- for item in storyboard_path:
- clip_path = item["clip_path"]
- segments.append(clip_path)
-
- if not segments:
- raise ValueError("视频路径列表不能为空")
- clips = []
- for path in segments:
- if not os.path.isfile(path):
- raise FileNotFoundError(f"视频文件不存在: {path}")
- clips.append(VideoFileClip(path))
- # 拼接所有视频片段
- final_clip = concatenate_videoclips(clips, method="compose") # 使用 compose 可处理不同尺寸
- # 写入输出文件
- final_clip.write_videofile(
- output_path,
- codec='libx264',
- audio_codec='aac',
- temp_audiofile='temp-audio.m4a',
- remove_temp=True
- )
- # 关闭所有 clip 以释放资源
- for clip in clips:
- clip.close()
- final_clip.close()
- # 示例用法
- if __name__ == "__main__":
- # 生成视频片段
- with open("./output/storyboards_with_segments.json", "r", encoding='utf-8') as f:
- final_storyboards = json.load(f)[0]["storyboards"]
- segments = []
- for storyboard in final_storyboards:
- storyboard_path = storyboard["storyboard"]
- for item in storyboard_path:
- clip_path = item["clip_path"]
- segments.append(clip_path)
- concat_videos("./output/storyboards_with_segments.json", "output_combined_4.mp4")
|