import json
import os
import time
from typing import List, Any, Optional, Union, Tuple
import numpy as np
import pandas as pd
import gc
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from enum import Enum
from string import Template
from json.decoder import JSONDecodeError

DEFAULT_MECH_FEE = 0.01
REDUCE_FACTOR = 0.25
SLEEP = 0.5
REQUEST_ID_FIELD = "request_id"
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"
JSON_DATA_DIR = ROOT_DIR / "json_data"
HIST_DIR = ROOT_DIR / "historical_data"
TMP_DIR = ROOT_DIR / "tmp"
BLOCK_FIELD = "block"
CID_PREFIX = "f01701220"
REQUEST_ID = "requestId"
REQUEST_SENDER = "sender"
PROMPT_FIELD = "prompt"
HTTP = "http://"
HTTPS = HTTP[:4] + "s" + HTTP[4:]
FORMAT_UPDATE_BLOCK_NUMBER = 30411638
INVALID_ANSWER_HEX = (
    "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
)
OLD_IPFS_ADDRESS = "https://gateway.autonolas.tech/ipfs/"
IPFS_ADDRESS = "https://gateway.gcp.autonolas.tech/ipfs/"
INC_TOOLS = [
    "prediction-online",
    "prediction-offline",
    "claude-prediction-online",
    "claude-prediction-offline",
    "prediction-offline-sme",
    "prediction-online-sme",
    "prediction-request-rag",
    "prediction-request-reasoning",
    "prediction-url-cot-claude",
    "prediction-request-rag-claude",
    "prediction-request-reasoning-claude",
    "superforcaster",
]
SUBGRAPH_URL = Template(
    """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/7s9rGBffUTL8kDZuxvvpuc46v44iuDarbrADBFw5uVp2"""
)
OMEN_SUBGRAPH_URL = Template(
    """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"""
)
NETWORK_SUBGRAPH_URL = Template(
    """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/FxV6YUix58SpYmLBwc9gEHkwjfkqwe1X5FJQjn8nKPyA"""
)
# THEGRAPH_ENDPOINT = (
#     "https://api.studio.thegraph.com/query/78829/mech-predict/version/latest"
# )
MECH_SUBGRAPH_URL = Template(
    """https://gateway.thegraph.com/api/${subgraph_api_key}/subgraphs/id/4YGoX3iXUni1NBhWJS5xyKcntrAzssfytJK7PQxxQk5g"""
)
SUBGRAPH_API_KEY = os.environ.get("SUBGRAPH_API_KEY", None)
RPC = os.environ.get("RPC", None)


class MechEventName(Enum):
    """The mech's event names."""

    REQUEST = "Request"
    DELIVER = "Deliver"


@dataclass
class MechEvent:
    """A mech's on-chain event representation."""

    for_block: int
    requestId: int
    data: bytes
    sender: str

    def _ipfs_link(self) -> Optional[str]:
        """Get the ipfs link for the data."""
        return f"{IPFS_ADDRESS}{CID_PREFIX}{self.data.hex()}"

    @property
    def ipfs_request_link(self) -> Optional[str]:
        """Get the IPFS link for the request."""
        return f"{self._ipfs_link()}/metadata.json"

    @property
    def ipfs_deliver_link(self) -> Optional[str]:
        """Get the IPFS link for the deliver."""
        if self.requestId is None:
            return None
        return f"{self._ipfs_link()}/{self.requestId}"

    def ipfs_link(self, event_name: MechEventName) -> Optional[str]:
        """Get the ipfs link based on the event."""
        if event_name == MechEventName.REQUEST:
            if self.for_block < FORMAT_UPDATE_BLOCK_NUMBER:
                return self._ipfs_link()
            return self.ipfs_request_link
        if event_name == MechEventName.DELIVER:
            return self.ipfs_deliver_link
        return None


@dataclass(init=False)
class MechRequest:
    """A structure for a request to a mech."""

    request_id: Optional[int]
    request_block: Optional[int]
    prompt_request: Optional[str]
    tool: Optional[str]
    nonce: Optional[str]
    trader_address: Optional[str]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the request ignoring extra keys."""
        self.request_id = int(kwargs.pop(REQUEST_ID, 0))
        self.request_block = int(kwargs.pop(BLOCK_FIELD, 0))
        self.prompt_request = kwargs.pop(PROMPT_FIELD, None)
        self.tool = kwargs.pop("tool", None)
        self.nonce = kwargs.pop("nonce", None)
        self.trader_address = kwargs.pop("sender", None)


@dataclass(init=False)
class PredictionResponse:
    """A response of a prediction."""

    p_yes: float
    p_no: float
    confidence: float
    info_utility: float
    vote: Optional[str]
    win_probability: Optional[float]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the mech's prediction ignoring extra keys."""
        try:
            self.p_yes = float(kwargs.pop("p_yes"))
            self.p_no = float(kwargs.pop("p_no"))
            self.confidence = float(kwargs.pop("confidence"))
            self.info_utility = float(kwargs.pop("info_utility"))
            self.win_probability = 0

            # Validate probabilities
            probabilities = {
                "p_yes": self.p_yes,
                "p_no": self.p_no,
                "confidence": self.confidence,
                "info_utility": self.info_utility,
            }
            for name, prob in probabilities.items():
                if not 0 <= prob <= 1:
                    raise ValueError(f"{name} probability is out of bounds: {prob}")

            # Compare the sum with a small tolerance so that floating point
            # rounding (e.g. 0.7 + 0.3) does not reject valid responses.
            if abs(self.p_yes + self.p_no - 1.0) > 1e-6:
                raise ValueError(
                    f"Sum of p_yes and p_no is not 1: {self.p_yes} + {self.p_no}"
                )

            self.vote = self.get_vote()
            self.win_probability = self.get_win_probability()
        except KeyError as e:
            raise KeyError(f"Missing key in PredictionResponse: {e}")
        except ValueError as e:
            raise ValueError(f"Invalid value in PredictionResponse: {e}")

    def get_vote(self) -> Optional[str]:
        """Return the vote."""
        if self.p_no == self.p_yes:
            return None
        if self.p_no > self.p_yes:
            return "No"
        return "Yes"

    def get_win_probability(self) -> Optional[float]:
        """Return the probability estimation for winning with vote."""
        return max(self.p_no, self.p_yes)


@dataclass(init=False)
class MechResponse:
    """A structure for the response of a mech."""

    request_id: int
    deliver_block: Optional[int]
    result: Optional[PredictionResponse]
    error: Optional[str]
    error_message: Optional[str]
    prompt_response: Optional[str]
    mech_address: Optional[str]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the mech's response ignoring extra keys."""
        self.error = kwargs.get("error", None)
        self.request_id = int(kwargs.get(REQUEST_ID, 0))
        self.deliver_block = int(kwargs.get(BLOCK_FIELD, 0))
        self.result = kwargs.get("result", None)
        self.prompt_response = kwargs.get(PROMPT_FIELD, None)
        self.mech_address = kwargs.get("sender", None)

        if self.result != "Invalid response":
            self.error_message = kwargs.get("error_message", None)
            try:
                if isinstance(self.result, str):
                    kwargs = json.loads(self.result)
                    self.result = PredictionResponse(**kwargs)
                    self.error = 0
            except JSONDecodeError:
                self.error_message = "Response parsing error"
                self.error = 1
            except Exception as e:
                self.error_message = str(e)
                self.error = 1
        else:
            self.error_message = "Invalid response from tool"
            self.error = 1
            self.result = None


EVENT_TO_MECH_STRUCT = {
    MechEventName.REQUEST: MechRequest,
    MechEventName.DELIVER: MechResponse,
}


def transform_to_datetime(x):
    """Convert a unix timestamp to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(int(x), tz=timezone.utc)


def measure_execution_time(func):
    """Decorator that prints the execution time of the wrapped function."""

    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Execution time: {execution_time:.6f} seconds")
        return result

    return wrapper


def limit_text(text: str, limit: int = 200) -> str:
    """Limit the given text to `limit` characters, appending an ellipsis."""
    if len(text) > limit:
        return f"{text[:limit]}..."
    return text


def check_for_dicts(df: pd.DataFrame) -> List[str]:
    """Check for columns that contain dictionaries."""
    dict_columns = []
    for column in df.columns:
        if df[column].apply(lambda x: isinstance(x, dict)).any():
            dict_columns.append(column)
    return dict_columns


def drop_dict_rows(df: pd.DataFrame, dict_columns: List[str]) -> pd.DataFrame:
    """Drop rows that contain dictionaries."""
    for column in dict_columns:
        df = df[~df[column].apply(lambda x: isinstance(x, dict))]
    return df


def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the dataframe."""
    dict_columns = check_for_dicts(df)
    df = drop_dict_rows(df, dict_columns)
    cleaned = df.drop_duplicates()
    cleaned[REQUEST_ID_FIELD] = cleaned[REQUEST_ID_FIELD].astype("str")
    return cleaned


def gen_event_filename(event_name: MechEventName) -> str:
    """Generate the filename of an event."""
    return f"{event_name.value.lower()}s.parquet"


def read_n_last_lines(filename: str, n: int = 1) -> str:
    """Return the `n` last lines' content of a file."""
    num_newlines = 0
    with open(filename, "rb") as f:
        try:
            f.seek(-2, os.SEEK_END)
            while num_newlines < n:
                f.seek(-2, os.SEEK_CUR)
                if f.read(1) == b"\n":
                    num_newlines += 1
        except OSError:
            f.seek(0)
        last_line = f.readline().decode()
    return last_line


def get_question(text: str) -> Optional[str]:
    """Get the question from a text."""
    # Regex to find text within double quotes
    pattern = r'"([^"]*)"'

    # Find all occurrences
    questions = re.findall(pattern, text)

    # Return the first question if there are multiple
    question = questions[0] if questions else None
    return question


def current_answer(text: str, fpmms: pd.DataFrame) -> Optional[str]:
    """Get the current answer for a question."""
    row = fpmms[fpmms["title"] == text]
    if row.shape[0] == 0:
        return None
    return row["currentAnswer"].values[0]


def convert_hex_to_int(x: Union[str, float]) -> Union[int, float]:
    """Convert a hex string to an int; NaN stays NaN and the invalid-answer hex maps to -1."""
    if isinstance(x, float):
        return np.nan
    if isinstance(x, str):
        if x == INVALID_ANSWER_HEX:
            return -1
        return int(x, 16)


def wei_to_unit(wei: int) -> float:
    """Convert wei to currency unit."""
    return wei / 10**18


def get_vote(p_yes, p_no) -> Optional[str]:
    """Return the vote."""
    if p_no == p_yes:
        return None
    if p_no > p_yes:
        return "No"
    return "Yes"


def get_win_probability(p_yes, p_no) -> Optional[float]:
    """Return the probability estimation for winning with vote."""
    return max(p_no, p_yes)


def get_result_values(result: str) -> Tuple:
    """Parse a tool result string into (error_value, error_message, params)."""
    if result == "Invalid response":
        return 1, "Invalid response from tool", None
    error_message = None
    params = None
    try:
        if isinstance(result, str):
            params = json.loads(result)
        error_value = 0
    except JSONDecodeError:
        error_message = "Response parsing error"
        error_value = 1
    except Exception as e:
        error_message = str(e)
        error_value = 1
    return error_value, error_message, params


def get_prediction_values(params: dict) -> Tuple:
    """Extract p_yes, p_no, confidence and info_utility from parsed params."""
    p_yes = float(params.pop("p_yes"))
    p_no = float(params.pop("p_no"))
    confidence = float(params.pop("confidence"))
    info_utility = float(params.pop("info_utility"))
    return p_yes, p_no, confidence, info_utility


def to_content(q: str) -> dict[str, Any]:
    """Wrap the given GraphQL query string into a request payload under the `query` key."""
    finalized_query = {
        "query": q,
        "variables": None,
        "extensions": {"headers": None},
    }
    return finalized_query


def read_parquet_files(
    tools_filename: str, trades_filename: str
) -> Optional[Tuple[pd.DataFrame, pd.DataFrame]]:
    """Read the tools and trades parquet files from the data directory."""
    # Check that the tools parquet file exists in the data directory
    try:
        tools = pd.read_parquet(DATA_DIR / tools_filename)

        # make sure trader_address is in the columns
        assert "trader_address" in tools.columns, "trader_address column not found"
"trader_address" in tools.columns, "trader_address column not found" # lowercase and strip creator_address tools["trader_address"] = tools["trader_address"].str.lower().str.strip() # drop duplicates tools.drop_duplicates(inplace=True) print(f"{tools_filename} loaded") except FileNotFoundError: print("tools.parquet not found. Please run tools.py first.") return try: fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename) fpmmTrades["trader_address"] = ( fpmmTrades["trader_address"].str.lower().str.strip() ) except FileNotFoundError: print("fpmmsTrades.parquet not found.") return return tools, fpmmTrades