from typing import Callable, Optional

import pandas as pd

from utils import DATA_DIR, TMP_DIR, transform_to_datetime


def _normalize_trader_address(tools: pd.DataFrame) -> pd.DataFrame:
    """Lowercase and strip the ``trader_address`` column of the tools frame.

    Raises:
        ValueError: if the frame has no ``trader_address`` column. An explicit
            raise is used instead of ``assert`` so the check still runs when
            Python is started with ``-O``.
    """
    if "trader_address" not in tools.columns:
        raise ValueError("trader_address column not found")
    tools["trader_address"] = tools["trader_address"].str.lower().str.strip()
    return tools


def _add_fpmm_creation_timestamp(fpmm_trades: pd.DataFrame) -> pd.DataFrame:
    """Derive ``creation_timestamp`` from ``creationTimestamp`` for fpmmTrades."""
    try:
        fpmm_trades["creationTimestamp"] = fpmm_trades["creationTimestamp"].apply(
            transform_to_datetime
        )
    except Exception:
        # Best effort: a failure here is treated as "the column is already in
        # a form pd.to_datetime can handle", so the raw values are kept.
        print("Transformation not needed")
    fpmm_trades["creation_timestamp"] = pd.to_datetime(
        fpmm_trades["creationTimestamp"]
    )
    return fpmm_trades


def _drop_rows_up_to_cutoff(
    path,
    timestamp_column: str,
    min_date_utc: pd.Timestamp,
    *,
    description: str,
    length_label: str = "length",
    prepare: Optional[Callable[[pd.DataFrame], pd.DataFrame]] = None,
) -> None:
    """Rewrite one parquet file in place, keeping only rows newer than the cutoff.

    Args:
        path: location of the parquet file; read from and written back to.
        timestamp_column: column converted to UTC datetimes and compared
            against ``min_date_utc``.
        min_date_utc: timezone-aware cutoff; only rows strictly greater than
            it are kept.
        description: name of the file used in the error message.
        length_label: prefix of the before/after row-count messages.
        prepare: optional hook applied to the frame right after reading,
            before the timestamp conversion.

    Any exception is caught and printed so that a failure on one file does not
    stop the remaining files from being cleaned.
    """
    try:
        frame = pd.read_parquet(path)
        if prepare is not None:
            frame = prepare(frame)
        frame[timestamp_column] = pd.to_datetime(frame[timestamp_column], utc=True)
        print(f"{length_label} before filtering {len(frame)}")
        frame = frame.loc[frame[timestamp_column] > min_date_utc]
        print(f"{length_label} after filtering {len(frame)}")
        frame.to_parquet(path, index=False)
    except Exception as e:
        print(f"Error cleaning {description} file {e}")


def clean_old_data_from_parquet_files(cutoff_date: str):
    """Remove rows dated at or before ``cutoff_date`` from the project parquet files.

    The files processed are ``tools.parquet`` and ``fpmmTrades.parquet`` in
    ``TMP_DIR`` and ``all_trades_profitability.parquet``,
    ``unknown_traders.parquet`` and ``invalid_trades.parquet`` in ``DATA_DIR``.
    Each file is overwritten in place. Errors on a single file are printed and
    do not abort the processing of the others.

    Args:
        cutoff_date: cutoff in ``YYYY-MM-DD`` format, interpreted as UTC.

    Raises:
        ValueError: if ``cutoff_date`` does not match ``%Y-%m-%d`` (raised by
            ``pd.to_datetime`` before any file is touched).
    """
    print("Cleaning oldest data")
    # Convert the string to datetime64[ns, UTC]
    min_date_utc = pd.to_datetime(cutoff_date, format="%Y-%m-%d", utc=True)

    # clean tools.parquet (also normalizes trader_address)
    _drop_rows_up_to_cutoff(
        TMP_DIR / "tools.parquet",
        "request_time",
        min_date_utc,
        description="tools",
        prepare=_normalize_trader_address,
    )

    # clean all_trades_profitability.parquet
    _drop_rows_up_to_cutoff(
        DATA_DIR / "all_trades_profitability.parquet",
        "creation_timestamp",
        min_date_utc,
        description="all trades profitability",
    )

    # clean unknown_traders.parquet
    _drop_rows_up_to_cutoff(
        DATA_DIR / "unknown_traders.parquet",
        "creation_timestamp",
        min_date_utc,
        description="unknown_traders",
        length_label="length unknown traders",
    )

    # clean fpmmTrades.parquet
    _drop_rows_up_to_cutoff(
        TMP_DIR / "fpmmTrades.parquet",
        "creation_timestamp",
        min_date_utc,
        description="fpmmTrades",
        prepare=_add_fpmm_creation_timestamp,
    )

    # clean invalid_trades.parquet
    _drop_rows_up_to_cutoff(
        DATA_DIR / "invalid_trades.parquet",
        "creation_timestamp",
        min_date_utc,
        description="invalid_trades",
    )


if __name__ == "__main__":
    clean_old_data_from_parquet_files("2024-10-25")