|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gradio as gr |
|
from env import BASE_MODEL_NAME, LORA_WEIGHTS_PATH, PROMPTS |
|
|
|
# Example input rows; each inner list is one complete set of values.
# NOTE(review): presumably consumed by the Gradio UI, with columns ordered as
# (prompt, negative prompt, guidance scale, two image dimensions, inference
# steps, scheduler name) -- confirm against the component order in the UI code.
examples = [
    [
        PROMPTS,
        'low quality',
        7.5,
        512,
        512,
        25,
        "DPMSolver"
    ],
]
|
import inspect |
|
import os |
|
import random |
|
import re |
|
import time |
|
from typing import Callable, List, Optional, Union |
|
|
|
import numpy as np |
|
import paddle |
|
import PIL |
|
import PIL.Image |
|
from packaging import version |
|
|
|
from paddlenlp.transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer |
|
|
|
from ppdiffusers.configuration_utils import FrozenDict |
|
from ppdiffusers.models import AutoencoderKL, UNet2DConditionModel |
|
from ppdiffusers.pipeline_utils import DiffusionPipeline |
|
from ppdiffusers.schedulers import ( |
|
DDIMScheduler, |
|
DPMSolverMultistepScheduler, |
|
EulerAncestralDiscreteScheduler, |
|
EulerDiscreteScheduler, |
|
LMSDiscreteScheduler, |
|
PNDMScheduler, |
|
HeunDiscreteScheduler, |
|
KDPM2AncestralDiscreteScheduler, |
|
KDPM2DiscreteScheduler, |
|
|
|
) |
|
from ppdiffusers.utils import PIL_INTERPOLATION, deprecate, logging |
|
from ppdiffusers.utils.testing_utils import load_image |
|
from ppdiffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput |
|
from ppdiffusers.pipelines.stable_diffusion.safety_checker import StableDiffusionSafetyChecker |
|
|
|
# Module-level logger, created through the `logging` helper imported from ppdiffusers.utils.
logger = logging.get_logger(__name__)
|
|
|
|
|
def save_all(images, FORMAT="jpg", OUTDIR="./outputs/"):
    """Save images to disk together with a text file describing their settings.

    Each image must provide an ``argument`` mapping (keys ``epoch_time``,
    ``prompt``, ``negative_prompt``, ``height``, ``width``, ``seed``,
    ``num_inference_steps``, ``guidance_scale`` and optionally ``strength``)
    and a ``save(path)`` method.

    For every image two files are written into ``OUTDIR``:

    * ``{epoch_time}_scale_{guidance_scale}_steps_{steps}_seed_{seed}.{FORMAT}``
    * ``{epoch_time}_prompt.txt`` (images sharing one ``epoch_time`` overwrite
      the same text file).

    Args:
        images: A single image or a list/tuple of images.
        FORMAT: File extension used for the image file name.
        OUTDIR: Target directory; created when it does not exist.
    """
    if not isinstance(images, (list, tuple)):
        images = [images]

    # Creating the directory is loop-invariant, so do it once up front.
    os.makedirs(OUTDIR, exist_ok=True)
    precision = "fp32"

    for image in images:
        argument = image.argument
        epoch_time = argument["epoch_time"]
        prompt = argument["prompt"]
        negative_prompt = argument["negative_prompt"]
        height = argument["height"]
        width = argument["width"]
        seed = argument["seed"]
        strength = argument.get("strength", 1)
        inference_steps = argument["num_inference_steps"]
        guidance_scale = argument["guidance_scale"]

        filename = f"{str(epoch_time)}_scale_{guidance_scale}_steps_{inference_steps}_seed_{seed}.{FORMAT}"
        image.save(os.path.join(OUTDIR, filename))

        # Prompts are free-form user text and may contain non-ASCII characters;
        # an explicit encoding keeps the write independent of the platform locale.
        prompt_path = os.path.join(OUTDIR, f"{epoch_time}_prompt.txt")
        with open(prompt_path, "w", encoding="utf-8") as file:
            file.write(
                f"PROMPT: {prompt}\nNEG_PROMPT: {negative_prompt}\n\nINFERENCE_STEPS: {inference_steps}\nHeight: {height}\nWidth: {width}\nSeed: {seed}\n\nPrecision: {precision}\nSTRENGTH: {strength}\nGUIDANCE_SCALE: {guidance_scale}"
            )
|
|
|
|
|
# Tokenizer for the prompt-emphasis syntax. The pattern is compiled in verbose
# mode, so whitespace inside it is ignored. Alternatives are tried in order:
# backslash-escaped brackets/backslash, a lone backslash, bracket openers, an
# explicit ":<number>)" closer (group 1 captures the number), plain closers,
# a run of ordinary characters, and finally a lone ":".
re_attention = re.compile(
    r"""
\\\(|
\\\)|
\\\[|
\\]|
\\\\|
\\|
\(|
\[|
:([+-]?[.\d]+)\)|
\)|
]|
[^\\()\[\]:]+|
:
""",
    re.X,
)


def parse_prompt_attention(text):
    r"""
    Parses a string with attention tokens and returns a list of pairs: text and its associated weight.
    Accepted tokens are:
      (abc) - increases attention to abc by a multiplier of 1.1
      (abc:3.12) - increases attention to abc by a multiplier of 3.12
      [abc] - decreases attention to abc by a multiplier of 1.1
      \( - literal character '('
      \[ - literal character '['
      \) - literal character ')'
      \] - literal character ']'
      \\ - literal character '\'
      anything else - just text

    Unclosed brackets are treated as if they were closed at the end of the
    text. A ":<number>)" closer whose number cannot be parsed (for example
    "1.2.3") is kept as literal text and the bracket is closed with the
    default 1.1 multiplier instead of raising.

    >>> parse_prompt_attention('normal text')
    [['normal text', 1.0]]
    >>> parse_prompt_attention('an (important) word')
    [['an ', 1.0], ['important', 1.1], [' word', 1.0]]
    >>> parse_prompt_attention('(unbalanced')
    [['unbalanced', 1.1]]
    >>> parse_prompt_attention('\(literal\]')
    [['(literal]', 1.0]]
    >>> parse_prompt_attention('(unnecessary)(parens)')
    [['unnecessaryparens', 1.1]]
    >>> parse_prompt_attention('a (((house:1.3)) [on] a (hill:0.5), sun, (((sky))).')
    [['a ', 1.0],
     ['house', 1.5730000000000004],
     [' ', 1.1],
     ['on', 1.0],
     [' a ', 1.1],
     ['hill', 0.55],
     [', sun, ', 1.1],
     ['sky', 1.4641000000000006],
     ['.', 1.1]]
    """

    res = []
    # Stacks holding, for every still-open bracket, the index in `res` at
    # which the bracketed content starts.
    round_brackets = []
    square_brackets = []

    round_bracket_multiplier = 1.1
    square_bracket_multiplier = 1 / 1.1

    def multiply_range(start_position, multiplier):
        # Scale every fragment collected since the bracket was opened.
        for p in range(start_position, len(res)):
            res[p][1] *= multiplier

    for m in re_attention.finditer(text):
        token = m.group(0)
        weight = m.group(1)

        if token.startswith("\\"):
            # Escaped character: keep it literally, without the backslash.
            res.append([token[1:], 1.0])
        elif token == "(":
            round_brackets.append(len(res))
        elif token == "[":
            square_brackets.append(len(res))
        elif weight is not None and round_brackets:
            try:
                multiplier = float(weight)
            except ValueError:
                # The regex admits strings such as "1.2.3" or ".." that are
                # not valid floats; keep them as text rather than crash.
                res.append([token[:-1], 1.0])
                multiplier = round_bracket_multiplier
            multiply_range(round_brackets.pop(), multiplier)
        elif token == ")" and round_brackets:
            multiply_range(round_brackets.pop(), round_bracket_multiplier)
        elif token == "]" and square_brackets:
            multiply_range(square_brackets.pop(), square_bracket_multiplier)
        else:
            res.append([token, 1.0])

    # Brackets that were never closed apply to everything after their opener.
    for pos in round_brackets:
        multiply_range(pos, round_bracket_multiplier)

    for pos in square_brackets:
        multiply_range(pos, square_bracket_multiplier)

    if not res:
        res = [["", 1.0]]

    # Merge runs of neighbouring fragments that ended up with identical weights.
    i = 0
    while i + 1 < len(res):
        if res[i][1] == res[i + 1][1]:
            res[i][0] += res[i + 1][0]
            res.pop(i + 1)
        else:
            i += 1

    return res
|
|
|
|
|
def get_prompts_with_weights(pipe: DiffusionPipeline, prompt: List[str], max_length: int):
    r"""
    Tokenize every prompt and pair each token id with its attention weight.

    Each prompt is split by `parse_prompt_attention` into weighted fragments;
    every fragment is tokenized with `pipe.tokenizer` and the first and last
    ids of each tokenization are dropped, so the result contains no padding,
    starting or ending token. Prompts longer than `max_length` tokens are cut
    off at `max_length`.

    Returns:
        A `(tokens, weights)` pair of lists with one inner list per prompt.
    """
    all_tokens = []
    all_weights = []
    for entry in prompt:
        ids = []
        id_weights = []
        for fragment, fragment_weight in parse_prompt_attention(entry):
            # Strip the first and last id produced for this fragment.
            fragment_ids = pipe.tokenizer(fragment).input_ids[1:-1]
            ids += fragment_ids
            # Every token of the fragment inherits the fragment's weight.
            id_weights += [fragment_weight] * len(fragment_ids)

            # No point tokenizing further once the budget is exceeded.
            if len(ids) > max_length:
                break

        if len(ids) > max_length:
            ids = ids[:max_length]
            id_weights = id_weights[:max_length]

        all_tokens.append(ids)
        all_weights.append(id_weights)
    return all_tokens, all_weights
|
|
|
|
|
def pad_tokens_and_weights(tokens, weights, max_length, bos, eos, pad, no_boseos_middle=True, chunk_length=77):
    r"""
    Pad the tokens (with starting and ending tokens) and weights (with 1.0) to max_length.

    Both `tokens` and `weights` are updated in place (their inner lists are
    replaced) and also returned.

    Args:
        tokens: List of token-id lists, without starting/ending tokens.
        weights: List of per-token weight lists, parallel to `tokens`.
        max_length: Target length of every padded token list.
        bos, eos, pad: Ids of the starting, ending and padding tokens.
        no_boseos_middle: When true, weights are laid out for a sequence that
            has one starting and one ending position overall. Otherwise they
            are laid out per chunk, each chunk carrying its own starting and
            ending position with weight 1.0.
        chunk_length: Length of one chunk including its two special positions.

    Returns:
        The `(tokens, weights)` pair.
    """
    # Each chunk carries `chunk_length - 2` prompt tokens; the remaining two
    # positions are the starting and ending tokens.
    tokens_per_chunk = chunk_length - 2
    max_embeddings_multiples = (max_length - 2) // tokens_per_chunk
    weights_length = max_length if no_boseos_middle else max_embeddings_multiples * chunk_length
    for i in range(len(tokens)):
        token_weights = weights[i]
        tokens[i] = [bos] + tokens[i] + [eos] + [pad] * (max_length - 2 - len(tokens[i]))
        if no_boseos_middle:
            weights[i] = [1.0] + token_weights + [1.0] * (max_length - 1 - len(token_weights))
        else:
            # Emit one chunk per embedding multiple, but never fewer than are
            # needed to place every weight (ceil division).
            chunk_count = max(max_embeddings_multiples, -(-len(token_weights) // tokens_per_chunk))
            padded = []
            for j in range(chunk_count):
                padded.append(1.0)  # starting position of this chunk
                # Stride by the number of prompt tokens per chunk so weights
                # stay aligned with the tokens that the chunk actually holds.
                padded += token_weights[j * tokens_per_chunk : (j + 1) * tokens_per_chunk]
                padded.append(1.0)  # ending position of this chunk
            padded += [1.0] * (weights_length - len(padded))
            weights[i] = padded

    return tokens, weights
|
|
|
|
|
def get_unweighted_text_embeddings(
    pipe: DiffusionPipeline, text_input: paddle.Tensor, chunk_length: int, no_boseos_middle: Optional[bool] = True
):
    """
    Run the text encoder on `text_input`, chunk by chunk when it is longer
    than a single `chunk_length` window.

    When more than one chunk fits, overlapping windows of `chunk_length`
    positions (advancing by `chunk_length - 2`) are encoded one at a time and
    their outputs are concatenated along axis 1. With `no_boseos_middle` the
    first/last positions of inner chunk boundaries are dropped before
    concatenating.
    """
    stride = chunk_length - 2
    chunk_count = (text_input.shape[1] - 2) // stride

    # Short input: a single encoder call is enough.
    if chunk_count <= 1:
        return pipe.text_encoder(text_input)[0]

    pieces = []
    for index in range(chunk_count):
        # Window of `chunk_length` positions; cloned so the writes below do
        # not touch `text_input`.
        window = text_input[:, index * stride : (index + 1) * stride + 2].clone()

        # Every window starts and ends with the first and last id of the
        # first row of the full input.
        window[:, 0] = text_input[0, 0]
        window[:, -1] = text_input[0, -1]

        encoded = pipe.text_encoder(window)[0]

        if no_boseos_middle:
            is_first = index == 0
            is_last = index == chunk_count - 1
            if is_first:
                # Keep the leading position, drop the trailing one.
                encoded = encoded[:, :-1]
            elif is_last:
                # Keep the trailing position, drop the leading one.
                encoded = encoded[:, 1:]
            else:
                # Inner chunk: drop both boundary positions.
                encoded = encoded[:, 1:-1]

        pieces.append(encoded)
    return paddle.concat(pieces, axis=1)
|
|
|
|
|
def get_weighted_text_embeddings(
    pipe: DiffusionPipeline,
    prompt: Union[str, List[str]],
    uncond_prompt: Optional[Union[str, List[str]]] = None,
    max_embeddings_multiples: Optional[int] = 1,
    no_boseos_middle: Optional[bool] = False,
    skip_parsing: Optional[bool] = False,
    skip_weighting: Optional[bool] = False,
    **kwargs
):
    r"""
    Prompts can be assigned with local weights using brackets. For example,
    prompt 'A (very beautiful) masterpiece' highlights the words 'very beautiful',
    and the embedding tokens corresponding to the words get multiplied by a constant, 1.1.

    Also, to regularize the embedding, the weighted embedding is rescaled so that each
    prompt keeps the mean it had before weighting.

    Args:
        pipe (`DiffusionPipeline`):
            Pipe to provide access to the tokenizer and the text encoder.
        prompt (`str` or `List[str]`):
            The prompt or prompts to guide the image generation.
        uncond_prompt (`str` or `List[str]`):
            The unconditional prompt or prompts for guide the image generation. If unconditional prompt
            is provided, the embeddings of prompt and uncond_prompt are concatenated.
        max_embeddings_multiples (`int`, *optional*, defaults to `1`):
            The max multiple length of prompt embeddings compared to the max output length of text encoder.
        no_boseos_middle (`bool`, *optional*, defaults to `False`):
            If the length of text token is multiples of the capacity of text encoder, whether reserve the starting and
            ending token in each of the chunk in the middle.
        skip_parsing (`bool`, *optional*, defaults to `False`):
            Skip the parsing of brackets.
        skip_weighting (`bool`, *optional*, defaults to `False`):
            Skip the weighting. When the parsing is skipped, it is forced True.
        **kwargs:
            Accepted and ignored.

    Returns:
        The prompt embeddings; when `uncond_prompt` is given, the unconditional embeddings
        followed by the prompt embeddings, concatenated with `paddle.concat`.
    """
    model_max_length = pipe.tokenizer.model_max_length
    max_length = (model_max_length - 2) * max_embeddings_multiples + 2
    if isinstance(prompt, str):
        prompt = [prompt]
    if isinstance(uncond_prompt, str):
        uncond_prompt = [uncond_prompt]
    has_uncond = uncond_prompt is not None

    if not skip_parsing:
        prompt_tokens, prompt_weights = get_prompts_with_weights(pipe, prompt, max_length - 2)
        if has_uncond:
            uncond_tokens, uncond_weights = get_prompts_with_weights(pipe, uncond_prompt, max_length - 2)
    else:
        # No bracket parsing: tokenize directly and give every token weight 1.0.
        prompt_tokens = [
            token[1:-1] for token in pipe.tokenizer(prompt, max_length=max_length, truncation=True).input_ids
        ]
        prompt_weights = [[1.0] * len(token) for token in prompt_tokens]
        if has_uncond:
            uncond_tokens = [
                token[1:-1]
                for token in pipe.tokenizer(uncond_prompt, max_length=max_length, truncation=True).input_ids
            ]
            uncond_weights = [[1.0] * len(token) for token in uncond_tokens]

    # Round the longest token list up to a whole number of encoder windows,
    # capped by the requested `max_embeddings_multiples`.
    max_length = max(len(token) for token in prompt_tokens)
    if has_uncond:
        max_length = max(max_length, max(len(token) for token in uncond_tokens))

    max_embeddings_multiples = min(max_embeddings_multiples, (max_length - 1) // (model_max_length - 2) + 1)
    max_embeddings_multiples = max(1, max_embeddings_multiples)
    max_length = (model_max_length - 2) * max_embeddings_multiples + 2

    # Fall back to the cls/sep ids when the tokenizer defines no bos/eos ids.
    bos = pipe.tokenizer.bos_token_id if pipe.tokenizer.bos_token_id is not None else pipe.tokenizer.cls_token_id
    eos = pipe.tokenizer.eos_token_id if pipe.tokenizer.eos_token_id is not None else pipe.tokenizer.sep_token_id
    pad = pipe.tokenizer.pad_token_id
    prompt_tokens, prompt_weights = pad_tokens_and_weights(
        prompt_tokens,
        prompt_weights,
        max_length,
        bos,
        eos,
        pad,
        no_boseos_middle=no_boseos_middle,
        chunk_length=model_max_length,
    )
    prompt_tokens = paddle.to_tensor(prompt_tokens)
    if has_uncond:
        uncond_tokens, uncond_weights = pad_tokens_and_weights(
            uncond_tokens,
            uncond_weights,
            max_length,
            bos,
            eos,
            pad,
            no_boseos_middle=no_boseos_middle,
            chunk_length=model_max_length,
        )
        uncond_tokens = paddle.to_tensor(uncond_tokens)

    # Encode the padded token ids.
    text_embeddings = get_unweighted_text_embeddings(
        pipe, prompt_tokens, model_max_length, no_boseos_middle=no_boseos_middle
    )
    prompt_weights = paddle.to_tensor(prompt_weights, dtype=text_embeddings.dtype)
    if has_uncond:
        uncond_embeddings = get_unweighted_text_embeddings(
            pipe, uncond_tokens, model_max_length, no_boseos_middle=no_boseos_middle
        )
        uncond_weights = paddle.to_tensor(uncond_weights, dtype=uncond_embeddings.dtype)

    # Apply the per-token weights, then rescale each prompt so its mean over
    # the last two axes matches the mean before weighting.
    if (not skip_parsing) and (not skip_weighting):
        previous_mean = text_embeddings.mean(axis=[-2, -1])
        text_embeddings *= prompt_weights.unsqueeze(-1)
        current_mean = text_embeddings.mean(axis=[-2, -1])
        # The ratio has one value per prompt; add two trailing axes so it
        # broadcasts per prompt instead of against the last (feature) axis.
        text_embeddings *= (previous_mean / current_mean).unsqueeze(-1).unsqueeze(-1)
        if has_uncond:
            previous_mean = uncond_embeddings.mean(axis=[-2, -1])
            uncond_embeddings *= uncond_weights.unsqueeze(-1)
            current_mean = uncond_embeddings.mean(axis=[-2, -1])
            uncond_embeddings *= (previous_mean / current_mean).unsqueeze(-1).unsqueeze(-1)

    # Unconditional embeddings come first in the concatenated result.
    if has_uncond:
        text_embeddings = paddle.concat([uncond_embeddings, text_embeddings])

    return text_embeddings
|
|
|
|
|
def preprocess_image(image):
    """Convert an image into a batched paddle tensor with values in [-1, 1].

    The image is resized (lanczos) so both sides are rounded down to a
    multiple of 32, scaled from 0..255 to 0..1, given a leading batch axis,
    reordered with `transpose(0, 3, 1, 2)` and finally mapped to [-1, 1].
    """
    width, height = image.size
    width, height = width - width % 32, height - height % 32
    resized = image.resize((width, height), resample=PIL_INTERPOLATION["lanczos"])
    pixels = np.array(resized).astype(np.float32) / 255.0
    # Add the batch axis, then move the last axis to position 1.
    batched = pixels[None].transpose(0, 3, 1, 2)
    tensor = paddle.to_tensor(batched)
    return 2.0 * tensor - 1.0
|
|
|
|
|
def preprocess_mask(mask):
    """Convert a mask image into an inverted, 4-channel, 1/8-scale paddle tensor.

    The mask is converted to mode "L", its sides are rounded down to a
    multiple of 32 and then divided by 8 for a nearest-neighbour resize. The
    0..1 values are repeated 4 times along a new leading axis, given a batch
    axis and inverted (`1 - mask`).
    """
    gray = mask.convert("L")
    width, height = gray.size
    width, height = width - width % 32, height - height % 32
    small = gray.resize((width // 8, height // 8), resample=PIL_INTERPOLATION["nearest"])
    values = np.array(small).astype(np.float32) / 255.0
    values = np.tile(values, (4, 1, 1))
    # The axis order (0, 1, 2, 3) leaves the layout unchanged; kept as in the original.
    values = values[None].transpose(0, 1, 2, 3)
    inverted = 1 - values
    return paddle.to_tensor(inverted)
|
|
|
|
|
class StableDiffusionPipelineAllinOne(DiffusionPipeline): |
|
r""" |
|
Pipeline for text-to-image image-to-image inpainting generation using Stable Diffusion. |
|
|
|
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the |
|
library implements for all the pipelines (such as downloading or saving, running on a particular xxxx, etc.) |
|
|
|
Args: |
|
vae ([`AutoencoderKL`]): |
|
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations. |
|
text_encoder ([`CLIPTextModel`]): |
|
Frozen text-encoder. Stable Diffusion uses the text portion of |
|
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically |
|
the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant. |
|
tokenizer (`CLIPTokenizer`): |
|
Tokenizer of class |
|
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer). |
|
unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents. |
|
scheduler ([`SchedulerMixin`]): |
|
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of |
|
[`DDIMScheduler`], [`LMSDiscreteScheduler`], [`PNDMScheduler`], [`EulerDiscreteScheduler`], [`EulerAncestralDiscreteScheduler`] |
|
or [`DPMSolverMultistepScheduler`]. |
|
safety_checker ([`StableDiffusionSafetyChecker`]): |
|
Classification module that estimates whether generated images could be considered offensive or harmful. |
|
Please, refer to the [model card](https://huggingface.co/junnyu/stable-diffusion-v1-4-paddle) for details. |
|
feature_extractor ([`CLIPFeatureExtractor`]): |
|
Model that extracts features from generated images to be used as inputs for the `safety_checker`. |
|
""" |
|
_optional_components = ["safety_checker", "feature_extractor"] |
|
|
|
def __init__(
    self,
    vae: AutoencoderKL,
    text_encoder: CLIPTextModel,
    tokenizer: CLIPTokenizer,
    unet: UNet2DConditionModel,
    scheduler: Union[
        DDIMScheduler,
        PNDMScheduler,
        LMSDiscreteScheduler,
        EulerDiscreteScheduler,
        EulerAncestralDiscreteScheduler,
        DPMSolverMultistepScheduler,
    ],
    safety_checker: StableDiffusionSafetyChecker,
    feature_extractor: CLIPFeatureExtractor,
    requires_safety_checker: bool = False,
):
    """Patch outdated configs, validate the safety-checker setup and register all components.

    * A scheduler config whose `steps_offset` is not 1 is rewritten to 1
      (with a deprecation notice).
    * A scheduler config with `clip_sample` set to True is rewritten to False
      (with a deprecation notice).
    * A unet config older than ppdiffusers 0.9.0.dev0 with `sample_size` below
      64 is rewritten to 64 (with a deprecation notice).

    Raises:
        ValueError: If a `safety_checker` is given without a `feature_extractor`.
    """
    if hasattr(scheduler.config, "steps_offset") and scheduler.config.steps_offset != 1:
        deprecation_message = (
            f"The configuration file of this scheduler: {scheduler} is outdated. `steps_offset`"
            f" should be set to 1 instead of {scheduler.config.steps_offset}. Please make sure "
            "to update the config accordingly as leaving `steps_offset` might led to incorrect results"
            " in future versions. If you have downloaded this checkpoint from the Hugging Face Hub,"
            " it would be very nice if you could open a Pull request for the `scheduler/scheduler_config.json`"
            " file"
        )
        deprecate("steps_offset!=1", "1.0.0", deprecation_message, standard_warn=False)
        # Replace the stored config with a corrected copy.
        new_config = dict(scheduler.config)
        new_config["steps_offset"] = 1
        scheduler._internal_dict = FrozenDict(new_config)

    if hasattr(scheduler.config, "clip_sample") and scheduler.config.clip_sample is True:
        deprecation_message = (
            f"The configuration file of this scheduler: {scheduler} has not set the configuration `clip_sample`."
            " `clip_sample` should be set to False in the configuration file. Please make sure to update the"
            " config accordingly as not setting `clip_sample` in the config might lead to incorrect results in"
            " future versions. If you have downloaded this checkpoint from the Hugging Face Hub, it would be very"
            " nice if you could open a Pull request for the `scheduler/scheduler_config.json` file"
        )
        deprecate("clip_sample not set", "1.0.0", deprecation_message, standard_warn=False)
        new_config = dict(scheduler.config)
        new_config["clip_sample"] = False
        scheduler._internal_dict = FrozenDict(new_config)

    if safety_checker is None and requires_safety_checker:
        logger.warning(
            f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
            " that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
            " results in services or applications open to the public. PaddleNLP team, diffusers team and Hugging Face"
            " strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
            " it only for use-cases that involve analyzing network behavior or auditing its results. For more"
            " information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
        )
    if safety_checker is not None and feature_extractor is None:
        # The `f` prefix is required so the class name is actually interpolated.
        raise ValueError(
            f"Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
            " checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
        )

    is_unet_version_less_0_9_0 = hasattr(unet.config, "_ppdiffusers_version") and version.parse(
        version.parse(unet.config._ppdiffusers_version).base_version
    ) < version.parse("0.9.0.dev0")
    is_unet_sample_size_less_64 = hasattr(unet.config, "sample_size") and unet.config.sample_size < 64
    if is_unet_version_less_0_9_0 and is_unet_sample_size_less_64:
        deprecation_message = (
            "The configuration file of the unet has set the default `sample_size` to smaller than"
            " 64 which seems highly unlikely .If you're checkpoint is a fine-tuned version of any of the"
            " following: \n- CompVis/stable-diffusion-v1-4 \n- CompVis/stable-diffusion-v1-3 \n-"
            " CompVis/stable-diffusion-v1-2 \n- CompVis/stable-diffusion-v1-1 \n- runwayml/stable-diffusion-v1-5"
            " \n- runwayml/stable-diffusion-inpainting \n you should change 'sample_size' to 64 in the"
            " configuration file. Please make sure to update the config accordingly as leaving `sample_size=32`"
            " in the config might lead to incorrect results in future versions. If you have downloaded this"
            " checkpoint from the Hugging Face Hub, it would be very nice if you could open a Pull request for"
            " the `unet/config.json` file"
        )
        deprecate("sample_size<64", "1.0.0", deprecation_message, standard_warn=False)
        new_config = dict(unet.config)
        new_config["sample_size"] = 64
        unet._internal_dict = FrozenDict(new_config)

    self.register_modules(
        vae=vae,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        unet=unet,
        scheduler=scheduler,
        safety_checker=safety_checker,
        feature_extractor=feature_extractor,
    )
    self.register_to_config(requires_safety_checker=requires_safety_checker)
|
|
|
def create_scheduler(self, name="DPMSolver"):
    """Build a new scheduler of the requested kind from the current scheduler's config.

    Args:
        name: One of "DPMSolver", "EulerDiscrete", "EulerAncestralDiscrete",
            "PNDM", "DDIM", "LMSDiscrete", "HeunDiscrete",
            "KDPM2AncestralDiscrete" or "KDPM2Discrete".

    Returns:
        A scheduler created with `from_config(self.scheduler.config, ...)`.

    Raises:
        NotImplementedError: If `name` is not one of the supported names.
    """
    config = self.scheduler.config
    if name == "DPMSolver":
        # The only scheduler created with explicit overrides on top of the config.
        return DPMSolverMultistepScheduler.from_config(
            config,
            thresholding=False,
            algorithm_type="dpmsolver++",
            solver_type="midpoint",
            lower_order_final=True,
        )
    if name == "EulerDiscrete":
        return EulerDiscreteScheduler.from_config(config)
    if name == "EulerAncestralDiscrete":
        return EulerAncestralDiscreteScheduler.from_config(config)
    if name == "PNDM":
        return PNDMScheduler.from_config(config)
    if name == "DDIM":
        return DDIMScheduler.from_config(config)
    if name == "LMSDiscrete":
        return LMSDiscreteScheduler.from_config(config)
    if name == "HeunDiscrete":
        return HeunDiscreteScheduler.from_config(config)
    if name == "KDPM2AncestralDiscrete":
        return KDPM2AncestralDiscreteScheduler.from_config(config)
    if name == "KDPM2Discrete":
        return KDPM2DiscreteScheduler.from_config(config)

    supported = (
        "DPMSolver, EulerDiscrete, EulerAncestralDiscrete, PNDM, DDIM, LMSDiscrete,"
        " HeunDiscrete, KDPM2AncestralDiscrete, KDPM2Discrete"
    )
    raise NotImplementedError(f"Unsupported scheduler name {name!r}; expected one of: {supported}.")
|
|
|
def enable_attention_slicing(self, slice_size: Optional[Union[str, int]] = "auto"):
    r"""
    Turn on sliced attention by forwarding a slice size to `self.unet.set_attention_slice`.

    Args:
        slice_size (`str` or `int`, *optional*, defaults to `"auto"`):
            With `"auto"` the size is derived from `unet.config.attention_head_dim`: half of it
            when it is a single `int`, otherwise its smallest entry. Any other value is passed
            through unchanged.
    """
    if slice_size == "auto":
        head_dim = self.unet.config.attention_head_dim
        # A single int is halved; a collection contributes its minimum.
        slice_size = head_dim // 2 if isinstance(head_dim, int) else min(head_dim)
    self.unet.set_attention_slice(slice_size)
|
|
|
def disable_attention_slicing(self):
    r"""
    Turn sliced attention off again.

    Implemented by calling `enable_attention_slicing` with `None`, which is
    forwarded as the slice size.
    """
    no_slicing = None
    self.enable_attention_slicing(no_slicing)
|
|
|
def __call__(self, *args, **kwargs): |
|
return self.text2image(*args, **kwargs) |
|
|
|
def text2img(self, *args, **kwargs):
    """Alias of `text2image`; all arguments are forwarded unchanged."""
    generate = self.text2image
    return generate(*args, **kwargs)
|
|
|
def _encode_prompt(
    self,
    prompt,
    negative_prompt,
    max_embeddings_multiples,
    no_boseos_middle,
    skip_parsing,
    skip_weighting,
    do_classifier_free_guidance,
    num_images_per_prompt,
):
    """Encode the prompt (and, under classifier-free guidance, the negative prompt).

    Without classifier-free guidance the negative prompt is ignored, so only
    the prompt embeddings are returned. With guidance, a missing negative
    prompt defaults to the empty string and a single negative prompt is
    repeated for every prompt in the batch, so that
    `get_weighted_text_embeddings` returns equally many unconditional and
    conditional embeddings (unconditional first).

    The embeddings are then repeated `num_images_per_prompt` times per entry.
    """
    if not do_classifier_free_guidance:
        # The negative prompt only matters under guidance; encoding it here
        # would prepend unconditional embeddings nobody asked for.
        negative_prompt = None
    else:
        batch_size = 1 if isinstance(prompt, str) else len(prompt)
        if negative_prompt is None:
            negative_prompt = ""
        if isinstance(negative_prompt, str):
            negative_prompt = [negative_prompt] * batch_size
        elif len(negative_prompt) == 1 and batch_size > 1:
            # Share one negative prompt across the whole batch.
            negative_prompt = list(negative_prompt) * batch_size

    text_embeddings = get_weighted_text_embeddings(
        self, prompt, negative_prompt, max_embeddings_multiples, no_boseos_middle, skip_parsing, skip_weighting
    )

    # Repeat along axis 1, then fold the repeats into the batch axis so the
    # copies of one prompt stay adjacent.
    bs_embed, seq_len, _ = text_embeddings.shape
    text_embeddings = text_embeddings.tile([1, num_images_per_prompt, 1])
    text_embeddings = text_embeddings.reshape([bs_embed * num_images_per_prompt, seq_len, -1])
    return text_embeddings
|
|
|
def run_safety_checker(self, image, dtype):
    """Run the optional safety checker over `image`.

    Returns:
        `(image, has_nsfw_concept)`. When no safety checker is configured the
        image is returned untouched and `has_nsfw_concept` is `None`.
    """
    if self.safety_checker is None:
        return image, None

    # The feature extractor works on PIL images and returns Paddle tensors ("pd").
    checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pd")
    clip_input = checker_input.pixel_values.cast(dtype)
    image, has_nsfw_concept = self.safety_checker(images=image, clip_input=clip_input)
    return image, has_nsfw_concept
|
|
|
def decode_latents(self, latents):
    """Decode latents with the VAE into a float32 numpy image batch in [0, 1].

    The latents are multiplied by `1 / 0.18215` before decoding. The decoded
    sample is mapped with `x / 2 + 0.5`, clipped to [0, 1], reordered with
    `transpose([0, 2, 3, 1])`, cast to float32 and converted to numpy.
    """
    scaled = 1 / 0.18215 * latents
    decoded = self.vae.decode(scaled).sample
    normalized = (decoded / 2 + 0.5).clip(0, 1)
    # Move axis 1 to the end before handing the data over to numpy.
    channels_last = normalized.transpose([0, 2, 3, 1])
    return channels_last.cast("float32").numpy()
|
|
|
def prepare_extra_step_kwargs(self, eta, scheduler):
    """Collect the optional keyword arguments for `scheduler.step`.

    `eta` is only included when the signature of `scheduler.step` has a
    parameter of that name; otherwise an empty dict is returned.
    """
    step_parameters = inspect.signature(scheduler.step).parameters
    if "eta" in step_parameters:
        return {"eta": eta}
    return {}
|
|
|
def check_inputs_text2img(self, prompt, height, width, callback_steps):
    """Validate the text-to-image arguments.

    Raises:
        ValueError: If `prompt` is neither `str` nor `list`, if `height` or
            `width` is not divisible by 8, or if `callback_steps` is not a
            positive integer (`None` is rejected as well).
    """
    if not isinstance(prompt, (str, list)):
        raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

    if any(side % 8 != 0 for side in (height, width)):
        raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")

    # `None` is not an int, so the isinstance test rejects it too.
    callback_steps_ok = isinstance(callback_steps, int) and callback_steps > 0
    if not callback_steps_ok:
        raise ValueError(
            f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
            f" {type(callback_steps)}."
        )
|
|
|
def check_inputs_img2img_inpaint(self, prompt, strength, callback_steps):
    """Validate the image-to-image / inpainting arguments.

    Raises:
        ValueError: If `prompt` is neither `str` nor `list`, if `strength` is
            outside the closed interval [0, 1], or if `callback_steps` is not
            a positive integer (`None` is rejected as well).
    """
    if not isinstance(prompt, (str, list)):
        raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

    if strength < 0 or strength > 1:
        # The message must state the interval the condition above enforces.
        raise ValueError(f"The value of strength should be in [0.0, 1.0] but is {strength}")

    # `None` is not an int, so the isinstance test rejects it too.
    if not isinstance(callback_steps, int) or callback_steps <= 0:
        raise ValueError(
            f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
            f" {type(callback_steps)}."
        )
|
|
|
def prepare_latents_text2img(self, batch_size, num_channels_latents, height, width, dtype, latents=None, scheduler=None):
    """Create (or validate) the initial latents for text-to-image generation.

    Args:
        batch_size, num_channels_latents: First two dimensions of the latents.
        height, width: Image size; each is divided by 8 for the latent shape.
        dtype: Dtype used when fresh noise is sampled.
        latents: Optional pre-made latents; must have exactly the expected shape.
        scheduler: Scheduler providing `init_noise_sigma`; defaults to
            `self.scheduler` when omitted.

    Returns:
        The latents multiplied by the scheduler's `init_noise_sigma`.

    Raises:
        ValueError: If `latents` is given with an unexpected shape.
    """
    if scheduler is None:
        # The parameter defaults to None, so fall back to the pipeline's own scheduler.
        scheduler = self.scheduler

    shape = [batch_size, num_channels_latents, height // 8, width // 8]
    if latents is None:
        latents = paddle.randn(shape, dtype=dtype)
    elif list(latents.shape) != shape:
        raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")

    # Scale by the standard deviation the scheduler expects for the initial noise.
    latents = latents * scheduler.init_noise_sigma
    return latents
|
|
|
def prepare_latents_img2img(self, image, timestep, num_images_per_prompt, dtype, scheduler):
    """Encode `image` with the VAE and noise the result for image-to-image generation.

    The sampled VAE latents are scaled by 0.18215, repeated
    `num_images_per_prompt` times per input image, and then noised with
    `scheduler.add_noise` at `timestep` using fresh `paddle.randn` noise.

    Returns:
        The noised latents.
    """
    encoded = self.vae.encode(image.cast(dtype=dtype)).latent_dist
    clean_latents = 0.18215 * encoded.sample()

    # Repeat along axis 1, then fold the repeats into the batch axis.
    batch, channels, latent_h, latent_w = clean_latents.shape
    clean_latents = clean_latents.tile([1, num_images_per_prompt, 1, 1])
    clean_latents = clean_latents.reshape([batch * num_images_per_prompt, channels, latent_h, latent_w])

    noise = paddle.randn(clean_latents.shape, dtype=dtype)
    return scheduler.add_noise(clean_latents, noise, timestep)
|
|
|
def get_timesteps(self, num_inference_steps, strength, scheduler):
    """Select the tail of the scheduler's timesteps to run for a given `strength`.

    Returns:
        `(timesteps, steps)` where `timesteps` is `scheduler.timesteps` from
        the computed start index onward and `steps` is
        `num_inference_steps - start index`.
    """
    # `steps_offset` is read from the scheduler config, defaulting to 0.
    offset = scheduler.config.get("steps_offset", 0)
    # Number of steps to run, capped at the total number of steps.
    init_timestep = min(int(num_inference_steps * strength) + offset, num_inference_steps)
    # Index of the first timestep to use; never negative.
    t_start = max(num_inference_steps - init_timestep + offset, 0)
    return scheduler.timesteps[t_start:], num_inference_steps - t_start
|
|
|
def prepare_latents_inpaint(self, image, timestep, num_images_per_prompt, dtype, scheduler):
    """Encode `image` with the VAE and noise the result for inpainting.

    The sampled VAE latents are scaled by 0.18215 and repeated
    `num_images_per_prompt` times per input image. Fresh `paddle.randn` noise
    is then added with `scheduler.add_noise` at `timestep`.

    Returns:
        `(latents, init_latents_orig, noise)`: the noised latents, the latents
        before noise was added, and the noise itself.
    """
    encoded = self.vae.encode(image.cast(dtype)).latent_dist
    clean_latents = 0.18215 * encoded.sample()

    # Repeat along axis 1, then fold the repeats into the batch axis.
    batch, channels, latent_h, latent_w = clean_latents.shape
    clean_latents = clean_latents.tile([1, num_images_per_prompt, 1, 1])
    clean_latents = clean_latents.reshape([batch * num_images_per_prompt, channels, latent_h, latent_w])

    noise = paddle.randn(clean_latents.shape, dtype=dtype)
    noised_latents = scheduler.add_noise(clean_latents, noise, timestep)
    return noised_latents, clean_latents, noise
|
|
|
@paddle.no_grad()
def text2image(
    self,
    prompt: Union[str, List[str]],
    height: int = 512,
    width: int = 512,
    num_inference_steps: int = 50,
    guidance_scale: float = 7.5,
    negative_prompt: Optional[Union[str, List[str]]] = None,
    num_images_per_prompt: Optional[int] = 1,
    eta: float = 0.0,
    seed: Optional[int] = None,
    latents: Optional[paddle.Tensor] = None,
    output_type: Optional[str] = "pil",
    return_dict: bool = True,
    callback: Optional[Callable[[int, int, paddle.Tensor], None]] = None,
    callback_steps: Optional[int] = 1,
    max_embeddings_multiples: Optional[int] = 1,
    no_boseos_middle: Optional[bool] = False,
    skip_parsing: Optional[bool] = False,
    skip_weighting: Optional[bool] = False,
    scheduler=None,
    **kwargs,
):
    r"""
    Generate images from a text prompt (text-to-image).

    Args:
        prompt (`str` or `List[str]`):
            The prompt or prompts to guide the image generation.
        height (`int`, *optional*, defaults to 512):
            The height in pixels of the generated image.
        width (`int`, *optional*, defaults to 512):
            The width in pixels of the generated image.
        num_inference_steps (`int`, *optional*, defaults to 50):
            The number of denoising steps. More denoising steps usually lead to a higher quality image at the
            expense of slower inference.
        guidance_scale (`float`, *optional*, defaults to 7.5):
            Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
            `guidance_scale` is defined as `w` of equation 2. of [Imagen
            Paper](https://arxiv.org/pdf/2205.11487.pdf). Classifier-free guidance is applied when
            `guidance_scale > 1`. Higher guidance scale encourages to generate images that are closely linked to
            the text `prompt`, usually at the expense of lower image quality.
        negative_prompt (`str` or `List[str]`, *optional*):
            The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e., ignored
            if `guidance_scale` is less than `1`).
        num_images_per_prompt (`int`, *optional*, defaults to 1):
            The number of images to generate per prompt.
        eta (`float`, *optional*, defaults to 0.0):
            Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
            [`schedulers.DDIMScheduler`], will be ignored for others.
        seed (`int`, *optional*):
            Random number seed passed to `paddle.seed`. When `None`, a value is drawn with
            `random.randint(0, 2**32)`; the seed actually used is stored in the generation metadata.
        latents (`paddle.Tensor`, *optional*):
            Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image
            generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
            tensor will be generated by sampling using the supplied random `seed`.
        output_type (`str`, *optional*, defaults to `"pil"`):
            The output format of the generated image. Choose between
            [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
        return_dict (`bool`, *optional*, defaults to `True`):
            Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
            plain tuple.
        callback (`Callable`, *optional*):
            Progress hook. It is invoked as `callback(progress_bar.n, progress_bar.total, progress_bar)`, i.e.
            with the progress bar's counters and the progress bar object itself (not with the timestep or the
            latents). It only fires on iterations where the progress bar advances and
            `i % callback_steps == 0`.
        callback_steps (`int`, *optional*, defaults to 1):
            The frequency at which the `callback` function will be called.
        max_embeddings_multiples, no_boseos_middle, skip_parsing, skip_weighting:
            Forwarded unchanged to `self._encode_prompt`.
        scheduler (*optional*):
            Scheduler used for this call. Falls back to `self.scheduler` when `None`.
        **kwargs:
            Accepted but not read by this method.

    Returns:
        [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:
        [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple`.
        When returning a tuple, the first element is a list with the generated images, and the second element is a
        list of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work"
        (nsfw) content, according to the `safety_checker`.
    """
    if scheduler is None:
        scheduler = self.scheduler
    seed = random.randint(0, 2**32) if seed is None else seed
    # Record of this call's settings; handed to `numpy_to_pil` below when PIL output is requested.
    argument = dict(
        prompt=prompt,
        negative_prompt=negative_prompt,
        height=height,
        width=width,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        num_images_per_prompt=num_images_per_prompt,
        eta=eta,
        seed=seed,
        latents=latents,
        max_embeddings_multiples=max_embeddings_multiples,
        no_boseos_middle=no_boseos_middle,
        skip_parsing=skip_parsing,
        skip_weighting=skip_weighting,
        epoch_time=time.time(),
    )
    # Seed before anything random is sampled so the run is reproducible from `seed`.
    paddle.seed(seed)

    # Validate the inputs.
    self.check_inputs_text2img(prompt, height, width, callback_steps)

    # One prompt string means a batch of one; otherwise one entry per prompt.
    batch_size = 1 if isinstance(prompt, str) else len(prompt)

    # Guidance is only applied for scales strictly above 1.
    do_classifier_free_guidance = guidance_scale > 1.0

    # Encode the prompt(s).
    text_embeddings = self._encode_prompt(
        prompt,
        negative_prompt,
        max_embeddings_multiples,
        no_boseos_middle,
        skip_parsing,
        skip_weighting,
        do_classifier_free_guidance,
        num_images_per_prompt,
    )

    # Configure the scheduler and take its full timestep sequence.
    scheduler.set_timesteps(num_inference_steps)
    timesteps = scheduler.timesteps

    # Prepare the starting latents (uses the caller's `latents` when given).
    num_channels_latents = self.unet.in_channels
    latents = self.prepare_latents_text2img(
        batch_size * num_images_per_prompt,
        num_channels_latents,
        height,
        width,
        text_embeddings.dtype,
        latents,
        scheduler=scheduler,
    )

    # Extra keyword arguments for `scheduler.step`, derived from `eta`.
    extra_step_kwargs = self.prepare_extra_step_kwargs(eta, scheduler)

    # Denoising loop.
    num_warmup_steps = len(timesteps) - num_inference_steps * scheduler.order
    with self.progress_bar(total=num_inference_steps) as progress_bar:
        for i, t in enumerate(timesteps):
            # With guidance the latents are duplicated so one UNet call yields both predictions.
            latent_model_input = paddle.concat([latents] * 2) if do_classifier_free_guidance else latents
            latent_model_input = scheduler.scale_model_input(latent_model_input, t)

            # Predict the noise residual.
            noise_pred = self.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

            # Combine the two halves of the prediction according to the guidance scale.
            if do_classifier_free_guidance:
                noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            # Step the scheduler to obtain the latents for the next iteration.
            latents = scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample

            # Advance the progress bar and, on matching iterations, notify the callback.
            if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % scheduler.order == 0):
                progress_bar.update()
                if callback is not None and i % callback_steps == 0:
                    callback(progress_bar.n, progress_bar.total, progress_bar)

    # Decode the final latents into images.
    image = self.decode_latents(latents)

    # Run the safety checker.
    image, has_nsfw_concept = self.run_safety_checker(image, text_embeddings.dtype)

    # Convert to PIL, passing along the settings record.
    if output_type == "pil":
        image = self.numpy_to_pil(image, argument=argument)

    if not return_dict:
        return (image, has_nsfw_concept)

    return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
|
|
|
@paddle.no_grad()
def img2img(
    self,
    prompt: Union[str, List[str]],
    image: Union[paddle.Tensor, PIL.Image.Image, str],
    strength: float = 0.8,
    height=None,
    width=None,
    num_inference_steps: Optional[int] = 50,
    guidance_scale: Optional[float] = 7.5,
    negative_prompt: Optional[Union[str, List[str]]] = None,
    num_images_per_prompt: Optional[int] = 1,
    eta: Optional[float] = 0.0,
    seed: Optional[int] = None,
    output_type: Optional[str] = "pil",
    return_dict: bool = True,
    callback: Optional[Callable[[int, int, paddle.Tensor], None]] = None,
    callback_steps: Optional[int] = 1,
    max_embeddings_multiples: Optional[int] = 1,
    no_boseos_middle: Optional[bool] = False,
    skip_parsing: Optional[bool] = False,
    skip_weighting: Optional[bool] = False,
    scheduler=None,
    **kwargs,
):
    r"""
    Generate images from a text prompt, starting from a reference image (image-to-image).

    Args:
        prompt (`str` or `List[str]`):
            The prompt or prompts to guide the image generation.
        image (`paddle.Tensor`, `PIL.Image.Image` or `str`):
            `Image`, or tensor representing an image batch, that will be used as the starting point for the
            process. A `str` is loaded with `load_image`. PIL images are resized to `(width, height)` and
            preprocessed; tensors are used as given.
        strength (`float`, *optional*, defaults to 0.8):
            Conceptually, indicates how much to transform the reference `image`. Must be between 0 and 1.
            `image` will be used as a starting point, adding more noise to it the larger the `strength`. The
            number of denoising steps depends on the amount of noise initially added. When `strength` is 1, added
            noise will be maximum and the denoising process will run for the full number of iterations specified in
            `num_inference_steps`. A value of 1, therefore, essentially ignores `image`.
        height (`int`, *optional*):
            Target height in pixels. When `None`, it is derived from the input image and rounded down to a
            multiple of 8.
        width (`int`, *optional*):
            Target width in pixels. When `None`, it is derived from the input image and rounded down to a
            multiple of 8.
        num_inference_steps (`int`, *optional*, defaults to 50):
            The number of denoising steps. More denoising steps usually lead to a higher quality image at the
            expense of slower inference. This parameter will be modulated by `strength`.
        guidance_scale (`float`, *optional*, defaults to 7.5):
            Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
            `guidance_scale` is defined as `w` of equation 2. of [Imagen
            Paper](https://arxiv.org/pdf/2205.11487.pdf). Classifier-free guidance is applied when
            `guidance_scale > 1`. Higher guidance scale encourages to generate images that are closely linked to
            the text `prompt`, usually at the expense of lower image quality.
        negative_prompt (`str` or `List[str]`, *optional*):
            The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e., ignored
            if `guidance_scale` is less than `1`).
        num_images_per_prompt (`int`, *optional*, defaults to 1):
            The number of images to generate per prompt.
        eta (`float`, *optional*, defaults to 0.0):
            Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
            [`schedulers.DDIMScheduler`], will be ignored for others.
        seed (`int`, *optional*):
            Random seed passed to `paddle.seed`. When `None`, a value is drawn with `random.randint(0, 2**32)`;
            the seed actually used is stored in the generation metadata.
        output_type (`str`, *optional*, defaults to `"pil"`):
            The output format of the generated image. Choose between
            [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
        return_dict (`bool`, *optional*, defaults to `True`):
            Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
            plain tuple.
        callback (`Callable`, *optional*):
            Progress hook. It is invoked as `callback(progress_bar.n, progress_bar.total, progress_bar)`, i.e.
            with the progress bar's counters and the progress bar object itself (not with the timestep or the
            latents). It only fires on iterations where the progress bar advances and
            `i % callback_steps == 0`.
        callback_steps (`int`, *optional*, defaults to 1):
            The frequency at which the `callback` function will be called.
        max_embeddings_multiples, no_boseos_middle, skip_parsing, skip_weighting:
            Forwarded unchanged to `self._encode_prompt`.
        scheduler (*optional*):
            Scheduler used for this call. Falls back to `self.scheduler` when `None`.
        **kwargs:
            Accepted but not read by this method.

    Returns:
        [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:
        [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple`.
        When returning a tuple, the first element is a list with the generated images, and the second element is a
        list of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work"
        (nsfw) content, according to the `safety_checker`.
    """
    if scheduler is None:
        scheduler = self.scheduler
    seed = random.randint(0, 2**32) if seed is None else seed

    # Keep the caller's original value (possibly a path string) for the settings record.
    image_str = image
    if isinstance(image_str, str):
        image = load_image(image_str)

    # Fill in whichever of height/width was omitted from the input image, rounded down to a multiple of 8.
    if height is None or width is None:
        if isinstance(image, paddle.Tensor):
            # A Paddle tensor's `size` is not a (width, height) pair, so read the trailing
            # shape instead. Assumes the tensor is laid out (..., H, W), as it is handed
            # to `self.vae.encode` unchanged - TODO confirm.
            source_height, source_width = image.shape[-2], image.shape[-1]
        else:
            source_width, source_height = image.size[0], image.size[1]
        if width is None:
            width = (source_width // 8) * 8
        if height is None:
            height = (source_height // 8) * 8

    # Record of this call's settings; handed to `numpy_to_pil` below when PIL output is requested.
    argument = dict(
        prompt=prompt,
        image=image_str,
        negative_prompt=negative_prompt,
        height=height,
        width=width,
        strength=strength,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        num_images_per_prompt=num_images_per_prompt,
        eta=eta,
        seed=seed,
        max_embeddings_multiples=max_embeddings_multiples,
        no_boseos_middle=no_boseos_middle,
        skip_parsing=skip_parsing,
        skip_weighting=skip_weighting,
        epoch_time=time.time(),
    )
    # Seed before anything random is sampled so the run is reproducible from `seed`.
    paddle.seed(seed)

    # Validate the inputs.
    self.check_inputs_img2img_inpaint(prompt, strength, callback_steps)

    # One prompt string means a batch of one; otherwise one entry per prompt.
    batch_size = 1 if isinstance(prompt, str) else len(prompt)

    # Guidance is only applied for scales strictly above 1.
    do_classifier_free_guidance = guidance_scale > 1.0

    # Encode the prompt(s).
    text_embeddings = self._encode_prompt(
        prompt,
        negative_prompt,
        max_embeddings_multiples,
        no_boseos_middle,
        skip_parsing,
        skip_weighting,
        do_classifier_free_guidance,
        num_images_per_prompt,
    )

    # Only PIL inputs are resized and preprocessed; tensors pass through untouched.
    if isinstance(image, PIL.Image.Image):
        image = image.resize((width, height))
        image = preprocess_image(image)

    # Configure the scheduler, then keep only the timesteps that `strength` selects.
    scheduler.set_timesteps(num_inference_steps)
    timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, scheduler)
    latent_timestep = timesteps[:1].tile([batch_size * num_images_per_prompt])

    # Encode the image and noise it at the first selected timestep.
    latents = self.prepare_latents_img2img(image, latent_timestep, num_images_per_prompt, text_embeddings.dtype, scheduler)

    # Extra keyword arguments for `scheduler.step`, derived from `eta`.
    extra_step_kwargs = self.prepare_extra_step_kwargs(eta, scheduler)

    # Denoising loop.
    num_warmup_steps = len(timesteps) - num_inference_steps * scheduler.order
    with self.progress_bar(total=num_inference_steps) as progress_bar:
        for i, t in enumerate(timesteps):
            # With guidance the latents are duplicated so one UNet call yields both predictions.
            latent_model_input = paddle.concat([latents] * 2) if do_classifier_free_guidance else latents
            latent_model_input = scheduler.scale_model_input(latent_model_input, t)

            # Predict the noise residual.
            noise_pred = self.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

            # Combine the two halves of the prediction according to the guidance scale.
            if do_classifier_free_guidance:
                noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            # Step the scheduler to obtain the latents for the next iteration.
            latents = scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample

            # Advance the progress bar and, on matching iterations, notify the callback.
            if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % scheduler.order == 0):
                progress_bar.update()
                if callback is not None and i % callback_steps == 0:
                    callback(progress_bar.n, progress_bar.total, progress_bar)

    # Decode the final latents into images.
    image = self.decode_latents(latents)

    # Run the safety checker.
    image, has_nsfw_concept = self.run_safety_checker(image, text_embeddings.dtype)

    # Convert to PIL, passing along the settings record.
    if output_type == "pil":
        image = self.numpy_to_pil(image, argument=argument)

    if not return_dict:
        return (image, has_nsfw_concept)

    return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
|
|
|
@paddle.no_grad()
def inpaint(
    self,
    prompt: Union[str, List[str]],
    image: Union[paddle.Tensor, PIL.Image.Image, str],
    mask_image: Union[paddle.Tensor, PIL.Image.Image, str],
    height=None,
    width=None,
    strength: float = 0.8,
    num_inference_steps: Optional[int] = 50,
    guidance_scale: Optional[float] = 7.5,
    negative_prompt: Optional[Union[str, List[str]]] = None,
    num_images_per_prompt: Optional[int] = 1,
    eta: Optional[float] = 0.0,
    seed: Optional[int] = None,
    output_type: Optional[str] = "pil",
    return_dict: bool = True,
    callback: Optional[Callable[[int, int, paddle.Tensor], None]] = None,
    callback_steps: Optional[int] = 1,
    max_embeddings_multiples: Optional[int] = 1,
    no_boseos_middle: Optional[bool] = False,
    skip_parsing: Optional[bool] = False,
    skip_weighting: Optional[bool] = False,
    scheduler=None,
    **kwargs,
):
    r"""
    Repaint the masked region of an image under the guidance of a text prompt (inpainting).

    Args:
        prompt (`str` or `List[str]`):
            The prompt or prompts to guide the image generation.
        image (`paddle.Tensor`, `PIL.Image.Image` or `str`):
            `Image`, or tensor representing an image batch, that will be used as the starting point for the
            process. This is the image whose masked region will be inpainted. A `str` is loaded with
            `load_image`. Non-tensor inputs are resized to `(width, height)` and preprocessed; tensors are used
            as given.
        mask_image (`paddle.Tensor`, `PIL.Image.Image` or `str`):
            `Image`, or tensor representing an image batch, to mask `image`. White pixels in the mask will be
            replaced by noise and therefore repainted, while black pixels will be preserved. If `mask_image` is a
            PIL image, it will be converted to a single channel (luminance) before use. If it's a tensor, it should
            contain one color channel (L) instead of 3, so the expected shape would be `(B, H, W, 1)`. A `str` is
            loaded with `load_image`. Non-tensor masks are resized and passed through `preprocess_mask`; tensor
            masks are used as given.
        height (`int`, *optional*):
            Target height in pixels. When `None`, it is derived from the input image and rounded down to a
            multiple of 8.
        width (`int`, *optional*):
            Target width in pixels. When `None`, it is derived from the input image and rounded down to a
            multiple of 8.
        strength (`float`, *optional*, defaults to 0.8):
            Conceptually, indicates how much to inpaint the masked area. Must be between 0 and 1. When `strength`
            is 1, the denoising process will be run on the masked area for the full number of iterations specified
            in `num_inference_steps`. `image` will be used as a reference for the masked area, adding more
            noise to that region the larger the `strength`. If `strength` is 0, no inpainting will occur.
        num_inference_steps (`int`, *optional*, defaults to 50):
            The reference number of denoising steps. More denoising steps usually lead to a higher quality image at
            the expense of slower inference. This parameter will be modulated by `strength`, as explained above.
        guidance_scale (`float`, *optional*, defaults to 7.5):
            Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
            `guidance_scale` is defined as `w` of equation 2. of [Imagen
            Paper](https://arxiv.org/pdf/2205.11487.pdf). Classifier-free guidance is applied when
            `guidance_scale > 1`. Higher guidance scale encourages to generate images that are closely linked to
            the text `prompt`, usually at the expense of lower image quality.
        negative_prompt (`str` or `List[str]`, *optional*):
            The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e., ignored
            if `guidance_scale` is less than `1`).
        num_images_per_prompt (`int`, *optional*, defaults to 1):
            The number of images to generate per prompt.
        eta (`float`, *optional*, defaults to 0.0):
            Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
            [`schedulers.DDIMScheduler`], will be ignored for others.
        seed (`int`, *optional*):
            Random seed passed to `paddle.seed`. When `None`, a value is drawn with `random.randint(0, 2**32)`;
            the seed actually used is stored in the generation metadata.
        output_type (`str`, *optional*, defaults to `"pil"`):
            The output format of the generated image. Choose between
            [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
        return_dict (`bool`, *optional*, defaults to `True`):
            Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
            plain tuple.
        callback (`Callable`, *optional*):
            Progress hook. It is invoked as `callback(progress_bar.n, progress_bar.total, progress_bar)`, i.e.
            with the progress bar's counters and the progress bar object itself (not with the timestep or the
            latents). It only fires on iterations where the progress bar advances and
            `i % callback_steps == 0`.
        callback_steps (`int`, *optional*, defaults to 1):
            The frequency at which the `callback` function will be called.
        max_embeddings_multiples, no_boseos_middle, skip_parsing, skip_weighting:
            Forwarded unchanged to `self._encode_prompt`.
        scheduler (*optional*):
            Scheduler used for this call. Falls back to `self.scheduler` when `None`.
        **kwargs:
            Accepted but not read by this method.

    Returns:
        [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:
        [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple`.
        When returning a tuple, the first element is a list with the generated images, and the second element is a
        list of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work"
        (nsfw) content, according to the `safety_checker`.
    """
    if scheduler is None:
        scheduler = self.scheduler
    seed = random.randint(0, 2**32) if seed is None else seed

    # Keep the caller's original values (possibly path strings) for the settings record.
    image_str = image
    mask_image_str = mask_image

    if isinstance(image_str, str):
        image = load_image(image_str)
    if isinstance(mask_image_str, str):
        mask_image = load_image(mask_image_str)

    # Fill in whichever of height/width was omitted from the input image, rounded down to a multiple of 8.
    if height is None or width is None:
        if isinstance(image, paddle.Tensor):
            # A Paddle tensor's `size` is not a (width, height) pair, so read the trailing
            # shape instead. Assumes the tensor is laid out (..., H, W), as it is handed
            # to `self.vae.encode` unchanged - TODO confirm.
            source_height, source_width = image.shape[-2], image.shape[-1]
        else:
            source_width, source_height = image.size[0], image.size[1]
        if width is None:
            width = (source_width // 8) * 8
        if height is None:
            height = (source_height // 8) * 8

    # Record of this call's settings; handed to `numpy_to_pil` below when PIL output is requested.
    argument = dict(
        prompt=prompt,
        image=image_str,
        mask_image=mask_image_str,
        negative_prompt=negative_prompt,
        height=height,
        width=width,
        strength=strength,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        num_images_per_prompt=num_images_per_prompt,
        eta=eta,
        seed=seed,
        max_embeddings_multiples=max_embeddings_multiples,
        no_boseos_middle=no_boseos_middle,
        skip_parsing=skip_parsing,
        skip_weighting=skip_weighting,
        epoch_time=time.time(),
    )
    # Seed before anything random is sampled so the run is reproducible from `seed`.
    paddle.seed(seed)

    # Validate the inputs.
    self.check_inputs_img2img_inpaint(prompt, strength, callback_steps)

    # One prompt string means a batch of one; otherwise one entry per prompt.
    batch_size = 1 if isinstance(prompt, str) else len(prompt)

    # Guidance is only applied for scales strictly above 1.
    do_classifier_free_guidance = guidance_scale > 1.0

    # Encode the prompt(s).
    text_embeddings = self._encode_prompt(
        prompt,
        negative_prompt,
        max_embeddings_multiples,
        no_boseos_middle,
        skip_parsing,
        skip_weighting,
        do_classifier_free_guidance,
        num_images_per_prompt,
    )

    # Only non-tensor inputs are resized and preprocessed; tensors pass through untouched.
    if not isinstance(image, paddle.Tensor):
        image = image.resize((width, height))
        image = preprocess_image(image)

    if not isinstance(mask_image, paddle.Tensor):
        mask_image = mask_image.resize((width, height))
        mask_image = preprocess_mask(mask_image)

    # Configure the scheduler, then keep only the timesteps that `strength` selects.
    scheduler.set_timesteps(num_inference_steps)
    timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, scheduler)
    latent_timestep = timesteps[:1].tile([batch_size * num_images_per_prompt])

    # Encode the image; keep the clean latents and the noise so the kept region can be re-noised per step.
    latents, init_latents_orig, noise = self.prepare_latents_inpaint(
        image, latent_timestep, num_images_per_prompt, text_embeddings.dtype, scheduler
    )

    # Repeat the mask once per generated sample.
    mask = mask_image.cast(latents.dtype)
    mask = paddle.concat([mask] * batch_size * num_images_per_prompt)

    # Extra keyword arguments for `scheduler.step`, derived from `eta`.
    extra_step_kwargs = self.prepare_extra_step_kwargs(eta, scheduler)

    # Denoising loop.
    num_warmup_steps = len(timesteps) - num_inference_steps * scheduler.order
    with self.progress_bar(total=num_inference_steps) as progress_bar:
        for i, t in enumerate(timesteps):
            # With guidance the latents are duplicated so one UNet call yields both predictions.
            latent_model_input = paddle.concat([latents] * 2) if do_classifier_free_guidance else latents
            latent_model_input = scheduler.scale_model_input(latent_model_input, t)

            # Predict the noise residual.
            noise_pred = self.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

            # Combine the two halves of the prediction according to the guidance scale.
            if do_classifier_free_guidance:
                noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

            # Step the scheduler to obtain the latents for the next iteration.
            latents = scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample

            # Re-noise the clean source latents to the current timestep and blend: where the
            # mask value is 1 the source latents are kept, where it is 0 the denoised latents are used.
            # NOTE(review): the "white is repainted" contract therefore relies on
            # `preprocess_mask` inverting the mask - confirm against that helper.
            init_latents_proper = scheduler.add_noise(init_latents_orig, noise, t)
            latents = (init_latents_proper * mask) + (latents * (1 - mask))

            # Advance the progress bar and, on matching iterations, notify the callback.
            if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % scheduler.order == 0):
                progress_bar.update()
                if callback is not None and i % callback_steps == 0:
                    callback(progress_bar.n, progress_bar.total, progress_bar)

    # Decode the final latents into images.
    image = self.decode_latents(latents)

    # Run the safety checker.
    image, has_nsfw_concept = self.run_safety_checker(image, text_embeddings.dtype)

    # Convert to PIL, passing along the settings record.
    if output_type == "pil":
        image = self.numpy_to_pil(image, argument=argument)

    if not return_dict:
        return (image, has_nsfw_concept)

    return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
|
|
|
@staticmethod
def numpy_to_pil(images, **kwargs):
    """
    Convert a numpy image or a batch of images to a list of PIL images.

    A 3-dimensional input is treated as a single image and given a leading
    batch axis. Values are multiplied by 255, rounded and cast to `uint8`
    before conversion. If an `argument` keyword is supplied and is not
    `None`, it is attached to every returned image as `image.argument`.
    """
    batch = images[None, ...] if images.ndim == 3 else images
    batch = (batch * 255).round().astype("uint8")
    argument = kwargs.pop("argument", None)

    def _to_pil(frame):
        # Build one PIL image and tag it with the settings record when there is one.
        pil_image = PIL.Image.fromarray(frame)
        if argument is not None:
            pil_image.argument = argument
        return pil_image

    return [_to_pil(frame) for frame in batch]
|
# Build the shared pipeline once at import time from the configured base model.
# `safety_checker=None` is passed to `from_pretrained`.
pipeline = StableDiffusionPipelineAllinOne.from_pretrained(BASE_MODEL_NAME, safety_checker=None)

# Optionally load LoRA attention weights into the UNet when a path is configured in `env`.
if LORA_WEIGHTS_PATH is not None:
    pipeline.unet.load_attn_procs(LORA_WEIGHTS_PATH, from_hf_hub=True)

# Scheduler names known to this app; presumably the choices offered in the UI and the
# values accepted by `pipeline.create_scheduler` - confirm against both.
support_scheduler = [
    "DPMSolver",
    "EulerDiscrete",
    "EulerAncestralDiscrete",
    "PNDM",
    "DDIM",
    "LMSDiscrete",
    "HeunDiscrete",
    "KDPM2AncestralDiscrete",
    "KDPM2Discrete"
]
|
|
|
|
|
def infer(prompt, negative, scale, height, width, num_inference_steps, scheduler_name):
    """Run the module-level `pipeline` once and return its generated images.

    A scheduler is created from `scheduler_name` via
    `pipeline.create_scheduler` and passed to the pipeline call together with
    the prompt, negative prompt, guidance scale, output size and step count.

    Returns:
        The `.images` attribute of the pipeline's result.
    """
    generation_kwargs = {
        "prompt": prompt,
        "negative_prompt": negative,
        "guidance_scale": scale,
        "height": height,
        "width": width,
        "num_inference_steps": num_inference_steps,
        "scheduler": pipeline.create_scheduler(scheduler_name),
    }
    return pipeline(**generation_kwargs).images
|
|
|
|
|
# Stylesheet handed to `gr.Blocks(css=...)`.
# The `--tw-ring-shadow` value needs an explicit `+` inside `calc()`: without the
# operator the expression is invalid, which in turn invalidates the `box-shadow`
# that consumes it on `.gr-button:focus`.
css = """
.gradio-container {
font-family: 'IBM Plex Sans', sans-serif;
}
.gr-button {
color: white;
border-color: black;
background: black;
}
input[type='range'] {
accent-color: black;
}
.dark input[type='range'] {
accent-color: #dfdfdf;
}
.container {
max-width: 730px;
margin: auto;
padding-top: 1.5rem;
}
#gallery {
min-height: 22rem;
margin-bottom: 15px;
margin-left: auto;
margin-right: auto;
border-bottom-right-radius: .5rem !important;
border-bottom-left-radius: .5rem !important;
}
#gallery>div>.h-full {
min-height: 20rem;
}
.details:hover {
text-decoration: underline;
}
.gr-button {
white-space: nowrap;
}
.gr-button:focus {
border-color: rgb(147 197 253 / var(--tw-border-opacity));
outline: none;
box-shadow: var(--tw-ring-offset-shadow), var(--tw-ring-shadow), var(--tw-shadow, 0 0 #0000);
--tw-border-opacity: 1;
--tw-ring-offset-shadow: var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);
--tw-ring-shadow: var(--tw-ring-inset) 0 0 0 calc(3px + var(--tw-ring-offset-width)) var(--tw-ring-color);
--tw-ring-color: rgb(191 219 254 / var(--tw-ring-opacity));
--tw-ring-opacity: .5;
}
#advanced-btn {
font-size: .7rem !important;
line-height: 19px;
margin-top: 12px;
margin-bottom: 12px;
padding: 2px 8px;
border-radius: 14px !important;
}
#advanced-options {
display: none;
margin-bottom: 20px;
}
.footer {
margin-bottom: 45px;
margin-top: 35px;
text-align: center;
border-bottom: 1px solid #e5e5e5;
}
.footer>p {
font-size: .8rem;
display: inline-block;
padding: 0 10px;
transform: translateY(10px);
background: white;
}
.dark .footer {
border-color: #303030;
}
.dark .footer>p {
background: #0b0f19;
}
.acknowledgments h4{
margin: 1.25em 0 .25em 0;
font-weight: bold;
font-size: 115%;
}
.animate-spin {
animation: spin 1s linear infinite;
}
@keyframes spin {
from {
transform: rotate(0deg);
}
to {
transform: rotate(360deg);
}
}
#share-btn-container {
display: flex; padding-left: 0.5rem !important; padding-right: 0.5rem !important; background-color: #000000; justify-content: center; align-items: center; border-radius: 9999px !important; width: 13rem;
margin-top: 10px;
margin-left: auto;
}
#share-btn {
all: initial; color: #ffffff;font-weight: 600; cursor:pointer; font-family: 'IBM Plex Sans', sans-serif; margin-left: 0.5rem !important; padding-top: 0.25rem !important; padding-bottom: 0.25rem !important;right:0;
}
#share-btn * {
all: unset;
}
#share-btn-container div:nth-child(-n+2){
width: auto !important;
min-height: 0px !important;
}
#share-btn-container .wrap {
display: none !important;
}

.gr-form{
flex: 1 1 50%; border-top-right-radius: 0; border-bottom-right-radius: 0;
}
#prompt-container{
gap: 0;
}
#prompt-text-input, #negative-prompt-text-input{padding: .45rem 0.625rem}
#component-16{border-top-width: 1px!important;margin-top: 1em}
.image_duplication{position: absolute; width: 100px; left: 50px}
"""
|
|
|
# Root container of the demo page, styled with the `css` string defined above.
block = gr.Blocks(css=css)

# NOTE(review): nesting below was reconstructed from the `with` statements
# (the extracted source lost its indentation) -- confirm against the original layout.
with block:
    # Page header: logo SVG followed by the demo title.
    gr.HTML(
        """
        <div style="text-align: center; margin: 0 auto;">
          <div
            style="
              display: inline-flex;
              align-items: center;
              gap: 0.8rem;
              font-size: 1.75rem;
            "
          >
            <svg
              width="0.65em"
              height="0.65em"
              viewBox="0 0 115 115"
              fill="none"
              xmlns="http://www.w3.org/2000/svg"
            >
              <rect width="23" height="23" fill="white"></rect>
              <rect y="69" width="23" height="23" fill="white"></rect>
              <rect x="23" width="23" height="23" fill="#AEAEAE"></rect>
              <rect x="23" y="69" width="23" height="23" fill="#AEAEAE"></rect>
              <rect x="46" width="23" height="23" fill="white"></rect>
              <rect x="46" y="69" width="23" height="23" fill="white"></rect>
              <rect x="69" width="23" height="23" fill="black"></rect>
              <rect x="69" y="69" width="23" height="23" fill="black"></rect>
              <rect x="92" width="23" height="23" fill="#D9D9D9"></rect>
              <rect x="92" y="69" width="23" height="23" fill="#AEAEAE"></rect>
              <rect x="115" y="46" width="23" height="23" fill="white"></rect>
              <rect x="115" y="115" width="23" height="23" fill="white"></rect>
              <rect x="115" y="69" width="23" height="23" fill="#D9D9D9"></rect>
              <rect x="92" y="46" width="23" height="23" fill="#AEAEAE"></rect>
              <rect x="92" y="115" width="23" height="23" fill="#AEAEAE"></rect>
              <rect x="92" y="69" width="23" height="23" fill="white"></rect>
              <rect x="69" y="46" width="23" height="23" fill="white"></rect>
              <rect x="69" y="115" width="23" height="23" fill="white"></rect>
              <rect x="69" y="69" width="23" height="23" fill="#D9D9D9"></rect>
              <rect x="46" y="46" width="23" height="23" fill="black"></rect>
              <rect x="46" y="115" width="23" height="23" fill="black"></rect>
              <rect x="46" y="69" width="23" height="23" fill="black"></rect>
              <rect x="23" y="46" width="23" height="23" fill="#D9D9D9"></rect>
              <rect x="23" y="115" width="23" height="23" fill="#AEAEAE"></rect>
              <rect x="23" y="69" width="23" height="23" fill="black"></rect>
            </svg>
            <h1 style="font-weight: 900; margin-bottom: 7px;margin-top:5px">
              Dreambooth LoRa Demo
            </h1>
          </div>
        </div>
        """
    )

    with gr.Group():
        # Prompt row: prompt / negative-prompt text boxes next to the generate button.
        with gr.Box():
            with gr.Row(elem_id="prompt-container").style(mobile_collapse=False, equal_height=True):
                with gr.Column():
                    text = gr.Textbox(
                        label="Enter your prompt",
                        value=PROMPTS,
                        show_label=False,
                        max_lines=1,
                        placeholder="Enter your prompt",
                        elem_id="prompt-text-input",
                    ).style(
                        border=(True, False, True, True),
                        rounded=(True, False, False, True),
                        container=False,
                    )
                    negative = gr.Textbox(
                        label="Enter your negative prompt",
                        show_label=False,
                        max_lines=1,
                        placeholder="Enter a negative prompt",
                        elem_id="negative-prompt-text-input",
                    ).style(
                        border=(True, False, True, True),
                        rounded=(True, False, False, True),
                        container=False,
                    )
                btn = gr.Button("Generate image").style(
                    margin=False,
                    rounded=(False, True, True, False),
                    full_width=False,
                )

        # Output area: single-column gallery that receives the result of `infer`.
        gallery = gr.Gallery(
            label="Generated images", show_label=False, elem_id="gallery"
        ).style(grid=[1], height="auto")

        # Sampling controls, collapsed by default.
        with gr.Accordion("Advanced settings", open=False):
            scheduler_name = gr.Dropdown(
                label="scheduler_name", choices=support_scheduler, value="DPMSolver"
            )
            guidance_scale = gr.Slider(
                label="Guidance Scale", minimum=1, maximum=30, value=7.5, step=0.1
            )
            height = gr.Slider(
                label="Height", minimum=256, maximum=1024, value=512, step=8
            )
            # Fix: step was 0.1, which let users pick fractional pixel widths.
            # Use the same step as the height slider; presumably the pipeline
            # needs dimensions divisible by 8 -- confirm against `infer`.
            width = gr.Slider(
                label="Width", minimum=256, maximum=1024, value=512, step=8
            )
            num_inference_steps = gr.Slider(
                label="num_inference_steps", minimum=10, maximum=100, value=25, step=1
            )

        # Components passed to `infer`, in this order, on every trigger below.
        inputs = [text, negative, guidance_scale, height, width, num_inference_steps, scheduler_name]

        # Pressing Enter in either text box or clicking the button runs `infer`
        # and sends its result to the gallery.
        negative.submit(infer, inputs=inputs, outputs=gallery)
        text.submit(infer, inputs=inputs, outputs=gallery)
        btn.click(infer, inputs=inputs, outputs=gallery)

        # Footer: credits, license notice and bias acknowledgment.
        # Fix: the "read the license" link carried a duplicate target="_blank" attribute.
        gr.HTML(
            """
            <div class="footer">
                <p>Model by <a href="https://www.paddlepaddle.org.cn/" style="text-decoration: underline;" target="_blank">PaddlePaddle</a> - Gradio Demo by 🤗 Hugging Face
                </p>
            </div>
            <div class="acknowledgments">
                <p><h4>LICENSE</h4>
The model is licensed with a <a href="https://huggingface.co/stabilityai/stable-diffusion-2/blob/main/LICENSE-MODEL" style="text-decoration: underline;" target="_blank">CreativeML OpenRAIL++</a> license. The authors claim no rights on the outputs you generate, you are free to use them and are accountable for their use which must not go against the provisions set in this license. The license forbids you from sharing any content that violates any laws, produce any harm to a person, disseminate any personal information that would be meant for harm, spread misinformation and target vulnerable groups. For the full list of restrictions please <a href="https://huggingface.co/spaces/CompVis/stable-diffusion-license" target="_blank" style="text-decoration: underline;">read the license</a></p>
                <p><h4>Biases and content acknowledgment</h4>
Despite how impressive being able to turn text into image is, beware to the fact that this model may output content that reinforces or exacerbates societal biases, as well as realistic faces, pornography and violence. The model was trained on the <a href="https://laion.ai/blog/laion-5b/" style="text-decoration: underline;" target="_blank">LAION-5B dataset</a>, which scraped non-curated image-text-pairs from the internet (the exception being the removal of illegal content) and is meant for research purposes. You can read more in the <a href="https://huggingface.co/CompVis/stable-diffusion-v1-4" style="text-decoration: underline;" target="_blank">model card</a></p>
            </div>
            """
        )

# Serve the demo on all network interfaces at port 8221.
block.launch(server_name="0.0.0.0", server_port=8221)
|
|
|
|