# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
import functools
import warnings
from datetime import datetime, timedelta
from typing import Callable, Dict, Generator, List, Optional

import pandas as pd
import requests
from tqdm import tqdm

from utils import SUBGRAPH_API_KEY, DATA_DIR, TMP_DIR, transform_to_datetime
from web3_utils import (
    FPMM_QS_CREATOR,
    FPMM_PEARL_CREATOR,
    query_omen_xdai_subgraph,
    OMEN_SUBGRAPH_URL,
)
from queries import (
    FPMMS_QUERY,
    ID_FIELD,
    DATA_FIELD,
    ANSWER_FIELD,
    QUERY_FIELD,
    TITLE_FIELD,
    OUTCOMES_FIELD,
    ERROR_FIELD,
    QUESTION_FIELD,
    FPMMS_FIELD,
)

ResponseItemType = List[Dict[str, str]]
SubgraphResponseType = Dict[str, ResponseItemType]

BATCH_SIZE = 1000
DEFAULT_TO_TIMESTAMP = 2147483647  # around year 2038
DEFAULT_FROM_TIMESTAMP = 0
MAX_UINT_HEX = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
DEFAULT_FILENAME = "fpmms.parquet"

market_creators_map = {"quickstart": FPMM_QS_CREATOR, "pearl": FPMM_PEARL_CREATOR}


class RetriesExceeded(Exception):
    """Exception to raise when retries are exceeded during data-fetching."""

    def __init__(
        self, msg="Maximum retries were exceeded while trying to fetch the data!"
    ):
        super().__init__(msg)


def hacky_retry(func: Callable, n_retries: int = 3) -> Callable:
    """Create a hacky retry strategy.

    Unfortunately, we cannot use `requests.packages.urllib3.util.retry.Retry`,
    because the subgraph does not return the appropriate status codes in case of failure.
    Instead, it always returns code 200. Thus, we raise exceptions manually inside
    `query_subgraph`, catch those exceptions in the hacky retry decorator and try again.
    Finally, if the allowed number of retries is exceeded, we raise a custom
    `RetriesExceeded` exception.

    :param func: the input request function.
    :param n_retries: the maximum allowed number of retries.
    :return: The request method with the hacky retry strategy applied.
    """

    @functools.wraps(func)
    def wrapper_hacky_retry(*args, **kwargs) -> SubgraphResponseType:
        """The wrapper for the hacky retry.

        :return: a response dictionary.
        """
        retried = 0

        while retried <= n_retries:
            try:
                if retried > 0:
                    warnings.warn(f"Retrying {retried}/{n_retries}...")

                return func(*args, **kwargs)
            except (ValueError, ConnectionError) as e:
                warnings.warn(e.args[0])
            finally:
                retried += 1

        raise RetriesExceeded()

    return wrapper_hacky_retry


@hacky_retry
def query_subgraph(url: str, query: str, key: str) -> SubgraphResponseType:
    """Query a subgraph.

    Args:
        url: the subgraph's URL.
        query: the query to be used.
        key: the key to use in order to access the required data.

    Returns:
        a response dictionary.
    """
    content = {QUERY_FIELD: query}
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    res = requests.post(url, json=content, headers=headers)

    if res.status_code != 200:
        raise ConnectionError(
            "Something went wrong while trying to communicate with the subgraph "
            f"(Error: {res.status_code})!\n{res.text}"
        )

    body = res.json()
    if ERROR_FIELD in body.keys():
        raise ValueError(f"The given query is not correct: {body[ERROR_FIELD]}")

    data = body.get(DATA_FIELD, {}).get(key, None)
    if data is None:
        raise ValueError(f"Unknown error encountered!\nRaw response: \n{body}")

    return data
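

# Usage sketch (hypothetical query string): how the subgraph is queried elsewhere in
# this module. `SOME_QUERY` is a placeholder, not a name defined in this file.
#
#     url = OMEN_SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
#     fpmms = query_subgraph(url, SOME_QUERY, FPMMS_FIELD)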


def transform_fpmmTrades(df: pd.DataFrame) -> pd.DataFrame:
    """Flatten the nested fields of an fpmmTrades dataframe."""
    print("Transforming trades dataframe")

    # extract the creator address from the nested creator object
    df["creator"] = df["creator"].apply(lambda x: x["id"])

    # normalize the nested fpmm column into prefixed top-level columns
    fpmm = pd.json_normalize(df["fpmm"])
    fpmm.columns = [f"fpmm.{col}" for col in fpmm.columns]
    df = pd.concat([df, fpmm], axis=1)

    # drop the original fpmm column
    df.drop(["fpmm"], axis=1, inplace=True)

    # rename creator to trader_address
    df.rename(columns={"creator": "trader_address"}, inplace=True)
    return df


def create_fpmmTrades(
    from_timestamp: int = DEFAULT_FROM_TIMESTAMP,
    to_timestamp: int = DEFAULT_TO_TIMESTAMP,
) -> pd.DataFrame:
    """Create fpmmTrades for all trades."""
    # Quickstart trades
    print("Getting trades from Quickstart markets")
    qs_trades_json = query_omen_xdai_subgraph(
        trader_category="quickstart",
        from_timestamp=from_timestamp,
        to_timestamp=to_timestamp,
        fpmm_from_timestamp=from_timestamp,
        fpmm_to_timestamp=to_timestamp,
    )
    print(f"length of the qs_trades_json dataset {len(qs_trades_json)}")

    # convert to dataframe
    qs_df = pd.DataFrame(qs_trades_json["data"]["fpmmTrades"])
    qs_df["market_creator"] = "quickstart"
    qs_df = transform_fpmmTrades(qs_df)

    # Pearl trades (same window as the Quickstart query)
    print("Getting trades from Pearl markets")
    pearl_trades_json = query_omen_xdai_subgraph(
        trader_category="pearl",
        from_timestamp=from_timestamp,
        to_timestamp=to_timestamp,
        fpmm_from_timestamp=from_timestamp,
        fpmm_to_timestamp=to_timestamp,
    )
    print(f"length of the pearl_trades_json dataset {len(pearl_trades_json)}")

    # convert to dataframe
    pearl_df = pd.DataFrame(pearl_trades_json["data"]["fpmmTrades"])
    pearl_df["market_creator"] = "pearl"
    pearl_df = transform_fpmmTrades(pearl_df)

    return pd.concat([qs_df, pearl_df], ignore_index=True)
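

# Usage sketch (hypothetical timestamps): fetching one week of trades.
# 1704067200 is Monday 2024-01-01 00:00 UTC and 1704672000 the following Monday;
# any epoch-second window works the same way.
#
#     weekly_trades = create_fpmmTrades(from_timestamp=1704067200, to_timestamp=1704672000)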


def fpmms_fetcher(trader_category: str) -> Generator[ResponseItemType, int, None]:
    """An indefinite fetcher for the FPMMs."""
    omen_subgraph = OMEN_SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    print(f"omen_subgraph = {omen_subgraph}")

    if trader_category == "pearl":
        creator_id = FPMM_PEARL_CREATOR
    else:  # quickstart
        creator_id = FPMM_QS_CREATOR

    while True:
        fpmm_id = yield
        fpmms_query = FPMMS_QUERY.substitute(
            creator=creator_id,
            fpmm_id=fpmm_id,
            fpmms_field=FPMMS_FIELD,
            first=BATCH_SIZE,
            id_field=ID_FIELD,
            answer_field=ANSWER_FIELD,
            question_field=QUESTION_FIELD,
            outcomes_field=OUTCOMES_FIELD,
            title_field=TITLE_FIELD,
        )
        print(f"markets query = {fpmms_query}")
        yield query_subgraph(omen_subgraph, fpmms_query, FPMMS_FIELD)
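

# Usage sketch: the fetcher is a send-driven generator. Iterating advances it to the
# bare `yield` (priming), and `send()` delivers the last seen id and returns a batch,
# exactly as the fetch functions below do inside their tqdm loops.
#
#     fetcher = fpmms_fetcher("pearl")
#     next(fetcher)                    # prime the generator
#     first_batch = fetcher.send("")   # empty id -> first BATCH_SIZE markets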


def fetch_fpmms(trader_category: str) -> pd.DataFrame:
    """Fetch all the fpmms of the given creator category."""
    latest_id = ""
    fpmms = []
    print(f"Getting markets for {trader_category}")
    fetcher = fpmms_fetcher(trader_category)

    for _ in tqdm(fetcher, unit="fpmms", unit_scale=BATCH_SIZE):
        batch = fetcher.send(latest_id)
        if len(batch) == 0:
            break

        latest_id = batch[-1].get(ID_FIELD, "")
        if latest_id == "":
            raise ValueError(f"Unexpected data format retrieved: {batch}")

        fpmms.extend(batch)

    return pd.DataFrame(fpmms)


def fetch_qs_fpmms() -> pd.DataFrame:
    """Fetch all the fpmms of the quickstart creator."""
    return fetch_fpmms("quickstart")


def fetch_pearl_fpmms() -> pd.DataFrame:
    """Fetch all the fpmms of the pearl creator."""
    return fetch_fpmms("pearl")


def get_answer(fpmm: pd.Series) -> str:
    """Get an answer from its index, using a Series of an FPMM."""
    return fpmm[QUESTION_FIELD][OUTCOMES_FIELD][fpmm[ANSWER_FIELD]]


def transform_fpmms(fpmms: pd.DataFrame) -> pd.DataFrame:
    """Transform an FPMMS dataframe."""
    transformed = fpmms.dropna()
    transformed = transformed.drop_duplicates([ID_FIELD])
    # drop markets whose answer is the max-uint256 sentinel
    transformed = transformed.loc[transformed[ANSWER_FIELD] != MAX_UINT_HEX]
    # keep the last hex digit of the answer as the outcome index
    transformed.loc[:, ANSWER_FIELD] = (
        transformed[ANSWER_FIELD].str.slice(-1).astype(int)
    )
    # map the index to the outcome's text
    transformed.loc[:, ANSWER_FIELD] = transformed.apply(get_answer, axis=1)
    transformed = transformed.drop(columns=[QUESTION_FIELD])

    return transformed
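

# Worked example (hypothetical row): an answer of "0x...01" keeps its last hex digit,
# giving index 1; with outcomes ["Yes", "No"] the stored answer becomes "No".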


def etl(filename: Optional[str] = None) -> pd.DataFrame:
    """Fetch, process, store and return the markets as a DataFrame."""
    qs_fpmms = fetch_qs_fpmms()
    qs_fpmms = transform_fpmms(qs_fpmms)
    qs_fpmms["market_creator"] = "quickstart"
    print(f"Results for the market creator quickstart. Len = {len(qs_fpmms)}")

    pearl_fpmms = fetch_pearl_fpmms()
    pearl_fpmms = transform_fpmms(pearl_fpmms)
    pearl_fpmms["market_creator"] = "pearl"
    print(f"Results for the market creator pearl. Len = {len(pearl_fpmms)}")

    fpmms = pd.concat([qs_fpmms, pearl_fpmms], ignore_index=True)

    if filename:
        fpmms.to_parquet(DATA_DIR / filename, index=False)

    return fpmms


def read_global_trades_file() -> Optional[pd.DataFrame]:
    """Read the global fpmmTrades parquet file, or return None if it is missing."""
    try:
        trades_filename = "fpmmTrades.parquet"
        fpmms_trades = pd.read_parquet(TMP_DIR / trades_filename)
    except FileNotFoundError:
        print("Error: fpmmTrades.parquet not found. No market creator added")
        return None

    return fpmms_trades


def add_market_creator(tools: pd.DataFrame) -> pd.DataFrame:
    """Add a market_creator column to the tools dataframe, based on the global trades file."""
    # Check if fpmmTrades.parquet is in the expected directory
    fpmms_trades = read_global_trades_file()
    tools["market_creator"] = ""

    # traverse the list of traders
    tools_no_market_creator = 0
    traders_list = list(tools.trader_address.unique())
    for trader_address in traders_list:
        market_creator = ""
        try:
            trades = fpmms_trades[fpmms_trades["trader_address"] == trader_address]
            market_creator = trades.iloc[0]["market_creator"]  # first value is enough
        except Exception:
            print(f"ERROR getting the market creator of {trader_address}")
            tools_no_market_creator += 1
            continue

        # update
        tools.loc[tools["trader_address"] == trader_address, "market_creator"] = (
            market_creator
        )

    # filter out the tools for which we have no market creator info
    tools = tools.loc[tools["market_creator"] != ""]
    print(f"Number of tools with no market creator info = {tools_no_market_creator}")
    return tools


def fpmmTrades_etl(
    trades_filename: str, from_timestamp: int, to_timestamp: int = DEFAULT_TO_TIMESTAMP
) -> None:
    """Fetch the trades of the given window and store them as a parquet file."""
    print("Generating the trades file")
    try:
        fpmmTrades = create_fpmmTrades(
            from_timestamp=from_timestamp, to_timestamp=to_timestamp
        )
    except FileNotFoundError:
        print(f"Error creating {trades_filename} file.")
        return

    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"

    # lowercase and strip trader_address
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()
    fpmmTrades.to_parquet(DATA_DIR / trades_filename, index=False)
    return


def check_current_week_data(trades_df: pd.DataFrame) -> pd.DataFrame:
    """Check that all of the current week's data is present; if not, add the missing rows from the global trades file."""
    # Get the start of the current week (Monday, 00:00)
    now = datetime.now()
    start_of_week = now - timedelta(days=now.weekday())
    start_of_week = start_of_week.replace(hour=0, minute=0, second=0, microsecond=0)
    print(f"start of the week = {start_of_week}")

    trades_df["creation_timestamp"] = pd.to_datetime(trades_df["creationTimestamp"])
    trades_df["creation_date"] = trades_df["creation_timestamp"].dt.date
    trades_df["creation_date"] = pd.to_datetime(trades_df["creation_date"])

    # Check the dataframe
    min_date = min(trades_df.creation_date)
    if min_date > start_of_week:
        # data from the beginning of the current week is missing in the trades file;
        # recover it from the global trades file (expected to carry a creation_date column)
        fpmms_trades = read_global_trades_file()
        missing_data = fpmms_trades[
            (fpmms_trades["creation_date"] >= start_of_week)
            & (fpmms_trades["creation_date"] < min_date)
        ]
        merge_df = pd.concat([trades_df, missing_data], ignore_index=True)
        merge_df.drop_duplicates("id", keep="last", inplace=True)
        return merge_df

    # no update needed
    return trades_df
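

# A minimal sketch (hypothetical weekly-update flow, not part of the original module):
# deriving the Monday-start epoch timestamp and refreshing the weekly trades file.
#
#     monday = datetime.now() - timedelta(days=datetime.now().weekday())
#     monday = monday.replace(hour=0, minute=0, second=0, microsecond=0)
#     fpmmTrades_etl("fpmmTrades.parquet", from_timestamp=int(monday.timestamp()))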


if __name__ == "__main__":
    etl("all_fpmms.parquet")