""" 文本分析示例 - 主程序 演示如何使用 TaskFlow 进行文本文件的处理和分析 """ import logging from pathlib import Path from taskflow import TaskManager, FileIOHandler, RunManager from taskflow import setup_logger from .steps import TextAnalysisSteps # 设置日志 logger = setup_logger("text_analysis.main", level=logging.INFO) def create_sample_text_file(input_file: str, io_handler: FileIOHandler): """创建示例文本文件(如果不存在)""" if io_handler.file_exists(input_file): logger.info(f"输入文件已存在: {input_file}") return # 创建示例文本 sample_text = """ TaskFlow is a powerful Python framework for managing multi-step workflows. It supports checkpoint and resume functionality, allowing you to continue from where you left off if a process is interrupted. The framework provides features like step dependency management, output caching, and automatic state persistence. You can easily register steps, define dependencies, and execute complex workflows with confidence. With TaskFlow, you can build robust data processing pipelines, batch jobs, and any other multi-step processes that require reliability and resumability. """ io_handler.write_text(sample_text.strip(), input_file) logger.info(f"已创建示例文本文件: {input_file}") def main(): """主程序""" logger.info("=== 文本分析示例 ===\n") # 1. 创建运行管理器 run_manager = RunManager(base_output_dir="output") run_output_dir = run_manager.create_run_directory() run_id = run_manager.get_run_id() logger.info(f"运行ID: {run_id}") logger.info(f"输出目录: {run_output_dir}") # 2. 创建文件I/O处理器 io_handler = FileIOHandler() # 3. 创建示例输入文件 input_file = "./data/input/sample_text.txt" create_sample_text_file(input_file, io_handler) # 4. 创建任务管理器 state_file = str(Path(run_output_dir) / "task_state.json") cache_dir = str(Path(run_output_dir) / "task_cache") manager = TaskManager( state_file=state_file, cache_dir=cache_dir ) # 5. 创建步骤包装器 step_wrapper = TextAnalysisSteps(io_handler, run_output_dir, manager) # 6. 注册步骤 logger.info("注册步骤...\n") manager.register_step( "step1", lambda: step_wrapper.step1_read_text(input_file), force_rerun=False ) manager.register_step( "step2", lambda: step_wrapper.step2_analyze_words(), depends_on=["step1"], force_rerun=False ) manager.register_step( "step3", lambda: step_wrapper.step3_analyze_sentences(), depends_on=["step1"], # 也依赖step1 force_rerun=False ) manager.register_step( "step4", lambda: step_wrapper.step4_generate_report(), depends_on=["step2", "step3"], # 依赖step2和step3 force_rerun=False ) # 7. 显示当前状态 summary = manager.get_summary() logger.info(f"总步骤数: {summary['total']}") logger.info(f"待执行: {summary['pending']}") # 8. 执行所有步骤 logger.info("\n开始执行文本分析...") try: manager.run_all(step_order=["step1", "step2", "step3", "step4"]) logger.info("\n文本分析完成!") # 9. 显示结果 report = manager.load_step_output("step4") if report: logger.info("\n分析结果摘要:") stats = report["data"]["text_statistics"] logger.info(f" 总行数: {stats['total_lines']}") logger.info(f" 总词数: {stats['total_words']}") logger.info(f" 总字符数: {stats['total_characters']}") word_analysis = report["data"]["word_analysis"] logger.info(f" 不同词数: {word_analysis['total_unique_words']}") sentence_analysis = report["data"]["sentence_analysis"] logger.info(f" 句子数: {sentence_analysis['sentence_count']}") logger.info(f" 平均句子长度: {sentence_analysis['average_length']:.2f} 词") except Exception as e: logger.error(f"\n✗ 执行失败: {e}", exc_info=True) raise logger.info(f"\n所有结果已保存到: {run_output_dir}") if __name__ == "__main__": main()