rosacastillo's picture
updating cloud storage scrip
32faccc
raw
history blame
2.65 kB
from minio import Minio
from minio.error import S3Error
import os
import argparse
from utils import HIST_DIR
MINIO_ENDPOINT = "minio.autonolas.tech"
ACCESS_KEY = os.environ.get("CLOUD_ACCESS_KEY", None)
SECRET_KEY = os.environ.get("CLOUD_SECRET_KEY", None)
BUCKET_NAME = "weekly-stats"
FOLDER_NAME = "historical_data"
def initialize_client():
# Initialize the MinIO client
client = Minio(
MINIO_ENDPOINT,
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
secure=True, # Set to False if not using HTTPS
)
return client
def upload_file(client, filename: str, file_path: str):
"""Upload a file to the bucket"""
try:
OBJECT_NAME = FOLDER_NAME + "/" + filename
print(
f"filename={filename}, object_name={OBJECT_NAME} and file_path={file_path}"
)
client.fput_object(
BUCKET_NAME, OBJECT_NAME, file_path, part_size=10 * 1024 * 1024
) # 10MB parts
print(f"File '{file_path}' uploaded as '{OBJECT_NAME}'.")
except S3Error as err:
print(f"Error uploading file: {err}")
def download_file(client, filename: str, file_path: str):
"""Download the file back"""
try:
OBJECT_NAME = FOLDER_NAME + "/" + filename
client.fget_object(BUCKET_NAME, OBJECT_NAME, "downloaded_" + file_path)
print(f"File '{OBJECT_NAME}' downloaded as 'downloaded_{file_path}'.")
except S3Error as err:
print(f"Error downloading file: {err}")
def load_historical_file(client, filename: str):
"""Function to load one file into the cloud storage"""
file_path = filename
file_path = HIST_DIR / filename
upload_file(client, filename, file_path)
def process_historical_files(client):
"""Process all parquet files in historical_data folder"""
# Walk through all files in the folder
for filename in os.listdir(HIST_DIR):
# Check if file is a parquet file
if filename.endswith(".parquet"):
try:
load_historical_file(client, filename)
print(f"Successfully processed {filename}")
except Exception as e:
print(f"Error processing {filename}: {str(e)}")
if __name__ == "__main__":
# parser = argparse.ArgumentParser(
# description="Load files to the cloud storate for historical data"
# )
# parser.add_argument("param_1", type=str, help="Name of the file to upload")
# # Parse the arguments
# args = parser.parse_args()
# filename = args.param_1
client = initialize_client()
# load_historical_file(client, filename)
process_historical_files(client)