- import PyPDF2
- import docx
- import nltk, subprocess, os
- from config import nltk_path, converter
- from typing import List, Union
- from pathlib import Path
- import re
- import pandas as pd
- from nltk.tokenize import sent_tokenize
- import spacy
- from langdetect import detect
- from milvus_process import update_mulvus_file
- import fitz
- from marker.output import text_from_rendered
- try:
- nltk.data.find(nltk_path)
- except LookupError:
- nltk.download('punkt')
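- # Note: newer NLTK releases may also require the 'punkt_tab' resource for
- # sent_tokenize; downloading it as well is a reasonable assumption if tokenization fails.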
- try:
- nlp = spacy.load("zh_core_web_sm")
- except OSError:
- pass
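- # Note: if the zh_core_web_sm model is not installed, the except branch above leaves
- # nlp undefined and split_text_spacy will fail with a NameError; this module assumes
- # the model has been installed (e.g. `python -m spacy download zh_core_web_sm`).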
- class DocumentProcessor:
- def __init__(self):
- pass
- def get_file_len(self, file_path: Union[str, Path]) -> int:
- """Return the number of chunks produced for the file (read_file returns a chunk list)."""
- chunks = self.read_file(file_path)
- length = len(chunks)
- del chunks
- return length
-
- @staticmethod
- def convert_doc_to_docx(input_path):
- """Convert a legacy .doc file to .docx with LibreOffice in headless mode."""
- output_dir = os.path.dirname(input_path)
- command = [
- "soffice", "--headless", "--convert-to", "docx", input_path, "--outdir", output_dir
- ]
- subprocess.run(command, check=True)
-
- def _read_doc(self, file_path) -> List[str]:
- """Read a legacy Word (.doc) document by first converting it to .docx."""
- self.convert_doc_to_docx(file_path)
- old_file = Path(file_path)  # original .doc file
- new_file = old_file.with_suffix(".docx")  # converted .docx file
- if old_file.exists():  # remove the original once the conversion is done
- old_file.unlink()
- doc = docx.Document(new_file)  # read the converted .docx
- text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
- return self.create_chunks(text)
-
- def read_file(self, file_path: Union[str, Path]) -> List[str]:
- """
- Read a document in any of the supported formats.
-
- Args:
- file_path: path to the file
-
- Returns:
- List[str]: the extracted text, already split into chunks
- """
- file_path = Path(file_path)
- extension = file_path.suffix.lower()
-
- if extension == '.pdf':
- return self._read_pdf(file_path)
- elif extension == '.docx':
- return self._read_docx(file_path)
- elif extension == '.doc':
- return self._read_doc(file_path)
- elif extension == '.txt':
- return self._read_txt(file_path)
- elif extension == '.csv':
- return self._read_csv(file_path)
- elif extension == '.xlsx':
- return self._read_excel(file_path)
- else:
- raise ValueError(f"Unsupported file format: {extension}")
-
- def _read_pdf(self, file_path) -> List[str]:
- """Read a PDF file via the marker converter configured in config.py."""
- rendered = converter(str(file_path))
- text, _, images = text_from_rendered(rendered)
- return self.create_chunks(text=text)
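- # Note: `converter` comes from config and is assumed here to be a marker-style PDF
- # converter whose rendered output text_from_rendered() can unpack into text plus
- # auxiliary fields; this sketch does not verify that assumption.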
-
- def _read_docx(self, file_path: Path) -> List[str]:
- """Read a Word (.docx) document."""
- doc = docx.Document(file_path)
- text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
- return self.create_chunks(text)
-
- def _read_txt(self, file_path: Path) -> List[str]:
- """Read a plain-text file (UTF-8)."""
- with open(file_path, 'r', encoding='utf-8') as file:
- return self.create_chunks(file.read())
-
- def _read_excel(self, file_path: Path) -> List[str]:
- """Read an Excel workbook, concatenating every sheet as space-separated CSV text."""
- df = pd.read_excel(file_path, sheet_name=None)
- text = ""
- for sheet_name, sheet_df in df.items():
- text += f"\nSheet: {sheet_name}\n"
- text += sheet_df.to_csv(index=False, sep=' ', header=True)
- return self.create_chunks(text)
-
- def _read_csv(self, file_path: Path) -> List[str]:
- """Read a CSV file."""
- df = pd.read_csv(file_path)
- return self.create_chunks(df.to_csv(index=False, sep=' ', header=True))
-
- def _clean_text(self, text: str) -> str:
- """
- Clean up text:
- - normalize line endings
- - collapse redundant spaces and tabs
- - drop empty lines
- """
- # Normalize line endings first, so the newline-based steps below still see them
- text = text.replace('\r\n', '\n').replace('\r', '\n')
- # Collapse runs of spaces and tabs (but not newlines) into a single space
- text = re.sub(r'[ \t]+', ' ', text)
- # Drop empty lines
- text = '\n'.join(line.strip() for line in text.split('\n') if line.strip())
- return text.strip()
-
- def split_into_sentences(self, text: str) -> List[str]:
- """
- Split text into sentences.
-
- Args:
- text: input text
-
- Returns:
- List[str]: list of sentences
- """
- # Sentence segmentation with NLTK
- sentences = sent_tokenize(text)
- return sentences
- def force_split_sentence(self, sentence: str, max_length: int) -> List[str]:
- """
- Forcibly split an over-long sentence by character count.
-
- Args:
- sentence (str): the input sentence
- max_length (int): maximum length per fragment
-
- Returns:
- List[str]: the resulting sentence fragments
- """
- # Punctuation marks serve as secondary split points
- punctuation = '。,;!?,.;!?'
- parts = []
- current_part = ''
-
- # Prefer splitting at punctuation marks
- chars = list(sentence)
- for i, char in enumerate(chars):
- current_part += char
-
- # Split once the current part reaches max_length and ends at punctuation
- if (len(current_part) >= max_length and char in punctuation) or \
- (len(current_part) >= max_length * 1.2):  # allow a slight overshoot while looking for punctuation
- parts.append(current_part)
- current_part = ''
-
- # Handle the remainder
- if current_part:
- # If the remainder is still too long, hard-cut it by length
- while len(current_part) > max_length:
- parts.append(current_part[:max_length] + '...')
- current_part = '...' + current_part[max_length:]
- parts.append(current_part)
-
- return parts
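- # Illustrative behaviour: with max_length=300, a 700-character sentence containing no
- # punctuation is first cut once the 20% overshoot (360 characters) is reached; the
- # remainder, still longer than max_length, is then hard-cut with '...' continuation markers.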
- def split_text_nltk(self, text: str, chunk_size: int = 1500, overlap_size: int = 100) -> List[str]:
- """
- Split text using NLTK sentence tokenization, with chunk overlap and over-long sentence handling.
-
- Args:
- text (str): input text
- chunk_size (int): approximate number of characters per chunk
- overlap_size (int): number of overlapping characters between adjacent chunks
-
- Returns:
- List[str]: the resulting text chunks
- """
- text = self._clean_text(text)
- sentences = nltk.sent_tokenize(text)
- chunks = self.process_sentences(sentences=sentences, chunk_size=chunk_size, overlap_size=overlap_size)
- return chunks
- def split_text_spacy(self, text: str, chunk_size: int = 500, overlap_size: int = 100) -> List[str]:
- """
- Split Chinese text using spaCy sentence segmentation, with chunk overlap and over-long sentence handling.
-
- Args:
- text (str): input Chinese text
- chunk_size (int): approximate number of characters per chunk
- overlap_size (int): number of overlapping characters between adjacent chunks
-
- Returns:
- List[str]: the resulting text chunks
- """
- text = self._clean_text(text)
- doc = nlp(text)
- sentences = [sent.text for sent in doc.sents]
- chunks = self.process_sentences(sentences=sentences, chunk_size=chunk_size, overlap_size=overlap_size)
- return chunks
- def process_sentences(self, sentences: List[str], chunk_size: int = 500, overlap_size: int = 100) -> List[str]:
- """Assemble sentences into chunks of roughly chunk_size characters, with overlap_size characters of overlap."""
- chunks = []
- current_chunk = []
- current_length = 0
-
- for sentence in sentences:
- # Handle over-long sentences
- if len(sentence) > chunk_size:
- # First flush whatever is already in the current chunk
- if current_chunk:
- chunks.append("".join(current_chunk))
- current_chunk = []
- current_length = 0
-
- # Force-split the over-long sentence
- sentence_parts = self.force_split_sentence(sentence, chunk_size)
- for part in sentence_parts:
- chunks.append(part)
- continue
-
- # Normal handling for sentences of ordinary length
- if current_length + len(sentence) <= chunk_size:
- current_chunk.append(sentence)
- current_length += len(sentence)
- else:
- if current_chunk:
- chunks.append("".join(current_chunk))
-
- # Build the overlap from the tail of the chunk just emitted
- overlap_chars = 0
- overlap_sentences = []
- for prev_sentence in reversed(current_chunk):
- if overlap_chars + len(prev_sentence) <= overlap_size:
- overlap_sentences.insert(0, prev_sentence)
- overlap_chars += len(prev_sentence)
- else:
- break
-
- current_chunk = overlap_sentences + [sentence]
- current_length = sum(len(s) for s in current_chunk)
-
- if current_chunk:
- chunks.append("".join(current_chunk))
-
- return chunks
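- # Illustrative example: with chunk_size=10 and overlap_size=5, the sentences
- # ["aaaa", "bbbb", "cccc"] become ["aaaabbbb", "bbbbcccc"]: the trailing sentences of a
- # chunk that fit within overlap_size are repeated at the start of the next chunk.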
- def create_chunks(self, text: str, chunk_size=300, overlap_size=100) -> List[str]:
- is_chinese = self.is_chinese_text(text)
- if is_chinese:
- # Chinese text detected: split with spaCy
- chunks = self.split_text_spacy(text, chunk_size=chunk_size, overlap_size=overlap_size)
- else:
- # Non-Chinese text detected: split with NLTK
- chunks = self.split_text_nltk(text, chunk_size=chunk_size, overlap_size=overlap_size)
- return chunks
-
- def is_chinese_text(self, text: str) -> bool:
- """
- Determine whether the text is predominantly Chinese.
-
- Args:
- text (str): input text
-
- Returns:
- bool: True if the text is mostly Chinese, otherwise False
- """
- # Character-ratio heuristic: count CJK characters against all word characters
- chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
- total_chars = len(re.findall(r'\w', text)) + chinese_chars
- char_ratio = chinese_chars / max(total_chars, 1)
- if char_ratio > 0.1:
- return True
- try:
- # Secondary check with langdetect
- lang = detect(text)
- # Fall back to the character ratio if detection fails
- if not lang:
- raise Exception("Language detection failed")
- return lang in ('zh-cn', 'zh-tw', 'zh')
- except Exception:
- return char_ratio > 0.1
- def process_document(self, file_path: Union[str, Path], chunk_size=1000, overlap_size=250) -> List[str]:
- """
- Main entry point for processing a document.
-
- Args:
- file_path: path to the document
-
- Returns:
- List[str]: the processed text chunks
- """
- # read_file already chunks the text (each _read_* helper calls create_chunks), so passing
- # its list result through create_chunks again would fail; return the chunks directly.
- # Note that chunk_size/overlap_size are currently not forwarded to the readers.
- chunks = self.read_file(file_path)
- return chunks
- if __name__ == '__main__':
- import asyncio
- processor = DocumentProcessor()
- # Process a document (read_file returns the chunk list)
- chunks = processor.read_file("./tests/test.pdf")
- # Print the results
- # for i, chunk in enumerate(chunks):
- # print(f"Chunk {i+1}:")
- # print(chunk)
- # print(len(chunk))
- # print("-" * 50)
- status = asyncio.run(update_mulvus_file(client_id='test', file_name='test.pdf',chunks=chunks))
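- # Note: update_mulvus_file is assumed to be an async helper defined in milvus_process
- # that writes the chunk list into the vector store for the given client_id and file name.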