@@ -0,0 +1,325 @@
+import PyPDF2
+import docx
+import nltk, subprocess, os
+from config import nltk_path, converter
+from typing import List, Union
+from pathlib import Path
+import re
+import pandas as pd
+from nltk.tokenize import sent_tokenize
+import spacy
+from langdetect import detect
+from milvus_process import update_mulvus_file
+import fitz
+from marker.output import text_from_rendered
+
+# Make sure the NLTK sentence tokenizer is available.
+try:
+    nltk.data.find(nltk_path)
+except LookupError:
+    nltk.download('punkt')
+
+# Load the Chinese spaCy pipeline (install it with `python -m spacy download zh_core_web_sm`).
+try:
+    nlp = spacy.load("zh_core_web_sm")
+except OSError:
+    nlp = None  # Chinese splitting via split_text_spacy will not work until the model is installed.
+
+class DocumentProcessor:
+    def __init__(self):
+        pass
+
+    def get_file_len(self, file_path: Union[str, Path]) -> int:
+        """Return the number of chunks produced from the file by read_file."""
+        text = self.read_file(file_path)
+        length = len(text)
+        del text
+        return length
+
+    @staticmethod
+    def convert_doc_to_docx(input_path):
+        """Convert a legacy .doc file to .docx with LibreOffice (soffice must be on PATH)."""
+        output_dir = os.path.dirname(input_path)
+        command = [
+            "soffice", "--headless", "--convert-to", "docx", input_path, "--outdir", output_dir
+        ]
+        subprocess.run(command, check=True)
+
+    def _read_doc(self, file_path) -> List[str]:
+        """Read a legacy .doc document by converting it to .docx first."""
+        self.convert_doc_to_docx(file_path)
+        old_file = Path(file_path)                # original .doc file
+        new_file = old_file.with_suffix(".docx")  # converted .docx file
+
+        if old_file.exists():                     # make sure the old file exists
+            old_file.unlink()                     # delete the old file
+
+        doc = docx.Document(new_file)             # read the .docx
+        text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
+        return self.create_chunks(text)
+
+    def read_file(self, file_path: Union[str, Path]) -> List[str]:
+        """
+        Read a document in one of the supported formats.
+
+        Args:
+            file_path: path to the file
+
+        Returns:
+            List[str]: chunks of the extracted text content
+        """
+        file_path = Path(file_path)
+        extension = file_path.suffix.lower()
+
+        if extension == '.pdf':
+            return self._read_pdf(file_path)
+        elif extension == '.docx':
+            return self._read_docx(file_path)
+        elif extension == '.doc':
+            return self._read_doc(file_path)
+        elif extension == '.txt':
+            return self._read_txt(file_path)
+        elif extension == '.csv':
+            return self._read_csv(file_path)
+        elif extension == '.xlsx':
+            return self._read_excel(file_path)
+        else:
+            raise ValueError(f"Unsupported file format: {extension}")
+
+    def _read_pdf(self, file_path) -> List[str]:
+        """Read a PDF file with the marker converter."""
+        rendered = converter(str(file_path))
+        text, _, images = text_from_rendered(rendered)
+        return self.create_chunks(text=text)
+
+    def _read_docx(self, file_path: Path) -> List[str]:
+        """Read a Word (.docx) document."""
+        doc = docx.Document(file_path)
+        text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
+        return self.create_chunks(text)
+
+    def _read_txt(self, file_path: Path) -> List[str]:
+        """Read a plain-text file."""
+        with open(file_path, 'r', encoding='utf-8') as file:
+            return self.create_chunks(file.read())
+
+    def _read_excel(self, file_path: Path) -> List[str]:
+        """Read an Excel workbook, sheet by sheet."""
+        df = pd.read_excel(file_path, sheet_name=None)
+        text = ""
+        for sheet_name, sheet_df in df.items():
+            text += f"\nSheet: {sheet_name}\n"
+            text += sheet_df.to_csv(index=False, sep=' ', header=True)
+        return self.create_chunks(text)
+
+    def _read_csv(self, file_path: Path) -> List[str]:
+        """Read a CSV file."""
+        df = pd.read_csv(file_path)
+        return self.create_chunks(df.to_csv(index=False, sep=' ', header=True))
+
+    def _clean_text(self, text: str) -> str:
+        """
+        Clean the text:
+        - collapse redundant whitespace
+        - normalize line breaks
+        """
+        # Normalize line breaks first so they survive whitespace collapsing.
+        text = text.replace('\r\n', '\n').replace('\r', '\n')
+        # Collapse runs of spaces and tabs into a single space.
+        text = re.sub(r'[ \t]+', ' ', text)
+        # Drop empty lines.
+        text = '\n'.join(line.strip() for line in text.split('\n') if line.strip())
+        return text.strip()
+
+    def split_into_sentences(self, text: str) -> List[str]:
+        """
+        Split text into sentences.
+
+        Args:
+            text: the input text
+
+        Returns:
+            List[str]: the list of sentences
+        """
+        # Sentence segmentation with NLTK.
+        sentences = sent_tokenize(text)
+        return sentences
+
+    def force_split_sentence(self, sentence: str, max_length: int) -> List[str]:
+        """
+        Force-split an overly long sentence by character count.
+
+        Args:
+            sentence (str): the input sentence
+            max_length (int): maximum fragment length
+
+        Returns:
+            List[str]: the resulting sentence fragments
+        """
+        # Punctuation marks used as secondary split points.
+        punctuation = '。,;!?,.;!?'
+        parts = []
+        current_part = ''
+
+        # Prefer splitting at punctuation.
+        chars = list(sentence)
+        for i, char in enumerate(chars):
+            current_part += char
+
+            # Cut once the fragment reaches max_length and ends in punctuation,
+            # or exceeds max_length by 20% while still looking for punctuation.
+            if (len(current_part) >= max_length and char in punctuation) or \
+               (len(current_part) >= max_length * 1.2):
+                parts.append(current_part)
+                current_part = ''
+
+        # Handle the remainder.
+        if current_part:
+            # If the remainder is still too long, hard-split it by length.
+            while len(current_part) > max_length:
+                parts.append(current_part[:max_length] + '...')
+                current_part = '...' + current_part[max_length:]
+            parts.append(current_part)
+
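+        # Rough illustration: with max_length=10 and a 25-character sentence that
+        # contains no punctuation, the loop above cuts twice at 12 characters
+        # (1.2 * 10) and the single remaining character becomes the final fragment.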
+        return parts
+
+    def split_text_nltk(self, text: str, chunk_size: int = 1500, overlap_size: int = 100) -> List[str]:
+        """
+        Split text with NLTK sentence tokenization, with chunk overlap and
+        handling of overly long sentences.
+
+        Args:
+            text (str): the input text
+            chunk_size (int): approximate number of characters per chunk
+            overlap_size (int): number of overlapping characters between adjacent chunks
+
+        Returns:
+            List[str]: the resulting text chunks
+        """
+        text = self._clean_text(text)
+        sentences = nltk.sent_tokenize(text)
+        chunks = self.process_sentences(sentences=sentences, chunk_size=chunk_size, overlap_size=overlap_size)
+        return chunks
+
+    def split_text_spacy(self, text: str, chunk_size: int = 500, overlap_size: int = 100) -> List[str]:
+        """
+        Split Chinese text with spaCy sentence segmentation, with chunk overlap
+        and handling of overly long sentences.
+
+        Args:
+            text (str): the input Chinese text
+            chunk_size (int): approximate number of characters per chunk
+            overlap_size (int): number of overlapping characters between adjacent chunks
+
+        Returns:
+            List[str]: the resulting text chunks
+        """
+        text = self._clean_text(text)
+        doc = nlp(text)
+        sentences = [sent.text for sent in doc.sents]
+        chunks = self.process_sentences(sentences=sentences, chunk_size=chunk_size, overlap_size=overlap_size)
+        return chunks
+
+    def process_sentences(self, sentences, chunk_size: int = 500, overlap_size: int = 100):
+        chunks = []
+        current_chunk = []
+        current_length = 0
+
+        for sentence in sentences:
+            # Handle overly long sentences.
+            if len(sentence) > chunk_size:
+                # First flush whatever is already in the current chunk.
+                if current_chunk:
+                    chunks.append("".join(current_chunk))
+                    current_chunk = []
+                    current_length = 0
+
+                # Force-split the overly long sentence.
+                sentence_parts = self.force_split_sentence(sentence, chunk_size)
+                for part in sentence_parts:
+                    chunks.append(part)
+                continue
+
+            # Normal handling for sentences of ordinary length.
+            if current_length + len(sentence) <= chunk_size:
+                current_chunk.append(sentence)
+                current_length += len(sentence)
+            else:
+                if current_chunk:
+                    chunks.append("".join(current_chunk))
+
+                # Handle the overlap between adjacent chunks.
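+                # For example, with overlap_size=100 the next chunk is seeded with
+                # the last whole sentences of the previous chunk, totalling at most
+                # 100 characters.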
+                overlap_chars = 0
+                overlap_sentences = []
+                for prev_sentence in reversed(current_chunk):
+                    if overlap_chars + len(prev_sentence) <= overlap_size:
+                        overlap_sentences.insert(0, prev_sentence)
+                        overlap_chars += len(prev_sentence)
+                    else:
+                        break
+
+                current_chunk = overlap_sentences + [sentence]
+                current_length = sum(len(s) for s in current_chunk)
+
+        if current_chunk:
+            chunks.append("".join(current_chunk))
+
+        return chunks
+
+    def create_chunks(self, text: str, chunk_size=300, overlap_size=100) -> List[str]:
+        is_chinese = self.is_chinese_text(text)
+
+        if is_chinese:
+            # Detected Chinese text: use spaCy.
+            chunks = self.split_text_spacy(text, chunk_size=chunk_size, overlap_size=overlap_size)
+        else:
+            # Detected non-Chinese text: use NLTK.
+            chunks = self.split_text_nltk(text, chunk_size=chunk_size, overlap_size=overlap_size)
+        return chunks
+
+    def is_chinese_text(self, text: str) -> bool:
+        """
+        Determine whether the text is predominantly Chinese.
+
+        Args:
+            text (str): the input text
+
+        Returns:
+            bool: True if the text is Chinese, otherwise False
+        """
+        try:
+            chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
+            total_chars = len(re.findall(r'\w', text)) + chinese_chars
+            char_ratio = chinese_chars / max(total_chars, 1)
+            if char_ratio > 0.1:
+                return True
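+            # For instance, a string in which more than 10% of the counted word
+            # characters fall in the CJK range is treated as Chinese here without
+            # ever calling langdetect.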
+            # Otherwise use langdetect for language detection.
+            lang = detect(text)
+            # If detection fails, fall back to the character-ratio heuristic.
+            if not lang:
+                raise Exception("Language detection failed")
+
+            return lang == 'zh-cn' or lang == 'zh-tw' or lang == 'zh'
+
+        except Exception:
+            return char_ratio > 0.1
+
+    def process_document(self, file_path: Union[str, Path], chunk_size=1000, overlap_size=250) -> List[str]:
+        """
+        Main entry point for processing a document.
+
+        Args:
+            file_path: path to the document
+
+        Returns:
+            List[str]: the processed text chunks
+        """
+        # read_file already chunks the text via the _read_* helpers, so its result is
+        # returned as-is; chunk_size and overlap_size are not re-applied here.
+        chunks = self.read_file(file_path)
+        return chunks
+
+if __name__ == '__main__':
+    import asyncio
+    processor = DocumentProcessor()
+
+    # Process a document.
+    chunks = processor.read_file("./tests/test.pdf")
+    # Print the results.
+    # for i, chunk in enumerate(chunks):
+    #     print(f"Chunk {i+1}:")
+    #     print(chunk)
+    #     print(len(chunk))
+    #     print("-" * 50)
+    status = asyncio.run(update_mulvus_file(client_id='test', file_name='test.pdf', chunks=chunks))
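+    # A sketch of the higher-level entry point (equivalent here, since
+    # process_document delegates to read_file):
+    # chunks = processor.process_document("./tests/test.pdf")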