import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import pandas as pd
import requests
from gnosis_timestamps import transform_timestamp_to_datetime
from requests.adapters import HTTPAdapter
from tqdm import tqdm
from urllib3 import Retry

from markets import add_market_creator
from web3_utils import N_IPFS_RETRIES
from utils import (
    BLOCK_FIELD,
    DATA_DIR,
    EVENT_TO_MECH_STRUCT,
    HTTP,
    HTTPS,
    JSON_DATA_DIR,
    REQUEST_ID,
    MechEvent,
    MechEventName,
    MechRequest,
    MechResponse,
    clean,
    get_prediction_values,
    get_result_values,
    get_vote,
    get_win_probability,
    limit_text,
)


CONTRACTS_PATH = "contracts"
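# Mech contract address -> (ABI filename, starting block). The block numbers are
# assumed to be each contract's deployment block, i.e. the earliest block worth
# scanning for its events.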
MECH_TO_INFO = {
    "0xff82123dfb52ab75c417195c5fdb87630145ae81": ("old_mech_abi.json", 28911547),
    "0x77af31de935740567cf4ff1986d04b2c964a786a": ("new_mech_abi.json", 30776879),
}

EVENT_ARGUMENTS = "args"
DATA = "data"
IPFS_LINKS_SERIES_NAME = "ipfs_links"
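# Retry configuration for IPFS fetches. Note that 404 is retried as well,
# presumably because IPFS gateways can return transient 404s before content
# has propagated.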
BACKOFF_FACTOR = 1
STATUS_FORCELIST = [404, 500, 502, 503, 504]
DEFAULT_FILENAME = "tools.parquet"
ABI_ERROR = "The event signature did not match the provided ABI"
HTTP_TIMEOUT = 15
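
# Requests made with any of these tools are dropped when parsing IPFS contents
# (see parse_ipfs_tools_content), as they are not relevant to the dataset.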
IRRELEVANT_TOOLS = [
    "openai-text-davinci-002",
    "openai-text-davinci-003",
    "openai-gpt-3.5-turbo",
    "openai-gpt-4",
    "stabilityai-stable-diffusion-v1-5",
    "stabilityai-stable-diffusion-xl-beta-v2-2-2",
    "stabilityai-stable-diffusion-512-v2-1",
    "stabilityai-stable-diffusion-768-v2-1",
    "deepmind-optimization-strong",
    "deepmind-optimization",
]
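
# Parallel-parsing settings: number of worker threads and how many events each
# submitted task parses per batch.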
SNAPSHOT_RATE = 10
NUM_WORKERS = 10
GET_CONTENTS_BATCH_SIZE = 1000


class TimestampedRetry(Retry):
    """Retry strategy that logs a timestamp for every retry attempt."""

    def increment(self, *args, **kwargs):
        print(f"Retry attempt at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        return super().increment(*args, **kwargs)


def create_session() -> requests.Session:
    """Create a session with a retry strategy."""
    session = requests.Session()
    retry_strategy = TimestampedRetry(
        total=N_IPFS_RETRIES,
        backoff_factor=BACKOFF_FACTOR,
        status_forcelist=STATUS_FORCELIST,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    for protocol in (HTTP, HTTPS):
        session.mount(protocol, adapter)

    return session


def request(
    session: requests.Session, url: str, timeout: int = HTTP_TIMEOUT
) -> Optional[requests.Response]:
    """Perform a GET request with the given session, returning None on any failure."""
    try:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        tqdm.write(f"HTTP error occurred: {exc}.")
    except Exception as exc:
        tqdm.write(f"Unexpected error occurred: {exc}.")
    else:
        return response
    return None


def parse_ipfs_response(
    session: requests.Session,
    url: str,
    event: MechEvent,
    event_name: MechEventName,
    response: requests.Response,
) -> Optional[Dict[str, str]]:
    """Parse a response from IPFS."""
    try:
        return response.json()
    except requests.exceptions.JSONDecodeError:
        # For request events, retry once using the event's own IPFS request
        # link, in case the originally provided URL did not point to valid JSON.
        if event_name == MechEventName.REQUEST and url != event.ipfs_request_link:
            url = event.ipfs_request_link
            response = request(session, url)
            if response is None:
                tqdm.write(f"Skipping {event=}.")
                return None

            try:
                return response.json()
            except requests.exceptions.JSONDecodeError:
                pass

    tqdm.write(f"Failed to parse response into json for {url=}.")
    return None


def parse_ipfs_tools_content(
    raw_content: Dict[str, str], event: MechEvent, event_name: MechEventName
) -> Optional[Union[MechRequest, MechResponse]]:
    """Parse tools content from IPFS."""
    struct = EVENT_TO_MECH_STRUCT.get(event_name)
    raw_content[REQUEST_ID] = str(event.requestId)
    raw_content[BLOCK_FIELD] = str(event.for_block)
    raw_content["sender"] = str(event.sender)

    try:
        mech_response = struct(**raw_content)
    except (ValueError, TypeError, KeyError):
        tqdm.write(f"Could not parse {limit_text(str(raw_content))}")
        return None

    # Requests made with irrelevant tools are excluded from the dataset.
    if event_name == MechEventName.REQUEST and mech_response.tool in IRRELEVANT_TOOLS:
        return None

    return mech_response


def parse_json_events(
    json_events: Dict[str, Any], keys_to_traverse: List[str]
) -> pd.DataFrame:
    """Parse the mech info in JSON format into a dataframe."""
    all_records = []
    for key in keys_to_traverse:
        # Initialise the record before the try block, so that a failure cannot
        # leave the name unbound or reuse a stale record from a previous iteration.
        output = {}
        try:
            json_input = json_events[key]
            output["request_id"] = json_input["requestId"]
            output["request_block"] = json_input["blockNumber"]
            output["request_time"] = transform_timestamp_to_datetime(
                int(json_input["blockTimestamp"])
            )
            output["tx_hash"] = json_input["transactionHash"]
            output["prompt_request"] = json_input["ipfsContents"]["prompt"]
            output["tool"] = json_input["ipfsContents"]["tool"]
            output["nonce"] = json_input["ipfsContents"]["nonce"]
            output["trader_address"] = json_input["sender"]
            output["deliver_block"] = json_input["deliver"]["blockNumber"]
            error_value, error_message, prediction_params = get_result_values(
                json_input["deliver"]["ipfsContents"]["result"]
            )
            error_message_value = json_input.get("error_message", error_message)
            output["error"] = error_value
            output["error_message"] = error_message_value
            output["prompt_response"] = json_input["deliver"]["ipfsContents"]["prompt"]
            output["mech_address"] = json_input["deliver"]["sender"]
            p_yes_value, p_no_value, confidence_value, info_utility_value = (
                get_prediction_values(prediction_params)
            )
            output["p_yes"] = p_yes_value
            output["p_no"] = p_no_value
            output["confidence"] = confidence_value
            output["info_utility"] = info_utility_value
            output["vote"] = get_vote(p_yes_value, p_no_value)
            output["win_probability"] = get_win_probability(p_yes_value, p_no_value)
            all_records.append(output)
        except Exception as e:
            print(f"Error parsing key={key}: {e}. Recording it as an error entry.")
            output["error"] = 1
            output["error_message"] = "Response parsing error"
            output["p_yes"] = None
            output["p_no"] = None
            output["confidence"] = None
            output["info_utility"] = None
            output["vote"] = None
            output["win_probability"] = None
            all_records.append(output)

    return pd.DataFrame(all_records)


def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
    """Transform the requests dataframe."""
    return clean(contents)


def transform_deliver(contents: pd.DataFrame) -> pd.DataFrame:
    """Transform the delivers dataframe."""
    unpacked_result = pd.json_normalize(contents.result)

    # Drop the nested "result" and "prompt" columns surfaced by json_normalize,
    # so they do not clash with the top-level columns of the same name.
    if "result" in unpacked_result.columns:
        unpacked_result.drop(columns=["result"], inplace=True)

    if "prompt" in unpacked_result.columns:
        unpacked_result.drop(columns=["prompt"], inplace=True)

    contents = pd.concat((contents, unpacked_result), axis=1)

    if "result" in contents.columns:
        contents.drop(columns=["result"], inplace=True)

    if "prompt" in contents.columns:
        contents.drop(columns=["prompt"], inplace=True)

    return clean(contents)


def parse_store_json_events_parallel(
    json_events: Dict[str, Any], output_filename: str
) -> pd.DataFrame:
    """Parse the JSON mech events in parallel batches and store the result as a parquet file."""
    total_nr_events = len(json_events)
    ids_to_traverse = list(json_events.keys())
    print(f"Parsing {total_nr_events} events")
    contents = []
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = []
        for i in range(0, total_nr_events, GET_CONTENTS_BATCH_SIZE):
            futures.append(
                executor.submit(
                    parse_json_events,
                    json_events,
                    ids_to_traverse[i : i + GET_CONTENTS_BATCH_SIZE],
                )
            )

        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc="Parsing JSON events",
        ):
            current_mech_contents = future.result()
            contents.append(current_mech_contents)

    tools = pd.concat(contents, ignore_index=True)
    print(f"Adding market creators info. Length of the tools file = {len(tools)}")
    tools = add_market_creator(tools)
    print(
        f"Length of the tools dataframe after adding market creators info = {len(tools)}"
    )
    # DataFrame.info() prints directly; wrapping it in print() would print None.
    tools.info()
    try:
        if "result" in tools.columns:
            tools = tools.drop(columns=["result"])
        tools.to_parquet(DATA_DIR / output_filename, index=False)
    except Exception as e:
        print(f"Failed to write tools data: {e}")

    return tools


def generate_tools_file(input_filename: str, output_filename: str):
    """Parse the JSON mech events and generate the parquet tools file."""
    try:
        with open(JSON_DATA_DIR / input_filename, "r") as file:
            file_contents = json.load(file)
        parse_store_json_events_parallel(file_contents, output_filename)
    except Exception as e:
        print(f"An exception occurred while parsing the JSON events: {e}")


if __name__ == "__main__":
    # NOTE: the input JSON filename below is a placeholder assumption; the
    # original call passed no arguments, which would raise a TypeError. Adjust
    # it to match the actual events file in JSON_DATA_DIR.
    generate_tools_file("mech_events.json", DEFAULT_FILENAME)