RASP-Synthesis / tracr /transformer /compressed_model.py
Vladimir Mikulik
add typing_extensions to list of deps.
d4d39d0
# Copyright 2022 DeepMind Technologies Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Modified transformer to learn a linear compression of the residual stream.
CompressedTransformer adds three arguments compared to Transformer:
- embedding_size: the size of the compressed residual stream.
- unembed_at_every_layer: whether to apply the unembedding before applying
attention and MLP layers
- return_activations: whether to return all model activations rather than just
the outputs
"""
import collections
import dataclasses
from typing import Optional
import haiku as hk
import jax
import numpy as np
from tracr.transformer import attention
from tracr.transformer import model
@dataclasses.dataclass
class CompressedTransformer(hk.Module):
"""A transformer stack with linearly compressed residual stream."""
config: model.TransformerConfig
name: Optional[str] = None
def __call__(
self,
embeddings: jax.Array, # [B, T, D]
mask: jax.Array, # [B, T]
*,
use_dropout: bool = True,
embedding_size: Optional[int] = None,
unembed_at_every_layer: bool = False,
) -> model.TransformerOutput: # [B, T, D]
"""Transforms input embedding sequences to output embedding sequences.
Args:
embeddings: Input embeddings to pass through the model.
mask: Boolean mask to restrict the inputs the model uses.
use_dropout: Turns dropout on/off.
embedding_size: Dimension to compress the residual stream to.
unembed_at_every_layer: Whether to unembed the residual stream when
reading the input for every layer (keeping the layer input sizes) or to
only unembed before the model output (compressing the layer inputs).
Returns:
The outputs of the forward pass through the transformer.
"""
def layer_norm(x: jax.Array) -> jax.Array:
"""Applies a unique LayerNorm to x with default settings."""
if self.config.layer_norm:
return hk.LayerNorm(axis=-1, create_scale=True, create_offset=True)(x)
return x
initializer = hk.initializers.VarianceScaling(2 / self.config.num_layers)
dropout_rate = self.config.dropout_rate if use_dropout else 0.
_, seq_len, model_size = embeddings.shape
# To compress the model, we multiply with a matrix W when reading from
# the residual stream, and with W^T when writing to the residual stream.
if embedding_size is not None:
# [to_size, from_size]
w_emb = hk.get_parameter(
"w_emb", (embedding_size, model_size),
init=hk.initializers.RandomNormal())
write_to_residual = lambda x: x @ w_emb.T
read_from_residual = lambda x: x @ w_emb
if not unembed_at_every_layer:
model_size = embedding_size
else:
write_to_residual = lambda x: x
read_from_residual = lambda x: x
# Compute causal mask for autoregressive sequence modelling.
mask = mask[:, None, None, :] # [B, H=1, T'=1, T]
mask = mask.repeat(seq_len, axis=2) # [B, H=1, T, T]
if self.config.causal:
causal_mask = np.ones((1, 1, seq_len, seq_len)) # [B=1, H=1, T, T]
causal_mask = np.tril(causal_mask)
mask = mask * causal_mask # [B, H=1, T, T]
# Set up activation collection.
collected = collections.defaultdict(list)
def collect(**kwargs):
for k, v in kwargs.items():
collected[k].append(v)
residual = write_to_residual(embeddings)
for layer in range(self.config.num_layers):
with hk.experimental.name_scope(f"layer_{layer}"):
# First the attention block.
attn_block = attention.MultiHeadAttention(
num_heads=self.config.num_heads,
key_size=self.config.key_size,
model_size=model_size,
w_init=initializer,
name="attn")
attn_in = residual
if unembed_at_every_layer:
attn_in = read_from_residual(attn_in)
attn_in = layer_norm(attn_in)
attn_out = attn_block(attn_in, attn_in, attn_in, mask=mask)
attn_out, attn_logits = attn_out.out, attn_out.logits
if dropout_rate > 0:
attn_out = hk.dropout(hk.next_rng_key(), dropout_rate, attn_out)
if unembed_at_every_layer:
collect(layer_outputs=attn_out, attn_logits=attn_logits)
else:
collect(
layer_outputs=read_from_residual(attn_out),
attn_logits=attn_logits,
)
if unembed_at_every_layer:
attn_out = write_to_residual(attn_out)
residual = residual + attn_out
collect(residuals=residual)
# Then the dense block.
with hk.experimental.name_scope("mlp"):
dense_block = hk.Sequential([
hk.Linear(
self.config.mlp_hidden_size,
w_init=initializer,
name="linear_1"),
self.config.activation_function,
hk.Linear(model_size, w_init=initializer, name="linear_2"),
])
dense_in = residual
if unembed_at_every_layer:
dense_in = read_from_residual(dense_in)
dense_in = layer_norm(dense_in)
dense_out = dense_block(dense_in)
if dropout_rate > 0:
dense_out = hk.dropout(hk.next_rng_key(), dropout_rate, dense_out)
if unembed_at_every_layer:
collect(layer_outputs=dense_out)
else:
collect(layer_outputs=read_from_residual(dense_out))
if unembed_at_every_layer:
dense_out = write_to_residual(dense_out)
residual = residual + dense_out
collect(residuals=residual)
output = read_from_residual(residual)
output = layer_norm(output)
return model.TransformerOutput(
layer_outputs=collected["layer_outputs"],
residuals=collected["residuals"],
attn_logits=collected["attn_logits"],
output=output,
input_embeddings=embeddings,
)