# edge-flux-sayan-4/src/pipeline.py
from diffusers import FluxPipeline, FluxTransformer2DModel, AutoencoderKL, AutoencoderTiny
from diffusers.image_processor import VaeImageProcessor
from diffusers.schedulers import FlowMatchEulerDiscreteScheduler
from huggingface_hub.constants import HF_HUB_CACHE
from transformers import T5EncoderModel, T5TokenizerFast, CLIPTokenizer, CLIPTextModel
import torch
import torch._dynamo
import gc
from PIL import Image as img
from PIL.Image import Image
from pipelines.models import TextToImageRequest
from torch import Generator
import time
from diffusers import DiffusionPipeline
from torchao.quantization import quantize_, int8_weight_only, fpx_weight_only
import math
from typing import Type, Dict, Any, Tuple, Callable, Optional, Union
import ghanta
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from diffusers.configuration_utils import ConfigMixin, register_to_config
from diffusers.loaders import FromOriginalModelMixin, PeftAdapterMixin
from diffusers.models.attention import FeedForward
from diffusers.models.attention_processor import (
Attention,
AttentionProcessor,
FluxAttnProcessor2_0,
FusedFluxAttnProcessor2_0,
)
from diffusers.models.modeling_utils import ModelMixin
from diffusers.models.normalization import AdaLayerNormContinuous, AdaLayerNormZero, AdaLayerNormZeroSingle
from diffusers.utils import USE_PEFT_BACKEND, is_torch_version, logging, scale_lora_layers, unscale_lora_layers
from diffusers.utils.import_utils import is_torch_npu_available
from diffusers.utils.torch_utils import maybe_allow_in_graph
from diffusers.models.embeddings import CombinedTimestepGuidanceTextProjEmbeddings, CombinedTimestepTextProjEmbeddings, FluxPosEmbed
from diffusers.models.modeling_outputs import Transformer2DModelOutput
import os
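
# Runtime configuration: expandable CUDA allocator segments to reduce fragmentation,
# tokenizer parallelism enabled, and torch._dynamo set to fall back to eager
# execution instead of raising on compilation errors.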
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
os.environ["TOKENIZERS_PARALLELISM"] = "True"
torch._dynamo.config.suppress_errors = True
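

# Minimal min-max affine quantization helper: quantize_tensor() maps a tensor to
# `bits`-bit integers and immediately dequantizes it, returning the fake-quantized
# tensor together with its scale and zero point.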
class BasicQuantization:
def __init__(self, bits=1):
self.bits = bits
self.qmin = -(2**(bits-1))
self.qmax = 2**(bits-1) - 1
def quantize_tensor(self, tensor):
        scale = (tensor.max() - tensor.min()) / (self.qmax - self.qmin)
        scale = scale.clamp(min=1e-8)  # guard against constant tensors producing a zero scale
        zero_point = self.qmin - torch.round(tensor.min() / scale)
qtensor = torch.round(tensor / scale + zero_point)
qtensor = torch.clamp(qtensor, self.qmin, self.qmax)
return (qtensor - zero_point) * scale, scale, zero_point
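

# Applies BasicQuantization in place to the weight (and bias) of every nn.Linear
# module in a model.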
class ModelQuantization:
def __init__(self, model, bits=7):
self.model = model
self.quant = BasicQuantization(bits)
def quantize_model(self):
for name, module in self.model.named_modules():
if isinstance(module, torch.nn.Linear):
                if hasattr(module, 'weight'):
quantized_weight, _, _ = self.quant.quantize_tensor(module.weight)
module.weight = torch.nn.Parameter(quantized_weight)
if hasattr(module, 'bias') and module.bias is not None:
quantized_bias, _, _ = self.quant.quantize_tensor(module.bias)
module.bias = torch.nn.Parameter(quantized_bias)
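

# Builds a torch.Generator whose state is copied from the current global RNG state
# of the given device; for unsupported device types it falls back to the supplied
# generator, or to a fresh CPU generator when none is given.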
def inicializar_generador(dispositivo: torch.device, respaldo: torch.Generator = None):
if dispositivo.type == "cpu":
return torch.Generator(device="cpu").set_state(torch.get_rng_state())
elif dispositivo.type == "cuda":
return torch.Generator(device=dispositivo).set_state(torch.cuda.get_rng_state())
else:
if respaldo is None:
return inicializar_generador(torch.device("cpu"))
else:
return respaldo
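

# `hacer_nada` ("do nothing") is referenced by calcular_fusion below but is not
# defined anywhere in this file; a minimal identity pass-through is assumed here
# so that the no-merge branch remains callable.
def hacer_nada(x, mode=None):
    # Identity: return the input unchanged regardless of merge mode.
    return x


# Builds ToMe-style (token merging) merge/unmerge callable pairs for three branches,
# gated by the m1/m2/m3 flags in info_tome["args"]. Merging is only computed while
# the inferred downsample level stays within the configured "down" limit; otherwise
# the identity pass-through is returned for every branch.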
def calcular_fusion(x: torch.Tensor, info_tome: Dict[str, Any]) -> Tuple[Callable, ...]:
alto_original, ancho_original = info_tome["size"]
tokens_originales = alto_original * ancho_original
submuestreo = int(math.ceil(math.sqrt(tokens_originales // x.shape[1])))
argumentos = info_tome["args"]
if submuestreo <= argumentos["down"]:
ancho = int(math.ceil(ancho_original / submuestreo))
alto = int(math.ceil(alto_original / submuestreo))
radio = int(x.shape[1] * argumentos["ratio"])
if argumentos["generator"] is None:
argumentos["generator"] = inicializar_generador(x.device)
elif argumentos["generator"].device != x.device:
argumentos["generator"] = inicializar_generador(x.device, respaldo=argumentos["generator"])
usar_aleatoriedad = argumentos["rando"]
fusion, desfusion = ghanta.emparejamiento_suave_aleatorio_2d(
x, ancho, alto, argumentos["sx"], argumentos["sy"], radio,
sin_aleatoriedad=not usar_aleatoriedad, generador=argumentos["generator"]
)
else:
fusion, desfusion = (hacer_nada, hacer_nada)
fusion_a, desfusion_a = (fusion, desfusion) if argumentos["m1"] else (hacer_nada, hacer_nada)
fusion_c, desfusion_c = (fusion, desfusion) if argumentos["m2"] else (hacer_nada, hacer_nada)
fusion_m, desfusion_m = (fusion, desfusion) if argumentos["m3"] else (hacer_nada, hacer_nada)
return fusion_a, fusion_c, fusion_m, desfusion_a, desfusion_c, desfusion_m


Pipeline = None  # placeholder used as the pipeline type in the load_pipeline()/infer() signatures below
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.enabled = True
torch.backends.cudnn.benchmark = True
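
# Pinned model sources: the FLUX.1-schnell base checkpoint and the taef1 tiny VAE.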
ckpt_id = "black-forest-labs/FLUX.1-schnell"
ckpt_revision = "741f7c3ce8b383c54771c7003378a50191e9efe9"
TinyVAE = "madebyollin/taef1"
TinyVAE_REV = "2d552378e58c9c94201075708d7de4e1163b2689"
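

# Releases Python garbage, empties the CUDA caching allocator, and resets the CUDA
# memory statistics.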
def empty_cache():
gc.collect()
torch.cuda.empty_cache()
torch.cuda.reset_max_memory_allocated()
torch.cuda.reset_peak_memory_stats()
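

# Builds the serving pipeline: loads a FluxTransformer2DModel from a locally cached
# snapshot, wraps it in the FLUX.1-schnell FluxPipeline on the GPU, applies torchao
# int8 weight-only quantization to the VAE, compiles the transformer with
# torch.compile, and runs three warm-up generations so compilation finishes before
# the first real request.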
def load_pipeline() -> Pipeline:
path = os.path.join(HF_HUB_CACHE, "models--manbeast3b--flux.1-schnell-full1/snapshots/cb1b599b0d712b9aab2c4df3ad27b050a27ec146/transformer")
transformer = FluxTransformer2DModel.from_pretrained(path, torch_dtype=torch.bfloat16, use_safetensors=False)
    pipeline = FluxPipeline.from_pretrained(
        ckpt_id,
        revision=ckpt_revision,
        transformer=transformer,
        local_files_only=True,
        torch_dtype=torch.bfloat16,
    )
pipeline.to("cuda")
quantize_(pipeline.vae, int8_weight_only())
pipeline.transformer = torch.compile(pipeline.transformer, mode="max-autotune", fullgraph=True)
for _ in range(3):
        pipeline(
            prompt="insensible, timbale, pothery, electrovital, actinogram, taxis, intracerebellar, centrodesmus",
            width=1024,
            height=1024,
            guidance_scale=0.0,
            num_inference_steps=4,
            max_sequence_length=256,
        )
return pipeline
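

# `sample` is a module-level first-call flag: infer() clears the CUDA cache only
# before the first request and then reuses the already warmed pipeline, running a
# 4-step FLUX.1-schnell generation and returning the resulting PIL image.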
sample = None
@torch.no_grad()
def infer(request: TextToImageRequest, pipeline: Pipeline, generator: Generator) -> Image:
global sample
    if not sample:
        sample = 1
        empty_cache()
    image = pipeline(
        request.prompt,
        generator=generator,
        guidance_scale=0.0,
        num_inference_steps=4,
        max_sequence_length=256,
        height=request.height,
        width=request.width,
        output_type="pil",
    ).images[0]
return image