Spaces:
Runtime error
Runtime error
File size: 6,645 Bytes
31f2f28 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
"""PyTorch layer for estimating optical flow by a residual flow pyramid.
This approach of estimating optical flow between two images can be traced back
to [1], but is also used by later neural optical flow computation methods such
as SpyNet [2] and PWC-Net [3].
The basic idea is that the optical flow is first estimated in a coarse
resolution, then the flow is upsampled to warp the higher resolution image and
then a residual correction is computed and added to the estimated flow. This
process is repeated in a pyramid on coarse to fine order to successively
increase the resolution of both optical flow and the warped image.
In here, the optical flow predictor is used as an internal component for the
film_net frame interpolator, to warp the two input images into the inbetween,
target frame.
[1] F. Glazer, Hierarchical motion detection. PhD thesis, 1987.
[2] A. Ranjan and M. J. Black, Optical Flow Estimation using a Spatial Pyramid
Network. 2016
[3] D. Sun X. Yang, M-Y. Liu and J. Kautz, PWC-Net: CNNs for Optical Flow Using
Pyramid, Warping, and Cost Volume, 2017
"""
from typing import List
import torch
from torch import nn
from torch.nn import functional as F
import util
class FlowEstimator(nn.Module):
"""Small-receptive field predictor for computing the flow between two images.
This is used to compute the residual flow fields in PyramidFlowEstimator.
Note that while the number of 3x3 convolutions & filters to apply is
configurable, two extra 1x1 convolutions are appended to extract the flow in
the end.
Attributes:
name: The name of the layer
num_convs: Number of 3x3 convolutions to apply
num_filters: Number of filters in each 3x3 convolution
"""
def __init__(self, in_channels: int, num_convs: int, num_filters: int):
super(FlowEstimator, self).__init__()
self._convs = nn.ModuleList()
for i in range(num_convs):
self._convs.append(util.Conv2d(in_channels=in_channels, out_channels=num_filters, size=3))
in_channels = num_filters
self._convs.append(util.Conv2d(in_channels, num_filters // 2, size=1))
in_channels = num_filters // 2
# For the final convolution, we want no activation at all to predict the
# optical flow vector values. We have done extensive testing on explicitly
# bounding these values using sigmoid, but it turned out that having no
# activation gives better results.
self._convs.append(util.Conv2d(in_channels, 2, size=1, activation=None))
def forward(self, features_a: torch.Tensor, features_b: torch.Tensor) -> torch.Tensor:
"""Estimates optical flow between two images.
Args:
features_a: per pixel feature vectors for image A (B x H x W x C)
features_b: per pixel feature vectors for image B (B x H x W x C)
Returns:
A tensor with optical flow from A to B
"""
net = torch.cat([features_a, features_b], dim=1)
for conv in self._convs:
net = conv(net)
return net
class PyramidFlowEstimator(nn.Module):
"""Predicts optical flow by coarse-to-fine refinement.
"""
def __init__(self, filters: int = 64,
flow_convs: tuple = (3, 3, 3, 3),
flow_filters: tuple = (32, 64, 128, 256)):
super(PyramidFlowEstimator, self).__init__()
in_channels = filters << 1
predictors = []
for i in range(len(flow_convs)):
predictors.append(
FlowEstimator(
in_channels=in_channels,
num_convs=flow_convs[i],
num_filters=flow_filters[i]))
in_channels += filters << (i + 2)
self._predictor = predictors[-1]
self._predictors = nn.ModuleList(predictors[:-1][::-1])
def forward(self, feature_pyramid_a: List[torch.Tensor],
feature_pyramid_b: List[torch.Tensor]) -> List[torch.Tensor]:
"""Estimates residual flow pyramids between two image pyramids.
Each image pyramid is represented as a list of tensors in fine-to-coarse
order. Each individual image is represented as a tensor where each pixel is
a vector of image features.
util.flow_pyramid_synthesis can be used to convert the residual flow
pyramid returned by this method into a flow pyramid, where each level
encodes the flow instead of a residual correction.
Args:
feature_pyramid_a: image pyramid as a list in fine-to-coarse order
feature_pyramid_b: image pyramid as a list in fine-to-coarse order
Returns:
List of flow tensors, in fine-to-coarse order, each level encoding the
difference against the bilinearly upsampled version from the coarser
level. The coarsest flow tensor, e.g. the last element in the array is the
'DC-term', e.g. not a residual (alternatively you can think of it being a
residual against zero).
"""
levels = len(feature_pyramid_a)
v = self._predictor(feature_pyramid_a[-1], feature_pyramid_b[-1])
residuals = [v]
for i in range(levels - 2, len(self._predictors) - 1, -1):
# Upsamples the flow to match the current pyramid level. Also, scales the
# magnitude by two to reflect the new size.
level_size = feature_pyramid_a[i].shape[2:4]
v = F.interpolate(2 * v, size=level_size, mode='bilinear')
# Warp feature_pyramid_b[i] image based on the current flow estimate.
warped = util.warp(feature_pyramid_b[i], v)
# Estimate the residual flow between pyramid_a[i] and warped image:
v_residual = self._predictor(feature_pyramid_a[i], warped)
residuals.insert(0, v_residual)
v = v_residual + v
for k, predictor in enumerate(self._predictors):
i = len(self._predictors) - 1 - k
# Upsamples the flow to match the current pyramid level. Also, scales the
# magnitude by two to reflect the new size.
level_size = feature_pyramid_a[i].shape[2:4]
v = F.interpolate(2 * v, size=level_size, mode='bilinear')
# Warp feature_pyramid_b[i] image based on the current flow estimate.
warped = util.warp(feature_pyramid_b[i], v)
# Estimate the residual flow between pyramid_a[i] and warped image:
v_residual = predictor(feature_pyramid_a[i], warped)
residuals.insert(0, v_residual)
v = v_residual + v
return residuals
|