# gptx_tokenizer.py
# Custom SentencePiece tokenizer for openGPT-X/Teuken-7B-chat-v0.4.
from __future__ import annotations
import json
import os
import warnings
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import sentencepiece as spm
from huggingface_hub import hf_hub_download, list_repo_files
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers.tokenization_utils_base import TOKENIZER_CONFIG_FILE
# Define special tokens used in the tokenizer
EOD_TOKEN = "<eod>"
PAD_TOKEN = "<pad>"
BOS_TOKEN = "<s>"
EOS_TOKEN = "</s>"
UNK_TOKEN = "<unk>"
REPO_ID = "openGPT-X/Teuken-7B-chat-v0.4"
class HFGPTXTokenizer(PreTrainedTokenizer):
"""
A custom tokenizer class that extends Hugging Face's PreTrainedTokenizer.
It is specifically designed to work with SentencePiece models and integrates
with Hugging Face's tokenizer utilities.
"""
model_file_glob = "*tokenizer.json"
vocab_files_names = {"tokenizer_file": "tokenizer.json"}
decode_kwargs: List[str] = []
def _encode(self, text: str, return_tokens: bool = False, is_continuation: bool = False):
"""
Encode a given text using the tokenizer.
Args:
text (str): The text to encode.
return_tokens (bool): If True, returns token strings instead of token IDs.
is_continuation (bool): If True, uses a continuation tokenizer (if available).
Returns:
List[int] or List[str]: Encoded text as a list of token IDs or token strings.
"""
assert self.tok is not None, "No tokenizer is currently loaded"
# Variant with additional sp processor:
tokenizer = self.continuation_tokenizer if is_continuation else self.tok
if return_tokens:
return tokenizer.encode_as_pieces(text)
else:
return tokenizer.encode(text)
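    # Example (pieces and IDs below are illustrative; actual values depend on
    # the SentencePiece model):
    #   self._encode("Hello world")                     -> [9034, 1175]
    #   self._encode("Hello world", return_tokens=True) -> ["▁Hello", "▁world"]
    # Note: `is_continuation=True` requires a `continuation_tokenizer`
    # attribute to be set on the instance; it is not created in `__init__`.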
def create_list_of_special_tokens(self) -> List[str]:
"""
Create a list of special tokens, including the BOS, EOS, PAD, EOD tokens,
and 256 additional placeholder tokens.
Returns:
List[str]: List of special tokens.
"""
return [self.bos_token, self.eos_token, self.pad_token, self.eod_token] + [
f"<placeholder_tok_{i}>" for i in range(256)
]
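    # With the token values assigned in `__init__` this yields
    #   ["<s>", "<eod>", "<pad>", "<eod>", "<placeholder_tok_0>", ..., "<placeholder_tok_255>"]
    # (EOS and EOD both map to "<eod>" in this tokenizer).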
    def find_tokenizer_config(self, config_path: Path, repo_id: Optional[str] = None) -> str:
        """
        Locate tokenizer_config.json in a Hub repository and download it.
        Args:
            config_path (Path): Local config path that was not found, triggering the Hub lookup.
            repo_id (Optional[str]): Repository ID to search and download from.
        Returns:
            str: Local path of the downloaded tokenizer_config.json.
        Raises:
            ValueError: If repo_id is not provided.
            OSError: If no config file can be found or downloaded.
        """
        if repo_id is None:
            raise ValueError("repo_id must be provided if config_path is not a local file")
try:
# List all files in the repo
repo_files = list_repo_files(repo_id)
# Find the tokenizer config file
tokenizer_files = [f for f in repo_files if f.endswith('tokenizer_config.json')]
if not tokenizer_files:
raise FileNotFoundError(f"No tokenizer_config.json file found in repository {repo_id}")
# Use the first tokenizer_config.json file found
tokenizer_config_file = tokenizer_files[0]
print(f"Found tokenizer config file: {tokenizer_config_file}")
# Download the file
tokenizer_config_file_or_name = hf_hub_download(repo_id=repo_id, filename=tokenizer_config_file)
print(f"Downloaded tokenizer config file to: {tokenizer_config_file_or_name}")
return tokenizer_config_file_or_name
except Exception as e:
raise OSError(f"Failed to download tokenizer model: {str(e)}")
    def instantiate_from_file_or_name(self, model_file_or_name: str, repo_id: Optional[str] = None) -> spm.SentencePieceProcessor:
"""
Load the tokenizer model from a file or download it from a repository.
Args:
model_file_or_name (str): Path to the model file or the model name.
repo_id (str, optional): Repository ID from which to download the model file.
Returns:
spm.SentencePieceProcessor: Loaded SentencePieceProcessor instance.
Raises:
ValueError: If repo_id is not provided when model_file_or_name is not a file.
OSError: If the model file cannot be loaded or downloaded.
"""
if not os.path.isfile(model_file_or_name):
if repo_id is None:
raise ValueError("repo_id must be provided if model_file_or_name is not a local file")
try:
# List all files in the repo
repo_files = list_repo_files(repo_id)
# Find the tokenizer model file
tokenizer_files = [f for f in repo_files if f.endswith('.model')]
if not tokenizer_files:
raise FileNotFoundError(f"No .model file found in repository {repo_id}")
# Use the first .model file found
model_file = tokenizer_files[0]
print(f"Found tokenizer model file: {model_file}")
# Download the file
model_file_or_name = hf_hub_download(repo_id=repo_id, filename=model_file)
print(f"Downloaded tokenizer model to: {model_file_or_name}")
except Exception as e:
raise OSError(f"Failed to download tokenizer model: {str(e)}")
try:
return spm.SentencePieceProcessor(model_file=model_file_or_name)
except Exception as e:
raise OSError(f"Failed to load tokenizer model: {str(e)}")
def __init__(
self,
model_path: Optional[str] = None,
config_path: Optional[str] = None,
**kwargs: Any,
) -> None:
"""
Initialize the tokenizer.
Args:
model_path (Optional[str]): Path to the tokenizer model file.
config_path (Optional[str]): Path to the tokenizer configuration file.
**kwargs: Additional keyword arguments passed to the superclass.
This method also ensures backward compatibility by setting
`clean_up_tokenization_spaces` to False by default.
"""
# Prevent cleanup of tokenization spaces to maintain backward compatibility
self.clean_up_tokenization_spaces = kwargs.setdefault("clean_up_tokenization_spaces", False)
self.vocab = None
cp_path = kwargs.get("name_or_path", ".")
if model_path is None:
model_path = str(Path(cp_path) / self.vocab_files_names["tokenizer_file"])
self.tok = self.instantiate_from_file_or_name(model_path, repo_id=REPO_ID)
super().__init__(**kwargs)
# Specify special tokens which we know the value of.
# EOD from `tok` is used as what is called EOS in HuggingFace.
# Since there is no corresponding mapping for EOS from `tok` in
# HuggingFace, it is treated as an additional special token.
# Same for all other special tokens.
        self.eos_token = EOD_TOKEN
        self.bos_token = BOS_TOKEN
        self.pad_token = PAD_TOKEN
        # `create_list_of_special_tokens` references `self.eod_token`, so it
        # must be set here as well.
        self.eod_token = EOD_TOKEN
if not self.additional_special_tokens:
self.additional_special_tokens = [
token
for token in self.create_list_of_special_tokens()
# Filter out the special tokens we added manually.
if token
not in [
self.eos_token,
self.bos_token,
self.pad_token,
]
]
if config_path is None:
config_path = str(Path(cp_path) / TOKENIZER_CONFIG_FILE)
if os.path.isfile(config_path):
self.tokenizer_config = self.load_json(Path(config_path))
        else:  # Load from repo
            self.tokenizer_config = self.load_json(
                Path(self.find_tokenizer_config(Path(config_path), repo_id=REPO_ID))
            )
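    # Construction sketch (hypothetical paths; when the files are not found
    # locally, both the SentencePiece model and tokenizer_config.json are
    # downloaded from REPO_ID):
    #   tokenizer = SPTokenizer(
    #       model_path="path/to/tokenizer.model",
    #       config_path="path/to/tokenizer_config.json",
    #   )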
@property
def vocab_size(self) -> int:
"""
Get the size of the tokenizer vocabulary.
Returns:
int: The size of the vocabulary.
"""
return self.tok.GetPieceSize()
def get_vocab(self) -> Dict[str, int]:
"""
Get the vocabulary as a dictionary mapping token strings to their IDs.
Returns:
Dict[str, int]: Vocabulary mapping.
"""
if self.vocab is None:
self.vocab = {self.tok.IdToPiece(i): i for i in range(self.vocab_size)}
return self.vocab
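    # The mapping is built lazily on first call and cached in `self.vocab`;
    # for any piece present in the model:
    #   self.get_vocab()["<pad>"] == self._convert_token_to_id("<pad>")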
    def _tokenize(self, text: str, **kwargs) -> List[str]:
        """
        Tokenize the input text.
        Args:
            text (str): Text to tokenize.
            **kwargs: Additional keyword arguments.
        Returns:
            List[str]: List of token strings (token IDs instead if
            `return_tokens=False` is passed).
        """
        return_tokens = kwargs.pop("return_tokens", True)
        return self._encode(text, return_tokens=return_tokens, **kwargs)
    def _convert_token_to_id(self, token: str) -> int:
        """
        Convert a token string to its corresponding ID.
        Args:
            token (str): The token to convert.
        Returns:
            int: The token's ID (the unknown-token ID if the token is not in
            the vocabulary; SentencePiece does not raise here).
        """
        return self.tok.PieceToId(token)
    def decode(
        self,
        token_ids: Union[List[int], List[List[int]]],
        num_threads: Optional[int] = None,
    ) -> Union[str, List[str]]:
        """
        Decode a list of token IDs into a string.
        Args:
            token_ids (Union[List[int], List[List[int]]]): List of token IDs or lists of token IDs.
            num_threads (Optional[int]): Number of threads to use for decoding.
        Returns:
            Union[str, List[str]]: Decoded string, or one decoded string per
            inner list when a list of lists is passed.
        """
        return self.tok.decode(input=token_ids, num_threads=num_threads)
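    # Example (IDs are illustrative; actual values depend on the model):
    #   self.decode([9034, 1175])         -> "Hello world"
    #   self.decode([[9034, 1175], [42]]) -> ["Hello world", ...] (one string per inner list)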
def _convert_id_to_token(self, index: int) -> str:
"""
Convert a token ID to its corresponding token string.
Args:
index (int): Token ID.
Returns:
str: Corresponding token string.
"""
return self.tok.IdToPiece(index)
def convert_tokens_to_string(self, tokens: List[str]) -> str:
"""
Convert a list of tokens into a single string.
Args:
tokens (List[str]): List of token strings.
Returns:
str: Concatenated string of tokens.
"""
return self.tok.DecodePieces(tokens)
def _tok_decode(self, token_ids: List[int], **kwargs: Any) -> str:
"""
Internal method to decode token IDs with additional arguments.
Args:
token_ids (List[int]): List of token IDs.
**kwargs: Additional arguments to pass to the decode method.
Returns:
str: Decoded string.
This method also issues a warning if unsupported arguments are provided.
"""
passed_kwargs = {key: value for (key, value) in kwargs.items() if key in self.decode_kwargs}
if len(passed_kwargs) != len(kwargs):
warnings.warn("silently ignoring some arguments to `decode` due to missing " "support from the tokenizer.")
text = self.decode(token_ids, **passed_kwargs)
return text
    def save_tokenizer(self, save_dir: str) -> Optional[Tuple[str]]:
        """
        Serialize the SentencePiece model to `<save_dir>/tokenizer.model`.
        Args:
            save_dir (str): Directory in which to write the model file.
        Returns:
            Optional[Tuple[str]]: One-element tuple with the written file path,
            or None if `save_dir` is not a directory.
        """
        if not os.path.isdir(save_dir):
            print(f"Vocabulary path ({save_dir}) should be a directory")
            return None
        out_vocab_file = os.path.join(save_dir, "tokenizer.model")
        with open(out_vocab_file, "wb") as f:
            content_spiece_model = self.tok.serialized_model_proto()
            f.write(content_spiece_model)
        return (out_vocab_file,)
def _decode(
self,
token_ids: List[int],
skip_special_tokens: bool = False,
        clean_up_tokenization_spaces: Optional[bool] = None,
spaces_between_special_tokens: bool = True,
**kwargs: Any,
) -> str:
text = self._tok_decode(
token_ids,
skip_special_tokens=skip_special_tokens,
spaces_between_special_tokens=spaces_between_special_tokens,
**kwargs,
)
clean_up_tokenization_spaces = (
clean_up_tokenization_spaces
if clean_up_tokenization_spaces is not None
else self.clean_up_tokenization_spaces
)
if clean_up_tokenization_spaces:
            warnings.warn(
                "When cleaning up tokenization spaces, this will not behave "
                "like the original `GPTXTokenizer`. Please supply "
                "`clean_up_tokenization_spaces=False` for decoding."
            )
clean_text = self.clean_up_tokenization(text)
return clean_text
else:
return text
def save_vocabulary(
self,
save_directory: str,
filename_prefix: Optional[str] = None,
) -> Tuple[str]:
filename_prefix = filename_prefix + "-" if filename_prefix else ""
save_directory = Path(save_directory)
self._save_tokenizer_config(save_directory, filename_prefix)
tokenizer_file_path = self._save_tokenizer(save_directory, filename_prefix)
return (tokenizer_file_path,)
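    # Example (run in an empty directory; shown for `SPTokenizer`, whose
    # `model_file_glob` matches the file written by `save_tokenizer`):
    #   tok.save_vocabulary("out", filename_prefix="teuken")
    #   -> writes out/teuken-tokenizer_config.json and out/teuken-tokenizer.model,
    #      returning ("out/teuken-tokenizer.model",)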
def _save_tokenizer_config(
self,
save_directory: Path,
filename_prefix: str,
) -> str:
self.save_tokenizer_config(save_directory)
old_tokenizer_config_path = save_directory / TOKENIZER_CONFIG_FILE
assert old_tokenizer_config_path.is_file(), "tokenizer config path changed"
new_tokenizer_config_path = save_directory / (filename_prefix + old_tokenizer_config_path.name)
old_tokenizer_config_path.replace(new_tokenizer_config_path)
return str(new_tokenizer_config_path)
def _find_tokenizer_files(self, save_directory: Path) -> List[Path]:
files = list(Path(save_directory).glob(self.model_file_glob))
return files
def _get_tokenizer_file(self, files: List[Path]):
assert files, "no saved tokenizer file found"
assert len(files) <= 1, "cannot handle multiple saved tokenizer files"
return files[0]
def _save_tokenizer(
self,
save_directory: Path,
filename_prefix: str,
) -> str:
self.save_tokenizer(str(save_directory))
tokenizer_files = self._find_tokenizer_files(save_directory)
old_tokenizer_file_path = self._get_tokenizer_file(tokenizer_files)
assert old_tokenizer_file_path.is_file(), "could not access saved tokenizer file"
new_tokenizer_file_path = save_directory / (filename_prefix + self.vocab_files_names["tokenizer_file"])
old_tokenizer_file_path.replace(new_tokenizer_file_path)
return str(new_tokenizer_file_path)
def save_tokenizer_config(self, save_dir: Path) -> None:
# convert Path to str
for k in self.tokenizer_config:
if isinstance(self.tokenizer_config[k], Path):
self.tokenizer_config[k] = str(self.tokenizer_config[k])
info_file = save_dir / "tokenizer_config.json"
with info_file.open("w") as f:
json.dump(self.tokenizer_config, f, indent=4)
def load_json(self, path: Path) -> dict:
with path.open("r") as f:
return json.load(f)
class SPTokenizer(HFGPTXTokenizer):
model_file_glob = "*tokenizer.model"
vocab_files_names = {"tokenizer_file": "tokenizer.model"}
decode_kwargs = ["num_threads"]
    # `is_continuation` does not work without this override, but the override
    # does not implement the full `PreTrainedTokenizer.encode` API; any other
    # keyword arguments are silently ignored.
def encode(self, text: str, **kwargs) -> List[int]:
return_tokens = kwargs.pop('return_tokens', False)
is_continuation = kwargs.pop('is_continuation', False)
return self._encode(
text,
return_tokens=return_tokens,
is_continuation=is_continuation,
)
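# ---------------------------------------------------------------------------
# Usage sketch, not part of the tokenizer implementation. It assumes network
# access to the Hugging Face Hub: when "tokenizer.model" and the tokenizer
# config are not found locally, they are downloaded from REPO_ID. Printed
# token IDs are model-dependent and shown only for illustration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    tokenizer = SPTokenizer(model_path="tokenizer.model")
    ids = tokenizer.encode("Hello, world!")
    print(ids)  # e.g. [9034, 48, 1175] (depends on the model)
    print(tokenizer.decode(ids))  # "Hello, world!"
    print(tokenizer.encode("Hello, world!", return_tokens=True))  # token pieces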