|
from string import Template
from typing import Any
from datetime import datetime, timedelta, UTC

import requests
import pandas as pd
import numpy as np

from utils import SUBGRAPH_API_KEY, measure_execution_time, DATA_DIR
from mech_request_utils import (
    collect_all_mech_delivers,
    collect_all_mech_requests,
    clean_mech_delivers,
    fix_duplicate_requestIds,
    merge_requests_delivers,
    get_ipfs_data,
    merge_json_files,
)
from web3_utils import updating_timestamps
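# High-level flow of this module: resolve block numbers from timestamps via a block
# subgraph, pull mech request/deliver events for that block range, and merge the
# incremental batches into the JSON/parquet artifacts stored under DATA_DIR.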
|
|
|
OLD_MECH_SUBGRAPH_URL = (
    "https://api.thegraph.com/subgraphs/name/stakewise/ethereum-gnosis"
)

NETWORK_SUBGRAPH_URL = Template(
    """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/FxV6YUix58SpYmLBwc9gEHkwjfkqwe1X5FJQjn8nKPyA"""
)

SUBGRAPH_HEADERS = {
    "Accept": "application/json, multipart/mixed",
    "Content-Type": "application/json",
}

QUERY_BATCH_SIZE = 1000
DATETIME_60_DAYS_AGO = datetime.now(UTC) - timedelta(days=60)
DATETIME_10_DAYS_AGO = datetime.now(UTC) - timedelta(days=10)
DATETIME_10_HOURS_AGO = datetime.now(UTC) - timedelta(hours=10)

BLOCK_NUMBER = Template(
    """
    {
      blocks(
        first: 1,
        orderBy: timestamp,
        orderDirection: asc,
        where: {
          timestamp_gte: "${timestamp_from}",
          timestamp_lte: "${timestamp_to}"
        }
      ){
        id,
        number,
      }
    }
    """
)

LATEST_BLOCK_QUERY = """
    {
      blocks(
        first: 1,
        orderBy: timestamp,
        orderDirection: desc,
      ){
        id,
        number,
      }
    }
    """
|
|
|
|
|
def fetch_last_block_number() -> dict:
    """Fetch the most recent block indexed by the network subgraph."""
    network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute(
        subgraph_api_key=SUBGRAPH_API_KEY
    )
    query = LATEST_BLOCK_QUERY
    response = requests.post(
        network_subgraph_url,
        headers=SUBGRAPH_HEADERS,
        json={"query": query},
        timeout=300,
    )

    result_json = response.json()
    print(f"Response of the query={result_json}")
    blocks = result_json.get("data", {}).get("blocks", [])
    if len(blocks) == 0:
        raise ValueError(f"The query {query} did not return any results")
    return blocks[0]
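# Note: the subgraph returns block fields as strings, so callers convert them,
# e.g. (sketch): latest_number = int(fetch_last_block_number()["number"])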
|
|
|
|
|
def fetch_block_number(timestamp_from: int, timestamp_to: int) -> dict:
    """Get a block number by its timestamp margins."""
    query = BLOCK_NUMBER.substitute(
        timestamp_from=timestamp_from, timestamp_to=timestamp_to
    )
    network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute(
        subgraph_api_key=SUBGRAPH_API_KEY
    )
    response = requests.post(
        network_subgraph_url,
        headers=SUBGRAPH_HEADERS,
        json={"query": query},
        timeout=300,
    )

    result_json = response.json()
    print(f"Response of the query={result_json}")
    blocks = result_json.get("data", {}).get("blocks", [])
    if len(blocks) == 0:
        raise ValueError(f"The query {query} did not return any results")
    return blocks[0]
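# Illustrative usage (hypothetical five-second window around 2024-01-01 UTC):
#   block = fetch_block_number(1704067200, 1704067205)
#   block_number = int(block["number"])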
|
|
|
|
|
def update_json_files():
    """Merge each "new_*" JSON batch into its cumulative counterpart."""
    merge_json_files("mech_requests.json", "new_mech_requests.json")
    merge_json_files("mech_delivers.json", "new_mech_delivers.json")
    merge_json_files("merged_requests.json", "new_merged_requests.json")
    merge_json_files("tools_info.json", "new_tools_info.json")
|
|
|
|
|
def update_fpmmTrades_parquet(trades_filename: str) -> None:
    """Merge a new batch of fpmm trades into fpmmTrades.parquet, dropping duplicates."""
    try:
        old_trades_df = pd.read_parquet(DATA_DIR / "fpmmTrades.parquet")
    except Exception as e:
        print(f"Error reading old trades parquet file {e}")
        return None

    try:
        new_trades_df = pd.read_parquet(DATA_DIR / trades_filename)
    except Exception as e:
        print(f"Error reading new trades parquet file {e}")
        return None

    merge_df = pd.concat([old_trades_df, new_trades_df], ignore_index=True)

    # Normalize the boolean columns before deduplicating.
    merge_df["fpmm.arbitrationOccurred"] = merge_df["fpmm.arbitrationOccurred"].astype(
        bool
    )
    merge_df["fpmm.isPendingArbitration"] = merge_df[
        "fpmm.isPendingArbitration"
    ].astype(bool)

    print(f"Initial length before removing duplicates in fpmmTrades= {len(merge_df)}")
    merge_df.drop_duplicates("id", inplace=True)
    print(f"Final length after removing duplicates in fpmmTrades= {len(merge_df)}")

    merge_df.to_parquet(DATA_DIR / "fpmmTrades.parquet", index=False)
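# Illustrative usage (hypothetical filename for a freshly downloaded batch):
#   update_fpmmTrades_parquet("new_fpmmTrades.parquet")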
|
|
|
|
|
def update_all_trades_parquet(new_trades_df: pd.DataFrame) -> pd.DataFrame:
    """Append new profitability trades to the historical ones and drop duplicate trade ids."""
    try:
        old_trades_df = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
    except Exception as e:
        print(f"Error reading old trades parquet file {e}")
        return None

    merge_df = pd.concat([old_trades_df, new_trades_df], ignore_index=True)

    print(f"Initial length before removing duplicates in all_trades= {len(merge_df)}")
    merge_df.drop_duplicates("trade_id", inplace=True)
    print(f"Final length after removing duplicates in all_trades = {len(merge_df)}")
    return merge_df
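# Note: unlike update_fpmmTrades_parquet, this returns the merged frame and leaves
# writing all_trades_profitability.parquet to the caller.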
|
|
|
|
|
def update_tools_parquet(rpc: str, new_tools_filename: str) -> None:
    """Merge a new batch of tool requests into tools.parquet, deduplicating on request id and time."""
    try:
        old_tools_df = pd.read_parquet(DATA_DIR / "tools.parquet")
    except Exception as e:
        print(f"Error reading old tools parquet file {e}")
        return None
    try:
        new_tools_df = pd.read_parquet(DATA_DIR / new_tools_filename)
        updating_timestamps(rpc, new_tools_filename)
    except Exception as e:
        print(f"Error reading new tools parquet file {e}")
        return None

    merge_df = pd.concat([old_tools_df, new_tools_df], ignore_index=True)

    print(f"Initial length before removing duplicates in tools= {len(merge_df)}")
    merge_df.drop_duplicates(
        subset=["request_id", "request_time"], keep="last", inplace=True
    )
    print(f"Final length after removing duplicates in tools= {len(merge_df)}")

    merge_df.to_parquet(DATA_DIR / "tools.parquet", index=False)
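# Illustrative usage (hypothetical RPC endpoint and batch filename):
#   update_tools_parquet("https://<rpc-endpoint>", "new_tools.parquet")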
|
|
|
|
|
def get_mech_info_2024() -> dict[str, Any]:
    """Query the subgraph to get the 2024 information from mech."""
    date = "2024-01-01"
    datetime_jan_2024 = datetime.strptime(date, "%Y-%m-%d")
    timestamp_jan_2024 = int(datetime_jan_2024.timestamp())
    margin = timedelta(seconds=5)
    timestamp_jan_2024_plus_margin = int((datetime_jan_2024 + margin).timestamp())

    jan_block_number = fetch_block_number(
        timestamp_jan_2024, timestamp_jan_2024_plus_margin
    )

    jan_block_number = jan_block_number.get("number", "")
    if jan_block_number.isdigit():
        jan_block_number = int(jan_block_number)

    if jan_block_number == "":
        raise ValueError(
            "Could not find a valid block number for the first of January 2024"
        )
    MECH_TO_INFO = {
        "0xff82123dfb52ab75c417195c5fdb87630145ae81": (
            "old_mech_abi.json",
            jan_block_number,
        ),
        "0x77af31de935740567cf4ff1986d04b2c964a786a": (
            "new_mech_abi.json",
            jan_block_number,
        ),
    }
    return MECH_TO_INFO
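# The mapping above is keyed by mech contract address; each value pairs an ABI
# filename with a starting block number, presumably used downstream as the
# from_block when collecting that mech's events.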
|
|
|
|
|
def get_last_block_number() -> int:
    """Get the latest block number indexed by the subgraph."""
    last_block_number = fetch_last_block_number()

    last_block_number = last_block_number.get("number", "")
    if last_block_number.isdigit():
        last_block_number = int(last_block_number)

    if last_block_number == "":
        raise ValueError("Could not find a valid block number for the latest block")
    return last_block_number
|
|
|
|
|
def get_last_60_days_block_number() -> int:
    """Get the block number closest to 60 days ago."""
    timestamp_60_days_ago = int(DATETIME_60_DAYS_AGO.timestamp())
    margin = timedelta(seconds=5)
    timestamp_60_days_ago_plus_margin = int((DATETIME_60_DAYS_AGO + margin).timestamp())

    last_month_block_number = fetch_block_number(
        timestamp_60_days_ago, timestamp_60_days_ago_plus_margin
    )

    last_month_block_number = last_month_block_number.get("number", "")
    if last_month_block_number.isdigit():
        last_month_block_number = int(last_month_block_number)

    if last_month_block_number == "":
        raise ValueError("Could not find a valid block number for 60 days ago")
    return last_month_block_number
|
|
|
|
|
def get_mech_info_last_60_days() -> dict[str, Any]:
    """Query the subgraph to get the last 60 days of information from mech."""
    last_month_block_number = get_last_60_days_block_number()

    MECH_TO_INFO = {
        "0xff82123dfb52ab75c417195c5fdb87630145ae81": (
            "old_mech_abi.json",
            last_month_block_number,
        ),
        "0x77af31de935740567cf4ff1986d04b2c964a786a": (
            "new_mech_abi.json",
            last_month_block_number,
        ),
    }
    print(f"last 60 days block number {last_month_block_number}")
    return MECH_TO_INFO
|
|
|
|
|
@measure_execution_time
def get_mech_events_since_last_run():
    """Function to download only the new events since the last execution."""

    # Determine the timestamp of the last collected trade.
    try:
        all_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
        latest_timestamp = max(all_trades.creation_timestamp)
        print(f"Updating data since {latest_timestamp}")
    except Exception:
        print("Error while reading the profitability parquet file")
        return None

    # Resolve that timestamp to a block number using a five-second window.
    five_seconds = np.timedelta64(5, "s")
    last_run_block_number = fetch_block_number(
        int(latest_timestamp.timestamp()),
        int((latest_timestamp + five_seconds).timestamp()),
    )

    last_run_block_number = last_run_block_number.get("number", "")
    if last_run_block_number.isdigit():
        last_run_block_number = int(last_run_block_number)

    if last_run_block_number == "":
        raise ValueError("Could not find a valid block number for last collected data")
    last_block_number = get_last_block_number()

    # Collect mech requests for the new block range.
    requests_dict, duplicatedReqId = collect_all_mech_requests(
        from_block=last_run_block_number,
        to_block=last_block_number,
        filename="new_mech_requests.json",
    )

    # Collect mech delivers for the same range.
    delivers_dict, duplicatedIds = collect_all_mech_delivers(
        from_block=last_run_block_number,
        to_block=last_block_number,
        filename="new_mech_delivers.json",
    )
    if delivers_dict is None:
        return None

    # Clean delivers, fix duplicated request ids, and merge requests with delivers.
    clean_mech_delivers("new_mech_requests.json", "new_mech_delivers.json")

    block_map = fix_duplicate_requestIds(
        "new_mech_requests.json", "new_mech_delivers.json"
    )

    not_found = merge_requests_delivers(
        "new_mech_requests.json", "new_mech_delivers.json", "new_merged_requests.json"
    )

    # Fetch the tool contents from IPFS for the merged requests.
    get_ipfs_data("new_merged_requests.json", "new_tools_info.json")
    return latest_timestamp
|
|
|
|
|
@measure_execution_time
def get_mech_events_last_60_days():
    """Download and merge all mech events from the last 60 days."""
    earliest_block_number = get_last_60_days_block_number()
    last_block_number = get_last_block_number()

    # Collect mech requests for the last 60 days.
    requests_dict, duplicatedReqId = collect_all_mech_requests(
        from_block=earliest_block_number,
        to_block=last_block_number,
        filename="mech_requests.json",
    )

    # Collect mech delivers for the same range.
    delivers_dict, duplicatedIds = collect_all_mech_delivers(
        from_block=earliest_block_number,
        to_block=last_block_number,
        filename="mech_delivers.json",
    )

    # Clean delivers, fix duplicated request ids, and merge requests with delivers.
    clean_mech_delivers("mech_requests.json", "mech_delivers.json")

    block_map = fix_duplicate_requestIds("mech_requests.json", "mech_delivers.json")

    not_found = merge_requests_delivers(
        "mech_requests.json", "mech_delivers.json", "merged_requests.json"
    )

    # Fetch the tool contents from IPFS for the merged requests.
    get_ipfs_data("merged_requests.json", "tools_info.json")
|
|
|
|
|
if __name__ == "__main__": |
|
get_mech_events_last_60_days() |
|
|
|
|
|
|
|
|