# Removed dependency on tools.parquet; new mech-calls computation is timestamp based (commit 278fab8).
# Standard library
import json
import os
import pickle
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, List

# Third-party
import pandas as pd
import requests
from tqdm import tqdm
from web3 import Web3

# Local
from utils import DATA_DIR, RPC, TMP_DIR

# Number of threads used when fetching service data from the chain.
NUM_WORKERS = 10
# Legacy staking program contracts that are no longer active; owners found
# here are still labeled "quickstart" (see check_owner_staking_contract).
DEPRECATED_STAKING_PROGRAMS = {
    "quickstart_alpha_everest": "0x5add592ce0a1B5DceCebB5Dcac086Cd9F9e3eA5C",
    "quickstart_alpha_alpine": "0x2Ef503950Be67a98746F484DA0bBAdA339DF3326",
    "quickstart_alpha_coastal": "0x43fB32f25dce34EB76c78C7A42C8F40F84BCD237",
}
# Active quickstart-beta staking program contracts.
STAKING_PROGRAMS_QS = {
    "quickstart_beta_hobbyist": "0x389B46c259631Acd6a69Bde8B6cEe218230bAE8C",
    "quickstart_beta_hobbyist_2": "0x238EB6993b90a978ec6AAD7530d6429c949C08DA",
    "quickstart_beta_expert": "0x5344B7DD311e5d3DdDd46A4f71481bD7b05AAA3e",
    "quickstart_beta_expert_2": "0xb964e44c126410df341ae04B13aB10A985fE3513",
    "quickstart_beta_expert_3": "0x80faD33Cadb5F53f9D29F02Db97D682E8b101618",
}
# Active Pearl staking program contracts.
STAKING_PROGRAMS_PEARL = {
    "pearl_alpha": "0xEE9F19b5DF06c7E8Bfc7B28745dcf944C504198A",
    "pearl_beta": "0xeF44Fb0842DDeF59D37f85D61A1eF492bbA6135d",
    "pearl_beta_2": "0x1c2F82413666d2a3fD8bC337b0268e62dDF67434",
}
# Autonolas service registry contract (queried via the Gnosis Blockscout API below).
SERVICE_REGISTRY_ADDRESS = "0x9338b5153AE39BB89f50468E608eD9d764B755fD"
def _get_contract(address: str) -> Any:
    """Build a web3 contract instance for `address`, fetching its ABI from Blockscout."""
    web3_client = Web3(Web3.HTTPProvider(RPC))
    checksummed = Web3.to_checksum_address(address)
    return web3_client.eth.contract(address=checksummed, abi=_get_abi(address))
def _get_abi(address: str) -> List:
    """Fetch the ABI of a contract from the Gnosis Blockscout API.

    Returns an empty list when the explorer has no ABI for the address.
    Exits the process when the explorer returns a 'result' payload that is
    not valid JSON (legacy API shape).
    """
    contract_abi_url = (
        "https://gnosis.blockscout.com/api/v2/smart-contracts/{contract_address}"
    )
    # Timeout so a stalled explorer request cannot hang the whole run.
    response = requests.get(
        contract_abi_url.format(contract_address=address), timeout=30
    ).json()

    if "result" in response:
        # Legacy API shape: the ABI arrives JSON-encoded inside "result".
        result = response["result"]
        try:
            abi = json.loads(result)
        except json.JSONDecodeError:
            print("Error: Failed to parse 'result' field as JSON")
            sys.exit(1)
    else:
        abi = response.get("abi")

    return abi if abi else []
def get_service_safe(service_id: int) -> str:
    """Gets the service Safe"""
    registry = _get_contract(SERVICE_REGISTRY_ADDRESS)
    service_info = registry.functions.getService(service_id).call()
    # Index 1 of the getService tuple holds the multisig (Safe) address.
    return service_info[1]
def list_contract_functions(contract):
    """Return the names of all entries of type 'function' in the contract ABI."""
    return [
        entry.get("name")
        for entry in contract.abi
        if entry.get("type") == "function"
    ]
def get_service_data(service_registry: Any, service_id: int) -> dict:
    """Fetch on-chain data for a single service id.

    Returns {service_id: {"safe_address", "state", "owner_address"}},
    an empty dict when the service has no multisig deployed (zero address),
    or None when the owner lookup fails.
    """
    tmp_map = {}
    data = service_registry.functions.getService(service_id).call()
    try:
        owner_data = service_registry.functions.ownerOf(service_id).call()
    except Exception:  # best-effort: skip services whose owner can't be read
        tqdm.write(f"Error: no owner data info from {service_id}")
        return None

    address = data[1]  # multisig (Safe) address of the service
    state = data[-1]  # service lifecycle state
    # Skip services that never deployed a multisig.
    if address != "0x0000000000000000000000000000000000000000":
        tmp_map[service_id] = {
            "safe_address": address,
            "state": state,
            "owner_address": owner_data,
        }
    return tmp_map
def update_service_map(start: int = 1, end: int = 1000):
    """Extend the cached service map with data for service ids in [start, end).

    Loads the existing map from DATA_DIR/service_map.pkl when present,
    fetches service data concurrently, and writes the merged map back to
    the same pickle.
    """
    map_path = DATA_DIR / "service_map.pkl"
    if os.path.exists(map_path):
        with open(map_path, "rb") as f:
            service_map = pickle.load(f)
    else:
        service_map = {}

    print(f"updating service map from service id={start}")
    # The last registered service id is unknown, so probe the whole range.
    service_registry = _get_contract(SERVICE_REGISTRY_ADDRESS)
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = [
            executor.submit(get_service_data, service_registry, service_id)
            for service_id in range(start, end)
        ]
        progress = tqdm(
            as_completed(futures),
            total=len(futures),
            desc="Fetching all service data from contracts",
        )
        for future in progress:
            partial_dict = future.result()
            if partial_dict:
                service_map.update(partial_dict)

    with open(map_path, "wb") as f:
        pickle.dump(service_map, f)
def check_owner_staking_contract(owner_address: str) -> str:
    """Classify an owner address by the staking program family it belongs to.

    Returns "quickstart", "pearl", or "non_staking". Deprecated quickstart
    programs are still labeled "quickstart".
    """
    target = owner_address.lower()

    if target in (addr.lower() for addr in STAKING_PROGRAMS_QS.values()):
        return "quickstart"
    if target in (addr.lower() for addr in STAKING_PROGRAMS_PEARL.values()):
        return "pearl"
    if target in (addr.lower() for addr in DEPRECATED_STAKING_PROGRAMS.values()):
        return "quickstart"
    return "non_staking"
def get_trader_address_staking(trader_address: str, service_map: dict) -> str:
    """Resolve the staking label for a trader address via the service map.

    Returns "non_Olas" when no service's Safe matches the trader address,
    otherwise delegates to check_owner_staking_contract with the owner.
    """
    trader_lower = trader_address.lower()
    # First service whose Safe address matches this trader, if any.
    matching_entry = next(
        (
            entry
            for entry in service_map.values()
            if entry["safe_address"].lower() == trader_lower
        ),
        None,
    )
    if matching_entry is None:
        return "non_Olas"
    return check_owner_staking_contract(owner_address=matching_entry["owner_address"])
def label_trades_by_staking(trades_df: pd.DataFrame, start: int = None) -> pd.DataFrame:
    """Add a 'staking' column labeling each trader address in `trades_df`.

    Refreshes the on-disk service map starting from `start` (or from the
    highest cached service id when `start` is None), then labels every
    unique trader as quickstart/pearl/non_staking/non_Olas.
    Returns the labeled DataFrame.
    """
    with open(DATA_DIR / "service_map.pkl", "rb") as f:
        service_map = pickle.load(f)

    last_key = max(service_map.keys()) if start is None else start
    print(f"last service key = {last_key}")
    update_service_map(start=last_key)

    # BUGFIX: reload the map so the services just fetched by
    # update_service_map are actually used for labeling (previously the
    # stale in-memory copy was used).
    with open(DATA_DIR / "service_map.pkl", "rb") as f:
        service_map = pickle.load(f)

    all_traders = trades_df.trader_address.unique()
    trades_df["staking"] = ""
    for trader in tqdm(all_traders, desc="Labeling traders by staking", unit="trader"):
        # get_trader_address_staking always returns a non-empty label.
        staking_label = get_trader_address_staking(trader, service_map)
        trades_df.loc[trades_df["trader_address"] == trader, "staking"] = staking_label
    return trades_df
if __name__ == "__main__":
    trades_df = pd.read_parquet(TMP_DIR / "all_trades_df.parquet")
    # Keep only valid trades (rows where is_invalid equals False).
    trades_df = trades_df.loc[trades_df["is_invalid"].eq(False)]
    trades_df = label_trades_by_staking(trades_df=trades_df, start=8)
    print(trades_df.staking.value_counts())
    trades_df.to_parquet(TMP_DIR / "result_staking.parquet", index=False)