updating week format starting on Monday, new staking contracts and new weekly data
285f2a6
from minio import Minio | |
from minio.error import S3Error | |
import os | |
import argparse | |
from utils import HIST_DIR | |
# Host of the MinIO server the client connects to.
MINIO_ENDPOINT = "minio.autonolas.tech"

# Credentials are read from the environment; each is None when its variable is unset.
ACCESS_KEY = os.environ.get("CLOUD_ACCESS_KEY")
SECRET_KEY = os.environ.get("CLOUD_SECRET_KEY")

# Target bucket and the key prefix ("folder") under which objects are stored.
BUCKET_NAME = "weekly-stats"
FOLDER_NAME = "historical_data"
def initialize_client():
    """Build a MinIO client for MINIO_ENDPOINT with the module-level credentials.

    Returns:
        A ``Minio`` instance configured with ACCESS_KEY / SECRET_KEY and
        ``secure=True``.
    """
    connection_options = {
        "access_key": ACCESS_KEY,
        "secret_key": SECRET_KEY,
        "secure": True,  # switch to False when the endpoint is not served over HTTPS
    }
    return Minio(MINIO_ENDPOINT, **connection_options)
def upload_file(client, filename: str, file_path: str) -> bool:
    """Store a local file in the bucket as ``FOLDER_NAME/<filename>``.

    Args:
        client: MinIO client used to perform the upload.
        filename: Name given to the object inside FOLDER_NAME.
        file_path: Location of the local file to send.

    Returns:
        True when the upload completed, False when the client raised S3Error.
    """
    object_name = "/".join((FOLDER_NAME, filename))
    print(f"filename={filename}, object_name={object_name} and file_path={file_path}")

    ten_megabytes = 10 * 1024 * 1024  # size of each multipart chunk
    try:
        client.fput_object(
            BUCKET_NAME, object_name, file_path, part_size=ten_megabytes
        )
    except S3Error as err:
        print(f"Error uploading file: {err}")
        return False

    print(f"File '{file_path}' uploaded as '{object_name}'.")
    return True
def download_file(client, filename: str, file_path: str):
    """Fetch ``FOLDER_NAME/<filename>`` from the bucket into a local file.

    The local copy is written next to ``file_path`` with ``downloaded_``
    prepended to the file name, so a file already present at ``file_path``
    is not overwritten.

    Args:
        client: MinIO client used to perform the download.
        filename: Name of the object inside FOLDER_NAME.
        file_path: Local path (``str`` or ``os.PathLike``) the download name
            is derived from.

    Returns:
        None. S3Error raised by the client is reported on stdout, not raised.
    """
    object_name = FOLDER_NAME + "/" + filename

    # os.fspath accepts both str and pathlib.Path. Only the final component
    # gets the prefix so any directory part of file_path is preserved
    # ("dir/x.parquet" -> "dir/downloaded_x.parquet").
    directory, basename = os.path.split(os.fspath(file_path))
    target_path = os.path.join(directory, "downloaded_" + basename)

    try:
        client.fget_object(BUCKET_NAME, object_name, target_path)
    except S3Error as err:
        print(f"Error downloading file: {err}")
        return

    print(f"File '{object_name}' downloaded as '{target_path}'.")
def load_historical_file(client, filename: str) -> bool:
    """Upload one file from HIST_DIR to the cloud storage.

    Args:
        client: MinIO client forwarded to ``upload_file``.
        filename: Name of the file inside HIST_DIR; also used as the object name.

    Returns:
        The result of ``upload_file``: True on success, False on failure.
    """
    # The local path is always HIST_DIR / filename; the earlier placeholder
    # assignment of file_path was overwritten immediately and has been dropped.
    return upload_file(client, filename, HIST_DIR / filename)
def upload_historical_file(filename: str):
    """Upload a single historical file using a freshly created client.

    Args:
        filename: Name of the file handed to ``load_historical_file``.

    The upload result is not propagated; this function returns None.
    """
    load_historical_file(client=initialize_client(), filename=filename)
def process_historical_files(client):
    """Upload every ``.parquet`` file listed directly in HIST_DIR.

    Args:
        client: MinIO client forwarded to ``load_historical_file``.

    Each file is handled independently: a failure is printed and the loop
    moves on to the next entry.
    """
    for entry in os.listdir(HIST_DIR):
        # Anything that is not a parquet file is ignored.
        if not entry.endswith(".parquet"):
            continue
        try:
            outcome = (
                f"Successfully processed {entry}"
                if load_historical_file(client, entry)
                else "Error loading the files"
            )
            print(outcome)
        except Exception as exc:
            print(f"Error processing {entry}: {exc!s}")
if __name__ == "__main__":
    # Bulk mode: push every parquet file found in the historical data folder.
    # To upload a single file instead, read its name from the command line
    # (e.g. an argparse positional argument) and pass it to
    # load_historical_file(client, filename).
    process_historical_files(initialize_client())