from minio import Minio from minio.error import S3Error import os import argparse from utils import HIST_DIR MINIO_ENDPOINT = "minio.autonolas.tech" ACCESS_KEY = os.environ.get("CLOUD_ACCESS_KEY", None) SECRET_KEY = os.environ.get("CLOUD_SECRET_KEY", None) BUCKET_NAME = "weekly-stats" FOLDER_NAME = "historical_data" def initialize_client(): # Initialize the MinIO client client = Minio( MINIO_ENDPOINT, access_key=ACCESS_KEY, secret_key=SECRET_KEY, secure=True, # Set to False if not using HTTPS ) return client def upload_file(client, filename: str, file_path: str) -> bool: """Upload a file to the bucket""" try: OBJECT_NAME = FOLDER_NAME + "/" + filename print( f"filename={filename}, object_name={OBJECT_NAME} and file_path={file_path}" ) client.fput_object( BUCKET_NAME, OBJECT_NAME, file_path, part_size=10 * 1024 * 1024 ) # 10MB parts print(f"File '{file_path}' uploaded as '{OBJECT_NAME}'.") return True except S3Error as err: print(f"Error uploading file: {err}") return False def download_file(client, filename: str, file_path: str): """Download the file back""" try: OBJECT_NAME = FOLDER_NAME + "/" + filename client.fget_object(BUCKET_NAME, OBJECT_NAME, "downloaded_" + file_path) print(f"File '{OBJECT_NAME}' downloaded as 'downloaded_{file_path}'.") except S3Error as err: print(f"Error downloading file: {err}") def load_historical_file(client, filename: str) -> bool: """Function to load one file into the cloud storage""" file_path = filename file_path = HIST_DIR / filename return upload_file(client, filename, file_path) def upload_historical_file(filename: str): client = initialize_client() load_historical_file(client=client, filename=filename) def process_historical_files(client): """Process all parquet files in historical_data folder""" # Walk through all files in the folder for filename in os.listdir(HIST_DIR): # Check if file is a parquet file if filename.endswith(".parquet"): try: if load_historical_file(client, filename): print(f"Successfully processed {filename}") else: print("Error loading the files") except Exception as e: print(f"Error processing {filename}: {str(e)}") if __name__ == "__main__": # parser = argparse.ArgumentParser( # description="Load files to the cloud storate for historical data" # ) # parser.add_argument("param_1", type=str, help="Name of the file to upload") # # Parse the arguments # args = parser.parse_args() # filename = args.param_1 client = initialize_client() # load_historical_file(client, filename) process_historical_files(client)