File size: 7,684 Bytes
2d3bbc7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# Copyright (c) 2023-2024, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
from typing import Callable, Dict, Optional, List, Union

from timm.models import VisionTransformer
import torch
from torch import nn
from transformers import PretrainedConfig, PreTrainedModel


from .common import RESOURCE_MAP, DEFAULT_VERSION

# Import all required modules.
from .adaptor_base import AdaptorBase, RadioOutput, AdaptorInput
from .adaptor_generic import GenericAdaptor, AdaptorBase
from .adaptor_mlp import create_mlp_from_config
from .adaptor_registry import adaptor_registry
from .cls_token import ClsToken
from .dinov2_arch import dinov2_vitg14_reg
from .enable_cpe_support import enable_cpe
from .enable_spectral_reparam import configure_spectral_reparam_from_args
from .eradio_model import eradio
from .feature_normalizer import FeatureNormalizer, IntermediateFeatureNormalizer
from .forward_intermediates import forward_intermediates
from .radio_model import create_model_from_args
from .radio_model import RADIOModel as RADIOModelBase, Resolution
from .input_conditioner import get_default_conditioner, InputConditioner
from .open_clip_adaptor import OpenCLIP_RADIO
from .vit_patch_generator import ViTPatchGenerator
from .vitdet import apply_vitdet_arch, VitDetArgs

# Register extra models
from .extra_timm_models import *
from .extra_models import *


class RADIOConfig(PretrainedConfig):
    """Pretrained Hugging Face configuration for RADIO models."""

    def __init__(
        self,
        args: Optional[dict] = None,
        version: Optional[str] = DEFAULT_VERSION,
        patch_size: Optional[int] = None,
        max_resolution: Optional[int] = None,
        preferred_resolution: Optional[Resolution] = None,
        adaptor_names: Union[str, List[str]] = None,
        adaptor_configs: Dict[str, Dict[str, int]] = None,
        vitdet_window_size: Optional[int] = None,
        feature_normalizer_config: Optional[dict] = None,
        inter_feature_normalizer_config: Optional[dict] = None,
        **kwargs,
    ):
        self.args = args
        for field in ["dtype", "amp_dtype"]:
            if self.args is not None and field in self.args:
                # Convert to a string in order to make it serializable.
                # For example for torch.float32 we will store "float32",
                # for "bfloat16" we will store "bfloat16".
                self.args[field] = str(args[field]).split(".")[-1]
        self.version = version
        resource = RESOURCE_MAP[version]
        self.patch_size = patch_size or resource.patch_size
        self.max_resolution = max_resolution or resource.max_resolution
        self.preferred_resolution = (
            preferred_resolution or resource.preferred_resolution
        )
        self.adaptor_names = adaptor_names
        self.adaptor_configs = adaptor_configs
        self.vitdet_window_size = vitdet_window_size
        self.feature_normalizer_config = feature_normalizer_config
        self.inter_feature_normalizer_config = inter_feature_normalizer_config
        super().__init__(**kwargs)



class RADIOModel(PreTrainedModel):
    """Pretrained Hugging Face model for RADIO.

    This class inherits from PreTrainedModel, which provides
    HuggingFace's functionality for loading and saving models.
    """

    config_class = RADIOConfig

    def __init__(self, config: RADIOConfig):
        super().__init__(config)

        RADIOArgs = namedtuple("RADIOArgs", config.args.keys())
        args = RADIOArgs(**config.args)
        self.config = config

        model = create_model_from_args(args)
        input_conditioner: InputConditioner = get_default_conditioner()

        dtype = getattr(args, "dtype", torch.float32)
        if isinstance(dtype, str):
            # Convert the dtype's string representation back to a dtype.
            dtype = getattr(torch, dtype)
        model.to(dtype=dtype)
        input_conditioner.dtype = dtype

        summary_idxs = torch.tensor(
            [i for i, t in enumerate(args.teachers) if t.get("use_summary", True)],
            dtype=torch.int64,
        )

        adaptor_configs = config.adaptor_configs
        adaptor_names = config.adaptor_names or []

        adaptors = dict()
        for adaptor_name in adaptor_names:
            mlp_config = adaptor_configs[adaptor_name]
            adaptor = GenericAdaptor(args, None, None, mlp_config)
            adaptor.head_idx = mlp_config["head_idx"]
            adaptors[adaptor_name] = adaptor

        feature_normalizer = None
        if config.feature_normalizer_config is not None:
            # Actual normalization values will be restored when loading checkpoint weights.
            feature_normalizer = FeatureNormalizer(config.feature_normalizer_config["embed_dim"])

        inter_feature_normalizer = None
        if config.inter_feature_normalizer_config is not None:
            inter_feature_normalizer = IntermediateFeatureNormalizer(
                config.inter_feature_normalizer_config["num_intermediates"],
                config.inter_feature_normalizer_config["embed_dim"],
                rot_per_layer=config.inter_feature_normalizer_config["rot_per_layer"],
                dtype=dtype)

        self.radio_model = RADIOModelBase(
            model,
            input_conditioner,
            summary_idxs=summary_idxs,
            patch_size=config.patch_size,
            max_resolution=config.max_resolution,
            window_size=config.vitdet_window_size,
            preferred_resolution=config.preferred_resolution,
            adaptors=adaptors,
            feature_normalizer=feature_normalizer,
            inter_feature_normalizer=inter_feature_normalizer,
        )

    @property
    def adaptors(self) -> nn.ModuleDict:
        return self.radio_model.adaptors

    @property
    def model(self) -> VisionTransformer:
        return self.radio_model.model

    @property
    def input_conditioner(self) -> InputConditioner:
        return self.radio_model.input_conditioner

    @property
    def num_summary_tokens(self) -> int:
        return self.radio_model.num_summary_tokens

    @property
    def patch_size(self) -> int:
        return self.radio_model.patch_size

    @property
    def max_resolution(self) -> int:
        return self.radio_model.max_resolution

    @property
    def preferred_resolution(self) -> Resolution:
        return self.radio_model.preferred_resolution

    @property
    def window_size(self) -> int:
        return self.radio_model.window_size

    @property
    def min_resolution_step(self) -> int:
        return self.radio_model.min_resolution_step

    def make_preprocessor_external(self) -> Callable[[torch.Tensor], torch.Tensor]:
        return self.radio_model.make_preprocessor_external()

    def get_nearest_supported_resolution(self, height: int, width: int) -> Resolution:
        return self.radio_model.get_nearest_supported_resolution(height, width)

    def switch_to_deploy(self):
        return self.radio_model.switch_to_deploy()

    def forward(self, x: torch.Tensor):
        return self.radio_model.forward(x)