import argparse
import json
import os
import shutil
from tempfile import TemporaryDirectory
from typing import List, Optional, Tuple, Union

from huggingface_hub import CommitInfo, CommitOperationAdd, Discussion, HfApi, hf_hub_download
from huggingface_hub.file_download import repo_folder_name


# Title template of the PRs opened by this script; `{}` is the detected model type.
_PR_TITLE_TEMPLATE = "Correct `sample_size` of {}'s unet to have correct width and height default"

# Pipeline class names (the `_class_name` of `model_index.json`) that are
# treated as stable-diffusion-like.
_COMPATIBLE_PIPELINE_CLASSES = (
    "AltDiffusionImg2ImgPipeline",
    "AltDiffusionPipeline",
    "CycleDiffusionPipeline",
    "StableDiffusionImageVariationPipeline",
    "StableDiffusionImg2ImgPipeline",
    "StableDiffusionInpaintPipeline",
    "StableDiffusionInpaintPipelineLegacy",
    "StableDiffusionPipeline",
    "StableDiffusionPipelineSafe",
    "StableDiffusionUpscalePipeline",
    "VersatileDiffusionDualGuidedPipeline",
    "VersatileDiffusionImageVariationPipeline",
    "VersatileDiffusionPipeline",
    "VersatileDiffusionTextToImagePipeline",
    "OnnxStableDiffusionImg2ImgPipeline",
    "OnnxStableDiffusionInpaintPipeline",
    "OnnxStableDiffusionInpaintPipelineLegacy",
    "OnnxStableDiffusionPipeline",
    "StableDiffusionOnnxPipeline",
    "FlaxStableDiffusionPipeline",
)

# `down_block_types` layout that identifies a stable-diffusion unet config.
_SD_DOWN_BLOCK_TYPES = ["CrossAttnDownBlock2D", "CrossAttnDownBlock2D", "CrossAttnDownBlock2D", "DownBlock2D"]

# Model type -> (unet `sample_size` written to the config, image size quoted in the PR text).
_SIZES_BY_MODEL_TYPE = {
    "Stable Diffusion 1": (64, 512),
    "Stable Diffusion 2": (96, 768),
}


class AlreadyExists(Exception):
    """Raised by `convert` when an open PR with the same title already exists."""


def is_index_stable_diffusion_like(config_dict):
    """Return True if `config_dict["_class_name"]` is one of the compatible pipeline classes.

    A dict without a `_class_name` key yields False.
    """
    if "_class_name" not in config_dict:
        return False
    return config_dict["_class_name"] in _COMPATIBLE_PIPELINE_CLASSES


def convert_single(
    model_id: str, folder: str
) -> Tuple[Union[List["CommitOperationAdd"], bool], Union[str, bool]]:
    """Download the unet config of `model_id` and write a corrected copy into `folder`.

    Returns:
        `(operations, model_type)` where `operations` is a one-element list with
        the commit operation for the corrected `unet/config.json` and
        `model_type` is the string returned by `convert_file`; or
        `(False, False)` when the model is not stable-diffusion-like or the
        config needs no change.
    """
    config_file = "unet/config.json"
    os.makedirs(os.path.join(folder, "unet"), exist_ok=True)

    model_index_file = hf_hub_download(repo_id=model_id, filename="model_index.json")
    with open(model_index_file, "r", encoding="utf-8") as f:
        index_dict = json.load(f)

    if not is_index_stable_diffusion_like(index_dict):
        print(f"{model_id} is not of type stable diffusion.")
        return False, False

    old_config_file = hf_hub_download(repo_id=model_id, filename=config_file)
    new_config_file = os.path.join(folder, config_file)

    model_type = convert_file(old_config_file, new_config_file)
    if not model_type:
        return False, False

    operations = [CommitOperationAdd(path_in_repo=config_file, path_or_fileobj=new_config_file)]
    return operations, model_type


def convert_file(
    old_config: str,
    new_config: str,
) -> Union[str, bool]:
    """Write a copy of the unet config `old_config` with a corrected `sample_size`.

    The config is classified by its `down_block_types` and
    `use_linear_projection` entries: `use_linear_projection` missing or `False`
    means "Stable Diffusion 1" (target `sample_size` 64), `True` means
    "Stable Diffusion 2" (target `sample_size` 96).

    Returns:
        The model type string when `new_config` was written, or `False` when
        the config matches neither type or already has the target
        `sample_size` (nothing is written in that case).
    """
    with open(old_config, "r", encoding="utf-8") as f:
        old_dict = json.load(f)

    is_stable_diffusion = (
        "down_block_types" in old_dict and list(old_dict["down_block_types"]) == _SD_DOWN_BLOCK_TYPES
    )
    # A missing `use_linear_projection` counts as False; any value other than
    # the booleans True/False matches neither model type.
    use_linear_projection = old_dict.get("use_linear_projection", False)

    if is_stable_diffusion and use_linear_projection is False:
        model_type, label = "Stable Diffusion 1", "stable diffusion 1"
    elif is_stable_diffusion and use_linear_projection is True:
        model_type, label = "Stable Diffusion 2", "stable diffusion 2"
    else:
        print("No matching config")
        return False

    target_sample_size = _SIZES_BY_MODEL_TYPE[model_type][0]
    # `.get` so that a config without `sample_size` is corrected instead of
    # failing with a KeyError.
    if old_dict.get("sample_size") == target_sample_size:
        print("Dict correct")
        return False

    print(f"Correct {label}")
    old_dict["sample_size"] = target_sample_size

    with open(new_config, "w", encoding="utf-8") as f:
        f.write(json.dumps(old_dict, indent=2, sort_keys=True) + "\n")

    return model_type


def previous_pr(api: "HfApi", model_id: str, pr_title: str) -> Optional["Discussion"]:
    """Return the first open pull request of `model_id` titled exactly `pr_title`.

    Returns None when there is no such PR or when the discussions cannot be
    listed (best effort: any error while listing is treated as "no PR").
    """
    try:
        discussions = api.get_repo_discussions(repo_id=model_id)
    except Exception:
        return None
    for discussion in discussions:
        if discussion.status == "open" and discussion.is_pull_request and discussion.title == pr_title:
            return discussion
    return None


def convert(api: "HfApi", model_id: str, force: bool = False) -> Optional["CommitInfo"]:
    """Open a PR on `model_id` that corrects the `sample_size` of its unet config.

    Args:
        api: Hub client used to inspect the repo and create the commit.
        model_id: Repo id of the model on the hub.
        force: When True, create the PR even if an open PR with the same
            title already exists.

    Returns:
        The value returned by `api.create_commit` when a PR was created,
        otherwise None.

    Raises:
        AlreadyExists: If an open PR with a matching title exists and `force`
            is False.
    """
    pr_title = _PR_TITLE_TEMPLATE

    info = api.model_info(model_id)
    filenames = {s.rfilename for s in info.siblings}

    for required in ("unet/config.json", "vae/config.json"):
        if required not in filenames:
            print(f"Model: {model_id} has no '{required}' file to change")
            return None

    with TemporaryDirectory() as d:
        folder = os.path.join(d, repo_folder_name(repo_id=model_id, repo_type="models"))
        os.makedirs(folder)
        new_pr = None
        try:
            # PRs are created with the *formatted* title, so look for every
            # title this script can produce rather than the raw template.
            pr = None
            for known_type in _SIZES_BY_MODEL_TYPE:
                pr = previous_pr(api, model_id, pr_title.format(known_type))
                if pr is not None:
                    break

            if pr is not None and not force:
                url = f"https://huggingface.co/{model_id}/discussions/{pr.num}"
                raise AlreadyExists(f"Model {model_id} already has an open PR check out {url}")

            operations, model_type = convert_single(model_id, folder)

            if operations:
                pr_title = pr_title.format(model_type)
                sample_size, image_size = _SIZES_BY_MODEL_TYPE[model_type]

                pr_description = (
                    "Since `diffusers==0.9.0` the width and height is automatically inferred from the "
                    "`sample_size` attribute of your unet's config. It seems like your diffusion model "
                    f"has the same architecture as {model_type} which means that when using this model, "
                    f"by default an image size of {image_size}x{image_size} should be generated. This in "
                    f"turn means the unet's sample size should be **{sample_size}**. \n\n In order to "
                    "suppress to update your configuration on the fly and to suppress the deprecation "
                    "warning added in this PR: "
                    "https://github.com/huggingface/diffusers/pull/1406/files#r1035703505 "
                    "it is strongly recommended to merge this PR."
                )
                new_pr = api.create_commit(
                    repo_id=model_id,
                    operations=operations,
                    commit_message=pr_title,
                    commit_description=pr_description,
                    create_pr=True,
                )
                print(f"Pr created at {new_pr.pr_url}")
            else:
                print(f"No files to convert for {model_id}")
        finally:
            # Eager cleanup of the working folder; the enclosing
            # TemporaryDirectory removes its parent on exit as well.
            shutil.rmtree(folder)
        return new_pr


def main() -> None:
    """Parse the command line and run `convert` for the requested model."""
    description = """
    Simple utility tool to automatically correct the `sample_size` of the unet config of
    Stable Diffusion-like models on the hub.
    It works by downloading `unet/config.json`, correcting `sample_size` locally,
    and uploading the corrected config back as a PR on the hub.
    """
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "model_id",
        type=str,
        help="The name of the model on the hub to convert. E.g. `CompVis/stable-diffusion-v1-4`",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Create the PR even if an open PR with the same title already exists.",
    )
    args = parser.parse_args()
    api = HfApi()
    convert(api, args.model_id, force=args.force)


if __name__ == "__main__":
    main()