from string import Template
from typing import Any
from datetime import datetime, timedelta, UTC

import requests
import pandas as pd
import numpy as np

from utils import (
    SUBGRAPH_API_KEY,
    measure_execution_time,
    DATA_DIR,
    TMP_DIR,
    NETWORK_SUBGRAPH_URL,
    transform_to_datetime,
)
from mech_request_utils import (
    collect_all_mech_delivers,
    collect_all_mech_requests,
    clean_mech_delivers,
    fix_duplicate_requestIds,
    merge_requests_delivers,
    get_ipfs_data,
    merge_json_files,
)

SUBGRAPH_HEADERS = {
    "Accept": "application/json, multipart/mixed",
    "Content-Type": "application/json",
}

QUERY_BATCH_SIZE = 1000
DATETIME_60_DAYS_AGO = datetime.now(UTC) - timedelta(days=60)
DATETIME_10_DAYS_AGO = datetime.now(UTC) - timedelta(days=10)
DATETIME_10_HOURS_AGO = datetime.now(UTC) - timedelta(hours=10)

BLOCK_NUMBER = Template(
    """
    {
        blocks(
            first: 1,
            orderBy: timestamp,
            orderDirection: asc,
            where: {
                timestamp_gte: "${timestamp_from}",
                timestamp_lte: "${timestamp_to}"
            }
        ){
            id,
            number,
        }
    }
    """
)
LATEST_BLOCK_QUERY = """ | |
{ | |
blocks( | |
first: 1, | |
orderBy: timestamp, | |
orderDirection: desc, | |
){ | |
id, | |
number, | |
} | |
} | |
""" | |


def fetch_last_block_number() -> dict:
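    """Fetch the latest block indexed by the network subgraph."""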
# print(f"Sending query for the subgraph = {query}") | |
network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute( | |
subgraph_api_key=SUBGRAPH_API_KEY | |
) | |
query = LATEST_BLOCK_QUERY | |
response = requests.post( | |
network_subgraph_url, | |
headers=SUBGRAPH_HEADERS, | |
json={"query": query}, | |
timeout=300, | |
) | |
result_json = response.json() | |
print(f"Response of the query={result_json}") | |
blocks = result_json.get("data", {}).get("blocks", "") | |
if len(blocks) == 0: | |
raise ValueError(f"The query {query} did not return any results") | |
return blocks[0] | |


def fetch_block_number(timestamp_from: int, timestamp_to: int) -> dict:
    """Get a block number by its timestamp margins."""
    query = BLOCK_NUMBER.substitute(
        timestamp_from=timestamp_from, timestamp_to=timestamp_to
    )
    # print(f"Sending query for the subgraph = {query}")
    network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute(
        subgraph_api_key=SUBGRAPH_API_KEY
    )
    response = requests.post(
        network_subgraph_url,
        headers=SUBGRAPH_HEADERS,
        json={"query": query},
        timeout=300,
    )
    # print(f"block query: {query}")
    result_json = response.json()
    print(f"Response of the query={result_json}")
    blocks = result_json.get("data", {}).get("blocks", [])
    if len(blocks) == 0:
        raise ValueError(f"The query {query} did not return any results")
    return blocks[0]


def update_json_files():
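    """Merge the newly downloaded JSON files into the stored ones."""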
merge_json_files("mech_requests.json", "new_mech_requests.json") | |
merge_json_files("mech_delivers.json", "new_mech_delivers.json") | |
merge_json_files("merged_requests.json", "new_merged_requests.json") | |
merge_json_files("tools_info.json", "new_tools_info.json") | |


def update_fpmmTrades_parquet(trades_filename: str) -> pd.DataFrame:
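    """Merge the new trades file into the stored fpmmTrades parquet and deduplicate."""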
    # Read old trades parquet file
    try:
        old_trades_df = pd.read_parquet(TMP_DIR / "fpmmTrades.parquet")
    except Exception as e:
        print(f"Error reading old trades parquet file {e}")
        return None

    try:
        new_trades_df = pd.read_parquet(DATA_DIR / trades_filename)
    except Exception as e:
        print(f"Error reading new trades parquet file {e}")
        return None

    # lowercase and strip trader_address
    new_trades_df["trader_address"] = (
        new_trades_df["trader_address"].str.lower().str.strip()
    )

    # ensure creationTimestamp compatibility
    try:
        new_trades_df["creationTimestamp"] = new_trades_df["creationTimestamp"].apply(
            transform_to_datetime
        )
    except Exception:
        print("Transformation not needed")
    try:
        old_trades_df["creationTimestamp"] = old_trades_df["creationTimestamp"].apply(
            transform_to_datetime
        )
    except Exception:
        print("Transformation not needed")

    # merge the two dataframes
    merge_df = pd.concat([old_trades_df, new_trades_df], ignore_index=True)
    # avoid numpy objects
    merge_df["fpmm.arbitrationOccurred"] = merge_df["fpmm.arbitrationOccurred"].astype(
        bool
    )
    merge_df["fpmm.isPendingArbitration"] = merge_df[
        "fpmm.isPendingArbitration"
    ].astype(bool)

    # Remove duplicates (fpmm.outcomes is a numpy array, so deduplicate on the id column)
    print(f"Initial length before removing duplicates in fpmmTrades= {len(merge_df)}")
    merge_df.drop_duplicates("id", keep="last", inplace=True)
    print(f"Final length after removing duplicates in fpmmTrades= {len(merge_df)}")

    # save the parquet file
    merge_df.to_parquet(TMP_DIR / "fpmmTrades.parquet", index=False)
    return merge_df


def update_all_trades_parquet(new_trades_df: pd.DataFrame) -> pd.DataFrame:
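    """Concatenate new trades with all_trades_profitability and deduplicate by trade_id."""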
    # Read old all_trades parquet file
    try:
        old_trades_df = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
    except Exception as e:
        print(f"Error reading old trades parquet file {e}")
        return None

    # merge the two dataframes
    merge_df = pd.concat([old_trades_df, new_trades_df], ignore_index=True)

    # Remove duplicates
    print(f"Initial length before removing duplicates in all_trades= {len(merge_df)}")
    merge_df.drop_duplicates("trade_id", inplace=True)
    print(f"Final length after removing duplicates in all_trades = {len(merge_df)}")
    return merge_df


def update_tools_parquet(new_tools_filename: str) -> None:
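    """Merge the new tools file into the stored tools parquet and deduplicate."""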
    try:
        old_tools_df = pd.read_parquet(TMP_DIR / "tools.parquet")
    except Exception as e:
        print(f"Error reading old tools parquet file {e}")
        return None

    try:
        new_tools_df = pd.read_parquet(DATA_DIR / new_tools_filename)
    except Exception as e:
        print(f"Error reading new tools parquet file {e}")
        return None

    # merge the two dataframes
    merge_df = pd.concat([old_tools_df, new_tools_df], ignore_index=True)

    # Remove duplicates
    print(f"Initial length before removing duplicates in tools= {len(merge_df)}")
    merge_df.drop_duplicates(
        subset=["request_id", "request_time"], keep="last", inplace=True
    )
    print(f"Final length after removing duplicates in tools= {len(merge_df)}")

    # save the parquet file
    merge_df.to_parquet(TMP_DIR / "tools.parquet", index=False)


def get_mech_info_2024() -> dict[str, Any]:
    """Query the subgraph to get the 2024 information from mech."""
    date = "2024-01-01"
    datetime_jan_2024 = datetime.strptime(date, "%Y-%m-%d")
    timestamp_jan_2024 = int(datetime_jan_2024.timestamp())
    margin = timedelta(seconds=5)
    timestamp_jan_2024_plus_margin = int((datetime_jan_2024 + margin).timestamp())

    jan_block_number = fetch_block_number(
        timestamp_jan_2024, timestamp_jan_2024_plus_margin
    )
    # expecting only one block
    jan_block_number = jan_block_number.get("number", "")
    if jan_block_number.isdigit():
        jan_block_number = int(jan_block_number)
    if jan_block_number == "":
        raise ValueError(
            "Could not find a valid block number for the first of January 2024"
        )

    MECH_TO_INFO = {
        # this block number is when the creator had its first tx ever, and after this mech's creation
        "0xff82123dfb52ab75c417195c5fdb87630145ae81": (
            "old_mech_abi.json",
            jan_block_number,
        ),
        # this block number is when this mech was created
        "0x77af31de935740567cf4ff1986d04b2c964a786a": (
            "new_mech_abi.json",
            jan_block_number,
        ),
    }
    return MECH_TO_INFO


def get_last_block_number() -> int:
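    """Get the number of the latest block from the subgraph."""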
    last_block_number = fetch_last_block_number()
    # expecting only one block
    last_block_number = last_block_number.get("number", "")
    if last_block_number.isdigit():
        last_block_number = int(last_block_number)
    if last_block_number == "":
        raise ValueError("Could not find a valid block number for the latest block")
    return last_block_number


def get_last_60_days_block_number() -> int:
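    """Get the block number closest to 60 days ago."""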
    timestamp_60_days_ago = int((DATETIME_60_DAYS_AGO).timestamp())
    margin = timedelta(seconds=5)
    timestamp_60_days_ago_plus_margin = int((DATETIME_60_DAYS_AGO + margin).timestamp())

    last_month_block_number = fetch_block_number(
        timestamp_60_days_ago, timestamp_60_days_ago_plus_margin
    )
    # expecting only one block
    last_month_block_number = last_month_block_number.get("number", "")
    if last_month_block_number.isdigit():
        last_month_block_number = int(last_month_block_number)
    if last_month_block_number == "":
        raise ValueError("Could not find a valid block number for the last 60 days of data")
    return last_month_block_number


def get_mech_info_last_60_days() -> dict[str, Any]:
    """Query the subgraph to get the last 60 days of information from mech."""
    last_month_block_number = get_last_60_days_block_number()

    MECH_TO_INFO = {
        # this block number is when the creator had its first tx ever, and after this mech's creation
        "0xff82123dfb52ab75c417195c5fdb87630145ae81": (
            "old_mech_abi.json",
            last_month_block_number,
        ),
        # this block number is when this mech was created
        "0x77af31de935740567cf4ff1986d04b2c964a786a": (
            "new_mech_abi.json",
            last_month_block_number,
        ),
    }
    print(f"last 60 days block number {last_month_block_number}")
    return MECH_TO_INFO


def get_mech_events_since_last_run(logger):
    """Download only the new mech events since the last execution."""
    # Read the latest date from stored data
    try:
        all_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
        latest_timestamp = max(all_trades.creation_timestamp)
        # cutoff_date = "2024-12-22"
        # latest_timestamp = pd.Timestamp(
        #     datetime.strptime(cutoff_date, "%Y-%m-%d")
        # ).tz_localize("UTC")
        print(f"Updating data since {latest_timestamp}")
    except Exception:
        print("Error while reading the profitability parquet file")
        return None

    # Get the block number of the latest date
    five_seconds = np.timedelta64(5, "s")
    last_run_block_number = fetch_block_number(
        int(latest_timestamp.timestamp()),
        int((latest_timestamp + five_seconds).timestamp()),
    )
    # expecting only one block
    last_run_block_number = last_run_block_number.get("number", "")
    if last_run_block_number.isdigit():
        last_run_block_number = int(last_run_block_number)
    if last_run_block_number == "":
        raise ValueError("Could not find a valid block number for last collected data")
    last_block_number = get_last_block_number()

    # mech requests
    requests_dict, duplicatedReqId, nr_errors = collect_all_mech_requests(
        from_block=last_run_block_number,
        to_block=last_block_number,
        filename="new_mech_requests.json",
    )
    print(f"NUMBER OF MECH REQUEST ERRORS={nr_errors}")

    # mech delivers
    delivers_dict, duplicatedIds, nr_errors = collect_all_mech_delivers(
        from_block=last_run_block_number,
        to_block=last_block_number,
        filename="new_mech_delivers.json",
    )
    print(f"NUMBER OF MECH DELIVER ERRORS={nr_errors}")
    if delivers_dict is None:
        return None

    # clean delivers
    clean_mech_delivers("new_mech_requests.json", "new_mech_delivers.json")

    # solve duplicated requestIds
    block_map = fix_duplicate_requestIds(
        "new_mech_requests.json", "new_mech_delivers.json"
    )

    # merge the two files into one source
    not_found = merge_requests_delivers(
        "new_mech_requests.json", "new_mech_delivers.json", "new_merged_requests.json"
    )

    # Add ipfs contents
    get_ipfs_data("new_merged_requests.json", "new_tools_info.json", logger)
    return latest_timestamp


if __name__ == "__main__":
    import logging

    # get_mech_events_since_last_run requires a logger; use a basic stdlib logger here
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    get_mech_events_since_last_run(logger)
    # result = get_mech_info_last_60_days()
    # print(result)