import sys |
import pickle |
import gc |
import time |
import requests |
from functools import partial |
from string import Template |
from datetime import datetime |
from concurrent.futures import ThreadPoolExecutor |
from collections import defaultdict |
from tqdm import tqdm |
from web3 import Web3 |
from typing import Any, Optional |
from web3.types import BlockParams |
from utils import JSON_DATA_DIR, DATA_DIR, SUBGRAPH_API_KEY, to_content |
from queries import conditional_tokens_gc_user_query, omen_xdai_trades_query |
import pandas as pd |
# Addresses passed as ``fpmm_creator`` when querying Omen trades; which one is
# used depends on the trader category ("quickstart" or anything else).
FPMM_QS_CREATOR = "0x89c5cc945dd550bcffb72fe42bff002429f46fec"
FPMM_PEARL_CREATOR = "0xFfc8029154ECD55ABED15BD428bA596E7D23f557"

# Block-related defaults and keys.
LATEST_BLOCK: Optional[int] = None
LATEST_BLOCK_NAME: BlockParams = "latest"
BLOCK_DATA_NUMBER = "number"

# Seconds to pause after shrinking the batch window on an RPC timeout.
SLEEP = 0.5

# HTTP headers sent with every subgraph POST request in this module.
headers = {
    "Accept": "application/json, multipart/mixed",
    "Content-Type": "application/json",
}
def parse_args() -> str:
    """Return the RPC given as the script's single positional argument.

    Raises:
        ValueError: if the command line does not consist of the program name
            followed by exactly one argument.
    """
    positional = sys.argv[1:]
    if len(sys.argv) == 2:
        return positional[0]
    raise ValueError("Expected the RPC as a positional argument.")
def read_abi(abi_path: str) -> str:
    """Return the raw text of the ABI file (the wxDAI contract's ABI) at ``abi_path``."""
    with open(abi_path) as handle:
        contents = handle.read()
    return contents
def update_block_request_map(block_request_id_map: dict) -> None:
    """Pickle ``block_request_id_map`` to ``block_request_id_map.pickle`` in JSON_DATA_DIR.

    The file is overwritten on every call and written with the highest pickle
    protocol available.
    """
    print("Saving block request id map info")
    destination = JSON_DATA_DIR / "block_request_id_map.pickle"
    with open(destination, "wb") as out_file:
        pickle.dump(
            block_request_id_map,
            out_file,
            protocol=pickle.HIGHEST_PROTOCOL,
        )
def reduce_window(contract_instance, event, from_block, batch_size, latest_block):
    """Shrink the batch window and build a fresh event filter for it.

    The batch size is multiplied by ``1 - REDUCE_FACTOR`` and truncated to an
    int. A new filter for ``event`` is built on ``contract_instance`` covering
    ``from_block`` up to ``from_block + batch_size``, capped at
    ``latest_block``. The resize is reported through ``tqdm.write`` and the
    call then sleeps for ``SLEEP`` seconds.

    Returns:
        A ``(events_filter, batch_size)`` tuple with the new filter and the
        reduced batch size.
    """
    # NOTE(review): REDUCE_FACTOR is a module-level name that is not defined in
    # the visible part of this file - confirm where it comes from.
    shrink_ratio = 1 - REDUCE_FACTOR

    resized_filter = contract_instance.events[event].build_filter()
    resized_filter.fromBlock = from_block

    batch_size = int(batch_size * shrink_ratio)
    window_end = from_block + batch_size
    resized_filter.toBlock = min(window_end, latest_block)

    tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.")
    time.sleep(SLEEP)
    return resized_filter, batch_size
def block_number_to_timestamp(block_number: int, web3: Web3) -> str:
    """Return the UTC time of a block formatted as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        block_number: number of the block to look up.
        web3: client whose ``eth.get_block`` is used to fetch the block.

    Returns:
        The block's ``timestamp`` field rendered in UTC.
    """
    # Imported locally because the module only pulls ``datetime`` itself from
    # the ``datetime`` package.
    from datetime import timezone

    block = web3.eth.get_block(block_number)
    # A single timezone-aware conversion; the result is formatted directly, so
    # no format/parse round trip (and no broad ``except``) is required.
    block_time = datetime.fromtimestamp(block["timestamp"], tz=timezone.utc)
    return block_time.strftime("%Y-%m-%d %H:%M:%S")
def parallelize_timestamp_conversion(df: pd.DataFrame, function: callable) -> list:
    """Apply ``function`` to each ``request_block`` value of ``df`` in a thread pool.

    Up to 10 worker threads are used and progress is displayed with ``tqdm``.

    Returns:
        The list of results, one per ``request_block`` entry, in the order
        yielded by ``executor.map``.
    """
    blocks = df["request_block"].tolist()
    with ThreadPoolExecutor(max_workers=10) as pool:
        progress = tqdm(pool.map(function, blocks), total=len(blocks))
        converted = list(progress)
    return converted
def updating_timestamps(rpc: str, tools_filename: str) -> None:
    """Fill in request timestamps for a tools parquet file and refresh the cache.

    The parquet file ``DATA_DIR / tools_filename`` is loaded and its
    ``request_block`` column is mapped to ``request_time`` through the pickled
    block-to-timestamp cache ``DATA_DIR / "t_map.pkl"``. Blocks missing from
    the cache are resolved over ``rpc`` in parallel. The ``request_month_year``
    and ``request_month_year_week`` columns are then derived, the parquet file
    is overwritten, and the cache is written back.

    Args:
        rpc: HTTP RPC endpoint used to look up blocks missing from the cache.
        tools_filename: name of the parquet file inside ``DATA_DIR``.
    """
    web3 = Web3(Web3.HTTPProvider(rpc))
    tools = pd.read_parquet(DATA_DIR / tools_filename)

    print("Converting block number to timestamp")
    # NOTE: unpickling can execute arbitrary code; t_map.pkl must be trusted.
    # The context manager guarantees the handle is closed after loading.
    with open(DATA_DIR / "t_map.pkl", "rb") as cache_file:
        t_map = pickle.load(cache_file)
    tools["request_time"] = tools["request_block"].map(t_map)

    no_data = tools["request_time"].isna().sum()
    print(f"Total rows with no request time info = {no_data}")

    # Resolve only the rows whose block was not found in the cache.
    missing_time_indices = tools[tools["request_time"].isna()].index
    if not missing_time_indices.empty:
        partial_block_number_to_timestamp = partial(
            block_number_to_timestamp, web3=web3
        )
        missing_timestamps = parallelize_timestamp_conversion(
            tools.loc[missing_time_indices], partial_block_number_to_timestamp
        )
        for i, timestamp in zip(missing_time_indices, missing_timestamps):
            tools.at[i, "request_time"] = timestamp

    # Parse the column once and derive both period columns from it.
    request_times = pd.to_datetime(tools["request_time"])
    tools["request_month_year"] = request_times.dt.strftime("%Y-%m")
    tools["request_month_year_week"] = request_times.dt.to_period("W").astype(str)

    print(f"Updating file {tools_filename} with timestamps")
    tools.to_parquet(DATA_DIR / tools_filename, index=False)

    new_timestamps = (
        tools[["request_block", "request_time"]]
        .dropna()
        .set_index("request_block")
        .to_dict()["request_time"]
    )
    t_map.update(new_timestamps)

    # Only entries dated before the cutoff are written back to the cache.
    # NOTE(review): the reason for this fixed date is not visible here - confirm.
    cutoff_date = datetime(2024, 9, 9)
    filtered_map = {
        k: v
        for k, v in t_map.items()
        if datetime.strptime(v, "%Y-%m-%d %H:%M:%S") < cutoff_date
    }
    with open(DATA_DIR / "t_map.pkl", "wb") as f:
        pickle.dump(filtered_map, f)

    # Drop the large objects explicitly and force a collection.
    del tools
    del t_map
    gc.collect()
def query_conditional_tokens_gc_subgraph(creator: str) -> dict[str, Any]:
    """Fetch all user positions of ``creator`` from the conditional tokens subgraph.

    Results are requested page by page; the ``id`` of the last position of a
    page is passed as ``userPositions_id_gt`` for the next request. Paging
    stops at the first response without user data or positions.

    Args:
        creator: user address; it is lower-cased before being queried.

    Returns:
        ``{"data": {"user": {"userPositions": [...]}}}`` with every collected
        position, or ``{"data": {"user": None}}`` when none were found.
    """
    SUBGRAPH_URL = Template(
        """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/7s9rGBffUTL8kDZuxvvpuc46v44iuDarbrADBFw5uVp2"""
    )
    subgraph = SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    all_results: dict[str, Any] = {"data": {"user": {"userPositions": []}}}
    collected = all_results["data"]["user"]["userPositions"]
    userPositions_id_gt = ""
    while True:
        query = conditional_tokens_gc_user_query.substitute(
            id=creator.lower(),
            userPositions_id_gt=userPositions_id_gt,
        )
        content_json = {"query": query}
        print("sending query to subgraph")
        res = requests.post(subgraph, headers=headers, json=content_json)
        result_json = res.json()

        # A response may carry "data": null or "user": null; ``or {}`` keeps
        # the chained lookups from failing on None.
        user_data = (result_json.get("data") or {}).get("user") or {}
        user_positions = user_data.get("userPositions") or []
        if not user_positions:
            break

        collected.extend(user_positions)
        # Cursor for the next page: id of the last position received.
        userPositions_id_gt = user_positions[-1]["id"]

    if not collected:
        return {"data": {"user": None}}
    return all_results
def query_omen_xdai_subgraph(
    trader_category: str,
    from_timestamp: float,
    to_timestamp: float,
    fpmm_from_timestamp: float,
    fpmm_to_timestamp: float,
) -> dict[str, Any]:
    """Fetch Omen trades from the subgraph for one market creator and time window.

    Results are requested page by page; the ``id`` of the last trade of a page
    is passed as ``id_gt`` for the next request, until an empty page arrives.

    Args:
        trader_category: ``"quickstart"`` selects ``FPMM_QS_CREATOR``; any
            other value selects ``FPMM_PEARL_CREATOR``.
        from_timestamp: lower bound for the trade creation timestamp.
        to_timestamp: upper bound for the trade creation timestamp.
        fpmm_from_timestamp: lower bound for the market creation timestamp.
        fpmm_to_timestamp: upper bound for the market creation timestamp.
            All four bounds are truncated to ``int`` before being queried.

    Returns:
        ``{"data": {"fpmmTrades": [...]}}`` where trades of the same market
        are adjacent, markets appearing in the order they were first seen.
    """
    OMEN_SUBGRAPH_URL = Template(
        """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"""
    )
    omen_subgraph = OMEN_SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    print(f"omen_subgraph = {omen_subgraph}")
    grouped_results = defaultdict(list)
    id_gt = ""
    if trader_category == "quickstart":
        creator_id = FPMM_QS_CREATOR.lower()
    else:
        creator_id = FPMM_PEARL_CREATOR.lower()

    while True:
        query = omen_xdai_trades_query.substitute(
            fpmm_creator=creator_id,
            creationTimestamp_gte=int(from_timestamp),
            creationTimestamp_lte=int(to_timestamp),
            fpmm_creationTimestamp_gte=int(fpmm_from_timestamp),
            fpmm_creationTimestamp_lte=int(fpmm_to_timestamp),
            id_gt=id_gt,
        )
        content_json = to_content(query)
        res = requests.post(omen_subgraph, headers=headers, json=content_json)
        result_json = res.json()

        # A response may carry "data": null; ``or`` keeps the lookups safe.
        user_trades = (result_json.get("data") or {}).get("fpmmTrades") or []
        if not user_trades:
            break

        # Group trades by the id of the market they belong to.
        for trade in user_trades:
            fpmm_id = (trade.get("fpmm") or {}).get("id")
            grouped_results[fpmm_id].append(trade)

        # Cursor for the next page: id of the last trade received.
        id_gt = user_trades[-1]["id"]

    all_results = {
        "data": {
            "fpmmTrades": [
                trade
                for trades_list in grouped_results.values()
                for trade in trades_list
            ]
        }
    }
    return all_results