File size: 4,147 Bytes
ae57283
285f2a6
ae57283
 
 
da55889
ae57283
 
 
 
 
285f2a6
ae57283
 
 
 
 
 
 
 
 
 
 
 
285f2a6
ae57283
 
 
 
 
 
 
 
 
 
 
 
 
 
 
786c7d5
 
 
ae57283
 
 
 
278fab8
ae57283
278fab8
ae57283
278fab8
 
ae57283
278fab8
285f2a6
278fab8
 
 
285f2a6
278fab8
 
ae57283
278fab8
ae57283
960332d
 
 
285f2a6
 
 
 
 
 
 
 
 
960332d
 
 
 
 
 
 
 
 
 
 
 
6992ec1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ae57283
 
278fab8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import pandas as pd
from utils import DATA_DIR, TMP_DIR, transform_to_datetime


def _keep_rows_after(frame, timestamp_column, min_date_utc, label=""):
    """Return the rows of *frame* whose timestamp is strictly after the cutoff.

    The timestamp column is converted (in place, on *frame*) to UTC-aware
    datetimes so it can be compared with ``min_date_utc``. The row counts
    before and after filtering are printed; ``label`` is inserted verbatim
    right after the word "length" in both messages.
    """
    frame[timestamp_column] = pd.to_datetime(frame[timestamp_column], utc=True)
    print(f"length {label}before filtering {len(frame)}")
    recent = frame.loc[frame[timestamp_column] > min_date_utc]
    print(f"length {label}after filtering {len(recent)}")
    return recent


def clean_old_data_from_parquet_files(cutoff_date: str) -> None:
    """Drop rows dated on or before *cutoff_date* from several parquet files.

    Each file is read, filtered and written back to the same path:

    * ``TMP_DIR / "tools.parquet"`` (filtered on ``request_time``)
    * ``DATA_DIR / "all_trades_profitability.parquet"``
    * ``DATA_DIR / "unknown_traders.parquet"``
    * ``TMP_DIR / "fpmmTrades.parquet"``
    * ``DATA_DIR / "invalid_trades.parquet"``

    All but the first are filtered on ``creation_timestamp``. Only rows whose
    timestamp is strictly greater than the cutoff are kept.

    Every file is handled independently and on a best-effort basis: any
    exception raised while processing one file is printed and the function
    moves on to the next file instead of aborting.

    Args:
        cutoff_date: Cutoff day formatted as ``YYYY-MM-DD``; it is
            interpreted as midnight UTC of that day. Parsing happens outside
            the per-file error handling, so a parsing error propagates to
            the caller.
    """
    print("Cleaning oldest data")
    # Convert the string to a UTC-aware timestamp so it is comparable with
    # the UTC-converted timestamp columns below.
    min_date_utc = pd.to_datetime(cutoff_date, format="%Y-%m-%d", utc=True)

    # clean tools.parquet
    try:
        tools_path = TMP_DIR / "tools.parquet"
        tools = pd.read_parquet(tools_path)

        # Explicit check instead of `assert`, which is removed under
        # `python -O`; the printed message text stays the same.
        if "trader_address" not in tools.columns:
            raise ValueError("trader_address column not found")

        # lowercase and strip trader_address
        tools["trader_address"] = tools["trader_address"].str.lower().str.strip()

        tools = _keep_rows_after(tools, "request_time", min_date_utc)
        tools.to_parquet(tools_path, index=False)

    except Exception as e:
        print(f"Error cleaning tools file {e}")

    # clean all_trades_profitability.parquet
    try:
        all_trades_path = DATA_DIR / "all_trades_profitability.parquet"
        all_trades = pd.read_parquet(all_trades_path)
        all_trades = _keep_rows_after(all_trades, "creation_timestamp", min_date_utc)
        all_trades.to_parquet(all_trades_path, index=False)

    except Exception as e:
        print(f"Error cleaning all trades profitability file {e}")

    # clean unknown_traders.parquet
    try:
        unknown_traders_path = DATA_DIR / "unknown_traders.parquet"
        unknown_traders = pd.read_parquet(unknown_traders_path)
        unknown_traders = _keep_rows_after(
            unknown_traders,
            "creation_timestamp",
            min_date_utc,
            label="unknown traders ",
        )
        unknown_traders.to_parquet(unknown_traders_path, index=False)

    except Exception as e:
        print(f"Error cleaning unknown_traders file {e}")

    # clean fpmmTrades.parquet
    try:
        fpmm_trades_path = TMP_DIR / "fpmmTrades.parquet"
        fpmm_trades = pd.read_parquet(fpmm_trades_path)
        try:
            # Best effort: if the helper cannot handle the stored values the
            # column is left untouched and parsed by pandas directly below.
            fpmm_trades["creationTimestamp"] = fpmm_trades["creationTimestamp"].apply(
                transform_to_datetime
            )
        except Exception:
            print("Transformation not needed")
        # Derive the snake_case column first; the helper then converts it to
        # UTC, keeping the original two-step conversion.
        fpmm_trades["creation_timestamp"] = pd.to_datetime(
            fpmm_trades["creationTimestamp"]
        )
        fpmm_trades = _keep_rows_after(
            fpmm_trades, "creation_timestamp", min_date_utc
        )
        fpmm_trades.to_parquet(fpmm_trades_path, index=False)

    except Exception as e:
        print(f"Error cleaning fpmmTrades file {e}")

    # clean invalid_trades.parquet
    try:
        invalid_trades_path = DATA_DIR / "invalid_trades.parquet"
        invalid_trades = pd.read_parquet(invalid_trades_path)
        invalid_trades = _keep_rows_after(
            invalid_trades, "creation_timestamp", min_date_utc
        )
        invalid_trades.to_parquet(invalid_trades_path, index=False)

    except Exception as e:
        # Name the right file: this message previously said "fpmmTrades".
        print(f"Error cleaning invalid_trades file {e}")


if __name__ == "__main__":
    # Script entry point: run the cleanup with a hard-coded cutoff date.
    clean_old_data_from_parquet_files(cutoff_date="2024-10-25")