import sys
import pickle
import gc
import time
import requests
from functools import partial
from string import Template
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
from tqdm import tqdm
from web3 import Web3
from typing import Any, Optional
from web3.types import BlockParams
from utils import JSON_DATA_DIR, DATA_DIR, SUBGRAPH_API_KEY, to_content
from queries import conditional_tokens_gc_user_query, omen_xdai_trades_query
import pandas as pd

REDUCE_FACTOR = 0.25
SLEEP = 0.5
QUERY_BATCH_SIZE = 1000
FPMM_QS_CREATOR = "0x89c5cc945dd550bcffb72fe42bff002429f46fec"
FPMM_PEARL_CREATOR = "0xFfc8029154ECD55ABED15BD428bA596E7D23f557"
LATEST_BLOCK: Optional[int] = None
LATEST_BLOCK_NAME: BlockParams = "latest"
BLOCK_DATA_NUMBER = "number"
BLOCKS_CHUNK_SIZE = 10_000
N_IPFS_RETRIES = 1
N_RPC_RETRIES = 100
RPC_POLL_INTERVAL = 0.05
IPFS_POLL_INTERVAL = 0.2

headers = {
    "Accept": "application/json, multipart/mixed",
    "Content-Type": "application/json",
}


def parse_args() -> str:
    """Parse the arguments and return the RPC."""
    if len(sys.argv) != 2:
        raise ValueError("Expected the RPC as a positional argument.")
    return sys.argv[1]


def read_abi(abi_path: str) -> str:
    """Read and return the wxDAI contract's ABI."""
    with open(abi_path) as abi_file:
        return abi_file.read()


def update_block_request_map(block_request_id_map: dict) -> None:
    """Persist the block-to-request-id map as a pickle in the JSON data directory."""
    print("Saving block request id map info")
    with open(JSON_DATA_DIR / "block_request_id_map.pickle", "wb") as handle:
        pickle.dump(block_request_id_map, handle, protocol=pickle.HIGHEST_PROTOCOL)


def reduce_window(contract_instance, event, from_block, batch_size, latest_block):
    """Dynamically reduce the batch size window."""
    keep_fraction = 1 - REDUCE_FACTOR
    events_filter = contract_instance.events[event].build_filter()
    events_filter.fromBlock = from_block
    batch_size = int(batch_size * keep_fraction)
    events_filter.toBlock = min(from_block + batch_size, latest_block)
    tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.")
    time.sleep(SLEEP)
    return events_filter, batch_size
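
# NOTE: Minimal usage sketch for `reduce_window` (illustrative only; the real
# event-fetching loop lives elsewhere in this repo, and the surrounding variable
# names below are assumptions):
#
#     batch_size = BLOCKS_CHUNK_SIZE
#     while from_block < latest_block:
#         events_filter = contract_instance.events[event].build_filter()
#         events_filter.fromBlock = from_block
#         events_filter.toBlock = min(from_block + batch_size, latest_block)
#         try:
#             entries = events_filter.deploy(w3).get_all_entries()
#         except Exception:  # e.g. an RPC timeout
#             events_filter, batch_size = reduce_window(
#                 contract_instance, event, from_block, batch_size, latest_block
#             )
#             continue
#         from_block = events_filter.toBlock + 1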


def block_number_to_timestamp(block_number: int, web3: Web3) -> str:
    """Convert a block number to a timestamp."""
    block = web3.eth.get_block(block_number)
    timestamp = datetime.utcfromtimestamp(block["timestamp"])
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")


def parallelize_timestamp_conversion(df: pd.DataFrame, function: callable) -> list:
    """Parallelize the timestamp conversion."""
    block_numbers = df["request_block"].tolist()
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(
            tqdm(executor.map(function, block_numbers), total=len(block_numbers))
        )
    return results
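
# NOTE: `updating_timestamps` below relies on a pickled cache at DATA_DIR / "t_map.pkl".
# Based on how it is read and written here, it is assumed to map request block numbers to
# "%Y-%m-%d %H:%M:%S" timestamp strings, e.g. {32500000: "2024-01-15 10:23:45"} (the
# concrete values are illustrative).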


def updating_timestamps(rpc: str, tools_filename: str):
    """Fill in missing request timestamps and refresh the cached block-to-timestamp map."""
    web3 = Web3(Web3.HTTPProvider(rpc))

    tools = pd.read_parquet(DATA_DIR / tools_filename)

    # Map the request blocks to timestamps using the cached t_map first.
    print("Converting block number to timestamp")
    with open(DATA_DIR / "t_map.pkl", "rb") as f:
        t_map = pickle.load(f)
    tools["request_time"] = tools["request_block"].map(t_map)

    no_data = tools["request_time"].isna().sum()
    print(f"Total rows with no request time info = {no_data}")

    # Resolve the remaining blocks through the RPC, in parallel.
    missing_time_indices = tools[tools["request_time"].isna()].index
    if not missing_time_indices.empty:
        partial_block_number_to_timestamp = partial(
            block_number_to_timestamp, web3=web3
        )
        missing_timestamps = parallelize_timestamp_conversion(
            tools.loc[missing_time_indices], partial_block_number_to_timestamp
        )

        for i, timestamp in zip(missing_time_indices, missing_timestamps):
            tools.at[i, "request_time"] = timestamp

    tools["request_month_year"] = pd.to_datetime(tools["request_time"]).dt.strftime(
        "%Y-%m"
    )
    tools["request_month_year_week"] = (
        pd.to_datetime(tools["request_time"]).dt.to_period("W").astype(str)
    )

    print(f"Updating file {tools_filename} with timestamps")
    tools.to_parquet(DATA_DIR / tools_filename, index=False)

    # Update the cache with the newly resolved timestamps and drop entries past the cutoff.
    new_timestamps = (
        tools[["request_block", "request_time"]]
        .dropna()
        .set_index("request_block")
        .to_dict()["request_time"]
    )
    t_map.update(new_timestamps)

    cutoff_date = datetime(2024, 9, 9)
    filtered_map = {
        k: v
        for k, v in t_map.items()
        if datetime.strptime(v, "%Y-%m-%d %H:%M:%S") < cutoff_date
    }

    with open(DATA_DIR / "t_map.pkl", "wb") as f:
        pickle.dump(filtered_map, f)

    del tools
    del t_map
    gc.collect()


def query_conditional_tokens_gc_subgraph(creator: str) -> dict[str, Any]:
    """Query the conditional tokens subgraph for all of the creator's user positions."""
    SUBGRAPH_URL = Template(
        """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/7s9rGBffUTL8kDZuxvvpuc46v44iuDarbrADBFw5uVp2"""
    )
    subgraph = SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    all_results: dict[str, Any] = {"data": {"user": {"userPositions": []}}}
    userPositions_id_gt = ""
    # Paginate with an id cursor: fetch QUERY_BATCH_SIZE positions at a time,
    # advancing the cursor to the last id returned by the previous page.
    while True:
        query = conditional_tokens_gc_user_query.substitute(
            id=creator.lower(),
            first=QUERY_BATCH_SIZE,
            userPositions_id_gt=userPositions_id_gt,
        )
        content_json = {"query": query}
        print("sending query to subgraph")
        res = requests.post(subgraph, headers=headers, json=content_json)
        result_json = res.json()

        user_data = result_json.get("data", {}).get("user", {})

        if not user_data:
            break

        user_positions = user_data.get("userPositions", [])

        if user_positions:
            all_results["data"]["user"]["userPositions"].extend(user_positions)
            userPositions_id_gt = user_positions[-1]["id"]
        else:
            break

    if len(all_results["data"]["user"]["userPositions"]) == 0:
        return {"data": {"user": None}}

    return all_results
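
# NOTE: Illustrative call (the creator address comes from the constants above; the shape of
# the return value mirrors the subgraph's `user.userPositions` payload):
#
#     positions = query_conditional_tokens_gc_subgraph(creator=FPMM_QS_CREATOR)
#     if positions["data"]["user"] is not None:
#         print(len(positions["data"]["user"]["userPositions"]))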


def query_omen_xdai_subgraph(
    trader_category: str,
    from_timestamp: float,
    to_timestamp: float,
    fpmm_from_timestamp: float,
    fpmm_to_timestamp: float,
) -> dict[str, Any]:
    """Query the Omen xDAI subgraph for the given trader category's trades within the given time windows."""
    OMEN_SUBGRAPH_URL = Template(
        """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"""
    )
    omen_subgraph = OMEN_SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    print(f"omen_subgraph = {omen_subgraph}")
    grouped_results = defaultdict(list)
    id_gt = ""
    if trader_category == "quickstart":
        creator_id = FPMM_QS_CREATOR.lower()
    else:
        creator_id = FPMM_PEARL_CREATOR.lower()

    # Paginate with an id cursor, grouping the trades by their market (fpmm) id.
    while True:
        query = omen_xdai_trades_query.substitute(
            fpmm_creator=creator_id,
            creationTimestamp_gte=int(from_timestamp),
            creationTimestamp_lte=int(to_timestamp),
            fpmm_creationTimestamp_gte=int(fpmm_from_timestamp),
            fpmm_creationTimestamp_lte=int(fpmm_to_timestamp),
            first=QUERY_BATCH_SIZE,
            id_gt=id_gt,
        )
        content_json = to_content(query)

        res = requests.post(omen_subgraph, headers=headers, json=content_json)
        result_json = res.json()

        user_trades = result_json.get("data", {}).get("fpmmTrades", [])

        if not user_trades:
            break

        for trade in user_trades:
            fpmm_id = trade.get("fpmm", {}).get("id")
            grouped_results[fpmm_id].append(trade)

        id_gt = user_trades[-1]["id"]

    # Flatten the grouped trades back into a single fpmmTrades list.
    all_results = {
        "data": {
            "fpmmTrades": [
                trade
                for trades_list in grouped_results.values()
                for trade in trades_list
            ]
        }
    }

    return all_results
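
# NOTE: Illustrative call with placeholder UNIX timestamps (the real window boundaries are
# chosen by the calling pipeline, not here):
#
#     trades = query_omen_xdai_subgraph(
#         trader_category="quickstart",
#         from_timestamp=1696118400,
#         to_timestamp=1698796800,
#         fpmm_from_timestamp=1696118400,
#         fpmm_to_timestamp=1698796800,
#     )
#     print(len(trades["data"]["fpmmTrades"]))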