new_main.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import json
  2. import os
  3. from datetime import datetime
  4. import time
  5. from pathlib import Path
  6. from utils.audio_analysis import audio_analysis_pipeline
  7. from modules.video_processing.video_concat import merge_videos
  8. from modules.video_processing.video_cut import video_cut_by_json
  9. from utils.llm_director import director_json
  10. from utils.video_concat import concat_videos
  11. from utils.common import read_video_list, find_vidoe2cut
  12. from utils.first_cut import cut_once
  13. from utils.oral_cut import oral_cut
  14. from utils.aide_cut import aide_cut
  15. from utils.show_cut import show_cut
  16. from utils.logger_config import setup_logger
  17. logger = setup_logger(__name__)
  18. class PipelineState:
  19. """管理流水线状态的类"""
  20. def __init__(self, state_file='pipeline_state.json'):
  21. self.state_file = state_file
  22. self.state = self._load_state()
  23. def _load_state(self):
  24. """加载状态文件,若不存在则初始化"""
  25. if os.path.exists(self.state_file):
  26. with open(self.state_file, 'r') as f:
  27. return json.load(f)
  28. else:
  29. return {'steps': {}}
  30. def save(self):
  31. """保存当前状态到文件"""
  32. with open(self.state_file, 'w') as f:
  33. json.dump(self.state, f, indent=2)
  34. def is_completed(self, step_name):
  35. """检查步骤是否已完成"""
  36. return self.state['steps'].get(step_name, {}).get('status') == 'completed'
  37. def mark_completed(self, step_name, output=None):
  38. """标记步骤为已完成"""
  39. self.state['steps'][step_name] = {
  40. 'status': 'completed',
  41. 'timestamp': datetime.now().isoformat(),
  42. 'output': output
  43. }
  44. self.save()
  45. def reset_status(self):
  46. """重置所有步骤的状态为 failed"""
  47. for step in self.state['steps']:
  48. self.state['steps'][step]['status'] = 'failed'
  49. self.save()
  50. def get_output(self, step_name):
  51. """获取步骤返回值"""
  52. return self.state['steps'][step_name]['output']
  53. def run_step(step_name, func, state, dependencies=[], *args, **kwargs):
  54. """执行单个步骤并处理依赖"""
  55. # 检查依赖是否全部完成
  56. for dep in dependencies:
  57. if not state.is_completed(dep):
  58. raise RuntimeError(f"Step {step_name} depends on {dep}, which is not completed.")
  59. if not state.is_completed(step_name):
  60. print(f"Running step: {step_name}")
  61. try:
  62. result = func(*args, **kwargs) # 执行实际步骤逻辑
  63. state.mark_completed(step_name, result)
  64. print(f"Step {step_name} completed successfully.")
  65. except Exception as e:
  66. print(f"Step {step_name} failed: {str(e)}")
  67. raise # 中断后续执行
  68. def main(video_path):
  69. start = time.time()
  70. state = PipelineState()
  71. try:
  72. run_step('step1', cut_once, state, [], video_path)
  73. for sub_video in state.get_output('step1')[1]:
  74. run_step('step2', oral_cut, state, ['step1'], sub_video)
  75. run_step('step3', show_cut, state, ['step1'], f"data/audio_json/{Path(sub_video).stem}.json")
  76. run_step('step4', concat_videos, state, ['step2', 'step3'], [state.get_output('step2'), state.get_output('step3')])
  77. logger.info(f"{sub_video} 裁切完成!")
  78. logger.info(f"{video_path} 裁切完成,耗时:{time.time() - start} 秒.")
  79. except Exception as e:
  80. print(f"流程中断,因为: {str(e)}")
  81. else:
  82. print("所有步骤执行完成!")
  83. state.reset_status()
  84. if __name__ == "__main__":
  85. main("data/raw_video/usab.mp4")