updating week format starting on Monday, new staking contracts and new weekly data
285f2a6
import pandas as pd | |
from utils import DATA_DIR, TMP_DIR, transform_to_datetime | |
def clean_old_data_from_parquet_files(cutoff_date: str): | |
print("Cleaning oldest data") | |
# Convert the string to datetime64[ns, UTC] | |
min_date_utc = pd.to_datetime(cutoff_date, format="%Y-%m-%d", utc=True) | |
# clean tools.parquet | |
try: | |
tools = pd.read_parquet(TMP_DIR / "tools.parquet") | |
# make sure creator_address is in the columns | |
assert "trader_address" in tools.columns, "trader_address column not found" | |
# lowercase and strip creator_address | |
tools["trader_address"] = tools["trader_address"].str.lower().str.strip() | |
tools["request_time"] = pd.to_datetime(tools["request_time"], utc=True) | |
print(f"length before filtering {len(tools)}") | |
tools = tools.loc[tools["request_time"] > min_date_utc] | |
print(f"length after filtering {len(tools)}") | |
tools.to_parquet(TMP_DIR / "tools.parquet", index=False) | |
except Exception as e: | |
print(f"Error cleaning tools file {e}") | |
# clean all_trades_profitability.parquet | |
try: | |
all_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet") | |
all_trades["creation_timestamp"] = pd.to_datetime( | |
all_trades["creation_timestamp"], utc=True | |
) | |
print(f"length before filtering {len(all_trades)}") | |
all_trades = all_trades.loc[all_trades["creation_timestamp"] > min_date_utc] | |
print(f"length after filtering {len(all_trades)}") | |
all_trades.to_parquet( | |
DATA_DIR / "all_trades_profitability.parquet", index=False | |
) | |
except Exception as e: | |
print(f"Error cleaning all trades profitability file {e}") | |
# clean unknown_traders.parquet | |
try: | |
unknown_traders = pd.read_parquet(DATA_DIR / "unknown_traders.parquet") | |
unknown_traders["creation_timestamp"] = pd.to_datetime( | |
unknown_traders["creation_timestamp"], utc=True | |
) | |
print(f"length unknown traders before filtering {len(unknown_traders)}") | |
unknown_traders = unknown_traders.loc[ | |
unknown_traders["creation_timestamp"] > min_date_utc | |
] | |
print(f"length unknown traders after filtering {len(unknown_traders)}") | |
unknown_traders.to_parquet(DATA_DIR / "unknown_traders.parquet", index=False) | |
except Exception as e: | |
print(f"Error cleaning unknown_traders file {e}") | |
# clean fpmmTrades.parquet | |
try: | |
fpmmTrades = pd.read_parquet(TMP_DIR / "fpmmTrades.parquet") | |
try: | |
fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply( | |
lambda x: transform_to_datetime(x) | |
) | |
except Exception as e: | |
print(f"Transformation not needed") | |
fpmmTrades["creation_timestamp"] = pd.to_datetime( | |
fpmmTrades["creationTimestamp"] | |
) | |
fpmmTrades["creation_timestamp"] = pd.to_datetime( | |
fpmmTrades["creation_timestamp"], utc=True | |
) | |
print(f"length before filtering {len(fpmmTrades)}") | |
fpmmTrades = fpmmTrades.loc[fpmmTrades["creation_timestamp"] > min_date_utc] | |
print(f"length after filtering {len(fpmmTrades)}") | |
fpmmTrades.to_parquet(TMP_DIR / "fpmmTrades.parquet", index=False) | |
except Exception as e: | |
print(f"Error cleaning fpmmTrades file {e}") | |
# clean invalid trades parquet | |
try: | |
invalid_trades = pd.read_parquet(DATA_DIR / "invalid_trades.parquet") | |
invalid_trades["creation_timestamp"] = pd.to_datetime( | |
invalid_trades["creation_timestamp"], utc=True | |
) | |
print(f"length before filtering {len(invalid_trades)}") | |
invalid_trades = invalid_trades.loc[ | |
invalid_trades["creation_timestamp"] > min_date_utc | |
] | |
print(f"length after filtering {len(invalid_trades)}") | |
invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False) | |
except Exception as e: | |
print(f"Error cleaning fpmmTrades file {e}") | |
if __name__ == "__main__": | |
clean_old_data_from_parquet_files("2024-10-25") | |