rosacastillo committed
Commit b3c2f09 · 1 Parent(s): e6dc6fa

adding new scripts
scripts/get_mech_info.py CHANGED
@@ -14,6 +14,7 @@ from mech_request_utils import (
     get_ipfs_data,
     merge_json_files,
 )
+from web3_utils import updating_timestamps

 OLD_MECH_SUBGRAPH_URL = (
     "https://api.thegraph.com/subgraphs/name/stakewise/ethereum-gnosis"
@@ -175,7 +176,7 @@ def update_all_trades_parquet(new_trades_df: pd.DataFrame) -> pd.DataFrame:
     return merge_df


-def update_tools_parquet(new_tools_filename: pd.DataFrame):
+def update_tools_parquet(rpc: str, new_tools_filename: pd.DataFrame):
     try:
         old_tools_df = pd.read_parquet(DATA_DIR / "tools.parquet")
     except Exception as e:
@@ -183,6 +184,8 @@ def update_tools_parquet(new_tools_filename: pd.DataFrame):
         return None
     try:
         new_tools_df = pd.read_parquet(DATA_DIR / new_tools_filename)
+        # the new file has no request_time yet
+        updating_timestamps(rpc, new_tools_filename)
     except Exception as e:
         print(f"Error reading new trades parquet file {e}")
         return None
@@ -194,7 +197,9 @@ def update_tools_parquet(new_tools_filename: pd.DataFrame):
     print(f"Initial length before removing duplicates in tools= {len(merge_df)}")

     # Remove duplicates
-    merge_df.drop_duplicates(inplace=True)
+    merge_df.drop_duplicates(
+        subset=["request_id", "request_time"], keep="last", inplace=True
+    )
     print(f"Final length after removing duplicates in tools= {len(merge_df)}")

     # save the parquet file
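The deduplication in update_tools_parquet is now keyed on ["request_id", "request_time"] with keep="last", so when the freshly timestamped new tools file is appended after the existing tools.parquet, the newer copy of a repeated request wins instead of only fully identical rows being dropped. A minimal, self-contained sketch of that behaviour with toy data (column names other than request_id and request_time are illustrative only, not the project's schema):

import pandas as pd

# Toy merge result: the same request appears in both the old and the new file.
merge_df = pd.DataFrame(
    {
        "request_id": ["0xaaa", "0xaaa", "0xbbb"],
        "request_time": [
            "2024-10-01 12:00:00",
            "2024-10-01 12:00:00",
            "2024-10-02 08:30:00",
        ],
        "source": ["tools.parquet", "new_tools.parquet", "new_tools.parquet"],
    }
)

# keep="last" keeps the row appended last (the new file's copy) for each
# (request_id, request_time) pair, matching the commit's new behaviour.
merge_df.drop_duplicates(
    subset=["request_id", "request_time"], keep="last", inplace=True
)
print(merge_df)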
scripts/manage_space_files.py ADDED
@@ -0,0 +1,40 @@
+import os
+import shutil
+
+# Define the file names to move
+files_to_move = [
+    "new_tools.parquet",
+    "new_fpmmTrades.parquet",
+    "fpmms.parquet",
+    "fpmmTrades.parquet",
+]
+
+# Get the current working directory
+current_dir = os.getcwd()
+
+# Define source and destination paths
+source_dir = os.path.join(current_dir, "data")
+dest_dir = os.path.join(current_dir, "tmp")
+
+
+def move_files():
+    # Create tmp directory if it doesn't exist
+    if not os.path.exists(dest_dir):
+        os.makedirs(dest_dir)
+    # Move each file
+    for file_name in files_to_move:
+        source_file = os.path.join(source_dir, file_name)
+        dest_file = os.path.join(dest_dir, file_name)
+
+        try:
+            if os.path.exists(source_file):
+                shutil.move(source_file, dest_file)
+                print(f"Moved {file_name} successfully")
+            else:
+                print(f"File not found: {file_name}")
+        except Exception as e:
+            print(f"Error moving {file_name}: {str(e)}")
+
+
+if __name__ == "__main__":
+    move_files()
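manage_space_files.py is a small standalone helper: it resolves data/ and tmp/ from the current working directory and moves the four listed parquet files, reporting (rather than failing on) any that are missing. A usage sketch, assuming it is run from the repository root so both paths resolve and that scripts/ is importable:

# From a shell:
#   python scripts/manage_space_files.py
#
# Or programmatically (assumes scripts/ is on sys.path):
from manage_space_files import move_files

move_files()  # creates ./tmp if needed, then moves the parquet files out of ./data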
scripts/profitability.py CHANGED
@@ -28,7 +28,7 @@ from enum import Enum
 from tqdm import tqdm
 import numpy as np
 import os
-from pathlib import Path
+
 from get_mech_info import (
     DATETIME_60_DAYS_AGO,
     update_fpmmTrades_parquet,
@@ -41,6 +41,7 @@ from utils import (
     convert_hex_to_int,
     _to_content,
     JSON_DATA_DIR,
+    DATA_DIR,
 )
 from queries import omen_xdai_trades_query, conditional_tokens_gc_user_query
 from staking import label_trades_by_staking
@@ -58,9 +59,6 @@ DEFAULT_TO_TIMESTAMP = 2147483647 # around year 2038
 WXDAI_CONTRACT_ADDRESS = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"
 DEFAULT_MECH_FEE = 0.01
 DUST_THRESHOLD = 10000000000000
-SCRIPTS_DIR = Path(__file__).parent
-ROOT_DIR = SCRIPTS_DIR.parent
-DATA_DIR = ROOT_DIR / "data"


 class MarketState(Enum):
@@ -331,7 +329,7 @@ def prepare_profitalibity_data(
     tools_filename: str,
     trades_filename: str,
     from_timestamp: float,
-):
+) -> pd.DataFrame:
     """Prepare data for profitalibity analysis."""

     # Check if tools.parquet is in the same directory
@@ -344,9 +342,10 @@ def prepare_profitalibity_data(
         # lowercase and strip creator_address
         tools["trader_address"] = tools["trader_address"].str.lower().str.strip()

-        # drop duplicates
-        tools.drop_duplicates(inplace=True)
-
+        tools.drop_duplicates(
+            subset=["request_id", "request_block"], keep="last", inplace=True
+        )
+        tools.to_parquet(DATA_DIR / tools_filename)
         print(f"{tools_filename} loaded")
     except FileNotFoundError:
         print("tools.parquet not found. Please run tools.py first.")
@@ -366,7 +365,7 @@ def prepare_profitalibity_data(
     # lowercase and strip creator_address
     fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()

-    return fpmmTrades, tools
+    return fpmmTrades


 def determine_market_status(trade, current_answer):
@@ -455,6 +454,7 @@ def analyse_trader(

     # Compute mech calls
     if len(tools_usage) == 0:
+        print("No tools usage information")
         num_mech_calls = 0
     else:
         try:
@@ -582,21 +582,25 @@ def run_profitability_analysis(

     # load dfs from data folder for analysis
     print(f"Preparing data with {tools_filename} and {trades_filename}")
-    fpmmTrades, tools = prepare_profitalibity_data(
+    fpmmTrades = prepare_profitalibity_data(
         rpc, tools_filename, trades_filename, from_timestamp
     )
+    if merge:
+        update_tools_parquet(rpc, tools_filename)
+    tools = pd.read_parquet(DATA_DIR / "tools.parquet")

     print("Analysing trades...")
     all_trades_df = analyse_all_traders(fpmmTrades, tools)

-    # merge previous files if requested
+    # # merge previous files if requested
     if merge:
         update_fpmmTrades_parquet(trades_filename)
-        update_tools_parquet(tools_filename)
         all_trades_df = update_all_trades_parquet(all_trades_df)

     # debugging purposes
     all_trades_df.to_parquet(JSON_DATA_DIR / "all_trades_df.parquet")
+
+    # all_trades_df = pd.read_parquet(JSON_DATA_DIR / "all_trades_df.parquet")
     # filter invalid markets. Condition: "is_invalid" is True
     invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"] == True]
     if len(invalid_trades) == 0:
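With this change prepare_profitalibity_data deduplicates and rewrites the tools file itself and returns only fpmmTrades; the tools dataframe used for analysis is read back from data/tools.parquet after update_tools_parquet(rpc, tools_filename) has folded the new file in when merge=True. A condensed sketch of the resulting call order inside run_profitability_analysis, with names taken from the diff and error handling omitted:

import pandas as pd

from get_mech_info import update_tools_parquet
from profitability import prepare_profitalibity_data
from utils import DATA_DIR


def merged_flow(rpc, tools_filename, trades_filename, from_timestamp, merge=True):
    # 1. dedupe/save the new tools file and build the trades frame
    fpmmTrades = prepare_profitalibity_data(
        rpc, tools_filename, trades_filename, from_timestamp
    )
    # 2. optionally merge the new tools into tools.parquet, then reload the merged copy
    if merge:
        update_tools_parquet(rpc, tools_filename)
    tools = pd.read_parquet(DATA_DIR / "tools.parquet")
    return fpmmTrades, tools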
scripts/pull_data.py CHANGED
@@ -1,19 +1,20 @@
 import logging
-import pickle
 from datetime import datetime
-from concurrent.futures import ThreadPoolExecutor
-from tqdm import tqdm
-from web3 import Web3
 import pandas as pd
-from pathlib import Path
-from functools import partial
 from markets import (
     etl as mkt_etl,
     DEFAULT_FILENAME as MARKETS_FILENAME,
 )
 from tools import DEFAULT_FILENAME as TOOLS_FILENAME, generate_tools_file
 from profitability import run_profitability_analysis, DEFAULT_60_DAYS_AGO_TIMESTAMP
-from utils import get_question, current_answer, RPC, measure_execution_time
+from utils import (
+    get_question,
+    current_answer,
+    RPC,
+    measure_execution_time,
+    DATA_DIR,
+    HIST_DIR,
+)
 from get_mech_info import (
     get_mech_events_last_60_days,
     get_mech_events_since_last_run,
@@ -21,31 +22,10 @@ from get_mech_info import (
 )
 from update_tools_accuracy import compute_tools_accuracy
 from cleaning_old_info import clean_old_data_from_parquet_files
-import gc
-
-logging.basicConfig(level=logging.INFO)
-
-SCRIPTS_DIR = Path(__file__).parent
-ROOT_DIR = SCRIPTS_DIR.parent
-DATA_DIR = ROOT_DIR / "data"
-HIST_DIR = ROOT_DIR / "historical_data"
+from web3_utils import updating_timestamps


-def block_number_to_timestamp(block_number: int, web3: Web3) -> str:
-    """Convert a block number to a timestamp."""
-    block = web3.eth.get_block(block_number)
-    timestamp = datetime.utcfromtimestamp(block["timestamp"])
-    return timestamp.strftime("%Y-%m-%d %H:%M:%S")
-
-
-def parallelize_timestamp_conversion(df: pd.DataFrame, function: callable) -> list:
-    """Parallelize the timestamp conversion."""
-    block_numbers = df["request_block"].tolist()
-    with ThreadPoolExecutor(max_workers=10) as executor:
-        results = list(
-            tqdm(executor.map(function, block_numbers), total=len(block_numbers))
-        )
-    return results
+logging.basicConfig(level=logging.INFO)


 def add_current_answer(tools_filename: str):
@@ -65,61 +45,6 @@ def add_current_answer(tools_filename: str):
     del fpmms


-def updating_timestamps(rpc: str, tools_filename: str):
-    web3 = Web3(Web3.HTTPProvider(rpc))
-
-    tools = pd.read_parquet(DATA_DIR / tools_filename)
-
-    # Convert block number to timestamp
-    logging.info("Converting block number to timestamp")
-    t_map = pickle.load(open(DATA_DIR / "t_map.pkl", "rb"))
-    tools["request_time"] = tools["request_block"].map(t_map)
-
-    no_data = tools["request_time"].isna().sum()
-    logging.info(f"Total rows with no request time info = {no_data}")
-
-    # Identify tools with missing request_time and fill them
-    missing_time_indices = tools[tools["request_time"].isna()].index
-    if not missing_time_indices.empty:
-        partial_block_number_to_timestamp = partial(
-            block_number_to_timestamp, web3=web3
-        )
-        missing_timestamps = parallelize_timestamp_conversion(
-            tools.loc[missing_time_indices], partial_block_number_to_timestamp
-        )
-
-        # Update the original DataFrame with the missing timestamps
-        for i, timestamp in zip(missing_time_indices, missing_timestamps):
-            tools.at[i, "request_time"] = timestamp
-
-    tools["request_month_year"] = pd.to_datetime(tools["request_time"]).dt.strftime(
-        "%Y-%m"
-    )
-    tools["request_month_year_week"] = (
-        pd.to_datetime(tools["request_time"]).dt.to_period("W").astype(str)
-    )
-
-    # Save the tools data after the updates on the content
-    tools.to_parquet(DATA_DIR / tools_filename, index=False)
-
-    # Update t_map with new timestamps
-    new_timestamps = (
-        tools[["request_block", "request_time"]]
-        .dropna()
-        .set_index("request_block")
-        .to_dict()["request_time"]
-    )
-    t_map.update(new_timestamps)
-
-    with open(DATA_DIR / "t_map.pkl", "wb") as f:
-        pickle.dump(t_map, f)
-
-    # clean and release all memory
-    del tools
-    del t_map
-    gc.collect()
-
-
 def save_historical_data():
     """Function to save a copy of the main trades and tools file
     into the historical folder"""
@@ -196,14 +121,14 @@ def only_new_weekly_analysis():

     save_historical_data()

-    clean_old_data_from_parquet_files("2024-09-29")
+    clean_old_data_from_parquet_files("2024-10-06")

     compute_tools_accuracy()

     logging.info("Weekly analysis files generated and saved")


-def weekly_analysis():
+def old_weekly_analysis():
     """Run weekly analysis for the FPMMS project."""
     rpc = RPC
     # Run markets ETL
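The block-to-timestamp helpers (block_number_to_timestamp, parallelize_timestamp_conversion, updating_timestamps) are removed from pull_data.py and imported from a new web3_utils module instead. That module is not part of this diff, so its exact contents are an assumption here, but the import and the calls in get_mech_info.py pin down the interface. A sketch of the expected call, mirroring the function body deleted above:

from web3_utils import updating_timestamps  # new module, not shown in this commit
from utils import RPC

# Fills request_time (and the derived request_month_year / request_month_year_week
# columns) in the given tools parquet file, using the cached t_map.pkl
# block->timestamp map plus RPC lookups for any blocks still missing.
updating_timestamps(RPC, "new_tools.parquet")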
scripts/staking.py CHANGED
@@ -110,7 +110,7 @@ def update_service_map(start: int = 1, end: int = 1000):
             service_map = pickle.load(f)
     else:
         service_map = {}
-
+    print(f"updating service map from service id={start}")
     # we do not know which is the last service id right now
     service_registry = _get_contract(SERVICE_REGISTRY_ADDRESS)
     with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
@@ -174,16 +174,16 @@ def get_trader_address_staking(trader_address: str, service_map: dict) -> str:
     return check_owner_staking_contract(owner_address=owner)


-def label_trades_by_staking(
-    trades_df: pd.DataFrame, update: bool = True
-) -> pd.DataFrame:
+def label_trades_by_staking(trades_df: pd.DataFrame, start: int = None) -> pd.DataFrame:
     with open(DATA_DIR / "service_map.pkl", "rb") as f:
         service_map = pickle.load(f)
     # get the last service id
     keys = service_map.keys()
-    last_key = max(keys)
-    if update:
-        update_service_map(start=last_key)
+    if start is None:
+        last_key = max(keys)
+    else:
+        last_key = start
+    update_service_map(start=last_key)
     all_traders = trades_df.trader_address.unique()
     trades_df["staking"] = ""
     for trader in tqdm(all_traders, desc="Labeling traders by staking", unit="trader"):
@@ -200,17 +200,6 @@
 if __name__ == "__main__":
     # create_service_map()
     trades_df = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
-    label_trades_by_staking(trades_df=trades_df)
-    print(
-        trades_df[
-            [
-                "trader_address",
-                "creation_timestamp",
-                "market_creator",
-                "staking",
-                "collateral_amount",
-            ]
-        ]
-    )
+    label_trades_by_staking(trades_df=trades_df, start=20)
     print(trades_df.staking.value_counts())
     trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)
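label_trades_by_staking now takes an optional start service id instead of an update flag: when start is None the service map is refreshed from its highest known id, otherwise from the given id, and update_service_map is always called before labeling. A minimal usage sketch (DATA_DIR is assumed to come from utils, as in the other scripts touched by this commit):

import pandas as pd

from staking import label_trades_by_staking
from utils import DATA_DIR

trades_df = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")

# Default: refresh the service map starting from the last known service id.
label_trades_by_staking(trades_df=trades_df)
# Or pin the starting id explicitly, as the __main__ block above does with start=20.
label_trades_by_staking(trades_df=trades_df, start=20)

print(trades_df.staking.value_counts())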