ptchecker / pdf_processing.py
viboognesh-doaz
create new vectorstore each time
4b993b3
raw
history blame
6.23 kB
from PyPDF2 import PdfReader
import pymupdf
import numpy as np
import cv2
import shutil
import imageio
from PIL import Image
import imagehash
import tempfile
import os
from llama_index.core.indices import MultiModalVectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.core import SimpleDirectoryReader, StorageContext
from awsfunctions import upload_folder_to_s3, check_file_exists_in_s3, download_folder_from_s3, delete_s3_folder
import qdrant_client
import streamlit as st
def extract_text_from_pdf(pdf_path):
reader = PdfReader(pdf_path)
full_text = ''
for page in reader.pages:
text = page.extract_text()
full_text += text
return full_text
def extract_images_from_pdf(pdf_path, img_save_path):
doc = pymupdf.open(pdf_path)
for page in doc:
img_number = 0
for block in page.get_text("dict")["blocks"]:
if block['type'] == 1:
name = os.path.join(img_save_path, f"img{page.number}-{img_number}.{block['ext']}")
out = open(name, "wb")
out.write(block["image"])
out.close()
img_number += 1
def is_empty(img_path):
image = cv2.imread(img_path, 0)
std_dev = np.std(image)
return std_dev < 1
def move_images(source_folder, dest_folder):
image_files = [f for f in os.listdir(source_folder)
if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif'))]
os.makedirs(dest_folder, exist_ok=True)
moved_count = 0
for file in image_files:
src_path = os.path.join(source_folder, file)
if not is_empty(src_path):
shutil.move(src_path, os.path.join(dest_folder, file))
moved_count += 1
return moved_count
def remove_low_size_images(data_path):
images_list = os.listdir(data_path)
low_size_photo_list = []
for one_image in images_list:
image_path = os.path.join(data_path, one_image)
try:
pic = imageio.imread(image_path)
size = pic.size
if size < 100:
low_size_photo_list.append(one_image)
except:
pass
for one_image in low_size_photo_list[1:]:
os.remove(os.path.join(data_path, one_image))
def calc_diff(img1 , img2) :
i1 = Image.open(img1)
i2 = Image.open(img2)
h1 = imagehash.phash(i1)
h2 = imagehash.phash(i2)
return h1 - h2
def remove_duplicate_images(data_path) :
image_files = os.listdir(data_path)
only_images = []
for one_image in image_files :
if one_image.endswith('jpeg') or one_image.endswith('png') or one_image.endswith('jpg') :
only_images.append(one_image)
only_images1 = sorted(only_images)
for one_image in only_images1 :
for another_image in only_images1 :
try :
if one_image == another_image :
continue
else :
diff = calc_diff(os.path.join(data_path ,one_image) , os.path.join(data_path ,another_image))
if diff ==0 :
os.remove(os.path.join(data_path , another_image))
except Exception as e:
print(e)
pass
# from langchain_chroma import Chroma
# import chromadb
def initialize_qdrant(temp_dir , aws_prefix):
client = qdrant_client.QdrantClient(path=os.path.join(temp_dir, "qdrant"))
text_store = QdrantVectorStore( client = client , collection_name=f"text_collection" )
image_store = QdrantVectorStore(client = client , collection_name=f"image_collection")
storage_context = StorageContext.from_defaults(vector_store=text_store, image_store=image_store)
documents = SimpleDirectoryReader(os.path.join(temp_dir, f"data")).load_data()
for doc in documents:
doc.metadata["file_path"] = os.path.join(aws_prefix, os.path.relpath(doc.metadata["file_path"], temp_dir))
index = MultiModalVectorStoreIndex.from_documents(documents, storage_context=storage_context)
retriever_engine = index.as_retriever(similarity_top_k=1, image_similarity_top_k=1)
return retriever_engine
def process_pdf(pdf_file):
username = "ptchecker"
aws_prefix_path = os.path.join(os.getenv("FOLDER_PREFIX"), username, "FILES", os.path.splitext(pdf_file.name)[0])
if check_file_exists_in_s3(os.path.join(aws_prefix_path, pdf_file.name)):
delete_s3_folder(aws_prefix_path)
# temp_dir = tempfile.mkdtemp()
# download_folder_from_s3(local_folder=temp_dir, aws_folder_prefix=os.path.join(aws_prefix_path, "qdrant"))
# client = qdrant_client.QdrantClient(path=os.path.join(temp_dir, "qdrant"))
# image_store = QdrantVectorStore(client = client , collection_name=f"image_collection")
# text_store = QdrantVectorStore(client = client , collection_name=f"text_collection")
# index = MultiModalVectorStoreIndex.from_vector_store(vector_store=text_store, image_store=image_store)
# retriever_engine = index.as_retriever(similarity_top_k=1, image_similarity_top_k=1)
# shutil.rmtree(temp_dir)
# return retriever_engine
temp_dir = tempfile.mkdtemp()
temp_pdf_path = os.path.join(temp_dir, pdf_file.name)
with open(temp_pdf_path, "wb") as f:
f.write(pdf_file.getvalue())
data_path = os.path.join(temp_dir, "data")
os.makedirs(data_path , exist_ok=True)
img_save_path = os.path.join(temp_dir, "images")
os.makedirs(img_save_path , exist_ok=True)
extracted_text = extract_text_from_pdf(temp_pdf_path)
with open(os.path.join(data_path, "content.txt"), "w") as file:
file.write(extracted_text)
extract_images_from_pdf(temp_pdf_path, img_save_path)
moved_count = move_images(img_save_path, data_path)
print("Images moved count : ", moved_count)
remove_low_size_images(data_path)
remove_duplicate_images(data_path)
shutil.rmtree(img_save_path)
retriever_engine = initialize_qdrant(temp_dir=temp_dir, aws_prefix=aws_prefix_path) # os.path.join("folder" , os.path.splitext(pdf_file.name)[0] , unique_folder_name)
upload_folder_to_s3(temp_dir, aws_prefix_path)
shutil.rmtree(temp_dir)
return retriever_engine