"""Utility functions for working with an S3 bucket via boto3.

The target bucket is read from the AWS_BUCKET_NAME environment variable.
"""

import os

import boto3
from botocore.exceptions import ClientError, NoCredentialsError
def upload_folder_to_s3(local_dir, prefix=''):
    """Recursively upload a local directory tree to S3 under `prefix`."""
    s3_bucket = os.getenv("AWS_BUCKET_NAME")
    s3_client = boto3.client('s3')
    for root, dirs, files in os.walk(local_dir):
        for dirname in dirs:
            dir_path = os.path.join(root, dirname)
            relative_path = os.path.relpath(dir_path, local_dir)
            # S3 has no real directories; create a zero-byte placeholder
            # object (key ending in '/') so empty folders are preserved.
            key = os.path.join(prefix, relative_path).replace(os.sep, '/') + '/'
            s3_client.put_object(Bucket=s3_bucket, Key=key)
        for file in files:
            file_path = os.path.join(root, file)
            relative_path = os.path.relpath(file_path, local_dir)
            # Build the key with forward slashes so it is portable on Windows
            key = os.path.join(prefix, relative_path).replace(os.sep, '/')
            try:
                s3_client.upload_file(file_path, s3_bucket, key)
                print(f"Uploaded: {file_path} -> s3://{s3_bucket}/{key}")
            except ClientError as e:
                print(f"Error uploading {file_path}: {e}")
                raise
def check_file_exists_in_s3(file_path):
    """Return True if an object with key `file_path` exists in the bucket."""
    bucket_name = os.getenv("AWS_BUCKET_NAME")
    s3_client = boto3.client('s3')
    try:
        s3_client.head_object(Bucket=bucket_name, Key=file_path)
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        raise
def download_files_from_s3(local_folder, file_path_list):
    """Download the objects whose keys appear in `file_path_list` into
    `local_folder`, preserving their key paths locally."""
    s3 = boto3.client('s3')
    bucket_name = os.getenv("AWS_BUCKET_NAME")
    folder_prefix = ''
    try:
        # List the objects in the S3 bucket
        paginator = s3.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=folder_prefix)
        for page in page_iterator:
            for obj in page.get('Contents', []):
                key = obj['Key']
                # Skip objects that were not requested
                if key not in file_path_list:
                    continue
                # Mirror the key's path structure under local_folder
                local_path = os.path.join(local_folder, key)
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                try:
                    print(f"Downloading: {key} -> {local_path}")
                    s3.download_file(bucket_name, key, local_path)
                    print(f"Downloaded: {local_path}")
                except Exception as e:
                    print(f"Error downloading {key}: {e}")
    except NoCredentialsError:
        print("No AWS credentials found.")
    except Exception as e:
        print(f"An error occurred: {e}")
def download_folder_from_s3(local_folder, aws_folder_prefix):
    """Download every object under `aws_folder_prefix` into `local_folder`,
    recreating the folder structure locally."""
    s3 = boto3.client('s3')
    bucket_name = os.getenv("AWS_BUCKET_NAME")
    if not bucket_name:
        raise ValueError("AWS_BUCKET_NAME environment variable is not set")
    try:
        # Create the local folder if it doesn't exist
        os.makedirs(local_folder, exist_ok=True)
        # List objects in the S3 bucket
        paginator = s3.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=aws_folder_prefix)
        for page in page_iterator:
            for obj in page.get('Contents', []):
                key = obj['Key']
                # Zero-byte objects are folder placeholders, not files
                if obj['Size'] == 0:
                    continue
                # Construct the local file path
                local_path = os.path.join(local_folder, os.path.relpath(key, aws_folder_prefix))
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                try:
                    print(f"Downloading: {key} -> {local_path}")
                    s3.download_file(bucket_name, key, local_path)
                    print(f"Downloaded: {local_path}")
                except ClientError as e:
                    code = e.response['Error']['Code']
                    if code == 'AccessDenied':
                        print(f"Permission denied when trying to download {key}: {e}")
                    elif code == 'NoSuchKey':
                        print(f"The object {key} does not exist in the bucket.")
                    else:
                        print(f"An error occurred while downloading {key}: {e}")
                        raise
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
def delete_s3_folder(folder_path):
    """Delete every object under `folder_path` in the bucket."""
    bucket_name = os.getenv("AWS_BUCKET_NAME")
    s3_client = boto3.client('s3')
    try:
        # List objects in the S3 bucket
        paginator = s3_client.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=folder_path)
        deleted = 0
        # delete_objects accepts at most 1000 keys per call, which matches
        # the page size of list_objects_v2, so delete one page at a time.
        for page in page_iterator:
            delete_keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
            if not delete_keys:
                continue
            for entry in delete_keys:
                print(f"Deleting: {entry['Key']}")
            s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': delete_keys})
            deleted += len(delete_keys)
        if deleted:
            print(f"Deleted {deleted} objects in folder '{folder_path}'")
        else:
            print(f"No objects found in folder '{folder_path}'")
    except ClientError as e:
        print(f"An error occurred: {e}")
def list_s3_objects(prefix=''):
    """Print key, size, last-modified time, ETag, and file extension for
    every object in the bucket that matches `prefix`."""
    bucket_name = os.getenv("AWS_BUCKET_NAME")
    s3_client = boto3.client('s3')
    try:
        paginator = s3_client.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
        for page in page_iterator:
            for obj in page.get('Contents', []):
                print(f"Key: {obj['Key']}")
                print(f"Size: {obj['Size']} bytes")
                print(f"Last Modified: {obj['LastModified']}")
                print(f"ETag: {obj['ETag']}")
                print(f"File Extension: {os.path.splitext(obj['Key'])[-1]}")
                print("---")
    except ClientError as e:
        print(f"An error occurred: {e}")
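# A minimal usage sketch, assuming AWS credentials are configured and
# AWS_BUCKET_NAME points at a bucket you own. The local paths and the
# "backups" prefix below are hypothetical, for illustration only.
if __name__ == "__main__":
    upload_folder_to_s3("./data", prefix="backups")
    if check_file_exists_in_s3("backups/report.csv"):
        download_files_from_s3("./restored", ["backups/report.csv"])
    list_s3_objects(prefix="backups/")
    download_folder_from_s3("./restored_all", "backups/")
    # Destructive: removes every object under the prefix.
    delete_s3_folder("backups/")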