#! fork: https://github.com/NVIDIA/TensorRT/blob/main/demo/Diffusion/models.py

#
# SPDX-FileCopyrightText: Copyright (c) 1993-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import gc

import onnx
import onnx_graphsurgeon as gs
import torch
from onnx import shape_inference
from polygraphy.backend.onnx.loader import fold_constants


class Optimizer:
    """Holds a GraphSurgeon graph loaded from an ONNX file and applies clean-up passes to it.

    Each pass mutates ``self.graph`` in place; passes that accept
    ``return_onnx=True`` additionally return the resulting ONNX model.
    """

    def __init__(self, onnx_path, verbose=False):
        """Load the ONNX model at ``onnx_path`` and import it as a GraphSurgeon graph."""
        self.graph = gs.import_onnx(onnx.load(onnx_path))
        self.verbose = verbose

    def info(self, prefix):
        """Print node/tensor/input/output counts prefixed by ``prefix`` (only when verbose)."""
        if self.verbose:
            print(
                f"{prefix} .. {len(self.graph.nodes)} nodes, {len(self.graph.tensors().keys())} tensors, {len(self.graph.inputs)} inputs, {len(self.graph.outputs)} outputs"
            )

    def cleanup(self, return_onnx=False):
        """Run graph cleanup followed by a topological sort.

        Returns the exported ONNX model when ``return_onnx`` is true, else ``None``.
        """
        self.graph.cleanup().toposort()
        if return_onnx:
            return gs.export_onnx(self.graph)

    def select_outputs(self, keep, names=None):
        """Keep only the graph outputs at the indices in ``keep``, optionally renaming them.

        ``names[i]`` (when given) becomes the name of the i-th kept output.
        """
        self.graph.outputs = [self.graph.outputs[o] for o in keep]
        if names:
            for i, name in enumerate(names):
                self.graph.outputs[i].name = name

    def fold_constants(self, return_onnx=False):
        """Fold constants with polygraphy and re-import the resulting graph."""
        onnx_graph = fold_constants(gs.export_onnx(self.graph), allow_onnxruntime_shape_inference=True)
        self.graph = gs.import_onnx(onnx_graph)
        if return_onnx:
            return onnx_graph

    def infer_shapes(self, return_onnx=False):
        """Run in-memory ONNX shape inference and re-import the resulting graph.

        Raises:
            TypeError: if the exported model is larger than 2147483648 bytes
                (the "2GB limit" named in the error message). Use
                ``infer_shapes_with_external`` for such models.
        """
        size_limit = 2147483648
        onnx_graph = gs.export_onnx(self.graph)
        byte_size = onnx_graph.ByteSize()  # computed once; reused in the error message
        if byte_size > size_limit:
            raise TypeError(f"ERROR: model size exceeds supported 2GB limit, {byte_size / size_limit}")
        onnx_graph = shape_inference.infer_shapes(onnx_graph)

        self.graph = gs.import_onnx(onnx_graph)
        if return_onnx:
            return onnx_graph

    def infer_shapes_with_external(self, save_path, return_onnx=False):
        """Run path-based ONNX shape inference on a model saved with external data.

        The graph is written to ``save_path`` (tensors above ``size_threshold``
        go to separate external files), shape inference rewrites that file in
        place, and the result is loaded back into ``self.graph``.
        """
        # https://github.com/onnx/onnx/blob/main/docs/PythonAPIOverview.md#running-shape-inference-on-an-onnx-model
        onnx_graph = gs.export_onnx(self.graph)
        onnx.save_model(
            onnx_graph,
            save_path,
            save_as_external_data=True,
            all_tensors_to_one_file=False,
            size_threshold=1024,
        )
        shape_inference.infer_shapes_path(save_path, save_path)
        # Load the inferred model once and reuse it for both the graph import
        # and the optional return value (mirrors ``infer_shapes``).
        inferred_graph = onnx.load(save_path)
        self.graph = gs.import_onnx(inferred_graph)
        if return_onnx:
            return inferred_graph


class BaseModel:
    """Common configuration, shape validation and ONNX optimization for exportable models.

    Subclasses override the ``get_*`` hooks; the defaults here return ``None``.
    """

    def __init__(
        self,
        fp16=False,
        device="cuda",
        verbose=True,
        max_batch_size=16,
        min_batch_size=1,
        embedding_dim=768,
        text_maxlen=77,
    ):
        self.name = "SD Model"
        self.fp16 = fp16
        self.device = device
        self.verbose = verbose

        self.min_batch = min_batch_size
        self.max_batch = max_batch_size
        self.min_image_shape = 256  # min image resolution: 256x256
        self.max_image_shape = 1024  # max image resolution: 1024x1024
        # Latent sizes are image sizes divided by 8.
        self.min_latent_shape = self.min_image_shape // 8
        self.max_latent_shape = self.max_image_shape // 8

        self.embedding_dim = embedding_dim
        self.text_maxlen = text_maxlen

    def get_model(self):
        """Hook for subclasses; the base implementation returns ``None``."""
        pass

    def get_input_names(self):
        """Hook for subclasses; the base implementation returns ``None``."""
        pass

    def get_output_names(self):
        """Hook for subclasses; the base implementation returns ``None``."""
        pass

    def get_dynamic_axes(self):
        """Hook for subclasses; the base implementation returns ``None``."""
        return None

    def get_sample_input(self, batch_size, image_height, image_width):
        """Hook for subclasses; the base implementation returns ``None``."""
        pass

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        """Hook for subclasses; the base implementation returns ``None``."""
        return None

    def get_shape_dict(self, batch_size, image_height, image_width):
        """Hook for subclasses; the base implementation returns ``None``."""
        return None

    def optimize(self, onnx_path, onnx_opt_path):
        """Clean up, constant-fold and shape-infer ``onnx_path``, saving to ``onnx_opt_path``."""
        opt = Optimizer(onnx_path, verbose=self.verbose)
        opt.info(self.name + ": original")
        opt.cleanup()
        opt.info(self.name + ": cleanup")
        opt.fold_constants()
        opt.info(self.name + ": fold constants")
        opt.infer_shapes()
        opt.info(self.name + ": shape inference")
        onnx_opt_graph = opt.cleanup(return_onnx=True)
        opt.info(self.name + ": finished")
        onnx.save(onnx_opt_graph, onnx_opt_path)
        opt.info(self.name + f": saved to {onnx_opt_path}")

        # Drop the exported model and release cached memory before returning.
        del onnx_opt_graph
        gc.collect()
        torch.cuda.empty_cache()

    def check_dims(self, batch_size, image_height, image_width):
        """Validate the requested batch and image size and return the latent size.

        Returns:
            ``(latent_height, latent_width)``, i.e. each image dimension ``// 8``.

        Raises:
            AssertionError: if the batch size is outside ``[min_batch, max_batch]``,
                if either image dimension is not a multiple of 8, or if either
                latent dimension is outside ``[min_latent_shape, max_latent_shape]``.
        """
        assert self.min_batch <= batch_size <= self.max_batch
        # Both dimensions must be divisible by 8; otherwise the ``// 8`` below
        # would silently truncate and the latent size would not match the image.
        assert image_height % 8 == 0 and image_width % 8 == 0
        latent_height = image_height // 8
        latent_width = image_width // 8
        assert self.min_latent_shape <= latent_height <= self.max_latent_shape
        assert self.min_latent_shape <= latent_width <= self.max_latent_shape
        return (latent_height, latent_width)

    def get_minmax_dims(self, batch_size, image_height, image_width, static_batch, static_shape):
        """Return the min/max batch, image and latent dimensions.

        When ``static_batch`` is true the batch bounds collapse to ``batch_size``;
        when ``static_shape`` is true the image and latent bounds collapse to the
        requested size. Otherwise the configured limits on ``self`` are used.

        Returns:
            A 10-tuple ``(min_batch, max_batch, min_image_height,
            max_image_height, min_image_width, max_image_width,
            min_latent_height, max_latent_height, min_latent_width,
            max_latent_width)``.
        """
        min_batch = batch_size if static_batch else self.min_batch
        max_batch = batch_size if static_batch else self.max_batch
        latent_height = image_height // 8
        latent_width = image_width // 8
        min_image_height = image_height if static_shape else self.min_image_shape
        max_image_height = image_height if static_shape else self.max_image_shape
        min_image_width = image_width if static_shape else self.min_image_shape
        max_image_width = image_width if static_shape else self.max_image_shape
        min_latent_height = latent_height if static_shape else self.min_latent_shape
        max_latent_height = latent_height if static_shape else self.max_latent_shape
        min_latent_width = latent_width if static_shape else self.min_latent_shape
        max_latent_width = latent_width if static_shape else self.max_latent_shape
        return (
            min_batch,
            max_batch,
            min_image_height,
            max_image_height,
            min_image_width,
            max_image_width,
            min_latent_height,
            max_latent_height,
            min_latent_width,
            max_latent_width,
        )


class CLIP(BaseModel):
    """Export description for the CLIP text encoder (``input_ids`` -> ``text_embeddings``)."""

    def __init__(self, device, max_batch_size, embedding_dim, min_batch_size=1):
        super().__init__(
            device=device,
            max_batch_size=max_batch_size,
            min_batch_size=min_batch_size,
            embedding_dim=embedding_dim,
        )
        self.name = "CLIP"

    def get_input_names(self):
        """Return the ONNX input names."""
        return ["input_ids"]

    def get_output_names(self):
        """Return the ONNX output names."""
        return ["text_embeddings", "pooler_output"]

    def get_dynamic_axes(self):
        """Return the dynamic-axes mapping: axis 0 of input and embeddings is ``"B"``."""
        return {"input_ids": {0: "B"}, "text_embeddings": {0: "B"}}

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        """Return the three ``input_ids`` shapes built from min, requested and max batch."""
        self.check_dims(batch_size, image_height, image_width)
        min_batch, max_batch, *_ = self.get_minmax_dims(
            batch_size, image_height, image_width, static_batch, static_shape
        )
        return {
            "input_ids": [
                (min_batch, self.text_maxlen),
                (batch_size, self.text_maxlen),
                (max_batch, self.text_maxlen),
            ]
        }

    def get_shape_dict(self, batch_size, image_height, image_width):
        """Return the concrete input/output shapes for ``batch_size``."""
        self.check_dims(batch_size, image_height, image_width)
        return {
            "input_ids": (batch_size, self.text_maxlen),
            "text_embeddings": (batch_size, self.text_maxlen, self.embedding_dim),
        }

    def get_sample_input(self, batch_size, image_height, image_width):
        """Return an all-zero int32 token tensor of shape ``(batch_size, text_maxlen)``."""
        self.check_dims(batch_size, image_height, image_width)
        return torch.zeros(batch_size, self.text_maxlen, dtype=torch.int32, device=self.device)

    def optimize(self, onnx_path, onnx_opt_path):
        """Optimize the CLIP graph, keeping only output 0 and naming it ``text_embeddings``."""
        # Pass the verbosity flag through so the opt.info() calls below can
        # actually report, consistent with BaseModel.optimize.
        opt = Optimizer(onnx_path, verbose=self.verbose)
        opt.info(self.name + ": original")
        opt.select_outputs([0])  # delete graph output#1
        opt.cleanup()
        opt.info(self.name + ": remove output[1]")
        opt.fold_constants()
        opt.info(self.name + ": fold constants")
        opt.infer_shapes()
        opt.info(self.name + ": shape inference")
        opt.select_outputs([0], names=["text_embeddings"])  # rename network output
        opt.info(self.name + ": remove output[0]")
        onnx_opt_graph = opt.cleanup(return_onnx=True)
        opt.info(self.name + ": finished")
        onnx.save(onnx_opt_graph, onnx_opt_path)
        opt.info(self.name + f": saved to {onnx_opt_path}")

        # Drop the exported model and release cached memory before returning.
        del onnx_opt_graph
        gc.collect()
        torch.cuda.empty_cache()


class InflatedUNetDepth(BaseModel):
    """Export description for the UNet that takes a depth sample and KV-cache tensors.

    ``kv_cache_list`` is a sequence of tensors; one ``kv_cache_{i}`` input and
    one ``kv_cache_out_{i}`` output is declared per entry.
    """

    def __init__(
        self,
        fp16=False,
        device="cuda",
        max_batch_size=16,
        min_batch_size=1,
        embedding_dim=768,
        text_maxlen=77,
        unet_dim=4,
        kv_cache_list=None,
    ):
        super().__init__(
            fp16=fp16,
            device=device,
            max_batch_size=max_batch_size,
            min_batch_size=min_batch_size,
            embedding_dim=embedding_dim,
            text_maxlen=text_maxlen,
        )
        # Treat the default ``None`` as "no caches" so the len()/enumerate()
        # calls in the methods below work with default arguments.
        self.kv_cache_list = [] if kv_cache_list is None else kv_cache_list
        self.unet_dim = unet_dim
        self.name = "UNet"

        self.streaming_length = 1
        self.window_size = 16

    def get_input_names(self):
        """Return the ONNX input names, including one ``kv_cache_{i}`` per cache tensor."""
        input_list = ["sample", "timestep", "encoder_hidden_states", "temporal_attention_mask", "depth_sample"]
        input_list += [f"kv_cache_{i}" for i in range(len(self.kv_cache_list))]
        input_list += ["pe_idx", "update_idx"]
        return input_list

    def get_output_names(self):
        """Return the ONNX output names, including one ``kv_cache_out_{i}`` per cache tensor."""
        output_list = ["latent"]
        output_list += [f"kv_cache_out_{i}" for i in range(len(self.kv_cache_list))]
        return output_list

    def get_dynamic_axes(self):
        """Return an empty mapping (dynamic axes are disabled for this model)."""
        # NOTE: disable dynamic axes
        return {}

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        """Return, per input name, three shapes built from min, requested and max dimensions.

        KV-cache inputs use the tensor's own shape for all three entries.
        """
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        (
            min_batch,
            max_batch,
            _,
            _,
            _,
            _,
            min_latent_height,
            max_latent_height,
            min_latent_width,
            max_latent_width,
        ) = self.get_minmax_dims(batch_size, image_height, image_width, static_batch, static_shape)

        # "sample" and "depth_sample" share the same latent-shaped profile.
        latent_profile = [
            (min_batch, self.unet_dim, self.streaming_length, min_latent_height, min_latent_width),
            (batch_size, self.unet_dim, self.streaming_length, latent_height, latent_width),
            (max_batch, self.unet_dim, self.streaming_length, max_latent_height, max_latent_width),
        ]
        input_profile = {
            "sample": list(latent_profile),
            "timestep": [(min_batch,), (batch_size,), (max_batch,)],
            "encoder_hidden_states": [
                (min_batch, self.text_maxlen, self.embedding_dim),
                (batch_size, self.text_maxlen, self.embedding_dim),
                (max_batch, self.text_maxlen, self.embedding_dim),
            ],
            "temporal_attention_mask": [
                (min_batch, self.window_size),
                (batch_size, self.window_size),
                (max_batch, self.window_size),
            ],
            "depth_sample": list(latent_profile),
        }
        for idx, tensor in enumerate(self.kv_cache_list):
            input_profile[f"kv_cache_{idx}"] = [tuple(tensor.shape)] * 3

        input_profile["pe_idx"] = [
            (min_batch, self.window_size),
            (batch_size, self.window_size),
            (max_batch, self.window_size),
        ]
        input_profile["update_idx"] = [
            (min_batch,),
            (batch_size,),
            (max_batch,),
        ]

        return input_profile

    def get_sample_input(self, batch_size, image_height, image_width):
        """Build example inputs, in the order of ``get_input_names``.

        Returns:
            A tuple ``(sample, timestep, encoder_hidden_states, attn_bias,
            depth_sample, kv_cache_list, pe_idx, update_idx)``. Floating-point
            tensors use float16 when ``self.fp16`` is set, else float32. All
            tensors created here live on ``self.device``.
        """
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        dtype = torch.float16 if self.fp16 else torch.float32

        # Boolean mask: first 8 window positions for every row, plus the last
        # position of row 0. The bias is 0 where the mask is set, -inf elsewhere.
        attn_mask = torch.zeros((batch_size, self.window_size), dtype=torch.bool, device=self.device)
        attn_mask[:, :8] = True
        attn_mask[0, -1] = True
        attn_bias = torch.zeros_like(attn_mask, dtype=dtype, device=self.device)
        attn_bias.masked_fill_(attn_mask.logical_not(), float("-inf"))

        # Create the index tensors directly on self.device (rather than the
        # default CUDA device) so all sample inputs share one device.
        pe_idx = torch.arange(self.window_size, device=self.device).unsqueeze(0).repeat(batch_size, 1)
        update_idx = torch.full((batch_size,), 8, dtype=torch.int64, device=self.device)
        if batch_size > 1:
            # Row 1 only exists for batch sizes above 1; indexing it
            # unconditionally raised IndexError for batch_size == 1.
            update_idx[1] = 8 + 1

        return (
            torch.randn(
                batch_size,
                self.unet_dim,
                self.streaming_length,
                latent_height,
                latent_width,
                dtype=dtype,
                device=self.device,
            ),
            torch.ones((batch_size,), dtype=dtype, device=self.device),
            torch.randn(batch_size, self.text_maxlen, self.embedding_dim, dtype=dtype, device=self.device),
            attn_bias,
            torch.randn(
                batch_size,
                self.unet_dim,
                self.streaming_length,
                latent_height,
                latent_width,
                dtype=dtype,
                device=self.device,
            ),
            self.kv_cache_list,
            pe_idx,
            update_idx,
        )

    def optimize(self, onnx_path, onnx_opt_path):
        """Onnx graph optimization function for model with external data."""
        opt = Optimizer(onnx_path, verbose=self.verbose)
        opt.info(self.name + ": original")
        opt.cleanup()
        opt.info(self.name + ": cleanup")
        opt.fold_constants()
        opt.info(self.name + ": fold constants")
        # Path-based shape inference; it writes to onnx_opt_path, which the
        # final onnx.save below writes again.
        opt.infer_shapes_with_external(onnx_opt_path)
        opt.info(self.name + ": shape inference")
        onnx_opt_graph = opt.cleanup(return_onnx=True)
        opt.info(self.name + ": finished")
        onnx.save(
            onnx_opt_graph,
            onnx_opt_path,
            save_as_external_data=True,
            all_tensors_to_one_file=False,
            size_threshold=1024,
        )
        opt.info(self.name + f": saved to {onnx_opt_path}")

        # Drop the exported model and release cached memory before returning.
        del onnx_opt_graph
        gc.collect()
        torch.cuda.empty_cache()


class Midas(BaseModel):
    """Export description for the depth model (``images`` -> ``depth_map``)."""

    def __init__(
        self,
        fp16=False,
        device="cuda",
        max_batch_size=16,
        min_batch_size=1,
        embedding_dim=768,
        text_maxlen=77,
    ):
        super().__init__(
            fp16=fp16,
            device=device,
            max_batch_size=max_batch_size,
            min_batch_size=min_batch_size,
            embedding_dim=embedding_dim,
            text_maxlen=text_maxlen,
        )
        self.img_dim = 3
        self.name = "midas"

    def get_input_names(self):
        """Return the ONNX input names."""
        return ["images"]

    def get_output_names(self):
        """Return the ONNX output names."""
        return ["depth_map"]

    def get_dynamic_axes(self):
        """Return the dynamic-axes mapping: axis 0 of input and output is ``"F"``."""
        return {
            "images": {0: "F"},
            "depth_map": {0: "F"},
        }

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        """Return the three ``images`` shapes; only the batch dimension varies."""
        # Called for validation only; the latent size is not used here.
        self.check_dims(batch_size, image_height, image_width)
        min_batch, max_batch, *_ = self.get_minmax_dims(
            batch_size, image_height, image_width, static_batch, static_shape
        )
        return {
            "images": [
                (min_batch, self.img_dim, image_height, image_width),
                (batch_size, self.img_dim, image_height, image_width),
                (max_batch, self.img_dim, image_height, image_width),
            ],
        }

    def get_sample_input(self, batch_size, image_height, image_width):
        """Return a random image batch of shape ``(batch_size, img_dim, height, width)``."""
        dtype = torch.float16 if self.fp16 else torch.float32
        return torch.randn(batch_size, self.img_dim, image_height, image_width, dtype=dtype, device=self.device)


class VAE(BaseModel):
    """Export description for the VAE decoder (``latent`` -> ``images``)."""

    def __init__(self, device, max_batch_size, min_batch_size=1):
        super().__init__(
            device=device,
            max_batch_size=max_batch_size,
            min_batch_size=min_batch_size,
            embedding_dim=None,
        )
        self.name = "VAE decoder"

    def get_input_names(self):
        """Return the ONNX input names."""
        return ["latent"]

    def get_output_names(self):
        """Return the ONNX output names."""
        return ["images"]

    def get_dynamic_axes(self):
        """Return the dynamic-axes mapping for batch, height and width."""
        return {
            "latent": {0: "B", 2: "H", 3: "W"},
            "images": {0: "B", 2: "8H", 3: "8W"},
        }

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        """Return the three ``latent`` shapes built from min, requested and max dimensions."""
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        (
            min_batch,
            max_batch,
            _,
            _,
            _,
            _,
            min_latent_height,
            max_latent_height,
            min_latent_width,
            max_latent_width,
        ) = self.get_minmax_dims(batch_size, image_height, image_width, static_batch, static_shape)
        return {
            "latent": [
                (min_batch, 4, min_latent_height, min_latent_width),
                (batch_size, 4, latent_height, latent_width),
                (max_batch, 4, max_latent_height, max_latent_width),
            ]
        }

    def get_shape_dict(self, batch_size, image_height, image_width):
        """Return the concrete ``latent`` and ``images`` shapes for the requested size."""
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        return {
            "latent": (batch_size, 4, latent_height, latent_width),
            "images": (batch_size, 3, image_height, image_width),
        }

    def get_sample_input(self, batch_size, image_height, image_width):
        """Return a random float32 latent of shape ``(batch_size, 4, latent_h, latent_w)``."""
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        return torch.randn(
            batch_size,
            4,
            latent_height,
            latent_width,
            dtype=torch.float32,
            device=self.device,
        )


class VAEEncoder(BaseModel):
    """Export description for the VAE encoder (``images`` -> ``latent``)."""

    def __init__(self, device, max_batch_size, min_batch_size=1):
        super().__init__(
            device=device,
            max_batch_size=max_batch_size,
            min_batch_size=min_batch_size,
            embedding_dim=None,
        )
        self.name = "VAE encoder"

    def get_input_names(self):
        """Return the ONNX input names."""
        return ["images"]

    def get_output_names(self):
        """Return the ONNX output names."""
        return ["latent"]

    def get_dynamic_axes(self):
        """Return the dynamic-axes mapping for batch, height and width."""
        return {
            "images": {0: "B", 2: "8H", 3: "8W"},
            "latent": {0: "B", 2: "H", 3: "W"},
        }

    def get_input_profile(self, batch_size, image_height, image_width, static_batch, static_shape):
        """Return the three ``images`` shapes built from min, requested and max dimensions."""
        # check_dims already asserts the batch range, and get_minmax_dims
        # supplies the batch bounds, so no separate pre-computation is needed.
        self.check_dims(batch_size, image_height, image_width)
        (
            min_batch,
            max_batch,
            min_image_height,
            max_image_height,
            min_image_width,
            max_image_width,
            _,
            _,
            _,
            _,
        ) = self.get_minmax_dims(batch_size, image_height, image_width, static_batch, static_shape)

        return {
            "images": [
                (min_batch, 3, min_image_height, min_image_width),
                (batch_size, 3, image_height, image_width),
                (max_batch, 3, max_image_height, max_image_width),
            ],
        }

    def get_shape_dict(self, batch_size, image_height, image_width):
        """Return the concrete ``images`` and ``latent`` shapes for the requested size."""
        latent_height, latent_width = self.check_dims(batch_size, image_height, image_width)
        return {
            "images": (batch_size, 3, image_height, image_width),
            "latent": (batch_size, 4, latent_height, latent_width),
        }

    def get_sample_input(self, batch_size, image_height, image_width):
        """Return a random float32 image batch of shape ``(batch_size, 3, height, width)``."""
        self.check_dims(batch_size, image_height, image_width)
        return torch.randn(
            batch_size,
            3,
            image_height,
            image_width,
            dtype=torch.float32,
            device=self.device,
        )