# Spaces: Sleeping (page header captured together with the file; not part of the program)
from llama_parse import LlamaParse | |
from llama_index.core import SimpleDirectoryReader | |
import os | |
from dotenv import load_dotenv | |
load_dotenv() | |
import tempfile | |
import requests | |
import streamlit as st | |
import boto3 | |
from botocore.exceptions import ClientError, NoCredentialsError | |
def check_pdf(read_file_path):
    """Parse a file with LlamaParse and report the outcome in the Streamlit UI.

    Args:
        read_file_path: Path of the local file passed to
            ``SimpleDirectoryReader`` as its only input file. ``.pdf`` files
            are routed to a ``LlamaParse`` parser configured for markdown
            output.

    Returns:
        None. The result is communicated only through ``st.success`` /
        ``st.error`` messages; exceptions from parsing are reported, not raised.
    """
    api_key = os.environ.get('LLAMA_CLOUD_API_KEY')
    if not api_key:
        # Indexing os.environ directly would surface as the cryptic message
        # "An error occurred: 'LLAMA_CLOUD_API_KEY'"; say what is wrong instead.
        st.error('LLAMA_CLOUD_API_KEY is not set')
        return

    try:
        # NOTE(review): ignore_errors=False presumably makes parse failures
        # raise instead of yielding an empty result -- confirm in LlamaParse docs.
        parser = LlamaParse(result_type="markdown", api_key=api_key, ignore_errors=False)
        file_extractor = {".pdf": parser}
        markdown_data = SimpleDirectoryReader(
            input_files=[read_file_path],
            file_extractor=file_extractor,
        ).load_data()
    except Exception as e:
        # UI boundary: any parsing/IO failure is shown to the user, not raised.
        st.error(f"An error occurred: {e}")
        return

    if not markdown_data:
        st.error('No markdown data found')
    else:
        st.success('File Parsed successfully')
def download_file_from_url(url, filename, timeout=30):
    """Stream the content at ``url`` into the local file ``filename``.

    Progress and failures are reported through the Streamlit UI.

    Args:
        url: Address fetched with an HTTP GET request.
        filename: Destination path; its parent directory is created if needed.
        timeout: Seconds passed to ``requests.get`` as its ``timeout`` so a
            stalled server cannot block the app indefinitely.

    Returns:
        None. On a non-200 response or a request error, an ``st.error``
        message is shown and no success message is emitted.
    """
    st.markdown(f"Downloading file from {url} to {filename}")

    # os.makedirs('') raises, so only create a directory when the path has one.
    target_dir = os.path.dirname(filename)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                st.error(f"Failed to download file. Status code: {response.status_code}")
                return
            with open(filename, 'wb') as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)
    except requests.RequestException as e:
        # Report network failures in the UI like the other helpers in this file do.
        st.error(f"Failed to download file: {e}")
        return

    st.markdown(f"File downloaded and saved as {filename}")
# URL flow: download the document at the entered URL into a throwaway
# directory and run the PDF check on it.
url = st.text_input("Enter URL", key="url")
if url:
    with tempfile.TemporaryDirectory() as temp_dir:
        pdf_path = os.path.join(temp_dir, "task_for_you.pdf")
        download_file_from_url(url, pdf_path)
        # The downloader reports its own failures; parsing a file that never
        # arrived would only add a second, misleading error message.
        if os.path.isfile(pdf_path):
            check_pdf(pdf_path)
def _is_within_folder(folder, candidate):
    """Return True when ``candidate`` resolves to a path inside ``folder``."""
    base = os.path.realpath(folder)
    target = os.path.realpath(candidate)
    try:
        return os.path.commonpath([base, target]) == base
    except ValueError:
        # commonpath raises when the paths cannot be compared (e.g. different drives).
        return False


def download_files_from_s3(bucket_name, local_folder, file_path_list):
    """Download the requested object keys from an S3 bucket into a local folder.

    The bucket is listed page by page and every object whose key appears in
    ``file_path_list`` is downloaded to ``local_folder/<key>``. Progress and
    failures are reported through the Streamlit UI.

    Args:
        bucket_name: Name of the S3 bucket to list and download from.
        local_folder: Directory under which the objects are stored, keeping
            the key as the relative path.
        file_path_list: Exact object keys to download.

    Returns:
        None. Errors are shown with ``st.error`` rather than raised; an
        error is shown for each requested key that is not present locally
        once listing has finished.
    """
    # Nothing requested: avoid listing the whole bucket for no reason.
    pending = set(file_path_list)
    if not pending:
        return

    folder_prefix = ''
    try:
        s3 = boto3.client('s3')
        # List objects in the S3 bucket
        paginator = s3.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=folder_prefix)
        # Download filtered files
        for page in page_iterator:
            for obj in page.get('Contents', []):
                key = obj['Key']
                # Apply file filter if specified
                if key not in pending:
                    continue
                pending.discard(key)
                # Construct local file path
                local_path = os.path.join(local_folder, key)
                # Keys such as "/etc/x" or "../x" would make os.path.join
                # point outside local_folder; never write there.
                if not _is_within_folder(local_folder, local_path):
                    st.error(f"Refusing to download {key}: target is outside {local_folder}")
                    continue
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                try:
                    st.markdown(f"Downloading: {key} -> {local_path}")
                    s3.download_file(bucket_name, key, local_path)
                    st.markdown(f"Downloaded: {local_path}")
                except Exception as e:
                    # Best effort: one failed object must not stop the others.
                    st.error(f"Error downloading {key}: {e}")
            if not pending:
                # Every requested key has been seen; skip the remaining pages.
                break
        for path in file_path_list:
            expected = os.path.join(local_folder, path)
            if not (_is_within_folder(local_folder, expected) and os.path.isfile(expected)):
                st.error(f"Failed to download file {path}")
    except NoCredentialsError:
        st.error("No AWS credentials found.")
    except Exception as e:
        st.error(f"An error occurred: {e}")
# S3 flow: download one object from the given bucket into a throwaway
# directory and run the PDF check on it.
bucket_name = st.text_input("Enter bucket name", key="bucket_name")
key = st.text_input("Enter key", key="key")
if st.button("Submit"):
    if not bucket_name or not key:
        st.error("Please enter both a bucket name and a key.")
    else:
        with tempfile.TemporaryDirectory() as temp_dir:
            download_files_from_s3(bucket_name, temp_dir, [key])
            # Join with temp_dir exactly once; the original joined twice and
            # only worked because the second argument was already absolute.
            file_name = os.path.join(temp_dir, key)
            # The downloader already reports missing files; skip parsing then.
            if os.path.isfile(file_name):
                check_pdf(file_name)