# NOTE: removed Hugging Face blob-viewer residue (author line, commit message,
# "raw / history / blame", file size) that was accidentally pasted above the
# module and broke Python parsing.
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
import time
import pandas as pd
from typing import Any
from enum import Enum
from tqdm import tqdm
import numpy as np
import os
from web3_utils import query_conditional_tokens_gc_subgraph
from get_mech_info import (
DATETIME_60_DAYS_AGO,
update_tools_parquet,
update_all_trades_parquet,
)
from utils import (
wei_to_unit,
convert_hex_to_int,
JSON_DATA_DIR,
DATA_DIR,
DEFAULT_MECH_FEE,
TMP_DIR,
)
from staking import label_trades_by_staking
from nr_mech_calls import (
create_unknown_traders_df,
transform_to_datetime,
compute_mech_calls_based_on_timestamps,
)
# Balances at or below this many wei are considered dust.
# (Bug fix: this constant was previously defined twice with the same value.)
DUST_THRESHOLD = 10000000000000
# Sentinel answer value used by the oracle to mark an invalid market.
INVALID_ANSWER = -1
# Unix timestamp of the 60-days-ago cutoff used to bound recent data.
DEFAULT_60_DAYS_AGO_TIMESTAMP = (DATETIME_60_DAYS_AGO).timestamp()
# Collateral token contract address (named WXDAI; presumably wrapped xDAI on
# Gnosis Chain — confirm against the deployment docs).
WXDAI_CONTRACT_ADDRESS = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"
class MarketState(Enum):
    """Lifecycle states of a prediction market."""

    OPEN = 1
    PENDING = 2
    FINALIZING = 3
    ARBITRATING = 4
    CLOSED = 5

    def __str__(self) -> str:
        """Return the state name with only its first letter capitalised."""
        label = self.name
        return label[0].upper() + label[1:].lower()
class MarketAttribute(Enum):
    """Named per-market metrics tracked by the statistics tables."""

    NUM_TRADES = "Num_trades"
    WINNER_TRADES = "Winner_trades"
    NUM_REDEEMED = "Num_redeemed"
    INVESTMENT = "Investment"
    FEES = "Fees"
    MECH_CALLS = "Mech_calls"
    MECH_FEES = "Mech_fees"
    EARNINGS = "Earnings"
    NET_EARNINGS = "Net_earnings"
    REDEMPTIONS = "Redemptions"
    ROI = "ROI"

    def __str__(self) -> str:
        """Human-readable form: the attribute's string value."""
        return self.value

    def __repr__(self) -> str:
        """Debug form: the enum member name."""
        return self.name

    @staticmethod
    def argparse(s: str) -> "MarketAttribute":
        """Convert a (case-insensitive) string into a MarketAttribute.

        Raises ValueError for unknown attribute names, chaining the
        underlying KeyError.
        """
        try:
            return MarketAttribute[s.upper()]
        except KeyError as err:
            raise ValueError(f"Invalid MarketAttribute: {s}") from err
# Column schema of the per-trade statistics DataFrame built in analyse_trader.
ALL_TRADES_STATS_DF_COLS = [
    "trader_address",
    "market_creator",
    "trade_id",
    "creation_timestamp",
    "title",
    "market_status",
    "collateral_amount",
    "outcome_index",
    "trade_fee_amount",
    "outcomes_tokens_traded",
    "current_answer",
    "is_invalid",
    "winning_trade",
    "earnings",
    "redeemed",
    "redeemed_amount",
    "num_mech_calls",
    "mech_fee_amount",
    "net_earnings",
    "roi",
]
def _is_redeemed(user_json: dict[str, Any], fpmmTrade: dict[str, Any]) -> bool:
"""Returns whether the user has redeemed the position."""
user_positions = user_json["data"]["user"]["userPositions"]
condition_id = fpmmTrade["fpmm.condition.id"]
for position in user_positions:
position_condition_ids = position["position"]["conditionIds"]
balance = int(position["balance"])
if condition_id in position_condition_ids:
if balance == 0:
return True
# return early
return False
return False
def prepare_profitalibity_data(
    tools_filename: str,
    trades_filename: str,
) -> pd.DataFrame:
    """Prepare data for profitalibity analysis.

    Normalises trader addresses in the tools parquet (lowercase + strip),
    de-duplicates it on (request_id, request_block) keeping the latest row,
    writes it back, then loads and normalises the trades parquet.

    Returns the trades DataFrame, or None when either input file is missing.
    (The "profitalibity" typo is kept in the name for caller compatibility.)
    """
    # Check if tools.parquet is in the same directory
    try:
        tools = pd.read_parquet(DATA_DIR / tools_filename)
        # make sure trader_address is in the columns
        assert "trader_address" in tools.columns, "trader_address column not found"
        # lowercase and strip the addresses so joins/comparisons are consistent
        tools["trader_address"] = tools["trader_address"].str.lower().str.strip()
        # keep only the latest entry per (request_id, request_block) pair
        tools.drop_duplicates(
            subset=["request_id", "request_block"], keep="last", inplace=True
        )
        tools.to_parquet(DATA_DIR / tools_filename)
        print(f"{tools_filename} loaded")
    except FileNotFoundError:
        print(f"{tools_filename} not found.")
        return None
    # Check if fpmmTrades.parquet is in the same directory
    print("Reading the new trades file")
    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
    except FileNotFoundError:
        print(f"Error reading {trades_filename} file .")
        # Bug fix: previously execution fell through here and raised an
        # UnboundLocalError on fpmmTrades below.
        return None
    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"
    # lowercase and strip the addresses
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()
    return fpmmTrades
def determine_market_status(trade, current_answer):
    """Determine the market status of a trade.

    Priority order: no answer yet and past the opening time -> PENDING;
    no answer and not yet open -> OPEN; arbitration flag set -> ARBITRATING;
    finalization timestamp still in the future -> FINALIZING; otherwise CLOSED.
    """
    # Bug fix: `current_answer is np.nan` was an identity check that only
    # matched the np.nan singleton; pd.isna also covers None, float('nan')
    # and np.float64 NaNs coming out of pandas columns.
    answer_missing = current_answer is None or pd.isna(current_answer)
    if answer_missing and time.time() >= int(trade["fpmm.openingTimestamp"]):
        return MarketState.PENDING
    elif answer_missing:
        return MarketState.OPEN
    elif trade["fpmm.isPendingArbitration"]:
        return MarketState.ARBITRATING
    elif time.time() < int(trade["fpmm.answerFinalizedTimestamp"]):
        return MarketState.FINALIZING
    return MarketState.CLOSED
def analyse_trader(
    trader_address: str,
    fpmmTrades: pd.DataFrame,
    trader_estimated_mech_calls: pd.DataFrame,
    daily_info: bool = False,
) -> pd.DataFrame:
    """Analyse a trader's trades.

    Builds one row per analysed trade (columns: ALL_TRADES_STATS_DF_COLS)
    with earnings, fees, estimated mech calls and ROI. When daily_info is
    False, trades without a current answer or whose market is not CLOSED are
    skipped and mech calls are matched by (market title, trade id); when
    True, those trades are kept and mech calls are matched by trading day.
    Returns an empty DataFrame when the trader has no trades or the
    conditional-tokens subgraph query fails.
    """
    fpmmTrades["creation_timestamp"] = pd.to_datetime(fpmmTrades["creationTimestamp"])
    fpmmTrades["creation_date"] = fpmmTrades["creation_timestamp"].dt.date
    # Filter trades and tools for the given trader
    trades = fpmmTrades[fpmmTrades["trader_address"] == trader_address]
    # Prepare the DataFrame
    trades_df = pd.DataFrame(columns=ALL_TRADES_STATS_DF_COLS)
    if trades.empty:
        return trades_df
    # Fetch user's conditional tokens gc graph (used to detect redemptions)
    try:
        user_json = query_conditional_tokens_gc_subgraph(trader_address)
    except Exception as e:
        print(f"Error fetching user data: {e}")
        return trades_df
    # Iterate over the trades
    trades_answer_nan = 0  # count of trades skipped for a missing currentAnswer
    trades_no_closed_market = 0  # count of trades skipped for a non-closed market
    for i, trade in tqdm(trades.iterrows(), total=len(trades), desc="Analysing trades"):
        try:
            market_answer = trade["fpmm.currentAnswer"]
            trading_day = trade["creation_date"]
            trade_id = trade["id"]
            if not daily_info and not market_answer:
                # print(f"Skipping trade {i} because currentAnswer is NaN")
                trades_answer_nan += 1
                continue
            # Parsing and computing shared values (wei -> unit conversions)
            collateral_amount = wei_to_unit(float(trade["collateralAmount"]))
            fee_amount = wei_to_unit(float(trade["feeAmount"]))
            outcome_tokens_traded = wei_to_unit(float(trade["outcomeTokensTraded"]))
            earnings, winner_trade = (0, False)
            redemption = _is_redeemed(user_json, trade)
            # Normalise falsy answers (NaN/empty) to None
            current_answer = market_answer if market_answer else None
            market_creator = trade["market_creator"]
            # Determine market status
            market_status = determine_market_status(trade, current_answer)
            # Skip non-closed markets
            if not daily_info and market_status != MarketState.CLOSED:
                # print(
                #     f"Skipping trade {i} because market is not closed. Market Status: {market_status}"
                # )
                trades_no_closed_market += 1
                continue
            # The subgraph answer is a hex string; convert to int for comparison
            if current_answer is not None:
                current_answer = convert_hex_to_int(current_answer)
            # Compute invalidity
            is_invalid = current_answer == INVALID_ANSWER
            # Compute earnings and winner trade status
            if current_answer is None:
                # Market still unresolved: no earnings, winner unknown
                earnings = 0.0
                winner_trade = None
            elif is_invalid:
                # Invalid market: earnings equal the collateral put in
                earnings = collateral_amount
                winner_trade = False
            elif int(trade["outcomeIndex"]) == current_answer:
                earnings = outcome_tokens_traded
                winner_trade = True
            # Compute mech calls using the title, and trade id
            # (daily mode matches on trading_day instead)
            if daily_info:
                total_mech_calls = trader_estimated_mech_calls.loc[
                    (trader_estimated_mech_calls["trading_day"] == trading_day),
                    "total_mech_calls",
                ].iloc[0]
            else:
                total_mech_calls = trader_estimated_mech_calls.loc[
                    (trader_estimated_mech_calls["market"] == trade["title"])
                    & (trader_estimated_mech_calls["trade_id"] == trade_id),
                    "total_mech_calls",
                ].iloc[0]
            # Net earnings = gross earnings - market fee - mech fees - collateral
            net_earnings = (
                earnings
                - fee_amount
                - (total_mech_calls * DEFAULT_MECH_FEE)
                - collateral_amount
            )
            # Assign values to DataFrame
            trades_df.loc[i] = {
                "trader_address": trader_address,
                "market_creator": market_creator,
                "trade_id": trade["id"],
                "market_status": market_status.name,
                "creation_timestamp": trade["creationTimestamp"],
                "title": trade["title"],
                "collateral_amount": collateral_amount,
                "outcome_index": trade["outcomeIndex"],
                "trade_fee_amount": fee_amount,
                "outcomes_tokens_traded": outcome_tokens_traded,
                "current_answer": current_answer,
                "is_invalid": is_invalid,
                "winning_trade": winner_trade,
                "earnings": earnings,
                "redeemed": redemption,
                "redeemed_amount": earnings if redemption else 0,
                "num_mech_calls": total_mech_calls,
                "mech_fee_amount": total_mech_calls * DEFAULT_MECH_FEE,
                "net_earnings": net_earnings,
                # ROI relative to total cost (collateral + market fee + mech fees)
                "roi": net_earnings
                / (
                    collateral_amount + fee_amount + total_mech_calls * DEFAULT_MECH_FEE
                ),
            }
        except Exception as e:
            # Best-effort per-trade processing: log and move to the next trade
            print(f"Error processing trade {i}: {e}")
            print(trade)
            continue
    print(f"Number of trades where currentAnswer is NaN = {trades_answer_nan}")
    print(
        f"Number of trades where the market is not closed = {trades_no_closed_market}"
    )
    return trades_df
def analyse_all_traders(
    trades: pd.DataFrame,
    estimated_mech_calls: pd.DataFrame,
    daily_info: bool = False,
) -> pd.DataFrame:
    """Analyse every trader found in *trades*.

    Runs analyse_trader for each unique trader_address, restricting the
    estimated mech-calls frame to that trader, and concatenates the
    per-trader results. daily_info is forwarded to analyse_trader.
    """
    all_traders = []
    # Hoist the unique() computation: it was previously evaluated twice
    # (once for the iterator and once for tqdm's total).
    unique_traders = trades["trader_address"].unique()
    for trader in tqdm(
        unique_traders,
        total=len(unique_traders),
        desc="Analysing creators",
    ):
        trader_estimated_mech_calls = estimated_mech_calls.loc[
            estimated_mech_calls["trader_address"] == trader
        ]
        all_traders.append(
            analyse_trader(trader, trades, trader_estimated_mech_calls, daily_info)
        )
    # concat all creators
    all_creators_df = pd.concat(all_traders)
    return all_creators_df
def run_profitability_analysis(
    tools_filename: str,
    trades_filename: str,
    merge: bool = False,
):
    """Create all trades analysis.

    Pipeline: prepare tools/trades data, estimate mech calls per trade,
    compute per-trade profitability, split out invalid markets and unknown
    traders into their own parquet files, label trades by staking and write
    all_trades_profitability.parquet. When *merge* is True, results are
    merged with the previously stored parquet files.

    Returns the profitability DataFrame, or None when the input data could
    not be prepared.
    """
    # load dfs from data folder for analysis
    print(f"Preparing data with {tools_filename} and {trades_filename}")
    fpmmTrades = prepare_profitalibity_data(tools_filename, trades_filename)
    if fpmmTrades is None:
        # Bug fix: prepare_profitalibity_data returns None when an input file
        # is missing; previously this crashed further down with a TypeError.
        print("Trades data could not be prepared. Aborting the analysis.")
        return None
    if merge:
        update_tools_parquet(tools_filename)
    tools = pd.read_parquet(TMP_DIR / "tools.parquet")
    try:
        fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
            lambda x: transform_to_datetime(x)
        )
    except Exception:
        # Timestamps were already parsed on a previous run; leave them as-is.
        print("Transformation not needed")
    print("Computing the estimated mech calls dataset")
    trade_mech_calls = compute_mech_calls_based_on_timestamps(
        fpmmTrades=fpmmTrades, tools=tools
    )
    trade_mech_calls.to_parquet(TMP_DIR / "trade_mech_calls.parquet")
    print(trade_mech_calls.total_mech_calls.describe())
    print("Analysing trades...")
    all_trades_df = analyse_all_traders(fpmmTrades, trade_mech_calls)
    # merge previous files if requested
    if merge:
        all_trades_df = update_all_trades_parquet(all_trades_df)
    # debugging purposes
    all_trades_df.to_parquet(JSON_DATA_DIR / "all_trades_df.parquet", index=False)
    # filter invalid markets. Condition: "is_invalid" is True
    invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"] == True]
    if len(invalid_trades) == 0:
        print("No new invalid trades")
    else:
        if merge:
            try:
                print("Merging invalid trades parquet file")
                old_invalid_trades = pd.read_parquet(
                    DATA_DIR / "invalid_trades.parquet"
                )
                merge_df = pd.concat(
                    [old_invalid_trades, invalid_trades], ignore_index=True
                )
                invalid_trades = merge_df.drop_duplicates()
            except Exception as e:
                # Best-effort merge: on failure keep only the new invalid trades
                print(f"Error updating the invalid trades parquet {e}")
        invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)
    all_trades_df = all_trades_df.loc[all_trades_df["is_invalid"] == False]
    all_trades_df = label_trades_by_staking(trades_df=all_trades_df)
    print("Creating unknown traders dataset")
    unknown_traders_df, all_trades_df = create_unknown_traders_df(
        trades_df=all_trades_df
    )
    # merge with previous unknown traders dataset
    previous_unknown_traders = pd.read_parquet(DATA_DIR / "unknown_traders.parquet")
    unknown_traders_df: pd.DataFrame = pd.concat(
        [unknown_traders_df, previous_unknown_traders], ignore_index=True
    )
    # NOTE(review): with [new, previous] order, keep="last" keeps the
    # PREVIOUS row when a trade_id repeats — confirm this is intended.
    unknown_traders_df.drop_duplicates("trade_id", keep="last", inplace=True)
    unknown_traders_df.to_parquet(DATA_DIR / "unknown_traders.parquet", index=False)
    # save to parquet
    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)
    print("Done!")
    return all_trades_df
def add_trades_profitability(trades_filename: str):
    """Compute profitability for the trades in *trades_filename* and merge
    the results into the existing parquet datasets (invalid trades, unknown
    traders and all_trades_profitability)."""
    print("Reading the trades file")
    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
    except FileNotFoundError:
        print(f"Error reading {trades_filename} file .")
        # Bug fix: previously execution continued and raised a NameError on
        # fpmmTrades; without the trades file there is nothing to do.
        return
    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"
    # lowercase and strip the addresses
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()
    print("Reading tools parquet file")
    tools = pd.read_parquet(TMP_DIR / "tools.parquet")
    try:
        fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
            lambda x: transform_to_datetime(x)
        )
    except Exception:
        # Timestamps were already parsed on a previous run; leave them as-is.
        print("Transformation not needed")
    print("Computing the estimated mech calls dataset")
    trade_mech_calls = compute_mech_calls_based_on_timestamps(
        fpmmTrades=fpmmTrades, tools=tools
    )
    print(trade_mech_calls.total_mech_calls.describe())
    print("Analysing trades...")
    all_trades_df = analyse_all_traders(fpmmTrades, trade_mech_calls)
    # debugging purposes
    all_trades_df.to_parquet(JSON_DATA_DIR / "missing_trades_df.parquet", index=False)
    # filter invalid markets. Condition: "is_invalid" is True
    print("Checking invalid trades")
    invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"] == True]
    if len(invalid_trades) > 0:
        try:
            print("Merging invalid trades parquet file")
            old_invalid_trades = pd.read_parquet(DATA_DIR / "invalid_trades.parquet")
            merge_df = pd.concat(
                [old_invalid_trades, invalid_trades], ignore_index=True
            )
            invalid_trades = merge_df.drop_duplicates("trade_id")
        except Exception as e:
            # Best-effort merge: on failure keep only the new invalid trades
            print(f"Error updating the invalid trades parquet {e}")
        invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)
    all_trades_df = all_trades_df.loc[all_trades_df["is_invalid"] == False]
    print("Adding staking labels")
    all_trades_df = label_trades_by_staking(trades_df=all_trades_df)
    print("Creating unknown traders dataset")
    unknown_traders_df, all_trades_df = create_unknown_traders_df(
        trades_df=all_trades_df
    )
    if len(unknown_traders_df) > 0:
        print("Merging unknown traders info")
        # merge with previous unknown traders dataset
        previous_unknown_traders = pd.read_parquet(DATA_DIR / "unknown_traders.parquet")
        unknown_traders_df: pd.DataFrame = pd.concat(
            [unknown_traders_df, previous_unknown_traders], ignore_index=True
        )
        # NOTE(review): with [new, previous] order, keep="last" keeps the
        # PREVIOUS row when a trade_id repeats — confirm this is intended.
        unknown_traders_df.drop_duplicates("trade_id", keep="last", inplace=True)
        unknown_traders_df.to_parquet(DATA_DIR / "unknown_traders.parquet", index=False)
    print("merge with previous all_trades_profitability")
    old_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
    all_trades_df: pd.DataFrame = pd.concat(
        [all_trades_df, old_trades], ignore_index=True
    )
    all_trades_df.drop_duplicates("trade_id", keep="last", inplace=True)
    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)
if __name__ == "__main__":
    # Manual alternative flow: updating the whole fpmmTrades parquet file
    # instead of just the new ones.
    # trade_mech_calls = pd.read_parquet(TMP_DIR / "result_df.parquet")
    # fpmmTrades = pd.read_parquet(TMP_DIR / "fpmmTrades.parquet")
    # fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
    #     lambda x: transform_to_datetime(x)
    # )
    # all_trades_df = analyse_all_traders(fpmmTrades, trade_mech_calls)
    # all_trades_df.to_parquet(TMP_DIR / "all_trades_df.parquet", index=False)
    # NOTE(review): "file1"/"file2" look like placeholder filenames — confirm
    # the real tools/trades parquet names before running this module directly.
    run_profitability_analysis("file1", "file2")