rosacastillo's picture
updating week format starting on Monday, new staking contracts and new weekly data
285f2a6
raw
history blame
4.15 kB
import pandas as pd
from utils import DATA_DIR, TMP_DIR, transform_to_datetime
def clean_old_data_from_parquet_files(cutoff_date: str):
print("Cleaning oldest data")
# Convert the string to datetime64[ns, UTC]
min_date_utc = pd.to_datetime(cutoff_date, format="%Y-%m-%d", utc=True)
# clean tools.parquet
try:
tools = pd.read_parquet(TMP_DIR / "tools.parquet")
# make sure creator_address is in the columns
assert "trader_address" in tools.columns, "trader_address column not found"
# lowercase and strip creator_address
tools["trader_address"] = tools["trader_address"].str.lower().str.strip()
tools["request_time"] = pd.to_datetime(tools["request_time"], utc=True)
print(f"length before filtering {len(tools)}")
tools = tools.loc[tools["request_time"] > min_date_utc]
print(f"length after filtering {len(tools)}")
tools.to_parquet(TMP_DIR / "tools.parquet", index=False)
except Exception as e:
print(f"Error cleaning tools file {e}")
# clean all_trades_profitability.parquet
try:
all_trades = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
all_trades["creation_timestamp"] = pd.to_datetime(
all_trades["creation_timestamp"], utc=True
)
print(f"length before filtering {len(all_trades)}")
all_trades = all_trades.loc[all_trades["creation_timestamp"] > min_date_utc]
print(f"length after filtering {len(all_trades)}")
all_trades.to_parquet(
DATA_DIR / "all_trades_profitability.parquet", index=False
)
except Exception as e:
print(f"Error cleaning all trades profitability file {e}")
# clean unknown_traders.parquet
try:
unknown_traders = pd.read_parquet(DATA_DIR / "unknown_traders.parquet")
unknown_traders["creation_timestamp"] = pd.to_datetime(
unknown_traders["creation_timestamp"], utc=True
)
print(f"length unknown traders before filtering {len(unknown_traders)}")
unknown_traders = unknown_traders.loc[
unknown_traders["creation_timestamp"] > min_date_utc
]
print(f"length unknown traders after filtering {len(unknown_traders)}")
unknown_traders.to_parquet(DATA_DIR / "unknown_traders.parquet", index=False)
except Exception as e:
print(f"Error cleaning unknown_traders file {e}")
# clean fpmmTrades.parquet
try:
fpmmTrades = pd.read_parquet(TMP_DIR / "fpmmTrades.parquet")
try:
fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
lambda x: transform_to_datetime(x)
)
except Exception as e:
print(f"Transformation not needed")
fpmmTrades["creation_timestamp"] = pd.to_datetime(
fpmmTrades["creationTimestamp"]
)
fpmmTrades["creation_timestamp"] = pd.to_datetime(
fpmmTrades["creation_timestamp"], utc=True
)
print(f"length before filtering {len(fpmmTrades)}")
fpmmTrades = fpmmTrades.loc[fpmmTrades["creation_timestamp"] > min_date_utc]
print(f"length after filtering {len(fpmmTrades)}")
fpmmTrades.to_parquet(TMP_DIR / "fpmmTrades.parquet", index=False)
except Exception as e:
print(f"Error cleaning fpmmTrades file {e}")
# clean invalid trades parquet
try:
invalid_trades = pd.read_parquet(DATA_DIR / "invalid_trades.parquet")
invalid_trades["creation_timestamp"] = pd.to_datetime(
invalid_trades["creation_timestamp"], utc=True
)
print(f"length before filtering {len(invalid_trades)}")
invalid_trades = invalid_trades.loc[
invalid_trades["creation_timestamp"] > min_date_utc
]
print(f"length after filtering {len(invalid_trades)}")
invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)
except Exception as e:
print(f"Error cleaning fpmmTrades file {e}")
if __name__ == "__main__":
clean_old_data_from_parquet_files("2024-10-25")