import hashlib
import os
import pandas as pd
from bs4 import BeautifulSoup
from loguru import logger
class FileName:
"""Record file original name, state and copied filepath with text
format."""
def __init__(self, root: str, filename: str, _type: str):
self.root = root
self.prefix = filename.replace("/", "_")
self.basename = os.path.basename(filename)
self.origin = os.path.join(root, filename)
self.copypath = ""
self._type = _type
self.state = True
self.reason = ""
def __str__(self):
return "{},{},{},{}\n".format(self.basename, self.copypath, self.state, self.reason)
class FileOperation:
"""Encapsulate all file reading operations."""
def __init__(self):
self.image_suffix = [".jpg", ".jpeg", ".png", ".bmp"]
self.md_suffix = ".md"
self.text_suffix = [".txt", ".text"]
self.excel_suffix = [".xlsx", ".xls", ".csv"]
self.pdf_suffix = ".pdf"
self.ppt_suffix = ".pptx"
self.html_suffix = [".html", ".htm", ".shtml", ".xhtml"]
self.word_suffix = [".docx", ".doc"]
self.normal_suffix = (
[self.md_suffix]
+ self.text_suffix
+ self.excel_suffix
+ [self.pdf_suffix]
+ self.word_suffix
+ [self.ppt_suffix]
+ self.html_suffix
)
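        # note: image_suffix is not part of normal_suffix; get_type() still
        # classifies images, but read() has no branch for them, so they yield
        # empty text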
def get_type(self, filepath: str):
filepath = filepath.lower()
if filepath.endswith(self.pdf_suffix):
return "pdf"
if filepath.endswith(self.md_suffix):
return "md"
if filepath.endswith(self.ppt_suffix):
return "ppt"
for suffix in self.image_suffix:
if filepath.endswith(suffix):
return "image"
for suffix in self.text_suffix:
if filepath.endswith(suffix):
return "text"
for suffix in self.word_suffix:
if filepath.endswith(suffix):
return "word"
for suffix in self.excel_suffix:
if filepath.endswith(suffix):
return "excel"
for suffix in self.html_suffix:
if filepath.endswith(suffix):
return "html"
return None
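    # e.g. get_type("Report.PDF") -> "pdf"; get_type("slides.pptx") -> "ppt";
    # get_type("archive.zip") -> None (unsupported)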
    def md5(self, filepath: str):
        """Return the first 8 hex characters of the file's SHA-256 digest.

        Note: despite the name, this computes SHA-256, not MD5.
        """
        hash_object = hashlib.sha256()
        with open(filepath, "rb") as file:
            chunk_size = 8192
            while chunk := file.read(chunk_size):
                hash_object.update(chunk)
        return hash_object.hexdigest()[0:8]
def summarize(self, files: list):
success = 0
skip = 0
failed = 0
for file in files:
if file.state:
success += 1
elif file.reason == "skip":
skip += 1
else:
logger.info("{} {}".format(file.origin, file.reason))
failed += 1
logger.info("{} {}".format(file.reason, file.copypath))
logger.info("累计{}文件,成功{}个,跳过{}个,异常{}个".format(len(files), success, skip, failed))
def scan_dir(self, repo_dir: str):
files = []
for root, _, filenames in os.walk(repo_dir):
for filename in filenames:
_type = self.get_type(filename)
if _type is not None:
files.append(FileName(root=root, filename=filename, _type=_type))
return files
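    # e.g. scan_dir("/data/repo") (hypothetical path) returns a FileName entry
    # for every file whose suffix get_type() recognizes, images included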
def read_pdf(self, filepath: str):
        # load the PDF and serialize any tables it contains
        # TODO fitz installation is incompatible in some environments; improve as needed
import fitz
text = ""
with fitz.open(filepath) as pages:
for page in pages:
text += page.get_text()
tables = page.find_tables()
for table in tables:
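                    # build a table name from real header cells, skipping
                    # auto-generated placeholders such as "Col1"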
tablename = "_".join(filter(lambda x: x is not None and "Col" not in x, table.header.names))
pan = table.to_pandas()
json_text = pan.dropna(axis=1).to_json(force_ascii=False)
text += tablename
text += "\n"
text += json_text
text += "\n"
return text
def read_excel(self, filepath: str):
table = None
if filepath.endswith(".csv"):
table = pd.read_csv(filepath)
else:
table = pd.read_excel(filepath)
if table is None:
return ""
json_text = table.dropna(axis=1).to_json(force_ascii=False)
return json_text
def read(self, filepath: str):
file_type = self.get_type(filepath)
text = ""
if not os.path.exists(filepath):
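            # a missing file is treated as empty text with no error object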
return text, None
try:
if file_type == "md" or file_type == "text":
with open(filepath) as f:
text = f.read()
elif file_type == "pdf":
text += self.read_pdf(filepath)
elif file_type == "excel":
text += self.read_excel(filepath)
elif file_type == "word" or file_type == "ppt":
# https://stackoverflow.com/questions/36001482/read-doc-file-with-python
# https://textract.readthedocs.io/en/latest/installation.html
                # TODO textract cannot be installed with pip > 24.1 because of
                # issues in the library itself; improve as needed. Install it
                # manually with: pip install textract==1.6.5
import textract # for word and ppt
text = textract.process(filepath).decode("utf8")
if file_type == "ppt":
text = text.replace("\n", " ")
elif file_type == "html":
with open(filepath) as f:
soup = BeautifulSoup(f.read(), "html.parser")
text += soup.text
except Exception as e:
logger.error((filepath, str(e)))
return "", e
        # collapse repeated newlines and double spaces; three passes reduce
        # runs of up to eight consecutive characters down to one
        for _ in range(3):
            text = text.replace("\n\n", "\n")
            text = text.replace("  ", " ")
return text, None
if __name__ == "__main__":
    def get_pdf_files(directory):
        pdf_files = []
        # walk the directory tree
        for root, dirs, files in os.walk(directory):
            for file in files:
                # check whether the file extension is .pdf
                if file.lower().endswith(".pdf"):
                    # append the absolute path to the list
                    pdf_files.append(os.path.abspath(os.path.join(root, file)))
        return pdf_files

    # replace the path below with the directory you want to search
    pdf_list = get_pdf_files("/home/khj/huixiangdou-web-online-data/hxd-bad-file")
    # process every PDF found, printing its path and the extracted text length
opr = FileOperation()
for pdf_path in pdf_list:
        print("processing {}".format(pdf_path))
        text, error = opr.read(pdf_path)
if error is not None:
# pdb.set_trace()
print("")
else:
if text is not None:
print(len(text))
else:
# pdb.set_trace()
print("")