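"""Helpers to fetch mech request/deliver events from the subgraphs and to merge
the freshly downloaded JSON and parquet files into the stored datasets."""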
import logging
from string import Template
from typing import Any
from datetime import datetime, timedelta, UTC
from utils import (
SUBGRAPH_API_KEY,
measure_execution_time,
DATA_DIR,
TMP_DIR,
NETWORK_SUBGRAPH_URL,
transform_to_datetime,
)
import requests
import pandas as pd
import numpy as np
from mech_request_utils import (
collect_all_mech_delivers,
collect_all_mech_requests,
clean_mech_delivers,
fix_duplicate_requestIds,
merge_requests_delivers,
get_ipfs_data,
merge_json_files,
)
SUBGRAPH_HEADERS = {
"Accept": "application/json, multipart/mixed",
"Content-Type": "application/json",
}
QUERY_BATCH_SIZE = 1000
DATETIME_60_DAYS_AGO = datetime.now(UTC) - timedelta(days=60)
DATETIME_10_DAYS_AGO = datetime.now(UTC) - timedelta(days=10)
DATETIME_10_HOURS_AGO = datetime.now(UTC) - timedelta(hours=10)
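# GraphQL query returning the first block whose timestamp falls within the given window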
BLOCK_NUMBER = Template(
"""
{
blocks(
first: 1,
orderBy: timestamp,
orderDirection: asc,
where: {
timestamp_gte: "${timestamp_from}",
timestamp_lte: "${timestamp_to}"
}
){
id,
number,
}
}
"""
)
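# GraphQL query returning the most recently indexed block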
LATEST_BLOCK_QUERY = """
{
blocks(
first: 1,
orderBy: timestamp,
orderDirection: desc,
){
id,
number,
}
}
"""
def fetch_last_block_number() -> dict:
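    """Fetch the latest block indexed by the network subgraph."""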
# print(f"Sending query for the subgraph = {query}")
network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute(
subgraph_api_key=SUBGRAPH_API_KEY
)
query = LATEST_BLOCK_QUERY
response = requests.post(
network_subgraph_url,
headers=SUBGRAPH_HEADERS,
json={"query": query},
timeout=300,
)
result_json = response.json()
print(f"Response of the query={result_json}")
    blocks = result_json.get("data", {}).get("blocks", [])
if len(blocks) == 0:
raise ValueError(f"The query {query} did not return any results")
return blocks[0]
def fetch_block_number(timestamp_from: int, timestamp_to: int) -> dict:
"""Get a block number by its timestamp margins."""
query = BLOCK_NUMBER.substitute(
timestamp_from=timestamp_from, timestamp_to=timestamp_to
)
# print(f"Sending query for the subgraph = {query}")
network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute(
subgraph_api_key=SUBGRAPH_API_KEY
)
response = requests.post(
network_subgraph_url,
headers=SUBGRAPH_HEADERS,
json={"query": query},
timeout=300,
)
# print(f"block query: {query}")
result_json = response.json()
print(f"Response of the query={result_json}")
    blocks = result_json.get("data", {}).get("blocks", [])
if len(blocks) == 0:
raise ValueError(f"The query {query} did not return any results")
return blocks[0]
def update_json_files():
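    """Merge the JSON files from the latest run into the stored aggregated files."""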
merge_json_files("mech_requests.json", "new_mech_requests.json")
merge_json_files("mech_delivers.json", "new_mech_delivers.json")
merge_json_files("merged_requests.json", "new_merged_requests.json")
merge_json_files("tools_info.json", "new_tools_info.json")
def update_fpmmTrades_parquet(trades_filename: str) -> pd.DataFrame:
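    """Merge the newly downloaded trades with the stored fpmmTrades parquet file, dropping duplicated ids."""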
# Read old trades parquet file
try:
old_trades_df = pd.read_parquet(TMP_DIR / "fpmmTrades.parquet")
except Exception as e:
print(f"Error reading old trades parquet file {e}")
return None
try:
new_trades_df = pd.read_parquet(DATA_DIR / trades_filename)
except Exception as e:
print(f"Error reading new trades parquet file {e}")
return None
    # lowercase and strip trader_address
new_trades_df["trader_address"] = (
new_trades_df["trader_address"].str.lower().str.strip()
)
# ensure creationTimestamp compatibility
try:
new_trades_df["creationTimestamp"] = new_trades_df["creationTimestamp"].apply(
lambda x: transform_to_datetime(x)
)
    except Exception:
        print("Transformation not needed")
try:
old_trades_df["creationTimestamp"] = old_trades_df["creationTimestamp"].apply(
lambda x: transform_to_datetime(x)
)
    except Exception:
        print("Transformation not needed")
# merge two dataframes
merge_df = pd.concat([old_trades_df, new_trades_df], ignore_index=True)
# avoid numpy objects
merge_df["fpmm.arbitrationOccurred"] = merge_df["fpmm.arbitrationOccurred"].astype(
bool
)
merge_df["fpmm.isPendingArbitration"] = merge_df[
"fpmm.isPendingArbitration"
].astype(bool)
# Check for duplicates
print(f"Initial length before removing duplicates in fpmmTrades= {len(merge_df)}")
# Remove duplicates
# fpmm.outcomes is a numpy array
merge_df.drop_duplicates("id", keep="last", inplace=True)
print(f"Final length after removing duplicates in fpmmTrades= {len(merge_df)}")
# save the parquet file
merge_df.to_parquet(TMP_DIR / "fpmmTrades.parquet", index=False)
    return merge_df
def update_all_trades_parquet(new_trades_df: pd.DataFrame) -> pd.DataFrame:
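    """Concatenate the new trades with the stored all_trades_profitability data, dropping duplicated trade_ids."""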
# Read old all_trades parquet file
try:
old_trades_df = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
except Exception as e:
print(f"Error reading old trades parquet file {e}")
return None
# merge two dataframes
merge_df = pd.concat([old_trades_df, new_trades_df], ignore_index=True)
# Check for duplicates
print(f"Initial length before removing duplicates in all_trades= {len(merge_df)}")
# Remove duplicates
merge_df.drop_duplicates("trade_id", inplace=True)
print(f"Final length after removing duplicates in all_trades = {len(merge_df)}")
return merge_df
def update_tools_parquet(new_tools_filename: str):
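    """Merge the newly downloaded tools file into the stored tools parquet, deduplicating on request_id and request_time."""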
try:
old_tools_df = pd.read_parquet(TMP_DIR / "tools.parquet")
except Exception as e:
print(f"Error reading old tools parquet file {e}")
return None
try:
new_tools_df = pd.read_parquet(DATA_DIR / new_tools_filename)
except Exception as e:
print(f"Error reading new trades parquet file {e}")
return None
# merge two dataframes
merge_df = pd.concat([old_tools_df, new_tools_df], ignore_index=True)
# Check for duplicates
print(f"Initial length before removing duplicates in tools= {len(merge_df)}")
# Remove duplicates
merge_df.drop_duplicates(
subset=["request_id", "request_time"], keep="last", inplace=True
)
print(f"Final length after removing duplicates in tools= {len(merge_df)}")
# save the parquet file
merge_df.to_parquet(TMP_DIR / "tools.parquet", index=False)
def get_mech_info_2024() -> dict[str, Any]:
"""Query the subgraph to get the 2024 information from mech."""
date = "2024-01-01"
datetime_jan_2024 = datetime.strptime(date, "%Y-%m-%d")
timestamp_jan_2024 = int(datetime_jan_2024.timestamp())
margin = timedelta(seconds=5)
timestamp_jan_2024_plus_margin = int((datetime_jan_2024 + margin).timestamp())
jan_block_number = fetch_block_number(
timestamp_jan_2024, timestamp_jan_2024_plus_margin
)
# expecting only one block
jan_block_number = jan_block_number.get("number", "")
if jan_block_number.isdigit():
jan_block_number = int(jan_block_number)
if jan_block_number == "":
raise ValueError(
"Could not find a valid block number for the first of January 2024"
)
MECH_TO_INFO = {
        # this block number is when the creator had its first tx ever, which is after this mech's creation
"0xff82123dfb52ab75c417195c5fdb87630145ae81": (
"old_mech_abi.json",
jan_block_number,
),
# this block number is when this mech was created
"0x77af31de935740567cf4ff1986d04b2c964a786a": (
"new_mech_abi.json",
jan_block_number,
),
}
return MECH_TO_INFO
def get_last_block_number() -> int:
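    """Return the number of the latest block indexed by the subgraph."""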
last_block_number = fetch_last_block_number()
# expecting only one block
last_block_number = last_block_number.get("number", "")
if last_block_number.isdigit():
last_block_number = int(last_block_number)
if last_block_number == "":
raise ValueError("Could not find a valid block number for last month data")
return last_block_number
def get_last_60_days_block_number() -> int:
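    """Return the block number closest to the timestamp of 60 days ago."""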
    timestamp_60_days_ago = int(DATETIME_60_DAYS_AGO.timestamp())
margin = timedelta(seconds=5)
timestamp_60_days_ago_plus_margin = int((DATETIME_60_DAYS_AGO + margin).timestamp())
last_month_block_number = fetch_block_number(
timestamp_60_days_ago, timestamp_60_days_ago_plus_margin
)
# expecting only one block
last_month_block_number = last_month_block_number.get("number", "")
if last_month_block_number.isdigit():
last_month_block_number = int(last_month_block_number)
if last_month_block_number == "":
raise ValueError("Could not find a valid block number for last month data")
return last_month_block_number
def get_mech_info_last_60_days() -> dict[str, Any]:
"""Query the subgraph to get the last 60 days of information from mech."""
last_month_block_number = get_last_60_days_block_number()
MECH_TO_INFO = {
        # this block number is when the creator had its first tx ever, which is after this mech's creation
"0xff82123dfb52ab75c417195c5fdb87630145ae81": (
"old_mech_abi.json",
last_month_block_number,
),
# this block number is when this mech was created
"0x77af31de935740567cf4ff1986d04b2c964a786a": (
"new_mech_abi.json",
last_month_block_number,
),
}
print(f"last 60 days block number {last_month_block_number}")
return MECH_TO_INFO
@measure_execution_time
def get_mech_events_since_last_run(logger):
"""Function to download only the new events since the last execution."""
# Read the latest date from stored data
try:
all_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
latest_timestamp = max(all_trades.creation_timestamp)
# cutoff_date = "2024-12-22"
# latest_timestamp = pd.Timestamp(
# datetime.strptime(cutoff_date, "%Y-%m-%d")
# ).tz_localize("UTC")
print(f"Updating data since {latest_timestamp}")
except Exception:
print("Error while reading the profitability parquet file")
return None
    # Get the block number of the latest date
five_seconds = np.timedelta64(5, "s")
last_run_block_number = fetch_block_number(
int(latest_timestamp.timestamp()),
int((latest_timestamp + five_seconds).timestamp()),
)
# expecting only one block
last_run_block_number = last_run_block_number.get("number", "")
if last_run_block_number.isdigit():
last_run_block_number = int(last_run_block_number)
if last_run_block_number == "":
raise ValueError("Could not find a valid block number for last collected data")
last_block_number = get_last_block_number()
# mech requests
requests_dict, duplicatedReqId, nr_errors = collect_all_mech_requests(
from_block=last_run_block_number,
to_block=last_block_number,
filename="new_mech_requests.json",
)
print(f"NUMBER OF MECH REQUEST ERRORS={nr_errors}")
# mech delivers
delivers_dict, duplicatedIds, nr_errors = collect_all_mech_delivers(
from_block=last_run_block_number,
to_block=last_block_number,
filename="new_mech_delivers.json",
)
print(f"NUMBER OF MECH DELIVER ERRORS={nr_errors}")
if delivers_dict is None:
return None
# clean delivers
clean_mech_delivers("new_mech_requests.json", "new_mech_delivers.json")
# solve duplicated requestIds
block_map = fix_duplicate_requestIds(
"new_mech_requests.json", "new_mech_delivers.json"
)
# merge the two files into one source
not_found = merge_requests_delivers(
"new_mech_requests.json", "new_mech_delivers.json", "new_merged_requests.json"
)
# Add ipfs contents
get_ipfs_data("new_merged_requests.json", "new_tools_info.json", logger)
return latest_timestamp
if __name__ == "__main__":
    logger = logging.getLogger(__name__)
    get_mech_events_since_last_run(logger)
# result = get_mech_info_last_60_days()
# print(result)