arshy committed
Commit ac347d2 · 1 Parent(s): cebb53d

use parquet instead of csv

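For orientation, a minimal sketch of the swap this commit applies across the scripts: pandas' Parquet reader/writer replaces the CSV calls, and dtypes travel inside the file (assumes a Parquet engine such as pyarrow is installed; paths mirror the repo's data/ directory):

```python
import pandas as pd

# Before: CSV parsing needed hints such as low_memory=False and escapechar.
# tools_df = pd.read_csv("./data/tools.csv", low_memory=False)

# After: Parquet is a typed, compressed columnar format, so no parsing
# flags are needed and the LFS blobs shrink (tools: ~3.0 GB -> ~1.7 GB).
tools_df = pd.read_parquet("./data/tools.parquet")
trades_df = pd.read_parquet("./data/all_trades_profitability.parquet")

# Writes are the same one-line swap; index=False keeps the file to just
# the data columns.
trades_df.to_parquet("./data/all_trades_profitability.parquet", index=False)
```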
app.py CHANGED
@@ -106,8 +106,8 @@ def refresh_data():
 
     logging.info("Refreshing data...")
 
-    tools_df = pd.read_csv("./data/tools.csv", low_memory=False)
-    trades_df = pd.read_csv("./data/all_trades_profitability.csv")
+    tools_df = pd.read_parquet("./data/tools.parquet")
+    trades_df = pd.read_parquet("./data/all_trades_profitability.parquet")
     trades_df = prepare_trades(trades_df)
     error_df = get_error_data(tools_df=tools_df, inc_tools=INC_TOOLS)
     error_overall_df = get_error_data_overall(error_df=error_df)
@@ -134,8 +134,8 @@ def pull_refresh_data():
     refresh_data()
 
 
-    tools_df = pd.read_csv("./data/tools.csv", low_memory=False)
-    trades_df = pd.read_csv("./data/all_trades_profitability.csv")
+    tools_df = pd.read_parquet("./data/tools.parquet")
+    trades_df = pd.read_parquet("./data/all_trades_profitability.parquet")
     trades_df = prepare_trades(trades_df)
 
 
data/{all_trades_profitability.csv → all_trades_profitability.parquet} RENAMED
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:ea9047bacb53f4f2d396242ae39939517fde4b4061f425c992e981cc92c5b452
-size 34257800
+oid sha256:ae0de6d7e607b8ac33140081ab5415b9c16e7359d23b196e555535af0d78965c
+size 8251611
data/{delivers.csv → delivers.parquet} RENAMED
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:d3a7745e110366ca4314970a7dfac8caeab964c234c0dc4cf1528a878aaf15b0
-size 2905109283
+oid sha256:5002d6ef2bf5d2e69def7f6c69090f72c961c0eee7724870b0960c20514b1180
+size 1707150876
data/{fpmmTrades.csv → fpmmTrades.parquet} RENAMED
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:6a297b050b6c7c88c8fe7d0c597f362b95c0af735f65ff3218e9c748bdcbb820
-size 76092671
+oid sha256:bb0cd005a2bb7b37b04e0388538249ab6434c9de532b337fcee775ab9205064c
+size 20528876
data/{fpmms.csv → fpmms.parquet} RENAMED
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:c313ba383cbff6ed1bdacadade252d9e5ae8b66359336c99b3ad7845842e533d
-size 448312
+oid sha256:5b0b82cf173571152d11bbcabd94f675e8d84c148925f47a96c5192d9b9e2f67
+size 319767
data/requests.csv DELETED
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:861e85f3437c0c75001e8b10731b91c643f0e0ef0bab214257c26d2a25fa9628
-size 168361105
data/requests.parquet ADDED
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:a3de88b6c91037ed4245a60473dcca4dce395d1583ec5cb39f79ab0e42759904
+size 46486507
data/{summary_profitability.csv → summary_profitability.parquet} RENAMED
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:60f49e3d95e3abc0d93b7518e3287f9fbbe65211b3447ed21030ab4b5415c7a9
-size 65116
+oid sha256:0ef6d6a03b5f872d0228881b74e3a2427c4e8a5f7fd02776eb70683605ccbb4b
+size 52394
data/t_map.pkl CHANGED
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:c1a007c0c28c3ab7cd85597a9cedde455c346b481adec25c6ab0223f50808c9c
-size 7422234
+oid sha256:2738a5a8e98ca83c409251237cc338ed540c0ea58779bf23ea59255fa88b42d5
+size 7749840
data/tools.csv DELETED
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:0510d9bc9ae10a2d5f7813fff775dc199a677dae6b5dc93f564509e60045f6cf
-size 3018115904
data/tools.parquet ADDED
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:b72e906e6ba9e73fc39bd46eea8a17f00cfd15ecca8971bf35bc1c86c837bd99
+size 1713482531
increase_zero_mech_calls.ipynb ADDED
The diff for this file is too large to render. See raw diff
 
scripts/markets.py CHANGED
@@ -46,7 +46,7 @@ QUESTION_FIELD = "question"
 OUTCOMES_FIELD = "outcomes"
 TITLE_FIELD = "title"
 MAX_UINT_HEX = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
-DEFAULT_FILENAME = "fpmms.csv"
+DEFAULT_FILENAME = "fpmms.parquet"
 SCRIPTS_DIR = Path(__file__).parent
 ROOT_DIR = SCRIPTS_DIR.parent
 DATA_DIR = ROOT_DIR / "data"
@@ -218,10 +218,11 @@ def etl(filename: Optional[str] = None) -> pd.DataFrame:
     fpmms = transform_fpmms(fpmms)
 
     if filename:
-        fpmms.to_csv(DATA_DIR / filename, index=False)
+        fpmms.to_parquet(DATA_DIR / filename, index=False)
 
     return fpmms
 
 
 if __name__ == "__main__":
     etl(DEFAULT_FILENAME)
+
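Every write in the commit passes index=False. A small sketch of what that buys on a Parquet round-trip (illustrative frame, not repo data):

```python
import pandas as pd

df = pd.DataFrame({"id": ["0x01", "0x02"], "title": ["a", "b"]})

# index=False omits the RangeIndex from the file, so a later read_parquet
# yields exactly the stored columns plus a fresh default index.
df.to_parquet("fpmms.parquet", index=False)
print(pd.read_parquet("fpmms.parquet").columns.tolist())  # ['id', 'title']
```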
scripts/profitability.py CHANGED
@@ -385,7 +385,7 @@ def create_fpmmTrades(rpc: str):
     df.rename(columns={"creator": "trader_address"}, inplace=True)
 
     # save to csv
-    df.to_csv(DATA_DIR / "fpmmTrades.csv", index=False)
+    df.to_parquet(DATA_DIR / "fpmmTrades.parquet", index=False)
 
     return df
 
@@ -396,7 +396,7 @@ def prepare_profitalibity_data(rpc: str):
     # Check if tools.py is in the same directory
     try:
         # load tools.csv
-        tools = pd.read_csv(DATA_DIR / "tools.csv")
+        tools = pd.read_parquet(DATA_DIR / "tools.parquet")
 
         # make sure creator_address is in the columns
         assert "trader_address" in tools.columns, "trader_address column not found"
@@ -407,21 +407,21 @@
         # drop duplicates
         tools.drop_duplicates(inplace=True)
 
-        print("tools.csv loaded")
+        print("tools.parquet loaded")
     except FileNotFoundError:
-        print("tools.csv not found. Please run tools.py first.")
+        print("tools.parquet not found. Please run tools.py first.")
         return
 
     # Check if fpmmTrades.csv is in the same directory
     try:
         # load fpmmTrades.csv
-        fpmmTrades = pd.read_csv(DATA_DIR / "fpmmTrades.csv")
-        print("fpmmTrades.csv loaded")
+        fpmmTrades = pd.read_parquet(DATA_DIR / "fpmmTrades.parquet")
+        print("fpmmTrades.parquet loaded")
     except FileNotFoundError:
-        print("fpmmTrades.csv not found. Creating fpmmTrades.csv...")
+        print("fpmmTrades.parquet not found. Creating fpmmTrades.parquet...")
         fpmmTrades = create_fpmmTrades(rpc)
-        fpmmTrades.to_csv(DATA_DIR / "fpmmTrades.csv", index=False)
-        fpmmTrades = pd.read_csv(DATA_DIR / "fpmmTrades.csv")
+        fpmmTrades.to_parquet(DATA_DIR / "fpmmTrades.parquet", index=False)
+        fpmmTrades = pd.read_parquet(DATA_DIR / "fpmmTrades.parquet")
 
     # make sure trader_address is in the columns
     assert "trader_address" in fpmmTrades.columns, "trader_address column not found"
@@ -434,13 +434,13 @@
 
 def determine_market_status(trade, current_answer):
     """Determine the market status of a trade."""
-    if current_answer is np.nan and time.time() >= trade["fpmm.openingTimestamp"]:
+    if current_answer is np.nan and time.time() >= int(trade["fpmm.openingTimestamp"]):
         return MarketState.PENDING
     elif current_answer == np.nan:
         return MarketState.OPEN
     elif trade["fpmm.isPendingArbitration"]:
         return MarketState.ARBITRATING
-    elif time.time() < trade["fpmm.answerFinalizedTimestamp"]:
+    elif time.time() < int(trade["fpmm.answerFinalizedTimestamp"]):
         return MarketState.FINALIZING
     return MarketState.CLOSED
 
@@ -468,9 +468,12 @@ def analyse_trader(
     # Iterate over the trades
     for i, trade in tqdm(trades.iterrows(), total=len(trades), desc="Analysing trades"):
         try:
+            if not trade['fpmm.currentAnswer']:
+                print(f"Skipping trade {i} because currentAnswer is NaN")
+                continue
             # Parsing and computing shared values
             creation_timestamp_utc = datetime.datetime.fromtimestamp(
-                trade["creationTimestamp"], tz=datetime.timezone.utc
+                int(trade["creationTimestamp"]), tz=datetime.timezone.utc
             )
             collateral_amount = wei_to_unit(float(trade["collateralAmount"]))
             fee_amount = wei_to_unit(float(trade["feeAmount"]))
@@ -497,7 +500,7 @@
             if is_invalid:
                 earnings = collateral_amount
                 winner_trade = False
-            elif trade["outcomeIndex"] == current_answer:
+            elif int(trade["outcomeIndex"]) == current_answer:
                 earnings = outcome_tokens_traded
                 winner_trade = True
 
@@ -610,6 +613,7 @@ def run_profitability_analysis(rpc):
     # load dfs from csv for analysis
     print("Preparing data...")
     fpmmTrades, tools = prepare_profitalibity_data(rpc)
+    tools['trader_address'] = tools['trader_address'].str.lower()
 
     # all trades profitability df
     print("Analysing trades...")
@@ -620,8 +624,8 @@
     summary_df = summary_analyse(all_trades_df)
 
     # save to csv
-    all_trades_df.to_csv(DATA_DIR / "all_trades_profitability.csv", index=False)
-    summary_df.to_csv(DATA_DIR / "summary_profitability.csv", index=False)
+    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)
+    summary_df.to_parquet(DATA_DIR / "summary_profitability.parquet", index=False)
 
     print("Done!")
 
 
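An aside on the NaN checks that determine_market_status keeps: equality against np.nan never matches, and identity (`is np.nan`) only matches that exact float object, so pd.isna is the reliable test for values coming out of a DataFrame. A minimal demonstration with illustrative values:

```python
import numpy as np
import pandas as pd

# NaN never compares equal to itself, so `current_answer == np.nan`
# is always False, whatever current_answer holds.
assert not (np.nan == np.nan)

# Values read back from a file are typically a fresh float('nan'), which
# fails the `is np.nan` identity check; pd.isna covers both cases.
current_answer = float("nan")
assert current_answer is not np.nan
assert pd.isna(current_answer)
```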
scripts/pull_data.py CHANGED
@@ -85,16 +85,16 @@ def weekly_analysis():
 
     # Run profitability analysis
     logging.info("Running profitability analysis")
-    if os.path.exists(DATA_DIR / "fpmmTrades.csv"):
-        os.remove(DATA_DIR / "fpmmTrades.csv")
+    if os.path.exists(DATA_DIR / "fpmmTrades.parquet"):
+        os.remove(DATA_DIR / "fpmmTrades.parquet")
     run_profitability_analysis(
         rpc=rpc,
     )
     logging.info("Profitability analysis completed")
 
     # Get currentAnswer from FPMMS
-    fpmms = pd.read_csv(DATA_DIR / MARKETS_FILENAME)
-    tools = pd.read_csv(DATA_DIR / TOOLS_FILENAME)
+    fpmms = pd.read_parquet(DATA_DIR / MARKETS_FILENAME)
+    tools = pd.read_parquet(DATA_DIR / TOOLS_FILENAME)
 
     # Get the question from the tools
     logging.info("Getting the question and current answer for the tools")
@@ -123,7 +123,7 @@ def weekly_analysis():
     tools['request_month_year_week'] = pd.to_datetime(tools['request_time']).dt.to_period('W').astype(str)
 
     # Save the tools
-    tools.to_csv(DATA_DIR / TOOLS_FILENAME, index=False)
+    tools.to_parquet(DATA_DIR / TOOLS_FILENAME, index=False)
 
     # Update t_map with new timestamps
     new_timestamps = tools[['request_block', 'request_time']].dropna().set_index('request_block').to_dict()['request_time']
scripts/tools.py CHANGED
@@ -86,7 +86,7 @@ IPFS_ADDRESS = f"{HTTPS}gateway.autonolas.tech/ipfs/"
 IPFS_LINKS_SERIES_NAME = "ipfs_links"
 BACKOFF_FACTOR = 1
 STATUS_FORCELIST = [404, 500, 502, 503, 504]
-DEFAULT_FILENAME = "tools.csv"
+DEFAULT_FILENAME = "tools.parquet"
 RE_RPC_FILTER_ERROR = r"Filter with id: '\d+' does not exist."
 ABI_ERROR = "The event signature did not match the provided ABI"
 SLEEP = 0.5
@@ -580,7 +580,7 @@ def transform_deliver(contents: pd.DataFrame, full_contents=False) -> pd.DataFrame:
 
 def gen_event_filename(event_name: MechEventName) -> str:
     """Generate the filename of an event."""
-    return f"{event_name.value.lower()}s.csv"
+    return f"{event_name.value.lower()}s.parquet"
 
 
 def read_n_last_lines(filename: str, n: int = 1) -> str:
@@ -605,33 +605,38 @@ def get_earliest_block(event_name: MechEventName) -> int:
     if not os.path.exists(DATA_DIR / filename):
         return 0
 
-    cols = pd.read_csv(DATA_DIR / filename, index_col=0, nrows=0).columns.tolist()
-    last_line_buff = StringIO(read_n_last_lines(DATA_DIR/filename))
-    last_line_series = pd.read_csv(last_line_buff, names=cols)
+    df = pd.read_parquet(DATA_DIR / filename)
     block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}"
-    return int(last_line_series[block_field].values[0])
+    return int(df[block_field].max())
 
 
 def store_progress(
     filename: str,
-    event_to_contents: Dict[MechEventName, pd.DataFrame],
+    event_to_contents: Dict[str, pd.DataFrame],
     tools: pd.DataFrame,
 ) -> None:
     """Store the given progress."""
     if filename:
+        DATA_DIR.mkdir(parents=True, exist_ok=True)  # Ensure the directory exists
         for event_name, content in event_to_contents.items():
-            event_filename = gen_event_filename(event_name)
-
-            if "result" in content.columns:
-                content.drop(columns=["result"], inplace=True)
-
-            content.to_csv(DATA_DIR / event_filename, index=False, escapechar="\\")
-
-        # drop result and error columns
-        if "result" in tools.columns:
-            tools.drop(columns=["result"], inplace=True)
-
-        tools.to_csv(DATA_DIR / filename, index=False, escapechar="\\")
+            event_filename = gen_event_filename(event_name)  # Ensure this function returns a valid filename string
+            try:
+                if "result" in content.columns:
+                    content = content.drop(columns=["result"])  # Avoid in-place modification
+                if 'error' in content.columns:
+                    content['error'] = content['error'].astype(bool)
+                content.to_parquet(DATA_DIR / event_filename, index=False)
+            except Exception as e:
+                print(f"Failed to write {event_name}: {e}")
+        # Drop result and error columns for tools DataFrame
+        try:
+            if "result" in tools.columns:
+                tools = tools.drop(columns=["result"])
+            if 'error' in tools.columns:
+                tools['error'] = tools['error'].astype(bool)
+            tools.to_parquet(DATA_DIR / filename, index=False)
+        except Exception as e:
+            print(f"Failed to write tools data: {e}")
 
 
 def etl(
@@ -736,7 +741,7 @@ def etl(
         events_filename = gen_event_filename(event_name)
 
         if os.path.exists(DATA_DIR / events_filename):
-            old = pd.read_csv(DATA_DIR / events_filename)
+            old = pd.read_parquet(DATA_DIR / events_filename)
 
             # Reset index to avoid index conflicts
             old.reset_index(drop=True, inplace=True)
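The store_progress rewrite above casts the error column to bool before writing. A hedged sketch of why that normalization matters when moving from CSV to Parquet (file name and frame are illustrative, not the repo's data):

```python
import pandas as pd

# CSV writes every cell as text, so mixed-type columns round-trip without
# complaint; Parquet is typed per column, and the pyarrow engine can raise
# on mixed object columns. Normalizing the dtype first avoids that.
events = pd.DataFrame({"error": [1.0, 0.0, None], "tool": ["a", "b", "c"]})

# fillna(0) is a defensive extra over the commit's direct cast: a bare
# .astype(bool) would map NaN to True under NumPy's truthiness rules.
events["error"] = events["error"].fillna(0).astype(bool)
events.to_parquet("events.parquet", index=False)
```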
test.ipynb CHANGED
The diff for this file is too large to render. See raw diff