rosacastillo's picture
adding new unknown trader category
f7c2ff7
raw
history blame
15.9 kB
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
import time
import datetime
import pandas as pd
from typing import Any
from enum import Enum
from tqdm import tqdm
import numpy as np
import os
from web3_utils import query_conditional_tokens_gc_subgraph
from get_mech_info import (
DATETIME_60_DAYS_AGO,
update_fpmmTrades_parquet,
update_tools_parquet,
update_all_trades_parquet,
)
from utils import (
wei_to_unit,
convert_hex_to_int,
JSON_DATA_DIR,
DATA_DIR,
DEFAULT_MECH_FEE,
)
from staking import label_trades_by_staking
from nr_mech_calls import create_unknown_traders_df
# Balance (in wei) at or below which a position is presumably treated as
# dust — TODO confirm: the constant is not referenced in this module.
# Bug fix: DUST_THRESHOLD was defined twice with the same value; deduped.
DUST_THRESHOLD = 10000000000000
# Realitio/omen encodes an invalid market resolution as -1.
INVALID_ANSWER = -1
# Unix timestamp for the 60-days-ago cutoff used by the data-update helpers.
DEFAULT_60_DAYS_AGO_TIMESTAMP = (DATETIME_60_DAYS_AGO).timestamp()
# Wrapped xDAI collateral token address (Gnosis Chain).
WXDAI_CONTRACT_ADDRESS = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"
class MarketState(Enum):
    """Lifecycle states of a prediction market."""

    OPEN = 1
    PENDING = 2
    FINALIZING = 3
    ARBITRATING = 4
    CLOSED = 5

    def __str__(self) -> str:
        """Return the state name in capitalized form (e.g. ``Closed``)."""
        return self.name.capitalize()
class MarketAttribute(Enum):
    """Named statistics attributes for market/trader tables."""

    NUM_TRADES = "Num_trades"
    WINNER_TRADES = "Winner_trades"
    NUM_REDEEMED = "Num_redeemed"
    INVESTMENT = "Investment"
    FEES = "Fees"
    MECH_CALLS = "Mech_calls"
    MECH_FEES = "Mech_fees"
    EARNINGS = "Earnings"
    NET_EARNINGS = "Net_earnings"
    REDEMPTIONS = "Redemptions"
    ROI = "ROI"

    def __str__(self) -> str:
        """Return the display value of the attribute."""
        return self.value

    def __repr__(self) -> str:
        """Return the member name of the attribute."""
        return self.name

    @staticmethod
    def argparse(s: str) -> "MarketAttribute":
        """Convert a (case-insensitive) string into a MarketAttribute member."""
        try:
            return MarketAttribute[s.upper()]
        except KeyError as e:
            raise ValueError(f"Invalid MarketAttribute: {s}") from e
# Schema of the per-trade DataFrame produced by analyse_trader() /
# analyse_all_traders(); one row per analysed trade.
ALL_TRADES_STATS_DF_COLS = [
    "trader_address",
    "market_creator",
    "trade_id",
    "creation_timestamp",
    "title",
    "market_status",
    "collateral_amount",
    "outcome_index",
    "trade_fee_amount",
    "outcomes_tokens_traded",
    "current_answer",
    "is_invalid",
    "winning_trade",
    "earnings",
    "redeemed",
    "redeemed_amount",
    "num_mech_calls",
    "mech_fee_amount",
    "net_earnings",
    "roi",
]
# Schema of the per-trader summary DataFrame produced by summary_analyse();
# one row per trader address.
SUMMARY_STATS_DF_COLS = [
    "trader_address",
    "num_trades",
    "num_winning_trades",
    "num_redeemed",
    "total_investment",
    "total_trade_fees",
    "num_mech_calls",
    "total_mech_fees",
    "total_earnings",
    "total_redeemed_amount",
    "total_net_earnings",
    "total_net_earnings_wo_mech_fees",
    "total_roi",
    "total_roi_wo_mech_fees",
    "mean_mech_calls_per_trade",
    "mean_mech_fee_amount_per_trade",
]
def _is_redeemed(user_json: dict[str, Any], fpmmTrade: dict[str, Any]) -> bool:
"""Returns whether the user has redeemed the position."""
user_positions = user_json["data"]["user"]["userPositions"]
condition_id = fpmmTrade["fpmm.condition.id"]
for position in user_positions:
position_condition_ids = position["position"]["conditionIds"]
balance = int(position["balance"])
if condition_id in position_condition_ids:
if balance == 0:
return True
# return early
return False
return False
def prepare_profitalibity_data(
    rpc: str,
    tools_filename: str,
    trades_filename: str,
) -> pd.DataFrame:
    """Prepare data for profitalibity analysis.

    Normalises trader addresses in the tools parquet (lowercase/strip,
    dedupe by request id + block) and returns the trades DataFrame with
    its trader addresses normalised too.

    :param rpc: unused here; kept for interface compatibility with callers.
    :param tools_filename: name of the tools parquet file under DATA_DIR.
    :param trades_filename: name of the fpmm trades parquet under DATA_DIR.
    :return: the normalised trades DataFrame, or None when a file is missing.
    """
    # Check if tools.parquet is in the same directory
    try:
        tools = pd.read_parquet(DATA_DIR / tools_filename)
        # make sure creator_address is in the columns
        assert "trader_address" in tools.columns, "trader_address column not found"
        # lowercase and strip creator_address
        tools["trader_address"] = tools["trader_address"].str.lower().str.strip()
        tools.drop_duplicates(
            subset=["request_id", "request_block"], keep="last", inplace=True
        )
        tools.to_parquet(DATA_DIR / tools_filename)
        print(f"{tools_filename} loaded")
    except FileNotFoundError:
        print("tools.parquet not found. Please run tools.py first.")
        return
    # Check if fpmmTrades.parquet is in the same directory
    print("Reading the trades file")
    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
    except FileNotFoundError:
        print(f"Error reading {trades_filename} file .")
        # Bug fix: the original fell through here and crashed with a
        # NameError on the undefined `fpmmTrades` below; bail out instead,
        # mirroring the behavior of the tools branch above.
        return
    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"
    # lowercase and strip creator_address
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()
    return fpmmTrades
def determine_market_status(trade, current_answer):
    """Determine the market status of a trade.

    :param trade: trade row exposing the ``fpmm.*`` fields.
    :param current_answer: the market's current answer; None or NaN when
        the market has not been answered yet.
    :return: a MarketState member.
    """
    # Bug fix: the original used `current_answer is np.nan`, an identity
    # check against the np.nan singleton; NaN floats coming out of a
    # DataFrame are distinct objects and were missed. pd.isna covers both
    # None and any NaN value (it is False for hex-string answers).
    if current_answer is None or pd.isna(current_answer):
        if time.time() >= int(trade["fpmm.openingTimestamp"]):
            return MarketState.PENDING
        return MarketState.OPEN
    if trade["fpmm.isPendingArbitration"]:
        return MarketState.ARBITRATING
    if time.time() < int(trade["fpmm.answerFinalizedTimestamp"]):
        return MarketState.FINALIZING
    return MarketState.CLOSED
def analyse_trader(
    trader_address: str,
    fpmmTrades: pd.DataFrame,
    tools: pd.DataFrame,
    daily_info: bool = False,
) -> pd.DataFrame:
    """Analyse a trader's trades.

    :param trader_address: lowercased wallet address of the trader.
    :param fpmmTrades: all trades; filtered here for this trader.
    :param tools: mech tools usage data; filtered here for this trader.
    :param daily_info: when True, trades on non-closed markets are analysed
        too instead of being skipped.
    :return: one row per analysed trade (ALL_TRADES_STATS_DF_COLS columns).
    """
    # Filter trades and tools for the given trader
    trades = fpmmTrades[fpmmTrades["trader_address"] == trader_address]
    tools_usage = tools[tools["trader_address"] == trader_address]
    # Prepare the DataFrame
    trades_df = pd.DataFrame(columns=ALL_TRADES_STATS_DF_COLS)
    if trades.empty:
        return trades_df
    # Fetch user's conditional tokens gc graph (used to detect redemptions)
    try:
        user_json = query_conditional_tokens_gc_subgraph(trader_address)
    except Exception as e:
        print(f"Error fetching user data: {e}")
        return trades_df
    # Iterate over the trades
    for i, trade in tqdm(trades.iterrows(), total=len(trades), desc="Analysing trades"):
        try:
            market_answer = trade["fpmm.currentAnswer"]
            # NOTE: NaN is truthy, so a NaN answer does not trigger this
            # skip; it is normalised to None a few lines below instead.
            if not daily_info and not market_answer:
                print(f"Skipping trade {i} because currentAnswer is NaN")
                continue
            # Parsing and computing shared values
            creation_timestamp_utc = datetime.datetime.fromtimestamp(
                int(trade["creationTimestamp"]), tz=datetime.timezone.utc
            )
            collateral_amount = wei_to_unit(float(trade["collateralAmount"]))
            fee_amount = wei_to_unit(float(trade["feeAmount"]))
            outcome_tokens_traded = wei_to_unit(float(trade["outcomeTokensTraded"]))
            earnings, winner_trade = (0, False)
            redemption = _is_redeemed(user_json, trade)
            # Bug fix: NaN read from parquet is truthy, so the original
            # `market_answer if market_answer else None` kept NaN here and
            # convert_hex_to_int() crashed on it below; map NaN to None too.
            current_answer = (
                market_answer
                if market_answer and not pd.isna(market_answer)
                else None
            )
            market_creator = trade["market_creator"]
            # Determine market status
            market_status = determine_market_status(trade, current_answer)
            # Skip non-closed markets
            if not daily_info and market_status != MarketState.CLOSED:
                print(
                    f"Skipping trade {i} because market is not closed. Market Status: {market_status}"
                )
                continue
            if current_answer is not None:
                current_answer = convert_hex_to_int(current_answer)
            # Compute invalidity
            is_invalid = current_answer == INVALID_ANSWER
            # Compute earnings and winner trade status
            if current_answer is None:
                # Market not answered yet (daily_info mode only).
                earnings = 0.0
                winner_trade = None
            elif is_invalid:
                # Invalid market: collateral counts as the earnings.
                earnings = collateral_amount
                winner_trade = False
            elif int(trade["outcomeIndex"]) == current_answer:
                earnings = outcome_tokens_traded
                winner_trade = True
            # Compute mech calls: count tool requests mentioning this title
            if len(tools_usage) == 0:
                print("No tools usage information")
                num_mech_calls = 0
            else:
                try:
                    num_mech_calls = (
                        tools_usage["prompt_request"]
                        .apply(lambda x: trade["title"] in x)
                        .sum()
                    )
                except Exception:
                    # Bug fix: removed stray f-prefix (no placeholders).
                    print("Error while getting the number of mech calls")
                    num_mech_calls = 2  # Average value
            net_earnings = (
                earnings
                - fee_amount
                - (num_mech_calls * DEFAULT_MECH_FEE)
                - collateral_amount
            )
            # Assign values to DataFrame
            trades_df.loc[i] = {
                "trader_address": trader_address,
                "market_creator": market_creator,
                "trade_id": trade["id"],
                "market_status": market_status.name,
                "creation_timestamp": creation_timestamp_utc,
                "title": trade["title"],
                "collateral_amount": collateral_amount,
                "outcome_index": trade["outcomeIndex"],
                "trade_fee_amount": fee_amount,
                "outcomes_tokens_traded": outcome_tokens_traded,
                "current_answer": current_answer,
                "is_invalid": is_invalid,
                "winning_trade": winner_trade,
                "earnings": earnings,
                "redeemed": redemption,
                "redeemed_amount": earnings if redemption else 0,
                "num_mech_calls": num_mech_calls,
                "mech_fee_amount": num_mech_calls * DEFAULT_MECH_FEE,
                "net_earnings": net_earnings,
                # NOTE(review): raises ZeroDivisionError if the total cost is
                # zero; the outer except then skips the trade — confirm that
                # is the intended handling for zero-cost trades.
                "roi": net_earnings
                / (collateral_amount + fee_amount + num_mech_calls * DEFAULT_MECH_FEE),
            }
        except Exception as e:
            # Best-effort loop: log the offending trade and keep going.
            print(f"Error processing trade {i}: {e}")
            print(trade)
            continue
    return trades_df
def analyse_all_traders(
    trades: pd.DataFrame, tools: pd.DataFrame, daily_info: bool = False
) -> pd.DataFrame:
    """Analyse all creators.

    Runs analyse_trader() for every unique trader address and concatenates
    the per-trader results into one DataFrame.

    :param trades: all fpmm trades.
    :param tools: mech tools usage data.
    :param daily_info: forwarded to analyse_trader().
    :return: concatenation of every trader's analysed trades.
    """
    # Hoisted: the original computed .unique() twice (once for the iterable,
    # once for the progress-bar total).
    traders = trades["trader_address"].unique()
    all_traders = [
        analyse_trader(trader, trades, tools, daily_info)
        for trader in tqdm(traders, total=len(traders), desc="Analysing creators")
    ]
    # concat all creators
    return pd.concat(all_traders)
def summary_analyse(df):
"""Summarise profitability analysis."""
# Ensure DataFrame is not empty
if df.empty:
return pd.DataFrame(columns=SUMMARY_STATS_DF_COLS)
# Group by trader_address
grouped = df.groupby("trader_address")
# Create summary DataFrame
summary_df = grouped.agg(
num_trades=("trader_address", "size"),
num_winning_trades=("winning_trade", lambda x: float((x).sum())),
num_redeemed=("redeemed", lambda x: float(x.sum())),
total_investment=("collateral_amount", "sum"),
total_trade_fees=("trade_fee_amount", "sum"),
num_mech_calls=("num_mech_calls", "sum"),
total_mech_fees=("mech_fee_amount", "sum"),
total_earnings=("earnings", "sum"),
total_redeemed_amount=("redeemed_amount", "sum"),
total_net_earnings=("net_earnings", "sum"),
)
# Calculating additional columns
summary_df["total_roi"] = (
summary_df["total_net_earnings"] / summary_df["total_investment"]
)
summary_df["mean_mech_calls_per_trade"] = (
summary_df["num_mech_calls"] / summary_df["num_trades"]
)
summary_df["mean_mech_fee_amount_per_trade"] = (
summary_df["total_mech_fees"] / summary_df["num_trades"]
)
summary_df["total_net_earnings_wo_mech_fees"] = (
summary_df["total_net_earnings"] + summary_df["total_mech_fees"]
)
summary_df["total_roi_wo_mech_fees"] = (
summary_df["total_net_earnings_wo_mech_fees"] / summary_df["total_investment"]
)
# Resetting index to include trader_address
summary_df.reset_index(inplace=True)
return summary_df
def run_profitability_analysis(
    rpc: str,
    tools_filename: str,
    trades_filename: str,
    merge: bool = False,
):
    """Create all trades analysis.

    Orchestrates the full pipeline: normalise input parquets, analyse every
    trader, persist invalid-market trades separately, label staking, split
    off unknown traders, and write the profitability and summary parquets.

    :param rpc: RPC endpoint, forwarded to the data-preparation helpers.
    :param tools_filename: parquet file with mech tools usage data.
    :param trades_filename: parquet file with the fpmm trades.
    :param merge: when True, merge the freshly computed data into the
        previously stored parquet files instead of starting from scratch.
    :return: tuple ``(all_trades_df, summary_df)``.
    """
    # load dfs from data folder for analysis
    print(f"Preparing data with {tools_filename} and {trades_filename}")
    fpmmTrades = prepare_profitalibity_data(rpc, tools_filename, trades_filename)
    if merge:
        update_tools_parquet(rpc, tools_filename)
    tools = pd.read_parquet(DATA_DIR / "tools.parquet")
    print("Analysing trades...")
    all_trades_df = analyse_all_traders(fpmmTrades, tools)
    # # merge previous files if requested
    if merge:
        update_fpmmTrades_parquet(trades_filename)
        all_trades_df = update_all_trades_parquet(all_trades_df)
    # debugging purposes
    all_trades_df.to_parquet(JSON_DATA_DIR / "all_trades_df.parquet", index=False)
    # filter invalid markets. Condition: "is_invalid" is True
    invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"] == True]
    if len(invalid_trades) == 0:
        print("No new invalid trades")
    else:
        if merge:
            # Best-effort merge with the previously stored invalid trades;
            # on failure we fall through and write only the new ones.
            try:
                print("Merging invalid trades parquet file")
                old_invalid_trades = pd.read_parquet(
                    DATA_DIR / "invalid_trades.parquet"
                )
                merge_df = pd.concat(
                    [old_invalid_trades, invalid_trades], ignore_index=True
                )
                invalid_trades = merge_df.drop_duplicates()
            except Exception as e:
                print(f"Error updating the invalid trades parquet {e}")
        invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)
    # keep only valid-market trades for all remaining steps
    all_trades_df = all_trades_df.loc[all_trades_df["is_invalid"] == False]
    # add staking labels
    label_trades_by_staking(trades_df=all_trades_df)
    # create the unknown traders dataset
    unknown_traders_df, all_trades_df = create_unknown_traders_df(
        trades_df=all_trades_df
    )
    unknown_traders_df.to_parquet(DATA_DIR / "unknown_traders.parquet", index=False)
    # save to parquet
    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)
    # summarize profitability df
    print("Summarising trades...")
    summary_df = summary_analyse(all_trades_df)
    summary_df.to_parquet(DATA_DIR / "summary_profitability.parquet", index=False)
    print("Done!")
    return all_trades_df, summary_df
if __name__ == "__main__":
    # NOTE(review): hard-coded RPC endpoint with an embedded key — should be
    # read from an environment variable / config instead; confirm with owners.
    rpc = "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a"
    # Remove a stale trades file so the pipeline starts from fresh data.
    if os.path.exists(DATA_DIR / "fpmmTrades.parquet"):
        os.remove(DATA_DIR / "fpmmTrades.parquet")
    # Bug fix: run_profitability_analysis() requires tools_filename and
    # trades_filename; calling it with only `rpc` raised a TypeError. The
    # filenames below match the ones this module reads/removes above.
    run_profitability_analysis(
        rpc,
        tools_filename="tools.parquet",
        trades_filename="fpmmTrades.parquet",
    )