|
|
|
|
|
|
|
|
|
|
|
|
|
from collections import namedtuple |
|
from dataclasses import dataclass |
|
|
|
import logging |
|
import typing as tp |
|
import torch |
|
|
|
LayoutCoord = namedtuple('LayoutCoord', ['t', 'q']) |
|
PatternLayout = tp.List[tp.List[LayoutCoord]] |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
@dataclass |
|
class Pattern: |
|
"""Base implementation of a pattern over a sequence with multiple codebooks. |
|
|
|
The codebook pattern consists in a layout, defining for each sequence step |
|
the list of coordinates of each codebook timestep in the resulting interleaved sequence. |
|
The first item of the pattern is always an empty list in order to properly insert a special token |
|
to start with. For convenience, we also keep track of ``n_q`` the number of codebooks used for the pattern |
|
and ``timesteps`` the number of timesteps corresponding to the original sequence. |
|
|
|
The pattern provides convenient methods to build and revert interleaved sequences from it: |
|
``build_pattern_sequence`` maps a given a dense input tensor of multi-codebook sequence from [B, K, T] |
|
to the interleaved sequence of shape [B, K, S] applying the pattern, with B being the batch size, |
|
K being the number of codebooks, T the number of original timesteps and S the number of sequence steps |
|
for the output sequence. The unfilled positions are replaced with a special token and the built sequence |
|
is returned along with a mask indicating valid tokens. |
|
``revert_pattern_sequence`` maps back an interleaved sequence of shape [B, K, S] to the original alignment |
|
of codebooks across timesteps to an output tensor of shape [B, K, T], using again a special token and a mask |
|
to fill and specify invalid positions if needed. |
|
See the dedicated methods for more details. |
|
""" |
|
|
|
|
|
|
|
|
|
layout: PatternLayout |
|
timesteps: int |
|
n_q: int |
|
|
|
def __post_init__(self): |
|
|
|
|
|
self._build_reverted_sequence_scatter_indexes = self._build_reverted_sequence_scatter_indexes |
|
self._build_pattern_sequence_scatter_indexes = self._build_pattern_sequence_scatter_indexes |
|
print("New pattern, time steps: %d, sequence steps: %d", self.timesteps, len(self.layout)) |
|
|
|
@property |
|
def max_delay(self): |
|
max_t_in_seq_coords = 0 |
|
for seq_coords in self.layout[1:]: |
|
for coords in seq_coords: |
|
max_t_in_seq_coords = max(max_t_in_seq_coords, coords.t + 1) |
|
return max_t_in_seq_coords - self.timesteps |
|
|
|
@property |
|
def valid_layout(self): |
|
valid_step = len(self.layout) - self.max_delay |
|
return self.layout[:valid_step] |
|
|
|
def starts_with_special_token(self): |
|
return self.layout[0] == [] |
|
|
|
def get_sequence_coords_with_timestep(self, t: int, q: tp.Optional[int] = None): |
|
"""Get codebook coordinates in the layout that corresponds to the specified timestep t |
|
and optionally to the codebook q. Coordinates are returned as a tuple with the sequence step |
|
and the actual codebook coordinates. |
|
""" |
|
assert t <= self.timesteps, "provided timesteps is greater than the pattern's number of timesteps" |
|
if q is not None: |
|
assert q <= self.n_q, "provided number of codebooks is greater than the pattern's number of codebooks" |
|
coords = [] |
|
for s, seq_codes in enumerate(self.layout): |
|
for code in seq_codes: |
|
if code.t == t and (q is None or code.q == q): |
|
coords.append((s, code)) |
|
return coords |
|
|
|
def get_steps_with_timestep(self, t: int, q: tp.Optional[int] = None) -> tp.List[int]: |
|
return [step for step, coords in self.get_sequence_coords_with_timestep(t, q)] |
|
|
|
def get_first_step_with_timesteps(self, t: int, q: tp.Optional[int] = None) -> tp.Optional[int]: |
|
steps_with_timesteps = self.get_steps_with_timestep(t, q) |
|
return steps_with_timesteps[0] if len(steps_with_timesteps) > 0 else None |
|
|
|
def _build_pattern_sequence_scatter_indexes(self, timesteps: int, n_q: int, keep_only_valid_steps: bool, |
|
device: tp.Union[torch.device, str] = 'cpu'): |
|
"""Build scatter indexes corresponding to the pattern, up to the provided sequence_steps. |
|
|
|
Args: |
|
timesteps (int): Maximum number of timesteps steps to consider. |
|
keep_only_valid_steps (bool): Restrict the pattern layout to match only valid steps. |
|
device (torch.device or str): Device for created tensors. |
|
Returns: |
|
indexes (torch.Tensor): Indexes corresponding to the sequence, of shape [K, S]. |
|
mask (torch.Tensor): Mask corresponding to indexes that matches valid indexes, of shape [K, S]. |
|
""" |
|
assert n_q == self.n_q, f"invalid number of codebooks for the sequence and the pattern: {n_q} != {self.n_q}" |
|
assert timesteps <= self.timesteps, "invalid number of timesteps used to build the sequence from the pattern" |
|
|
|
|
|
ref_layout = self.valid_layout if keep_only_valid_steps else self.layout |
|
|
|
indexes = torch.zeros(n_q, len(ref_layout), dtype=torch.long).numpy() |
|
mask = torch.zeros(n_q, len(ref_layout), dtype=torch.bool).numpy() |
|
|
|
|
|
|
|
indexes[:] = n_q * timesteps |
|
|
|
for s, sequence_coords in enumerate(ref_layout): |
|
for coords in sequence_coords: |
|
if coords.t < timesteps: |
|
indexes[coords.q, s] = coords.t + coords.q * timesteps |
|
mask[coords.q, s] = 1 |
|
indexes = torch.from_numpy(indexes).to(device) |
|
mask = torch.from_numpy(mask).to(device) |
|
return indexes, mask |
|
|
|
def build_pattern_sequence(self, |
|
z, |
|
special_token, |
|
keep_only_valid_steps=False): |
|
B, K, T = z.shape |
|
indexes, mask = self._build_pattern_sequence_scatter_indexes( |
|
T, K, keep_only_valid_steps=keep_only_valid_steps, device=str(z.device) |
|
) |
|
z = z.view(B, -1) |
|
|
|
z = torch.cat([z, torch.zeros_like(z[:, :1]) + special_token], dim=1) |
|
values = z[:, indexes.view(-1)] |
|
values = values.view(B, K, indexes.shape[-1]) |
|
|
|
|
|
|
|
|
|
|
|
return values, indexes, mask |
|
|
|
def _build_reverted_sequence_scatter_indexes(self, sequence_steps: int, n_q: int, |
|
keep_only_valid_steps: bool = False, |
|
is_model_output: bool = False, |
|
device: tp.Union[torch.device, str] = 'cpu'): |
|
"""Builds scatter indexes required to retrieve the original multi-codebook sequence |
|
from interleaving pattern. |
|
|
|
Args: |
|
sequence_steps (int): Sequence steps. |
|
n_q (int): Number of codebooks. |
|
keep_only_valid_steps (bool): Build a sequence from the pattern up to valid (= fully defined) steps. |
|
Steps that are beyond valid steps will be replaced by the special_token in that case. |
|
is_model_output (bool): Whether to keep the sequence item corresponding to initial special token or not. |
|
device (torch.device or str): Device for created tensors. |
|
Returns: |
|
indexes (torch.Tensor): Indexes for reconstructing the output, of shape [K, T]. |
|
mask (torch.Tensor): Mask corresponding to indexes that matches valid indexes of shape [K, T]. |
|
""" |
|
ref_layout = self.valid_layout if keep_only_valid_steps else self.layout |
|
|
|
timesteps = self.timesteps |
|
assert n_q == self.n_q, f"invalid number of codebooks for the sequence and the pattern: {n_q} != {self.n_q}" |
|
assert sequence_steps <= len(ref_layout), \ |
|
f"sequence to revert is longer than the defined pattern: {sequence_steps} > {len(ref_layout)}" |
|
|
|
|
|
if is_model_output and self.starts_with_special_token(): |
|
ref_layout = ref_layout[1:] |
|
|
|
|
|
indexes = torch.zeros(n_q, timesteps, dtype=torch.long).numpy() |
|
mask = torch.zeros(n_q, timesteps, dtype=torch.bool).numpy() |
|
|
|
indexes[:] = n_q * sequence_steps |
|
for s, sequence_codes in enumerate(ref_layout): |
|
if s < sequence_steps: |
|
for code in sequence_codes: |
|
if code.t < timesteps: |
|
indexes[code.q, code.t] = s + code.q * sequence_steps |
|
mask[code.q, code.t] = 1 |
|
indexes = torch.from_numpy(indexes).to(device) |
|
mask = torch.from_numpy(mask).to(device) |
|
return indexes, mask |
|
|
|
def revert_pattern_sequence(self, |
|
s, |
|
special_token, |
|
keep_only_valid_steps=False): |
|
"""SPECIAL TOKEN NOT DELETED HERE !!!! |
|
|
|
Args: |
|
s (torch.Tensor): Interleaved sequence tensor obtained from the pattern, of shape [B, K, S]. |
|
special_token (int or float): Special token used to fill non-pattern coordinates in the new sequence. |
|
Returns: |
|
values (torch.Tensor) : Interleaved sequence matching the pattern, of shape [B, K, T] with T |
|
indexes (torch.Tensor): Indexes corresponding to the interleaved sequence, of shape [K, T]. |
|
mask (torch.Tensor) : Mask corresponding to indexes that matches valid indexes of shape [K, T]. |
|
shall this mask delete special token id; |
|
""" |
|
B, K, S = s.shape |
|
indexes, mask = self._build_reverted_sequence_scatter_indexes( |
|
S, K, keep_only_valid_steps, is_model_output=False, device=str(s.device) |
|
) |
|
s = s.view(B, -1) |
|
|
|
s = torch.cat([s, torch.zeros_like(s[:, :1]) + special_token], dim=1) |
|
values = s[:, indexes.view(-1)] |
|
values = values.view(B, K, indexes.shape[-1]) |
|
|
|
return values, indexes, mask |
|
|
|
|
|
|
|
|
|
|
|
class DelayedPatternProvider(): |
|
"""Provider for delayed pattern across delayed codebooks. |
|
Codebooks are delayed in the sequence and sequence steps will contain codebooks |
|
from different timesteps. |
|
|
|
Example: |
|
Taking timesteps=4 and n_q=3, delays=None, the multi-codebook sequence: |
|
[[1, 2, 3, 4], |
|
[1, 2, 3, 4], |
|
[1, 2, 3, 4]] |
|
The resulting sequence obtained from the returned pattern is: |
|
[[S, 1, 2, 3, 4], |
|
[S, S, 1, 2, 3], |
|
[S, S, S, 1, 2]] |
|
(with S being a special token) |
|
|
|
Args: |
|
n_q (int): Number of codebooks. |
|
delays (list of int, optional): Delay for each of the codebooks. |
|
If delays not defined, each codebook is delayed by 1 compared to the previous one. |
|
flatten_first (int): Flatten the first N timesteps. |
|
empty_initial (int): Prepend with N empty list of coordinates. |
|
""" |
|
def __init__(self, |
|
n_q, |
|
delays, |
|
flatten_first=0, |
|
empty_initial=0): |
|
self.n_q = n_q |
|
if delays is None: |
|
delays = list(range(n_q)) |
|
print(f'{delays=} PATTERN __ini') |
|
self.delays = delays |
|
self.flatten_first = flatten_first |
|
self.empty_initial = empty_initial |
|
assert len(self.delays) == self.n_q |
|
assert sorted(self.delays) == self.delays |
|
|
|
def get_pattern(self, timesteps): |
|
|
|
|
|
|
|
omit_special_token = self.empty_initial < 0 |
|
|
|
out: PatternLayout = [] if omit_special_token else [[]] |
|
max_delay = max(self.delays) |
|
if self.empty_initial: |
|
out += [[] for _ in range(self.empty_initial)] |
|
if self.flatten_first: |
|
for t in range(min(timesteps, self.flatten_first)): |
|
for q in range(self.n_q): |
|
out.append([LayoutCoord(t, q)]) |
|
for t in range(self.flatten_first, timesteps + max_delay): |
|
v = [] |
|
for q, delay in enumerate(self.delays): |
|
t_for_q = t - delay |
|
if t_for_q >= self.flatten_first: |
|
v.append(LayoutCoord(t_for_q, q)) |
|
out.append(v) |
|
|
|
return Pattern(out, n_q=self.n_q, timesteps=timesteps) |
|
|
|
|
|
|
|
|