# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.

# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.

"""Sample from a DiT checkpoint and render the sampler output as a video.

The conditioning labels (phone, speaker id, phone kind) are read from a
float16 memmap dataset described by a YAML config; the frames returned by the
diffusion sampler are written to ``animation.mp4``.
"""

import torch

# Allow TF32 kernels for matmul / cuDNN before anything else touches CUDA.
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True

import argparse
import os

import numpy as np
import yaml
from tqdm import tqdm

from diffusion import create_diffusion
from models import DiT_models


def _load_config(config_path):
    """Read the YAML config and return its ``(data, model)`` sections."""
    with open(config_path, "r") as f:
        config = yaml.safe_load(f)
    return config["data"], config["model"]


def find_model(model_name):
    """Load a checkpoint file and return the state dict stored inside it.

    Args:
        model_name: Path to the checkpoint file.

    Returns:
        ``checkpoint["ema"]`` when the checkpoint has an ``"ema"`` entry,
        otherwise ``checkpoint["model"]``.

    Raises:
        AssertionError: If ``model_name`` is not an existing file.
        KeyError: If the checkpoint has neither an ``"ema"`` nor a ``"model"``
            entry.
    """
    assert os.path.isfile(model_name), f"Could not find DiT checkpoint at {model_name}"
    # NOTE(security): torch.load unpickles the file -- only load checkpoints
    # that come from a trusted source.
    # The map_location callback keeps every tensor on CPU storage.
    checkpoint = torch.load(model_name, map_location=lambda storage, loc: storage)
    if "ema" in checkpoint:  # supports checkpoints from train.py
        print("Using EMA model")
        return checkpoint["ema"]
    print("Using model")
    return checkpoint["model"]


def get_batch(
    step, batch_size, seq_len, DEVICE, data_file, data_dim, data_mean, data_std
):
    """Draw a deterministic batch of windows from the memmapped dataset.

    The file is read as float16 rows of ``data_dim + 3`` values: ``data_dim``
    feature columns followed by the phone, speaker-id and phone-kind columns.

    Args:
        step: Seed for the window-selection RNG; the same step always yields
            the same windows.
        batch_size: Number of windows to draw (distinct start rows).
        seq_len: Number of consecutive rows per window.
        DEVICE: Torch device the returned tensors are moved to.
        data_file: Path to the float16 memmap file.
        data_dim: Number of feature columns per row.
        data_mean: Value subtracted from the features.
        data_std: Value the centred features are divided by.

    Returns:
        ``(x, speaker_id, phone, phone_kind)`` where ``x`` is the normalised
        float16 feature tensor of shape ``(batch_size, data_dim, seq_len)``
        and the other three are int32 tensors of shape
        ``(batch_size, seq_len)``.
    """
    row_width = data_dim + 3

    # First open the file flat to learn how many float16 values it holds,
    # then re-open it as a 2-D table (any trailing partial row is ignored).
    flat = np.memmap(data_file, dtype=np.float16, mode="r")
    arr = np.memmap(
        data_file,
        dtype=np.float16,
        mode="r",
        shape=(flat.shape[0] // row_width, row_width),
    )

    # Seeding with the step makes the selection reproducible.
    rng = np.random.Generator(np.random.PCG64(seed=step))
    start_indices = rng.choice(
        arr.shape[0] - seq_len, size=batch_size, replace=False
    ).astype(np.int64)

    # Copy each window out of the memmap into an in-memory batch.
    batch_data = np.zeros((batch_size, seq_len, row_width), dtype=np.float16)
    for i, start_idx in enumerate(start_indices):
        batch_data[i] = arr[start_idx : start_idx + seq_len]

    # Features: (batch, seq, dim) -> (batch, dim, seq). Already float16.
    x = np.moveaxis(batch_data[:, :, :data_dim], 1, 2)
    phone = batch_data[:, :, data_dim].astype(np.int32)
    speaker_id = batch_data[:, :, data_dim + 1].astype(np.int32)
    phone_kind = batch_data[:, :, data_dim + 2].astype(np.int32)

    # Convert to torch tensors on the target device and normalise features.
    x = torch.from_numpy(x).to(DEVICE)
    x = (x - data_mean) / data_std
    phone = torch.from_numpy(phone).to(DEVICE)
    speaker_id = torch.from_numpy(speaker_id).to(DEVICE)
    phone_kind = torch.from_numpy(phone_kind).to(DEVICE)
    return x, speaker_id, phone, phone_kind


def get_data(config_path, seed=0):
    """Load one conditioning example described by the YAML config.

    Args:
        config_path: Path to the YAML config with ``data`` and ``model``
            sections.
        seed: Seed forwarded to :func:`get_batch` as its ``step``.

    Returns:
        ``(x, speaker_id, phone, phone_kind)`` for a batch of size 1 whose
        sequence length is the model's ``input_size``, on the CUDA device.
    """
    data_config, model_config = _load_config(config_path)
    device = "cuda"  # if torch.cuda.is_available() else "cpu"
    return get_batch(
        seed,
        1,
        seq_len=model_config["input_size"],
        DEVICE=device,
        data_file=data_config["data_path"],
        data_dim=data_config["data_dim"],
        data_mean=data_config["data_mean"],
        data_std=data_config["data_std"],
    )


def plot_samples(samples, x):
    """Render the sampler frames to ``animation.mp4`` (60 fps, libx264).

    Frames with more than one channel are shown as an image; single-channel
    frames are drawn as a line together with the reference signal ``x``.

    Args:
        samples: Sequence of tensors indexed as ``[batch, channel, time]``;
            only batch element 0 is drawn.
        x: Reference tensor with the same indexing, drawn only in the
            single-channel case.
    """
    # Imported here because only this function needs matplotlib; the names
    # were previously used without being imported anywhere in the file.
    import matplotlib.pyplot as plt
    from matplotlib import animation

    fig, ax = plt.subplots(figsize=(20, 4))
    plt.tight_layout()
    total_frames = len(samples)

    def update(frame):
        """Redraw the axes for one frame and return the drawn artists."""
        ax.clear()
        ax.text(
            0.02,
            0.98,
            f"{frame+1} / {total_frames}",
            transform=ax.transAxes,
            verticalalignment="top",
            color="black",
        )
        # Upcast first: NumPy has no bfloat16 dtype, so .numpy() would fail
        # if the sampler hands back bfloat16 tensors.
        current = samples[frame].float().cpu().numpy()
        channels = current.shape[1]
        if channels > 1:
            im = ax.imshow(
                current[0],
                origin="lower",
                aspect="auto",
                interpolation="none",
                vmin=-5,
                vmax=5,
            )
            return [im]
        if channels == 1:
            line1 = ax.plot(current[0, 0])[0]
            line2 = ax.plot(x.cpu().numpy()[0, 0])[0]
            ax.set_ylim(-10, 10)
            return [line1, line2]
        # Blitting expects an iterable of artists, never None.
        return []

    # Wrapping the frame indices in tqdm gives a progress bar while saving.
    anim = animation.FuncAnimation(
        fig,
        update,
        frames=tqdm(range(total_frames), desc="Generating animation"),
        interval=1000 / 60,  # 60 fps
        blit=True,
    )

    # Save as MP4
    anim.save("animation.mp4", fps=60, extra_args=["-vcodec", "libx264"])
    plt.close(fig)


# Loaded models keyed by checkpoint path, so repeated sample() calls in the
# same process do not reload the weights.
model_cache = {}


def sample(
    config_path,
    ckpt_path,
    cfg_scale=4.0,
    num_sampling_steps=1000,
    seed=0,
    speaker_id=None,
    phone=None,
    phone_kind=None,
):
    """Run classifier-free-guided diffusion sampling for the given labels.

    Args:
        config_path: Path to the YAML config with ``data`` and ``model``
            sections.
        ckpt_path: Path to the checkpoint; the loaded model is cached per
            path in ``model_cache``.
        cfg_scale: Guidance scale forwarded to ``model.forward_with_cfg``.
        num_sampling_steps: Passed (as a string) to ``create_diffusion``.
        seed: Seed for ``torch.manual_seed``.
        speaker_id: Integer tensor of shape ``(batch, seq_len)`` on the CUDA
            device; also used to build the attention mask.
        phone: Integer tensor shaped like ``speaker_id``.
        phone_kind: Integer tensor shaped like ``speaker_id``.

    Returns:
        A list with the conditional half of every tensor yielded by
        ``diffusion.p_sample_loop``.

    Raises:
        ValueError: If any of ``speaker_id``, ``phone`` or ``phone_kind`` is
            missing.
    """
    if speaker_id is None or phone is None or phone_kind is None:
        raise ValueError(
            "speaker_id, phone and phone_kind are required (see get_data())"
        )

    torch.manual_seed(seed)
    # NOTE: this disables autograd for the rest of the process, not just for
    # this call; kept because callers may already rely on it.
    torch.set_grad_enabled(False)
    device = "cuda"  # if torch.cuda.is_available() else "cpu"

    data_config, model_config = _load_config(config_path)

    model = model_cache.get(ckpt_path)
    if model is None:
        # Load model:
        model = DiT_models[model_config["name"]](
            input_size=model_config["input_size"],
            embedding_vocab_size=model_config["embedding_vocab_size"],
            learn_sigma=model_config["learn_sigma"],
            in_channels=data_config["data_dim"],
        ).to(device)
        model.load_state_dict(find_model(ckpt_path))
        model.eval()  # important!
        # Cast only after the weights are loaded.
        model = model.bfloat16()
        model_cache[ckpt_path] = model

    diffusion = create_diffusion(str(num_sampling_steps))

    # One noise sample per conditioning row.
    n = speaker_id.shape[0]
    z = torch.randn(n, data_config["data_dim"], speaker_id.shape[1], device=device)

    # Positions may attend to each other only when they share a speaker id:
    # (batch, seq, seq) -> (batch, 1, seq, seq). Built from the real speaker
    # ids, before the null labels are appended, then duplicated for the
    # unconditional half.
    attn_mask = speaker_id[:, None, :] == speaker_id[:, :, None]
    attn_mask = attn_mask.unsqueeze(1)
    attn_mask = torch.cat([attn_mask, attn_mask], 0)

    # Setup classifier-free guidance: the second half of the batch carries
    # the model's "unconditional" label value instead of the real labels.
    z = torch.cat([z, z], 0)
    unconditional_value = model.y_embedder.unconditional_value
    phone = torch.cat([phone, torch.full_like(phone, unconditional_value)], 0)
    speaker_id = torch.cat(
        [speaker_id, torch.full_like(speaker_id, unconditional_value)], 0
    )
    phone_kind = torch.cat(
        [phone_kind, torch.full_like(phone_kind, unconditional_value)], 0
    )

    model_kwargs = dict(
        phone=phone,
        speaker_id=speaker_id,
        phone_kind=phone_kind,
        cfg_scale=cfg_scale,
        attn_mask=attn_mask,
    )

    with torch.no_grad(), torch.autocast(device_type="cuda", dtype=torch.bfloat16):
        samples = diffusion.p_sample_loop(
            model.forward_with_cfg,
            z.shape,
            z,
            clip_denoised=False,
            model_kwargs=model_kwargs,
            progress=True,
            device=device,
        )

    # Remove null class samples (keep the conditional half of each tensor).
    return [s.chunk(2, dim=0)[0] for s in samples]


def main():
    """Command-line entry point: load data, sample, and write the video."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=str, required=True)
    parser.add_argument("--ckpt", type=str, required=True)
    parser.add_argument("--cfg-scale", type=float, default=4.0)
    parser.add_argument("--num-sampling-steps", type=int, default=1000)
    parser.add_argument("--seed", type=int, default=0)
    args = parser.parse_args()

    x, speaker_id, phone, phone_kind = get_data(args.config, args.seed)
    samples = sample(
        args.config,
        args.ckpt,
        args.cfg_scale,
        args.num_sampling_steps,
        args.seed,
        speaker_id,
        phone,
        phone_kind,
    )
    plot_samples(samples, x)


if __name__ == "__main__":
    main()