# aide_cut.py

  1. import json
  2. from collections import Counter
  3. from modules.video_processing.video_cut import moviepy_cut
  4. from .path_config import PathConfig
  5. from .logger_config import setup_logger
# Module-level logger, named after this module.
logger = setup_logger(__name__)
# Shared path configuration; queried below via get_path() for the video directories.
path_config = PathConfig()
  8. def find_longest_consecutive_spk(data):
  9. """
  10. 找出字典列表中连续 spk 为 2 的最长子集的索引。 # TODO:错误源头,没有进行存在性检验
  11. Args:
  12. data (list): 字典列表,每个字典都有键 "spk"
  13. Returns:
  14. tuple: 包含最长子集的起始和结束索引,如果没有找到则返回 None
  15. """
  16. longest_start = longest_end = -1
  17. longest_length = 0
  18. current_start = -1
  19. current_length = 0
  20. speakers = sorted({item['spk'] for item in data})
  21. for index, item in enumerate(data):
  22. if item.get("spk") == speakers[1]:
  23. if current_length == 0: # 开始新的连续序列
  24. current_start = index
  25. current_length += 1
  26. else:
  27. if current_length > longest_length: # 更新最长序列
  28. longest_length = current_length
  29. longest_start = current_start
  30. longest_end = index - 1
  31. current_length = 0 # 重置当前长度
  32. # 检查最后一个序列
  33. if current_length > longest_length:
  34. longest_length = current_length
  35. longest_start = current_start
  36. longest_end = len(data) - 1
  37. if longest_length > 0:
  38. return (longest_start, longest_end)
  39. else:
  40. return None
# TODO: debug: can `sentences` be None here?
  42. def speaker_extract(audio_json):
  43. with open(audio_json, 'r') as file:
  44. data = json.load(file)
  45. sentences = data["sentence_info"]
  46. video_name = data["key"]
  47. spk_index = find_longest_consecutive_spk(sentences)
  48. start_time = sentences[spk_index[0]]["start"] / 1000 + 5
  49. end_time = sentences[spk_index[1]]["end"] / 1000 + 1
  50. return start_time, end_time, video_name
  51. def aide_cut(audio_json):
  52. # 筛选助播讲话片段
  53. start_time, end_time, video_name = speaker_extract(audio_json)
  54. logger.info(f"助播片段裁切:{video_name} - {start_time} - {end_time}")
  55. # 助播视频裁切
  56. aide_video_dir = path_config.get_path('aide_video')
  57. sub_video_dir = path_config.get_path('sub_video')
  58. clip_video = sub_video_dir / f"{video_name}.mp4"
  59. output_video = aide_video_dir / f"aide-{video_name}.mp4"
  60. moviepy_cut(clip_video, output_video, start_time, end_time)
  61. return output_video
  62. if __name__ == "__main__":
  63. print(aide_cut("transcript.json"))