# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
import time
from enum import Enum
from typing import Any, Optional

import pandas as pd
from tqdm import tqdm

from web3_utils import query_conditional_tokens_gc_subgraph
from get_mech_info import (
    DATETIME_60_DAYS_AGO,
    update_tools_parquet,
    update_all_trades_parquet,
)
from utils import (
    wei_to_unit,
    convert_hex_to_int,
    JSON_DATA_DIR,
    DATA_DIR,
    DEFAULT_MECH_FEE,
    TMP_DIR,
)
from staking import label_trades_by_staking
from nr_mech_calls import (
    create_unknown_traders_df,
    transform_to_datetime,
    compute_mech_calls_based_on_timestamps,
)

DUST_THRESHOLD = 10000000000000
INVALID_ANSWER = -1
DEFAULT_60_DAYS_AGO_TIMESTAMP = DATETIME_60_DAYS_AGO.timestamp()
WXDAI_CONTRACT_ADDRESS = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"
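
# Notes on the constants above: DUST_THRESHOLD is assumed to be denominated in
# wei; INVALID_ANSWER (-1) is the sentinel compared against below to flag
# markets resolved as invalid; WXDAI_CONTRACT_ADDRESS is the WxDAI
# (wrapped xDAI) token contract on Gnosis Chain.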


class MarketState(Enum):
    """Market state"""

    OPEN = 1
    PENDING = 2
    FINALIZING = 3
    ARBITRATING = 4
    CLOSED = 5

    def __str__(self) -> str:
        """Prints the market status."""
        return self.name.capitalize()


class MarketAttribute(Enum):
    """Attribute"""

    NUM_TRADES = "Num_trades"
    WINNER_TRADES = "Winner_trades"
    NUM_REDEEMED = "Num_redeemed"
    INVESTMENT = "Investment"
    FEES = "Fees"
    MECH_CALLS = "Mech_calls"
    MECH_FEES = "Mech_fees"
    EARNINGS = "Earnings"
    NET_EARNINGS = "Net_earnings"
    REDEMPTIONS = "Redemptions"
    ROI = "ROI"

    def __str__(self) -> str:
        """Prints the attribute."""
        return self.value

    def __repr__(self) -> str:
        """Prints the attribute representation."""
        return self.name

    @staticmethod
    def argparse(s: str) -> "MarketAttribute":
        """Performs string conversion to MarketAttribute."""
        try:
            return MarketAttribute[s.upper()]
        except KeyError as e:
            raise ValueError(f"Invalid MarketAttribute: {s}") from e
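
# Example: MarketAttribute.argparse("net_earnings") returns
# MarketAttribute.NET_EARNINGS, so the method can be passed directly as an
# argparse `type=` converter.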


ALL_TRADES_STATS_DF_COLS = [
    "trader_address",
    "market_creator",
    "trade_id",
    "creation_timestamp",
    "title",
    "market_status",
    "collateral_amount",
    "outcome_index",
    "trade_fee_amount",
    "outcomes_tokens_traded",
    "current_answer",
    "is_invalid",
    "winning_trade",
    "earnings",
    "redeemed",
    "redeemed_amount",
    "num_mech_calls",
    "mech_fee_amount",
    "net_earnings",
    "roi",
]
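# NOTE: analyse_trader assigns rows by column name, so this list only fixes the
# schema and column order of the per-trade stats DataFrame.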


def _is_redeemed(user_json: dict[str, Any], fpmmTrade: dict[str, Any]) -> bool:
    """Returns whether the user has redeemed the position."""
    # user_json follows the conditional tokens subgraph response shape:
    # {"data": {"user": {"userPositions": [
    #     {"balance": ..., "position": {"conditionIds": [...]}}, ...]}}}
    user_positions = user_json["data"]["user"]["userPositions"]
    condition_id = fpmmTrade["fpmm.condition.id"]
    for position in user_positions:
        position_condition_ids = position["position"]["conditionIds"]
        balance = int(position["balance"])
        if condition_id in position_condition_ids:
            if balance == 0:
                return True
            # a matching position with a non-zero balance means not redeemed:
            # return early
            return False
    return False


def prepare_profitability_data(
    tools_filename: str,
    trades_filename: str,
) -> Optional[pd.DataFrame]:
    """Prepare data for the profitability analysis."""
    # Check if the tools parquet file is in the data directory
    try:
        tools = pd.read_parquet(DATA_DIR / tools_filename)

        # make sure trader_address is in the columns
        assert "trader_address" in tools.columns, "trader_address column not found"

        # lowercase and strip trader_address
        tools["trader_address"] = tools["trader_address"].str.lower().str.strip()

        tools.drop_duplicates(
            subset=["request_id", "request_block"], keep="last", inplace=True
        )
        tools.to_parquet(DATA_DIR / tools_filename)
        print(f"{tools_filename} loaded")
    except FileNotFoundError:
        print(f"{tools_filename} not found.")
        return None

    # Check if the trades parquet file is in the data directory
    print("Reading the new trades file")
    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
    except FileNotFoundError:
        print(f"Error reading the {trades_filename} file.")
        return None

    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"

    # lowercase and strip trader_address
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()
    return fpmmTrades


def determine_market_status(trade: pd.Series, current_answer: Any) -> MarketState:
    """Determine the market status of a trade.

    A market with no answer yet is PENDING once its opening timestamp has
    passed and OPEN before that; once an answer exists, pending arbitration
    takes precedence over the finalization countdown.
    """
    if pd.isna(current_answer) and time.time() >= int(
        trade["fpmm.openingTimestamp"]
    ):
        return MarketState.PENDING
    if pd.isna(current_answer):
        return MarketState.OPEN
    if trade["fpmm.isPendingArbitration"]:
        return MarketState.ARBITRATING
    if time.time() < int(trade["fpmm.answerFinalizedTimestamp"]):
        return MarketState.FINALIZING
    return MarketState.CLOSED


def analyse_trader(
    trader_address: str,
    fpmmTrades: pd.DataFrame,
    trader_estimated_mech_calls: pd.DataFrame,
    daily_info: bool = False,
) -> pd.DataFrame:
    """Analyse a trader's trades.

    With daily_info=True, mech calls are matched by trading day and open
    markets are included; otherwise they are matched by market title and
    trade id, and only closed markets are analysed.
    """
    fpmmTrades["creation_timestamp"] = pd.to_datetime(fpmmTrades["creationTimestamp"])
    fpmmTrades["creation_date"] = fpmmTrades["creation_timestamp"].dt.date

    # Filter the trades of the given trader
    trades = fpmmTrades[fpmmTrades["trader_address"] == trader_address]

    # Prepare the DataFrame
    trades_df = pd.DataFrame(columns=ALL_TRADES_STATS_DF_COLS)
    if trades.empty:
        return trades_df

    # Fetch the user's conditional tokens subgraph data
    try:
        user_json = query_conditional_tokens_gc_subgraph(trader_address)
    except Exception as e:
        print(f"Error fetching user data: {e}")
        return trades_df

    # Iterate over the trades
    trades_answer_nan = 0
    trades_no_closed_market = 0
    for i, trade in tqdm(
        trades.iterrows(), total=len(trades), desc="Analysing trades"
    ):
        try:
            market_answer = trade["fpmm.currentAnswer"]
            trading_day = trade["creation_date"]
            trade_id = trade["id"]
            if not daily_info and (not market_answer or pd.isna(market_answer)):
                trades_answer_nan += 1
                continue

            # Parsing and computing shared values
            collateral_amount = wei_to_unit(float(trade["collateralAmount"]))
            fee_amount = wei_to_unit(float(trade["feeAmount"]))
            outcome_tokens_traded = wei_to_unit(float(trade["outcomeTokensTraded"]))
            earnings, winner_trade = (0, False)
            redemption = _is_redeemed(user_json, trade)
            # normalize missing answers (None/NaN/empty) to None
            current_answer = (
                market_answer if market_answer and not pd.isna(market_answer) else None
            )
            market_creator = trade["market_creator"]

            # Determine the market status
            market_status = determine_market_status(trade, current_answer)

            # Skip non-closed markets
            if not daily_info and market_status != MarketState.CLOSED:
                trades_no_closed_market += 1
                continue
            if current_answer is not None:
                current_answer = convert_hex_to_int(current_answer)

            # Compute invalidity
            is_invalid = current_answer == INVALID_ANSWER

            # Compute earnings and winner trade status
            if current_answer is None:
                earnings = 0.0
                winner_trade = None
            elif is_invalid:
                earnings = collateral_amount
                winner_trade = False
            elif int(trade["outcomeIndex"]) == current_answer:
                earnings = outcome_tokens_traded
                winner_trade = True

            # Look up the estimated mech calls, by trading day (daily info)
            # or by market title and trade id
            if daily_info:
                total_mech_calls = trader_estimated_mech_calls.loc[
                    (trader_estimated_mech_calls["trading_day"] == trading_day),
                    "total_mech_calls",
                ].iloc[0]
            else:
                total_mech_calls = trader_estimated_mech_calls.loc[
                    (trader_estimated_mech_calls["market"] == trade["title"])
                    & (trader_estimated_mech_calls["trade_id"] == trade_id),
                    "total_mech_calls",
                ].iloc[0]

            # net earnings = earnings - trade fee - mech fees - collateral at risk
            net_earnings = (
                earnings
                - fee_amount
                - (total_mech_calls * DEFAULT_MECH_FEE)
                - collateral_amount
            )

            # Assign values to the DataFrame
            trades_df.loc[i] = {
                "trader_address": trader_address,
                "market_creator": market_creator,
                "trade_id": trade["id"],
                "market_status": market_status.name,
                "creation_timestamp": trade["creationTimestamp"],
                "title": trade["title"],
                "collateral_amount": collateral_amount,
                "outcome_index": trade["outcomeIndex"],
                "trade_fee_amount": fee_amount,
                "outcomes_tokens_traded": outcome_tokens_traded,
                "current_answer": current_answer,
                "is_invalid": is_invalid,
                "winning_trade": winner_trade,
                "earnings": earnings,
                "redeemed": redemption,
                "redeemed_amount": earnings if redemption else 0,
                "num_mech_calls": total_mech_calls,
                "mech_fee_amount": total_mech_calls * DEFAULT_MECH_FEE,
                "net_earnings": net_earnings,
                # roi = net earnings relative to the total amount spent on the trade
                "roi": net_earnings
                / (
                    collateral_amount + fee_amount + total_mech_calls * DEFAULT_MECH_FEE
                ),
            }
        except Exception as e:
            print(f"Error processing trade {i}: {e}")
            print(trade)
            continue

    print(f"Number of trades where currentAnswer is NaN = {trades_answer_nan}")
    print(
        f"Number of trades where the market is not closed = {trades_no_closed_market}"
    )
    return trades_df


def analyse_all_traders(
    trades: pd.DataFrame,
    estimated_mech_calls: pd.DataFrame,
    daily_info: bool = False,
) -> pd.DataFrame:
    """Analyse all traders."""
    all_traders = []
    traders = trades["trader_address"].unique()
    for trader in tqdm(traders, total=len(traders), desc="Analysing traders"):
        trader_estimated_mech_calls = estimated_mech_calls.loc[
            estimated_mech_calls["trader_address"] == trader
        ]
        all_traders.append(
            analyse_trader(trader, trades, trader_estimated_mech_calls, daily_info)
        )

    # concatenate the per-trader results
    all_traders_df = pd.concat(all_traders)
    return all_traders_df


def run_profitability_analysis(
    tools_filename: str,
    trades_filename: str,
    merge: bool = False,
) -> Optional[pd.DataFrame]:
    """Create the all-trades analysis."""
    # load the dataframes from the data folder for the analysis
    print(f"Preparing data with {tools_filename} and {trades_filename}")
    fpmmTrades = prepare_profitability_data(tools_filename, trades_filename)
    if fpmmTrades is None:
        # prepare_profitability_data already reported the missing file
        return None

    if merge:
        update_tools_parquet(tools_filename)
    tools = pd.read_parquet(TMP_DIR / "tools.parquet")

    try:
        fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
            transform_to_datetime
        )
    except Exception:
        print("Transformation not needed")

    print("Computing the estimated mech calls dataset")
    trade_mech_calls = compute_mech_calls_based_on_timestamps(
        fpmmTrades=fpmmTrades, tools=tools
    )
    trade_mech_calls.to_parquet(TMP_DIR / "trade_mech_calls.parquet")
    print(trade_mech_calls.total_mech_calls.describe())

    print("Analysing trades...")
    all_trades_df = analyse_all_traders(fpmmTrades, trade_mech_calls)

    # merge previous files if requested
    if merge:
        all_trades_df = update_all_trades_parquet(all_trades_df)

    # debugging purposes
    all_trades_df.to_parquet(JSON_DATA_DIR / "all_trades_df.parquet", index=False)

    # filter out invalid markets, i.e. trades where "is_invalid" is True
    invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"]]
    if len(invalid_trades) == 0:
        print("No new invalid trades")
    else:
        if merge:
            try:
                print("Merging invalid trades parquet file")
                old_invalid_trades = pd.read_parquet(
                    DATA_DIR / "invalid_trades.parquet"
                )
                merge_df = pd.concat(
                    [old_invalid_trades, invalid_trades], ignore_index=True
                )
                invalid_trades = merge_df.drop_duplicates()
            except Exception as e:
                print(f"Error updating the invalid trades parquet: {e}")
        invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)
    all_trades_df = all_trades_df.loc[~all_trades_df["is_invalid"]]

    all_trades_df = label_trades_by_staking(trades_df=all_trades_df)

    print("Creating unknown traders dataset")
    unknown_traders_df, all_trades_df = create_unknown_traders_df(
        trades_df=all_trades_df
    )
    # merge with the previous unknown traders dataset
    previous_unknown_traders = pd.read_parquet(DATA_DIR / "unknown_traders.parquet")
    unknown_traders_df: pd.DataFrame = pd.concat(
        [unknown_traders_df, previous_unknown_traders], ignore_index=True
    )
    unknown_traders_df.drop_duplicates("trade_id", keep="last", inplace=True)
    unknown_traders_df.to_parquet(DATA_DIR / "unknown_traders.parquet", index=False)

    # save to parquet
    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)
    print("Done!")
    return all_trades_df


def add_trades_profitability(trades_filename: str) -> None:
    """Compute profitability for new trades and merge them into the existing datasets."""
    print("Reading the trades file")
    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
    except FileNotFoundError:
        print(f"Error reading the {trades_filename} file.")
        return

    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"

    # lowercase and strip trader_address
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()

    print("Reading tools parquet file")
    tools = pd.read_parquet(TMP_DIR / "tools.parquet")

    try:
        fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
            transform_to_datetime
        )
    except Exception:
        print("Transformation not needed")

    print("Computing the estimated mech calls dataset")
    trade_mech_calls = compute_mech_calls_based_on_timestamps(
        fpmmTrades=fpmmTrades, tools=tools
    )
    print(trade_mech_calls.total_mech_calls.describe())

    print("Analysing trades...")
    all_trades_df = analyse_all_traders(fpmmTrades, trade_mech_calls)

    # debugging purposes
    all_trades_df.to_parquet(JSON_DATA_DIR / "missing_trades_df.parquet", index=False)

    # filter out invalid markets, i.e. trades where "is_invalid" is True
    print("Checking invalid trades")
    invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"]]
    if len(invalid_trades) > 0:
        try:
            print("Merging invalid trades parquet file")
            old_invalid_trades = pd.read_parquet(DATA_DIR / "invalid_trades.parquet")
            merge_df = pd.concat(
                [old_invalid_trades, invalid_trades], ignore_index=True
            )
            invalid_trades = merge_df.drop_duplicates("trade_id")
        except Exception as e:
            print(f"Error updating the invalid trades parquet: {e}")
        invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)
    all_trades_df = all_trades_df.loc[~all_trades_df["is_invalid"]]

    print("Adding staking labels")
    all_trades_df = label_trades_by_staking(trades_df=all_trades_df)

    print("Creating unknown traders dataset")
    unknown_traders_df, all_trades_df = create_unknown_traders_df(
        trades_df=all_trades_df
    )
    if len(unknown_traders_df) > 0:
        print("Merging unknown traders info")
        # merge with the previous unknown traders dataset
        previous_unknown_traders = pd.read_parquet(DATA_DIR / "unknown_traders.parquet")
        unknown_traders_df: pd.DataFrame = pd.concat(
            [unknown_traders_df, previous_unknown_traders], ignore_index=True
        )
        unknown_traders_df.drop_duplicates("trade_id", keep="last", inplace=True)
        unknown_traders_df.to_parquet(DATA_DIR / "unknown_traders.parquet", index=False)

    print("Merging with the previous all_trades_profitability file")
    old_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
    all_trades_df: pd.DataFrame = pd.concat(
        [all_trades_df, old_trades], ignore_index=True
    )
    all_trades_df.drop_duplicates("trade_id", keep="last", inplace=True)
    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)


if __name__ == "__main__":
    # updating the whole fpmmTrades parquet file instead of just the new ones
    # trade_mech_calls = pd.read_parquet(TMP_DIR / "result_df.parquet")
    # fpmmTrades = pd.read_parquet(TMP_DIR / "fpmmTrades.parquet")
    # fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
    #     lambda x: transform_to_datetime(x)
    # )
    # all_trades_df = analyse_all_traders(fpmmTrades, trade_mech_calls)
    # all_trades_df.to_parquet(TMP_DIR / "all_trades_df.parquet", index=False)
    run_profitability_analysis("file1", "file2")
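
    # Example sketch for backfilling a batch of new trades (hypothetical filename):
    # add_trades_profitability("new_fpmmTrades.parquet")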