import json
from collections import Counter

from modules.video_processing.video_cut import moviepy_cut

from .path_config import PathConfig
from .logger_config import setup_logger

logger = setup_logger(__name__)
path_config = PathConfig()


def find_longest_consecutive_spk(data):
    """
    Find the longest run of consecutive entries spoken by the target speaker.

    The target speaker is the second-smallest distinct "spk" value present in
    ``data`` (i.e. ``sorted(labels)[1]``). When several runs share the maximum
    length, the earliest one is returned.

    Args:
        data (list): List of dicts; entries are expected to carry a "spk" key.
            Entries without "spk" (or with ``spk`` set to None) never match
            the target and simply break a run.

    Returns:
        tuple | None: ``(start_index, end_index)`` (both inclusive) of the
        longest run, or None when ``data`` is empty/None or contains fewer
        than two distinct speakers.
    """
    if not data:
        return None

    # Existence check: the target is speakers[1], so at least two distinct
    # labels are required. Entries lacking "spk" are ignored here so they
    # cannot raise KeyError or pollute the sort with None.
    speakers = sorted(
        {item["spk"] for item in data if item.get("spk") is not None}
    )
    if len(speakers) < 2:
        return None
    target = speakers[1]

    best_start = 0
    best_length = 0
    run_start = 0
    run_length = 0

    for index, item in enumerate(data):
        if item.get("spk") == target:
            if run_length == 0:
                # A new run begins at this entry.
                run_start = index
            run_length += 1
            # Strict comparison keeps the earliest run on ties.
            if run_length > best_length:
                best_start = run_start
                best_length = run_length
        else:
            run_length = 0

    if best_length == 0:
        return None
    return (best_start, best_start + best_length - 1)


def speaker_extract(audio_json):
    """
    Read a transcript JSON file and locate the target speaker's longest segment.

    The file must contain the keys "sentence_info" (list of sentence dicts
    with "spk", "start" and "end") and "key" (used as the video name).

    Args:
        audio_json: Path to the transcript JSON file.

    Returns:
        tuple: ``(start_time, end_time, video_name)`` where the times are the
        segment's "start"/"end" values divided by 1000, plus fixed offsets.

    Raises:
        KeyError: If "sentence_info" or "key" is missing from the JSON.
        ValueError: If no segment for the target speaker can be found
            (e.g. "sentence_info" is null/empty or has fewer than two
            distinct speakers).
    """
    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(audio_json, "r", encoding="utf-8") as file:
        data = json.load(file)

    sentences = data["sentence_info"]
    video_name = data["key"]

    spk_index = find_longest_consecutive_spk(sentences)
    if spk_index is None:
        raise ValueError(
            f"No consecutive segment for the target speaker found in {audio_json!r}"
        )

    first, last = spk_index
    # NOTE(review): "start"/"end" are presumably milliseconds (hence / 1000);
    # the +5 / +1 offsets are unexplained in this file -- confirm intent.
    start_time = sentences[first]["start"] / 1000 + 5
    end_time = sentences[last]["end"] / 1000 + 1
    return start_time, end_time, video_name


def aide_cut(audio_json):
    """
    Cut the target speaker's longest segment out of the matching sub video.

    The input clip is ``<sub_video dir>/<video_name>.mp4`` and the result is
    written to ``<aide_video dir>/aide-<video_name>.mp4`` via ``moviepy_cut``.

    Args:
        audio_json: Path to the transcript JSON file.

    Returns:
        The output video path (``aide_video_dir / "aide-<video_name>.mp4"``).
    """
    # Select the time range in which the target speaker talks.
    start_time, end_time, video_name = speaker_extract(audio_json)
    logger.info(f"助播片段裁切:{video_name} - {start_time} - {end_time}")

    # Resolve input/output locations and perform the cut.
    aide_video_dir = path_config.get_path('aide_video')
    sub_video_dir = path_config.get_path('sub_video')
    clip_video = sub_video_dir / f"{video_name}.mp4"
    output_video = aide_video_dir / f"aide-{video_name}.mp4"
    moviepy_cut(clip_video, output_video, start_time, end_time)
    return output_video


if __name__ == "__main__":
    print(aide_cut("transcript.json"))