import os import time import json import subprocess from utils.logger_config import setup_logger from moviepy.editor import VideoFileClip, concatenate_videoclips from moviepy.audio.fx import audio_fadein, audio_fadeout from tqdm import tqdm logger = setup_logger(__name__) # def get_video_duration(video_path): # """ # 获取视频时长(以秒为单位)。 # Args: # video_path (str): 视频文件路径 # Returns: # float: 视频时长(秒) # """ # command = f'ffprobe -v error -show_entries format=duration -of json "{video_path}"' # result = subprocess.run(command, shell=True, capture_output=True, text=True) # duration_info = json.loads(result.stdout) # return float(duration_info['format']['duration']) # def merge_video(video_1, video_2, video_output): # logger.info(f"口播视频拼接。。。") # # 转场设置 # video_1_duration = get_video_duration(video_1) # print(f"!!!!!!!!!!: {video_1_duration}") # filter_complex = f""" # xfade=transition=fade:duration=0.5:offset={video_1_duration - 0.5}, # format=yuv420p; # acrossfade=d=0.2 # """.replace("\n", "") # # 执行视频拼接 # command = f'ffmpeg -i {video_1} -i {video_2} -filter_complex "{filter_complex}" -y {video_output}' # subprocess.run(command, shell=True) def get_video_duration(video_path): """ 获取视频时长(以秒为单位)。 """ command = f'ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 "{video_path}"' result = subprocess.run(command, shell=True, capture_output=True, text=True) return float(result.stdout.strip()) def merge_video(video_1, video_2, video_output, fade_duration=0.5): logger.info(f"口播视频拼接。。。") # 获取 video_1 的时长 video_1_duration = get_video_duration(video_1) # 设置转场和淡入淡出效果 filter_complex = f""" [0:v]setpts=PTS-STARTPTS, fade=type=out:duration={fade_duration}:start_time={video_1_duration - fade_duration}[v0f]; [1:v]setpts=PTS-STARTPTS, fade=type=in:duration={fade_duration}:start_time=0[v1f]; [v0f][v1f]concat=n=2:v=1:a=0[outv]; [0:a][1:a]acrossfade=d={fade_duration}[outa] """.replace("\n", "") # 执行视频拼接 command = f'ffmpeg -i {video_1} -i {video_2} -filter_complex "{filter_complex}" -map "[outv]" -map "[outa]" -y {video_output}' subprocess.run(command, shell=True) def merge_videos(video_list): """ 按列表顺序拼接多个视频。 Args: video_list (list): 视频文件路径列表 """ if len(video_list) < 2: logger.error("至少需要两个视频进行拼接。") return # 使用第一个视频作为初始视频 merged_video = video_list[0] for idx, video in tqdm(enumerate(video_list[1:], start=1)): output_file = f'output/oral_video/merged_video_{idx}.mp4' merge_video(merged_video, video, output_file) merged_video = output_file # 最终合并结果重命名为 merged_video.mp4 video_name = os.path.splitext(os.path.basename(video_list[0]))[0].split('_')[0] final_output = f"output/oral_video/{video_name}.mp4" subprocess.run(f'mv {merged_video} {final_output}', shell=True) logger.info(f"最终合并视频保存为 {final_output}") if __name__ == "__main__": input_files = [ "./output/video_clips/test_video_003.mp4", "./output/video_clips/test_video_002.mp4", "./output/video_clips/test_video_004.mp4", ] input_files = ['data/clip_video/videoa_009.mp4', 'data/clip_video/videoa_023.mp4', 'data/clip_video/videoa_004.mp4'] merge_videos(input_files) # input_files = [ # "./output/video_clips/test_video_001.mp4", # "./output/video_clips/test_video_002.mp4", # ] # start_time = time.time() # merge_video(input_files[0], input_files[1], "merged_video.mp4")