# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
#   Copyright 2023 Valory AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# ------------------------------------------------------------------------------

import functools
import warnings
from datetime import datetime, timedelta
from typing import Callable, Dict, Generator, List, Optional

import pandas as pd
import requests
from tqdm import tqdm

from utils import SUBGRAPH_API_KEY, DATA_DIR, TMP_DIR
from web3_utils import (
    FPMM_QS_CREATOR,
    FPMM_PEARL_CREATOR,
    query_omen_xdai_subgraph,
    OMEN_SUBGRAPH_URL,
)
from queries import (
    FPMMS_QUERY,
    ID_FIELD,
    DATA_FIELD,
    ANSWER_FIELD,
    QUERY_FIELD,
    TITLE_FIELD,
    OUTCOMES_FIELD,
    ERROR_FIELD,
    QUESTION_FIELD,
    FPMMS_FIELD,
)

ResponseItemType = List[Dict[str, str]]
SubgraphResponseType = Dict[str, ResponseItemType]
BATCH_SIZE = 1000
DEFAULT_TO_TIMESTAMP = 2147483647  # max signed 32-bit epoch (2038-01-19)
DEFAULT_FROM_TIMESTAMP = 0

MAX_UINT_HEX = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
DEFAULT_FILENAME = "fpmms.parquet"
market_creators_map = {"quickstart": FPMM_QS_CREATOR, "pearl": FPMM_PEARL_CREATOR}


class RetriesExceeded(Exception):
    """Exception to raise when retries are exceeded during data-fetching."""

    def __init__(
        self, msg="Maximum retries were exceeded while trying to fetch the data!"
    ):
        super().__init__(msg)


def hacky_retry(func: Callable, n_retries: int = 3) -> Callable:
    """Create a hacky retry strategy.

    Unfortunately, we cannot use `requests.packages.urllib3.util.retry.Retry`,
    because the subgraph does not return the appropriate status codes in case of
    failure. Instead, it always returns code 200. Thus, we raise exceptions
    manually inside the wrapped request function, catch those exceptions in this
    decorator, and try again. If the allowed number of retries is exceeded, a
    custom `RetriesExceeded` exception is raised.

    :param func: the input request function.
    :param n_retries: the maximum allowed number of retries.
    :return: The request method with the hacky retry strategy applied.
    """

    @functools.wraps(func)
    def wrapper_hacky_retry(*args, **kwargs) -> SubgraphResponseType:
        """The wrapper for the hacky retry.

        :return: a response dictionary.
        """
        retried = 0

        while retried <= n_retries:
            try:
                if retried > 0:
                    warnings.warn(f"Retrying {retried}/{n_retries}...")

                return func(*args, **kwargs)
            except (ValueError, ConnectionError) as e:
                warnings.warn(str(e))
            finally:
                retried += 1

        raise RetriesExceeded()

    return wrapper_hacky_retry
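
# Note: `hacky_retry` is used directly as a decorator below, which keeps the
# default of 3 retries. To customize the count, one could instead wrap manually,
# e.g. with a hypothetical request function:
# query_with_retries = hacky_retry(my_request_fn, n_retries=5)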


@hacky_retry
def query_subgraph(url: str, query: str, key: str) -> SubgraphResponseType:
    """Query a subgraph.

    Args:
        url: the subgraph's URL.
        query: the query to be used.
        key: the key to use in order to access the required data.

    Returns:
        a response dictionary.
    """
    content = {QUERY_FIELD: query}
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    # a generous timeout, so a stalled subgraph call cannot hang forever
    res = requests.post(url, json=content, headers=headers, timeout=300)

    if res.status_code != 200:
        raise ConnectionError(
            "Something went wrong while trying to communicate with the subgraph "
            f"(Error: {res.status_code})!\n{res.text}"
        )

    body = res.json()
    if ERROR_FIELD in body.keys():
        raise ValueError(f"The given query is not correct: {body[ERROR_FIELD]}")

    data = body.get(DATA_FIELD, {}).get(key, None)
    if data is None:
        raise ValueError(f"Unknown error encountered!\nRaw response: \n{body}")

    return data
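
# Direct usage sketch (assuming a ready query string and subgraph URL):
# markets = query_subgraph(subgraph_url, prepared_query, FPMMS_FIELD)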


def transform_fpmmTrades(df: pd.DataFrame) -> pd.DataFrame:
    """Flatten the raw fpmmTrades records into a tabular dataframe."""
    print("Transforming trades dataframe")
    # convert creator to address
    df["creator"] = df["creator"].apply(lambda x: x["id"])

    # normalize fpmm column
    fpmm = pd.json_normalize(df["fpmm"])
    fpmm.columns = [f"fpmm.{col}" for col in fpmm.columns]
    df = pd.concat([df, fpmm], axis=1)

    # drop fpmm column
    df.drop(["fpmm"], axis=1, inplace=True)

    # rename creator to trader_address
    df.rename(columns={"creator": "trader_address"}, inplace=True)
    return df


def create_fpmmTrades(
    from_timestamp: int = DEFAULT_FROM_TIMESTAMP,
    to_timestamp: int = DEFAULT_TO_TIMESTAMP,
) -> pd.DataFrame:
    """Create fpmmTrades for all trades."""
    print("Getting trades from Quickstart markets")
    # Quickstart trades
    qs_trades_json = query_omen_xdai_subgraph(
        trader_category="quickstart",
        from_timestamp=from_timestamp,
        to_timestamp=to_timestamp,
        fpmm_from_timestamp=from_timestamp,
        fpmm_to_timestamp=to_timestamp,
    )

    print(f"length of the qs_trades_json dataset {len(qs_trades_json)}")

    # convert to dataframe
    qs_df = pd.DataFrame(qs_trades_json["data"]["fpmmTrades"])
    qs_df["market_creator"] = "quickstart"
    qs_df = transform_fpmmTrades(qs_df)

    # Pearl trades
    print("Getting trades from Pearl markets")
    pearl_trades_json = query_omen_xdai_subgraph(
        trader_category="pearl",
        from_timestamp=from_timestamp,
        to_timestamp=to_timestamp,
        fpmm_from_timestamp=from_timestamp,
        fpmm_to_timestamp=to_timestamp,
    )

    pearl_trades = pearl_trades_json["data"]["fpmmTrades"]
    print(f"Number of pearl trades fetched: {len(pearl_trades)}")

    # convert to dataframe
    pearl_df = pd.DataFrame(pearl_trades)
    pearl_df["market_creator"] = "pearl"
    pearl_df = transform_fpmmTrades(pearl_df)

    return pd.concat([qs_df, pearl_df], ignore_index=True)


def fpmms_fetcher(trader_category: str) -> Generator[ResponseItemType, int, None]:
    """An indefinite fetcher for the FPMMs."""
    # note: the substituted URL embeds the API key, so it should not be printed
    omen_subgraph = OMEN_SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)

    if trader_category == "pearl":
        creator_id = FPMM_PEARL_CREATOR
    else:  # quickstart
        creator_id = FPMM_QS_CREATOR
    while True:
        fpmm_id = yield
        fpmms_query = FPMMS_QUERY.substitute(
            creator=creator_id,
            fpmm_id=fpmm_id,
            fpmms_field=FPMMS_FIELD,
            first=BATCH_SIZE,
            id_field=ID_FIELD,
            answer_field=ANSWER_FIELD,
            question_field=QUESTION_FIELD,
            outcomes_field=OUTCOMES_FIELD,
            title_field=TITLE_FIELD,
        )
        print(f"markets query = {fpmms_query}")
        yield query_subgraph(omen_subgraph, fpmms_query, FPMMS_FIELD)
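
# Note on the generator protocol above: iterating the fetcher (as the `tqdm`
# loop in `fetch_fpmms` does) advances it to the bare `yield`, and
# `fetcher.send(latest_id)` then supplies the pagination cursor and receives
# the next batch. A minimal sketch:
#
#     fetcher = fpmms_fetcher("quickstart")
#     for _ in fetcher:             # advance to the bare `yield`
#         batch = fetcher.send("")  # "" paginates from the beginning
#         break                     # first batch only, for illustration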


def fetch_fpmms(trader_category: str) -> pd.DataFrame:
    """Fetch all the fpmms of the given market creator category."""
    latest_id = ""
    fpmms = []
    print(f"Getting markets for {trader_category}")
    fetcher = fpmms_fetcher(trader_category)
    for _ in tqdm(fetcher, unit="fpmms", unit_scale=BATCH_SIZE):
        batch = fetcher.send(latest_id)
        if len(batch) == 0:
            break

        latest_id = batch[-1].get(ID_FIELD, "")
        if latest_id == "":
            raise ValueError(f"Unexpected data format retrieved: {batch}")

        fpmms.extend(batch)

    return pd.DataFrame(fpmms)


def fetch_qs_fpmms() -> pd.DataFrame:
    """Fetch all the fpmms of the quickstart creator."""
    return fetch_fpmms("quickstart")


def fetch_pearl_fpmms() -> pd.DataFrame:
    """Fetch all the fpmms of the pearl creator."""
    return fetch_fpmms("pearl")


def get_answer(fpmm: pd.Series) -> str:
    """Resolve the outcome string of an FPMM row, using its answer as an index into the question's outcomes."""
    return fpmm[QUESTION_FIELD][OUTCOMES_FIELD][fpmm[ANSWER_FIELD]]


def transform_fpmms(fpmms: pd.DataFrame) -> pd.DataFrame:
    """Transform an FPMMS dataframe."""
    transformed = fpmms.dropna()
    transformed = transformed.drop_duplicates([ID_FIELD])
    # markets whose answer equals the max uint value are invalid/unanswered
    transformed = transformed.loc[transformed[ANSWER_FIELD] != MAX_UINT_HEX]
    # the answer is a hex-encoded uint; for binary markets its last digit is
    # the outcome index
    transformed.loc[:, ANSWER_FIELD] = (
        transformed[ANSWER_FIELD].str.slice(-1).astype(int)
    )
    # map the numeric index to the outcome's text
    transformed.loc[:, ANSWER_FIELD] = transformed.apply(get_answer, axis=1)
    transformed = transformed.drop(columns=[QUESTION_FIELD])

    return transformed
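
# Example of the decoding above (sketch): an answer of "0x00...01" becomes
# index 1, which `get_answer` maps to the question's outcomes, e.g. "No" for
# a hypothetical ["Yes", "No"] market.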


def etl(filename: Optional[str] = None) -> pd.DataFrame:
    """Fetch, process, store and return the markets as a Dataframe."""
    qs_fpmms = fetch_qs_fpmms()
    qs_fpmms = transform_fpmms(qs_fpmms)
    qs_fpmms["market_creator"] = "quickstart"
    print(f"Results for the market creator quickstart. Len = {len(qs_fpmms)}")

    pearl_fpmms = fetch_pearl_fpmms()
    pearl_fpmms = transform_fpmms(pearl_fpmms)
    pearl_fpmms["market_creator"] = "pearl"
    print(f"Results for the market creator pearl. Len = {len(pearl_fpmms)}")
    fpmms = pd.concat([qs_fpmms, pearl_fpmms], ignore_index=True)

    if filename:
        fpmms.to_parquet(DATA_DIR / filename, index=False)

    return fpmms


def read_global_trades_file() -> Optional[pd.DataFrame]:
    """Read the global trades parquet file, returning None if it is missing."""
    try:
        trades_filename = "fpmmTrades.parquet"
        fpmms_trades = pd.read_parquet(TMP_DIR / trades_filename)
    except FileNotFoundError:
        print("Error: fpmmTrades.parquet not found. No market creator added")
        return None
    return fpmms_trades


def add_market_creator(tools: pd.DataFrame) -> pd.DataFrame:
    """Add a market_creator column to the tools dataframe, using the global trades file."""
    # Check if fpmmTrades.parquet is in the same directory
    fpmms_trades = read_global_trades_file()
    if fpmms_trades is None:
        return tools
    tools["market_creator"] = ""
    # traverse the list of traders
    traders_no_market_creator = 0
    traders_list = list(tools.trader_address.unique())
    for trader_address in traders_list:
        market_creator = ""
        try:
            trades = fpmms_trades[fpmms_trades["trader_address"] == trader_address]
            market_creator = trades.iloc[0]["market_creator"]  # first value is enough
        except Exception:
            print(f"ERROR getting the market creator of {trader_address}")
            traders_no_market_creator += 1
            continue
        # update
        tools.loc[tools["trader_address"] == trader_address, "market_creator"] = (
            market_creator
        )
    # filter out the tools for which we have no market creator info
    tools = tools.loc[tools["market_creator"] != ""]
    print(f"Number of traders with no market creator info = {traders_no_market_creator}")
    return tools


def fpmmTrades_etl(
    trades_filename: str, from_timestamp: int, to_timestamp: int = DEFAULT_TO_TIMESTAMP
) -> None:
    """Fetch, process and store the trades as a parquet file."""
    print("Generating the trades file")
    try:
        fpmmTrades = create_fpmmTrades(
            from_timestamp=from_timestamp, to_timestamp=to_timestamp
        )
    except FileNotFoundError:
        print(f"Error creating the {trades_filename} file.")
        return

    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"

    # lowercase and strip trader_address
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()
    fpmmTrades.to_parquet(DATA_DIR / trades_filename, index=False)
    return
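
# Example invocation (hypothetical timestamp):
# fpmmTrades_etl("fpmmTrades.parquet", from_timestamp=1704067200)  # 2024-01-01 UTC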


def check_current_week_data(trades_df: pd.DataFrame) -> pd.DataFrame:
    """Check that all of the current week's data is present; if not, add the missing data from the previous file."""
    # Get current date
    now = datetime.now()

    # Get start of the current week (Monday)
    start_of_week = now - timedelta(days=now.weekday())
    start_of_week = start_of_week.replace(hour=0, minute=0, second=0, microsecond=0)
    print(f"start of the week = {start_of_week}")

    trades_df["creation_timestamp"] = pd.to_datetime(trades_df["creationTimestamp"])
    trades_df["creation_date"] = trades_df["creation_timestamp"].dt.date
    trades_df["creation_date"] = pd.to_datetime(trades_df["creation_date"])
    # Check dataframe
    min_date = min(trades_df.creation_date)
    if min_date > start_of_week:
        # data from the current week is missing from the trades file
        fpmms_trades = read_global_trades_file()
        if fpmms_trades is None:
            return trades_df
        # get the missing data
        missing_data = fpmms_trades[
            (fpmms_trades["creation_date"] >= start_of_week)
            & (fpmms_trades["creation_date"] < min_date)
        ]
        merge_df = pd.concat([trades_df, missing_data], ignore_index=True)
        merge_df.drop_duplicates("id", keep="last", inplace=True)
        return merge_df
    # no update needed
    return trades_df


if __name__ == "__main__":
    etl("all_fpmms.parquet")