Jonas Becker
1st try
7f19394
# Pip-Packages -----------------------------------------------------
import importlib
import os
import sys
from datetime import datetime
from pathlib import Path
import numpy as np
import pandas as pd
import torch
from torch import optim
from torch.utils.data import DataLoader
# From local package -----------------------------------------------
from disvae.models.losses import get_loss_f
from disvae.models.vae import init_specific_model
from disvae.training import Trainer
from disvae.utils.modelIO import save_model
# Loss stuff:
def parse_losses(p_model, filename="train_losses.log"):
df = pd.read_csv(Path(p_model) / filename)
losses = df["Loss"].unique()
rtn = [np.array(df[df["Loss"] == l]["Value"]) for l in losses]
rtn = pd.DataFrame(np.array(rtn).T, columns=losses)
return rtn
def get_kl_loss_latent(df):
"""df muss bereits geparsed sein!"""
rtn = {int(c.split("_")[-1]): df[c].iloc[-1] for c in df if "kl_loss_" in c}
rtn = dict(sorted(rtn.items(), key=lambda item: item[1], reverse=True))
return rtn
def get_kl_dict(p_model):
df = parse_losses(p_model)
rtn = get_kl_loss_latent(df)
return rtn
# Datalaader convinience stuff
# def get_dataloader(dataset: torch.data.Dataset, batch_size, num_workers):
# # Funktion ist recht kompliziert. Das geht im Notebook schnell
# # Diese Dinge werden auch zur Visualisierung des Datasets benötigt
# # p_dataset_module, dataset_class, dataset_args
# # Import module
# # if p_dataset_module not in sys.path:
# # sys.path.append(str(Path(p_dataset_module).parent))
# # Dataset = getattr(
# # importlib.import_module(Path(p_dataset_module).stem), dataset_class
# # )
# # # Ab hier an, wenn das normal importiert würde
# # ds = Dataset(**dataset_args)
#
# return loader
def get_export_dir(base_dir: str, folder_name):
if folder_name is None:
folder_name = "Model_" + (
datetime.now().replace(microsecond=0).isoformat()
).replace(" ", "_").replace(":", "-")
rtn = Path(base_dir) / folder_name
if not rtn.exists():
os.makedirs(rtn)
else:
raise ValueError("Output directory already exists.")
return rtn
def train_model(model, data_loader, loss_f, device, lr, epochs, export_dir):
trainer = Trainer(
model,
optim.Adam(model.parameters(), lr=lr),
loss_f,
device=device,
# logger=logger,
save_dir=export_dir,
is_progress_bar=True,
) # ,
# gif_visualizer=gif_visualizer)
trainer(data_loader, epochs=epochs, checkpoint_every=10)
save_model(trainer.model, export_dir)
# , metadata=config) # Speichern passiert auch schon vorher
# gif_visualizer = GifTraversalsTraining(model, args.dataset, exp_dir)
def train(dataset, config) -> str:
# Validate Config?
print("1) Set device")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device:\t\t {device}")
print("2) Get dataloader")
dataloader = DataLoader(
dataset,
batch_size=config["data_params"]["batch_size"],
shuffle=True,
pin_memory=torch.cuda.is_available,
num_workers=config["data_params"]["num_workers"],
)
print("3) Build model")
img_size = list(dataloader.dataset[0][0].shape)
print(f"Image size: \t {img_size}")
model = init_specific_model(img_size=img_size, **config["model_params"])
model = model.to(device) # make sure trainer and viz on same device
print("4) Build loss function")
loss_f = get_loss_f(
n_data=len(dataloader.dataset), device=device, **config["loss_params"]
)
print("5) Parse Export Params")
export_dir = get_export_dir(**config["export_params"])
print("6) Training model")
train_model(
model=model,
data_loader=dataloader,
loss_f=loss_f,
device=device,
export_dir=export_dir,
**config["trainer_params"],
)
return export_dir