# processors.py (3.8 KB)
"""
Text-analysis processing functions.

Pure business logic; no file I/O is performed here.
"""
import re
from collections import Counter
from typing import Dict, List

from taskflow import get_logger
  8. logger = get_logger("examples.text_analysis.processors")
  9. def process_read_text(text_content: str) -> Dict:
  10. """
  11. 步骤1: 读取并预处理文本
  12. Args:
  13. text_content: 文本内容
  14. Returns:
  15. 处理后的文本数据
  16. """
  17. logger.info("正在预处理文本...")
  18. # 统计基本信息
  19. lines = text_content.split('\n')
  20. words = text_content.split()
  21. characters = len(text_content)
  22. result = {
  23. "original_text": text_content,
  24. "line_count": len(lines),
  25. "word_count": len(words),
  26. "character_count": characters,
  27. "lines": lines
  28. }
  29. logger.info(f"文本统计: {len(lines)} 行, {len(words)} 词, {characters} 字符")
  30. return result
  31. def process_analyze_words(text_data: Dict) -> Dict:
  32. """
  33. 步骤2: 分析词频
  34. Args:
  35. text_data: 文本数据
  36. Returns:
  37. 词频分析结果
  38. """
  39. logger.info("正在分析词频...")
  40. text = text_data["original_text"]
  41. words = re.findall(r'\b\w+\b', text.lower())
  42. # 统计词频
  43. word_freq = {}
  44. for word in words:
  45. word_freq[word] = word_freq.get(word, 0) + 1
  46. # 排序
  47. sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
  48. result = {
  49. "word_frequency": dict(sorted_words[:20]), # 前20个最常见的词
  50. "total_unique_words": len(word_freq),
  51. "total_words": len(words)
  52. }
  53. logger.info(f"发现 {len(word_freq)} 个不同的词")
  54. return result
  55. def process_analyze_sentences(text_data: Dict) -> Dict:
  56. """
  57. 步骤3: 分析句子
  58. Args:
  59. text_data: 文本数据
  60. Returns:
  61. 句子分析结果
  62. """
  63. logger.info("正在分析句子...")
  64. text = text_data["original_text"]
  65. # 简单的句子分割(基于句号、问号、感叹号)
  66. sentences = re.split(r'[.!?]+', text)
  67. sentences = [s.strip() for s in sentences if s.strip()]
  68. # 统计句子长度
  69. sentence_lengths = [len(s.split()) for s in sentences]
  70. result = {
  71. "sentence_count": len(sentences),
  72. "average_sentence_length": sum(sentence_lengths) / len(sentence_lengths) if sentence_lengths else 0,
  73. "max_sentence_length": max(sentence_lengths) if sentence_lengths else 0,
  74. "min_sentence_length": min(sentence_lengths) if sentence_lengths else 0
  75. }
  76. logger.info(f"发现 {len(sentences)} 个句子")
  77. return result
  78. def process_generate_report(word_analysis: Dict, sentence_analysis: Dict, text_data: Dict) -> Dict:
  79. """
  80. 步骤4: 生成分析报告
  81. Args:
  82. word_analysis: 词频分析结果
  83. sentence_analysis: 句子分析结果
  84. text_data: 原始文本数据
  85. Returns:
  86. 分析报告
  87. """
  88. logger.info("正在生成分析报告...")
  89. report = {
  90. "text_statistics": {
  91. "total_lines": text_data["line_count"],
  92. "total_words": text_data["word_count"],
  93. "total_characters": text_data["character_count"]
  94. },
  95. "word_analysis": {
  96. "total_unique_words": word_analysis["total_unique_words"],
  97. "total_words": word_analysis["total_words"],
  98. "top_words": word_analysis["word_frequency"]
  99. },
  100. "sentence_analysis": {
  101. "sentence_count": sentence_analysis["sentence_count"],
  102. "average_length": sentence_analysis["average_sentence_length"],
  103. "max_length": sentence_analysis["max_sentence_length"],
  104. "min_length": sentence_analysis["min_sentence_length"]
  105. }
  106. }
  107. logger.info("分析报告生成完成")
  108. return report