Chris Xiao
upload files
c642393
# Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from batchgenerators.augmentations.utils import pad_nd_image
from nnunet.utilities.random_stuff import no_op
from nnunet.utilities.to_torch import to_cuda, maybe_to_torch
from torch import nn
import torch
from scipy.ndimage.filters import gaussian_filter
from typing import Union, Tuple, List
from torch.cuda.amp import autocast
class NeuralNetwork(nn.Module):
def __init__(self):
super(NeuralNetwork, self).__init__()
def get_device(self):
if next(self.parameters()).device.type == "cpu":
return "cpu"
else:
return next(self.parameters()).device.index
def set_device(self, device):
if device == "cpu":
self.cpu()
else:
self.cuda(device)
def forward(self, x):
raise NotImplementedError
class SegmentationNetwork(NeuralNetwork):
def __init__(self):
super(NeuralNetwork, self).__init__()
# if we have 5 pooling then our patch size must be divisible by 2**5
self.input_shape_must_be_divisible_by = None # for example in a 2d network that does 5 pool in x and 6 pool
# in y this would be (32, 64)
# we need to know this because we need to know if we are a 2d or a 3d netowrk
self.conv_op = None # nn.Conv2d or nn.Conv3d
# this tells us how many channels we have in the output. Important for preallocation in inference
self.num_classes = None # number of channels in the output
# depending on the loss, we do not hard code a nonlinearity into the architecture. To aggregate predictions
# during inference, we need to apply the nonlinearity, however. So it is important to let the newtork know what
# to apply in inference. For the most part this will be softmax
self.inference_apply_nonlin = lambda x: x # softmax_helper
# This is for saving a gaussian importance map for inference. It weights voxels higher that are closer to the
# center. Prediction at the borders are often less accurate and are thus downweighted. Creating these Gaussians
# can be expensive, so it makes sense to save and reuse them.
self._gaussian_3d = self._patch_size_for_gaussian_3d = None
self._gaussian_2d = self._patch_size_for_gaussian_2d = None
def predict_3D(self, x: np.ndarray, do_mirroring: bool, mirror_axes: Tuple[int, ...] = (0, 1, 2),
use_sliding_window: bool = False,
step_size: float = 0.5, patch_size: Tuple[int, ...] = None, regions_class_order: Tuple[int, ...] = None,
use_gaussian: bool = False, pad_border_mode: str = "constant",
pad_kwargs: dict = None, all_in_gpu: bool = False,
verbose: bool = True, mixed_precision: bool = True) -> Tuple[np.ndarray, np.ndarray]:
"""
Use this function to predict a 3D image. It does not matter whether the network is a 2D or 3D U-Net, it will
detect that automatically and run the appropriate code.
When running predictions, you need to specify whether you want to run fully convolutional of sliding window
based inference. We very strongly recommend you use sliding window with the default settings.
It is the responsibility of the user to make sure the network is in the proper mode (eval for inference!). If
the network is not in eval mode it will print a warning.
:param x: Your input data. Must be a nd.ndarray of shape (c, x, y, z).
:param do_mirroring: If True, use test time data augmentation in the form of mirroring
:param mirror_axes: Determines which axes to use for mirroing. Per default, mirroring is done along all three
axes
:param use_sliding_window: if True, run sliding window prediction. Heavily recommended! This is also the default
:param step_size: When running sliding window prediction, the step size determines the distance between adjacent
predictions. The smaller the step size, the denser the predictions (and the longer it takes!). Step size is given
as a fraction of the patch_size. 0.5 is the default and means that wen advance by patch_size * 0.5 between
predictions. step_size cannot be larger than 1!
:param patch_size: The patch size that was used for training the network. Do not use different patch sizes here,
this will either crash or give potentially less accurate segmentations
:param regions_class_order: Fabian only
:param use_gaussian: (Only applies to sliding window prediction) If True, uses a Gaussian importance weighting
to weigh predictions closer to the center of the current patch higher than those at the borders. The reason
behind this is that the segmentation accuracy decreases towards the borders. Default (and recommended): True
:param pad_border_mode: leave this alone
:param pad_kwargs: leave this alone
:param all_in_gpu: experimental. You probably want to leave this as is it
:param verbose: Do you want a wall of text? If yes then set this to True
:param mixed_precision: if True, will run inference in mixed precision with autocast()
:return:
"""
torch.cuda.empty_cache()
assert step_size <= 1, 'step_size must be smaller than 1. Otherwise there will be a gap between consecutive ' \
'predictions'
if verbose: print("debug: mirroring", do_mirroring, "mirror_axes", mirror_axes)
if pad_kwargs is None:
pad_kwargs = {'constant_values': 0}
# A very long time ago the mirror axes were (2, 3, 4) for a 3d network. This is just to intercept any old
# code that uses this convention
if len(mirror_axes):
if self.conv_op == nn.Conv2d:
if max(mirror_axes) > 1:
raise ValueError("mirror axes. duh")
if self.conv_op == nn.Conv3d:
if max(mirror_axes) > 2:
raise ValueError("mirror axes. duh")
if self.training:
print('WARNING! Network is in train mode during inference. This may be intended, or not...')
assert len(x.shape) == 4, "data must have shape (c,x,y,z)"
if mixed_precision:
context = autocast
else:
context = no_op
with context():
with torch.no_grad():
if self.conv_op == nn.Conv3d:
if use_sliding_window:
res = self._internal_predict_3D_3Dconv_tiled(x, step_size, do_mirroring, mirror_axes, patch_size,
regions_class_order, use_gaussian, pad_border_mode,
pad_kwargs=pad_kwargs, all_in_gpu=all_in_gpu,
verbose=verbose)
else:
res = self._internal_predict_3D_3Dconv(x, patch_size, do_mirroring, mirror_axes, regions_class_order,
pad_border_mode, pad_kwargs=pad_kwargs, verbose=verbose)
elif self.conv_op == nn.Conv2d:
if use_sliding_window:
res = self._internal_predict_3D_2Dconv_tiled(x, patch_size, do_mirroring, mirror_axes, step_size,
regions_class_order, use_gaussian, pad_border_mode,
pad_kwargs, all_in_gpu, False)
else:
res = self._internal_predict_3D_2Dconv(x, patch_size, do_mirroring, mirror_axes, regions_class_order,
pad_border_mode, pad_kwargs, all_in_gpu, False)
else:
raise RuntimeError("Invalid conv op, cannot determine what dimensionality (2d/3d) the network is")
return res
def predict_2D(self, x, do_mirroring: bool, mirror_axes: tuple = (0, 1, 2), use_sliding_window: bool = False,
step_size: float = 0.5, patch_size: tuple = None, regions_class_order: tuple = None,
use_gaussian: bool = False, pad_border_mode: str = "constant",
pad_kwargs: dict = None, all_in_gpu: bool = False,
verbose: bool = True, mixed_precision: bool = True) -> Tuple[np.ndarray, np.ndarray]:
"""
Use this function to predict a 2D image. If this is a 3D U-Net it will crash because you cannot predict a 2D
image with that (you dummy).
When running predictions, you need to specify whether you want to run fully convolutional of sliding window
based inference. We very strongly recommend you use sliding window with the default settings.
It is the responsibility of the user to make sure the network is in the proper mode (eval for inference!). If
the network is not in eval mode it will print a warning.
:param x: Your input data. Must be a nd.ndarray of shape (c, x, y).
:param do_mirroring: If True, use test time data augmentation in the form of mirroring
:param mirror_axes: Determines which axes to use for mirroing. Per default, mirroring is done along all three
axes
:param use_sliding_window: if True, run sliding window prediction. Heavily recommended! This is also the default
:param step_size: When running sliding window prediction, the step size determines the distance between adjacent
predictions. The smaller the step size, the denser the predictions (and the longer it takes!). Step size is given
as a fraction of the patch_size. 0.5 is the default and means that wen advance by patch_size * 0.5 between
predictions. step_size cannot be larger than 1!
:param patch_size: The patch size that was used for training the network. Do not use different patch sizes here,
this will either crash or give potentially less accurate segmentations
:param regions_class_order: Fabian only
:param use_gaussian: (Only applies to sliding window prediction) If True, uses a Gaussian importance weighting
to weigh predictions closer to the center of the current patch higher than those at the borders. The reason
behind this is that the segmentation accuracy decreases towards the borders. Default (and recommended): True
:param pad_border_mode: leave this alone
:param pad_kwargs: leave this alone
:param all_in_gpu: experimental. You probably want to leave this as is it
:param verbose: Do you want a wall of text? If yes then set this to True
:return:
"""
torch.cuda.empty_cache()
assert step_size <= 1, 'step_size must be smaler than 1. Otherwise there will be a gap between consecutive ' \
'predictions'
if self.conv_op == nn.Conv3d:
raise RuntimeError("Cannot predict 2d if the network is 3d. Dummy.")
if verbose: print("debug: mirroring", do_mirroring, "mirror_axes", mirror_axes)
if pad_kwargs is None:
pad_kwargs = {'constant_values': 0}
# A very long time ago the mirror axes were (2, 3) for a 2d network. This is just to intercept any old
# code that uses this convention
if len(mirror_axes):
if max(mirror_axes) > 1:
raise ValueError("mirror axes. duh")
if self.training:
print('WARNING! Network is in train mode during inference. This may be intended, or not...')
assert len(x.shape) == 3, "data must have shape (c,x,y)"
if mixed_precision:
context = autocast
else:
context = no_op
with context():
with torch.no_grad():
if self.conv_op == nn.Conv2d:
if use_sliding_window:
res = self._internal_predict_2D_2Dconv_tiled(x, step_size, do_mirroring, mirror_axes, patch_size,
regions_class_order, use_gaussian, pad_border_mode,
pad_kwargs, all_in_gpu, verbose)
else:
res = self._internal_predict_2D_2Dconv(x, patch_size, do_mirroring, mirror_axes, regions_class_order,
pad_border_mode, pad_kwargs, verbose)
else:
raise RuntimeError("Invalid conv op, cannot determine what dimensionality (2d/3d) the network is")
return res
@staticmethod
def _get_gaussian(patch_size, sigma_scale=1. / 8) -> np.ndarray:
tmp = np.zeros(patch_size)
center_coords = [i // 2 for i in patch_size]
sigmas = [i * sigma_scale for i in patch_size]
tmp[tuple(center_coords)] = 1
gaussian_importance_map = gaussian_filter(tmp, sigmas, 0, mode='constant', cval=0)
gaussian_importance_map = gaussian_importance_map / np.max(gaussian_importance_map) * 1
gaussian_importance_map = gaussian_importance_map.astype(np.float32)
# gaussian_importance_map cannot be 0, otherwise we may end up with nans!
gaussian_importance_map[gaussian_importance_map == 0] = np.min(
gaussian_importance_map[gaussian_importance_map != 0])
return gaussian_importance_map
@staticmethod
def _compute_steps_for_sliding_window(patch_size: Tuple[int, ...], image_size: Tuple[int, ...], step_size: float) -> List[List[int]]:
assert [i >= j for i, j in zip(image_size, patch_size)], "image size must be as large or larger than patch_size"
assert 0 < step_size <= 1, 'step_size must be larger than 0 and smaller or equal to 1'
# our step width is patch_size*step_size at most, but can be narrower. For example if we have image size of
# 110, patch size of 64 and step_size of 0.5, then we want to make 3 steps starting at coordinate 0, 23, 46
target_step_sizes_in_voxels = [i * step_size for i in patch_size]
num_steps = [int(np.ceil((i - k) / j)) + 1 for i, j, k in zip(image_size, target_step_sizes_in_voxels, patch_size)]
steps = []
for dim in range(len(patch_size)):
# the highest step value for this dimension is
max_step_value = image_size[dim] - patch_size[dim]
if num_steps[dim] > 1:
actual_step_size = max_step_value / (num_steps[dim] - 1)
else:
actual_step_size = 99999999999 # does not matter because there is only one step at 0
steps_here = [int(np.round(actual_step_size * i)) for i in range(num_steps[dim])]
steps.append(steps_here)
return steps
def _internal_predict_3D_3Dconv_tiled(self, x: np.ndarray, step_size: float, do_mirroring: bool, mirror_axes: tuple,
patch_size: tuple, regions_class_order: tuple, use_gaussian: bool,
pad_border_mode: str, pad_kwargs: dict, all_in_gpu: bool,
verbose: bool) -> Tuple[np.ndarray, np.ndarray]:
# better safe than sorry
assert len(x.shape) == 4, "x must be (c, x, y, z)"
if verbose: print("step_size:", step_size)
if verbose: print("do mirror:", do_mirroring)
assert patch_size is not None, "patch_size cannot be None for tiled prediction"
# for sliding window inference the image must at least be as large as the patch size. It does not matter
# whether the shape is divisible by 2**num_pool as long as the patch size is
data, slicer = pad_nd_image(x, patch_size, pad_border_mode, pad_kwargs, True, None)
data_shape = data.shape # still c, x, y, z
# compute the steps for sliding window
steps = self._compute_steps_for_sliding_window(patch_size, data_shape[1:], step_size)
num_tiles = len(steps[0]) * len(steps[1]) * len(steps[2])
if verbose:
print("data shape:", data_shape)
print("patch size:", patch_size)
print("steps (x, y, and z):", steps)
print("number of tiles:", num_tiles)
# we only need to compute that once. It can take a while to compute this due to the large sigma in
# gaussian_filter
if use_gaussian and num_tiles > 1:
if self._gaussian_3d is None or not all(
[i == j for i, j in zip(patch_size, self._patch_size_for_gaussian_3d)]):
if verbose: print('computing Gaussian')
gaussian_importance_map = self._get_gaussian(patch_size, sigma_scale=1. / 8)
self._gaussian_3d = gaussian_importance_map
self._patch_size_for_gaussian_3d = patch_size
if verbose: print("done")
else:
if verbose: print("using precomputed Gaussian")
gaussian_importance_map = self._gaussian_3d
gaussian_importance_map = torch.from_numpy(gaussian_importance_map)
#predict on cpu if cuda not available
if torch.cuda.is_available():
gaussian_importance_map = gaussian_importance_map.cuda(self.get_device(), non_blocking=True)
else:
gaussian_importance_map = None
if all_in_gpu:
# If we run the inference in GPU only (meaning all tensors are allocated on the GPU, this reduces
# CPU-GPU communication but required more GPU memory) we need to preallocate a few things on GPU
if use_gaussian and num_tiles > 1:
# half precision for the outputs should be good enough. If the outputs here are half, the
# gaussian_importance_map should be as well
gaussian_importance_map = gaussian_importance_map.half()
# make sure we did not round anything to 0
gaussian_importance_map[gaussian_importance_map == 0] = gaussian_importance_map[
gaussian_importance_map != 0].min()
add_for_nb_of_preds = gaussian_importance_map
else:
add_for_nb_of_preds = torch.ones(patch_size, device=self.get_device())
if verbose: print("initializing result array (on GPU)")
aggregated_results = torch.zeros([self.num_classes] + list(data.shape[1:]), dtype=torch.half,
device=self.get_device())
if verbose: print("moving data to GPU")
data = torch.from_numpy(data).cuda(self.get_device(), non_blocking=True)
if verbose: print("initializing result_numsamples (on GPU)")
aggregated_nb_of_predictions = torch.zeros([self.num_classes] + list(data.shape[1:]), dtype=torch.half,
device=self.get_device())
else:
if use_gaussian and num_tiles > 1:
add_for_nb_of_preds = self._gaussian_3d
else:
add_for_nb_of_preds = np.ones(patch_size, dtype=np.float32)
aggregated_results = np.zeros([self.num_classes] + list(data.shape[1:]), dtype=np.float32)
aggregated_nb_of_predictions = np.zeros([self.num_classes] + list(data.shape[1:]), dtype=np.float32)
for x in steps[0]:
lb_x = x
ub_x = x + patch_size[0]
for y in steps[1]:
lb_y = y
ub_y = y + patch_size[1]
for z in steps[2]:
lb_z = z
ub_z = z + patch_size[2]
predicted_patch = self._internal_maybe_mirror_and_pred_3D(
data[None, :, lb_x:ub_x, lb_y:ub_y, lb_z:ub_z], mirror_axes, do_mirroring,
gaussian_importance_map)[0]
if all_in_gpu:
predicted_patch = predicted_patch.half()
else:
predicted_patch = predicted_patch.cpu().numpy()
aggregated_results[:, lb_x:ub_x, lb_y:ub_y, lb_z:ub_z] += predicted_patch
aggregated_nb_of_predictions[:, lb_x:ub_x, lb_y:ub_y, lb_z:ub_z] += add_for_nb_of_preds
# we reverse the padding here (remeber that we padded the input to be at least as large as the patch size
slicer = tuple(
[slice(0, aggregated_results.shape[i]) for i in
range(len(aggregated_results.shape) - (len(slicer) - 1))] + slicer[1:])
aggregated_results = aggregated_results[slicer]
aggregated_nb_of_predictions = aggregated_nb_of_predictions[slicer]
# computing the class_probabilities by dividing the aggregated result with result_numsamples
aggregated_results /= aggregated_nb_of_predictions
del aggregated_nb_of_predictions
if regions_class_order is None:
predicted_segmentation = aggregated_results.argmax(0)
else:
if all_in_gpu:
class_probabilities_here = aggregated_results.detach().cpu().numpy()
else:
class_probabilities_here = aggregated_results
predicted_segmentation = np.zeros(class_probabilities_here.shape[1:], dtype=np.float32)
for i, c in enumerate(regions_class_order):
predicted_segmentation[class_probabilities_here[i] > 0.5] = c
if all_in_gpu:
if verbose: print("copying results to CPU")
if regions_class_order is None:
predicted_segmentation = predicted_segmentation.detach().cpu().numpy()
aggregated_results = aggregated_results.detach().cpu().numpy()
if verbose: print("prediction done")
return predicted_segmentation, aggregated_results
def _internal_predict_2D_2Dconv(self, x: np.ndarray, min_size: Tuple[int, int], do_mirroring: bool,
mirror_axes: tuple = (0, 1, 2), regions_class_order: tuple = None,
pad_border_mode: str = "constant", pad_kwargs: dict = None,
verbose: bool = True) -> Tuple[np.ndarray, np.ndarray]:
"""
This one does fully convolutional inference. No sliding window
"""
assert len(x.shape) == 3, "x must be (c, x, y)"
assert self.input_shape_must_be_divisible_by is not None, 'input_shape_must_be_divisible_by must be set to ' \
'run _internal_predict_2D_2Dconv'
if verbose: print("do mirror:", do_mirroring)
data, slicer = pad_nd_image(x, min_size, pad_border_mode, pad_kwargs, True,
self.input_shape_must_be_divisible_by)
predicted_probabilities = self._internal_maybe_mirror_and_pred_2D(data[None], mirror_axes, do_mirroring,
None)[0]
slicer = tuple(
[slice(0, predicted_probabilities.shape[i]) for i in range(len(predicted_probabilities.shape) -
(len(slicer) - 1))] + slicer[1:])
predicted_probabilities = predicted_probabilities[slicer]
if regions_class_order is None:
predicted_segmentation = predicted_probabilities.argmax(0)
predicted_segmentation = predicted_segmentation.detach().cpu().numpy()
predicted_probabilities = predicted_probabilities.detach().cpu().numpy()
else:
predicted_probabilities = predicted_probabilities.detach().cpu().numpy()
predicted_segmentation = np.zeros(predicted_probabilities.shape[1:], dtype=np.float32)
for i, c in enumerate(regions_class_order):
predicted_segmentation[predicted_probabilities[i] > 0.5] = c
return predicted_segmentation, predicted_probabilities
def _internal_predict_3D_3Dconv(self, x: np.ndarray, min_size: Tuple[int, ...], do_mirroring: bool,
mirror_axes: tuple = (0, 1, 2), regions_class_order: tuple = None,
pad_border_mode: str = "constant", pad_kwargs: dict = None,
verbose: bool = True) -> Tuple[np.ndarray, np.ndarray]:
"""
This one does fully convolutional inference. No sliding window
"""
assert len(x.shape) == 4, "x must be (c, x, y, z)"
assert self.input_shape_must_be_divisible_by is not None, 'input_shape_must_be_divisible_by must be set to ' \
'run _internal_predict_3D_3Dconv'
if verbose: print("do mirror:", do_mirroring)
data, slicer = pad_nd_image(x, min_size, pad_border_mode, pad_kwargs, True,
self.input_shape_must_be_divisible_by)
predicted_probabilities = self._internal_maybe_mirror_and_pred_3D(data[None], mirror_axes, do_mirroring,
None)[0]
slicer = tuple(
[slice(0, predicted_probabilities.shape[i]) for i in range(len(predicted_probabilities.shape) -
(len(slicer) - 1))] + slicer[1:])
predicted_probabilities = predicted_probabilities[slicer]
if regions_class_order is None:
predicted_segmentation = predicted_probabilities.argmax(0)
predicted_segmentation = predicted_segmentation.detach().cpu().numpy()
predicted_probabilities = predicted_probabilities.detach().cpu().numpy()
else:
predicted_probabilities = predicted_probabilities.detach().cpu().numpy()
predicted_segmentation = np.zeros(predicted_probabilities.shape[1:], dtype=np.float32)
for i, c in enumerate(regions_class_order):
predicted_segmentation[predicted_probabilities[i] > 0.5] = c
return predicted_segmentation, predicted_probabilities
def _internal_maybe_mirror_and_pred_3D(self, x: Union[np.ndarray, torch.tensor], mirror_axes: tuple,
do_mirroring: bool = True,
mult: np.ndarray or torch.tensor = None) -> torch.tensor:
assert len(x.shape) == 5, 'x must be (b, c, x, y, z)'
# if cuda available:
# everything in here takes place on the GPU. If x and mult are not yet on GPU this will be taken care of here
# we now return a cuda tensor! Not numpy array!
x = maybe_to_torch(x)
result_torch = torch.zeros([1, self.num_classes] + list(x.shape[2:]),
dtype=torch.float)
if torch.cuda.is_available():
x = to_cuda(x, gpu_id=self.get_device())
result_torch = result_torch.cuda(self.get_device(), non_blocking=True)
if mult is not None:
mult = maybe_to_torch(mult)
if torch.cuda.is_available():
mult = to_cuda(mult, gpu_id=self.get_device())
if do_mirroring:
mirror_idx = 8
num_results = 2 ** len(mirror_axes)
else:
mirror_idx = 1
num_results = 1
for m in range(mirror_idx):
if m == 0:
pred = self.inference_apply_nonlin(self(x))
result_torch += 1 / num_results * pred
if m == 1 and (2 in mirror_axes):
pred = self.inference_apply_nonlin(self(torch.flip(x, (4, ))))
result_torch += 1 / num_results * torch.flip(pred, (4,))
if m == 2 and (1 in mirror_axes):
pred = self.inference_apply_nonlin(self(torch.flip(x, (3, ))))
result_torch += 1 / num_results * torch.flip(pred, (3,))
if m == 3 and (2 in mirror_axes) and (1 in mirror_axes):
pred = self.inference_apply_nonlin(self(torch.flip(x, (4, 3))))
result_torch += 1 / num_results * torch.flip(pred, (4, 3))
if m == 4 and (0 in mirror_axes):
pred = self.inference_apply_nonlin(self(torch.flip(x, (2, ))))
result_torch += 1 / num_results * torch.flip(pred, (2,))
if m == 5 and (0 in mirror_axes) and (2 in mirror_axes):
pred = self.inference_apply_nonlin(self(torch.flip(x, (4, 2))))
result_torch += 1 / num_results * torch.flip(pred, (4, 2))
if m == 6 and (0 in mirror_axes) and (1 in mirror_axes):
pred = self.inference_apply_nonlin(self(torch.flip(x, (3, 2))))
result_torch += 1 / num_results * torch.flip(pred, (3, 2))
if m == 7 and (0 in mirror_axes) and (1 in mirror_axes) and (2 in mirror_axes):
pred = self.inference_apply_nonlin(self(torch.flip(x, (4, 3, 2))))
result_torch += 1 / num_results * torch.flip(pred, (4, 3, 2))
if mult is not None:
result_torch[:, :] *= mult
return result_torch
def _internal_maybe_mirror_and_pred_2D(self, x: Union[np.ndarray, torch.tensor], mirror_axes: tuple,
do_mirroring: bool = True,
mult: np.ndarray or torch.tensor = None) -> torch.tensor:
# if cuda available:
# everything in here takes place on the GPU. If x and mult are not yet on GPU this will be taken care of here
# we now return a cuda tensor! Not numpy array!
assert len(x.shape) == 4, 'x must be (b, c, x, y)'
x = maybe_to_torch(x)
result_torch = torch.zeros([x.shape[0], self.num_classes] + list(x.shape[2:]), dtype=torch.float)
if torch.cuda.is_available():
x = to_cuda(x, gpu_id=self.get_device())
result_torch = result_torch.cuda(self.get_device(), non_blocking=True)
if mult is not None:
mult = maybe_to_torch(mult)
if torch.cuda.is_available():
mult = to_cuda(mult, gpu_id=self.get_device())
if do_mirroring:
mirror_idx = 4
num_results = 2 ** len(mirror_axes)
else:
mirror_idx = 1
num_results = 1
for m in range(mirror_idx):
if m == 0:
pred = self.inference_apply_nonlin(self(x))
result_torch += 1 / num_results * pred
if m == 1 and (1 in mirror_axes):
pred = self.inference_apply_nonlin(self(torch.flip(x, (3, ))))
result_torch += 1 / num_results * torch.flip(pred, (3, ))
if m == 2 and (0 in mirror_axes):
pred = self.inference_apply_nonlin(self(torch.flip(x, (2, ))))
result_torch += 1 / num_results * torch.flip(pred, (2, ))
if m == 3 and (0 in mirror_axes) and (1 in mirror_axes):
pred = self.inference_apply_nonlin(self(torch.flip(x, (3, 2))))
result_torch += 1 / num_results * torch.flip(pred, (3, 2))
if mult is not None:
result_torch[:, :] *= mult
return result_torch
def _internal_predict_2D_2Dconv_tiled(self, x: np.ndarray, step_size: float, do_mirroring: bool, mirror_axes: tuple,
patch_size: tuple, regions_class_order: tuple, use_gaussian: bool,
pad_border_mode: str, pad_kwargs: dict, all_in_gpu: bool,
verbose: bool) -> Tuple[np.ndarray, np.ndarray]:
# better safe than sorry
assert len(x.shape) == 3, "x must be (c, x, y)"
if verbose: print("step_size:", step_size)
if verbose: print("do mirror:", do_mirroring)
assert patch_size is not None, "patch_size cannot be None for tiled prediction"
# for sliding window inference the image must at least be as large as the patch size. It does not matter
# whether the shape is divisible by 2**num_pool as long as the patch size is
data, slicer = pad_nd_image(x, patch_size, pad_border_mode, pad_kwargs, True, None)
data_shape = data.shape # still c, x, y
# compute the steps for sliding window
steps = self._compute_steps_for_sliding_window(patch_size, data_shape[1:], step_size)
num_tiles = len(steps[0]) * len(steps[1])
if verbose:
print("data shape:", data_shape)
print("patch size:", patch_size)
print("steps (x, y, and z):", steps)
print("number of tiles:", num_tiles)
# we only need to compute that once. It can take a while to compute this due to the large sigma in
# gaussian_filter
if use_gaussian and num_tiles > 1:
if self._gaussian_2d is None or not all(
[i == j for i, j in zip(patch_size, self._patch_size_for_gaussian_2d)]):
if verbose: print('computing Gaussian')
gaussian_importance_map = self._get_gaussian(patch_size, sigma_scale=1. / 8)
self._gaussian_2d = gaussian_importance_map
self._patch_size_for_gaussian_2d = patch_size
else:
if verbose: print("using precomputed Gaussian")
gaussian_importance_map = self._gaussian_2d
gaussian_importance_map = torch.from_numpy(gaussian_importance_map)
if torch.cuda.is_available():
gaussian_importance_map = gaussian_importance_map.cuda(self.get_device(), non_blocking=True)
else:
gaussian_importance_map = None
if all_in_gpu:
# If we run the inference in GPU only (meaning all tensors are allocated on the GPU, this reduces
# CPU-GPU communication but required more GPU memory) we need to preallocate a few things on GPU
if use_gaussian and num_tiles > 1:
# half precision for the outputs should be good enough. If the outputs here are half, the
# gaussian_importance_map should be as well
gaussian_importance_map = gaussian_importance_map.half()
# make sure we did not round anything to 0
gaussian_importance_map[gaussian_importance_map == 0] = gaussian_importance_map[
gaussian_importance_map != 0].min()
add_for_nb_of_preds = gaussian_importance_map
else:
add_for_nb_of_preds = torch.ones(patch_size, device=self.get_device())
if verbose: print("initializing result array (on GPU)")
aggregated_results = torch.zeros([self.num_classes] + list(data.shape[1:]), dtype=torch.half,
device=self.get_device())
if verbose: print("moving data to GPU")
data = torch.from_numpy(data).cuda(self.get_device(), non_blocking=True)
if verbose: print("initializing result_numsamples (on GPU)")
aggregated_nb_of_predictions = torch.zeros([self.num_classes] + list(data.shape[1:]), dtype=torch.half,
device=self.get_device())
else:
if use_gaussian and num_tiles > 1:
add_for_nb_of_preds = self._gaussian_2d
else:
add_for_nb_of_preds = np.ones(patch_size, dtype=np.float32)
aggregated_results = np.zeros([self.num_classes] + list(data.shape[1:]), dtype=np.float32)
aggregated_nb_of_predictions = np.zeros([self.num_classes] + list(data.shape[1:]), dtype=np.float32)
for x in steps[0]:
lb_x = x
ub_x = x + patch_size[0]
for y in steps[1]:
lb_y = y
ub_y = y + patch_size[1]
predicted_patch = self._internal_maybe_mirror_and_pred_2D(
data[None, :, lb_x:ub_x, lb_y:ub_y], mirror_axes, do_mirroring,
gaussian_importance_map)[0]
if all_in_gpu:
predicted_patch = predicted_patch.half()
else:
predicted_patch = predicted_patch.cpu().numpy()
aggregated_results[:, lb_x:ub_x, lb_y:ub_y] += predicted_patch
aggregated_nb_of_predictions[:, lb_x:ub_x, lb_y:ub_y] += add_for_nb_of_preds
# we reverse the padding here (remeber that we padded the input to be at least as large as the patch size
slicer = tuple(
[slice(0, aggregated_results.shape[i]) for i in
range(len(aggregated_results.shape) - (len(slicer) - 1))] + slicer[1:])
aggregated_results = aggregated_results[slicer]
aggregated_nb_of_predictions = aggregated_nb_of_predictions[slicer]
# computing the class_probabilities by dividing the aggregated result with result_numsamples
class_probabilities = aggregated_results / aggregated_nb_of_predictions
if regions_class_order is None:
predicted_segmentation = class_probabilities.argmax(0)
else:
if all_in_gpu:
class_probabilities_here = class_probabilities.detach().cpu().numpy()
else:
class_probabilities_here = class_probabilities
predicted_segmentation = np.zeros(class_probabilities_here.shape[1:], dtype=np.float32)
for i, c in enumerate(regions_class_order):
predicted_segmentation[class_probabilities_here[i] > 0.5] = c
if all_in_gpu:
if verbose: print("copying results to CPU")
if regions_class_order is None:
predicted_segmentation = predicted_segmentation.detach().cpu().numpy()
class_probabilities = class_probabilities.detach().cpu().numpy()
if verbose: print("prediction done")
return predicted_segmentation, class_probabilities
def _internal_predict_3D_2Dconv(self, x: np.ndarray, min_size: Tuple[int, int], do_mirroring: bool,
mirror_axes: tuple = (0, 1), regions_class_order: tuple = None,
pad_border_mode: str = "constant", pad_kwargs: dict = None,
all_in_gpu: bool = False, verbose: bool = True) -> Tuple[np.ndarray, np.ndarray]:
if all_in_gpu:
raise NotImplementedError
assert len(x.shape) == 4, "data must be c, x, y, z"
predicted_segmentation = []
softmax_pred = []
for s in range(x.shape[1]):
pred_seg, softmax_pres = self._internal_predict_2D_2Dconv(
x[:, s], min_size, do_mirroring, mirror_axes, regions_class_order, pad_border_mode, pad_kwargs, verbose)
predicted_segmentation.append(pred_seg[None])
softmax_pred.append(softmax_pres[None])
predicted_segmentation = np.vstack(predicted_segmentation)
softmax_pred = np.vstack(softmax_pred).transpose((1, 0, 2, 3))
return predicted_segmentation, softmax_pred
def predict_3D_pseudo3D_2Dconv(self, x: np.ndarray, min_size: Tuple[int, int], do_mirroring: bool,
mirror_axes: tuple = (0, 1), regions_class_order: tuple = None,
pseudo3D_slices: int = 5, all_in_gpu: bool = False,
pad_border_mode: str = "constant", pad_kwargs: dict = None,
verbose: bool = True) -> Tuple[np.ndarray, np.ndarray]:
if all_in_gpu:
raise NotImplementedError
assert len(x.shape) == 4, "data must be c, x, y, z"
assert pseudo3D_slices % 2 == 1, "pseudo3D_slices must be odd"
extra_slices = (pseudo3D_slices - 1) // 2
shp_for_pad = np.array(x.shape)
shp_for_pad[1] = extra_slices
pad = np.zeros(shp_for_pad, dtype=np.float32)
data = np.concatenate((pad, x, pad), 1)
predicted_segmentation = []
softmax_pred = []
for s in range(extra_slices, data.shape[1] - extra_slices):
d = data[:, (s - extra_slices):(s + extra_slices + 1)]
d = d.reshape((-1, d.shape[-2], d.shape[-1]))
pred_seg, softmax_pres = \
self._internal_predict_2D_2Dconv(d, min_size, do_mirroring, mirror_axes,
regions_class_order, pad_border_mode, pad_kwargs, verbose)
predicted_segmentation.append(pred_seg[None])
softmax_pred.append(softmax_pres[None])
predicted_segmentation = np.vstack(predicted_segmentation)
softmax_pred = np.vstack(softmax_pred).transpose((1, 0, 2, 3))
return predicted_segmentation, softmax_pred
def _internal_predict_3D_2Dconv_tiled(self, x: np.ndarray, patch_size: Tuple[int, int], do_mirroring: bool,
mirror_axes: tuple = (0, 1), step_size: float = 0.5,
regions_class_order: tuple = None, use_gaussian: bool = False,
pad_border_mode: str = "edge", pad_kwargs: dict =None,
all_in_gpu: bool = False,
verbose: bool = True) -> Tuple[np.ndarray, np.ndarray]:
if all_in_gpu:
raise NotImplementedError
assert len(x.shape) == 4, "data must be c, x, y, z"
predicted_segmentation = []
softmax_pred = []
for s in range(x.shape[1]):
pred_seg, softmax_pres = self._internal_predict_2D_2Dconv_tiled(
x[:, s], step_size, do_mirroring, mirror_axes, patch_size, regions_class_order, use_gaussian,
pad_border_mode, pad_kwargs, all_in_gpu, verbose)
predicted_segmentation.append(pred_seg[None])
softmax_pred.append(softmax_pres[None])
predicted_segmentation = np.vstack(predicted_segmentation)
softmax_pred = np.vstack(softmax_pred).transpose((1, 0, 2, 3))
return predicted_segmentation, softmax_pred
if __name__ == '__main__':
print(SegmentationNetwork._compute_steps_for_sliding_window((30, 224, 224), (162, 529, 529), 0.5))
print(SegmentationNetwork._compute_steps_for_sliding_window((30, 224, 224), (162, 529, 529), 1))
print(SegmentationNetwork._compute_steps_for_sliding_window((30, 224, 224), (162, 529, 529), 0.1))
print(SegmentationNetwork._compute_steps_for_sliding_window((30, 224, 224), (60, 448, 224), 1))
print(SegmentationNetwork._compute_steps_for_sliding_window((30, 224, 224), (60, 448, 224), 0.5))
print(SegmentationNetwork._compute_steps_for_sliding_window((30, 224, 224), (30, 224, 224), 1))
print(SegmentationNetwork._compute_steps_for_sliding_window((30, 224, 224), (30, 224, 224), 0.125))
print(SegmentationNetwork._compute_steps_for_sliding_window((123, 54, 123), (246, 162, 369), 0.25))