rosacastillo's picture
weekly data and some fixes
b60f995
raw
history blame
11.3 kB
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
import json
from typing import (
Optional,
List,
Dict,
Union,
Any,
)
import pandas as pd
import requests
from datetime import datetime
from gnosis_timestamps import transform_timestamp_to_datetime
from requests.adapters import HTTPAdapter
from tqdm import tqdm
from urllib3 import Retry
from markets import add_market_creator
from concurrent.futures import ThreadPoolExecutor, as_completed
from web3_utils import (
N_IPFS_RETRIES,
)
from utils import (
clean,
BLOCK_FIELD,
limit_text,
DATA_DIR,
JSON_DATA_DIR,
MechEvent,
MechEventName,
MechRequest,
MechResponse,
EVENT_TO_MECH_STRUCT,
REQUEST_ID,
HTTP,
HTTPS,
get_result_values,
get_vote,
get_win_probability,
get_prediction_values,
)
CONTRACTS_PATH = "contracts"
MECH_TO_INFO = {
# this block number is when the creator had its first tx ever, and after this mech's creation
"0xff82123dfb52ab75c417195c5fdb87630145ae81": ("old_mech_abi.json", 28911547),
# this block number is when this mech was created
"0x77af31de935740567cf4ff1986d04b2c964a786a": ("new_mech_abi.json", 30776879),
}
# optionally set the latest block to stop searching for the delivered events
EVENT_ARGUMENTS = "args"
DATA = "data"
IPFS_LINKS_SERIES_NAME = "ipfs_links"
BACKOFF_FACTOR = 1
STATUS_FORCELIST = [404, 500, 502, 503, 504]
DEFAULT_FILENAME = "tools.parquet"
ABI_ERROR = "The event signature did not match the provided ABI"
# HTTP_TIMEOUT = 10
# Increasing when ipfs is slow
HTTP_TIMEOUT = 15
IRRELEVANT_TOOLS = [
"openai-text-davinci-002",
"openai-text-davinci-003",
"openai-gpt-3.5-turbo",
"openai-gpt-4",
"stabilityai-stable-diffusion-v1-5",
"stabilityai-stable-diffusion-xl-beta-v2-2-2",
"stabilityai-stable-diffusion-512-v2-1",
"stabilityai-stable-diffusion-768-v2-1",
"deepmind-optimization-strong",
"deepmind-optimization",
]
# this is how frequently we will keep a snapshot of the progress so far in terms of blocks' batches
# for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search,
# we also store the snapshot
SNAPSHOT_RATE = 10
NUM_WORKERS = 10
GET_CONTENTS_BATCH_SIZE = 1000
class TimestampedRetry(Retry):
def increment(self, *args, **kwargs):
print(f"Retry attempt at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
return super().increment(*args, **kwargs)
def create_session() -> requests.Session:
"""Create a session with a retry strategy."""
session = requests.Session()
retry_strategy = TimestampedRetry(
total=N_IPFS_RETRIES,
backoff_factor=BACKOFF_FACTOR,
status_forcelist=STATUS_FORCELIST,
)
adapter = HTTPAdapter(max_retries=retry_strategy)
for protocol in (HTTP, HTTPS):
session.mount(protocol, adapter)
return session
def request(
session: requests.Session, url: str, timeout: int = HTTP_TIMEOUT
) -> Optional[requests.Response]:
"""Perform a request with a session."""
try:
response = session.get(url, timeout=timeout)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
tqdm.write(f"HTTP error occurred: {exc}.")
except Exception as exc:
tqdm.write(f"Unexpected error occurred: {exc}.")
else:
return response
return None
def parse_ipfs_response(
session: requests.Session,
url: str,
event: MechEvent,
event_name: MechEventName,
response: requests.Response,
) -> Optional[Dict[str, str]]:
"""Parse a response from IPFS."""
try:
return response.json()
except requests.exceptions.JSONDecodeError:
# this is a workaround because the `metadata.json` file was introduced and removed multiple times
if event_name == MechEvent.REQUEST and url != event.ipfs_request_link:
url = event.ipfs_request_link
response = request(session, url)
if response is None:
tqdm.write(f"Skipping {event=}.")
return None
try:
return response.json()
except requests.exceptions.JSONDecodeError:
pass
tqdm.write(f"Failed to parse response into json for {url=}.")
return None
def parse_ipfs_tools_content(
raw_content: Dict[str, str], event: MechEvent, event_name: MechEventName
) -> Optional[Union[MechRequest, MechResponse]]:
"""Parse tools content from IPFS."""
struct = EVENT_TO_MECH_STRUCT.get(event_name)
raw_content[REQUEST_ID] = str(event.requestId)
raw_content[BLOCK_FIELD] = str(event.for_block)
raw_content["sender"] = str(event.sender)
try:
mech_response = struct(**raw_content)
except (ValueError, TypeError, KeyError):
tqdm.write(f"Could not parse {limit_text(str(raw_content))}")
return None
if event_name == MechEventName.REQUEST and mech_response.tool in IRRELEVANT_TOOLS:
return None
return mech_response
def parse_json_events(json_events: dict, keys_to_traverse: List[int]) -> pd.DataFrame:
"""Function to parse the mech info in a json format"""
all_records = []
for key in keys_to_traverse:
try:
json_input = json_events[key]
output = {}
output["request_id"] = json_input["requestId"]
output["request_block"] = json_input["blockNumber"]
output["request_time"] = transform_timestamp_to_datetime(
int(json_input["blockTimestamp"])
)
output["tx_hash"] = json_input["transactionHash"]
output["prompt_request"] = json_input["ipfsContents"]["prompt"]
output["tool"] = json_input["ipfsContents"]["tool"]
output["nonce"] = json_input["ipfsContents"]["nonce"]
output["trader_address"] = json_input["sender"]
output["deliver_block"] = json_input["deliver"]["blockNumber"]
error_value, error_message, prediction_params = get_result_values(
json_input["deliver"]["ipfsContents"]["result"]
)
error_message_value = json_input.get("error_message", error_message)
output["error"] = error_value
output["error_message"] = error_message_value
output["prompt_response"] = json_input["deliver"]["ipfsContents"]["prompt"]
output["mech_address"] = json_input["deliver"]["sender"]
p_yes_value, p_no_value, confidence_value, info_utility_value = (
get_prediction_values(prediction_params)
)
output["p_yes"] = p_yes_value
output["p_no"] = p_no_value
output["confidence"] = confidence_value
output["info_utility"] = info_utility_value
output["vote"] = get_vote(p_yes_value, p_no_value)
output["win_probability"] = get_win_probability(p_yes_value, p_no_value)
all_records.append(output)
except Exception as e:
print(e)
print(f"Error parsing the key ={key}. Noted as error")
output["error"] = 1
output["error_message"] = "Response parsing error"
output["p_yes"] = None
output["p_no"] = None
output["confidence"] = None
output["info_utility"] = None
output["vote"] = None
output["win_probability"] = None
all_records.append(output)
return pd.DataFrame.from_dict(all_records, orient="columns")
def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
"""Transform the requests dataframe."""
return clean(contents)
def transform_deliver(contents: pd.DataFrame) -> pd.DataFrame:
"""Transform the delivers dataframe."""
unpacked_result = pd.json_normalize(contents.result)
# # drop result column if it exists
if "result" in unpacked_result.columns:
unpacked_result.drop(columns=["result"], inplace=True)
# drop prompt column if it exists
if "prompt" in unpacked_result.columns:
unpacked_result.drop(columns=["prompt"], inplace=True)
# rename prompt column to prompt_deliver
unpacked_result.rename(columns={"prompt": "prompt_deliver"}, inplace=True)
contents = pd.concat((contents, unpacked_result), axis=1)
if "result" in contents.columns:
contents.drop(columns=["result"], inplace=True)
if "prompt" in contents.columns:
contents.drop(columns=["prompt"], inplace=True)
return clean(contents)
def parse_store_json_events_parallel(json_events: Dict[str, Any], output_filename: str):
total_nr_events = len(json_events)
ids_to_traverse = list(json_events.keys())
print(f"Parsing {total_nr_events} events")
contents = []
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = []
for i in range(0, total_nr_events, GET_CONTENTS_BATCH_SIZE):
futures.append(
executor.submit(
parse_json_events,
json_events,
ids_to_traverse[i : i + GET_CONTENTS_BATCH_SIZE],
)
)
for future in tqdm(
as_completed(futures),
total=len(futures),
desc=f"Fetching json contents",
):
current_mech_contents = future.result()
contents.append(current_mech_contents)
tools = pd.concat(contents, ignore_index=True)
print(f"Adding market creators info. Length of the tools file = {len(tools)}")
tools = add_market_creator(tools)
print(
f"Length of the tools dataframe after adding market creators info= {len(tools)}"
)
print(tools.info())
try:
if "result" in tools.columns:
tools = tools.drop(columns=["result"])
tools.to_parquet(DATA_DIR / output_filename, index=False)
except Exception as e:
print(f"Failed to write tools data: {e}")
return tools
def generate_tools_file(input_filename: str, output_filename: str):
"""Function to parse the json mech events and generate the parquet tools file"""
try:
with open(JSON_DATA_DIR / input_filename, "r") as file:
file_contents = json.load(file)
parse_store_json_events_parallel(file_contents, output_filename)
except Exception as e:
print(f"An Exception happened while parsing the json events {e}")
if __name__ == "__main__":
generate_tools_file()