import PyPDF2 import docx import nltk, subprocess, os from config import nltk_path, converter from typing import List, Union from pathlib import Path import re import pandas as pd from nltk.tokenize import sent_tokenize import spacy from langdetect import detect from milvus_process import update_mulvus_file import fitz from marker.output import text_from_rendered try: nltk.data.find(nltk_path) except LookupError: nltk.download('punkt') try: nlp = spacy.load("zh_core_web_sm") except OSError: pass class DocumentProcessor: def __init__(self,): pass def get_file_len(self, file_path: Union[str, Path]) -> int: text = self.read_file(file_path) length = len(text) del text return length @staticmethod def convert_docx_to_doc(input_path): output_dir = os.path.dirname(input_path) command = [ "soffice", "--headless", "--convert-to", "docx", input_path, "--outdir", output_dir ] subprocess.run(command, check=True) def _read_doc(self, file_path) -> str: """读取Word文档""" self.convert_docx_to_doc(file_path) old_file = Path(file_path) # 原始 .doc 文件 new_file = old_file.with_suffix(".docx") # 转换后的 .docx 文件 if old_file.exists(): # 确保旧文件存在 old_file.unlink() # 删除旧文件 doc = docx.Document(new_file) # 读取 .docx text = "\n".join([paragraph.text for paragraph in doc.paragraphs]) return self.create_chunks(text) def read_file(self, file_path: Union[str, Path]) -> str: """ 读取不同格式的文档 Args: file_path: 文件路径 Returns: str: 提取的文本内容 """ file_path = Path(file_path) extension = file_path.suffix.lower() if extension == '.pdf': return self._read_pdf(file_path) elif extension == '.docx': return self._read_docx(file_path) elif extension == '.doc': return self._read_doc(file_path) elif extension == '.txt': return self._read_txt(file_path) elif extension == '.csv': return self._read_csv(file_path) elif extension == '.xlsx': return self._read_excel(file_path) else: raise ValueError(f"Unsupported file format: {extension}") def _read_pdf(self, file_path) -> str: """读取PDF文件""" rendered = converter(str(file_path)) text, x, images = text_from_rendered(rendered) return self.create_chunks(text=text) def _read_docx(self, file_path: Path) -> str: """读取Word文档""" doc = docx.Document(file_path) text = "\n".join([paragraph.text for paragraph in doc.paragraphs]) return self.create_chunks(text) def _read_txt(self, file_path: Path) -> str: """读取文本文件""" with open(file_path, 'r', encoding='utf-8') as file: return self.create_chunks(file.read()) def _read_excel(self, file_path: Path) -> str: """读取Excel文件""" df = pd.read_excel(file_path, sheet_name=None) text = "" for sheet_name, sheet_df in df.items(): text += f"\nSheet: {sheet_name}\n" text += sheet_df.to_csv(index=False, sep=' ', header=True) return self.create_chunks(text) def _read_csv(self, file_path: Path) -> str: """读取CSV文件""" df = pd.read_csv(file_path) return self.create_chunks(df.to_csv(index=False, sep=' ', header=True)) def _clean_text(self, text: str) -> str: """ 清理文本 - 移除多余的空白字符 - 标准化换行符 """ # 替换多个空格为单个空格 text = re.sub(r'\s+', ' ', text) # 标准化换行符 text = text.replace('\r\n', '\n').replace('\r', '\n') # 移除空行 text = '\n'.join(line.strip() for line in text.split('\n') if line.strip()) return text.strip() def split_into_sentences(self, text: str) -> List[str]: """ 将文本分割成句子 Args: text: 输入文本 Returns: List[str]: 句子列表 """ # 使用NLTK进行句子分割 sentences = sent_tokenize(text) return sentences def force_split_sentence(self, sentence: str, max_length: int) -> List[str]: """ 强制将超长句子按字符数切分 Args: sentence (str): 输入的句子 max_length (int): 最大长度 Returns: List[str]: 切分后的句子片段列表 """ # 使用标点符号作为次要切分点 punctuation = '。,;!?,.;!?' parts = [] current_part = '' # 优先在标点符号处切分 chars = list(sentence) for i, char in enumerate(chars): current_part += char # 如果当前部分达到最大长度或遇到标点符号 if (len(current_part) >= max_length and char in punctuation) or \ (len(current_part) >= max_length * 1.2): # 允许略微超过max_length以寻找标点 parts.append(current_part) current_part = '' # 处理剩余部分 if current_part: # 如果剩余部分仍然过长,强制按长度切分 while len(current_part) > max_length: parts.append(current_part[:max_length] + '...') current_part = '...' + current_part[max_length:] parts.append(current_part) return parts def split_text_nltk(self, text: str, chunk_size: int = 1500, overlap_size: int = 100) -> List[str]: """ 使用NLTK进行中文文本分割,支持文本块重叠和超长句子处理 Args: text (str): 输入的中文文本 chunk_size (int): 每个chunk的近似字符数 overlap_size (int): 相邻chunk之间的重叠字符数 Returns: List[str]: 分割后的文本块列表 """ text = self._clean_text(text) sentences = nltk.sent_tokenize(text) chunks = self.process_sentences(sentences=sentences, chunk_size=chunk_size, overlap_size=overlap_size) return chunks def split_text_spacy(self, text: str, chunk_size: int = 500, overlap_size: int = 100) -> List[str]: """ 使用SpaCy进行中文文本分割,支持文本块重叠和超长句子处理 Args: text (str): 输入的中文文本 chunk_size (int): 每个chunk的近似字符数 overlap_size (int): 相邻chunk之间的重叠字符数 Returns: List[str]: 分割后的文本块列表 """ text = self._clean_text(text) doc = nlp(text) chunks = [] sentences = [sent.text for sent in doc.sents] chunks = self.process_sentences(sentences=sentences, chunk_size=chunk_size, overlap_size=overlap_size) return chunks def process_sentences(self, sentences, chunk_size: int = 500, overlap_size: int = 100): chunks = [] current_chunk = [] current_length = 0 for sentence in sentences: # 处理超长句子 if len(sentence) > chunk_size: # 先处理当前chunk中已有的内容 if current_chunk: chunks.append("".join(current_chunk)) current_chunk = [] current_length = 0 # 强制切分超长句子 sentence_parts = self.force_split_sentence(sentence, chunk_size) for part in sentence_parts: chunks.append(part) continue # 正常处理普通长度的句子 if current_length + len(sentence) <= chunk_size: current_chunk.append(sentence) current_length += len(sentence) else: if current_chunk: chunks.append("".join(current_chunk)) # 处理重叠 overlap_chars = 0 overlap_sentences = [] for prev_sentence in reversed(current_chunk): if overlap_chars + len(prev_sentence) <= overlap_size: overlap_sentences.insert(0, prev_sentence) overlap_chars += len(prev_sentence) else: break current_chunk = overlap_sentences + [sentence] current_length = sum(len(s) for s in current_chunk) if current_chunk: chunks.append("".join(current_chunk)) return chunks def create_chunks(self, text: str, chunk_size=300, overlap_size=100) -> List[str]: is_chinese = self.is_chinese_text(text) if is_chinese: # print('检测为中文文章, 采用spacy') chunks = self.split_text_spacy(text,chunk_size=chunk_size,overlap_size=overlap_size) else: # print('检测为外文文章, 采用nltk') chunks = self.split_text_spacy(text,chunk_size=chunk_size,overlap_size=overlap_size) return chunks def is_chinese_text(self, text: str) -> bool: """ 判断文本是否主要为中文 Args: text (str): 输入文本 Returns: bool: 如果是中文文本返回True,否则返回False """ try: chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text)) total_chars = len(re.findall(r'\w', text)) + chinese_chars char_ratio = chinese_chars / max(total_chars, 1) if char_ratio > 0.1: return True # 使用langdetect进行语言检测 lang = detect(text) # 如果检测失败,使用备用方法 if not lang: raise Exception("Language detection failed") return lang == 'zh-cn' or lang == 'zh-tw' or lang == 'zh' except Exception: return char_ratio > 0.1 def process_document(self, file_path: Union[str, Path], chunk_size=1000, overlap_size=250) -> List[str]: """ 处理文档的主方法 Args: file_path: 文档路径 Returns: List[str]: 处理后的文本块列表 """ # 读取文档 text = self.read_file(file_path) chunks = self.create_chunks(text=text, chunk_size=chunk_size, overlap_size=overlap_size) # return chunks return chunks if __name__ == '__main__': import asyncio processor = DocumentProcessor() # 处理文档 chunks = processor.read_file("./tests/test.pdf") # 打印结果 # for i, chunk in enumerate(chunks): # print(f"Chunk {i+1}:") # print(chunk) # print(len(chunk)) # print("-" * 50) status = asyncio.run(update_mulvus_file(client_id='test', file_name='test.pdf',chunks=chunks))