File size: 6,348 Bytes
278fab8 285f2a6 278fab8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
from web3 import Web3
import os
import requests
import time
import pickle
from datetime import datetime, timezone
from functools import partial
import pandas as pd
import pytz
from tqdm import tqdm
from utils import DATA_DIR, TMP_DIR, measure_execution_time
from concurrent.futures import ThreadPoolExecutor
# Pause, in seconds, that get_tx_hash sleeps after a Gnosisscan request so the
# client stays at roughly five calls per second.
GNOSIS_API_INTERVAL = 0.2 # 5 calls in 1 second
# Base endpoint of the Gnosisscan HTTP API used by the request helpers below.
GNOSIS_URL = "https://api.gnosisscan.io/api"
# API key read from the environment; None when GNOSIS_API_KEY is not set.
GNOSIS_API_KEY = os.environ.get("GNOSIS_API_KEY", None)
# Example request (same parameters that get_tx_hash sends):
# https://api.gnosisscan.io/api?module=account&action=txlist&address=0x1fe2b09de07475b1027b0c73a5bf52693b31a52e&startblock=36626348&endblock=36626348&page=1&offset=10&sort=asc&apikey=${gnosis_api_key}
# Module-level Web3 client connected to the Gnosis Chain RPC endpoint; it is
# created at import time and shared by compute_request_time.
w3 = Web3(Web3.HTTPProvider("https://rpc.gnosischain.com"))
def parallelize_timestamp_computation(df: pd.DataFrame, function: callable) -> list:
    """Run ``function`` over every ``tx_hash`` of ``df`` in a thread pool.

    Args:
        df: DataFrame that must contain a ``tx_hash`` column.
        function: Callable invoked once per hash with the hash as its only
            positional argument.

    Returns:
        The results of ``function`` as a list, in the same order as the
        ``tx_hash`` column.
    """
    hashes = df["tx_hash"].tolist()
    with ThreadPoolExecutor(max_workers=10) as pool:
        # executor.map yields results in input order; tqdm only adds a
        # progress bar while the lazy iterator is being drained.
        progress = tqdm(pool.map(function, hashes), total=len(hashes))
        return list(progress)
def transform_timestamp_to_datetime(timestamp):
    """Return the timezone-aware UTC datetime for a Unix ``timestamp`` in seconds."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)
def get_tx_hash(trader_address, request_block):
    """Look up the hash of the transaction sent by an address in a given block.

    Queries the Gnosisscan ``account``/``txlist`` action restricted to the
    single block ``request_block``.

    Args:
        trader_address: Address whose transactions are listed.
        request_block: Block number used as both ``startblock`` and
            ``endblock`` of the query.

    Returns:
        The transaction hash when exactly one transaction is returned for the
        address in that block; ``None`` when the request fails, the response
        cannot be decoded, or the result does not contain exactly one
        transaction. The function never raises for these cases so callers can
        keep processing the remaining rows.
    """
    params = {
        "module": "account",
        "action": "txlist",
        "address": trader_address,
        "page": 1,
        "offset": 100,
        "startblock": request_block,
        "endblock": request_block,
        "sort": "asc",
        "apikey": GNOSIS_API_KEY,
    }
    try:
        # A timeout prevents one stalled connection from blocking the whole run.
        response = requests.get(GNOSIS_URL, params=params, timeout=30)
        tx_list = response.json()["result"]
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # Only the exception type is printed: the exception text may embed the
        # request URL, which carries the API key.
        print(
            f"Request failed for address {trader_address} at block "
            f"{request_block}: {type(e).__name__}"
        )
        return None
    finally:
        # Respect the API rate limit after every call, failed ones included.
        time.sleep(GNOSIS_API_INTERVAL)

    # On API errors "result" can be a message string instead of a list.
    if not isinstance(tx_list, list):
        print(
            f"Unexpected result for address {trader_address} at block "
            f"{request_block}: {tx_list!r}"
        )
        return None
    if len(tx_list) != 1:
        print(
            f"Expected exactly one transaction for address {trader_address} "
            f"at block {request_block}, found {len(tx_list)}"
        )
        return None

    tx = tx_list[0]
    return tx.get("hash") if isinstance(tx, dict) else None
def add_tx_hash_info(filename: str = "tools.parquet"):
    """Add a ``tx_hash`` column to a saved tools parquet file.

    Reads ``DATA_DIR / filename``, resolves one transaction hash per row from
    its ``trader_address`` and ``request_block`` columns via ``get_tx_hash``,
    and writes the DataFrame back to the same path.

    Args:
        filename: Name of the parquet file inside ``DATA_DIR``.

    Rows whose hash cannot be resolved keep ``None`` in ``tx_hash`` and are
    counted in the error total printed at the end.
    """
    tools = pd.read_parquet(DATA_DIR / filename)
    tools["tx_hash"] = None
    total_errors = 0
    for i, mech_request in tqdm(
        tools.iterrows(), total=len(tools), desc="Adding tx hash"
    ):
        try:
            tx_hash = get_tx_hash(
                trader_address=mech_request["trader_address"],
                request_block=mech_request["request_block"],
            )
        except Exception as e:
            # Best-effort per row: report the failure and move on.
            print(f"Error with mech request {mech_request}: {e}")
            total_errors += 1
            continue
        if tx_hash is None:
            # get_tx_hash signals a failed lookup with None instead of raising,
            # so it has to be counted here or the total would always be 0.
            total_errors += 1
            continue
        tools.at[i, "tx_hash"] = tx_hash

    print(f"Total number of errors = {total_errors}")
    tools.to_parquet(DATA_DIR / filename)
def get_transaction_timestamp(tx_hash: str, web3: Web3):
    """Return the UTC time of the block containing ``tx_hash``.

    Args:
        tx_hash: Hash of the transaction to look up.
        web3: Client used to fetch the transaction and then its block.

    Returns:
        The block timestamp formatted as ``"%Y-%m-%d %H:%M:%S"`` in UTC, or
        ``None`` (after printing a message) if any step of the lookup fails.
    """
    try:
        # The transaction only carries its block number; the timestamp lives
        # on the block itself, hence the second lookup.
        block_number = web3.eth.get_transaction(tx_hash)["blockNumber"]
        mined_at = web3.eth.get_block(block_number)["timestamp"]
        moment = datetime.fromtimestamp(mined_at, tz=pytz.UTC)
        return moment.strftime("%Y-%m-%d %H:%M:%S")
    except Exception:
        print(f"Error getting the timestamp from {tx_hash}")
        return None
@measure_execution_time
def compute_request_time(tools_df: pd.DataFrame) -> pd.DataFrame:
    """Compute the request timestamp of every row from its ``tx_hash``.

    Timestamps already stored in the local cache ``TMP_DIR / "gnosis_info.pkl"``
    (a dict mapping tx hash to formatted time) are reused; the remaining ones
    are fetched in parallel through ``get_transaction_timestamp`` and the cache
    file is rewritten with the merged mapping.

    Args:
        tools_df: DataFrame with a ``tx_hash`` column. It is modified in place.

    Returns:
        The same DataFrame with the columns ``request_time``,
        ``request_month_year`` (``"%Y-%m"``) and ``request_month_year_week``
        (start of the weekly period, ``"%b-%d-%Y"``) added.
    """
    cache_path = TMP_DIR / "gnosis_info.pkl"
    # read the local info
    # NOTE(review): pickle can execute arbitrary code while loading; this is
    # only safe as long as the cache file is produced by this script alone.
    try:
        with open(cache_path, "rb") as f:
            gnosis_info = pickle.load(f)
    except Exception:
        # Deliberately broad: a missing or unreadable cache just means every
        # timestamp is fetched again.
        print("File not found or not created. Creating a new one")
        gnosis_info = {}

    # any previous information?
    tools_df["request_time"] = tools_df["tx_hash"].map(gnosis_info)

    # Identify tools with missing request_time and fill them
    missing_time_indices = tools_df[tools_df["request_time"].isna()].index
    print(f"length of missing_time_indices = {len(missing_time_indices)}")

    # traverse all tx hashes and get the timestamp of each tx
    partial_mech_request_timestamp = partial(get_transaction_timestamp, web3=w3)
    missing_timestamps = parallelize_timestamp_computation(
        tools_df.loc[missing_time_indices], partial_mech_request_timestamp
    )

    # Update the original DataFrame with the missing timestamps
    for i, timestamp in zip(missing_time_indices, missing_timestamps):
        tools_df.at[i, "request_time"] = timestamp

    # creating other time fields (parse the column once and reuse it)
    request_dt = pd.to_datetime(tools_df["request_time"])
    tools_df["request_month_year"] = request_dt.dt.strftime("%Y-%m")
    tools_df["request_month_year_week"] = (
        request_dt.dt.to_period("W").dt.start_time.dt.strftime("%b-%d-%Y")
    )

    # Update the cache with the new timestamps; rows without a hash or without
    # a resolved time are dropped so they are retried on the next run.
    new_timestamps = (
        tools_df[["tx_hash", "request_time"]]
        .dropna()
        .set_index("tx_hash")
        .to_dict()["request_time"]
    )
    gnosis_info.update(new_timestamps)

    # saving gnosis info
    with open(cache_path, "wb") as f:
        pickle.dump(gnosis_info, f)
    return tools_df
def get_account_details(address):
    """Fetch the internal transactions of ``address`` from Gnosisscan.

    Sends the ``account``/``txlistinternal`` action with the API defaults for
    paging, block range and ordering.

    Args:
        address: Account address to query.

    Returns:
        The decoded JSON response, or ``{"error": <message>}`` when the
        request fails or the response body is not valid JSON.
    """
    params = {
        "module": "account",
        "action": "txlistinternal",
        "address": address,
        "apikey": GNOSIS_API_KEY,
    }
    try:
        # A timeout prevents a stalled connection from blocking forever.
        response = requests.get(GNOSIS_URL, params=params, timeout=30)
        return response.json()
    except (requests.RequestException, ValueError) as e:
        return {"error": str(e)}
if __name__ == "__main__":
    # Script entry point: enrich the new tools file with request times and
    # write it back to the same location.
    # tx_data = "0x783BFA045BDE2D0BCD65280D97A29E7BD9E4FDC10985848690C9797E767140F4"
    new_tools_path = DATA_DIR / "new_tools.parquet"
    enriched_tools = compute_request_time(pd.read_parquet(new_tools_path))
    enriched_tools.to_parquet(new_tools_path)
    # result = get_tx_hash("0x1fe2b09de07475b1027b0c73a5bf52693b31a52e", 36626348)
    # print(result)
|