|
from distutils.version import LooseVersion |
|
from types import MethodType |
|
from typing import List, Optional, Tuple, Union |
|
import warnings |
|
|
|
import torch |
|
from torch import nn |
|
import torch.nn.functional as F |
|
|
|
from timm.models.registry import register_model |
|
from timm.data.constants import IMAGENET_DEFAULT_MEAN, IMAGENET_DEFAULT_STD |
|
|
|
from .forward_intermediates import forward_intermediates |
|
from .input_conditioner import InputConditioner |
|
|
|
_has_torch_sdpa = hasattr(F, 'scaled_dot_product_attention') |
|
|
|
|
|
class PaliGemmaWrapper(nn.Module): |
|
def __init__(self, vis_model: nn.Module, embed_dim: int): |
|
super().__init__() |
|
|
|
self.vis_model = vis_model |
|
self.embed_dim = embed_dim |
|
|
|
@property |
|
def patch_size(self): |
|
return self.vis_model.embeddings.patch_size |
|
|
|
@property |
|
def blocks(self): |
|
return self.vis_model.encoder.layers |
|
|
|
@property |
|
def embed_dim(self): |
|
return self.vis_model.embeddings.embed_dim |
|
|
|
def forward(self, x: torch.Tensor): |
|
outputs = self.vis_model( |
|
x, |
|
return_dict=False, |
|
interpolate_pos_encoding=True, |
|
) |
|
|
|
features = outputs[0].to(torch.float32) |
|
|
|
summary = features.mean(dim=1) |
|
|
|
return summary, features |
|
|
|
def forward_features(self, x: torch.Tensor): |
|
return self(x) |
|
|
|
|
|
def _get_paligemma_model(repo: str, embed_dim: int = None, dtype: torch.dtype = torch.bfloat16): |
|
from transformers import PaliGemmaForConditionalGeneration, __version__ as tx_version |
|
|
|
if LooseVersion(tx_version) > LooseVersion('4.44.2'): |
|
warnings.warn(f'Your transformers version "{tx_version}" is higher than 4.44.2, and for whatever reason, PaliGemma might be broken.') |
|
|
|
extra_args = dict() |
|
|
|
if dtype is not None: |
|
extra_args['torch_dtype'] = dtype |
|
rev = str(dtype).split('.')[-1] |
|
extra_args['revision'] = rev |
|
|
|
model = PaliGemmaForConditionalGeneration.from_pretrained(repo, **extra_args) |
|
|
|
vis_model = model.vision_tower.vision_model |
|
|
|
vis_model = PaliGemmaWrapper(vis_model, embed_dim) |
|
|
|
return vis_model |
|
|
|
@register_model |
|
def paligemma_896_student(**kwargs): |
|
model = _get_paligemma_model('google/paligemma-3b-pt-896', embed_dim=1152, dtype=None) |
|
|
|
return model |
|
|
|
|
|
def dv2_sdpa(self, x: torch.Tensor) -> torch.Tensor: |
|
B, N, C = x.shape |
|
qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4) |
|
|
|
q, k, v = qkv[0], qkv[1], qkv[2] |
|
x = F.scaled_dot_product_attention( |
|
q, k, v, |
|
is_causal=False, |
|
dropout_p=self.attn_drop.p if self.training else 0., |
|
scale=self.scale, |
|
) |
|
x = x.transpose(1, 2).reshape(B, N, C) |
|
x = self.proj(x) |
|
x = self.proj_drop(x) |
|
return x |
|
|
|
def _load_dino_v2(dino_v2_model, cache_dir: Optional[str] = None, pretrained=True, **kwargs): |
|
if cache_dir: |
|
torch.hub.set_dir(cache_dir) |
|
model: nn.Module = torch.hub.load( |
|
'facebookresearch/dinov2', |
|
dino_v2_model, |
|
pretrained=pretrained, |
|
|
|
) |
|
|
|
if _has_torch_sdpa: |
|
for n, m in model.named_modules(): |
|
if n.endswith('.attn'): |
|
m.forward = MethodType(dv2_sdpa, m) |
|
|
|
return model |
|
|
|
class DinoWrapper(nn.Module): |
|
def __init__(self, dino_model: nn.Module): |
|
super().__init__() |
|
|
|
self.inner = dino_model |
|
dino_model.blocks = nn.Sequential(*dino_model.blocks) |
|
|
|
@property |
|
def embed_dim(self): |
|
return self.inner.embed_dim |
|
|
|
@property |
|
def patch_size(self): |
|
return self.inner.patch_size |
|
|
|
@property |
|
def num_cls_tokens(self): |
|
return getattr(self.inner, 'num_tokens', 1) |
|
|
|
@property |
|
def num_registers(self): |
|
return getattr(self.inner, 'num_register_tokens', 0) |
|
|
|
@property |
|
def num_summary_tokens(self): |
|
return self.num_cls_tokens + self.num_registers |
|
|
|
@property |
|
def blocks(self): |
|
return self.inner.blocks |
|
|
|
def forward(self, *args, **kwargs) -> Tuple[torch.Tensor, torch.Tensor]: |
|
parts = self.inner.forward_features(*args, **kwargs) |
|
|
|
cls_token = parts['x_norm_clstoken'] |
|
features = parts['x_norm_patchtokens'] |
|
|
|
return cls_token, features |
|
|
|
def forward_features(self, x: torch.Tensor): |
|
x = self.inner.prepare_tokens_with_masks(x) |
|
x = self.inner.blocks(x) |
|
x_norm = self.inner.norm(x) |
|
|
|
return x_norm[:, 0], x_norm[:, self.num_summary_tokens:] |
|
|
|
def patchify(self, x: torch.Tensor) -> torch.Tensor: |
|
return self.inner.prepare_tokens_with_masks(x) |
|
|
|
def forward_intermediates(self, |
|
x: torch.Tensor, |
|
norm: bool = False, |
|
**kwargs, |
|
) -> Union[List[torch.Tensor], Tuple[torch.Tensor, List[torch.Tensor]]]: |
|
return forward_intermediates( |
|
self, |
|
patch_extractor=self.inner.prepare_tokens_with_masks, |
|
num_summary_tokens=self.num_summary_tokens, |
|
num_cls_tokens=self.num_cls_tokens, |
|
norm=self.inner.norm if norm else lambda y: y, |
|
x=x, |
|
**kwargs, |
|
) |
|
|
|
|
|
def _dino_student(arch: str, **kwargs): |
|
from . import dinov2_arch |
|
|
|
factory = getattr(dinov2_arch, arch) |
|
model = factory() |
|
|
|
model = DinoWrapper(model) |
|
|
|
conditioner = InputConditioner( |
|
input_scale=1.0, |
|
norm_mean=IMAGENET_DEFAULT_MEAN, |
|
norm_std=IMAGENET_DEFAULT_STD, |
|
) |
|
|
|
model.input_conditioner = conditioner |
|
|
|
return model |
|
|
|
|
|
@register_model |
|
def dino_v2_l_student(**kwargs): |
|
return _dino_student('dinov2_vitl14_reg', **kwargs) |
|
|
|
@register_model |
|
def dino_v2_g_student(**kwargs): |
|
return _dino_student('dinov2_vitg14_reg', **kwargs) |
|
|