"""
Step wrappers for text analysis.

Connects file I/O with the business logic.
"""

from pathlib import Path
from typing import Dict

from taskflow import TaskManager, FileIOHandler

from .processors import (
    process_read_text,
    process_analyze_words,
    process_analyze_sentences,
    process_generate_report,
)


class TextAnalysisSteps:
    """Wraps each text-analysis step with its file I/O."""

    def __init__(self, io_handler: FileIOHandler, output_dir: str, manager: TaskManager):
        """
        Initialise the step wrapper.

        The output directory is created (including parents) if it does not
        already exist.

        Args:
            io_handler: File I/O handler used to read text and write JSON.
            output_dir: Directory that receives each step's result file.
            manager: Task manager used to load the outputs of earlier steps.
        """
        self.io_handler = io_handler
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.manager = manager

    def _store(self, payload, filename: str) -> Dict:
        """Write ``payload`` as JSON under the output directory and wrap it.

        Args:
            payload: Step result handed to ``io_handler.write_json``.
            filename: File name (relative to the output directory).

        Returns:
            Dict with ``output_file`` (the path as a string) and ``data``
            (the same ``payload`` object).
        """
        target = str(self.output_dir / filename)
        self.io_handler.write_json(payload, target)
        return {"output_file": target, "data": payload}

    def _step1_data(self, missing_message: str):
        """Return the ``data`` entry of the stored step 1 output.

        Args:
            missing_message: Message for the ``ValueError`` raised when the
                manager returns ``None`` for step 1.

        Raises:
            ValueError: If the manager has no output for ``"step1"``.
        """
        loaded = self.manager.load_step_output("step1")
        if loaded is None:
            raise ValueError(missing_message)
        return loaded["data"]

    def step1_read_text(self, input_file: str) -> Dict:
        """Step 1: read the input text and preprocess it.

        Args:
            input_file: Path passed to ``io_handler.read_text``.

        Returns:
            Dict with ``output_file`` (path of ``step1_result.json``) and
            ``data`` (the value returned by ``process_read_text``).
        """
        raw_text = self.io_handler.read_text(input_file)
        processed = process_read_text(raw_text)
        return self._store(processed, "step1_result.json")

    def step2_analyze_words(self) -> Dict:
        """Step 2: analyse word frequency from the step 1 output.

        Returns:
            Dict with ``output_file`` (path of ``step2_result.json``) and
            ``data`` (the value returned by ``process_analyze_words``).

        Raises:
            ValueError: If the manager has no output for step 1.
        """
        source_data = self._step1_data("步骤1未完成,无法分析词频")
        word_stats = process_analyze_words(source_data)
        return self._store(word_stats, "step2_result.json")

    def step3_analyze_sentences(self) -> Dict:
        """Step 3: analyse sentences from the step 1 output.

        Returns:
            Dict with ``output_file`` (path of
            ``step3_sentence_analysis.json``) and ``data`` (the value
            returned by ``process_analyze_sentences``).

        Raises:
            ValueError: If the manager has no output for step 1.
        """
        source_data = self._step1_data("无法获取步骤1的输出")
        sentence_stats = process_analyze_sentences(source_data)
        return self._store(sentence_stats, "step3_sentence_analysis.json")

    def step4_generate_report(self) -> Dict:
        """Step 4: build the report from the outputs of steps 1-3.

        Returns:
            Dict with ``output_file`` (path of ``step4_report.json``) and
            ``data`` (the value returned by ``process_generate_report``).

        Raises:
            ValueError: If any of the three earlier outputs is falsy
                (``None`` or empty).
        """
        # Outputs are loaded in the order step2, step3, step1.
        words_out = self.manager.load_step_output("step2")
        sentences_out = self.manager.load_step_output("step3")
        text_out = self.manager.load_step_output("step1")

        # Truthiness check: an empty output counts as missing, same as None.
        if not text_out or not words_out or not sentences_out:
            raise ValueError("无法获取所有步骤的输出")

        summary = process_generate_report(
            words_out["data"],
            sentences_out["data"],
            text_out["data"],
        )
        return self._store(summary, "step4_report.json")