Spaces:
Sleeping
Sleeping
File size: 6,538 Bytes
9bdaa77 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# Copyright 2022 DeepMind Technologies Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Didactic example of an autoregressive Transformer-based language model.
Glossary of shapes:
- B: Batch size.
- T: Sequence length.
- D: Model embedding size.
- H: Number of attention heads.
- V: Vocabulary size.
Forked from: haiku.examples.transformer.model
"""
import collections
import dataclasses
from typing import Callable, Optional
import chex
import haiku as hk
import jax
import jax.numpy as jnp
import numpy as np
from tracr.transformer import attention
# Type alias used to annotate Haiku modules that are invoked like functions.
#
# hk.Modules are not always callable: github.com/deepmind/dm-haiku/issues/52
# Ideally, we'd want a type:
# CallableHaikuModule = Intersection[Callable[..., jax.Array], hk.Module]
# But Intersection does not exist (yet): github.com/python/typing/issues/213
# Until then the alias only captures the "callable returning a jax.Array"
# half of that intersection.
CallableHaikuModule = Callable[..., jax.Array]
@chex.dataclass
class TransformerOutput:
  """Activations gathered during one forward pass of `Transformer`.

  The list-valued fields are filled in execution order. `Transformer` appends
  to `layer_outputs` and `residuals` once after each attention block and once
  after each MLP block, and to `attn_logits` once per attention block.
  """

  # Output of each attention / MLP block before it is added to the residual.
  layer_outputs: list[jax.Array]  # [B, T, D]
  # Residual stream right after each block's output has been added.
  residuals: list[jax.Array]  # [B, T, D]
  # Attention logits reported by each attention block.
  attn_logits: list[jax.Array]  # [B, H, T, T]
  # Final residual stream after the closing (optional) layer norm.
  output: jax.Array  # [B, T, D]
  # The embeddings the stack was called with, passed through unchanged.
  input_embeddings: jax.Array  # [B, T, D]
@dataclasses.dataclass
class TransformerConfig:
  """Hyperparameters consumed by `Transformer`.

  Field order is part of the interface: the first five fields are required
  and may be passed positionally.
  """

  # Number of attention heads per attention block.
  num_heads: int
  # Number of (attention, MLP) layer pairs in the stack.
  num_layers: int
  # Per-head key size handed to the attention block.
  key_size: int
  # Width of the hidden layer inside each MLP block.
  mlp_hidden_size: int
  # Dropout rate applied to block outputs when dropout is enabled.
  dropout_rate: float
  # Non-linearity placed between the two MLP linear layers.
  activation_function: Callable[[jax.Array], jax.Array] = jax.nn.gelu
  # Whether LayerNorm is applied; when False the inputs pass through as-is.
  layer_norm: bool = True
  # Whether a lower-triangular (causal) mask is combined with the input mask.
  causal: bool = False
@dataclasses.dataclass
class Transformer(hk.Module):
  """A transformer stack.

  Each layer is an attention block followed by an MLP block. Both blocks read
  a (optionally layer-normalised) copy of the residual stream and add their
  output back onto it.

  Attributes:
    config: Hyperparameters of the stack.
    name: Optional Haiku module name.
  """

  config: TransformerConfig
  name: Optional[str] = None

  def __call__(
      self,
      embeddings: jax.Array,  # [B, T, D]
      mask: jax.Array,  # [B, T]
      *,
      use_dropout: bool = True,
  ) -> TransformerOutput:
    """Transforms input embedding sequences to output embedding sequences.

    Args:
      embeddings: Input embeddings, [B, T, D].
      mask: Per-token mask, [B, T]. It is broadcast to [B, 1, T, T] (and
        multiplied by a lower-triangular matrix when `config.causal` is set)
        before being handed to every attention block.
      use_dropout: When False the dropout rate is forced to 0, so no dropout
        is applied and no RNG key is requested.

    Returns:
      A `TransformerOutput` holding the final (optionally layer-normalised)
      residual stream, the untouched input embeddings, and the per-block
      activations collected along the way.
    """
    cfg = self.config

    def layer_norm(x: jax.Array) -> jax.Array:
      """Runs x through a freshly constructed LayerNorm, if enabled."""
      if not cfg.layer_norm:
        return x
      norm = hk.LayerNorm(axis=-1, create_scale=True, create_offset=True)
      return norm(x)

    w_init = hk.initializers.VarianceScaling(2 / cfg.num_layers)
    rate = cfg.dropout_rate if use_dropout else 0.
    _, seq_len, model_size = embeddings.shape

    # Expand the token mask to one row per query position: [B, H=1, T, T].
    attn_mask = mask[:, None, None, :].repeat(seq_len, axis=2)
    if cfg.causal:
      # Lower-triangular [B=1, H=1, T, T] matrix for autoregressive modelling.
      lower_triangle = np.tril(np.ones((1, 1, seq_len, seq_len)))
      attn_mask = attn_mask * lower_triangle  # [B, H=1, T, T]

    # Activations recorded for the caller, in execution order.
    residuals = []
    layer_outputs = []
    attn_logits = []

    residual = embeddings
    for layer_idx in range(cfg.num_layers):
      with hk.experimental.name_scope(f"layer_{layer_idx}"):
        # --- Attention block -------------------------------------------
        attn_block = attention.MultiHeadAttention(
            num_heads=cfg.num_heads,
            key_size=cfg.key_size,
            model_size=model_size,
            w_init=w_init,
            name="attn",
        )
        attn_in = layer_norm(residual)
        attn_result = attn_block(attn_in, attn_in, attn_in, mask=attn_mask)
        attn_out = attn_result.out
        if rate > 0:
          attn_out = hk.dropout(hk.next_rng_key(), rate, attn_out)
        residual = residual + attn_out

        residuals.append(residual)
        layer_outputs.append(attn_out)
        attn_logits.append(attn_result.logits)

        # --- MLP block -------------------------------------------------
        # NOTE(review): the "mlp" scope is taken to wrap only the module
        # construction; confirm the LayerNorm below is meant to live in the
        # enclosing layer scope rather than under "mlp".
        with hk.experimental.name_scope("mlp"):
          first_linear = hk.Linear(
              cfg.mlp_hidden_size, w_init=w_init, name="linear_1")
          second_linear = hk.Linear(
              model_size, w_init=w_init, name="linear_2")
          dense_block = hk.Sequential([
              first_linear,
              cfg.activation_function,
              second_linear,
          ])
        dense_in = layer_norm(residual)
        dense_out = dense_block(dense_in)
        if rate > 0:
          dense_out = hk.dropout(hk.next_rng_key(), rate, dense_out)
        residual = residual + dense_out

        residuals.append(residual)
        layer_outputs.append(dense_out)

    return TransformerOutput(
        residuals=residuals,
        layer_outputs=layer_outputs,
        attn_logits=attn_logits,
        output=layer_norm(residual),
        input_embeddings=embeddings,
    )
@chex.dataclass
class CompiledTransformerModelOutput:
  """Result of calling `CompiledTransformerModel`."""

  # Everything returned by the wrapped transformer stack.
  transformer_output: TransformerOutput
  # Whatever the model's `unembed` module returns for the final output.
  unembedded_output: jax.Array  # [B, T]
@dataclasses.dataclass
class CompiledTransformerModel(hk.Module):
  """A transformer model with one-hot embeddings.

  Attributes:
    transformer: The transformer stack applied to the embedded tokens.
    token_embed: Module mapping tokens to token embeddings.
    position_embed: Module mapping position indices to position embeddings.
    unembed: Module mapping the stack's output back out of embedding space;
      it is called with a `use_unembed_argmax` keyword argument.
    use_unembed_argmax: Value forwarded to `unembed`.
    pad_token: Token value treated as padding, or None to mask nothing.
  """

  transformer: Transformer
  token_embed: CallableHaikuModule
  position_embed: CallableHaikuModule
  unembed: CallableHaikuModule
  use_unembed_argmax: bool
  pad_token: Optional[int] = None

  def embed(self, tokens: jax.Array) -> jax.Array:
    """Returns the sum of token and position embeddings for `tokens`."""
    token_part = self.token_embed(tokens)
    # Index of every element along the last axis of `tokens`.
    positions = jnp.indices(tokens.shape)[-1]
    position_part = self.position_embed(positions)
    return token_part + position_part  # [B, T, D]

  def __call__(
      self,
      tokens: jax.Array,
      use_dropout: bool = True,
  ) -> CompiledTransformerModelOutput:
    """Embed tokens, pass through model, and unembed output.

    Args:
      tokens: Input tokens.
      use_dropout: Forwarded to the transformer stack.

    Returns:
      The transformer's full output together with the unembedded result.
    """
    if self.pad_token is not None:
      # True wherever the token is not padding.
      input_mask = tokens != self.pad_token
    else:
      input_mask = jnp.ones_like(tokens)

    input_embeddings = self.embed(tokens)
    transformer_output = self.transformer(
        input_embeddings,
        input_mask,
        use_dropout=use_dropout,
    )
    unembedded = self.unembed(
        transformer_output.output,
        use_unembed_argmax=self.use_unembed_argmax,
    )
    return CompiledTransformerModelOutput(
        transformer_output=transformer_output,
        unembedded_output=unembedded,
    )
|