# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
#   Copyright 2023 Valory AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# ------------------------------------------------------------------------------

"""Profitability analysis of prediction-market trades.

Joins the mech tools dataset with the fpmmTrades dataset, computes per-trade
earnings / fees / ROI for every trader, and persists the resulting parquet
files (all_trades_profitability, invalid_trades, unknown_traders).
"""

import os
import time
from enum import Enum
from typing import Any, Optional

import numpy as np
import pandas as pd
from tqdm import tqdm

from web3_utils import query_conditional_tokens_gc_subgraph
from get_mech_info import (
    DATETIME_60_DAYS_AGO,
    update_tools_parquet,
    update_all_trades_parquet,
)
from utils import (
    wei_to_unit,
    convert_hex_to_int,
    JSON_DATA_DIR,
    DATA_DIR,
    DEFAULT_MECH_FEE,
    TMP_DIR,
)
from staking import label_trades_by_staking
from nr_mech_calls import (
    create_unknown_traders_df,
    transform_to_datetime,
    compute_mech_calls_based_on_timestamps,
)

# Positions with a balance at or below this many wei are considered dust.
DUST_THRESHOLD = 10000000000000
# Sentinel answer value Realitio uses for invalid markets (0xff..ff as int).
INVALID_ANSWER = -1
DEFAULT_60_DAYS_AGO_TIMESTAMP = (DATETIME_60_DAYS_AGO).timestamp()
WXDAI_CONTRACT_ADDRESS = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"


class MarketState(Enum):
    """Market state"""

    OPEN = 1
    PENDING = 2
    FINALIZING = 3
    ARBITRATING = 4
    CLOSED = 5

    def __str__(self) -> str:
        """Prints the market status."""
        return self.name.capitalize()


class MarketAttribute(Enum):
    """Attribute"""

    NUM_TRADES = "Num_trades"
    WINNER_TRADES = "Winner_trades"
    NUM_REDEEMED = "Num_redeemed"
    INVESTMENT = "Investment"
    FEES = "Fees"
    MECH_CALLS = "Mech_calls"
    MECH_FEES = "Mech_fees"
    EARNINGS = "Earnings"
    NET_EARNINGS = "Net_earnings"
    REDEMPTIONS = "Redemptions"
    ROI = "ROI"

    def __str__(self) -> str:
        """Prints the attribute."""
        return self.value

    def __repr__(self) -> str:
        """Prints the attribute representation."""
        return self.name

    @staticmethod
    def argparse(s: str) -> "MarketAttribute":
        """Performs string conversion to MarketAttribute."""
        try:
            return MarketAttribute[s.upper()]
        except KeyError as e:
            raise ValueError(f"Invalid MarketAttribute: {s}") from e


# Column order of the per-trade statistics DataFrame built by analyse_trader.
ALL_TRADES_STATS_DF_COLS = [
    "trader_address",
    "market_creator",
    "trade_id",
    "creation_timestamp",
    "title",
    "market_status",
    "collateral_amount",
    "outcome_index",
    "trade_fee_amount",
    "outcomes_tokens_traded",
    "current_answer",
    "is_invalid",
    "winning_trade",
    "earnings",
    "redeemed",
    "redeemed_amount",
    "num_mech_calls",
    "mech_fee_amount",
    "net_earnings",
    "roi",
]


def _is_redeemed(user_json: dict[str, Any], fpmmTrade: dict[str, Any]) -> bool:
    """Returns whether the user has redeemed the position.

    A position is considered redeemed when the user holds a position whose
    condition ids include the trade's condition id and its balance is zero
    (the conditional tokens were burned on redemption).
    """
    user_positions = user_json["data"]["user"]["userPositions"]
    condition_id = fpmmTrade["fpmm.condition.id"]

    for position in user_positions:
        position_condition_ids = position["position"]["conditionIds"]
        balance = int(position["balance"])

        if condition_id in position_condition_ids:
            # Decide on the first matching position: zero balance == redeemed.
            return balance == 0

    return False


def prepare_profitalibity_data(
    tools_filename: str,
    trades_filename: str,
) -> Optional[pd.DataFrame]:
    """Prepare data for profitalibity analysis.

    Normalizes and deduplicates the tools parquet in place, then loads and
    normalizes the trades parquet. Returns the trades DataFrame, or None
    when either input file is missing.
    """
    # Check if tools.parquet is in the same directory
    try:
        # tools parquet file
        tools = pd.read_parquet(DATA_DIR / tools_filename)

        # make sure creator_address is in the columns
        assert "trader_address" in tools.columns, "trader_address column not found"

        # lowercase and strip creator_address
        tools["trader_address"] = tools["trader_address"].str.lower().str.strip()

        tools.drop_duplicates(
            subset=["request_id", "request_block"], keep="last", inplace=True
        )
        tools.to_parquet(DATA_DIR / tools_filename)
        print(f"{tools_filename} loaded")
    except FileNotFoundError:
        print(f"{tools_filename} not found.")
        return None

    # Check if fpmmTrades.parquet is in the same directory
    print("Reading the new trades file")
    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
    except FileNotFoundError:
        print(f"Error reading {trades_filename} file .")
        # BUGFIX: previously fell through and hit a NameError on fpmmTrades.
        return None

    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"

    # lowercase and strip creator_address
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()

    return fpmmTrades


def determine_market_status(trade, current_answer) -> MarketState:
    """Determine the market status of a trade.

    :param trade: a trade row (mapping) exposing the fpmm.* fields.
    :param current_answer: the market's current answer, or None/NaN if unset.
    """
    # BUGFIX: `is np.nan` only matched one specific NaN object; pd.isna
    # correctly detects None and any NaN value.
    answer_missing = current_answer is None or (
        isinstance(current_answer, float) and pd.isna(current_answer)
    )
    if answer_missing and time.time() >= int(trade["fpmm.openingTimestamp"]):
        return MarketState.PENDING
    elif answer_missing:
        return MarketState.OPEN
    elif trade["fpmm.isPendingArbitration"]:
        return MarketState.ARBITRATING
    elif time.time() < int(trade["fpmm.answerFinalizedTimestamp"]):
        return MarketState.FINALIZING
    return MarketState.CLOSED


def analyse_trader(
    trader_address: str,
    fpmmTrades: pd.DataFrame,
    trader_estimated_mech_calls: pd.DataFrame,
    daily_info: bool = False,
) -> pd.DataFrame:
    """Analyse a trader's trades.

    Computes, per trade, the earnings, fees, mech-call costs, net earnings
    and ROI, and returns one row per analysed trade
    (columns: ALL_TRADES_STATS_DF_COLS).

    :param trader_address: lowercased trader address to filter trades by.
    :param fpmmTrades: the full trades DataFrame.
    :param trader_estimated_mech_calls: estimated mech calls for this trader,
        keyed by trading_day (daily mode) or by market title + trade_id.
    :param daily_info: when True, keep open markets and aggregate mech calls
        per trading day instead of per trade.
    """
    fpmmTrades["creation_timestamp"] = pd.to_datetime(fpmmTrades["creationTimestamp"])
    fpmmTrades["creation_date"] = fpmmTrades["creation_timestamp"].dt.date

    # Filter trades and tools for the given trader
    trades = fpmmTrades[fpmmTrades["trader_address"] == trader_address]

    # Prepare the DataFrame
    trades_df = pd.DataFrame(columns=ALL_TRADES_STATS_DF_COLS)
    if trades.empty:
        return trades_df

    # Fetch user's conditional tokens gc graph
    try:
        user_json = query_conditional_tokens_gc_subgraph(trader_address)
    except Exception as e:
        print(f"Error fetching user data: {e}")
        return trades_df

    # Iterate over the trades
    trades_answer_nan = 0
    trades_no_closed_market = 0
    for i, trade in tqdm(trades.iterrows(), total=len(trades), desc="Analysing trades"):
        try:
            market_answer = trade["fpmm.currentAnswer"]
            # BUGFIX: NaN is truthy, so `not market_answer` never caught real
            # NaN answers; normalize NaN to None first.
            if isinstance(market_answer, float) and pd.isna(market_answer):
                market_answer = None
            trading_day = trade["creation_date"]
            trade_id = trade["id"]
            if not daily_info and not market_answer:
                # print(f"Skipping trade {i} because currentAnswer is NaN")
                trades_answer_nan += 1
                continue

            # Parsing and computing shared values
            collateral_amount = wei_to_unit(float(trade["collateralAmount"]))
            fee_amount = wei_to_unit(float(trade["feeAmount"]))
            outcome_tokens_traded = wei_to_unit(float(trade["outcomeTokensTraded"]))
            earnings, winner_trade = (0, False)
            redemption = _is_redeemed(user_json, trade)
            current_answer = market_answer if market_answer else None
            market_creator = trade["market_creator"]

            # Determine market status
            market_status = determine_market_status(trade, current_answer)

            # Skip non-closed markets
            if not daily_info and market_status != MarketState.CLOSED:
                # print(
                #     f"Skipping trade {i} because market is not closed. Market Status: {market_status}"
                # )
                trades_no_closed_market += 1
                continue
            if current_answer is not None:
                current_answer = convert_hex_to_int(current_answer)

            # Compute invalidity
            is_invalid = current_answer == INVALID_ANSWER

            # Compute earnings and winner trade status
            if current_answer is None:
                earnings = 0.0
                winner_trade = None
            elif is_invalid:
                # Invalid market: collateral is refunded.
                earnings = collateral_amount
                winner_trade = False
            elif int(trade["outcomeIndex"]) == current_answer:
                earnings = outcome_tokens_traded
                winner_trade = True

            # Compute mech calls using the title, and trade id
            if daily_info:
                total_mech_calls = trader_estimated_mech_calls.loc[
                    (trader_estimated_mech_calls["trading_day"] == trading_day),
                    "total_mech_calls",
                ].iloc[0]
            else:
                total_mech_calls = trader_estimated_mech_calls.loc[
                    (trader_estimated_mech_calls["market"] == trade["title"])
                    & (trader_estimated_mech_calls["trade_id"] == trade_id),
                    "total_mech_calls",
                ].iloc[0]

            net_earnings = (
                earnings
                - fee_amount
                - (total_mech_calls * DEFAULT_MECH_FEE)
                - collateral_amount
            )

            # Assign values to DataFrame
            trades_df.loc[i] = {
                "trader_address": trader_address,
                "market_creator": market_creator,
                "trade_id": trade["id"],
                "market_status": market_status.name,
                "creation_timestamp": trade["creationTimestamp"],
                "title": trade["title"],
                "collateral_amount": collateral_amount,
                "outcome_index": trade["outcomeIndex"],
                "trade_fee_amount": fee_amount,
                "outcomes_tokens_traded": outcome_tokens_traded,
                "current_answer": current_answer,
                "is_invalid": is_invalid,
                "winning_trade": winner_trade,
                "earnings": earnings,
                "redeemed": redemption,
                "redeemed_amount": earnings if redemption else 0,
                "num_mech_calls": total_mech_calls,
                "mech_fee_amount": total_mech_calls * DEFAULT_MECH_FEE,
                "net_earnings": net_earnings,
                "roi": net_earnings
                / (
                    collateral_amount + fee_amount + total_mech_calls * DEFAULT_MECH_FEE
                ),
            }

        except Exception as e:
            print(f"Error processing trade {i}: {e}")
            print(trade)
            continue

    print(f"Number of trades where currentAnswer is NaN = {trades_answer_nan}")
    print(
        f"Number of trades where the market is not closed = {trades_no_closed_market}"
    )
    return trades_df


def analyse_all_traders(
    trades: pd.DataFrame,
    estimated_mech_calls: pd.DataFrame,
    daily_info: bool = False,
) -> pd.DataFrame:
    """Analyse all creators.

    Runs analyse_trader for every unique trader_address in `trades` and
    concatenates the per-trader results into a single DataFrame.
    """
    all_traders = []
    unique_traders = trades["trader_address"].unique()
    for trader in tqdm(
        unique_traders,
        total=len(unique_traders),
        desc="Analysing creators",
    ):
        trader_estimated_mech_calls = estimated_mech_calls.loc[
            estimated_mech_calls["trader_address"] == trader
        ]
        all_traders.append(
            analyse_trader(trader, trades, trader_estimated_mech_calls, daily_info)
        )

    # concat all creators
    all_creators_df = pd.concat(all_traders)

    return all_creators_df


def run_profitability_analysis(
    tools_filename: str,
    trades_filename: str,
    merge: bool = False,
) -> Optional[pd.DataFrame]:
    """Create all trades analysis.

    Loads and prepares the input parquet files, computes per-trade
    profitability, splits off invalid trades and unknown traders, labels
    trades by staking, and persists all resulting parquet files.

    :param merge: when True, merge results with the previously saved files.
    :return: the final all-trades profitability DataFrame, or None when the
        input data could not be prepared.
    """
    # load dfs from data folder for analysis
    print(f"Preparing data with {tools_filename} and {trades_filename}")
    fpmmTrades = prepare_profitalibity_data(tools_filename, trades_filename)
    if fpmmTrades is None:
        # BUGFIX: previously crashed with AttributeError on the None below.
        print("Could not prepare the trades data. Aborting the analysis.")
        return None
    if merge:
        update_tools_parquet(tools_filename)
    tools = pd.read_parquet(TMP_DIR / "tools.parquet")

    try:
        fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
            lambda x: transform_to_datetime(x)
        )
    except Exception:
        # Column is already datetime-typed.
        print("Transformation not needed")

    print("Computing the estimated mech calls dataset")
    trade_mech_calls = compute_mech_calls_based_on_timestamps(
        fpmmTrades=fpmmTrades, tools=tools
    )
    trade_mech_calls.to_parquet(TMP_DIR / "trade_mech_calls.parquet")
    print(trade_mech_calls.total_mech_calls.describe())
    print("Analysing trades...")
    all_trades_df = analyse_all_traders(fpmmTrades, trade_mech_calls)

    # # merge previous files if requested
    if merge:
        all_trades_df = update_all_trades_parquet(all_trades_df)

    # debugging purposes
    all_trades_df.to_parquet(JSON_DATA_DIR / "all_trades_df.parquet", index=False)

    # filter invalid markets. Condition: "is_invalid" is True
    invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"]]
    if len(invalid_trades) == 0:
        print("No new invalid trades")
    else:
        if merge:
            try:
                print("Merging invalid trades parquet file")
                old_invalid_trades = pd.read_parquet(
                    DATA_DIR / "invalid_trades.parquet"
                )
                merge_df = pd.concat(
                    [old_invalid_trades, invalid_trades], ignore_index=True
                )
                invalid_trades = merge_df.drop_duplicates()
            except Exception as e:
                print(f"Error updating the invalid trades parquet {e}")
        invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)

    all_trades_df = all_trades_df.loc[~all_trades_df["is_invalid"]]
    all_trades_df = label_trades_by_staking(trades_df=all_trades_df)

    print("Creating unknown traders dataset")
    unknown_traders_df, all_trades_df = create_unknown_traders_df(
        trades_df=all_trades_df
    )
    # merge with previous unknown traders dataset
    previous_unknown_traders = pd.read_parquet(DATA_DIR / "unknown_traders.parquet")
    unknown_traders_df: pd.DataFrame = pd.concat(
        [unknown_traders_df, previous_unknown_traders], ignore_index=True
    )
    unknown_traders_df.drop_duplicates("trade_id", keep="last", inplace=True)
    unknown_traders_df.to_parquet(DATA_DIR / "unknown_traders.parquet", index=False)

    # save to parquet
    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)

    print("Done!")

    return all_trades_df


def add_trades_profitability(trades_filename: str) -> None:
    """Analyse a batch of missing trades and merge them into the saved files.

    Loads the given trades parquet, computes per-trade profitability, and
    merges the results into the persisted invalid_trades, unknown_traders
    and all_trades_profitability parquet files.
    """
    print("Reading the trades file")
    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
    except FileNotFoundError:
        print(f"Error reading {trades_filename} file .")
        # BUGFIX: previously fell through and hit a NameError on fpmmTrades.
        return

    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"

    # lowercase and strip creator_address
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()

    print("Reading tools parquet file")
    tools = pd.read_parquet(TMP_DIR / "tools.parquet")

    try:
        fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
            lambda x: transform_to_datetime(x)
        )
    except Exception:
        # Column is already datetime-typed.
        print("Transformation not needed")

    print("Computing the estimated mech calls dataset")
    trade_mech_calls = compute_mech_calls_based_on_timestamps(
        fpmmTrades=fpmmTrades, tools=tools
    )
    print(trade_mech_calls.total_mech_calls.describe())
    print("Analysing trades...")
    all_trades_df = analyse_all_traders(fpmmTrades, trade_mech_calls)

    # debugging purposes
    all_trades_df.to_parquet(JSON_DATA_DIR / "missing_trades_df.parquet", index=False)

    # filter invalid markets. Condition: "is_invalid" is True
    print("Checking invalid trades")
    invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"]]
    if len(invalid_trades) > 0:
        try:
            print("Merging invalid trades parquet file")
            old_invalid_trades = pd.read_parquet(DATA_DIR / "invalid_trades.parquet")
            merge_df = pd.concat(
                [old_invalid_trades, invalid_trades], ignore_index=True
            )
            invalid_trades = merge_df.drop_duplicates("trade_id")
        except Exception as e:
            print(f"Error updating the invalid trades parquet {e}")
        invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)

    all_trades_df = all_trades_df.loc[~all_trades_df["is_invalid"]]

    print("Adding staking labels")
    all_trades_df = label_trades_by_staking(trades_df=all_trades_df)

    print("Creating unknown traders dataset")
    unknown_traders_df, all_trades_df = create_unknown_traders_df(
        trades_df=all_trades_df
    )
    if len(unknown_traders_df) > 0:
        print("Merging unknown traders info")
        # merge with previous unknown traders dataset
        previous_unknown_traders = pd.read_parquet(DATA_DIR / "unknown_traders.parquet")
        unknown_traders_df: pd.DataFrame = pd.concat(
            [unknown_traders_df, previous_unknown_traders], ignore_index=True
        )
        unknown_traders_df.drop_duplicates("trade_id", keep="last", inplace=True)
        unknown_traders_df.to_parquet(DATA_DIR / "unknown_traders.parquet", index=False)

    print("merge with previous all_trades_profitability")
    old_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
    all_trades_df: pd.DataFrame = pd.concat(
        [all_trades_df, old_trades], ignore_index=True
    )
    all_trades_df.drop_duplicates("trade_id", keep="last", inplace=True)
    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)


if __name__ == "__main__":
    # updating the whole fpmmTrades parquet file instead of just the new ones
    # trade_mech_calls = pd.read_parquet(TMP_DIR / "result_df.parquet")
    # fpmmTrades = pd.read_parquet(TMP_DIR / "fpmmTrades.parquet")
    # fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
    #     lambda x: transform_to_datetime(x)
    # )
    # all_trades_df = analyse_all_traders(fpmmTrades, trade_mech_calls)
    # all_trades_df.to_parquet(TMP_DIR / "all_trades_df.parquet", index=False)
    run_profitability_analysis("file1", "file2")