|
from minio import Minio |
|
from minio.error import S3Error |
|
import os |
|
import argparse |
|
|
|
from utils import HIST_DIR |
|
|
|
# Host of the MinIO server that stores the statistics.
MINIO_ENDPOINT = "minio.autonolas.tech"

# Credentials are read from the environment; each is None when its variable
# is not set.
ACCESS_KEY = os.environ.get("CLOUD_ACCESS_KEY")
SECRET_KEY = os.environ.get("CLOUD_SECRET_KEY")

# Target bucket and the key prefix ("folder") used for every object.
BUCKET_NAME = "weekly-stats"
FOLDER_NAME = "historical_data"
|
|
|
|
|
def initialize_client():
    """Create and return a Minio client for MINIO_ENDPOINT.

    The client is built with the module-level ACCESS_KEY and SECRET_KEY
    values and with ``secure=True``.
    """
    return Minio(
        MINIO_ENDPOINT,
        access_key=ACCESS_KEY,
        secret_key=SECRET_KEY,
        secure=True,
    )
|
|
|
|
|
def upload_file(client, filename: str, file_path: str):
    """Upload a local file to BUCKET_NAME under the FOLDER_NAME prefix.

    Args:
        client: Object exposing ``fput_object`` (a Minio client).
        filename: Name appended to ``FOLDER_NAME + "/"`` to form the object key.
        file_path: Location of the local file to send.

    An ``S3Error`` raised by the upload is printed, not propagated.
    """
    object_name = "/".join((FOLDER_NAME, filename))
    # Multipart chunk size handed to fput_object: 10 MiB.
    chunk_bytes = 10 * 1024 * 1024
    try:
        print(
            f"filename={filename}, object_name={object_name} and file_path={file_path}"
        )
        client.fput_object(
            BUCKET_NAME, object_name, file_path, part_size=chunk_bytes
        )
    except S3Error as err:
        print(f"Error uploading file: {err}")
    else:
        print(f"File '{file_path}' uploaded as '{object_name}'.")
|
|
|
|
|
def download_file(client, filename: str, file_path: str):
    """Download an object from BUCKET_NAME's FOLDER_NAME prefix to disk.

    Args:
        client: Object exposing ``fget_object`` (a Minio client).
        filename: Name appended to ``FOLDER_NAME + "/"`` to form the object key.
        file_path: Local path as ``str`` or ``os.PathLike``. The file is
            written to this path with ``"downloaded_"`` prepended to the
            whole path string.

    An ``S3Error`` raised by the download is printed, not propagated.
    """
    object_name = FOLDER_NAME + "/" + filename
    # os.fspath lets pathlib.Path values through; a bare `"downloaded_" + Path`
    # raises TypeError, which the S3Error handler below would not catch.
    destination = "downloaded_" + os.fspath(file_path)
    try:
        client.fget_object(BUCKET_NAME, object_name, destination)
        print(f"File '{object_name}' downloaded as '{destination}'.")
    except S3Error as err:
        print(f"Error downloading file: {err}")
|
|
|
|
|
def load_historical_file(client, filename: str):
    """Upload a single file from HIST_DIR to the cloud storage.

    Args:
        client: Client object forwarded unchanged to ``upload_file``.
        filename: Name of the file inside HIST_DIR; it is also passed to
            ``upload_file`` as the name to store the object under.
    """
    file_path = HIST_DIR / filename
    upload_file(client, filename, file_path)
|
|
|
|
|
def process_historical_files(client):
    """Upload every ``.parquet`` entry listed in HIST_DIR.

    Entries are handled one at a time; any exception raised for an entry is
    printed and the loop continues with the next one.

    NOTE: ``upload_file`` prints and swallows ``S3Error`` itself, so the
    "Successfully processed" line is also printed for uploads that failed
    with an S3 error.
    """
    parquet_names = (
        entry for entry in os.listdir(HIST_DIR) if entry.endswith(".parquet")
    )
    for entry in parquet_names:
        try:
            load_historical_file(client, entry)
            print(f"Successfully processed {entry}")
        except Exception as exc:
            print(f"Error processing {entry}: {exc!s}")
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the storage client and upload every parquet
    # file found in HIST_DIR.
    process_historical_files(initialize_client())
|
|