|
import multiprocessing |
|
import os |
|
import random |
|
import shutil |
|
import tempfile |
|
import zipfile |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
from datetime import datetime |
|
|
|
import fitz |
|
import gradio as gr |
|
from huggingface_hub import DatasetCard, DatasetCardData, HfApi |
|
from PIL import Image |
|
|
|
from dataset_card_template import DATASET_CARD_TEMPLATE |
|
|
|
# Opt in to the hf_transfer backend for Hugging Face Hub transfers.
# NOTE(review): this is assigned after `huggingface_hub` has already been
# imported above; if the library reads the flag at import time the setting may
# have no effect — confirm, and move it ahead of that import if so.
os.environ.update({"HF_HUB_ENABLE_HF_TRANSFER": "1"})

# Size of the thread pool used for PDF conversion: one worker per CPU, capped at 32.
CPU_COUNT = multiprocessing.cpu_count()
MAX_WORKERS = CPU_COUNT if CPU_COUNT < 32 else 32
|
|
|
|
|
def process_pdf(pdf_file, sample_percentage, temp_dir):
    """Render all, or a random sample of, one PDF's pages to JPEG files.

    Args:
        pdf_file: Either an object exposing the PDF's path as ``.name`` or a
            plain path string.
        sample_percentage: Percentage of pages to convert. Values strictly
            between 0 and 100 pick a random subset (at least one page when the
            document has any); every other value converts all pages.
        temp_dir: Existing directory that receives the images, named
            ``<pdf basename>_page_<1-based page number>.jpg``.

    Returns:
        A ``(image_paths, error, page_count)`` tuple. On success ``error`` is
        ``None`` and ``page_count == len(image_paths)``. On any failure the
        result is ``([], "Error processing <path>: <reason>", 0)``; errors are
        reported through the tuple instead of being raised so that one bad
        PDF does not abort a whole batch.
    """
    # Resolve the path up front so the error branch never has to touch
    # ``pdf_file`` again (which could itself raise).
    pdf_path = getattr(pdf_file, "name", pdf_file)
    try:
        # The context manager closes the document even if rendering fails.
        with fitz.open(pdf_path) as doc:
            total_pages = len(doc)

            if 0 < sample_percentage < 100:
                pages_to_convert = int(total_pages * (sample_percentage / 100))
                # At least one page, but never more than the document holds
                # (a zero-page document yields an empty selection).
                pages_to_convert = min(total_pages, max(1, pages_to_convert))
                selected_pages = sorted(
                    random.sample(range(total_pages), pages_to_convert)
                )
            else:
                selected_pages = range(total_pages)

            base_name = os.path.basename(pdf_path)
            images = []
            for page_num in selected_pages:
                pix = doc[page_num].get_pixmap()
                image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                image_path = os.path.join(
                    temp_dir, f"{base_name}_page_{page_num + 1}.jpg"
                )
                image.save(image_path, "JPEG", quality=85, optimize=True)
                images.append(image_path)

        return images, None, len(images)
    except Exception as e:
        return [], f"Error processing {pdf_path}: {str(e)}", 0
|
|
|
|
|
def pdf_to_images(pdf_files, sample_percentage, temp_dir, progress=gr.Progress()):
    """Convert a batch of PDFs to page images using a thread pool.

    Each PDF is handed to ``process_pdf`` on a worker thread. PDFs that fail
    are skipped: their error text is collected, surfaced with ``gr.Info``, and
    appended to the returned status message.

    Args:
        pdf_files: Iterable of uploaded PDF file objects (sized, so that
            ``len()`` works).
        sample_percentage: Percentage of pages to convert per PDF; forwarded
            unchanged to ``process_pdf``.
        temp_dir: Directory that receives the images; created if missing.
        progress: Gradio progress tracker, called with a completed fraction
            and a description.

    Returns:
        ``(image_paths, message)`` where ``image_paths`` lists every image
        written (in completion order) and ``message`` is a human-readable
        summary.
    """
    os.makedirs(temp_dir, exist_ok=True)

    progress(0, desc="Starting conversion")
    all_images = []
    skipped_pdfs = []
    total_pdfs = len(pdf_files)

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_pdf = {
            executor.submit(process_pdf, pdf, sample_percentage, temp_dir): pdf
            for pdf in pdf_files
        }

        # Progress is tracked per finished PDF. This avoids opening every
        # document an extra time just to count pages (which would also raise
        # on a corrupt file before it could be skipped) and reaches 100% even
        # when only a sample of the pages is converted.
        for completed, future in enumerate(as_completed(future_to_pdf), start=1):
            pdf = future_to_pdf[future]
            images, error, _ = future.result()
            if error:
                skipped_pdfs.append(error)
                gr.Info(error)
            else:
                all_images.extend(images)

            pdf_name = getattr(pdf, "name", pdf)
            progress(completed / total_pdfs, desc=f"Processing {pdf_name}")

    message = f"Saved {len(all_images)} images to temporary directory"
    if skipped_pdfs:
        message += f"\nSkipped {len(skipped_pdfs)} PDFs due to errors: {', '.join(skipped_pdfs)}"
    return all_images, message
|
|
|
|
|
def get_size_category(num_images):
    """Return the dataset-card ``size_categories`` label for an item count.

    Labels follow the Hugging Face Hub scale: ``n<1K``, ``1K<n<10K``, ... up
    to ``100B<n<1T``, then ``n>1T``. A count equal to a boundary falls into
    the bucket above it (e.g. 1000 -> ``1K<n<10K``).

    Args:
        num_images: Number of items in the dataset.

    Returns:
        The size-category label as a string.
    """
    # (exclusive upper bound, label), in ascending order.
    buckets = (
        (1_000, "n<1K"),
        (10_000, "1K<n<10K"),
        (100_000, "10K<n<100K"),
        (1_000_000, "100K<n<1M"),
        (10_000_000, "1M<n<10M"),
        (100_000_000, "10M<n<100M"),
        (1_000_000_000, "100M<n<1B"),
        (10_000_000_000, "1B<n<10B"),
        (100_000_000_000, "10B<n<100B"),
        (1_000_000_000_000, "100B<n<1T"),
    )
    for upper_bound, label in buckets:
        if num_images < upper_bound:
            return label
    return "n>1T"
|
|
|
|
|
def process_pdfs(
    pdf_files,
    sample_percentage,
    hf_repo,
    create_zip,
    private_repo,
    oauth_token: gr.OAuthToken | None,
    progress=gr.Progress(),
):
    """Gradio handler: convert PDFs to images, optionally zip and upload them.

    Args:
        pdf_files: Uploaded PDF file objects; empty/``None`` aborts early.
        sample_percentage: Percentage of pages to convert per PDF.
        hf_repo: Target dataset repo id; when empty nothing is uploaded.
        create_zip: Whether to bundle the images into ``converted_images.zip``.
        private_repo: Whether the dataset repo is created as private.
        oauth_token: Hugging Face OAuth token, or ``None`` when the user is
            not logged in (which aborts early).
        progress: Gradio progress tracker.

    Returns:
        ``(image_paths, zip_path, status)``. ``zip_path`` is ``None`` unless a
        zip was requested. On a validation failure the first two items are
        ``None`` and ``status`` is a ``gr.Markdown`` warning. On an unexpected
        error the working directory is removed and ``status`` is an
        ``"An error occurred: ..."`` string. A failed Hub upload does not
        discard the converted images; the failure is appended to ``status``.
    """
    if not pdf_files:
        return (
            None,
            None,
            gr.Markdown(
                "⚠️ No PDF files uploaded. Please upload at least one PDF file."
            ),
        )

    if oauth_token is None:
        return (
            None,
            None,
            gr.Markdown(
                "⚠️ Not logged in to Hugging Face. Please log in to upload to a Hugging Face dataset."
            ),
        )

    # Bound before the try so the error handler can tell whether there is
    # anything to clean up.
    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        images_dir = os.path.join(temp_dir, "images")
        os.makedirs(images_dir)

        progress(0, desc="Starting PDF processing")
        # Forward the injected tracker so conversion progress is reported
        # through the same object as the rest of this handler.
        images, message = pdf_to_images(
            pdf_files, sample_percentage, images_dir, progress
        )

        # Move the results into the folder that is zipped and uploaded.
        sampled_images_dir = os.path.join(temp_dir, "sampled_images")
        os.makedirs(sampled_images_dir)

        updated_images = []
        for image in images:
            new_path = os.path.join(sampled_images_dir, os.path.basename(image))
            shutil.move(image, new_path)
            updated_images.append(new_path)
        images = updated_images

        zip_path = None
        if create_zip:
            zip_path = os.path.join(temp_dir, "converted_images.zip")
            with zipfile.ZipFile(zip_path, "w") as zipf:
                progress(0, desc="Zipping images")
                for image in progress.tqdm(images, desc="Zipping images"):
                    # Store each file flat, under its bare file name.
                    zipf.write(image, os.path.basename(image))
            message += f"\nCreated zip file with {len(images)} images"

        if hf_repo:
            try:
                hf_api = HfApi(token=oauth_token.token)
                hf_api.create_repo(
                    hf_repo,
                    repo_type="dataset",
                    private=private_repo,
                )

                hf_api.upload_folder(
                    folder_path=sampled_images_dir,
                    repo_id=hf_repo,
                    repo_type="dataset",
                    path_in_repo="images",
                )

                size_category = get_size_category(len(images))

                card_data = DatasetCardData(
                    tags=["created-with-pdfs-to-page-images-converter", "pdf-to-image"],
                    size_categories=[size_category],
                )

                # Built once so the card object and its text agree, including
                # the timestamp.
                template_fields = {
                    "hf_repo": hf_repo,
                    "num_images": len(images),
                    "num_pdfs": len(pdf_files),
                    "sample_size": sample_percentage
                    if sample_percentage > 0
                    else "All pages",
                    "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                }

                card = DatasetCard.from_template(
                    card_data,
                    template_path=None,
                    **template_fields,
                )
                card.text = DATASET_CARD_TEMPLATE.format(
                    size_category=size_category,
                    **template_fields,
                )

                card.push_to_hub(hf_repo, token=oauth_token.token)

                # Report success only after the push has actually succeeded.
                repo_url = f"https://huggingface.co/datasets/{hf_repo}"
                message += f"\nUploaded dataset card to Hugging Face repo: [{hf_repo}]({repo_url})"
            except Exception as e:
                message += f"\nFailed to upload to Hugging Face: {str(e)}"

        return images, zip_path, message
    except Exception as e:
        if temp_dir is not None:
            # Best-effort cleanup; a failure here must not mask the real error.
            shutil.rmtree(temp_dir, ignore_errors=True)
        return None, None, f"An error occurred: {str(e)}"
|
|
|
|
|
|
|
# Gradio UI: upload PDFs, choose sampling/zip/upload options, run the
# conversion, and show the resulting images, status text and download link.
with gr.Blocks() as demo:
    gr.HTML(
        """<h1 style='text-align: center;'> PDFs to Page Images Converter</h1>
        <center><i> 📁 Convert PDFs to an image dataset, splitting pages into individual images 📁 </i></center>"""
    )
    gr.Markdown(
        """
        This app allows you to:
        1. Upload one or more PDF files
        2. Convert each page of the PDFs into separate image files
        3. (Optionally) sample a specific number of pages from each PDF
        4. (Optionally) Create a downloadable ZIP file of the converted images
        5. (Optionally) Upload the images to a Hugging Face dataset repository
        """
    )

    with gr.Row():
        gr.LoginButton(size="sm")

    with gr.Row():
        # Extensions must be given as ".ext"; a glob such as "*.pdf" is not a
        # recognised file-type specifier.
        pdf_files = gr.File(
            file_count="multiple", label="Upload PDF(s)", file_types=[".pdf"]
        )
    with gr.Row():
        sample_percentage = gr.Slider(
            minimum=0,
            maximum=100,
            value=100,
            step=1,
            label="Percentage of pages to sample per PDF",
            info="0% for no sampling (all pages), 100% for all pages",
        )
        hf_repo = gr.Textbox(
            label="Hugging Face Repo",
            placeholder="username/repo-name",
            info="Enter the Hugging Face repository name in the format 'username/repo-name'",
        )
    with gr.Row():
        create_zip = gr.Checkbox(label="Create ZIP file of images?", value=False)
        private_repo = gr.Checkbox(label="Make repository private?", value=False)
    # NOTE(review): assumes only the gallery lives inside the accordion, with
    # the status text and download link below it — confirm the intended layout.
    with gr.Accordion("View converted images", open=False):
        output_gallery = gr.Gallery(label="Converted Images")
    status_text = gr.Markdown(label="Status")
    download_button = gr.File(label="Download Converted Images")

    submit_button = gr.Button("Convert PDFs to page images")
    # Only the five visible inputs are wired here; `process_pdfs` declares two
    # further parameters (`oauth_token`, `progress`) that are not in this list.
    submit_button.click(
        process_pdfs,
        inputs=[pdf_files, sample_percentage, hf_repo, create_zip, private_repo],
        outputs=[output_gallery, download_button, status_text],
    )

demo.launch(debug=True)
|
|