"""Resumable video-cutting pipeline.

Runs a sequence of cutting steps and records each step's completion and
output in a JSON state file so that an interrupted run can be resumed.
"""
import json
import os
import time
from datetime import datetime
from pathlib import Path

from utils.audio_analysis import audio_analysis_pipeline
from modules.video_processing.video_concat import merge_videos
from modules.video_processing.video_cut import video_cut_by_json
from utils.llm_director import director_json
from utils.video_concat import concat_videos
from utils.common import read_video_list, find_vidoe2cut
from utils.first_cut import cut_once
from utils.oral_cut import oral_cut
from utils.aide_cut import aide_cut
from utils.show_cut import show_cut
from utils.logger_config import setup_logger

logger = setup_logger(__name__)


class PipelineState:
    """Track which pipeline steps have completed, persisted as JSON.

    The in-memory state has the shape ``{'steps': {name: {'status': ...,
    'timestamp': ..., 'output': ...}}}`` and is rewritten to ``state_file``
    on every change.
    """

    def __init__(self, state_file='pipeline_state.json'):
        self.state_file = state_file
        self.state = self._load_state()

    def _load_state(self):
        """Load the state file, or start with an empty state if it is missing."""
        try:
            with open(self.state_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {'steps': {}}

    def save(self):
        """Write the current state to the state file.

        Raises:
            TypeError: If a recorded step output is not JSON-serializable.
        """
        # Serialize before opening the file: mode 'w' truncates on open, so a
        # serialization error raised afterwards would destroy the saved state.
        payload = json.dumps(self.state, indent=2)
        with open(self.state_file, 'w') as f:
            f.write(payload)

    def is_completed(self, step_name):
        """Return True if ``step_name`` is recorded with status 'completed'."""
        return self.state['steps'].get(step_name, {}).get('status') == 'completed'

    def mark_completed(self, step_name, output=None):
        """Record ``step_name`` as completed with its output, then save."""
        self.state['steps'][step_name] = {
            'status': 'completed',
            'timestamp': datetime.now().isoformat(),
            'output': output,
        }
        self.save()

    def reset_status(self):
        """Set the status of every recorded step to 'failed', then save.

        Timestamps and outputs are left untouched; only the status changes,
        which makes every step eligible to run again.
        """
        for entry in self.state['steps'].values():
            entry['status'] = 'failed'
        self.save()

    def get_output(self, step_name):
        """Return the recorded output of ``step_name``.

        Raises:
            KeyError: If the step has never been recorded.
        """
        return self.state['steps'][step_name]['output']


def run_step(step_name, func, state, dependencies=None, *args, **kwargs):
    """Run one pipeline step unless it is already completed.

    Args:
        step_name: Key under which the step is tracked in ``state``.
        func: Callable implementing the step; called as ``func(*args, **kwargs)``.
        state: ``PipelineState`` used to check and record completion.
        dependencies: Names of steps that must already be completed.
            ``None`` (the default) means no dependencies.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Raises:
        RuntimeError: If any dependency is not completed.
        Exception: Whatever ``func`` (or saving the state) raises is
            re-raised after being reported, so later steps do not run.
    """
    # Every dependency must be completed before this step may run.
    for dep in dependencies or ():
        if not state.is_completed(dep):
            raise RuntimeError(f"Step {step_name} depends on {dep}, which is not completed.")

    if state.is_completed(step_name):
        # Already done in this or a previous run; nothing to do.
        return

    print(f"Running step: {step_name}")
    try:
        result = func(*args, **kwargs)  # the actual step logic
        state.mark_completed(step_name, result)
        print(f"Step {step_name} completed successfully.")
    except Exception as e:
        print(f"Step {step_name} failed: {e}")
        raise  # abort the remaining steps


def main(video_path):
    """Cut ``video_path`` into sub-videos and process each of them.

    Step 1 runs ``cut_once`` on the whole video. For every sub-video found at
    index 1 of that step's output, ``oral_cut`` and ``show_cut`` are run and
    their outputs are passed to ``concat_videos``. On success all step
    statuses are reset so that the next invocation starts from scratch; on
    failure the state is kept so that a rerun resumes where it stopped.

    Args:
        video_path: Path of the raw video to process.
    """
    start = time.time()
    state = PipelineState()

    try:
        # NOTE(review): 'step1' is not keyed by video_path, so state left over
        # from an interrupted run on a different video would be reused here.
        run_step('step1', cut_once, state, [], video_path)

        for sub_video in state.get_output('step1')[1]:
            # Step names are scoped per sub-video. With fixed names, the
            # steps completed for the first sub-video would be seen as
            # already done and silently skipped for every later one.
            oral_step = f"step2:{sub_video}"
            show_step = f"step3:{sub_video}"
            concat_step = f"step4:{sub_video}"

            run_step(oral_step, oral_cut, state, ['step1'], sub_video)
            run_step(show_step, show_cut, state, ['step1'],
                     f"data/audio_json/{Path(sub_video).stem}.json")
            run_step(concat_step, concat_videos, state, [oral_step, show_step],
                     [state.get_output(oral_step), state.get_output(show_step)])
            logger.info(f"{sub_video} 裁切完成!")

        logger.info(f"{video_path} 裁切完成,耗时:{time.time() - start} 秒.")
    except Exception as e:
        # Best-effort boundary: report and keep the saved state for a resume.
        print(f"流程中断,因为: {e}")
    else:
        print("所有步骤执行完成!")
        state.reset_status()


if __name__ == "__main__":
    main("data/raw_video/usab.mp4")