# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
#   Copyright 2023 Valory AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# ------------------------------------------------------------------------------

import json
import os.path
import re
import sys
import time
import random
from dataclasses import dataclass
from enum import Enum
from io import StringIO
from typing import (
    Optional,
    List,
    Dict,
    Any,
    Union,
    Callable,
    Tuple,
)

import pandas as pd
import requests
from json.decoder import JSONDecodeError
from eth_typing import ChecksumAddress
from eth_utils import to_checksum_address
from requests.adapters import HTTPAdapter
from requests.exceptions import (
    ReadTimeout as RequestsReadTimeoutError,
    HTTPError as RequestsHTTPError,
)
from tqdm import tqdm
from urllib3 import Retry
from urllib3.exceptions import (
    ReadTimeoutError as Urllib3ReadTimeoutError,
    HTTPError as Urllib3HTTPError,
)
from web3 import Web3, HTTPProvider
from web3.exceptions import MismatchedABI
from web3.types import BlockParams
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path


CONTRACTS_PATH = "contracts"
MECH_TO_INFO = {
    # this block number is when the creator had its first tx ever, and after this mech's creation
    "0xff82123dfb52ab75c417195c5fdb87630145ae81": ("old_mech_abi.json", 28911547),
    # this block number is when this mech was created
    "0x77af31de935740567cf4ff1986d04b2c964a786a": ("new_mech_abi.json", 30776879),
}
# optionally set the latest block to stop searching for the delivered events
LATEST_BLOCK: Optional[int] = None
LATEST_BLOCK_NAME: BlockParams = "latest"
BLOCK_DATA_NUMBER = "number"
BLOCKS_CHUNK_SIZE = 10_000
REDUCE_FACTOR = 0.25
EVENT_ARGUMENTS = "args"
DATA = "data"
REQUEST_ID = "requestId"
REQUEST_ID_FIELD = "request_id"
REQUEST_SENDER = "sender"
PROMPT_FIELD = "prompt"
BLOCK_FIELD = "block"
CID_PREFIX = "f01701220"
HTTP = "http://"
HTTPS = HTTP[:4] + "s" + HTTP[4:]
IPFS_ADDRESS = f"{HTTPS}gateway.autonolas.tech/ipfs/"
IPFS_LINKS_SERIES_NAME = "ipfs_links"
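# Illustrative note: the request/deliver payloads are stored on IPFS, and the gateway URL is
# assembled as f"{IPFS_ADDRESS}{CID_PREFIX}{data.hex()}" (see `MechEvent` below), e.g.
#   https://gateway.autonolas.tech/ipfs/f01701220<hex-encoded event data>/metadata.json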
BACKOFF_FACTOR = 1
STATUS_FORCELIST = [404, 500, 502, 503, 504]
DEFAULT_FILENAME = "tools.parquet"
RE_RPC_FILTER_ERROR = r"Filter with id: '\d+' does not exist."
ABI_ERROR = "The event signature did not match the provided ABI"
SLEEP = 0.5
HTTP_TIMEOUT = 10
N_IPFS_RETRIES = 1
N_RPC_RETRIES = 100
RPC_POLL_INTERVAL = 0.05
IPFS_POLL_INTERVAL = 0.05
FORMAT_UPDATE_BLOCK_NUMBER = 30411638
IRRELEVANT_TOOLS = [
    "openai-text-davinci-002",
    "openai-text-davinci-003",
    "openai-gpt-3.5-turbo",
    "openai-gpt-4",
    "stabilityai-stable-diffusion-v1-5",
    "stabilityai-stable-diffusion-xl-beta-v2-2-2",
    "stabilityai-stable-diffusion-512-v2-1",
    "stabilityai-stable-diffusion-768-v2-1",
    "deepmind-optimization-strong",
    "deepmind-optimization",
]
# this is how frequently we will keep a snapshot of the progress so far in terms of blocks' batches
# for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search, we also store the snapshot
SNAPSHOT_RATE = 10
NUM_WORKERS = 10
GET_CONTENTS_BATCH_SIZE = 1000
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"


class MechEventName(Enum):
    """The mech's event names."""

    REQUEST = "Request"
    DELIVER = "Deliver"


@dataclass
class MechEvent:
    """A mech's on-chain event representation."""

    for_block: int
    requestId: int
    data: bytes
    sender: str

    def _ipfs_link(self) -> Optional[str]:
        """Get the ipfs link for the data."""
        return f"{IPFS_ADDRESS}{CID_PREFIX}{self.data.hex()}"

    @property
    def ipfs_request_link(self) -> Optional[str]:
        """Get the IPFS link for the request."""
        return f"{self._ipfs_link()}/metadata.json"

    @property
    def ipfs_deliver_link(self) -> Optional[str]:
        """Get the IPFS link for the deliver."""
        if self.requestId is None:
            return None
        return f"{self._ipfs_link()}/{self.requestId}"

    def ipfs_link(self, event_name: MechEventName) -> Optional[str]:
        """Get the ipfs link based on the event."""
        if event_name == MechEventName.REQUEST:
            if self.for_block < FORMAT_UPDATE_BLOCK_NUMBER:
                return self._ipfs_link()
            return self.ipfs_request_link
        if event_name == MechEventName.DELIVER:
            return self.ipfs_deliver_link
        return None


@dataclass(init=False)
class MechRequest:
    """A structure for a request to a mech."""

    request_id: Optional[int]
    request_block: Optional[int]
    prompt_request: Optional[str]
    tool: Optional[str]
    nonce: Optional[str]
    trader_address: Optional[str]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the request ignoring extra keys."""
        self.request_id = int(kwargs.pop(REQUEST_ID, 0))
        self.request_block = int(kwargs.pop(BLOCK_FIELD, 0))
        self.prompt_request = kwargs.pop(PROMPT_FIELD, None)
        self.tool = kwargs.pop("tool", None)
        self.nonce = kwargs.pop("nonce", None)
        self.trader_address = kwargs.pop("sender", None)
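# A request's IPFS metadata roughly takes the following shape (illustrative values only;
# `MechRequest` pops "prompt", "tool" and "nonce", while the rest is injected from the event):
#   {"prompt": "Will <some event> happen by <date>?", "tool": "prediction-online", "nonce": "..."}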
@dataclass(init=False)
class PredictionResponse:
    """A response of a prediction."""

    p_yes: float
    p_no: float
    confidence: float
    info_utility: float
    vote: Optional[str]
    win_probability: Optional[float]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the mech's prediction ignoring extra keys."""
        try:
            self.p_yes = float(kwargs.pop("p_yes"))
            self.p_no = float(kwargs.pop("p_no"))
            self.confidence = float(kwargs.pop("confidence"))
            self.info_utility = float(kwargs.pop("info_utility"))
            self.win_probability = 0

            # Validate probabilities
            probabilities = {
                "p_yes": self.p_yes,
                "p_no": self.p_no,
                "confidence": self.confidence,
                "info_utility": self.info_utility,
            }
            for name, prob in probabilities.items():
                if not 0 <= prob <= 1:
                    raise ValueError(f"{name} probability is out of bounds: {prob}")
            if self.p_yes + self.p_no != 1:
                raise ValueError(
                    f"Sum of p_yes and p_no is not 1: {self.p_yes} + {self.p_no}"
                )

            self.vote = self.get_vote()
            self.win_probability = self.get_win_probability()
        except KeyError as e:
            raise KeyError(f"Missing key in PredictionResponse: {e}")
        except ValueError as e:
            raise ValueError(f"Invalid value in PredictionResponse: {e}")

    def get_vote(self) -> Optional[str]:
        """Return the vote."""
        if self.p_no == self.p_yes:
            return None
        if self.p_no > self.p_yes:
            return "No"
        return "Yes"

    def get_win_probability(self) -> Optional[float]:
        """Return the probability estimation for winning with vote."""
        return max(self.p_no, self.p_yes)
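# A deliver payload's "result" field is a JSON string that `PredictionResponse` parses,
# roughly of this shape (illustrative values only):
#   '{"p_yes": 0.62, "p_no": 0.38, "confidence": 0.8, "info_utility": 0.5}'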
@dataclass(init=False)
class MechResponse:
    """A structure for the response of a mech."""

    request_id: int
    deliver_block: Optional[int]
    result: Optional[PredictionResponse]
    error: Optional[str]
    error_message: Optional[str]
    prompt_response: Optional[str]
    mech_address: Optional[str]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the mech's response ignoring extra keys."""
        self.error = kwargs.get("error", None)
        self.request_id = int(kwargs.get(REQUEST_ID, 0))
        self.deliver_block = int(kwargs.get(BLOCK_FIELD, 0))
        self.result = kwargs.get("result", None)
        self.prompt_response = kwargs.get(PROMPT_FIELD, None)
        self.mech_address = kwargs.get("sender", None)

        if self.result != "Invalid response":
            self.error_message = kwargs.get("error_message", None)
            try:
                if isinstance(self.result, str):
                    kwargs = json.loads(self.result)
                    self.result = PredictionResponse(**kwargs)
                    self.error = 0
            except JSONDecodeError:
                self.error_message = "Response parsing error"
                self.error = 1
            except Exception as e:
                self.error_message = str(e)
                self.error = 1
        else:
            self.error_message = "Invalid response from tool"
            self.error = 1
            self.result = None


EVENT_TO_MECH_STRUCT = {
    MechEventName.REQUEST: MechRequest,
    MechEventName.DELIVER: MechResponse,
}


def parse_args() -> str:
    """Parse the arguments and return the RPC."""
    if len(sys.argv) != 2:
        raise ValueError("Expected the RPC as a positional argument.")
    return sys.argv[1]


def read_abi(abi_path: str) -> str:
    """Read and return the contract's ABI."""
    with open(abi_path) as abi_file:
        return abi_file.read()


def reduce_window(contract_instance, event, from_block, batch_size, latest_block):
    """Dynamically reduce the batch size window."""
    keep_fraction = 1 - REDUCE_FACTOR
    events_filter = contract_instance.events[event].build_filter()
    events_filter.fromBlock = from_block
    batch_size = int(batch_size * keep_fraction)
    events_filter.toBlock = min(from_block + batch_size, latest_block)
    tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.")
    time.sleep(SLEEP)
    return events_filter, batch_size


def get_events(
    w3: Web3,
    event: str,
    mech_address: ChecksumAddress,
    mech_abi_path: str,
    earliest_block: int,
    latest_block: int,
) -> List:
    """Get the delivered events."""
    abi = read_abi(mech_abi_path)
    contract_instance = w3.eth.contract(address=mech_address, abi=abi)

    events = []
    from_block = earliest_block
    batch_size = BLOCKS_CHUNK_SIZE
    with tqdm(
        total=latest_block - from_block,
        desc=f"Searching {event} events for mech {mech_address}",
        unit="blocks",
    ) as pbar:
        while from_block < latest_block:
            events_filter = contract_instance.events[event].build_filter()
            events_filter.fromBlock = from_block
            events_filter.toBlock = min(from_block + batch_size, latest_block)
            entries = None
            retries = 0
            while entries is None:
                try:
                    entries = events_filter.deploy(w3).get_all_entries()
                    retries = 0
                except (RequestsHTTPError, Urllib3HTTPError) as exc:
                    if "Request Entity Too Large" in exc.args[0]:
                        events_filter, batch_size = reduce_window(
                            contract_instance,
                            event,
                            from_block,
                            batch_size,
                            latest_block,
                        )
                except (Urllib3ReadTimeoutError, RequestsReadTimeoutError):
                    events_filter, batch_size = reduce_window(
                        contract_instance, event, from_block, batch_size, latest_block
                    )
                except Exception as exc:
                    retries += 1
                    if retries == N_RPC_RETRIES:
                        tqdm.write(
                            f"Skipping events for blocks {events_filter.fromBlock} - {events_filter.toBlock} "
                            f"as the retries have been exceeded."
                        )
                        break
                    sleep = SLEEP * retries
                    # only warn about unexpected errors; expired filters and ABI mismatches are retried silently
                    is_expired_filter = (
                        isinstance(exc, ValueError)
                        and exc.args
                        and isinstance(exc.args[0], dict)
                        and re.match(RE_RPC_FILTER_ERROR, exc.args[0].get("message", ""))
                        is not None
                    )
                    if not is_expired_filter and not isinstance(exc, MismatchedABI):
                        tqdm.write(
                            f"An error was raised from the RPC: {exc}\n Retrying in {sleep} seconds."
                        )
                    time.sleep(sleep)

            from_block += batch_size
            pbar.update(batch_size)

            if entries is None:
                continue

            chunk = list(entries)
            events.extend(chunk)
            time.sleep(RPC_POLL_INTERVAL)

    return events
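# Note on the adaptive windowing above: when the RPC rejects a window as too large or times out,
# reduce_window() keeps only (1 - REDUCE_FACTOR) of the current batch size (75% with the defaults),
# so the chunk shrinks from BLOCKS_CHUNK_SIZE until the requests go through.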
def parse_events(raw_events: List) -> List[MechEvent]:
    """Parse all the specified MechEvents."""
    parsed_events = []
    for event in raw_events:
        for_block = event.get("blockNumber", 0)
        args = event.get(EVENT_ARGUMENTS, {})
        request_id = args.get(REQUEST_ID, 0)
        data = args.get(DATA, b"")
        sender = args.get(REQUEST_SENDER, "")
        parsed_event = MechEvent(for_block, request_id, data, sender)
        parsed_events.append(parsed_event)

    return parsed_events


def create_session() -> requests.Session:
    """Create a session with a retry strategy."""
    session = requests.Session()
    retry_strategy = Retry(
        total=N_IPFS_RETRIES + 1,
        backoff_factor=BACKOFF_FACTOR,
        status_forcelist=STATUS_FORCELIST,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    for protocol in (HTTP, HTTPS):
        session.mount(protocol, adapter)

    return session


def request(
    session: requests.Session, url: str, timeout: int = HTTP_TIMEOUT
) -> Optional[requests.Response]:
    """Perform a request with a session."""
    try:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        tqdm.write(f"HTTP error occurred: {exc}.")
    except Exception as exc:
        tqdm.write(f"Unexpected error occurred: {exc}.")
    else:
        return response
    return None


def limit_text(text: str, limit: int = 200) -> str:
    """Limit the given text."""
    if len(text) > limit:
        return f"{text[:limit]}..."
    return text


def parse_ipfs_response(
    session: requests.Session,
    url: str,
    event: MechEvent,
    event_name: MechEventName,
    response: requests.Response,
) -> Optional[Dict[str, str]]:
    """Parse a response from IPFS."""
    try:
        return response.json()
    except requests.exceptions.JSONDecodeError:
        # this is a workaround because the `metadata.json` file was introduced and removed multiple times
        if event_name == MechEventName.REQUEST and url != event.ipfs_request_link:
            url = event.ipfs_request_link
            response = request(session, url)
            if response is None:
                tqdm.write(f"Skipping {event=}.")
                return None

            try:
                return response.json()
            except requests.exceptions.JSONDecodeError:
                pass

    tqdm.write(f"Failed to parse response into json for {url=}.")
    return None


def parse_ipfs_tools_content(
    raw_content: Dict[str, str], event: MechEvent, event_name: MechEventName
) -> Optional[Union[MechRequest, MechResponse]]:
    """Parse tools content from IPFS."""
    struct = EVENT_TO_MECH_STRUCT.get(event_name)
    raw_content[REQUEST_ID] = str(event.requestId)
    raw_content[BLOCK_FIELD] = str(event.for_block)
    raw_content["sender"] = str(event.sender)

    try:
        mech_response = struct(**raw_content)
    except (ValueError, TypeError, KeyError):
        tqdm.write(f"Could not parse {limit_text(str(raw_content))}")
        return None

    if event_name == MechEventName.REQUEST and mech_response.tool in IRRELEVANT_TOOLS:
        return None

    return mech_response


def get_contents(
    session: requests.Session, events: List[MechEvent], event_name: MechEventName
) -> pd.DataFrame:
    """Fetch the tools' responses."""
    contents = []
    for event in tqdm(events, desc="Tools' results", unit="results"):
        url = event.ipfs_link(event_name)
        response = request(session, url)
        if response is None:
            tqdm.write(f"Skipping {event=}.")
            continue

        raw_content = parse_ipfs_response(session, url, event, event_name, response)
        if raw_content is None:
            continue

        mech_response = parse_ipfs_tools_content(raw_content, event, event_name)
        if mech_response is None:
            continue
        contents.append(mech_response)
        time.sleep(IPFS_POLL_INTERVAL)

    return pd.DataFrame(contents)


def check_for_dicts(df: pd.DataFrame) -> List[str]:
    """Check for columns that contain dictionaries."""
    dict_columns = []
    for column in df.columns:
        if df[column].apply(lambda x: isinstance(x, dict)).any():
            dict_columns.append(column)
    return dict_columns


def drop_dict_rows(df: pd.DataFrame, dict_columns: List[str]) -> pd.DataFrame:
    """Drop rows that contain dictionaries."""
    for column in dict_columns:
        df = df[~df[column].apply(lambda x: isinstance(x, dict))]
    return df


def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the dataframe."""
    dict_columns = check_for_dicts(df)
    df = drop_dict_rows(df, dict_columns)
    cleaned = df.drop_duplicates()
    cleaned[REQUEST_ID_FIELD] = cleaned[REQUEST_ID_FIELD].astype("str")
    return cleaned
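# clean() stringifies `request_id`, presumably so that the requests and delivers frames can later
# be merged on REQUEST_ID_FIELD with a consistent dtype (see etl() below).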
def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
    """Transform the requests dataframe."""
    return clean(contents)


def transform_deliver(contents: pd.DataFrame, full_contents=False) -> pd.DataFrame:
    """Transform the delivers dataframe."""
    unpacked_result = pd.json_normalize(contents.result)

    # drop result column if it exists
    if "result" in unpacked_result.columns:
        unpacked_result.drop(columns=["result"], inplace=True)

    # drop prompt column if it exists
    if "prompt" in unpacked_result.columns:
        unpacked_result.drop(columns=["prompt"], inplace=True)

    # rename prompt column to prompt_deliver (a no-op when "prompt" was already dropped above)
    unpacked_result.rename(columns={"prompt": "prompt_deliver"}, inplace=True)
    contents = pd.concat((contents, unpacked_result), axis=1)

    if "result" in contents.columns:
        contents.drop(columns=["result"], inplace=True)

    if "prompt" in contents.columns:
        contents.drop(columns=["prompt"], inplace=True)
    return clean(contents)


def gen_event_filename(event_name: MechEventName) -> str:
    """Generate the filename of an event."""
    return f"{event_name.value.lower()}s.parquet"


def read_n_last_lines(filename: str, n: int = 1) -> str:
    """Return the `n` last lines' content of a file."""
    num_newlines = 0
    with open(filename, "rb") as f:
        try:
            f.seek(-2, os.SEEK_END)
            while num_newlines < n:
                f.seek(-2, os.SEEK_CUR)
                if f.read(1) == b"\n":
                    num_newlines += 1
        except OSError:
            f.seek(0)
        last_line = f.readline().decode()
    return last_line


def get_earliest_block(event_name: MechEventName) -> int:
    """Get the earliest block number to use when filtering for events."""
    filename = gen_event_filename(event_name)
    if not os.path.exists(DATA_DIR / filename):
        return 0

    df = pd.read_parquet(DATA_DIR / filename)
    block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}"
    return int(df[block_field].max())


def store_progress(
    filename: str,
    event_to_contents: Dict[MechEventName, pd.DataFrame],
    tools: pd.DataFrame,
) -> None:
    """Store the given progress."""
    print("Storing progress...")
    if filename:
        DATA_DIR.mkdir(parents=True, exist_ok=True)  # ensure the output directory exists
        for event_name, content in event_to_contents.items():
            event_filename = gen_event_filename(event_name)
            try:
                if "result" in content.columns:
                    # drop on a copy to avoid modifying the caller's DataFrame in place
                    content = content.drop(columns=["result"])
                content.to_parquet(DATA_DIR / event_filename, index=False)
            except Exception as e:
                print(f"Failed to write {event_name}: {e}")

        # drop the result column for the tools DataFrame as well
        try:
            if "result" in tools.columns:
                tools = tools.drop(columns=["result"])
            tools.to_parquet(DATA_DIR / filename, index=False)
        except Exception as e:
            print(f"Failed to write tools data: {e}")
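# Output layout produced by store_progress(), assuming the default constants above (paths relative
# to the repository root): data/requests.parquet, data/delivers.parquet and data/tools.parquet.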
def etl(
    rpcs: List[str], filename: Optional[str] = None, full_contents: bool = True
) -> pd.DataFrame:
    """Fetch the on-chain events, process and store them, and return the tools' results on all the questions as a DataFrame."""
    w3s = [Web3(HTTPProvider(r)) for r in rpcs]
    session = create_session()
    event_to_transformer = {
        MechEventName.REQUEST: transform_request,
        MechEventName.DELIVER: transform_deliver,
    }
    mech_to_info = {
        to_checksum_address(address): (
            os.path.join(CONTRACTS_PATH, filename),
            earliest_block,
        )
        for address, (filename, earliest_block) in MECH_TO_INFO.items()
    }
    event_to_contents = {}

    latest_block = LATEST_BLOCK
    if latest_block is None:
        latest_block = w3s[0].eth.get_block(LATEST_BLOCK_NAME)[BLOCK_DATA_NUMBER]

    next_start_block = None

    # Loop through events in event_to_transformer
    for event_name, transformer in event_to_transformer.items():
        if next_start_block is None:
            next_start_block_base = get_earliest_block(event_name)

        # Loop through mech addresses in mech_to_info
        events = []
        for address, (abi, earliest_block) in mech_to_info.items():
            if next_start_block_base == 0:
                next_start_block = earliest_block
            else:
                next_start_block = next_start_block_base

            print(
                f"Searching for {event_name.value} events for mech {address} "
                f"from block {next_start_block} to {latest_block}."
            )

            # parallelize the fetching of events
            with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
                futures = []
                for i in range(
                    next_start_block, latest_block, BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE
                ):
                    futures.append(
                        executor.submit(
                            get_events,
                            random.choice(w3s),
                            event_name.value,
                            address,
                            abi,
                            i,
                            min(i + BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE, latest_block),
                        )
                    )

                for future in tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc=f"Fetching {event_name.value} Events",
                ):
                    current_mech_events = future.result()
                    events.extend(current_mech_events)

        parsed = parse_events(events)

        contents = []
        with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
            futures = []
            for i in range(0, len(parsed), GET_CONTENTS_BATCH_SIZE):
                futures.append(
                    executor.submit(
                        get_contents,
                        session,
                        parsed[i : i + GET_CONTENTS_BATCH_SIZE],
                        event_name,
                    )
                )

            for future in tqdm(
                as_completed(futures),
                total=len(futures),
                desc=f"Fetching {event_name.value} Contents",
            ):
                current_mech_contents = future.result()
                contents.append(current_mech_contents)

        contents = pd.concat(contents, ignore_index=True)

        full_contents = True
        if event_name == MechEventName.REQUEST:
            transformed = transformer(contents)
        elif event_name == MechEventName.DELIVER:
            transformed = transformer(contents, full_contents=full_contents)

        events_filename = gen_event_filename(event_name)
        if os.path.exists(DATA_DIR / events_filename):
            old = pd.read_parquet(DATA_DIR / events_filename)

            # Reset index to avoid index conflicts
            old.reset_index(drop=True, inplace=True)
            transformed.reset_index(drop=True, inplace=True)

            # Concatenate DataFrames
            transformed = pd.concat([old, transformed], ignore_index=True)

            # Drop duplicates if necessary
            transformed.drop_duplicates(subset=REQUEST_ID_FIELD, inplace=True)

        event_to_contents[event_name] = transformed.copy()

    # Store progress
    tools = pd.merge(*event_to_contents.values(), on=REQUEST_ID_FIELD)
    store_progress(filename, event_to_contents, tools)

    return tools


if __name__ == "__main__":
    RPCs = [
        "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a",
    ]

    tools = etl(rpcs=RPCs, filename=DEFAULT_FILENAME, full_contents=True)
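# Hypothetical programmatic usage (the module name "tools" is an assumption, adjust to the actual
# file name; the RPC URL is a placeholder for a Gnosis chain endpoint):
#
#   from tools import etl
#   tools_df = etl(rpcs=["https://<your-gnosis-rpc>"], filename="tools.parquet")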