steps.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. """
  2. 文本分析步骤包装器
  3. 连接文件I/O和业务逻辑
  4. """
  5. from pathlib import Path
  6. from typing import Dict
  7. from taskflow import TaskManager, FileIOHandler
  8. from .processors import (
  9. process_read_text,
  10. process_analyze_words,
  11. process_analyze_sentences,
  12. process_generate_report
  13. )
  14. class TextAnalysisSteps:
  15. """文本分析步骤包装器"""
  16. def __init__(self, io_handler: FileIOHandler, output_dir: str, manager: TaskManager):
  17. """
  18. 初始化步骤包装器
  19. Args:
  20. io_handler: 文件I/O处理器
  21. output_dir: 输出目录
  22. manager: 任务管理器
  23. """
  24. self.io_handler = io_handler
  25. self.output_dir = Path(output_dir)
  26. self.output_dir.mkdir(parents=True, exist_ok=True)
  27. self.manager = manager
  28. def step1_read_text(self, input_file: str) -> Dict:
  29. """步骤1:读取并预处理文本"""
  30. # 读取文件
  31. text_content = self.io_handler.read_text(input_file)
  32. # 处理文本
  33. text_data = process_read_text(text_content)
  34. # 保存结果
  35. output_file = str(self.output_dir / "step1_result.json")
  36. self.io_handler.write_json(text_data, output_file)
  37. return {
  38. "output_file": output_file,
  39. "data": text_data
  40. }
  41. def step2_analyze_words(self) -> Dict:
  42. """步骤2:分析词频"""
  43. # 加载上一步的输出
  44. previous_output = self.manager.load_step_output("step1")
  45. if previous_output is None:
  46. raise ValueError("步骤1未完成,无法分析词频")
  47. text_data = previous_output["data"]
  48. # 分析词频
  49. word_analysis = process_analyze_words(text_data)
  50. # 保存结果
  51. output_file = str(self.output_dir / "step2_result.json")
  52. self.io_handler.write_json(word_analysis, output_file)
  53. return {
  54. "output_file": output_file,
  55. "data": word_analysis
  56. }
  57. def step3_analyze_sentences(self) -> Dict:
  58. """步骤3: 分析句子"""
  59. # 获取步骤1的输出
  60. step1_output = self.manager.load_step_output("step1")
  61. if step1_output is None:
  62. raise ValueError("无法获取步骤1的输出")
  63. text_data = step1_output["data"]
  64. # 分析句子
  65. sentence_analysis = process_analyze_sentences(text_data)
  66. # 保存结果
  67. output_file = str(self.output_dir / "step3_sentence_analysis.json")
  68. self.io_handler.write_json(sentence_analysis, output_file)
  69. return {
  70. "output_file": output_file,
  71. "data": sentence_analysis
  72. }
  73. def step4_generate_report(self) -> Dict:
  74. """步骤4: 生成报告"""
  75. # 获取之前的输出
  76. step2_output = self.manager.load_step_output("step2")
  77. step3_output = self.manager.load_step_output("step3")
  78. step1_output = self.manager.load_step_output("step1")
  79. if not all([step1_output, step2_output, step3_output]):
  80. raise ValueError("无法获取所有步骤的输出")
  81. # 生成报告
  82. report = process_generate_report(
  83. step2_output["data"],
  84. step3_output["data"],
  85. step1_output["data"]
  86. )
  87. # 保存报告
  88. output_file = str(self.output_dir / "step4_report.json")
  89. self.io_handler.write_json(report, output_file)
  90. return {
  91. "output_file": output_file,
  92. "data": report
  93. }