import logging
from datetime import datetime

import pandas as pd

from markets import etl as mkt_etl, DEFAULT_FILENAME as MARKETS_FILENAME, fpmmTrades_etl
from tools import DEFAULT_FILENAME as TOOLS_FILENAME, generate_tools_file
from profitability import run_profitability_analysis, add_trades_profitability
from utils import (
    get_question,
    current_answer,
    RPC,
    measure_execution_time,
    DATA_DIR,
    HIST_DIR,
    TMP_DIR,
)
from get_mech_info import (
    get_mech_events_since_last_run,
    update_fpmmTrades_parquet,
    update_json_files,
)
from update_tools_accuracy import compute_tools_accuracy
from cleaning_old_info import clean_old_data_from_parquet_files
from web3_utils import updating_timestamps
from manage_space_files import move_files
from cloud_storage import upload_historical_file
from tools_metrics import compute_tools_based_datasets

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Cutoff used when pruning old rows from the parquet files on each weekly run.
# NOTE(review): previously a magic string buried inside only_new_weekly_analysis.
OLD_DATA_CUTOFF_DATE = "2024-11-19"


def add_current_answer(tools_filename: str) -> None:
    """Enrich the tools parquet with the market question and its current answer.

    Reads the markets parquet (FPMMS) and the given tools parquet from
    DATA_DIR, derives each tool row's question title from its
    ``prompt_request``, looks up the market's ``currentAnswer``, normalizes
    the answer capitalization, and writes the tools parquet back in place.

    :param tools_filename: name of the tools parquet file inside DATA_DIR.
    """
    # Get currentAnswer from FPMMS
    fpmms = pd.read_parquet(DATA_DIR / MARKETS_FILENAME)
    tools = pd.read_parquet(DATA_DIR / tools_filename)

    # Get the question from the tools
    logger.info("Getting the question and current answer for the tools")
    tools["title"] = tools["prompt_request"].apply(get_question)
    tools["currentAnswer"] = tools["title"].apply(
        lambda title: current_answer(title, fpmms)
    )
    # Normalize answer capitalization ("yes"/"no" -> "Yes"/"No").
    tools["currentAnswer"] = tools["currentAnswer"].str.replace("yes", "Yes")
    tools["currentAnswer"] = tools["currentAnswer"].str.replace("no", "No")

    # Save the tools data after the updates on the content
    tools.to_parquet(DATA_DIR / tools_filename, index=False)
    # Release the markets frame eagerly; it can be large.
    del fpmms


def save_historical_data() -> None:
    """Save a timestamped copy of the main tools and trades files.

    Copies ``tools.parquet`` (from TMP_DIR) and
    ``all_trades_profitability.parquet`` (from DATA_DIR) into HIST_DIR with a
    timestamp suffix and uploads each copy to cloud storage. Each copy is
    best-effort: a failure is logged and does not abort the run.
    """
    logger.info("Saving historical data copies")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    try:
        tools = pd.read_parquet(TMP_DIR / "tools.parquet")
        filename = f"tools_{timestamp}.parquet"
        tools.to_parquet(HIST_DIR / filename, index=False)
        # save into cloud storage
        upload_historical_file(filename)
    except Exception:
        logger.exception("Error saving tools file in the historical folder")

    try:
        all_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
        filename = f"all_trades_profitability_{timestamp}.parquet"
        all_trades.to_parquet(HIST_DIR / filename, index=False)
        # save into cloud storage
        upload_historical_file(filename)
    except Exception:
        logger.exception(
            "Error saving all_trades_profitability file in the historical folder"
        )


@measure_execution_time
def only_new_weekly_analysis() -> None:
    """Run the weekly analysis for the FPMMS project on new data only.

    Pipeline: markets ETL -> incremental mech-events ETL -> incremental
    fpmmTrades ETL (merged with the previous file) -> tools ETL ->
    profitability analysis -> historical snapshots -> pruning of old rows ->
    accuracy/tools metrics -> moving the generated files to the tmp folder.

    Returns early (with a logged error) if the mech-events step fails to
    report the timestamp of the last processed event.
    """
    # Run markets ETL
    logger.info("Running markets ETL")
    mkt_etl(MARKETS_FILENAME)
    logger.info("Markets ETL completed")

    # Mech events ETL: fetch only events newer than the previous run.
    logger.info("Generating the mech json files")
    latest_timestamp = get_mech_events_since_last_run(logger)
    if latest_timestamp is None:
        logger.error("Error while getting the mech events")
        return
    logger.info("Finished generating the mech json files from %s", latest_timestamp)

    # FpmmTrades ETL, restricted to trades after the last processed event.
    fpmmTrades_etl(
        trades_filename="new_fpmmTrades.parquet",
        from_timestamp=int(latest_timestamp.timestamp()),
    )
    # merge with previous file
    logger.info("Merging with previous fpmmTrades file")
    update_fpmmTrades_parquet(trades_filename="new_fpmmTrades.parquet")

    # Run tools ETL on the newly fetched data only.
    logger.info("Generate and parse the tools content")
    generate_tools_file("new_tools_info.json", "new_tools.parquet")
    logger.info("Tools ETL completed")
    add_current_answer("new_tools.parquet")

    # Run profitability analysis; merge=True folds results into prior data.
    logger.info("Running profitability analysis")
    run_profitability_analysis(
        tools_filename="new_tools.parquet",
        trades_filename="new_fpmmTrades.parquet",
        merge=True,
    )
    logger.info("Profitability analysis completed")

    # merge new json files with old json files
    update_json_files()

    save_historical_data()

    # Pruning is best-effort: a failure must not abort the weekly run.
    try:
        clean_old_data_from_parquet_files(OLD_DATA_CUTOFF_DATE)
    except Exception:
        logger.exception("Error cleaning the oldest information from parquet files")

    compute_tools_accuracy()
    compute_tools_based_datasets()

    # move to tmp folder the new generated files
    move_files()
    logger.info("Weekly analysis files generated and saved")


def restoring_trades_data(from_date: str, to_date: str) -> None:
    """Backfill fpmmTrades data for the given date window.

    Re-runs the markets ETL, fetches the trades between *from_date* and
    *to_date* (inclusive bounds, interpreted as UTC midnights), merges them
    into the existing fpmmTrades parquet, and recomputes their profitability.

    :param from_date: window start, ``YYYY-MM-DD``.
    :param to_date: window end, ``YYYY-MM-DD``.
    """
    # Convert the strings to timezone-aware datetime64[ns, UTC] bounds.
    min_date_utc = pd.to_datetime(from_date, format="%Y-%m-%d", utc=True)
    max_date_utc = pd.to_datetime(to_date, format="%Y-%m-%d", utc=True)

    logger.info("Running markets ETL")
    mkt_etl(MARKETS_FILENAME)
    logger.info("Markets ETL completed")

    fpmmTrades_etl(
        trades_filename="missing_fpmmTrades.parquet",
        from_timestamp=int(min_date_utc.timestamp()),
        to_timestamp=int(max_date_utc.timestamp()),
    )
    # merge with the old file
    logger.info("Merging with previous fpmmTrades file")
    update_fpmmTrades_parquet(trades_filename="missing_fpmmTrades.parquet")

    # adding tools information
    add_trades_profitability(trades_filename="missing_fpmmTrades.parquet")


if __name__ == "__main__":
    only_new_weekly_analysis()
    # restoring_trades_data("2024-12-28", "2025-01-07")