# rosacastillo's picture
# updating week format starting on Monday, new staking contracts and new weekly data
# 285f2a6
# raw
# history blame
# 6.35 kB
from web3 import Web3
import os
import requests
import time
import pickle
from datetime import datetime, timezone
from functools import partial
import pandas as pd
import pytz
from tqdm import tqdm
from utils import DATA_DIR, TMP_DIR, measure_execution_time
from concurrent.futures import ThreadPoolExecutor
# Pause (in seconds) slept after a Gnosisscan request: 0.2 s -> 5 calls per second.
GNOSIS_API_INTERVAL = 0.2
GNOSIS_URL = "https://api.gnosisscan.io/api"
# API key taken from the environment; None when GNOSIS_API_KEY is not set.
GNOSIS_API_KEY = os.getenv("GNOSIS_API_KEY")
# Example of a txlist query against the API:
# https://api.gnosisscan.io/api?module=account&action=txlist&address=0x1fe2b09de07475b1027b0c73a5bf52693b31a52e&startblock=36626348&endblock=36626348&page=1&offset=10&sort=asc&apikey=${gnosis_api_key}

# Web3 client connected to the Gnosis Chain RPC endpoint.
w3 = Web3(Web3.HTTPProvider("https://rpc.gnosischain.com"))
def parallelize_timestamp_computation(df: pd.DataFrame, function: callable) -> list:
    """Apply ``function`` to every ``tx_hash`` of ``df`` using a thread pool.

    Args:
        df: DataFrame with a ``tx_hash`` column.
        function: Callable invoked once per transaction hash.

    Returns:
        The results of ``function``, in the same order as the rows of ``df``.
    """
    hashes = df["tx_hash"].tolist()
    with ThreadPoolExecutor(max_workers=10) as pool:
        # tqdm wraps the lazy map iterator so progress advances as results arrive.
        progress = tqdm(pool.map(function, hashes), total=len(hashes))
        return list(progress)
def transform_timestamp_to_datetime(timestamp):
    """Convert a Unix timestamp (in seconds) into a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)
def get_tx_hash(trader_address, request_block, timeout: float = 30.0):
    """Get the hash of the single transaction of an address in a given block.

    Queries the ``txlist`` action of the Gnosisscan API with ``startblock``
    and ``endblock`` both set to ``request_block``.

    Args:
        trader_address: Address whose transactions are listed.
        request_block: Block number used as both the start and the end block.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The transaction hash, or ``None`` when the request fails, the response
        cannot be parsed, or the result is not exactly one transaction.
    """
    params = {
        "module": "account",
        "action": "txlist",
        "address": trader_address,
        "page": 1,
        "offset": 100,
        "startblock": request_block,
        "endblock": request_block,
        "sort": "asc",
        "apikey": GNOSIS_API_KEY,
    }
    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(GNOSIS_URL, params=params, timeout=timeout)
        tx_list = response.json()["result"]
        # Only a one-element list identifies the transaction unambiguously;
        # an empty list, several transactions or a non-list payload do not.
        if not isinstance(tx_list, list) or len(tx_list) != 1:
            return None
        return tx_list[0]["hash"]
    except Exception as e:
        # Best effort: report the failure and let the caller handle the None.
        print(
            f"Error getting the tx hash for {trader_address} "
            f"at block {request_block}: {e}"
        )
        return None
    finally:
        # Respect the API rate limit on every path, including failures.
        time.sleep(GNOSIS_API_INTERVAL)
def add_tx_hash_info(filename: str = "tools.parquet"):
    """Add a ``tx_hash`` column to a saved tools parquet file, in place.

    Reads ``DATA_DIR / filename``, looks up the transaction hash of every row
    from its ``trader_address`` and ``request_block`` values, and writes the
    DataFrame back to the same file. Rows whose lookup raises are reported,
    counted and keep ``None`` as their hash.

    Args:
        filename: Name of the parquet file inside ``DATA_DIR``.
    """
    parquet_path = DATA_DIR / filename
    tools = pd.read_parquet(parquet_path)
    tools["tx_hash"] = None
    failures = 0
    rows = tqdm(tools.iterrows(), total=len(tools), desc="Adding tx hash")
    for row_label, row in rows:
        try:
            tools.at[row_label, "tx_hash"] = get_tx_hash(
                trader_address=row["trader_address"],
                request_block=row["request_block"],
            )
        except Exception:
            print(f"Error with mech request {row}")
            failures += 1

    print(f"Total number of errors = {failures}")
    tools.to_parquet(parquet_path)
def get_transaction_timestamp(tx_hash: str, web3: Web3):
    """Return the time of the block containing ``tx_hash`` as a UTC string.

    The transaction is fetched to obtain its block number, then the block is
    fetched to read its ``timestamp``.

    Args:
        tx_hash: Hash of the transaction to look up.
        web3: Client used for the ``eth.get_transaction`` and
            ``eth.get_block`` calls.

    Returns:
        The block time formatted as ``"%Y-%m-%d %H:%M:%S"`` in UTC, or
        ``None`` (after printing a message) when any step fails.
    """
    try:
        block_number = web3.eth.get_transaction(tx_hash)["blockNumber"]
        block_timestamp = web3.eth.get_block(block_number)["timestamp"]
        block_time = datetime.fromtimestamp(block_timestamp, tz=pytz.UTC)
        return block_time.strftime("%Y-%m-%d %H:%M:%S")
    except Exception:
        print(f"Error getting the timestamp from {tx_hash}")
        return None
@measure_execution_time
def compute_request_time(tools_df: pd.DataFrame) -> pd.DataFrame:
    """Compute the request timestamp of each row from its transaction hash.

    Timestamps already stored in the local cache ``TMP_DIR / "gnosis_info.pkl"``
    (a dict mapping tx hash -> timestamp string) are reused; the remaining
    ones are fetched in parallel through the module-level ``w3`` client. The
    cache file is rewritten with every known timestamp at the end.

    Args:
        tools_df: DataFrame with a ``tx_hash`` column. It is modified in place.

    Returns:
        ``tools_df`` with three added columns: ``request_time``,
        ``request_month_year`` (``"%Y-%m"``) and ``request_month_year_week``
        (start of the weekly period, formatted as ``"%b-%d-%Y"``).
    """
    cache_path = TMP_DIR / "gnosis_info.pkl"
    # read the local info
    try:
        # NOTE: pickle can execute arbitrary code on load; this file must
        # only ever be a cache written by this module.
        # The context manager closes the handle even if unpickling fails.
        with open(cache_path, "rb") as f:
            gnosis_info = pickle.load(f)
    except Exception:
        print("File not found or not created. Creating a new one")
        gnosis_info = {}

    # any previous information?
    # Mapping with an empty dict yields a float64 column of NaN; cast to
    # object so the timestamp strings assigned below never need an upcast
    # (recent pandas versions warn about or reject such upcasts).
    tools_df["request_time"] = tools_df["tx_hash"].map(gnosis_info).astype(object)

    # Identify tools with missing request_time and fill them
    missing_time_indices = tools_df[tools_df["request_time"].isna()].index
    print(f"length of missing_time_indices = {len(missing_time_indices)}")

    # traverse all tx hashes and get the timestamp of each tx
    partial_mech_request_timestamp = partial(get_transaction_timestamp, web3=w3)
    missing_timestamps = parallelize_timestamp_computation(
        tools_df.loc[missing_time_indices], partial_mech_request_timestamp
    )

    # Update the original DataFrame with the missing timestamps
    for i, timestamp in zip(missing_time_indices, missing_timestamps):
        tools_df.at[i, "request_time"] = timestamp

    # creating other time fields (parse the timestamps only once)
    request_datetimes = pd.to_datetime(tools_df["request_time"])
    tools_df["request_month_year"] = request_datetimes.dt.strftime("%Y-%m")
    tools_df["request_month_year_week"] = (
        request_datetimes.dt.to_period("W").dt.start_time.dt.strftime("%b-%d-%Y")
    )

    # Update the cache with the new timestamps; rows without a hash or
    # without a timestamp are left out.
    new_timestamps = (
        tools_df[["tx_hash", "request_time"]]
        .dropna()
        .set_index("tx_hash")
        .to_dict()["request_time"]
    )
    gnosis_info.update(new_timestamps)

    # saving gnosis info
    with open(cache_path, "wb") as f:
        pickle.dump(gnosis_info, f)

    return tools_df
def get_account_details(address, timeout: float = 30.0):
    """Fetch the internal transactions of an address from the Gnosisscan API.

    Calls the ``txlistinternal`` action of the ``account`` module. The
    optional filters (page, offset, startblock, endblock, sort) are not sent.

    Args:
        address: Address whose internal transactions are requested.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The decoded JSON response, or ``{"error": "<message>"}`` when the
        request or the JSON decoding fails.
    """
    params = {
        "module": "account",
        "action": "txlistinternal",
        "address": address,
        "apikey": GNOSIS_API_KEY,
    }
    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(GNOSIS_URL, params=params, timeout=timeout)
        return response.json()
    except Exception as e:
        return {"error": str(e)}
if __name__ == "__main__":
    # Add the request time columns to the new tools file and save it in place.
    new_tools_path = DATA_DIR / "new_tools.parquet"
    enriched_tools = compute_request_time(pd.read_parquet(new_tools_path))
    enriched_tools.to_parquet(new_tools_path)
    # Manual check of the hash lookup:
    # tx_data = "0x783BFA045BDE2D0BCD65280D97A29E7BD9E4FDC10985848690C9797E767140F4"
    # result = get_tx_hash("0x1fe2b09de07475b1027b0c73a5bf52693b31a52e", 36626348)
    # print(result)