from string import Template
from typing import Any
from datetime import datetime, timedelta, UTC
from utils import (
    SUBGRAPH_API_KEY,
    measure_execution_time,
    DATA_DIR,
    TMP_DIR,
    NETWORK_SUBGRAPH_URL,
    transform_to_datetime,
)
import requests
import pandas as pd
import numpy as np
from mech_request_utils import (
    collect_all_mech_delivers,
    collect_all_mech_requests,
    clean_mech_delivers,
    fix_duplicate_requestIds,
    merge_requests_delivers,
    get_ipfs_data,
    merge_json_files,
)

SUBGRAPH_HEADERS = {
    "Accept": "application/json, multipart/mixed",
    "Content-Type": "application/json",
}

QUERY_BATCH_SIZE = 1000
DATETIME_60_DAYS_AGO = datetime.now(UTC) - timedelta(days=60)
DATETIME_10_DAYS_AGO = datetime.now(UTC) - timedelta(days=10)
DATETIME_10_HOURS_AGO = datetime.now(UTC) - timedelta(hours=10)

BLOCK_NUMBER = Template(
    """
    {
        blocks(
            first: 1,
            orderBy: timestamp,
            orderDirection: asc,
            where: {
                timestamp_gte: "${timestamp_from}",
                timestamp_lte: "${timestamp_to}"
            }
        ){
            id,
            number,
        }
    }
    """
)

LATEST_BLOCK_QUERY = """
    {
        blocks(
            first: 1,
            orderBy: timestamp,
            orderDirection: desc,
        ){
            id,
            number,
        }
    }
"""


def fetch_last_block_number() -> dict:
    # print(f"Sending query for the subgraph = {query}")
    network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute(
        subgraph_api_key=SUBGRAPH_API_KEY
    )
    query = LATEST_BLOCK_QUERY
    response = requests.post(
        network_subgraph_url,
        headers=SUBGRAPH_HEADERS,
        json={"query": query},
        timeout=300,
    )
    result_json = response.json()
    print(f"Response of the query={result_json}")
    blocks = result_json.get("data", {}).get("blocks", "")
    if len(blocks) == 0:
        raise ValueError(f"The query {query} did not return any results")
    return blocks[0]


def fetch_block_number(timestamp_from: int, timestamp_to: int) -> dict:
    """Get a block number by its timestamp margins."""
    query = BLOCK_NUMBER.substitute(
        timestamp_from=timestamp_from, timestamp_to=timestamp_to
    )
    # print(f"Sending query for the subgraph = {query}")
    network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute(
        subgraph_api_key=SUBGRAPH_API_KEY
    )
    response = requests.post(
        network_subgraph_url,
        headers=SUBGRAPH_HEADERS,
        json={"query": query},
        timeout=300,
    )
    # print(f"block query: {query}")
    result_json = response.json()
    print(f"Response of the query={result_json}")
    blocks = result_json.get("data", {}).get("blocks", "")
    if len(blocks) == 0:
        raise ValueError(f"The query {query} did not return any results")
    return blocks[0]

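
# Illustrative sketch (not part of the pipeline): how fetch_block_number is meant
# to be used to resolve the first block at or after a given date, with the same
# 5-second margin applied elsewhere in this module. The function name and the
# default date are hypothetical examples, not existing pipeline code.
def _example_block_for_date(date_str: str = "2024-06-01") -> int:
    dt = datetime.strptime(date_str, "%Y-%m-%d")
    timestamp_from = int(dt.timestamp())
    timestamp_to = int((dt + timedelta(seconds=5)).timestamp())
    block = fetch_block_number(timestamp_from, timestamp_to)
    # the subgraph returns the block number as a string
    return int(block.get("number", "0"))
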

def update_json_files():
    merge_json_files("mech_requests.json", "new_mech_requests.json")
    merge_json_files("mech_delivers.json", "new_mech_delivers.json")
    merge_json_files("merged_requests.json", "new_merged_requests.json")
    merge_json_files("tools_info.json", "new_tools_info.json")


def update_fpmmTrades_parquet(trades_filename: str) -> None:
    # Read old trades parquet file
    try:
        old_trades_df = pd.read_parquet(TMP_DIR / "fpmmTrades.parquet")
    except Exception as e:
        print(f"Error reading old trades parquet file {e}")
        return None

    try:
        new_trades_df = pd.read_parquet(DATA_DIR / trades_filename)
    except Exception as e:
        print(f"Error reading new trades parquet file {e}")
        return None

    # lowercase and strip trader_address
    new_trades_df["trader_address"] = (
        new_trades_df["trader_address"].str.lower().str.strip()
    )

    # ensure creationTimestamp compatibility
    try:
        new_trades_df["creationTimestamp"] = new_trades_df["creationTimestamp"].apply(
            lambda x: transform_to_datetime(x)
        )
    except Exception:
        print("Transformation not needed")

    try:
        old_trades_df["creationTimestamp"] = old_trades_df["creationTimestamp"].apply(
            lambda x: transform_to_datetime(x)
        )
    except Exception:
        print("Transformation not needed")

    # merge two dataframes
    merge_df = pd.concat([old_trades_df, new_trades_df], ignore_index=True)

    # avoid numpy objects
    merge_df["fpmm.arbitrationOccurred"] = merge_df["fpmm.arbitrationOccurred"].astype(
        bool
    )
    merge_df["fpmm.isPendingArbitration"] = merge_df[
        "fpmm.isPendingArbitration"
    ].astype(bool)

    # Check for duplicates
    print(f"Initial length before removing duplicates in fpmmTrades= {len(merge_df)}")
    # Remove duplicates
    # fpmm.outcomes is a numpy array
    merge_df.drop_duplicates("id", keep="last", inplace=True)
    print(f"Final length after removing duplicates in fpmmTrades= {len(merge_df)}")

    # save the parquet file
    merge_df.to_parquet(TMP_DIR / "fpmmTrades.parquet", index=False)
    return


def update_all_trades_parquet(new_trades_df: pd.DataFrame) -> pd.DataFrame:
    # Read old all_trades parquet file
    try:
        old_trades_df = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
    except Exception as e:
        print(f"Error reading old trades parquet file {e}")
        return None

    # merge two dataframes
    merge_df = pd.concat([old_trades_df, new_trades_df], ignore_index=True)

    # Check for duplicates
    print(f"Initial length before removing duplicates in all_trades= {len(merge_df)}")
    # Remove duplicates
    merge_df.drop_duplicates("trade_id", inplace=True)
    print(f"Final length after removing duplicates in all_trades = {len(merge_df)}")

    return merge_df


def update_tools_parquet(new_tools_filename: str) -> None:
    try:
        old_tools_df = pd.read_parquet(TMP_DIR / "tools.parquet")
    except Exception as e:
        print(f"Error reading old tools parquet file {e}")
        return None
    try:
        new_tools_df = pd.read_parquet(DATA_DIR / new_tools_filename)
    except Exception as e:
        print(f"Error reading new tools parquet file {e}")
        return None

    # merge two dataframes
    merge_df = pd.concat([old_tools_df, new_tools_df], ignore_index=True)

    # Check for duplicates
    print(f"Initial length before removing duplicates in tools= {len(merge_df)}")

    # Remove duplicates
    merge_df.drop_duplicates(
        subset=["request_id", "request_time"], keep="last", inplace=True
    )
    print(f"Final length after removing duplicates in tools= {len(merge_df)}")

    # save the parquet file
    merge_df.to_parquet(TMP_DIR / "tools.parquet", index=False)

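
# Illustrative sketch (not called by the pipeline): the shared "concatenate the
# old and new frames, then drop duplicates on a key" pattern that
# update_fpmmTrades_parquet, update_all_trades_parquet and update_tools_parquet
# all follow. The helper name and default key column are placeholders.
def _example_merge_frames(
    old_df: pd.DataFrame, new_df: pd.DataFrame, key: str = "id"
) -> pd.DataFrame:
    merged = pd.concat([old_df, new_df], ignore_index=True)
    # keep the most recent row for every duplicated key
    merged.drop_duplicates(subset=[key], keep="last", inplace=True)
    return merged
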

def get_mech_info_2024() -> dict[str, Any]:
    """Query the subgraph to get the 2024 information from mech."""
    date = "2024-01-01"
    datetime_jan_2024 = datetime.strptime(date, "%Y-%m-%d")
    timestamp_jan_2024 = int(datetime_jan_2024.timestamp())
    margin = timedelta(seconds=5)
    timestamp_jan_2024_plus_margin = int((datetime_jan_2024 + margin).timestamp())

    jan_block_number = fetch_block_number(
        timestamp_jan_2024, timestamp_jan_2024_plus_margin
    )
    # expecting only one block
    jan_block_number = jan_block_number.get("number", "")
    if jan_block_number.isdigit():
        jan_block_number = int(jan_block_number)
    if jan_block_number == "":
        raise ValueError(
            "Could not find a valid block number for the first of January 2024"
        )

    MECH_TO_INFO = {
        # this block number is when the creator had its first tx ever, and after this mech's creation
        "0xff82123dfb52ab75c417195c5fdb87630145ae81": (
            "old_mech_abi.json",
            jan_block_number,
        ),
        # this block number is when this mech was created
        "0x77af31de935740567cf4ff1986d04b2c964a786a": (
            "new_mech_abi.json",
            jan_block_number,
        ),
    }
    return MECH_TO_INFO


def get_last_block_number() -> int:
    last_block_number = fetch_last_block_number()
    # expecting only one block
    last_block_number = last_block_number.get("number", "")
    if last_block_number.isdigit():
        last_block_number = int(last_block_number)
    if last_block_number == "":
        raise ValueError("Could not find a valid block number for last month data")
    return last_block_number


def get_last_60_days_block_number() -> int:
    timestamp_60_days_ago = int(DATETIME_60_DAYS_AGO.timestamp())
    margin = timedelta(seconds=5)
    timestamp_60_days_ago_plus_margin = int((DATETIME_60_DAYS_AGO + margin).timestamp())

    last_month_block_number = fetch_block_number(
        timestamp_60_days_ago, timestamp_60_days_ago_plus_margin
    )
    # expecting only one block
    last_month_block_number = last_month_block_number.get("number", "")
    if last_month_block_number.isdigit():
        last_month_block_number = int(last_month_block_number)
    if last_month_block_number == "":
        raise ValueError("Could not find a valid block number for last month data")
    return last_month_block_number


def get_mech_info_last_60_days() -> dict[str, Any]:
    """Query the subgraph to get the last 60 days of information from mech."""
    last_month_block_number = get_last_60_days_block_number()

    MECH_TO_INFO = {
        # this block number is when the creator had its first tx ever, and after this mech's creation
        "0xff82123dfb52ab75c417195c5fdb87630145ae81": (
            "old_mech_abi.json",
            last_month_block_number,
        ),
        # this block number is when this mech was created
        "0x77af31de935740567cf4ff1986d04b2c964a786a": (
            "new_mech_abi.json",
            last_month_block_number,
        ),
    }
    print(f"last 60 days block number {last_month_block_number}")
    return MECH_TO_INFO


@measure_execution_time
def get_mech_events_since_last_run(logger):
    """Download only the new mech events since the last execution."""
    # Read the latest date from stored data
    try:
        all_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
        latest_timestamp = max(all_trades.creation_timestamp)
        # cutoff_date = "2024-12-22"
        # latest_timestamp = pd.Timestamp(
        #     datetime.strptime(cutoff_date, "%Y-%m-%d")
        # ).tz_localize("UTC")
        print(f"Updating data since {latest_timestamp}")
    except Exception:
        print("Error while reading the profitability parquet file")
        return None

    # Get the block number of the latest date
    five_seconds = np.timedelta64(5, "s")
    last_run_block_number = fetch_block_number(
        int(latest_timestamp.timestamp()),
        int((latest_timestamp + five_seconds).timestamp()),
    )
    # expecting only one block
    last_run_block_number = last_run_block_number.get("number", "")
    if last_run_block_number.isdigit():
        last_run_block_number = int(last_run_block_number)
    if last_run_block_number == "":
        raise ValueError("Could not find a valid block number for last collected data")
    last_block_number = get_last_block_number()

    # mech requests
    requests_dict, duplicatedReqId, nr_errors = collect_all_mech_requests(
        from_block=last_run_block_number,
        to_block=last_block_number,
        filename="new_mech_requests.json",
    )
    print(f"NUMBER OF MECH REQUEST ERRORS={nr_errors}")

    # mech delivers
    delivers_dict, duplicatedIds, nr_errors = collect_all_mech_delivers(
        from_block=last_run_block_number,
        to_block=last_block_number,
        filename="new_mech_delivers.json",
    )
    print(f"NUMBER OF MECH DELIVER ERRORS={nr_errors}")
    if delivers_dict is None:
        return None

    # clean delivers
    clean_mech_delivers("new_mech_requests.json", "new_mech_delivers.json")

    # solve duplicated requestIds
    block_map = fix_duplicate_requestIds(
        "new_mech_requests.json", "new_mech_delivers.json"
    )
    # merge the two files into one source
    not_found = merge_requests_delivers(
        "new_mech_requests.json", "new_mech_delivers.json", "new_merged_requests.json"
    )

    # Add ipfs contents
    get_ipfs_data("new_merged_requests.json", "new_tools_info.json", logger)
    return latest_timestamp

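
# Illustrative sketch (assumed flow, not wired into the module): one way the
# helpers above could be combined for an incremental update run. The function
# name and the tools parquet filename are hypothetical placeholders.
def _example_incremental_update(logger) -> None:
    latest_timestamp = get_mech_events_since_last_run(logger)
    if latest_timestamp is None:
        return
    # fold the freshly downloaded JSON files into the existing ones
    update_json_files()
    # merge a newly generated tools parquet file (hypothetical name) into tools.parquet
    update_tools_parquet("new_tools.parquet")
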

if __name__ == "__main__":
    import logging

    # get_mech_events_since_last_run requires a logger; use a minimal default one
    logging.basicConfig(level=logging.INFO)
    get_mech_events_since_last_run(logging.getLogger(__name__))
    # result = get_mech_info_last_60_days()
    # print(result)