Spaces:
Running
on
L40S
Running
on
L40S
# Copyright (c) Facebook, Inc. and its affiliates. | |
import random | |
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple | |
import torch | |
from torch import nn | |
SampledData = Any | |
ModelOutput = Any | |
def _grouper(iterable: Iterable[Any], n: int, fillvalue=None) -> Iterator[Tuple[Any]]: | |
""" | |
Group elements of an iterable by chunks of size `n`, e.g. | |
grouper(range(9), 4) -> | |
(0, 1, 2, 3), (4, 5, 6, 7), (8, None, None, None) | |
""" | |
it = iter(iterable) | |
while True: | |
values = [] | |
for _ in range(n): | |
try: | |
value = next(it) | |
except StopIteration: | |
if values: | |
values.extend([fillvalue] * (n - len(values))) | |
yield tuple(values) | |
return | |
values.append(value) | |
yield tuple(values) | |
class ScoreBasedFilter: | |
""" | |
Filters entries in model output based on their scores | |
Discards all entries with score less than the specified minimum | |
""" | |
def __init__(self, min_score: float = 0.8): | |
self.min_score = min_score | |
def __call__(self, model_output: ModelOutput) -> ModelOutput: | |
for model_output_i in model_output: | |
instances = model_output_i["instances"] | |
if not instances.has("scores"): | |
continue | |
instances_filtered = instances[instances.scores >= self.min_score] | |
model_output_i["instances"] = instances_filtered | |
return model_output | |
class InferenceBasedLoader: | |
""" | |
Data loader based on results inferred by a model. Consists of: | |
- a data loader that provides batches of images | |
- a model that is used to infer the results | |
- a data sampler that converts inferred results to annotations | |
""" | |
def __init__( | |
self, | |
model: nn.Module, | |
data_loader: Iterable[List[Dict[str, Any]]], | |
data_sampler: Optional[Callable[[ModelOutput], List[SampledData]]] = None, | |
data_filter: Optional[Callable[[ModelOutput], ModelOutput]] = None, | |
shuffle: bool = True, | |
batch_size: int = 4, | |
inference_batch_size: int = 4, | |
drop_last: bool = False, | |
category_to_class_mapping: Optional[dict] = None, | |
): | |
""" | |
Constructor | |
Args: | |
model (torch.nn.Module): model used to produce data | |
data_loader (Iterable[List[Dict[str, Any]]]): iterable that provides | |
dictionaries with "images" and "categories" fields to perform inference on | |
data_sampler (Callable: ModelOutput -> SampledData): functor | |
that produces annotation data from inference results; | |
(optional, default: None) | |
data_filter (Callable: ModelOutput -> ModelOutput): filter | |
that selects model outputs for further processing | |
(optional, default: None) | |
shuffle (bool): if True, the input images get shuffled | |
batch_size (int): batch size for the produced annotation data | |
inference_batch_size (int): batch size for input images | |
drop_last (bool): if True, drop the last batch if it is undersized | |
category_to_class_mapping (dict): category to class mapping | |
""" | |
self.model = model | |
self.model.eval() | |
self.data_loader = data_loader | |
self.data_sampler = data_sampler | |
self.data_filter = data_filter | |
self.shuffle = shuffle | |
self.batch_size = batch_size | |
self.inference_batch_size = inference_batch_size | |
self.drop_last = drop_last | |
if category_to_class_mapping is not None: | |
self.category_to_class_mapping = category_to_class_mapping | |
else: | |
self.category_to_class_mapping = {} | |
def __iter__(self) -> Iterator[List[SampledData]]: | |
for batch in self.data_loader: | |
# batch : List[Dict[str: Tensor[N, C, H, W], str: Optional[str]]] | |
# images_batch : Tensor[N, C, H, W] | |
# image : Tensor[C, H, W] | |
images_and_categories = [ | |
{"image": image, "category": category} | |
for element in batch | |
for image, category in zip(element["images"], element["categories"]) | |
] | |
if not images_and_categories: | |
continue | |
if self.shuffle: | |
random.shuffle(images_and_categories) | |
yield from self._produce_data(images_and_categories) # pyre-ignore[6] | |
def _produce_data( | |
self, images_and_categories: List[Tuple[torch.Tensor, Optional[str]]] | |
) -> Iterator[List[SampledData]]: | |
""" | |
Produce batches of data from images | |
Args: | |
images_and_categories (List[Tuple[torch.Tensor, Optional[str]]]): | |
list of images and corresponding categories to process | |
Returns: | |
Iterator over batches of data sampled from model outputs | |
""" | |
data_batches: List[SampledData] = [] | |
category_to_class_mapping = self.category_to_class_mapping | |
batched_images_and_categories = _grouper(images_and_categories, self.inference_batch_size) | |
for batch in batched_images_and_categories: | |
batch = [ | |
{ | |
"image": image_and_category["image"].to(self.model.device), | |
"category": image_and_category["category"], | |
} | |
for image_and_category in batch | |
if image_and_category is not None | |
] | |
if not batch: | |
continue | |
with torch.no_grad(): | |
model_output = self.model(batch) | |
for model_output_i, batch_i in zip(model_output, batch): | |
assert len(batch_i["image"].shape) == 3 | |
model_output_i["image"] = batch_i["image"] | |
instance_class = category_to_class_mapping.get(batch_i["category"], 0) | |
model_output_i["instances"].dataset_classes = torch.tensor( | |
[instance_class] * len(model_output_i["instances"]) | |
) | |
model_output_filtered = ( | |
model_output if self.data_filter is None else self.data_filter(model_output) | |
) | |
data = ( | |
model_output_filtered | |
if self.data_sampler is None | |
else self.data_sampler(model_output_filtered) | |
) | |
for data_i in data: | |
if len(data_i["instances"]): | |
data_batches.append(data_i) | |
if len(data_batches) >= self.batch_size: | |
yield data_batches[: self.batch_size] | |
data_batches = data_batches[self.batch_size :] | |
if not self.drop_last and data_batches: | |
yield data_batches | |