run_manager.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. """
  2. 运行管理模块
  3. 管理每次运行的输出目录,确保每次运行的结果独立存储
  4. """
  5. from datetime import datetime
  6. from pathlib import Path
  7. from typing import Optional
  8. from .logger import get_logger
  9. class RunManager:
  10. """
  11. 运行管理类:为每次运行创建独立的输出目录
  12. 每次运行都会在基础输出目录下创建一个独立的子目录
  13. 用于存储该次运行的所有结果文件,便于区分和管理不同运行的结果
  14. 使用示例:
  15. >>> run_manager = RunManager(base_output_dir="output")
  16. >>> run_dir = run_manager.create_run_directory()
  17. >>> print(run_dir) # 输出: output/run_20250101_120000
  18. """
  19. def __init__(self, base_output_dir: str = "output"):
  20. """
  21. 初始化运行管理器
  22. Args:
  23. base_output_dir: 基础输出目录
  24. """
  25. self.base_output_dir = Path(base_output_dir)
  26. self.base_output_dir.mkdir(parents=True, exist_ok=True)
  27. self.current_run_dir: Optional[Path] = None
  28. self.run_id: Optional[str] = None
  29. self.logger = get_logger("taskflow.run_manager")
  30. def create_run_directory(self, run_id: Optional[str] = None) -> str:
  31. """
  32. 创建本次运行的输出目录
  33. Args:
  34. run_id: 运行ID(如果为None,使用当前时间戳)
  35. Returns:
  36. str: 本次运行的输出目录路径
  37. """
  38. if run_id is None:
  39. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  40. run_id = f"run_{timestamp}"
  41. self.run_id = run_id
  42. self.current_run_dir = self.base_output_dir / run_id
  43. self.current_run_dir.mkdir(parents=True, exist_ok=True)
  44. self.logger.info(f"本次运行的输出目录: {self.current_run_dir}")
  45. return str(self.current_run_dir)
  46. def get_run_directory(self) -> str:
  47. """
  48. 获取本次运行的输出目录
  49. Returns:
  50. str: 本次运行的输出目录路径
  51. Raises:
  52. RuntimeError: 未创建运行目录
  53. """
  54. if self.current_run_dir is None:
  55. raise RuntimeError("未创建运行目录, 请先调用 create_run_directory 方法")
  56. return str(self.current_run_dir)
  57. def get_run_id(self) -> str:
  58. """
  59. 获取本次运行的运行ID
  60. Returns:
  61. str: 本次运行的运行ID
  62. """
  63. if self.run_id is None:
  64. raise RuntimeError("未创建运行目录, 请先调用 create_run_directory 方法")
  65. return self.run_id
  66. def list_runs(self) -> list:
  67. """
  68. 列出所有以存在的运行目录
  69. Returns:
  70. 运行ID列表,按时间排序,最新的运行排在前面
  71. """
  72. runs = []
  73. for item in self.base_output_dir.iterdir():
  74. if item.is_dir() and item.name.startswith("run_"):
  75. runs.append({
  76. "run_id": item.name,
  77. "path": str(item),
  78. "created_at": datetime.fromtimestamp(item.stat().st_ctime).isoformat()
  79. })
  80. runs.sort(key=lambda x: x["created_at"], reverse=True)
  81. return runs