from typing import Any, List

import torch
from polygraphy import cuda

from live2diff.animatediff.models.unet_depth_streaming import UNet3DConditionStreamingOutput

from .utilities import Engine

try:
    from diffusers.models.autoencoder_tiny import AutoencoderTinyOutput
except ImportError:
    from dataclasses import dataclass

    from diffusers.utils import BaseOutput

    @dataclass
    class AutoencoderTinyOutput(BaseOutput):
        """
        Output of AutoencoderTiny encoding method.

        Args:
            latents (`torch.Tensor`): Encoded outputs of the `Encoder`.
        """

        latents: torch.Tensor

try:
    from diffusers.models.vae import DecoderOutput
except ImportError:
    from dataclasses import dataclass

    from diffusers.utils import BaseOutput

    @dataclass
    class DecoderOutput(BaseOutput):
        r"""
        Output of decoding method.

        Args:
            sample (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)`):
                The decoded output sample from the last layer of the model.
        """

        sample: torch.FloatTensor


class AutoencoderKLEngine:
    """TensorRT wrapper around the VAE: one engine for encoding, one for decoding."""

    def __init__(
        self,
        encoder_path: str,
        decoder_path: str,
        stream: cuda.Stream,
        scaling_factor: int,
        use_cuda_graph: bool = False,
    ):
        self.encoder = Engine(encoder_path)
        self.decoder = Engine(decoder_path)
        self.stream = stream
        self.vae_scale_factor = scaling_factor
        self.use_cuda_graph = use_cuda_graph

        self.encoder.load()
        self.decoder.load()
        self.encoder.activate()
        self.decoder.activate()

    def encode(self, images: torch.Tensor, **kwargs):
        # The VAE maps RGB images to 4-channel latents downsampled by the scale factor.
        self.encoder.allocate_buffers(
            shape_dict={
                "images": images.shape,
                "latent": (
                    images.shape[0],
                    4,
                    images.shape[2] // self.vae_scale_factor,
                    images.shape[3] // self.vae_scale_factor,
                ),
            },
            device=images.device,
        )
        latents = self.encoder.infer(
            {"images": images},
            self.stream,
            use_cuda_graph=self.use_cuda_graph,
        )["latent"]
        return AutoencoderTinyOutput(latents=latents)

    def decode(self, latent: torch.Tensor, **kwargs):
        self.decoder.allocate_buffers(
            shape_dict={
                "latent": latent.shape,
                "images": (
                    latent.shape[0],
                    3,
                    latent.shape[2] * self.vae_scale_factor,
                    latent.shape[3] * self.vae_scale_factor,
                ),
            },
            device=latent.device,
        )
        images = self.decoder.infer(
            {"latent": latent},
            self.stream,
            use_cuda_graph=self.use_cuda_graph,
        )["images"]
        return DecoderOutput(sample=images)

    def to(self, *args, **kwargs):
        # No-op: TensorRT engines manage their own device placement.
        pass

    def forward(self, *args, **kwargs):
        pass


class UNet2DConditionModelDepthEngine:
    """TensorRT wrapper around the depth-conditioned streaming UNet."""

    def __init__(self, filepath: str, stream: cuda.Stream, use_cuda_graph: bool = False):
        self.engine = Engine(filepath)
        self.stream = stream
        self.use_cuda_graph = use_cuda_graph

        self.init_profiler()

        self.engine.load()
        self.engine.activate(profiler=self.profiler)
        self.has_allocated = False

    def init_profiler(self):
        import tensorrt

        class Profiler(tensorrt.IProfiler):
            def __init__(self):
                tensorrt.IProfiler.__init__(self)

            def report_layer_time(self, layer_name, ms):
                print(f"{layer_name}: {ms} ms")

        self.profiler = Profiler()

    def __call__(
        self,
        latent_model_input: torch.Tensor,
        timestep: torch.Tensor,
        encoder_hidden_states: torch.Tensor,
        temporal_attention_mask: torch.Tensor,
        depth_sample: torch.Tensor,
        kv_cache: List[torch.Tensor],
        pe_idx: torch.Tensor,
        update_idx: torch.Tensor,
        **kwargs,
    ) -> Any:
        if timestep.dtype != torch.float32:
            timestep = timestep.float()

        feed_dict = {
            "sample": latent_model_input,
            "timestep": timestep,
            "encoder_hidden_states": encoder_hidden_states,
            "temporal_attention_mask": temporal_attention_mask,
            "depth_sample": depth_sample,
            "pe_idx": pe_idx,
            "update_idx": update_idx,
        }
        for idx, cache in enumerate(kv_cache):
            feed_dict[f"kv_cache_{idx}"] = cache
        shape_dict = {k: v.shape for k, v in feed_dict.items()}

        # Buffers are allocated once on the first call; subsequent calls reuse
        # them, since the streaming shapes are static.
        if not self.has_allocated:
            self.engine.allocate_buffers(
                shape_dict=shape_dict,
                device=latent_model_input.device,
            )
            self.has_allocated = True

        output = self.engine.infer(
            feed_dict,
            self.stream,
            use_cuda_graph=self.use_cuda_graph,
        )

        noise_pred = output["latent"]
        kv_cache = [output[f"kv_cache_out_{idx}"] for idx in range(len(kv_cache))]
        return UNet3DConditionStreamingOutput(sample=noise_pred, kv_cache=kv_cache)

    def to(self, *args, **kwargs):
        pass

    def forward(self, *args, **kwargs):
        pass
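
# Calling-convention sketch (hedged: tensor shapes and binding semantics are
# assumptions for illustration; the true shapes are fixed by the exported engine):
#
#   unet = UNet2DConditionModelDepthEngine("engines/unet.engine", stream)  # hypothetical path
#   out = unet(
#       latent_model_input,     # noisy latents, fed as the "sample" binding
#       timestep,               # cast to float32 internally
#       encoder_hidden_states,  # text-encoder embeddings
#       temporal_attention_mask,
#       depth_sample,           # depth conditioning input
#       kv_cache,               # list of attention K/V tensors, one per binding
#       pe_idx,
#       update_idx,
#   )
#   noise_pred, kv_cache = out.sample, out.kv_cache  # feed kv_cache back on the next frame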


class MidasEngine:
    """TensorRT wrapper around the MiDaS monocular depth estimator."""

    def __init__(self, filepath: str, stream: cuda.Stream, use_cuda_graph: bool = False):
        self.engine = Engine(filepath)
        self.stream = stream
        self.use_cuda_graph = use_cuda_graph

        self.engine.load()
        self.engine.activate()
        self.has_allocated = False
        self.default_batch_size = 1

    def __call__(
        self,
        images: torch.Tensor,
        **kwargs,
    ) -> Any:
        # Re-allocate buffers on the first call or whenever the batch size changes.
        if not self.has_allocated or images.shape[0] != self.default_batch_size:
            bz = images.shape[0]
            self.engine.allocate_buffers(
                shape_dict={
                    "images": (bz, 3, 384, 384),
                    "depth_map": (bz, 384, 384),
                },
                device=images.device,
            )
            self.has_allocated = True
            self.default_batch_size = bz
        depth_map = self.engine.infer(
            {
                "images": images,
            },
            self.stream,
            use_cuda_graph=self.use_cuda_graph,
        )["depth_map"]  # (bz, 384, 384)

        return depth_map

    def norm(self, x):
        # Min-max normalize the depth map to [0, 1].
        return (x - x.min()) / (x.max() - x.min())

    def to(self, *args, **kwargs):
        pass

    def forward(self, *args, **kwargs):
        pass
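
# Usage sketch (hedged: the engine path is hypothetical; the 384x384 input
# size is fixed by the buffer shapes allocated above):
#
#   midas = MidasEngine("engines/midas.engine", stream)
#   depth = midas(images_384)  # images_384: (B, 3, 384, 384) -> depth: (B, 384, 384)
#   depth = midas.norm(depth)  # min-max normalize to [0, 1]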