manager.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. """
  2. 任务管理模块
  3. 提供TaskManager类和StepStatus枚举,用于管理多步骤任务的执行
  4. """
  5. import asyncio
  6. import json
  7. import os
  8. import pickle
  9. import threading
  10. from collections import deque
  11. from concurrent.futures import ThreadPoolExecutor, as_completed
  12. from datetime import datetime
  13. from typing import Any, Callable, Dict, List, Optional, Set
  14. from pathlib import Path
  15. from .config import get_config
  16. from .logger import get_logger
  17. class StepStatus:
  18. """步骤状态枚举"""
  19. PENDING = "pending" # 待执行
  20. RUNNING = "running" # 执行中
  21. COMPLETED = "completed" # 已完成
  22. FAILED = "failed" # 失败
  23. SKIPPED = "skipped" # 跳过
  24. class TaskManager:
  25. """
  26. 任务管理器 - 支持断点续传和步骤重试,支持并行执行
  27. 特性:
  28. 1. 自动保存每个步骤的执行状态
  29. 2. 支持从上次中断的地方继续执行
  30. 3. 支持强制重新执行指定步骤
  31. 4. 支持步骤依赖管理
  32. 5. 支持步骤输出缓存和恢复
  33. 6. 支持并行执行(自动检测可并行步骤)
  34. 使用示例:
  35. >>> manager = TaskManager(state_file="state.json", cache_dir="cache")
  36. >>> manager.register_step("step1", my_function, depends_on=[])
  37. >>> manager.register_step("step2", my_function2, depends_on=["step1"])
  38. >>> manager.register_step("step3", my_function3, depends_on=["step2"])
  39. >>> manager.register_step("step4", my_function4, depends_on=["step2"])
  40. >>> # 顺序执行
  41. >>> manager.run_all()
  42. >>> # 并行执行(自动检测step3和step4可以并行)
  43. >>> manager.run_all_parallel(max_workers=2)
  44. """
  45. def __init__(self, state_file: str = "task_state.json", cache_dir: str = "task_cache"):
  46. """
  47. 初始化任务管理器
  48. Args:
  49. state_file: 状态文件路径
  50. cache_dir: 缓存目录路径,用于存储步骤输出缓存
  51. """
  52. self.state_file = state_file
  53. self.cache_dir = Path(cache_dir)
  54. self.cache_dir.mkdir(parents=True, exist_ok=True)
  55. self.logger = get_logger("taskflow.manager")
  56. # 加载已有状态
  57. self.state = self._load_state()
  58. self.steps: Dict[str, Dict] = {}
  59. # 线程锁,用于保护状态文件的并发访问
  60. self._state_lock = threading.Lock()
  61. def _load_state(self) -> Dict:
  62. """加载任务状态"""
  63. if os.path.exists(self.state_file):
  64. try:
  65. with open(self.state_file, 'r', encoding='utf-8') as f:
  66. return json.load(f)
  67. except Exception as e:
  68. self.logger.warning(f"加载任务状态失败,将创建新状态:{e}")
  69. return {
  70. "steps": {},
  71. "metadata": {
  72. "created_at": datetime.now().isoformat(),
  73. "last_updated": None
  74. }
  75. }
  76. def _save_state(self):
  77. """保存任务状态(线程安全)"""
  78. with self._state_lock:
  79. self.state["metadata"]["last_updated"] = datetime.now().isoformat()
  80. try:
  81. with open(self.state_file, 'w', encoding='utf-8') as f:
  82. json.dump(self.state, f, ensure_ascii=False, indent=2)
  83. except Exception as e:
  84. self.logger.error(f"保存状态文件失败: {e}")
  85. def _get_cache_path(self, step_name: str) -> Path:
  86. """获取步骤缓存文件路径"""
  87. return self.cache_dir / f"{step_name}.pkl"
  88. def register_step(
  89. self,
  90. step_name: str,
  91. func: Callable,
  92. depends_on: Optional[List[str]] = None,
  93. force_rerun: bool = False
  94. ):
  95. """
  96. 注册步骤
  97. Args:
  98. step_name: 步骤名称(唯一标识)
  99. func: 步骤函数
  100. depends_on: 依赖的步骤名称列表
  101. force_rerun: 是否强制重新执行(忽略已完成状态)
  102. """
  103. if depends_on is None:
  104. depends_on = []
  105. self.steps[step_name] = {
  106. "func": func,
  107. "depends_on": depends_on,
  108. "force_rerun": force_rerun
  109. }
  110. # 初始化步骤状态(如果不存在)
  111. if step_name not in self.state["steps"]:
  112. self.state["steps"][step_name] = {
  113. "status": StepStatus.PENDING,
  114. "started_at": None,
  115. "completed_at": None,
  116. "error": None,
  117. "output_file": None
  118. }
  119. def get_step_status(self, step_name: str) -> str:
  120. """获取步骤状态(线程安全)"""
  121. with self._state_lock:
  122. if step_name not in self.state["steps"]:
  123. return StepStatus.PENDING
  124. return self.state["steps"][step_name]["status"]
  125. def set_step_status(self, step_name: str, status: str, error: Optional[str] = None):
  126. """设置步骤状态(线程安全)"""
  127. with self._state_lock:
  128. if step_name not in self.state["steps"]:
  129. self.state["steps"][step_name] = {
  130. "status": status,
  131. "started_at": None,
  132. "completed_at": None,
  133. "error": None,
  134. "output_file": None
  135. }
  136. step_state = self.state["steps"][step_name]
  137. step_state["status"] = status
  138. if status == StepStatus.RUNNING:
  139. step_state["started_at"] = datetime.now().isoformat()
  140. elif status in [StepStatus.COMPLETED, StepStatus.FAILED]:
  141. step_state["completed_at"] = datetime.now().isoformat()
  142. if error:
  143. step_state["error"] = str(error)
  144. self._save_state()
  145. def save_step_output(self, step_name: str, output: Any):
  146. """保存步骤输出到缓存(线程安全)"""
  147. cache_path = self._get_cache_path(step_name)
  148. try:
  149. with open(cache_path, 'wb') as f:
  150. pickle.dump(output, f)
  151. with self._state_lock:
  152. self.state["steps"][step_name]["output_file"] = str(cache_path)
  153. self._save_state()
  154. except Exception as e:
  155. self.logger.warning(f"保存步骤 {step_name} 的输出失败: {e}")
  156. def load_step_output(self, step_name: str) -> Optional[Any]:
  157. """从缓存加载步骤输出"""
  158. step_state = self.state["steps"].get(step_name, {})
  159. output_file = step_state.get("output_file")
  160. if output_file and os.path.exists(output_file):
  161. try:
  162. with open(output_file, 'rb') as f:
  163. return pickle.load(f)
  164. except Exception as e:
  165. self.logger.warning(f"加载步骤 {step_name} 的输出失败: {e}")
  166. return None
  167. def check_dependencies(self, step_name: str) -> bool:
  168. """检查步骤的依赖是否都已完成(线程安全)"""
  169. step_info = self.steps.get(step_name, {})
  170. depends_on = step_info.get("depends_on", [])
  171. for dep_step in depends_on:
  172. dep_status = self.get_step_status(dep_step)
  173. if dep_status != StepStatus.COMPLETED:
  174. self.logger.warning(f"步骤 {step_name} 的依赖 {dep_step} 尚未完成(状态: {dep_status})")
  175. return False
  176. return True
  177. def _get_ready_steps(self, step_order: Optional[List[str]] = None) -> List[str]:
  178. """
  179. 获取当前可以执行的步骤(依赖已满足且未完成)
  180. Args:
  181. step_order: 步骤顺序(如果为None,使用所有注册的步骤)
  182. Returns:
  183. 可以执行的步骤列表
  184. """
  185. if step_order is None:
  186. step_order = list(self.steps.keys())
  187. ready_steps = []
  188. for step_name in step_order:
  189. if step_name not in self.steps:
  190. continue
  191. # 检查是否需要执行
  192. if not self.should_run_step(step_name):
  193. continue
  194. # 检查依赖是否满足
  195. if self.check_dependencies(step_name):
  196. ready_steps.append(step_name)
  197. return ready_steps
  198. def _topological_sort(self, step_order: Optional[List[str]] = None) -> List[List[str]]:
  199. """
  200. 使用拓扑排序将步骤分组,每组内的步骤可以并行执行
  201. Args:
  202. step_order: 步骤顺序(如果为None,使用所有注册的步骤)
  203. Returns:
  204. 步骤批次列表,每个批次内的步骤可以并行执行
  205. """
  206. if step_order is None:
  207. step_order = list(self.steps.keys())
  208. # 构建依赖图和入度计数
  209. in_degree: Dict[str, int] = {step: 0 for step in step_order if step in self.steps}
  210. graph: Dict[str, List[str]] = {step: [] for step in step_order if step in self.steps}
  211. for step_name in step_order:
  212. if step_name not in self.steps:
  213. continue
  214. step_info = self.steps[step_name]
  215. depends_on = step_info.get("depends_on", [])
  216. for dep in depends_on:
  217. if dep in graph:
  218. graph[dep].append(step_name)
  219. in_degree[step_name] = in_degree.get(step_name, 0) + 1
  220. # 拓扑排序
  221. batches: List[List[str]] = []
  222. queue = deque([step for step in step_order if step in in_degree and in_degree[step] == 0])
  223. while queue:
  224. # 当前批次:所有入度为0的步骤
  225. current_batch = []
  226. batch_size = len(queue)
  227. for _ in range(batch_size):
  228. step = queue.popleft()
  229. if step in self.steps:
  230. current_batch.append(step)
  231. # 减少依赖此步骤的步骤的入度
  232. for dependent in graph.get(step, []):
  233. in_degree[dependent] -= 1
  234. if in_degree[dependent] == 0:
  235. queue.append(dependent)
  236. if current_batch:
  237. batches.append(current_batch)
  238. return batches
  239. def should_run_step(self, step_name: str) -> bool:
  240. """判断步骤是否应该执行"""
  241. step_info = self.steps.get(step_name, {})
  242. force_rerun = step_info.get("force_rerun", False)
  243. current_status = self.get_step_status(step_name)
  244. # 强制重新执行
  245. if force_rerun:
  246. return True
  247. # 已完成或跳过,不需要重新执行
  248. if current_status == StepStatus.COMPLETED:
  249. return False
  250. # 待执行或失败,需要执行
  251. if current_status in [StepStatus.PENDING, StepStatus.FAILED]:
  252. return True
  253. # 执行中(可能是上次中断),需要重新执行
  254. if current_status == StepStatus.RUNNING:
  255. return True
  256. return False
  257. def run_step(self, step_name: str, *args, **kwargs) -> Any:
  258. """
  259. 执行单个步骤
  260. Args:
  261. step_name: 步骤名称
  262. *args, **kwargs: 传递给步骤函数的参数
  263. Returns:
  264. 步骤的输出结果
  265. """
  266. if step_name not in self.steps:
  267. raise ValueError(f"步骤 {step_name} 未注册")
  268. # 检查是否需要执行
  269. if not self.should_run_step(step_name):
  270. self.logger.info(f"步骤 {step_name} 已完成,跳过执行。使用 load_step_output() 获取结果。")
  271. return self.load_step_output(step_name)
  272. # 检查依赖
  273. if not self.check_dependencies(step_name):
  274. raise RuntimeError(f"步骤 {step_name} 的依赖未满足")
  275. step_info = self.steps[step_name]
  276. func = step_info["func"]
  277. # 标记为执行中
  278. self.set_step_status(step_name, StepStatus.RUNNING)
  279. try:
  280. self.logger.info(f"开始执行步骤: {step_name}")
  281. # 执行步骤函数(支持同步和异步函数)
  282. import asyncio
  283. import inspect
  284. # 检测函数是否为异步函数
  285. if inspect.iscoroutinefunction(func):
  286. # 异步函数:在当前事件循环中运行,如果没有则创建新的
  287. try:
  288. loop = asyncio.get_event_loop()
  289. if loop.is_running():
  290. # 如果事件循环正在运行,需要在新线程中运行
  291. import concurrent.futures
  292. with concurrent.futures.ThreadPoolExecutor() as executor:
  293. future = executor.submit(asyncio.run, func(*args, **kwargs))
  294. output = future.result()
  295. else:
  296. output = loop.run_until_complete(func(*args, **kwargs))
  297. except RuntimeError:
  298. # 没有事件循环,创建新的
  299. output = asyncio.run(func(*args, **kwargs))
  300. else:
  301. # 同步函数:直接调用
  302. output = func(*args, **kwargs)
  303. # 保存输出
  304. self.save_step_output(step_name, output)
  305. # 标记为已完成
  306. self.set_step_status(step_name, StepStatus.COMPLETED)
  307. self.logger.info(f"步骤 {step_name} 执行完成")
  308. return output
  309. except Exception as e:
  310. # 标记为失败
  311. self.set_step_status(step_name, StepStatus.FAILED, error=str(e))
  312. self.logger.error(f"步骤 {step_name} 执行失败: {e}", exc_info=True)
  313. raise
  314. def run_all(self, step_order: Optional[List[str]] = None):
  315. """
  316. 按顺序执行所有步骤
  317. Args:
  318. step_order: 步骤执行顺序(如果为None,则按注册顺序执行)
  319. """
  320. if step_order is None:
  321. step_order = list(self.steps.keys())
  322. for step_name in step_order:
  323. if step_name not in self.steps:
  324. self.logger.warning(f"步骤 {step_name} 未注册,跳过")
  325. continue
  326. try:
  327. self.run_step(step_name)
  328. except Exception as e:
  329. self.logger.error(f"执行步骤 {step_name} 时出错: {e}", exc_info=True)
  330. # 可以选择继续执行后续步骤,或者中断
  331. # 这里选择中断,可以根据需要修改
  332. raise
  333. def run_all_parallel(
  334. self,
  335. step_order: Optional[List[str]] = None,
  336. max_workers: Optional[int] = None,
  337. continue_on_error: bool = False
  338. ):
  339. """
  340. 并行执行所有步骤(自动检测可并行步骤)
  341. 该方法会自动分析步骤依赖关系,将可以并行执行的步骤分组执行。
  342. 例如:如果step3和step4都依赖step2,它们会在step2完成后并行执行。
  343. Args:
  344. step_order: 步骤执行顺序(如果为None,则按注册顺序执行)
  345. max_workers: 最大并行工作线程数(如果为None,使用批次大小)
  346. continue_on_error: 遇到错误时是否继续执行后续步骤(默认False,遇到错误立即停止)
  347. 使用示例:
  348. >>> manager.register_step("step1", func1, depends_on=[])
  349. >>> manager.register_step("step2", func2, depends_on=["step1"])
  350. >>> manager.register_step("step3", func3, depends_on=["step2"])
  351. >>> manager.register_step("step4", func4, depends_on=["step2"])
  352. >>> # step3和step4会在step2完成后并行执行
  353. >>> manager.run_all_parallel(max_workers=2)
  354. """
  355. if step_order is None:
  356. step_order = list(self.steps.keys())
  357. # 获取拓扑排序的批次
  358. batches = self._topological_sort(step_order)
  359. if not batches:
  360. self.logger.warning("没有可执行的步骤")
  361. return
  362. self.logger.info(f"共 {len(batches)} 个执行批次")
  363. # 按批次执行
  364. for batch_idx, batch in enumerate(batches, 1):
  365. # 过滤出需要执行的步骤
  366. ready_steps = [step for step in batch if self.should_run_step(step) and self.check_dependencies(step)]
  367. if not ready_steps:
  368. self.logger.info(f"批次 {batch_idx}/{len(batches)}: 所有步骤已完成,跳过")
  369. continue
  370. self.logger.info(f"批次 {batch_idx}/{len(batches)}: 执行步骤 {ready_steps} (共 {len(ready_steps)} 个)")
  371. # 如果只有一个步骤,直接执行(避免线程开销)
  372. if len(ready_steps) == 1:
  373. try:
  374. self.run_step(ready_steps[0])
  375. except Exception as e:
  376. self.logger.error(f"执行步骤 {ready_steps[0]} 时出错: {e}", exc_info=True)
  377. if not continue_on_error:
  378. raise
  379. else:
  380. # 并行执行多个步骤
  381. workers = max_workers if max_workers is not None else len(ready_steps)
  382. with ThreadPoolExecutor(max_workers=workers) as executor:
  383. # 提交所有步骤
  384. future_to_step = {
  385. executor.submit(self.run_step, step_name): step_name
  386. for step_name in ready_steps
  387. }
  388. # 等待所有步骤完成
  389. for future in as_completed(future_to_step):
  390. step_name = future_to_step[future]
  391. try:
  392. result = future.result()
  393. self.logger.info(f"步骤 {step_name} 执行完成")
  394. except Exception as e:
  395. self.logger.error(f"执行步骤 {step_name} 时出错: {e}", exc_info=True)
  396. if not continue_on_error:
  397. # 取消其他未完成的步骤
  398. for f in future_to_step:
  399. f.cancel()
  400. raise
  401. async def run_step_async(self, step_name: str, *args, **kwargs) -> Any:
  402. """
  403. 异步执行单个步骤
  404. Args:
  405. step_name: 步骤名称
  406. *args, **kwargs: 传递给步骤函数的参数
  407. Returns:
  408. 步骤的输出结果
  409. """
  410. if step_name not in self.steps:
  411. raise ValueError(f"步骤 {step_name} 未注册")
  412. # 检查是否需要执行
  413. if not self.should_run_step(step_name):
  414. self.logger.info(f"步骤 {step_name} 已完成,跳过执行。使用 load_step_output() 获取结果。")
  415. return self.load_step_output(step_name)
  416. # 检查依赖
  417. if not self.check_dependencies(step_name):
  418. raise RuntimeError(f"步骤 {step_name} 的依赖未满足")
  419. step_info = self.steps[step_name]
  420. func = step_info["func"]
  421. # 标记为执行中
  422. self.set_step_status(step_name, StepStatus.RUNNING)
  423. try:
  424. self.logger.info(f"开始执行步骤: {step_name}")
  425. # 执行步骤函数(支持同步和异步函数)
  426. import inspect
  427. # 检测函数是否为异步函数
  428. if inspect.iscoroutinefunction(func):
  429. # 异步函数:直接 await
  430. output = await func(*args, **kwargs)
  431. else:
  432. # 同步函数:在线程池中运行,避免阻塞事件循环
  433. loop = asyncio.get_event_loop()
  434. output = await loop.run_in_executor(None, func, *args, **kwargs)
  435. # 保存输出
  436. self.save_step_output(step_name, output)
  437. # 标记为已完成
  438. self.set_step_status(step_name, StepStatus.COMPLETED)
  439. self.logger.info(f"步骤 {step_name} 执行完成")
  440. return output
  441. except Exception as e:
  442. # 标记为失败
  443. self.set_step_status(step_name, StepStatus.FAILED, error=str(e))
  444. self.logger.error(f"步骤 {step_name} 执行失败: {e}", exc_info=True)
  445. raise
  446. async def run_all_async(
  447. self,
  448. step_order: Optional[List[str]] = None,
  449. continue_on_error: bool = False
  450. ):
  451. """
  452. 异步并行执行所有步骤(自动检测可并行步骤)
  453. 该方法会自动分析步骤依赖关系,将可以并行执行的步骤分组执行。
  454. 使用 asyncio.gather() 实现真正的异步并发,比 ThreadPoolExecutor 更高效。
  455. 例如:如果step3和step4都依赖step2,它们会在step2完成后并行执行。
  456. Args:
  457. step_order: 步骤执行顺序(如果为None,则按注册顺序执行)
  458. continue_on_error: 遇到错误时是否继续执行后续步骤(默认False,遇到错误立即停止)
  459. 使用示例:
  460. >>> async def main():
  461. ... manager = TaskManager(...)
  462. ... manager.register_step("step1", async_func1, depends_on=[])
  463. ... manager.register_step("step2", async_func2, depends_on=["step1"])
  464. ... manager.register_step("step3", async_func3, depends_on=["step2"])
  465. ... manager.register_step("step4", async_func4, depends_on=["step2"])
  466. ... # step3和step4会在step2完成后并行执行
  467. ... await manager.run_all_async()
  468. """
  469. if step_order is None:
  470. step_order = list(self.steps.keys())
  471. # 获取拓扑排序的批次
  472. batches = self._topological_sort(step_order)
  473. if not batches:
  474. self.logger.warning("没有可执行的步骤")
  475. return
  476. self.logger.info(f"共 {len(batches)} 个执行批次(异步并发执行)")
  477. # 按批次执行
  478. for batch_idx, batch in enumerate(batches, 1):
  479. # 过滤出需要执行的步骤
  480. ready_steps = [step for step in batch if self.should_run_step(step) and self.check_dependencies(step)]
  481. if not ready_steps:
  482. self.logger.info(f"批次 {batch_idx}/{len(batches)}: 所有步骤已完成,跳过")
  483. continue
  484. self.logger.info(f"批次 {batch_idx}/{len(batches)}: 异步并发执行步骤 {ready_steps} (共 {len(ready_steps)} 个)")
  485. # 如果只有一个步骤,直接执行(避免并发开销)
  486. if len(ready_steps) == 1:
  487. try:
  488. await self.run_step_async(ready_steps[0])
  489. except Exception as e:
  490. self.logger.error(f"执行步骤 {ready_steps[0]} 时出错: {e}", exc_info=True)
  491. if not continue_on_error:
  492. raise
  493. else:
  494. # 异步并发执行多个步骤
  495. tasks = [self.run_step_async(step_name) for step_name in ready_steps]
  496. # 使用 asyncio.gather 并发执行所有步骤
  497. results = await asyncio.gather(*tasks, return_exceptions=True)
  498. # 处理结果和异常
  499. for step_name, result in zip(ready_steps, results):
  500. if isinstance(result, Exception):
  501. self.logger.error(f"执行步骤 {step_name} 时出错: {result}", exc_info=True)
  502. if not continue_on_error:
  503. raise result
  504. else:
  505. self.logger.info(f"步骤 {step_name} 执行完成")
  506. def reset_step(self, step_name: str):
  507. """重置步骤状态,使其可以重新执行"""
  508. if step_name in self.state["steps"]:
  509. self.state["steps"][step_name] = {
  510. "status": StepStatus.PENDING,
  511. "started_at": None,
  512. "completed_at": None,
  513. "error": None,
  514. "output_file": None
  515. }
  516. # 删除缓存文件
  517. cache_path = self._get_cache_path(step_name)
  518. if cache_path.exists():
  519. cache_path.unlink()
  520. self._save_state()
  521. self.logger.info(f"步骤 {step_name} 已重置")
  522. def reset_all(self):
  523. """重置所有步骤状态"""
  524. for step_name in list(self.state["steps"].keys()):
  525. self.reset_step(step_name)
  526. def get_summary(self) -> Dict:
  527. """获取任务执行摘要"""
  528. total = len(self.state["steps"])
  529. completed = sum(1 for s in self.state["steps"].values()
  530. if s["status"] == StepStatus.COMPLETED)
  531. failed = sum(1 for s in self.state["steps"].values()
  532. if s["status"] == StepStatus.FAILED)
  533. pending = sum(1 for s in self.state["steps"].values()
  534. if s["status"] == StepStatus.PENDING)
  535. return {
  536. "total": total,
  537. "completed": completed,
  538. "failed": failed,
  539. "pending": pending,
  540. "progress": f"{completed}/{total}" if total > 0 else "0/0"
  541. }