# Weekly pipeline update: week format now starts on Monday; adds new staking
# contracts and new weekly data (originally commit 285f2a6).
import logging | |
from datetime import datetime | |
import pandas as pd | |
from markets import etl as mkt_etl, DEFAULT_FILENAME as MARKETS_FILENAME, fpmmTrades_etl | |
from tools import DEFAULT_FILENAME as TOOLS_FILENAME, generate_tools_file | |
from profitability import run_profitability_analysis, add_trades_profitability | |
from utils import ( | |
get_question, | |
current_answer, | |
RPC, | |
measure_execution_time, | |
DATA_DIR, | |
HIST_DIR, | |
TMP_DIR, | |
) | |
from get_mech_info import ( | |
get_mech_events_since_last_run, | |
update_fpmmTrades_parquet, | |
update_json_files, | |
) | |
from update_tools_accuracy import compute_tools_accuracy | |
from cleaning_old_info import clean_old_data_from_parquet_files | |
from web3_utils import updating_timestamps | |
from manage_space_files import move_files | |
from cloud_storage import upload_historical_file | |
from tools_metrics import compute_tools_based_datasets | |
# Module-wide logging configuration: INFO level with timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Shared module logger; also passed into get_mech_events_since_last_run().
logger = logging.getLogger(__name__)
def add_current_answer(tools_filename: str) -> None:
    """Attach the market question title and current answer to each tool row.

    Reads the markets (FPMMS) parquet and the given tools parquet from
    DATA_DIR, derives each tool's question title from its prompt, looks up
    the market's current answer, normalizes the answer casing, and writes
    the updated tools parquet back in place.

    :param tools_filename: name of the tools parquet file inside DATA_DIR.
    """
    # Get currentAnswer from FPMMS
    fpmms = pd.read_parquet(DATA_DIR / MARKETS_FILENAME)
    tools = pd.read_parquet(DATA_DIR / tools_filename)

    # Get the question from the tools
    logger.info("Getting the question and current answer for the tools")
    tools["title"] = tools["prompt_request"].apply(get_question)
    tools["currentAnswer"] = tools["title"].apply(
        lambda title: current_answer(title, fpmms)
    )

    # Normalize answer casing: "yes"/"no" -> "Yes"/"No"
    tools["currentAnswer"] = tools["currentAnswer"].str.replace("yes", "Yes")
    tools["currentAnswer"] = tools["currentAnswer"].str.replace("no", "No")

    # Save the tools data after the updates on the content
    tools.to_parquet(DATA_DIR / tools_filename, index=False)
    # Release the (potentially large) markets frame promptly.
    del fpmms
def _backup_parquet(source_path, filename_prefix: str, timestamp: str) -> None:
    """Copy one parquet file into HIST_DIR under a timestamped name and upload it.

    Best-effort: failures are reported but never raised, so one missing file
    does not prevent the other backups from running.
    """
    try:
        frame = pd.read_parquet(source_path)
        filename = f"{filename_prefix}_{timestamp}.parquet"
        frame.to_parquet(HIST_DIR / filename, index=False)
        # save into cloud storage
        upload_historical_file(filename)
    except Exception as e:
        print(f"Error saving {filename_prefix} file in the historical folder {e}")


def save_historical_data():
    """Function to save a copy of the main trades and tools file
    into the historical folder"""
    print("Saving historical data copies")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    _backup_parquet(TMP_DIR / "tools.parquet", "tools", timestamp)
    _backup_parquet(
        DATA_DIR / "all_trades_profitability.parquet",
        "all_trades_profitability",
        timestamp,
    )
def only_new_weekly_analysis(cutoff_date: str = "2024-11-19"):
    """Run the weekly analysis for the FPMMS project over new data only.

    Pipeline: markets ETL -> incremental mech events ETL -> incremental
    fpmmTrades ETL -> tools ETL -> profitability analysis -> merge with the
    previous files -> historical backup -> cleanup of rows older than
    *cutoff_date* -> tools accuracy/metrics -> move outputs to the tmp folder.

    :param cutoff_date: ISO date (YYYY-MM-DD); parquet data older than this
        is removed during the cleanup step. Defaults to the previous
        hard-coded value for backward compatibility.
    """
    # Run markets ETL
    logger.info("Running markets ETL")
    mkt_etl(MARKETS_FILENAME)
    logger.info("Markets ETL completed")

    # Mech events ETL: fetch only events newer than the last recorded run.
    logger.info("Generating the mech json files")
    latest_timestamp = get_mech_events_since_last_run(logger)
    if latest_timestamp is None:
        logger.error("Error while getting the mech events")
        return
    logger.info(f"Finished generating the mech json files from {latest_timestamp}")

    # FpmmTrades ETL: only trades created after the latest mech event.
    fpmmTrades_etl(
        trades_filename="new_fpmmTrades.parquet",
        from_timestamp=int(latest_timestamp.timestamp()),
    )
    # merge with previous file
    logger.info("Merging with previous fpmmTrades file")
    update_fpmmTrades_parquet(trades_filename="new_fpmmTrades.parquet")

    # Run tools ETL: generate and parse only the new tools content.
    logger.info("Generate and parse the tools content")
    generate_tools_file("new_tools_info.json", "new_tools.parquet")
    logger.info("Tools ETL completed")
    add_current_answer("new_tools.parquet")

    # Run profitability analysis over the new data, merged into history.
    logger.info("Running profitability analysis")
    run_profitability_analysis(
        tools_filename="new_tools.parquet",
        trades_filename="new_fpmmTrades.parquet",
        merge=True,
    )
    logger.info("Profitability analysis completed")

    # merge new json files with old json files
    update_json_files()

    save_historical_data()
    try:
        clean_old_data_from_parquet_files(cutoff_date)
    except Exception as e:
        # Best-effort cleanup: log and continue with the remaining steps.
        logger.error(f"Error cleaning the oldest information from parquet files: {e}")

    compute_tools_accuracy()
    compute_tools_based_datasets()
    # move the newly generated files to the tmp folder
    move_files()
    logger.info("Weekly analysis files generated and saved")
def restoring_trades_data(from_date: str, to_date: str) -> None:
    """Backfill fpmmTrades data for a missing date range.

    Re-runs the markets ETL, fetches trades between *from_date* and *to_date*
    (interpreted as UTC midnights), merges them into the existing fpmmTrades
    parquet, and recomputes profitability for the restored trades.

    :param from_date: range start in YYYY-MM-DD format (inclusive).
    :param to_date: range end in YYYY-MM-DD format.
    """
    # Convert the strings to timezone-aware timestamps (datetime64[ns, UTC]).
    min_date_utc = pd.to_datetime(from_date, format="%Y-%m-%d", utc=True)
    max_date_utc = pd.to_datetime(to_date, format="%Y-%m-%d", utc=True)

    logger.info("Running markets ETL")
    mkt_etl(MARKETS_FILENAME)
    logger.info("Markets ETL completed")

    fpmmTrades_etl(
        trades_filename="missing_fpmmTrades.parquet",
        from_timestamp=int(min_date_utc.timestamp()),
        to_timestamp=int(max_date_utc.timestamp()),
    )
    # merge with the old file
    logger.info("Merging with previous fpmmTrades file")
    update_fpmmTrades_parquet(trades_filename="missing_fpmmTrades.parquet")
    # adding tools information
    add_trades_profitability(trades_filename="missing_fpmmTrades.parquet")
if __name__ == "__main__":
    # Default entry point: run the incremental weekly pipeline.
    only_new_weekly_analysis()
    # Uncomment to backfill a missing date range instead:
    # restoring_trades_data("2024-12-28", "2025-01-07")