123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- import json
- import os
- from datetime import datetime
- import time
- from pathlib import Path
- from utils.audio_analysis import audio_analysis_pipeline
- from modules.video_processing.video_concat import merge_videos
- from modules.video_processing.video_cut import video_cut_by_json
- from utils.llm_director import director_json
- from utils.video_concat import concat_videos
- from utils.common import read_video_list, find_vidoe2cut
- from utils.first_cut import cut_once
- from utils.oral_cut import oral_cut
- from utils.aide_cut import aide_cut
- from utils.show_cut import show_cut
- from utils.logger_config import setup_logger
- logger = setup_logger(__name__)
- class PipelineState:
- """管理流水线状态的类"""
- def __init__(self, state_file='pipeline_state.json'):
- self.state_file = state_file
- self.state = self._load_state()
- def _load_state(self):
- """加载状态文件,若不存在则初始化"""
- if os.path.exists(self.state_file):
- with open(self.state_file, 'r') as f:
- return json.load(f)
- else:
- return {'steps': {}}
- def save(self):
- """保存当前状态到文件"""
- with open(self.state_file, 'w') as f:
- json.dump(self.state, f, indent=2)
- def is_completed(self, step_name):
- """检查步骤是否已完成"""
- return self.state['steps'].get(step_name, {}).get('status') == 'completed'
- def mark_completed(self, step_name, output=None):
- """标记步骤为已完成"""
- self.state['steps'][step_name] = {
- 'status': 'completed',
- 'timestamp': datetime.now().isoformat(),
- 'output': output
- }
- self.save()
- def reset_status(self):
- """重置所有步骤的状态为 failed"""
- for step in self.state['steps']:
- self.state['steps'][step]['status'] = 'failed'
- self.save()
- def get_output(self, step_name):
- """获取步骤返回值"""
- return self.state['steps'][step_name]['output']
- def run_step(step_name, func, state, dependencies=[], *args, **kwargs):
- """执行单个步骤并处理依赖"""
- # 检查依赖是否全部完成
- for dep in dependencies:
- if not state.is_completed(dep):
- raise RuntimeError(f"Step {step_name} depends on {dep}, which is not completed.")
-
- if not state.is_completed(step_name):
- print(f"Running step: {step_name}")
- try:
- result = func(*args, **kwargs) # 执行实际步骤逻辑
- state.mark_completed(step_name, result)
- print(f"Step {step_name} completed successfully.")
- except Exception as e:
- print(f"Step {step_name} failed: {str(e)}")
- raise # 中断后续执行
- def main(video_path):
- start = time.time()
- state = PipelineState()
- try:
- run_step('step1', cut_once, state, [], video_path)
- for sub_video in state.get_output('step1')[1]:
- run_step('step2', oral_cut, state, ['step1'], sub_video)
- run_step('step3', show_cut, state, ['step1'], f"data/audio_json/{Path(sub_video).stem}.json")
- run_step('step4', concat_videos, state, ['step2', 'step3'], [state.get_output('step2'), state.get_output('step3')])
- logger.info(f"{sub_video} 裁切完成!")
- logger.info(f"{video_path} 裁切完成,耗时:{time.time() - start} 秒.")
- except Exception as e:
- print(f"流程中断,因为: {str(e)}")
- else:
- print("所有步骤执行完成!")
- state.reset_status()
- if __name__ == "__main__":
- main("data/raw_video/usab.mp4")
|