File size: 4,147 Bytes
ae57283 285f2a6 ae57283 da55889 ae57283 285f2a6 ae57283 285f2a6 ae57283 786c7d5 ae57283 278fab8 ae57283 278fab8 ae57283 278fab8 ae57283 278fab8 285f2a6 278fab8 285f2a6 278fab8 ae57283 278fab8 ae57283 960332d 285f2a6 960332d 6992ec1 ae57283 278fab8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
import pandas as pd
from utils import DATA_DIR, TMP_DIR, transform_to_datetime
def _filter_parquet_by_date(
    path,
    timestamp_column: str,
    min_date_utc: pd.Timestamp,
    prepare=None,
    length_label: str = "length",
) -> None:
    """Rewrite one parquet file in place, keeping only rows newer than a cutoff.

    Args:
        path: Location of the parquet file; it is read from and written back
            to this same location.
        timestamp_column: Column that is converted to UTC datetimes and
            compared against ``min_date_utc``.
        min_date_utc: Cutoff timestamp. Only rows whose timestamp is strictly
            greater than this value are kept.
        prepare: Optional callable taking and returning the DataFrame; it runs
            right after the read, before the timestamp conversion.
        length_label: Prefix of the two progress messages printed before and
            after filtering.

    Any exception raised while reading, preparing, converting or writing is
    propagated to the caller.
    """
    frame = pd.read_parquet(path)
    if prepare is not None:
        frame = prepare(frame)
    frame[timestamp_column] = pd.to_datetime(frame[timestamp_column], utc=True)
    print(f"{length_label} before filtering {len(frame)}")
    frame = frame.loc[frame[timestamp_column] > min_date_utc]
    print(f"{length_label} after filtering {len(frame)}")
    frame.to_parquet(path, index=False)


def _normalize_tools(tools: pd.DataFrame) -> pd.DataFrame:
    """Check that ``trader_address`` exists, then lowercase and strip it.

    Raises:
        ValueError: If the ``trader_address`` column is missing.
    """
    # Explicit raise instead of ``assert`` so the check is not removed when
    # Python runs with -O; the message text is unchanged.
    if "trader_address" not in tools.columns:
        raise ValueError("trader_address column not found")
    tools["trader_address"] = tools["trader_address"].str.lower().str.strip()
    return tools


def _prepare_fpmm_trades(fpmm_trades: pd.DataFrame) -> pd.DataFrame:
    """Derive ``creation_timestamp`` from the raw ``creationTimestamp`` column."""
    try:
        fpmm_trades["creationTimestamp"] = fpmm_trades["creationTimestamp"].apply(
            transform_to_datetime
        )
    except Exception:
        # Best effort: a failing transformation is reported and the column is
        # used as it is.
        print("Transformation not needed")
    fpmm_trades["creation_timestamp"] = pd.to_datetime(
        fpmm_trades["creationTimestamp"]
    )
    return fpmm_trades


def clean_old_data_from_parquet_files(cutoff_date: str) -> None:
    """Drop rows older than ``cutoff_date`` from the project's parquet files.

    The files processed are ``tools.parquet`` and ``fpmmTrades.parquet`` under
    ``TMP_DIR``, and ``all_trades_profitability.parquet``,
    ``unknown_traders.parquet`` and ``invalid_trades.parquet`` under
    ``DATA_DIR``. Each file is overwritten in place with the filtered rows.

    Each file is handled independently: a failure on one file is printed and
    the remaining files are still processed.

    Args:
        cutoff_date: Cutoff day in ``YYYY-MM-DD`` format, interpreted as UTC.
            Rows with a timestamp strictly greater than it are kept.

    Raises:
        ValueError: If ``cutoff_date`` does not match ``%Y-%m-%d`` (raised by
            ``pd.to_datetime`` before any file is touched).
    """
    print("Cleaning oldest data")
    # Convert the string to a timezone-aware (UTC) timestamp
    min_date_utc = pd.to_datetime(cutoff_date, format="%Y-%m-%d", utc=True)

    # clean tools.parquet
    try:
        _filter_parquet_by_date(
            TMP_DIR / "tools.parquet",
            "request_time",
            min_date_utc,
            prepare=_normalize_tools,
        )
    except Exception as e:
        print(f"Error cleaning tools file {e}")

    # clean all_trades_profitability.parquet
    try:
        _filter_parquet_by_date(
            DATA_DIR / "all_trades_profitability.parquet",
            "creation_timestamp",
            min_date_utc,
        )
    except Exception as e:
        print(f"Error cleaning all trades profitability file {e}")

    # clean unknown_traders.parquet
    try:
        _filter_parquet_by_date(
            DATA_DIR / "unknown_traders.parquet",
            "creation_timestamp",
            min_date_utc,
            length_label="length unknown traders",
        )
    except Exception as e:
        print(f"Error cleaning unknown_traders file {e}")

    # clean fpmmTrades.parquet
    try:
        _filter_parquet_by_date(
            TMP_DIR / "fpmmTrades.parquet",
            "creation_timestamp",
            min_date_utc,
            prepare=_prepare_fpmm_trades,
        )
    except Exception as e:
        print(f"Error cleaning fpmmTrades file {e}")

    # clean invalid_trades.parquet
    try:
        _filter_parquet_by_date(
            DATA_DIR / "invalid_trades.parquet",
            "creation_timestamp",
            min_date_utc,
        )
    except Exception as e:
        # This message used to say "fpmmTrades", which hid the failing file.
        print(f"Error cleaning invalid trades file {e}")
if __name__ == "__main__":
    # Script entry point: prune every parquet file using a fixed cutoff day.
    clean_old_data_from_parquet_files(cutoff_date="2024-10-25")
|