| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- """
- 文本分析示例 - 主程序
- 演示如何使用 TaskFlow 进行文本文件的处理和分析
- """
- import logging
- from pathlib import Path
- from taskflow import TaskManager, FileIOHandler, RunManager
- from taskflow import setup_logger
- from .steps import TextAnalysisSteps
# Set up the module-level logger used by every function in this file (INFO level).
logger = setup_logger("text_analysis.main", level=logging.INFO)
def create_sample_text_file(input_file: str, io_handler: FileIOHandler):
    """Create the sample input text file unless it is already present.

    Args:
        input_file: Path of the text file to create.
        io_handler: I/O helper used both to check whether the file exists
            and to write the sample text.
    """
    # Sample content; surrounding blank lines are stripped before writing.
    demo_text = """
TaskFlow is a powerful Python framework for managing multi-step workflows.
It supports checkpoint and resume functionality, allowing you to continue
from where you left off if a process is interrupted.

The framework provides features like step dependency management, output caching,
and automatic state persistence. You can easily register steps, define dependencies,
and execute complex workflows with confidence.

With TaskFlow, you can build robust data processing pipelines, batch jobs,
and any other multi-step processes that require reliability and resumability.
"""

    if not io_handler.file_exists(input_file):
        io_handler.write_text(demo_text.strip(), input_file)
        logger.info(f"已创建示例文本文件: {input_file}")
    else:
        # Never overwrite an existing input file.
        logger.info(f"输入文件已存在: {input_file}")
def main():
    """Run the text-analysis example from start to finish.

    Creates a run directory, makes sure the sample input file exists,
    registers the four analysis steps with their dependencies, runs them in
    order, and logs a short summary read from the ``step4`` output.

    Raises:
        Exception: Any error raised while running the steps or reading the
            report is logged with its traceback and then re-raised.
    """
    logger.info("=== 文本分析示例 ===\n")

    # 1. Run manager: allocates the output directory for this run.
    runs = RunManager(base_output_dir="output")
    run_output_dir = runs.create_run_directory()
    logger.info(f"运行ID: {runs.get_run_id()}")
    logger.info(f"输出目录: {run_output_dir}")

    # 2. File I/O handler.
    io_handler = FileIOHandler()

    # 3. Sample input file (created only if missing).
    input_file = "./data/input/sample_text.txt"
    create_sample_text_file(input_file, io_handler)

    # 4. Task manager; state file and cache both live inside the run directory.
    run_dir = Path(run_output_dir)
    manager = TaskManager(
        state_file=str(run_dir / "task_state.json"),
        cache_dir=str(run_dir / "task_cache"),
    )

    # 5. Wrapper object that exposes the individual step callables.
    steps = TextAnalysisSteps(io_handler, run_output_dir, manager)

    # 6. Register steps: (name, callable, extra keyword arguments).
    logger.info("注册步骤...\n")
    step_table = [
        ("step1", lambda: steps.step1_read_text(input_file), {}),
        ("step2", lambda: steps.step2_analyze_words(), {"depends_on": ["step1"]}),
        # step3 also depends on step1.
        ("step3", lambda: steps.step3_analyze_sentences(), {"depends_on": ["step1"]}),
        # step4 depends on both step2 and step3.
        ("step4", lambda: steps.step4_generate_report(), {"depends_on": ["step2", "step3"]}),
    ]
    for step_name, step_fn, extra in step_table:
        manager.register_step(step_name, step_fn, force_rerun=False, **extra)

    # 7. Show the current state before running.
    summary = manager.get_summary()
    logger.info(f"总步骤数: {summary['total']}")
    logger.info(f"待执行: {summary['pending']}")

    # 8. Execute every step in registration order.
    logger.info("\n开始执行文本分析...")
    try:
        manager.run_all(step_order=[step_name for step_name, _, _ in step_table])
        logger.info("\n文本分析完成!")

        # 9. Show the results from the final report, if one was produced.
        report = manager.load_step_output("step4")
        if report:
            logger.info("\n分析结果摘要:")
            data = report["data"]

            text_stats = data["text_statistics"]
            logger.info(f" 总行数: {text_stats['total_lines']}")
            logger.info(f" 总词数: {text_stats['total_words']}")
            logger.info(f" 总字符数: {text_stats['total_characters']}")

            words = data["word_analysis"]
            logger.info(f" 不同词数: {words['total_unique_words']}")

            sentences = data["sentence_analysis"]
            logger.info(f" 句子数: {sentences['sentence_count']}")
            logger.info(f" 平均句子长度: {sentences['average_length']:.2f} 词")

    except Exception as e:
        # Top-level boundary: log with traceback, then let the error propagate.
        logger.error(f"\n✗ 执行失败: {e}", exc_info=True)
        raise

    logger.info(f"\n所有结果已保存到: {run_output_dir}")
# Run the example only when this file is executed as the entry point.
# NOTE(review): the file uses a relative import (`from .steps import ...`), so it
# presumably has to be launched as a module inside its package (python -m ...) — confirm.
if __name__ == "__main__":
    main()
|