# Commit 285f2a6: updating week format starting on Monday, new staking contracts
# and new weekly data.
import gc
import pickle
import sys
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial
from string import Template
from typing import Any, Optional

import pandas as pd
import requests
from tqdm import tqdm
from web3 import Web3
from web3.types import BlockParams

from queries import conditional_tokens_gc_user_query, omen_xdai_trades_query
from utils import (
    DATA_DIR,
    HIST_DIR,
    JSON_DATA_DIR,
    SUBGRAPH_API_KEY,
    SUBGRAPH_URL,
    TMP_DIR,
    to_content,
)

REDUCE_FACTOR = 0.25
SLEEP = 0.5
QUERY_BATCH_SIZE = 1000
FPMM_QS_CREATOR = "0x89c5cc945dd550bcffb72fe42bff002429f46fec"
FPMM_PEARL_CREATOR = "0xFfc8029154ECD55ABED15BD428bA596E7D23f557"
LATEST_BLOCK: Optional[int] = None
LATEST_BLOCK_NAME: BlockParams = "latest"
BLOCK_DATA_NUMBER = "number"
BLOCKS_CHUNK_SIZE = 10_000
N_IPFS_RETRIES = 4
N_RPC_RETRIES = 100
RPC_POLL_INTERVAL = 0.05
SUBGRAPH_POLL_INTERVAL = 0.05
IPFS_POLL_INTERVAL = 0.2  # 5 calls per second
OMEN_SUBGRAPH_URL = Template(
    """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"""
)

headers = {
    "Accept": "application/json, multipart/mixed",
    "Content-Type": "application/json",
}


def parse_args() -> str:
    """Parse the arguments and return the RPC."""
    if len(sys.argv) != 2:
        raise ValueError("Expected the RPC as a positional argument.")
    return sys.argv[1]


def read_abi(abi_path: str) -> str:
    """Read and return the wxDAI contract's ABI."""
    with open(abi_path) as abi_file:
        return abi_file.read()


def update_block_request_map(block_request_id_map: dict) -> None:
    """Persist the block-to-request-id map as a pickle under JSON_DATA_DIR."""
    print("Saving block request id map info")
    with open(JSON_DATA_DIR / "block_request_id_map.pickle", "wb") as handle:
        pickle.dump(block_request_id_map, handle, protocol=pickle.HIGHEST_PROTOCOL)


def reduce_window(contract_instance, event, from_block, batch_size, latest_block):
    """Dynamically reduce the batch size window."""
    keep_fraction = 1 - REDUCE_FACTOR
    events_filter = contract_instance.events[event].build_filter()
    events_filter.fromBlock = from_block
    batch_size = int(batch_size * keep_fraction)
    events_filter.toBlock = min(from_block + batch_size, latest_block)
    tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.")
    time.sleep(SLEEP)
    return events_filter, batch_size
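

# --- Illustrative example (not part of the original script) ---
# A minimal sketch of how `reduce_window` could drive an event-fetching loop that
# shrinks its block window whenever the RPC times out. `fetch_entries` is a
# hypothetical callable that evaluates the prepared filter and returns its log
# entries, and catching `TimeoutError` is an assumption about how the RPC client
# reports timeouts; both are placeholders rather than APIs from this repository.
def _example_fetch_events(
    contract_instance, event: str, start_block: int, latest_block: int, fetch_entries
) -> list:
    """Illustrative only: paginate over blocks, shrinking the window on timeouts."""
    batch_size = BLOCKS_CHUNK_SIZE
    from_block = start_block
    entries: list = []
    while from_block <= latest_block:
        to_block = min(from_block + batch_size, latest_block)
        events_filter = contract_instance.events[event].build_filter()
        events_filter.fromBlock = from_block
        events_filter.toBlock = to_block
        try:
            entries.extend(fetch_entries(events_filter))
        except TimeoutError:
            # Shrink the window and retry from the same starting block.
            events_filter, batch_size = reduce_window(
                contract_instance, event, from_block, batch_size, latest_block
            )
            continue
        from_block = to_block + 1
        time.sleep(RPC_POLL_INTERVAL)
    return entries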


def block_number_to_timestamp(block_number: int, web3: Web3) -> str:
    """Convert a block number to a formatted UTC timestamp string."""
    block = web3.eth.get_block(block_number)
    timestamp = datetime.utcfromtimestamp(block["timestamp"])
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")


def parallelize_timestamp_conversion(df: pd.DataFrame, function: callable) -> list:
    """Parallelize the timestamp conversion."""
    block_numbers = df["request_block"].tolist()
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(
            tqdm(executor.map(function, block_numbers), total=len(block_numbers))
        )
    return results


def updating_timestamps(rpc: str, tools_filename: str) -> None:
    """Fill missing request timestamps and refresh the block-to-timestamp map."""
    web3 = Web3(Web3.HTTPProvider(rpc))
    tools = pd.read_parquet(TMP_DIR / tools_filename)

    # Convert block number to timestamp
    print("Converting block number to timestamp")
    with open(TMP_DIR / "t_map.pkl", "rb") as f:
        t_map = pickle.load(f)
    tools["request_time"] = tools["request_block"].map(t_map)
    no_data = tools["request_time"].isna().sum()
    print(f"Total rows with no request time info = {no_data}")

    # Identify tools with missing request_time and fill them
    missing_time_indices = tools[tools["request_time"].isna()].index
    if not missing_time_indices.empty:
        partial_block_number_to_timestamp = partial(
            block_number_to_timestamp, web3=web3
        )
        missing_timestamps = parallelize_timestamp_conversion(
            tools.loc[missing_time_indices], partial_block_number_to_timestamp
        )
        # Update the original DataFrame with the missing timestamps
        for i, timestamp in zip(missing_time_indices, missing_timestamps):
            tools.at[i, "request_time"] = timestamp

    tools["request_month_year"] = pd.to_datetime(tools["request_time"]).dt.strftime(
        "%Y-%m"
    )
    # Weekly buckets labelled by the Monday that starts each week: pandas "W"
    # periods are Sunday-anchored, so .dt.start_time falls on the Monday.
    tools["request_month_year_week"] = (
        pd.to_datetime(tools["request_time"])
        .dt.to_period("W")
        .dt.start_time.dt.strftime("%b-%d-%Y")
    )

    # Save the tools data after the updates on the content
    print(f"Updating file {tools_filename} with timestamps")
    tools.to_parquet(TMP_DIR / tools_filename, index=False)

    # Update t_map with the new timestamps
    new_timestamps = (
        tools[["request_block", "request_time"]]
        .dropna()
        .set_index("request_block")
        .to_dict()["request_time"]
    )
    t_map.update(new_timestamps)

    # Keep only entries with timestamps before the cutoff date
    cutoff_date = datetime(2024, 9, 9)
    filtered_map = {
        k: v
        for k, v in t_map.items()
        if datetime.strptime(v, "%Y-%m-%d %H:%M:%S") < cutoff_date
    }
    with open(DATA_DIR / "t_map.pkl", "wb") as f:
        pickle.dump(filtered_map, f)

    # clean and release all memory
    del tools
    del t_map
    gc.collect()


def query_conditional_tokens_gc_subgraph(creator: str) -> dict[str, Any]:
    """Query the conditional tokens subgraph for the user's positions."""
    subgraph = SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    all_results: dict[str, Any] = {"data": {"user": {"userPositions": []}}}
    userPositions_id_gt = ""
    while True:
        query = conditional_tokens_gc_user_query.substitute(
            id=creator.lower(),
            first=QUERY_BATCH_SIZE,
            userPositions_id_gt=userPositions_id_gt,
        )
        content_json = {"query": query}
        # print("sending query to subgraph")
        res = requests.post(subgraph, headers=headers, json=content_json)
        result_json = res.json()
        # print(f"result = {result_json}")
        user_data = result_json.get("data", {}).get("user", {})
        if not user_data:
            break
        user_positions = user_data.get("userPositions", [])
        if user_positions:
            all_results["data"]["user"]["userPositions"].extend(user_positions)
            userPositions_id_gt = user_positions[-1]["id"]
        else:
            break
    if len(all_results["data"]["user"]["userPositions"]) == 0:
        return {"data": {"user": None}}
    return all_results
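

# Illustrative usage (an assumption, not from the original script): page through all
# userPositions recorded for a given address; `some_address` is a placeholder.
#
#     positions_json = query_conditional_tokens_gc_subgraph(some_address)
#     user = positions_json["data"]["user"]
#     n_positions = len(user["userPositions"]) if user else 0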


def query_omen_xdai_subgraph(
    trader_category: str,
    from_timestamp: float,
    to_timestamp: float,
    fpmm_from_timestamp: float,
    fpmm_to_timestamp: float,
) -> dict[str, Any]:
    """Query the Omen xDAI subgraph for trades within the given time window."""
    omen_subgraph = OMEN_SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    print(f"omen_subgraph = {omen_subgraph}")
    grouped_results = defaultdict(list)
    id_gt = ""
    if trader_category == "quickstart":
        creator_id = FPMM_QS_CREATOR.lower()
    else:  # pearl
        creator_id = FPMM_PEARL_CREATOR.lower()
    while True:
        query = omen_xdai_trades_query.substitute(
            fpmm_creator=creator_id,
            creationTimestamp_gte=int(from_timestamp),
            creationTimestamp_lte=int(to_timestamp),
            fpmm_creationTimestamp_gte=int(fpmm_from_timestamp),
            fpmm_creationTimestamp_lte=int(fpmm_to_timestamp),
            first=QUERY_BATCH_SIZE,
            id_gt=id_gt,
        )
        print(f"omen query={query}")
        content_json = to_content(query)
        res = requests.post(omen_subgraph, headers=headers, json=content_json)
        result_json = res.json()
        # print(f"result = {result_json}")
        user_trades = result_json.get("data", {}).get("fpmmTrades", [])
        if not user_trades:
            break
        for trade in user_trades:
            fpmm_id = trade.get("fpmm", {}).get("id")
            grouped_results[fpmm_id].append(trade)
        id_gt = user_trades[-1]["id"]

    all_results = {
        "data": {
            "fpmmTrades": [
                trade
                for trades_list in grouped_results.values()
                for trade in trades_list
            ]
        }
    }
    return all_results
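

# Illustrative usage (an assumption, not from the original script): fetch one week of
# quickstart trades; the UNIX timestamps below are placeholders for the desired
# Monday-to-Monday window.
#
#     week_trades = query_omen_xdai_subgraph(
#         trader_category="quickstart",
#         from_timestamp=1725840000,  # 2024-09-09 00:00:00 UTC (a Monday)
#         to_timestamp=1726444800,  # 2024-09-16 00:00:00 UTC
#         fpmm_from_timestamp=1725840000,
#         fpmm_to_timestamp=1726444800,
#     )
#     print(len(week_trades["data"]["fpmmTrades"]))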


# def get_earliest_block(event_name: MechEventName) -> int:
#     """Get the earliest block number to use when filtering for events."""
#     filename = gen_event_filename(event_name)
#     if not os.path.exists(DATA_DIR / filename):
#         return 0
#     df = pd.read_parquet(DATA_DIR / filename)
#     block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}"
#     earliest_block = int(df[block_field].max())
#     # clean and release all memory
#     del df
#     gc.collect()
#     return earliest_block
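

# Hedged entry-point sketch (not part of the original module): one way the pieces
# above could be wired together, assuming the RPC URL is the single CLI argument
# and that a weekly tools file named "tools.parquet" exists under TMP_DIR; the
# guard and the filename are illustrative assumptions, not repository conventions.
#
# if __name__ == "__main__":
#     rpc = parse_args()
#     updating_timestamps(rpc, "tools.parquet")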