{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from datetime import datetime\n", "from tqdm import tqdm\n", "\n", "import time\n", "import requests\n", "import datetime\n", "import pandas as pd\n", "from collections import defaultdict\n", "from typing import Any, Union, List\n", "from string import Template\n", "from enum import Enum\n", "from tqdm import tqdm\n", "import numpy as np\n", "from pathlib import Path\n", "import pickle" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# trades = pd.read_parquet('/Users/arshath/play/openautonomy/olas-prediction-live-dashboard_old/data/all_trades_profitability.parquet')\n", "tools = pd.read_parquet('/Users/arshath/play/openautonomy/olas-prediction-live-dashboard_old/data/tools.parquet')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tools.groupby(['request_month_year_week', 'error']).size().unstack()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "t_map = pickle.load(open('./data/t_map.pkl', 'rb'))\n", "tools['request_time'] = tools['request_block'].map(t_map)\n", "tools.to_parquet('./data/tools.parquet')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tools['request_time'] = pd.to_datetime(tools['request_time'])\n", "tools = tools[tools['request_time'] >= pd.to_datetime('2024-05-01')]\n", "tools['request_block'].max()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "requests = pd.read_parquet(\"./data/requests.parquet\")\n", "delivers = pd.read_parquet(\"./data/delivers.parquet\")\n", "print(requests.shape)\n", "print(delivers.shape)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "requests[requests['request_block'] <= 33714082].reset_index(drop=True).to_parquet(\"./data/requests.parquet\")\n", "delivers[delivers['deliver_block'] <= 33714082].reset_index(drop=True).to_parquet(\"./data/delivers.parquet\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys \n", "\n", "sys.path.append('./')\n", "from scripts.tools import *" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "RPCs = [\n", " \"https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a\",\n", "]\n", "w3s = [Web3(HTTPProvider(r)) for r in RPCs]\n", "session = create_session()\n", "event_to_transformer = {\n", " MechEventName.REQUEST: transform_request,\n", " MechEventName.DELIVER: transform_deliver,\n", "}\n", "mech_to_info = {\n", " to_checksum_address(address): (\n", " os.path.join(CONTRACTS_PATH, filename),\n", " earliest_block,\n", " )\n", " for address, (filename, earliest_block) in MECH_TO_INFO.items()\n", "}\n", "event_to_contents = {}\n", "\n", "# latest_block = w3s[0].eth.get_block(LATEST_BLOCK_NAME)[BLOCK_DATA_NUMBER]\n", "latest_block = 34032575\n", "\n", "next_start_block = latest_block - 300\n", "\n", "events_request = []\n", "events_deliver = []\n", "# Loop through events in event_to_transformer\n", "for event_name, transformer in event_to_transformer.items():\n", " print(f\"Fetching {event_name.value} events\")\n", " for address, (abi, earliest_block) in mech_to_info.items():\n", " # parallelize the fetching of events\n", " with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:\n", " 
   "            futures = []\n",
   "            for i in range(\n",
   "                next_start_block, latest_block, BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE\n",
   "            ):\n",
   "                futures.append(\n",
   "                    executor.submit(\n",
   "                        get_events,\n",
   "                        random.choice(w3s),\n",
   "                        event_name.value,\n",
   "                        address,\n",
   "                        abi,\n",
   "                        i,\n",
   "                        min(i + BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE, latest_block),\n",
   "                    )\n",
   "                )\n",
   "\n",
   "            for future in tqdm(\n",
   "                as_completed(futures),\n",
   "                total=len(futures),\n",
   "                desc=f\"Fetching {event_name.value} Events\",\n",
   "            ):\n",
   "                current_mech_events = future.result()\n",
   "                if event_name == MechEventName.REQUEST:\n",
   "                    events_request.extend(current_mech_events)\n",
   "                elif event_name == MechEventName.DELIVER:\n",
   "                    events_deliver.extend(current_mech_events)\n",
   "\n",
   "# Parse the raw events once, after both event types have been fetched\n",
   "parsed_request = parse_events(events_request)\n",
   "parsed_deliver = parse_events(events_deliver)" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
   "contents_request = []\n",
   "with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:\n",
   "    futures = []\n",
   "    for i in range(0, len(parsed_request), GET_CONTENTS_BATCH_SIZE):\n",
   "        futures.append(\n",
   "            executor.submit(\n",
   "                get_contents,\n",
   "                session,\n",
   "                parsed_request[i : i + GET_CONTENTS_BATCH_SIZE],\n",
   "                MechEventName.REQUEST,\n",
   "            )\n",
   "        )\n",
   "\n",
   "    for future in tqdm(\n",
   "        as_completed(futures),\n",
   "        total=len(futures),\n",
   "        desc=f\"Fetching {MechEventName.REQUEST.value} Contents\",\n",
   "    ):\n",
   "        current_mech_contents = future.result()\n",
   "        contents_request.append(current_mech_contents)\n",
   "\n",
   "contents_request = pd.concat(contents_request, ignore_index=True)" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
   "contents_deliver = []\n",
   "with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:\n",
   "    futures = []\n",
   "    for i in range(0, len(parsed_deliver), GET_CONTENTS_BATCH_SIZE):\n",
   "        futures.append(\n",
   "            executor.submit(\n",
   "                get_contents,\n",
   "                session,\n",
   "                parsed_deliver[i : i + GET_CONTENTS_BATCH_SIZE],\n",
   "                MechEventName.DELIVER,\n",
   "            )\n",
   "        )\n",
   "\n",
   "    for future in tqdm(\n",
   "        as_completed(futures),\n",
   "        total=len(futures),\n",
   "        desc=f\"Fetching {MechEventName.DELIVER.value} Contents\",\n",
   "    ):\n",
   "        current_mech_contents = future.result()\n",
   "        contents_deliver.append(current_mech_contents)\n",
   "\n",
   "contents_deliver = pd.concat(contents_deliver, ignore_index=True)" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "full_contents = True\n", "transformed_request = event_to_transformer[MechEventName.REQUEST](contents_request)\n", "transformed_deliver = event_to_transformer[MechEventName.DELIVER](contents_deliver, full_contents=full_contents)" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "transformed_request.shape" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "transformed_deliver.shape" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tools = pd.merge(transformed_request, transformed_deliver, on=REQUEST_ID_FIELD)\n", "tools.columns" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
   "def store_progress(\n",
   "    filename: str,\n",
   "    event_to_contents: Dict[str, pd.DataFrame],\n",
   "    tools: pd.DataFrame,\n",
   ") -> None:\n",
   "    \"\"\"Store the given progress.\"\"\"\n",
   "    if filename:\n",
   "        DATA_DIR.mkdir(parents=True, exist_ok=True)  # Ensure the directory exists\n",
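   "        # Write each event's DataFrame to its own parquet file, dropping the\n",
   "        # bulky raw 'result' column and coercing 'error' to a boolean first.\n",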
" for event_name, content in event_to_contents.items():\n", " event_filename = gen_event_filename(event_name) # Ensure this function returns a valid filename string\n", " try:\n", " if \"result\" in content.columns:\n", " content = content.drop(columns=[\"result\"]) # Avoid in-place modification\n", " if 'error' in content.columns:\n", " content['error'] = content['error'].astype(bool)\n", " content.to_parquet(DATA_DIR / event_filename, index=False)\n", " except Exception as e:\n", " print(f\"Failed to write {event_name}: {e}\")\n", " try:\n", " if \"result\" in tools.columns:\n", " tools = tools.drop(columns=[\"result\"])\n", " if 'error' in tools.columns:\n", " tools['error'] = tools['error'].astype(bool)\n", " tools.to_parquet(DATA_DIR / filename, index=False)\n", " except Exception as e:\n", " print(f\"Failed to write tools data: {e}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# store_progress(filename, event_to_contents, tools)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if 'result' in transformed_deliver.columns:\n", " transformed_deliver = transformed_deliver.drop(columns=['result'])\n", "if 'error' in transformed_deliver.columns:\n", " transformed_deliver['error'] = transformed_deliver['error'].astype(bool)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "transformed_deliver.to_parquet(\"transformed_deliver.parquet\", index=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "d = pd.read_parquet(\"transformed_deliver.parquet\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### duck db" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import duckdb\n", "from datetime import datetime, timedelta\n", "\n", "# Calculate the date for two months ago\n", "two_months_ago = (datetime.now() - timedelta(days=60)).strftime('%Y-%m-%d')\n", "\n", "# Connect to an in-memory DuckDB instance\n", "con = duckdb.connect(':memory:')\n", "\n", "# Perform a SQL query to select data from the past two months directly from the Parquet file\n", "query = f\"\"\"\n", "SELECT *\n", "FROM read_parquet('/Users/arshath/play/openautonomy/olas-prediction-live-dashboard_old/data/tools.parquet')\n", "WHERE request_time >= '{two_months_ago}'\n", "\"\"\"\n", "\n", "# Fetch the result as a pandas DataFrame\n", "df = con.execute(query).fetchdf()\n", "\n", "# Close the connection\n", "con.close()\n", "\n", "# Print the DataFrame\n", "print(df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "akash", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.14" } }, "nbformat": 4, "nbformat_minor": 2 }