# Page header captured together with the file (not part of the program):
# rosacastillo's picture
# updating week format starting on Monday, new staking contracts and new weekly data
# 285f2a6
# raw
# history blame
# 5.77 kB
import logging
from datetime import datetime
import pandas as pd
from markets import etl as mkt_etl, DEFAULT_FILENAME as MARKETS_FILENAME, fpmmTrades_etl
from tools import DEFAULT_FILENAME as TOOLS_FILENAME, generate_tools_file
from profitability import run_profitability_analysis, add_trades_profitability
from utils import (
get_question,
current_answer,
RPC,
measure_execution_time,
DATA_DIR,
HIST_DIR,
TMP_DIR,
)
from get_mech_info import (
get_mech_events_since_last_run,
update_fpmmTrades_parquet,
update_json_files,
)
from update_tools_accuracy import compute_tools_accuracy
from cleaning_old_info import clean_old_data_from_parquet_files
from web3_utils import updating_timestamps
from manage_space_files import move_files
from cloud_storage import upload_historical_file
from tools_metrics import compute_tools_based_datasets
# Configure logging once at import time: INFO level, with timestamp, logger
# name and level prepended to every message.
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger; it is handed to get_mech_events_since_last_run below.
logger = logging.getLogger(__name__)
def add_current_answer(tools_filename: str) -> None:
    """Add the ``title`` and ``currentAnswer`` columns to a tools parquet file.

    The markets file (``DATA_DIR / MARKETS_FILENAME``) and the tools file
    (``DATA_DIR / tools_filename``) are loaded, a ``title`` is derived from each
    ``prompt_request`` with ``get_question``, the answer for that title is
    looked up with ``current_answer``, and the tools file is overwritten in
    place with the two new columns.

    Args:
        tools_filename: Name of the tools parquet file located in ``DATA_DIR``.
    """
    # Get currentAnswer from FPMMS
    fpmms = pd.read_parquet(DATA_DIR / MARKETS_FILENAME)
    tools = pd.read_parquet(DATA_DIR / tools_filename)

    # Get the question from the tools
    logging.info("Getting the question and current answer for the tools")
    tools["title"] = tools["prompt_request"].apply(get_question)
    tools["currentAnswer"] = tools["title"].apply(
        lambda title: current_answer(title, fpmms)
    )

    # Capitalise only the values that are exactly "yes" / "no". A substring
    # replacement (Series.str.replace) would also rewrite the "no" inside any
    # other value, e.g. "unknown" -> "unkNown", and would blank out non-string
    # entries; Series.replace matches whole values and leaves the rest as is.
    tools["currentAnswer"] = tools["currentAnswer"].replace(
        {"yes": "Yes", "no": "No"}
    )

    # Save the tools data after the updates on the content
    tools.to_parquet(DATA_DIR / tools_filename, index=False)
def save_historical_data():
    """Store timestamped snapshots of the tools and trades files.

    Each snapshot is written to ``HIST_DIR`` and then passed to
    ``upload_historical_file``. The two snapshots are independent and
    best-effort: a failure in one is printed and does not stop the other.
    """
    print("Saving historical data copies")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    def _archive(frame, prefix):
        # Write the snapshot into the historical folder, then push that same
        # file name to cloud storage.
        snapshot_name = f"{prefix}_{timestamp}.parquet"
        frame.to_parquet(HIST_DIR / snapshot_name, index=False)
        # save into cloud storage
        upload_historical_file(snapshot_name)

    # NOTE(review): the tools file is read from TMP_DIR while the trades file
    # is read from DATA_DIR -- confirm this asymmetry is intentional.
    try:
        _archive(pd.read_parquet(TMP_DIR / "tools.parquet"), "tools")
    except Exception as e:
        print(f"Error saving tools file in the historical folder {e}")

    try:
        _archive(
            pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet"),
            "all_trades_profitability",
        )
    except Exception as e:
        print(
            f"Error saving all_trades_profitability file in the historical folder {e}"
        )
@measure_execution_time
def only_new_weekly_analysis():
    """Run the incremental weekly analysis for the FPMMS project.

    Steps, in order:
      1. run the markets ETL;
      2. fetch the mech events generated since the last run;
      3. download the trades from that point on and merge them into the
         previous trades file;
      4. generate the tools file for the new events and add the current
         market answers to it;
      5. run the profitability analysis on the new files with ``merge=True``;
      6. merge the new json files with the old ones, save historical copies,
         clean old data from the parquet files, recompute the tools accuracy
         and the tools-based datasets, and finally move the generated files.

    Returns:
        None. The function stops right after step 2 when no timestamp could
        be obtained for the mech events.
    """
    # Run markets ETL
    logging.info("Running markets ETL")
    mkt_etl(MARKETS_FILENAME)
    logging.info("Markets ETL completed")

    # Mech events ETL
    logging.info("Generating the mech json files")
    # get only new data
    latest_timestamp = get_mech_events_since_last_run(logger)
    # Identity check: `== None` can be overridden by the operand's __eq__.
    if latest_timestamp is None:
        print("Error while getting the mech events")
        return
    logging.info(
        "Finished generating the mech json files from %s", latest_timestamp
    )

    # FpmmTrades ETL
    fpmmTrades_etl(
        trades_filename="new_fpmmTrades.parquet",
        from_timestamp=int(latest_timestamp.timestamp()),
    )
    # merge with previous file
    print("Merging with previous fpmmTrades file")
    update_fpmmTrades_parquet(trades_filename="new_fpmmTrades.parquet")

    # Run tools ETL
    logging.info("Generate and parse the tools content")
    # generate only new file
    generate_tools_file("new_tools_info.json", "new_tools.parquet")
    logging.info("Tools ETL completed")

    add_current_answer("new_tools.parquet")

    # Run profitability analysis
    logging.info("Running profitability analysis")
    run_profitability_analysis(
        tools_filename="new_tools.parquet",
        trades_filename="new_fpmmTrades.parquet",
        merge=True,
    )
    logging.info("Profitability analysis completed")

    # merge new json files with old json files
    update_json_files()

    save_historical_data()

    # Cleaning is best-effort: a failure is reported and the run continues.
    # NOTE(review): the date is hard-coded; presumably it is the cutoff before
    # which rows are dropped -- confirm and consider making it configurable.
    try:
        clean_old_data_from_parquet_files("2024-11-19")
    except Exception as e:
        print("Error cleaning the oldest information from parquet files")
        print(f"reason = {e}")

    compute_tools_accuracy()
    compute_tools_based_datasets()
    # move to tmp folder the new generated files
    move_files()
    logging.info("Weekly analysis files generated and saved")
def restoring_trades_data(from_date: str, to_date: str):
    """Re-fetch the trades of a date window and merge them into the main file.

    ``from_date`` and ``to_date`` are ``YYYY-MM-DD`` strings parsed as UTC.
    The markets ETL runs first, then the trades between the two dates are
    written to ``missing_fpmmTrades.parquet``, merged into the previous trades
    file and finally passed to ``add_trades_profitability``.
    """
    missing_trades_file = "missing_fpmmTrades.parquet"

    # Parse both bounds up front (datetime64[ns, UTC]) so that a malformed
    # date fails before any ETL step is started.
    window_start, window_end = (
        pd.to_datetime(day, format="%Y-%m-%d", utc=True)
        for day in (from_date, to_date)
    )

    logging.info("Running markets ETL")
    mkt_etl(MARKETS_FILENAME)
    logging.info("Markets ETL completed")

    fpmmTrades_etl(
        trades_filename=missing_trades_file,
        from_timestamp=int(window_start.timestamp()),
        to_timestamp=int(window_end.timestamp()),
    )

    # merge with the old file
    print("Merging with previous fpmmTrades file")
    update_fpmmTrades_parquet(trades_filename=missing_trades_file)

    # Per the original note, this step adds the tools information.
    add_trades_profitability(trades_filename=missing_trades_file)
if __name__ == "__main__":
    # Default entry point: run the incremental weekly analysis.
    only_new_weekly_analysis()
    # To re-fetch the trades of a specific date window instead, call e.g.:
    # restoring_trades_data("2024-12-28", "2025-01-07")