video_concat.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import os
  2. import time
  3. import json
  4. import subprocess
  5. from utils.logger_config import setup_logger
  6. from moviepy.editor import VideoFileClip, concatenate_videoclips
  7. from moviepy.audio.fx import audio_fadein, audio_fadeout
  8. from tqdm import tqdm
  9. logger = setup_logger(__name__)
  10. # def get_video_duration(video_path):
  11. # """
  12. # 获取视频时长(以秒为单位)。
  13. # Args:
  14. # video_path (str): 视频文件路径
  15. # Returns:
  16. # float: 视频时长(秒)
  17. # """
  18. # command = f'ffprobe -v error -show_entries format=duration -of json "{video_path}"'
  19. # result = subprocess.run(command, shell=True, capture_output=True, text=True)
  20. # duration_info = json.loads(result.stdout)
  21. # return float(duration_info['format']['duration'])
  22. # def merge_video(video_1, video_2, video_output):
  23. # logger.info(f"口播视频拼接。。。")
  24. # # 转场设置
  25. # video_1_duration = get_video_duration(video_1)
  26. # print(f"!!!!!!!!!!: {video_1_duration}")
  27. # filter_complex = f"""
  28. # xfade=transition=fade:duration=0.5:offset={video_1_duration - 0.5},
  29. # format=yuv420p;
  30. # acrossfade=d=0.2
  31. # """.replace("\n", "")
  32. # # 执行视频拼接
  33. # command = f'ffmpeg -i {video_1} -i {video_2} -filter_complex "{filter_complex}" -y {video_output}'
  34. # subprocess.run(command, shell=True)
  35. def get_video_duration(video_path):
  36. """
  37. 获取视频时长(以秒为单位)。
  38. """
  39. command = f'ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 "{video_path}"'
  40. result = subprocess.run(command, shell=True, capture_output=True, text=True)
  41. return float(result.stdout.strip())
  42. def merge_video(video_1, video_2, video_output, fade_duration=0.5):
  43. logger.info(f"口播视频拼接。。。")
  44. # 获取 video_1 的时长
  45. video_1_duration = get_video_duration(video_1)
  46. # 设置转场和淡入淡出效果
  47. filter_complex = f"""
  48. [0:v]setpts=PTS-STARTPTS, fade=type=out:duration={fade_duration}:start_time={video_1_duration - fade_duration}[v0f];
  49. [1:v]setpts=PTS-STARTPTS, fade=type=in:duration={fade_duration}:start_time=0[v1f];
  50. [v0f][v1f]concat=n=2:v=1:a=0[outv];
  51. [0:a][1:a]acrossfade=d={fade_duration}[outa]
  52. """.replace("\n", "")
  53. # 执行视频拼接
  54. command = f'ffmpeg -i {video_1} -i {video_2} -filter_complex "{filter_complex}" -map "[outv]" -map "[outa]" -y {video_output}'
  55. subprocess.run(command, shell=True)
  56. def merge_videos(video_list):
  57. """
  58. 按列表顺序拼接多个视频。
  59. Args:
  60. video_list (list): 视频文件路径列表
  61. """
  62. if len(video_list) < 2:
  63. logger.error("至少需要两个视频进行拼接。")
  64. return
  65. # 使用第一个视频作为初始视频
  66. merged_video = video_list[0]
  67. for idx, video in tqdm(enumerate(video_list[1:], start=1)):
  68. output_file = f'output/oral_video/merged_video_{idx}.mp4'
  69. merge_video(merged_video, video, output_file)
  70. merged_video = output_file
  71. # 最终合并结果重命名为 merged_video.mp4
  72. video_name = os.path.splitext(os.path.basename(video_list[0]))[0].split('_')[0]
  73. final_output = f"output/oral_video/{video_name}.mp4"
  74. subprocess.run(f'mv {merged_video} {final_output}', shell=True)
  75. logger.info(f"最终合并视频保存为 {final_output}")
  76. if __name__ == "__main__":
  77. input_files = [
  78. "./output/video_clips/test_video_003.mp4",
  79. "./output/video_clips/test_video_002.mp4",
  80. "./output/video_clips/test_video_004.mp4",
  81. ]
  82. input_files = ['data/clip_video/videoa_009.mp4', 'data/clip_video/videoa_023.mp4', 'data/clip_video/videoa_004.mp4']
  83. merge_videos(input_files)
  84. # input_files = [
  85. # "./output/video_clips/test_video_001.mp4",
  86. # "./output/video_clips/test_video_002.mp4",
  87. # ]
  88. # start_time = time.time()
  89. # merge_video(input_files[0], input_files[1], "merged_video.mp4")