Spaces:
Running
on
Zero
Running
on
Zero
import gradio as gr | |
import torch | |
from torch import Tensor, nn | |
import spaces | |
import numpy as np | |
import io | |
import base64 | |
from flax import nnx | |
import jax.numpy as jnp | |
from jax import Array as Tensor | |
from transformers import (FlaxCLIPTextModel, CLIPTokenizer, FlaxT5EncoderModel, | |
T5Tokenizer) | |
models = {} | |
class HFEmbedder(nnx.Module): | |
def __init__(self, version: str, max_length: int, **hf_kwargs): | |
self.is_clip = version.startswith("openai") | |
self.max_length = max_length | |
self.output_key = "pooler_output" if self.is_clip else "last_hidden_state" | |
dtype = hf_kwargs.get("dtype", jnp.float32) | |
if self.is_clip: | |
self.tokenizer: CLIPTokenizer = CLIPTokenizer.from_pretrained(version, max_length=max_length) | |
# self.hf_module: CLIPTextModel = CLIPTextModel.from_pretrained(version, **hf_kwargs) | |
self.hf_module, params = FlaxCLIPTextModel.from_pretrained(version, _do_init=False, **hf_kwargs) | |
else: | |
self.tokenizer: T5Tokenizer = T5Tokenizer.from_pretrained(version, max_length=max_length) | |
# self.hf_module: T5EncoderModel = T5EncoderModel.from_pretrained(version, **hf_kwargs) | |
self.hf_module, params = FlaxT5EncoderModel.from_pretrained(version, _do_init=False,**hf_kwargs) | |
self.hf_module._is_initialized = True | |
import jax | |
self.hf_module.params = jax.tree.map(lambda x: jax.device_put(x, jax.devices("cuda")[0]), params) | |
# if dtype==jnp.bfloat16: | |
def tokenize(self, text: list[str]) -> Tensor: | |
batch_encoding = self.tokenizer( | |
text, | |
truncation=True, | |
max_length=self.max_length, | |
return_length=False, | |
return_overflowing_tokens=False, | |
padding="max_length", | |
return_tensors="jax", | |
) | |
return batch_encoding["input_ids"] | |
def __call__(self, input_ids: Tensor) -> Tensor: | |
# outputs = self.hf_module( | |
# input_ids=batch_encoding["input_ids"].to(self.hf_module.device), | |
# attention_mask=None, | |
# output_hidden_states=False, | |
# ) | |
outputs = self.hf_module( | |
input_ids=input_ids, | |
attention_mask=None, | |
output_hidden_states=False, | |
train=False, | |
) | |
return outputs[self.output_key] | |
# def __call__(self, text: list[str]) -> Tensor: | |
# batch_encoding = self.tokenizer( | |
# text, | |
# truncation=True, | |
# max_length=self.max_length, | |
# return_length=False, | |
# return_overflowing_tokens=False, | |
# padding="max_length", | |
# return_tensors="jax", | |
# ) | |
# # outputs = self.hf_module( | |
# # input_ids=batch_encoding["input_ids"].to(self.hf_module.device), | |
# # attention_mask=None, | |
# # output_hidden_states=False, | |
# # ) | |
# outputs = self.hf_module( | |
# input_ids=batch_encoding["input_ids"], | |
# attention_mask=None, | |
# output_hidden_states=False, | |
# train=False, | |
# ) | |
# return outputs[self.output_key] | |
def load_t5(device: str | torch.device = "cuda", max_length: int = 512) -> HFEmbedder: | |
# max length 64, 128, 256 and 512 should work (if your sequence is short enough) | |
return HFEmbedder("lnyan/t5-v1_1-xxl-encoder", max_length=max_length, dtype=jnp.bfloat16) | |
def load_clip(device: str | torch.device = "cuda") -> HFEmbedder: | |
return HFEmbedder("openai/clip-vit-large-patch14", max_length=77, dtype=jnp.bfloat16) | |
def load_encoders(): | |
is_schnell = True | |
t5 = load_t5("cuda", max_length=256 if is_schnell else 512) | |
clip = load_clip("cuda") | |
return t5, clip | |
import numpy as np | |
def b64(txt,vec): | |
buffer = io.BytesIO() | |
jnp.savez(buffer, txt=txt, vec=vec) | |
buffer.seek(0) | |
encoded = base64.b64encode(buffer.getvalue()).decode('utf-8') | |
return encoded | |
# t5,clip=load_encoders() | |
def convert(prompt): | |
t5,clip=models["t5"],models["clip"] | |
if isinstance(prompt, str): | |
prompt = [prompt] | |
txt = t5.tokenize(prompt) | |
txt = t5(txt) | |
vec = clip.tokenize(prompt) | |
vec = clip(vec) | |
return b64(txt,vec) | |
import jax | |
def _to_embed(t5, clip, txt, vec): | |
t5=nnx.merge(*t5) | |
clip=nnx.merge(*clip) | |
return t5(txt), clip(vec) | |
to_embed=jax.jit(_to_embed) | |
# t5_tuple=nnx.split(t5) | |
# clip_tuple=nnx.split(clip) | |
def compile(prompt): | |
t5,clip,t5_tuple,clip_tuple=models["t5"],models["clip"],models["t5_tuple"],models["clip_tuple"] | |
if isinstance(prompt, str): | |
prompt = [prompt] | |
txt = t5.tokenize(prompt) | |
vec = clip.tokenize(prompt) | |
text,vec=to_embed(t5_tuple,clip_tuple,txt,vec) | |
return b64(txt,vec) | |
def load(prompt): | |
is_schnell = True | |
t5 = load_t5("cuda", max_length=256 if is_schnell else 512) | |
clip = load_clip("cuda") | |
models["t5"]=t5 | |
models["clip"]=clip | |
models["t5_tuple"]=nnx.split(t5) | |
models["clip_tuple"]=nnx.split(clip) | |
return "Loaded" | |
print(load("")) | |
with gr.Blocks() as demo: | |
gr.Markdown("""A workaround for flux-flax to fit into 40G VRAM""") | |
with gr.Row(): | |
with gr.Column(): | |
prompt = gr.Textbox(label="prompt") | |
convert_btn = gr.Button(value="Convert") | |
compile_btn = gr.Button(value="Compile") | |
load_btn = gr.Button(value="Load") | |
with gr.Column(): | |
output = gr.Textbox(label="output") | |
load_btn.click(load, inputs=prompt, outputs=output, api_name="load") | |
convert_btn.click(convert, inputs=prompt, outputs=output, api_name="convert") | |
compile_btn.click(compile, inputs=prompt, outputs=output, api_name="compile") | |
demo.launch() | |