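"""Scripts for collecting mech request and deliver events from the subgraphs and keeping the local JSON and parquet files up to date."""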
from string import Template
from typing import Any
from datetime import datetime, timedelta, UTC
from utils import SUBGRAPH_API_KEY, measure_execution_time, DATA_DIR
import requests
import pandas as pd
import numpy as np
from mech_request_utils import (
collect_all_mech_delivers,
collect_all_mech_requests,
clean_mech_delivers,
fix_duplicate_requestIds,
merge_requests_delivers,
get_ipfs_data,
merge_json_files,
)
from web3_utils import updating_timestamps
OLD_MECH_SUBGRAPH_URL = (
"https://api.thegraph.com/subgraphs/name/stakewise/ethereum-gnosis"
)
NETWORK_SUBGRAPH_URL = Template(
"""https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/FxV6YUix58SpYmLBwc9gEHkwjfkqwe1X5FJQjn8nKPyA"""
)
SUBGRAPH_HEADERS = {
"Accept": "application/json, multipart/mixed",
"Content-Type": "application/json",
}
QUERY_BATCH_SIZE = 1000
DATETIME_60_DAYS_AGO = datetime.now(UTC) - timedelta(days=60)
DATETIME_10_DAYS_AGO = datetime.now(UTC) - timedelta(days=10)
DATETIME_10_HOURS_AGO = datetime.now(UTC) - timedelta(hours=10)
BLOCK_NUMBER = Template(
"""
{
blocks(
first: 1,
orderBy: timestamp,
orderDirection: asc,
where: {
timestamp_gte: "${timestamp_from}",
timestamp_lte: "${timestamp_to}"
}
){
id,
number,
}
}
"""
)
LATEST_BLOCK_QUERY = """
{
blocks(
first: 1,
orderBy: timestamp,
orderDirection: desc,
){
id,
number,
}
}
"""
def fetch_last_block_number() -> dict:
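    """Fetch the most recently indexed block from the network subgraph."""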
# print(f"Sending query for the subgraph = {query}")
network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute(
subgraph_api_key=SUBGRAPH_API_KEY
)
query = LATEST_BLOCK_QUERY
response = requests.post(
network_subgraph_url,
headers=SUBGRAPH_HEADERS,
json={"query": query},
timeout=300,
)
result_json = response.json()
print(f"Response of the query={result_json}")
    blocks = result_json.get("data", {}).get("blocks", [])
if len(blocks) == 0:
raise ValueError(f"The query {query} did not return any results")
return blocks[0]
def fetch_block_number(timestamp_from: int, timestamp_to: int) -> dict:
"""Get a block number by its timestamp margins."""
query = BLOCK_NUMBER.substitute(
timestamp_from=timestamp_from, timestamp_to=timestamp_to
)
# print(f"Sending query for the subgraph = {query}")
network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute(
subgraph_api_key=SUBGRAPH_API_KEY
)
response = requests.post(
network_subgraph_url,
headers=SUBGRAPH_HEADERS,
json={"query": query},
timeout=300,
)
# print(f"block query: {query}")
result_json = response.json()
print(f"Response of the query={result_json}")
    blocks = result_json.get("data", {}).get("blocks", [])
if len(blocks) == 0:
raise ValueError(f"The query {query} did not return any results")
return blocks[0]
def update_json_files():
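    """Merge the newly downloaded JSON files into the existing ones."""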
merge_json_files("mech_requests.json", "new_mech_requests.json")
merge_json_files("mech_delivers.json", "new_mech_delivers.json")
merge_json_files("merged_requests.json", "new_merged_requests.json")
merge_json_files("tools_info.json", "new_tools_info.json")
def update_fpmmTrades_parquet(trades_filename: str) -> pd.DataFrame:
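    """Merge a newly downloaded trades file into fpmmTrades.parquet, dropping duplicated trade ids."""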
# Read old trades parquet file
try:
old_trades_df = pd.read_parquet(DATA_DIR / "fpmmTrades.parquet")
except Exception as e:
print(f"Error reading old trades parquet file {e}")
return None
try:
new_trades_df = pd.read_parquet(DATA_DIR / trades_filename)
except Exception as e:
print(f"Error reading new trades parquet file {e}")
return None
# merge two dataframes
merge_df = pd.concat([old_trades_df, new_trades_df], ignore_index=True)
# avoid numpy objects
merge_df["fpmm.arbitrationOccurred"] = merge_df["fpmm.arbitrationOccurred"].astype(
bool
)
merge_df["fpmm.isPendingArbitration"] = merge_df[
"fpmm.isPendingArbitration"
].astype(bool)
# Check for duplicates
print(f"Initial length before removing duplicates in fpmmTrades= {len(merge_df)}")
# Remove duplicates
# fpmm.outcomes is a numpy array
merge_df.drop_duplicates("id", inplace=True)
print(f"Final length after removing duplicates in fpmmTrades= {len(merge_df)}")
# save the parquet file
merge_df.to_parquet(DATA_DIR / "fpmmTrades.parquet", index=False)
    return merge_df
def update_all_trades_parquet(new_trades_df: pd.DataFrame) -> pd.DataFrame:
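    """Merge new trades into the stored all_trades_profitability data and return the deduplicated dataframe."""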
# Read old all_trades parquet file
try:
old_trades_df = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
except Exception as e:
print(f"Error reading old trades parquet file {e}")
return None
# merge two dataframes
merge_df = pd.concat([old_trades_df, new_trades_df], ignore_index=True)
# Check for duplicates
print(f"Initial length before removing duplicates in all_trades= {len(merge_df)}")
# Remove duplicates
merge_df.drop_duplicates("trade_id", inplace=True)
print(f"Final length after removing duplicates in all_trades = {len(merge_df)}")
return merge_df
def update_tools_parquet(rpc: str, new_tools_filename: str):
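    """Merge a newly downloaded tools file into tools.parquet, deduplicating on request_id and request_time."""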
try:
old_tools_df = pd.read_parquet(DATA_DIR / "tools.parquet")
except Exception as e:
print(f"Error reading old tools parquet file {e}")
return None
try:
new_tools_df = pd.read_parquet(DATA_DIR / new_tools_filename)
# the new file has no request_time yet
updating_timestamps(rpc, new_tools_filename)
except Exception as e:
print(f"Error reading new trades parquet file {e}")
return None
# merge two dataframes
merge_df = pd.concat([old_tools_df, new_tools_df], ignore_index=True)
# Check for duplicates
print(f"Initial length before removing duplicates in tools= {len(merge_df)}")
# Remove duplicates
merge_df.drop_duplicates(
subset=["request_id", "request_time"], keep="last", inplace=True
)
print(f"Final length after removing duplicates in tools= {len(merge_df)}")
# save the parquet file
merge_df.to_parquet(DATA_DIR / "tools.parquet", index=False)
def get_mech_info_2024() -> dict[str, Any]:
"""Query the subgraph to get the 2024 information from mech."""
date = "2024-01-01"
datetime_jan_2024 = datetime.strptime(date, "%Y-%m-%d")
timestamp_jan_2024 = int(datetime_jan_2024.timestamp())
margin = timedelta(seconds=5)
timestamp_jan_2024_plus_margin = int((datetime_jan_2024 + margin).timestamp())
jan_block_number = fetch_block_number(
timestamp_jan_2024, timestamp_jan_2024_plus_margin
)
# expecting only one block
jan_block_number = jan_block_number.get("number", "")
if jan_block_number.isdigit():
jan_block_number = int(jan_block_number)
if jan_block_number == "":
raise ValueError(
"Could not find a valid block number for the first of January 2024"
)
MECH_TO_INFO = {
# this block number is when the creator had its first tx ever, and after this mech's creation
"0xff82123dfb52ab75c417195c5fdb87630145ae81": (
"old_mech_abi.json",
jan_block_number,
),
# this block number is when this mech was created
"0x77af31de935740567cf4ff1986d04b2c964a786a": (
"new_mech_abi.json",
jan_block_number,
),
}
return MECH_TO_INFO
def get_last_block_number() -> int:
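    """Return the number of the most recently indexed block."""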
last_block_number = fetch_last_block_number()
# expecting only one block
last_block_number = last_block_number.get("number", "")
if last_block_number.isdigit():
last_block_number = int(last_block_number)
if last_block_number == "":
raise ValueError("Could not find a valid block number for last month data")
return last_block_number
def get_last_60_days_block_number() -> int:
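    """Return the block number closest to the timestamp of 60 days ago."""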
timestamp_60_days_ago = int((DATETIME_60_DAYS_AGO).timestamp())
margin = timedelta(seconds=5)
timestamp_60_days_ago_plus_margin = int((DATETIME_60_DAYS_AGO + margin).timestamp())
last_month_block_number = fetch_block_number(
timestamp_60_days_ago, timestamp_60_days_ago_plus_margin
)
# expecting only one block
last_month_block_number = last_month_block_number.get("number", "")
if last_month_block_number.isdigit():
last_month_block_number = int(last_month_block_number)
if last_month_block_number == "":
raise ValueError("Could not find a valid block number for last month data")
return last_month_block_number
def get_mech_info_last_60_days() -> dict[str, Any]:
"""Query the subgraph to get the last 60 days of information from mech."""
last_month_block_number = get_last_60_days_block_number()
MECH_TO_INFO = {
# this block number is when the creator had its first tx ever, and after this mech's creation
"0xff82123dfb52ab75c417195c5fdb87630145ae81": (
"old_mech_abi.json",
last_month_block_number,
),
# this block number is when this mech was created
"0x77af31de935740567cf4ff1986d04b2c964a786a": (
"new_mech_abi.json",
last_month_block_number,
),
}
print(f"last 60 days block number {last_month_block_number}")
return MECH_TO_INFO
@measure_execution_time
def get_mech_events_since_last_run():
"""Function to download only the new events since the last execution."""
# Read the latest date from stored data
try:
all_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
latest_timestamp = max(all_trades.creation_timestamp)
print(f"Updating data since {latest_timestamp}")
except Exception:
print("Error while reading the profitability parquet file")
return None
    # Get the block number of the latest date
five_seconds = np.timedelta64(5, "s")
last_run_block_number = fetch_block_number(
int(latest_timestamp.timestamp()),
int((latest_timestamp + five_seconds).timestamp()),
)
# expecting only one block
last_run_block_number = last_run_block_number.get("number", "")
if last_run_block_number.isdigit():
last_run_block_number = int(last_run_block_number)
if last_run_block_number == "":
raise ValueError("Could not find a valid block number for last collected data")
last_block_number = get_last_block_number()
# mech requests
requests_dict, duplicatedReqId = collect_all_mech_requests(
from_block=last_run_block_number,
to_block=last_block_number,
filename="new_mech_requests.json",
)
# mech delivers
delivers_dict, duplicatedIds = collect_all_mech_delivers(
from_block=last_run_block_number,
to_block=last_block_number,
filename="new_mech_delivers.json",
)
if delivers_dict is None:
return None
# clean delivers
clean_mech_delivers("new_mech_requests.json", "new_mech_delivers.json")
# solve duplicated requestIds
block_map = fix_duplicate_requestIds(
"new_mech_requests.json", "new_mech_delivers.json"
)
# merge the two files into one source
not_found = merge_requests_delivers(
"new_mech_requests.json", "new_mech_delivers.json", "new_merged_requests.json"
)
# Add ipfs contents
get_ipfs_data("new_merged_requests.json", "new_tools_info.json")
return latest_timestamp
@measure_execution_time
def get_mech_events_last_60_days():
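    """Download all mech requests and delivers from the last 60 days, clean and merge them, and fetch their IPFS contents."""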
earliest_block_number = get_last_60_days_block_number()
last_block_number = get_last_block_number()
# mech requests
requests_dict, duplicatedReqId = collect_all_mech_requests(
from_block=earliest_block_number,
to_block=last_block_number,
filename="mech_requests.json",
)
# mech delivers
delivers_dict, duplicatedIds = collect_all_mech_delivers(
from_block=earliest_block_number,
to_block=last_block_number,
filename="mech_delivers.json",
)
# clean delivers
clean_mech_delivers("mech_requests.json", "mech_delivers.json")
# solve duplicated requestIds
block_map = fix_duplicate_requestIds("mech_requests.json", "mech_delivers.json")
# merge the two files into one source
not_found = merge_requests_delivers(
"mech_requests.json", "mech_delivers.json", "merged_requests.json"
)
# Add ipfs contents
get_ipfs_data("merged_requests.json", "tools_info.json")
if __name__ == "__main__":
get_mech_events_last_60_days()
# result = get_mech_info_last_60_days()
# print(result)