updating week format to start on Monday, adding new staking contracts and new weekly data
285f2a6
from web3 import Web3
import os
import requests
import time
import pickle
from datetime import datetime, timezone
from functools import partial
import pandas as pd
import pytz
from tqdm import tqdm
from utils import DATA_DIR, TMP_DIR, measure_execution_time
from concurrent.futures import ThreadPoolExecutor

GNOSIS_API_INTERVAL = 0.2  # 5 calls in 1 second
GNOSIS_URL = "https://api.gnosisscan.io/api"
GNOSIS_API_KEY = os.environ.get("GNOSIS_API_KEY", None)
# https://api.gnosisscan.io/api?module=account&action=txlist&address=0x1fe2b09de07475b1027b0c73a5bf52693b31a52e&startblock=36626348&endblock=36626348&page=1&offset=10&sort=asc&apikey=${gnosis_api_key}

# Connect to Gnosis Chain RPC
w3 = Web3(Web3.HTTPProvider("https://rpc.gnosischain.com"))


def parallelize_timestamp_computation(df: pd.DataFrame, function: callable) -> list:
    """Parallelize the timestamp conversion."""
    tx_hashes = df["tx_hash"].tolist()
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(tqdm(executor.map(function, tx_hashes), total=len(tx_hashes)))
    return results
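
# Usage sketch (this mirrors how compute_request_time further below uses the helper):
# bind the extra web3 argument with functools.partial so each worker only receives a
# tx hash from the "tx_hash" column.
# fn = partial(get_transaction_timestamp, web3=w3)
# request_times = parallelize_timestamp_computation(tools_df, fn)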


def transform_timestamp_to_datetime(timestamp):
    """Convert a unix timestamp (in seconds) to a timezone-aware UTC datetime."""
    dt = datetime.fromtimestamp(timestamp, timezone.utc)
    return dt


def get_tx_hash(trader_address, request_block):
    """Function to get the transaction hash from the address and block number"""
    params = {
        "module": "account",
        "action": "txlist",
        "address": trader_address,
        "page": 1,
        "offset": 100,
        "startblock": request_block,
        "endblock": request_block,
        "sort": "asc",
        "apikey": GNOSIS_API_KEY,
    }
    try:
        response = requests.get(GNOSIS_URL, params=params)
        tx_list = response.json()["result"]
        time.sleep(GNOSIS_API_INTERVAL)
        if len(tx_list) > 1:
            raise ValueError("More than one transaction found")
        return tx_list[0]["hash"]
    except Exception:
        return None
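
# Usage sketch (same example address and block as the txlist URL in the comment near
# the top of the file): the helper returns the single matching tx hash, or None when
# the lookup fails, no transaction is found, or more than one transaction is found.
# tx_hash = get_tx_hash("0x1fe2b09de07475b1027b0c73a5bf52693b31a52e", 36626348)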


def add_tx_hash_info(filename: str = "tools.parquet"):
    """Function to add the hash info to the saved tools parquet file"""
    tools = pd.read_parquet(DATA_DIR / filename)
    tools["tx_hash"] = None
    total_errors = 0
    for i, mech_request in tqdm(
        tools.iterrows(), total=len(tools), desc="Adding tx hash"
    ):
        try:
            trader_address = mech_request["trader_address"]
            block_number = mech_request["request_block"]
            tools.at[i, "tx_hash"] = get_tx_hash(
                trader_address=trader_address, request_block=block_number
            )
        except Exception as e:
            print(f"Error with mech request {mech_request}: {e}")
            total_errors += 1
            continue
    print(f"Total number of errors = {total_errors}")
    tools.to_parquet(DATA_DIR / filename)


def get_transaction_timestamp(tx_hash: str, web3: Web3):
    """Function to get the UTC timestamp of a transaction from its hash."""
    try:
        # Get transaction data
        tx = web3.eth.get_transaction(tx_hash)
        # Get block data
        block = web3.eth.get_block(tx["blockNumber"])
        # Get timestamp
        timestamp = block["timestamp"]
        # Convert to datetime
        dt = datetime.fromtimestamp(timestamp, tz=pytz.UTC)
        # return {
        #     "timestamp": timestamp,
        #     "datetime": dt,
        #     "from_address": tx["from"],
        #     "to_address": tx["to"],
        #     "success": True,
        # }
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        print(f"Error getting the timestamp from {tx_hash}: {e}")
        return None
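
# Usage sketch (the tx hash below is hypothetical, for illustration only): returns the
# block timestamp as a "YYYY-MM-DD HH:MM:SS" UTC string, or None if the RPC lookup fails.
# request_time = get_transaction_timestamp("0x" + "ab" * 32, web3=w3)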


def compute_request_time(tools_df: pd.DataFrame) -> pd.DataFrame:
    """Function to compute the request timestamp from the tx hash"""
    # read the locally cached tx_hash -> timestamp info
    try:
        with open(TMP_DIR / "gnosis_info.pkl", "rb") as f:
            gnosis_info = pickle.load(f)
    except Exception:
        print("File not found or not created. Creating a new one")
        gnosis_info = {}
    # reuse any previously computed timestamps
    tools_df["request_time"] = tools_df["tx_hash"].map(gnosis_info)
    # Identify tools with missing request_time and fill them
    missing_time_indices = tools_df[tools_df["request_time"].isna()].index
    print(f"length of missing_time_indices = {len(missing_time_indices)}")
    # traverse the missing tx hashes and get the timestamp of each tx
    partial_mech_request_timestamp = partial(get_transaction_timestamp, web3=w3)
    missing_timestamps = parallelize_timestamp_computation(
        tools_df.loc[missing_time_indices], partial_mech_request_timestamp
    )
    # Update the original DataFrame with the missing timestamps
    for i, timestamp in zip(missing_time_indices, missing_timestamps):
        tools_df.at[i, "request_time"] = timestamp
    # creating other time fields
    tools_df["request_month_year"] = pd.to_datetime(
        tools_df["request_time"]
    ).dt.strftime("%Y-%m")
    # weekly label anchored on the Monday (start) of each request's week
    tools_df["request_month_year_week"] = (
        pd.to_datetime(tools_df["request_time"])
        .dt.to_period("W")
        .dt.start_time.dt.strftime("%b-%d-%Y")
    )
    # Update gnosis_info with the newly fetched timestamps
    new_timestamps = (
        tools_df[["tx_hash", "request_time"]]
        .dropna()
        .set_index("tx_hash")
        .to_dict()["request_time"]
    )
    gnosis_info.update(new_timestamps)
    # save the updated gnosis info cache
    with open(TMP_DIR / "gnosis_info.pkl", "wb") as f:
        pickle.dump(gnosis_info, f)
    return tools_df
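
# The commit changes the weekly label so that weeks start on Monday. A minimal sketch of
# why `to_period("W").start_time` above yields a Monday: pandas' default weekly period is
# anchored on Sunday (W-SUN), so each period spans Monday through Sunday and its
# start_time falls on the Monday. The date below is an arbitrary example, not project data.
# pd.Timestamp("2024-06-05").to_period("W").start_time.strftime("%b-%d-%Y")  # -> 'Jun-03-2024', a Monday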


def get_account_details(address):
    """Function to get the internal transactions of an address from Gnosisscan."""
    # gnosis_url = GNOSIS_URL.substitute(gnosis_api_key=GNOSIS_API_KEY, tx_hash=tx_hash)
    params = {
        "module": "account",
        "action": "txlistinternal",
        "address": address,
        # "page": 1,
        # "offset": 100,
        # "startblock": 0,
        # "endblock": 9999999999,
        # "sort": "asc",
        "apikey": GNOSIS_API_KEY,
    }
    try:
        response = requests.get(GNOSIS_URL, params=params)
        return response.json()
    except Exception as e:
        return {"error": str(e)}


if __name__ == "__main__":
    # tx_data = "0x783BFA045BDE2D0BCD65280D97A29E7BD9E4FDC10985848690C9797E767140F4"
    new_tools = pd.read_parquet(DATA_DIR / "new_tools.parquet")
    new_tools = compute_request_time(new_tools)
    new_tools.to_parquet(DATA_DIR / "new_tools.parquet")
    # result = get_tx_hash("0x1fe2b09de07475b1027b0c73a5bf52693b31a52e", 36626348)
    # print(result)