| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- """
- 文本分析步骤包装器
- 连接文件I/O和业务逻辑
- """
- from pathlib import Path
- from typing import Dict
- from taskflow import TaskManager, FileIOHandler
- from .processors import (
- process_read_text,
- process_analyze_words,
- process_analyze_sentences,
- process_generate_report
- )
class TextAnalysisSteps:
    """Step wrappers for the text-analysis workflow.

    Each ``stepN_*`` method connects file I/O (``io_handler``) with one of the
    processing functions imported from ``.processors``: it obtains its input,
    calls the processor, writes the result as JSON into ``output_dir`` and
    returns the output path together with the produced data.
    """

    def __init__(self, io_handler: FileIOHandler, output_dir: str, manager: TaskManager):
        """Initialize the step wrapper.

        Args:
            io_handler: File I/O handler used to read text and write JSON.
            output_dir: Directory that receives the per-step result files;
                it is created (including parents) if it does not exist.
            manager: Task manager used to load the outputs of earlier steps.
        """
        self.io_handler = io_handler
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.manager = manager

    def _save_result(self, data, filename: str) -> Dict:
        """Write ``data`` as JSON to ``output_dir / filename`` and build the step result.

        Returns:
            A dict with ``"output_file"`` (the path as a string) and
            ``"data"`` (the object that was written).
        """
        output_file = str(self.output_dir / filename)
        self.io_handler.write_json(data, output_file)
        return {"output_file": output_file, "data": data}

    def _require_step_data(self, step_name: str, error_message: str):
        """Return the ``"data"`` entry of an earlier step's stored output.

        Raises:
            ValueError: With ``error_message`` if the manager returns ``None``
                for ``step_name``.
        """
        step_output = self.manager.load_step_output(step_name)
        if step_output is None:
            raise ValueError(error_message)
        return step_output["data"]

    def step1_read_text(self, input_file: str) -> Dict:
        """Step 1: read the input text and preprocess it.

        Args:
            input_file: Path handed to ``io_handler.read_text``.

        Returns:
            ``{"output_file": ..., "data": ...}`` where the data is the result
            of ``process_read_text`` and the file is ``step1_result.json``.
        """
        text_content = self.io_handler.read_text(input_file)
        text_data = process_read_text(text_content)
        return self._save_result(text_data, "step1_result.json")

    def step2_analyze_words(self) -> Dict:
        """Step 2: analyze word frequency based on the output of step 1.

        Returns:
            ``{"output_file": ..., "data": ...}`` where the data is the result
            of ``process_analyze_words`` and the file is ``step2_result.json``.

        Raises:
            ValueError: If the manager has no output for ``"step1"``.
        """
        text_data = self._require_step_data("step1", "步骤1未完成,无法分析词频")
        word_analysis = process_analyze_words(text_data)
        return self._save_result(word_analysis, "step2_result.json")

    def step3_analyze_sentences(self) -> Dict:
        """Step 3: analyze sentences based on the output of step 1.

        Returns:
            ``{"output_file": ..., "data": ...}`` where the data is the result
            of ``process_analyze_sentences`` and the file is
            ``step3_sentence_analysis.json``.

        Raises:
            ValueError: If the manager has no output for ``"step1"``.
        """
        text_data = self._require_step_data("step1", "无法获取步骤1的输出")
        sentence_analysis = process_analyze_sentences(text_data)
        return self._save_result(sentence_analysis, "step3_sentence_analysis.json")

    def step4_generate_report(self) -> Dict:
        """Step 4: generate the report from the outputs of steps 1-3.

        Returns:
            ``{"output_file": ..., "data": ...}`` where the data is the result
            of ``process_generate_report`` and the file is ``step4_report.json``.

        Raises:
            ValueError: If any of the three earlier outputs is missing or empty.
        """
        step2_output = self.manager.load_step_output("step2")
        step3_output = self.manager.load_step_output("step3")
        step1_output = self.manager.load_step_output("step1")

        # Truthiness check on purpose: an empty output is treated the same
        # as a missing one, not only ``None``.
        if not all([step1_output, step2_output, step3_output]):
            raise ValueError("无法获取所有步骤的输出")

        # The processor receives the data in the order step2, step3, step1.
        report = process_generate_report(
            step2_output["data"],
            step3_output["data"],
            step1_output["data"],
        )
        return self._save_result(report, "step4_report.json")
|