Spaces:
Sleeping
Sleeping
# Copyright 2022 DeepMind Technologies Limited. All Rights Reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# ============================================================================== | |
"""RASP programs only using the subset of RASP supported by the compiler.""" | |
from typing import List, Sequence | |
from tracr.rasp import rasp | |
### Programs that work only under non-causal evaluation. | |
def make_length() -> rasp.SOp: | |
"""Creates the `length` SOp using selector width primitive. | |
Example usage: | |
length = make_length() | |
length("abcdefg") | |
>> [7.0, 7.0, 7.0, 7.0, 7.0, 7.0, 7.0] | |
Returns: | |
length: SOp mapping an input to a sequence, where every element | |
is the length of that sequence. | |
""" | |
all_true_selector = rasp.Select( | |
rasp.tokens, rasp.tokens, rasp.Comparison.TRUE).named("all_true_selector") | |
return rasp.SelectorWidth(all_true_selector).named("length") | |
length = make_length() | |
def make_reverse(sop: rasp.SOp) -> rasp.SOp: | |
"""Create an SOp that reverses a sequence, using length primitive. | |
Example usage: | |
reverse = make_reverse(rasp.tokens) | |
reverse("Hello") | |
>> ['o', 'l', 'l', 'e', 'H'] | |
Args: | |
sop: an SOp | |
Returns: | |
reverse : SOp that reverses the input sequence. | |
""" | |
opp_idx = (length - rasp.indices).named("opp_idx") | |
opp_idx = (opp_idx - 1).named("opp_idx-1") | |
reverse_selector = rasp.Select(rasp.indices, opp_idx, | |
rasp.Comparison.EQ).named("reverse_selector") | |
return rasp.Aggregate(reverse_selector, sop).named("reverse") | |
def make_pair_balance(sop: rasp.SOp, open_token: str, | |
close_token: str) -> rasp.SOp: | |
"""Return fraction of previous open tokens minus the fraction of close tokens. | |
(As implemented in the RASP paper.) | |
If the outputs are always non-negative and end in 0, that implies the input | |
has balanced parentheses. | |
Example usage: | |
num_l = make_pair_balance(rasp.tokens, "(", ")") | |
num_l("a()b(c))") | |
>> [0, 1/2, 0, 0, 1/5, 1/6, 0, -1/8] | |
Args: | |
sop: Input SOp. | |
open_token: Token that counts positive. | |
close_token: Token that counts negative. | |
Returns: | |
pair_balance: SOp mapping an input to a sequence, where every element | |
is the fraction of previous open tokens minus previous close tokens. | |
""" | |
bools_open = rasp.numerical(sop == open_token).named("bools_open") | |
opens = rasp.numerical(make_frac_prevs(bools_open)).named("opens") | |
bools_close = rasp.numerical(sop == close_token).named("bools_close") | |
closes = rasp.numerical(make_frac_prevs(bools_close)).named("closes") | |
pair_balance = rasp.numerical(rasp.LinearSequenceMap(opens, closes, 1, -1)) | |
return pair_balance.named("pair_balance") | |
def make_shuffle_dyck(pairs: List[str]) -> rasp.SOp: | |
"""Returns 1 if a set of parentheses are balanced, 0 else. | |
(As implemented in the RASP paper.) | |
Example usage: | |
shuffle_dyck2 = make_shuffle_dyck(pairs=["()", "{}"]) | |
shuffle_dyck2("({)}") | |
>> [1, 1, 1, 1] | |
shuffle_dyck2("(){)}") | |
>> [0, 0, 0, 0, 0] | |
Args: | |
pairs: List of pairs of open and close tokens that each should be balanced. | |
""" | |
assert len(pairs) >= 1 | |
# Compute running balance of each type of parenthesis | |
balances = [] | |
for pair in pairs: | |
assert len(pair) == 2 | |
open_token, close_token = pair | |
balance = make_pair_balance( | |
rasp.tokens, open_token=open_token, | |
close_token=close_token).named(f"balance_{pair}") | |
balances.append(balance) | |
# Check if balances where negative anywhere -> parentheses not balanced | |
any_negative = balances[0] < 0 | |
for balance in balances[1:]: | |
any_negative = any_negative | (balance < 0) | |
# Convert to numerical SOp | |
any_negative = rasp.numerical(rasp.Map(lambda x: x, | |
any_negative)).named("any_negative") | |
select_all = rasp.Select(rasp.indices, rasp.indices, | |
rasp.Comparison.TRUE).named("select_all") | |
has_neg = rasp.numerical(rasp.Aggregate(select_all, any_negative, | |
default=0)).named("has_neg") | |
# Check if all balances are 0 at the end -> closed all parentheses | |
all_zero = balances[0] == 0 | |
for balance in balances[1:]: | |
all_zero = all_zero & (balance == 0) | |
select_last = rasp.Select(rasp.indices, length - 1, | |
rasp.Comparison.EQ).named("select_last") | |
last_zero = rasp.Aggregate(select_last, all_zero).named("last_zero") | |
not_has_neg = (~has_neg).named("not_has_neg") | |
return (last_zero & not_has_neg).named("shuffle_dyck") | |
def make_shuffle_dyck2() -> rasp.SOp: | |
return make_shuffle_dyck(pairs=["()", "{}"]).named("shuffle_dyck2") | |
def make_hist() -> rasp.SOp: | |
"""Returns the number of times each token occurs in the input. | |
(As implemented in the RASP paper.) | |
Example usage: | |
hist = make_hist() | |
hist("abac") | |
>> [2, 1, 2, 1] | |
""" | |
same_tok = rasp.Select(rasp.tokens, rasp.tokens, | |
rasp.Comparison.EQ).named("same_tok") | |
return rasp.SelectorWidth(same_tok).named("hist") | |
def make_sort_unique(vals: rasp.SOp, keys: rasp.SOp) -> rasp.SOp: | |
"""Returns vals sorted by < relation on keys. | |
Only supports unique keys. | |
Example usage: | |
sort = make_sort(rasp.tokens, rasp.tokens) | |
sort([2, 4, 3, 1]) | |
>> [1, 2, 3, 4] | |
Args: | |
vals: Values to sort. | |
keys: Keys for sorting. | |
""" | |
smaller = rasp.Select(keys, keys, rasp.Comparison.LT).named("smaller") | |
target_pos = rasp.SelectorWidth(smaller).named("target_pos") | |
sel_new = rasp.Select(target_pos, rasp.indices, rasp.Comparison.EQ) | |
return rasp.Aggregate(sel_new, vals).named("sort") | |
def make_sort(vals: rasp.SOp, keys: rasp.SOp, *, max_seq_len: int, | |
min_key: float) -> rasp.SOp: | |
"""Returns vals sorted by < relation on keys, which don't need to be unique. | |
The implementation differs from the RASP paper, as it avoids using | |
compositions of selectors to break ties. Instead, it uses the arguments | |
max_seq_len and min_key to ensure the keys are unique. | |
Note that this approach only works for numerical keys. | |
Example usage: | |
sort = make_sort(rasp.tokens, rasp.tokens, 5, 1) | |
sort([2, 4, 3, 1]) | |
>> [1, 2, 3, 4] | |
sort([2, 4, 1, 2]) | |
>> [1, 2, 2, 4] | |
Args: | |
vals: Values to sort. | |
keys: Keys for sorting. | |
max_seq_len: Maximum sequence length (used to ensure keys are unique) | |
min_key: Minimum key value (used to ensure keys are unique) | |
Returns: | |
Output SOp of sort program. | |
""" | |
keys = rasp.SequenceMap(lambda x, i: x + min_key * i / max_seq_len, keys, | |
rasp.indices) | |
return make_sort_unique(vals, keys) | |
def make_sort_freq(max_seq_len: int) -> rasp.SOp: | |
"""Returns tokens sorted by the frequency they appear in the input. | |
Tokens the appear the same amount of times are output in the same order as in | |
the input. | |
Example usage: | |
sort = make_sort_freq(rasp.tokens, rasp.tokens, 5) | |
sort([2, 4, 2, 1]) | |
>> [2, 2, 4, 1] | |
Args: | |
max_seq_len: Maximum sequence length (used to ensure keys are unique) | |
""" | |
hist = -1 * make_hist().named("hist") | |
return make_sort( | |
rasp.tokens, hist, max_seq_len=max_seq_len, min_key=1).named("sort_freq") | |
### Programs that work under both causal and regular evaluation. | |
def make_frac_prevs(bools: rasp.SOp) -> rasp.SOp: | |
"""Count the fraction of previous tokens where a specific condition was True. | |
(As implemented in the RASP paper.) | |
Example usage: | |
num_l = make_frac_prevs(rasp.tokens=="l") | |
num_l("hello") | |
>> [0, 0, 1/3, 1/2, 2/5] | |
Args: | |
bools: SOp mapping a sequence to a sequence of booleans. | |
Returns: | |
frac_prevs: SOp mapping an input to a sequence, where every element | |
is the fraction of previous "True" tokens. | |
""" | |
bools = rasp.numerical(bools) | |
prevs = rasp.Select(rasp.indices, rasp.indices, rasp.Comparison.LEQ) | |
return rasp.numerical(rasp.Aggregate(prevs, bools, | |
default=0)).named("frac_prevs") | |
def shift_by(offset: int, /, sop: rasp.SOp) -> rasp.SOp: | |
"""Returns the sop, shifted by `offset`, None-padded.""" | |
select_off_by_offset = rasp.Select(rasp.indices, rasp.indices, | |
lambda k, q: q == k + offset) | |
out = rasp.Aggregate(select_off_by_offset, sop, default=None) | |
return out.named(f"shift_by({offset})") | |
def detect_pattern(sop: rasp.SOp, pattern: Sequence[rasp.Value]) -> rasp.SOp: | |
"""Returns an SOp which is True at the final element of the pattern. | |
The first len(pattern) - 1 elements of the output SOp are None-padded. | |
detect_pattern(tokens, "abc")("abcabc") == [None, None, T, F, F, T] | |
Args: | |
sop: the SOp in which to look for patterns. | |
pattern: a sequence of values to look for. | |
Returns: | |
a sop which detects the pattern. | |
""" | |
if len(pattern) < 1: | |
raise ValueError(f"Length of `pattern` must be at least 1. Got {pattern}") | |
# detectors[i] will be a boolean-valued SOp which is true at position j iff | |
# the i'th (from the end) element of the pattern was detected at position j-i. | |
detectors = [] | |
for i, element in enumerate(reversed(pattern)): | |
detector = sop == element | |
if i != 0: | |
detector = shift_by(i, detector) | |
detectors.append(detector) | |
# All that's left is to take the AND over all detectors. | |
pattern_detected = detectors.pop() | |
while detectors: | |
pattern_detected = pattern_detected & detectors.pop() | |
return pattern_detected.named(f"detect_pattern({pattern})") | |
def make_count_less_freq(n: int) -> rasp.SOp: | |
"""Returns how many tokens appear fewer than n times in the input. | |
The output sequence contains this count in each position. | |
Example usage: | |
count_less_freq = make_count_less_freq(2) | |
count_less_freq(["a", "a", "a", "b", "b", "c"]) | |
>> [3, 3, 3, 3, 3, 3] | |
count_less_freq(["a", "a", "c", "b", "b", "c"]) | |
>> [6, 6, 6, 6, 6, 6] | |
Args: | |
n: Integer to compare token frequences to. | |
""" | |
hist = make_hist().named("hist") | |
select_less = rasp.Select(hist, hist, | |
lambda x, y: x <= n).named("select_less") | |
return rasp.SelectorWidth(select_less).named("count_less_freq") | |
def make_count(sop, token): | |
"""Returns the count of `token` in `sop`. | |
The output sequence contains this count in each position. | |
Example usage: | |
count = make_count(tokens, "a") | |
count(["a", "a", "a", "b", "b", "c"]) | |
>> [3, 3, 3, 3, 3, 3] | |
count(["c", "a", "b", "c"]) | |
>> [1, 1, 1, 1] | |
Args: | |
sop: Sop to count tokens in. | |
token: Token to count. | |
""" | |
return rasp.SelectorWidth(rasp.Select( | |
sop, sop, lambda k, q: k == token)).named(f"count_{token}") | |
def make_nary_sequencemap(f, *sops): | |
"""Returns an SOp that simulates an n-ary SequenceMap. | |
Uses multiple binary SequenceMaps to convert n SOps x_1, x_2, ..., x_n | |
into a single SOp arguments that takes n-tuples as value. The n-ary sequence | |
map implementing f is then a Map on this resulting SOp. | |
Note that the intermediate variables representing tuples of varying length | |
will be encoded categorically, and can become very high-dimensional. So, | |
using this function might lead to very large compiled models. | |
Args: | |
f: Function with n arguments. | |
*sops: Sequence of SOps, one for each argument of f. | |
""" | |
values, *sops = sops | |
for sop in sops: | |
# x is a single entry in the first iteration but a tuple in later iterations | |
values = rasp.SequenceMap( | |
lambda x, y: (*x, y) if isinstance(x, tuple) else (x, y), values, sop) | |
return rasp.Map(lambda args: f(*args), values) | |