from web3 import Web3
import os
import requests
import time
import pickle
from datetime import datetime, timezone
from functools import partial
import pandas as pd
import pytz
from tqdm import tqdm
from utils import DATA_DIR, TMP_DIR, measure_execution_time
from concurrent.futures import ThreadPoolExecutor

GNOSIS_API_INTERVAL = 0.2  # max 5 calls per second
GNOSIS_URL = "https://api.gnosisscan.io/api"
GNOSIS_API_KEY = os.environ.get("GNOSIS_API_KEY", None)
# Example request:
# https://api.gnosisscan.io/api?module=account&action=txlist&address=0x1fe2b09de07475b1027b0c73a5bf52693b31a52e&startblock=36626348&endblock=36626348&page=1&offset=10&sort=asc&apikey=${gnosis_api_key}

# Connect to the Gnosis Chain RPC
w3 = Web3(Web3.HTTPProvider("https://rpc.gnosischain.com"))


def parallelize_timestamp_computation(df: pd.DataFrame, function: callable) -> list:
    """Run the given timestamp function over all tx hashes in parallel."""
    tx_hashes = df["tx_hash"].tolist()
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(tqdm(executor.map(function, tx_hashes), total=len(tx_hashes)))
    return results


def transform_timestamp_to_datetime(timestamp):
    """Convert a Unix timestamp to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, timezone.utc)


def get_tx_hash(trader_address, request_block):
    """Get the transaction hash for an address at a specific block number."""
    params = {
        "module": "account",
        "action": "txlist",
        "address": trader_address,
        "page": 1,
        "offset": 100,
        "startblock": request_block,
        "endblock": request_block,
        "sort": "asc",
        "apikey": GNOSIS_API_KEY,
    }
    try:
        response = requests.get(GNOSIS_URL, params=params)
        tx_list = response.json()["result"]
        time.sleep(GNOSIS_API_INTERVAL)  # stay under the API rate limit
        if len(tx_list) > 1:
            raise ValueError("More than one transaction found")
        # An empty result raises IndexError here; both that and the
        # ValueError above fall through to the except and return None.
        return tx_list[0]["hash"]
    except Exception as e:
        print(f"Error getting tx hash for {trader_address} at block {request_block}: {e}")
        return None


def add_tx_hash_info(filename: str = "tools.parquet"):
    """Add the tx hash info to the saved tools parquet file."""
    tools = pd.read_parquet(DATA_DIR / filename)
    tools["tx_hash"] = None
    total_errors = 0
    for i, mech_request in tqdm(
        tools.iterrows(), total=len(tools), desc="Adding tx hash"
    ):
        try:
            trader_address = mech_request["trader_address"]
            block_number = mech_request["request_block"]
            tools.at[i, "tx_hash"] = get_tx_hash(
                trader_address=trader_address, request_block=block_number
            )
        except Exception as e:
            print(f"Error with mech request {mech_request}: {e}")
            total_errors += 1
            continue
    print(f"Total number of errors = {total_errors}")
    tools.to_parquet(DATA_DIR / filename)


def get_transaction_timestamp(tx_hash: str, web3: Web3):
    """Return the UTC timestamp of a transaction as a formatted string."""
    try:
        # Get the transaction, then the block it was included in
        tx = web3.eth.get_transaction(tx_hash)
        block = web3.eth.get_block(tx["blockNumber"])
        # Convert the block timestamp to a UTC datetime
        timestamp = block["timestamp"]
        dt = datetime.fromtimestamp(timestamp, tz=pytz.UTC)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception:
        print(f"Error getting the timestamp from {tx_hash}")
        return None


@measure_execution_time
def compute_request_time(tools_df: pd.DataFrame) -> pd.DataFrame:
    """Compute the request timestamp of each tool call from its tx hash."""
    # Read the locally cached timestamp info, if it exists
    try:
        with open(TMP_DIR / "gnosis_info.pkl", "rb") as f:
            gnosis_info = pickle.load(f)
    except Exception:
        print("File not found or not created. Creating a new one")
        gnosis_info = {}

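    # gnosis_info maps tx_hash -> formatted timestamp from earlier runs;
    # mapping it over the tx_hash column pre-fills every row already
    # resolved, so only the rows left as NaN need an RPC call below.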
tools_df["request_time"] = tools_df["tx_hash"].map(gnosis_info) # Identify tools with missing request_time and fill them missing_time_indices = tools_df[tools_df["request_time"].isna()].index print(f"length of missing_time_indices = {len(missing_time_indices)}") # traverse all tx hashes and get the timestamp of each tx partial_mech_request_timestamp = partial(get_transaction_timestamp, web3=w3) missing_timestamps = parallelize_timestamp_computation( tools_df.loc[missing_time_indices], partial_mech_request_timestamp ) # Update the original DataFrame with the missing timestamps for i, timestamp in zip(missing_time_indices, missing_timestamps): tools_df.at[i, "request_time"] = timestamp # creating other time fields tools_df["request_month_year"] = pd.to_datetime( tools_df["request_time"] ).dt.strftime("%Y-%m") tools_df["request_month_year_week"] = ( pd.to_datetime(tools_df["request_time"]) .dt.to_period("W") .dt.start_time.dt.strftime("%b-%d-%Y") ) # Update t_map with new timestamps new_timestamps = ( tools_df[["tx_hash", "request_time"]] .dropna() .set_index("tx_hash") .to_dict()["request_time"] ) gnosis_info.update(new_timestamps) # saving gnosis info with open(TMP_DIR / "gnosis_info.pkl", "wb") as f: pickle.dump(gnosis_info, f) return tools_df def get_account_details(address): # gnosis_url = GNOSIS_URL.substitute(gnosis_api_key=GNOSIS_API_KEY, tx_hash=tx_hash) params = { "module": "account", "action": "txlistinternal", "address": address, #'page': 1, #'offset': 100, #'startblock': 0, #'endblock': 9999999999, #'sort': 'asc', "apikey": GNOSIS_API_KEY, } try: response = requests.get(GNOSIS_URL, params=params) return response.json() except Exception as e: return {"error": str(e)} if __name__ == "__main__": # tx_data = "0x783BFA045BDE2D0BCD65280D97A29E7BD9E4FDC10985848690C9797E767140F4" new_tools = pd.read_parquet(DATA_DIR / "new_tools.parquet") new_tools = compute_request_time(new_tools) new_tools.to_parquet(DATA_DIR / "new_tools.parquet") # result = get_tx_hash("0x1fe2b09de07475b1027b0c73a5bf52693b31a52e", 36626348) # print(result)