1. WHY YOU SHOULD CARE ABOUT TOKENIZERS
Imagine you are building a rocket. You spend months designing the engine,
perfecting the aerodynamics, and stress-testing the fuel tanks. Then, on
launch day, someone supplies thruster data in pound-force seconds when the
guidance software expects newton-seconds. The rocket is lost. This is not a
hypothetical story -- essentially that mistake doomed NASA's Mars Climate
Orbiter in 1999, at a cost of roughly 193 million dollars.
Tokenizers are the unit-conversion layer between human language and the
mathematical machinery of a large language model. Get them wrong, and
everything downstream is broken, no matter how sophisticated your neural
network is. Get them right, and you unlock the full expressive power of the
model.
Most tutorials on LLMs treat the tokenizer as a black box: you call
encode(), you get a list of integers, you move on. This tutorial refuses to
do that. We will open the black box, examine every gear and spring inside,
and then build our own from scratch. By the end, you will understand not
only how tokenizers work but why they are designed the way they are, what
trade-offs each design decision involves, and how to build one that is fast,
correct, and production-ready.
We will also connect our tokenizer to real LLM inference backends so that
you can use it immediately with actual models running on your hardware,
whether that hardware is an Apple Silicon Mac, an NVIDIA GPU, an AMD GPU,
an Intel accelerator, or a plain CPU.
No prior knowledge of tokenizers, NLP, or machine learning is required.
You do need to be comfortable with Python and have a basic understanding of
what a neural network is at a conceptual level.
2. THE FUNDAMENTAL PROBLEM: MACHINES DON'T READ WORDS
Neural networks, at their core, are functions that transform vectors of
floating-point numbers into other vectors of floating-point numbers. They
cannot operate on strings. They cannot operate on characters. They operate
exclusively on numbers.
So the very first question any language model must answer is: how do we
turn text into numbers?
The naive answer is: assign each character a number. 'a' becomes 1, 'b'
becomes 2, and so on. This works, but it has a profound problem. The number
2 is not "close to" the number 1 in any meaningful linguistic sense. The
model has no way to know that 'a' and 'b' are both letters, that they appear
in similar contexts, or that they share any structural relationship. Raw
integers carry no semantic information.
The solution that modern LLMs use is an embedding table. Instead of mapping
each token to a single integer, we map each token to a dense vector of
floating-point numbers (typically 512 to 8192 dimensions). These vectors are
learned during training, and they encode rich semantic and syntactic
information. Words that appear in similar contexts end up with similar
vectors. The vector for "king" minus the vector for "man" plus the vector
for "woman" famously lands close to the vector for "queen."
But before we can look up a vector in the embedding table, we need an
integer index. And to get an integer index, we need to split the text into
discrete units and assign each unit a stable index. That splitting and
indexing process is exactly what a tokenizer does.
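To make that hand-off concrete, here is a minimal sketch (using NumPy, with made-up IDs and dimensions) of how token IDs index into an embedding table:

```python
import numpy as np

# A toy embedding table: one row per vocabulary entry.
# Real models use tens of thousands of rows and 512-8192 columns.
vocab_size, embedding_dim = 50_000, 8
embedding_table = np.random.randn(vocab_size, embedding_dim).astype(np.float32)

# Token IDs produced by a tokenizer (illustrative values only).
token_ids = [1045, 2293, 2653, 1012]

# The embedding lookup is plain row indexing: one vector per token.
token_vectors = embedding_table[token_ids]
print(token_vectors.shape)  # (4, 8): one 8-dimensional vector per token
```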
The tokenizer sits at the very entrance of the LLM pipeline:
Raw Text
|
v
[TOKENIZER] <-- This is what we are building
|
v
List of Integer IDs (e.g., [1045, 2293, 2653, 1012])
|
v
Embedding Lookup
|
v
Matrix of Float Vectors
|
v
Transformer Layers
|
v
Output Logits
|
v
[DETOKENIZER] <-- The reverse process
|
v
Generated Text
The tokenizer also runs in reverse during generation. When the model outputs
a probability distribution over its vocabulary and we sample a token ID from
that distribution, we need to convert that ID back into a string. This
reverse process is called decoding or detokenization.
A tokenizer is therefore a bidirectional mapping between strings and
sequences of integers. It must be fast, deterministic, lossless (you must
be able to perfectly reconstruct the original text from the token IDs), and
it must produce a vocabulary of manageable size.
3. A BRIEF HISTORY OF TEXT ENCODING
To appreciate why modern tokenizers are designed the way they are, it helps
to understand the history of encoding text as numbers.
In the early days of computing, ASCII (American Standard Code for Information
Interchange) was the dominant encoding. ASCII maps 128 characters -- the
26 English letters in upper and lower case, the digits 0-9, punctuation,
and some control characters -- to integers from 0 to 127. It is elegant and
simple, but it is catastrophically limited. It cannot represent accented
characters, Chinese, Arabic, Hebrew, emoji, or any of the thousands of
scripts used by human beings around the world.
Extended ASCII and various regional code pages attempted to address this by
using 256 values instead of 128, but this created a fragmented ecosystem
where a document encoded in one code page was gibberish in another.
Unicode was the solution. It defines a universal character set with over
1.1 million possible code points, covering virtually every writing system
on Earth plus emoji and many specialized symbols. UTF-8 is the most popular
encoding of Unicode. It is a variable-length encoding: ASCII characters
take one byte, common European characters take two bytes, most Asian
characters take three bytes, and supplementary characters (like many emoji)
take four bytes.
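You can observe the variable-length property directly in Python; the characters below are just examples of the one-, two-, three-, and four-byte cases:

```python
# UTF-8 uses 1 to 4 bytes per character depending on the code point.
for ch in ["a", "é", "中", "😀"]:
    encoded = ch.encode("utf-8")
    print(f"{ch!r}: {len(encoded)} byte(s) -> {list(encoded)}")
# 'a': 1 byte, 'é': 2 bytes, '中': 3 bytes, '😀': 4 bytes
```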
UTF-8 is brilliant because it is backward-compatible with ASCII and
space-efficient for English text, while still being universal. It is the
encoding used by virtually all modern software, including all major LLMs.
Understanding UTF-8 at the byte level is important for tokenizer design,
as we will see when we discuss byte-level BPE.
4. THE VOCABULARY: THE HEART OF EVERY TOKENIZER
The vocabulary is the complete set of tokens that the tokenizer knows about.
Each token is a string (which could be a character, a word fragment, a whole
word, a punctuation mark, a special symbol, or even a raw byte), and each
token is assigned a unique integer ID.
The vocabulary is fixed at training time. Once a model is trained with a
particular vocabulary, you cannot add new tokens without retraining (or at
least fine-tuning) the model, because the embedding table has exactly one
row per vocabulary entry, and the output projection layer has exactly one
column per vocabulary entry.
Vocabulary size is a critical hyperparameter. Consider the trade-offs:
A very small vocabulary (say, 256 entries for all possible bytes) means that
every piece of text can be encoded, but common words get split into many
tokens, making sequences very long. Longer sequences are more expensive to
process because the attention mechanism in Transformers has quadratic
complexity in sequence length.
A very large vocabulary (say, one entry per English word, which would be
hundreds of thousands of entries) means that common words are single tokens
and sequences are short, but the vocabulary cannot cover all words. Any word
not seen during training becomes an "unknown" token, losing all information.
Additionally, rare words appear so infrequently in training data that their
embeddings are poorly learned.
Modern LLMs use vocabularies in the range of 32,000 to 200,000 tokens.
GPT-2 used 50,257 tokens. GPT-4's cl100k_base tokenizer has 100,277 tokens.
LLaMA 3 uses 128,256 tokens. These sizes represent a carefully tuned
compromise between sequence length efficiency and vocabulary coverage.
The vocabulary is typically stored as two data structures:
token_to_id: a dictionary mapping each token string to its integer ID.
id_to_token: a list (or dictionary) mapping each integer ID back to its
token string.
These two structures are inverses of each other and together define the
complete vocabulary.
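A minimal sketch of the two structures and their inverse relationship (the tokens and IDs here are invented for illustration):

```python
# Toy vocabulary: token string -> integer ID.
token_to_id = {"the": 0, " cat": 1, " sat": 2, ".": 3}

# The inverse mapping: integer ID -> token string.
id_to_token = [None] * len(token_to_id)
for token, token_id in token_to_id.items():
    id_to_token[token_id] = token

# The two structures must agree exactly.
assert all(token_to_id[id_to_token[i]] == i for i in range(len(id_to_token)))
```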
5. TYPES OF TOKENIZERS: A GUIDED TOUR
There are several fundamentally different approaches to tokenization. Each
has its own philosophy, strengths, and weaknesses. Understanding all of them
will help you appreciate why the dominant approach (subword tokenization)
won out.
5.1 CHARACTER-LEVEL TOKENIZERS
The simplest possible tokenizer treats each character as a token. The
vocabulary is the set of all distinct characters in the training corpus,
which is typically a few hundred to a few thousand entries.
Advantages of character-level tokenization include the fact that the
vocabulary is tiny and completely covers any input text (there are no
unknown tokens). The model can, in principle, learn to spell any word.
The disadvantages are severe. A typical English word is 4-5 characters long,
so a sentence of 20 words becomes a sequence of 80-100 tokens. The
Transformer's attention mechanism must then model dependencies across all
100 positions, which is expensive and difficult. The model must learn
everything about language from scratch at the character level, which requires
enormous amounts of training data and compute.
Character-level models were popular in early neural language model research
but are rarely used for large-scale LLMs today.
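A character-level tokenizer fits in a few lines; this sketch (the helper name is ours) builds its vocabulary from whatever corpus it is given:

```python
def build_char_tokenizer(corpus: str):
    """Build encode/decode functions over a character-level vocabulary."""
    chars = sorted(set(corpus))
    char_to_id = {c: i for i, c in enumerate(chars)}
    id_to_char = {i: c for c, i in char_to_id.items()}

    def encode(text: str) -> list[int]:
        return [char_to_id[c] for c in text]  # raises KeyError on unseen chars

    def decode(ids: list[int]) -> str:
        return "".join(id_to_char[i] for i in ids)

    return encode, decode

encode, decode = build_char_tokenizer("hello world")
print(encode("hello"))                     # [3, 2, 4, 4, 5] for this tiny corpus
print(decode(encode("hello")) == "hello")  # True
```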
5.2 WORD-LEVEL TOKENIZERS
At the opposite extreme, a word-level tokenizer splits text on whitespace
and punctuation, treating each word as a single token. The vocabulary
consists of the most frequent words in the training corpus, with a special
[UNK] token for any word not in the vocabulary.
This approach produces short sequences (one token per word) and captures
whole-word semantics directly. However, it has two fatal flaws.
The first flaw is the out-of-vocabulary (OOV) problem. Any word not seen
during training -- including misspellings, technical jargon, names, and
newly coined words -- maps to [UNK], losing all information. A model that
sees [UNK] cannot distinguish between "Schwarzenegger" and
"supercalifragilisticexpialidocious."
The second flaw is morphological blindness. The words "run," "runs,"
"running," "runner," and "ran" are all related, but a word-level tokenizer
treats them as completely independent tokens with separate embeddings. The
model must learn their relationships from co-occurrence patterns alone,
which requires much more data than if the shared root "run" were explicit.
Word-level tokenizers were the standard in NLP before 2018 but have been
almost entirely replaced by subword approaches.
5.3 SUBWORD TOKENIZERS
Subword tokenization is the approach used by virtually all modern LLMs. The
key insight is that words can be decomposed into meaningful sub-units. The
word "tokenization" can be split into "token" and "ization." The word
"unhappiness" can be split into "un," "happy," and "ness." These sub-units
appear in many different words, so the model can learn their meanings
efficiently and generalize to new words by composing known sub-units.
Subword tokenizers learn their vocabulary from the training corpus using a
statistical algorithm. The most important subword algorithms are:
Byte-Pair Encoding (BPE) was originally a data compression algorithm,
adapted for NLP by Sennrich et al. in 2016. It starts with a vocabulary of
individual characters (or bytes) and iteratively merges the most frequent
adjacent pair of tokens into a new token. GPT-2, GPT-3, GPT-4, and LLaMA
all use variants of BPE.
WordPiece is used by BERT and its derivatives. It is similar to BPE but
uses a different merge criterion: instead of merging the most frequent pair,
it merges the pair whose merge most increases the likelihood of the training
data -- equivalently, the pair with the highest ratio of pair frequency to
the product of its parts' frequencies. WordPiece tokens for non-initial
subwords are
prefixed with "##" to indicate that they are continuations.
Unigram Language Model tokenization, used by SentencePiece (and thus by
many multilingual models), takes the opposite approach. It starts with a
large vocabulary and iteratively removes tokens that contribute least to
the training corpus likelihood, until the vocabulary reaches the desired
size.
We will focus on BPE in this tutorial because it is the most widely used
algorithm and the one that powers the most influential LLMs.
5.4 BYTE-LEVEL TOKENIZERS
A key limitation of character-level BPE is that the initial vocabulary must
cover all characters in the training corpus. For a multilingual model, this
could mean thousands of characters across dozens of scripts, making the
initial vocabulary large before any merges happen.
Byte-level BPE solves this elegantly. Instead of starting with characters,
it starts with the 256 possible byte values (0-255). Every possible string
of text, in any language, in any encoding, can be represented as a sequence
of bytes. So the initial vocabulary is always exactly 256 entries, and the
tokenizer is guaranteed to handle any input without unknown tokens.
GPT-2 introduced byte-level BPE, and it has been used by GPT-3, GPT-4,
LLaMA, Mistral, and most other major LLMs since then. It is the approach
we will implement in this tutorial.
The one subtlety is that raw bytes are not printable. To make the vocabulary
human-readable and to avoid issues with whitespace and control characters,
GPT-2 introduced a mapping from each of the 256 bytes to a printable Unicode
character. We will implement this mapping in detail in Section 8.
5.5 SENTENCEPIECE
SentencePiece, developed by Google, is a tokenization library that
implements both BPE and Unigram Language Model tokenization. Its key
distinguishing feature is that it treats the input text as a raw sequence
of Unicode characters, including spaces, without any pre-tokenization step.
This makes it language-agnostic and particularly well-suited for languages
like Japanese and Chinese that do not use spaces between words.
SentencePiece encodes spaces as a special character (U+2581, a lower one
eighth block: ▁) that is prepended to each word. This allows the tokenizer
to distinguish between "run" at the start of a word and "run" in the middle
of a word (as in "outrun").
Models like T5, LLaMA (version 1 and 2), and many multilingual models use
SentencePiece. LLaMA 3 switched to a tiktoken-style byte-level BPE
vocabulary, which is what we implement here.
6. WHAT MAKES AN EXCELLENT TOKENIZER
Now that we understand the landscape, let us define what we are aiming for.
An excellent tokenizer for a modern LLM has the following properties.
Complete coverage means that the tokenizer can encode any possible input
without producing unknown tokens. Byte-level BPE achieves this by
guaranteeing that any byte sequence can be represented.
Lossless encoding means that the original text can be perfectly reconstructed
from the token IDs. This is non-negotiable: if your tokenizer loses
information, the model cannot generate correct text.
Efficiency means that the tokenizer produces short token sequences for
typical text. Shorter sequences reduce computational cost during inference
and training. A good tokenizer for English text should produce roughly
3-4 characters per token on average.
Consistency means that the same text always produces the same token IDs,
and that the tokenizer handles edge cases (empty strings, very long strings,
unusual Unicode, emoji, mixed scripts) correctly and deterministically.
Speed means that the tokenizer can process text quickly. During training,
the tokenizer may need to process hundreds of gigabytes of text. During
inference, it runs on every user input and every generated token. A slow
tokenizer is a bottleneck.
Correct handling of whitespace is subtle but critical. The tokenizer must
preserve information about spaces, newlines, and tabs in a way that allows
perfect reconstruction. It must also handle the difference between a word
at the start of a sentence (preceded by nothing or a newline) and a word
in the middle of a sentence (preceded by a space).
Support for special tokens is essential. Modern LLMs use special tokens to
delimit conversations, mark the beginning and end of text, separate system
prompts from user messages, and more. The tokenizer must handle these tokens
specially, ensuring they are never split and always map to their designated IDs.
Portability means that the tokenizer can be saved to disk and loaded back
perfectly, producing identical results. It should use a standard format
(such as JSON) that can be read by multiple implementations in multiple
programming languages.
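To put a number on the efficiency property above, here is a small check of the characters-per-token ratio using OpenAI's tiktoken library (assuming it is installed; the sample sentence is arbitrary):

```python
import tiktoken  # pip install tiktoken

text = "Tokenizers convert text into integer IDs that index an embedding table."
enc = tiktoken.get_encoding("cl100k_base")  # the GPT-4 vocabulary
ids = enc.encode(text)
print(f"{len(text)} characters, {len(ids)} tokens, "
      f"{len(text) / len(ids):.2f} chars/token")
```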
7. NORMALIZATION AND PRE-TOKENIZATION
Before the core tokenization algorithm runs, two preprocessing steps are
typically applied: normalization and pre-tokenization. Understanding these
steps is essential because they significantly affect the quality and
consistency of the tokenizer.
NORMALIZATION
Normalization transforms the raw input text into a canonical form. Common
normalization operations include Unicode normalization (NFC, NFD, NFKC, or
NFKD), lowercasing, stripping accents, and replacing unusual whitespace
characters with standard spaces.
Unicode normalization deserves special attention. Unicode allows some
characters to be represented in multiple ways. For example, the character
"e with acute accent" (e) can be represented as a single code point U+00E9
or as two code points: U+0065 (e) followed by U+0301 (combining acute
accent). NFC (Canonical Decomposition, followed by Canonical Composition)
normalizes to the composed form (single code point). NFD decomposes to
separate base character plus combining marks. NFKC additionally applies
compatibility decompositions, converting characters like the ligature "ﬁ"
(U+FB01) into "fi" and the fraction "½" (U+00BD) into "1/2".
For LLMs, NFC normalization is the most common choice. It ensures that
visually identical text always produces the same token sequence.
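The composed and decomposed forms look identical on screen but are different code point sequences; Python's standard unicodedata module makes the difference visible:

```python
import unicodedata

composed = "\u00e9"     # 'é' as a single code point
decomposed = "e\u0301"  # 'e' followed by a combining acute accent

print(composed == decomposed)                                # False
print(unicodedata.normalize("NFC", decomposed) == composed)  # True
print(unicodedata.normalize("NFD", composed) == decomposed)  # True
print(unicodedata.normalize("NFKC", "\ufb01"))               # 'fi' (from the ligature)
print(unicodedata.normalize("NFKC", "\u00bd"))               # '1⁄2' (from the fraction)
```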
GPT-2 style tokenizers (and most modern LLMs) actually perform minimal
normalization, preferring to let the byte-level encoding handle all Unicode.
This is workable because byte-level BPE can represent any byte sequence
without unknown tokens; the trade-off is that visually identical but
differently normalized inputs (NFC vs NFD) still produce different token
sequences.
PRE-TOKENIZATION
Pre-tokenization splits the text into coarse chunks before the subword
algorithm runs. This is important because we generally do not want the BPE
algorithm to merge tokens across word boundaries. For example, we do not
want "the" and "cat" to be merged into "thecat" just because they happen
to appear adjacent frequently.
The GPT-2 pre-tokenizer uses a regular expression that splits text at
word boundaries while preserving spaces. Later tokenizers refined GPT-2's
original pattern; the tiktoken-style variant used by GPT-4 (cl100k_base)
and LLaMA 3, and adopted in this tutorial, is:
(?i:'s|'t|'re|'ve|'m|'ll|'d)|[^\r\n\p{L}\p{N}]?\p{L}+|
\p{N}{1,3}|\s?[^\s\p{L}\p{N}]+[\r\n]*|\s*[\r\n]+|\s+(?!\S)|\s+
This pattern, while complex, captures several important behaviors. It
handles English contractions ('s, 't, 're, 've, 'm, 'll, 'd) as single
units. It captures sequences of letters (optionally preceded by a space).
It captures sequences of up to 3 digits. It captures sequences of
non-letter, non-digit characters (punctuation, symbols). It handles
whitespace and newlines carefully.
The result is that the text is split into a list of "pre-tokens," each of
which is then independently processed by the BPE algorithm. Crucially,
BPE merges can only happen within a pre-token, not across pre-token
boundaries.
Let us see this in action with a small example. Given the input text
"Hello, world! I'm learning tokenization.", the GPT-2 pre-tokenizer
produces something like:
["Hello", ",", " world", "!", " I", "'m", " learning", " tokenization", "."]
Notice that the space before "world" is attached to "world," not to the
comma. This is a deliberate design choice: in GPT-2 style tokenizers,
spaces are attached to the following word. This means that the token for
" world" (with a leading space) is different from the token for "world"
(without a leading space), which allows the model to learn the distinction
between a word at the start of a sentence and a word in the middle.
The following code demonstrates a simplified pre-tokenizer using Python's
regex module, which supports Unicode character classes (required for \p{L}
and \p{N}):
```python
import regex # pip install regex
# The tiktoken-style pre-tokenization pattern (a refinement of GPT-2's
# original). This variant is used by GPT-4 (cl100k_base) and LLaMA 3.
GPT2_SPLIT_PATTERN = (
r"(?i:'s|'t|'re|'ve|'m|'ll|'d)"
r"|[^\r\n\p{L}\p{N}]?\p{L}+"
r"|\p{N}{1,3}"
r"|\s?[^\s\p{L}\p{N}]+[\r\n]*"
r"|\s*[\r\n]+"
r"|\s+(?!\S)"
r"|\s+"
)
def pre_tokenize(text: str) -> list[str]:
"""
Split text into pre-tokens using the GPT-2 regex pattern.
Each pre-token will be independently processed by BPE.
Spaces are attached to the following word, not the preceding one.
"""
return regex.findall(GPT2_SPLIT_PATTERN, text)
# Running example: encoding a simple sentence.
sample_text = "Hello, world! I'm learning tokenization."
pre_tokens = pre_tokenize(sample_text)
print("Pre-tokens:", pre_tokens)
# Output: ['Hello', ',', ' world', '!', ' I', "'m",
#          ' learning', ' tokenization', '.']
```
The regex library (note: not the standard re module) is required here
because the standard re module does not support Unicode property escapes
like \p{L} (any Unicode letter) and \p{N} (any Unicode digit). The
regex module is a drop-in replacement for re that adds these and many
other features. You can install it with pip install regex.
The pre-tokenization step is deterministic and fast. It runs in O(n) time
where n is the length of the input text, and it produces a list of strings
that are then fed into the BPE encoding algorithm.
8. BYTE-PAIR ENCODING: THE ALGORITHM THAT POWERS GPT
We now arrive at the core of our tokenizer: the Byte-Pair Encoding algorithm.
BPE has two phases: training (learning the merge rules from a corpus) and
encoding (applying those rules to new text). We will cover both in detail.
8.1 THE CORE IDEA
BPE is fundamentally a compression algorithm. Its insight is that if two
symbols appear adjacent to each other very frequently in the data, we can
treat them as a single symbol and represent them more compactly.
Imagine you have the text "aaabdaaabac". The pair "aa" is the most frequent
adjacent pair. If we replace each non-overlapping occurrence of "aa" with a
new symbol "Z", we get "ZabdZabac". We have reduced the length from 11 to 9.
We can then look for
the next most frequent pair in the new text and merge that too.
Applied to tokenization, we start with a vocabulary of individual bytes
(or characters) and iteratively merge the most frequent adjacent pair of
tokens. Each merge creates a new token. We continue until we have performed
a predetermined number of merges, which determines the final vocabulary size.
The merge rules are ordered. The order in which merges were learned during
training is the order in which they must be applied during encoding. This
is critical: applying merges in the wrong order produces different (and
incorrect) tokenizations.
8.2 TRAINING BPE STEP BY STEP
Let us trace through the BPE training algorithm on a tiny toy corpus to
build intuition before looking at the code.
Suppose our corpus (after pre-tokenization) contains these words with their
frequencies:
"low" : 5 times
"lower" : 2 times
"newest" : 6 times
"widest" : 3 times
Step 1: Initialize. Represent each word as a sequence of characters (or
bytes), with a special end-of-word marker. For clarity, we use a space to
separate characters:
l o w : 5
l o w e r : 2
n e w e s t : 6
w i d e s t : 3
Initial vocabulary: {l, o, w, e, r, n, s, t, i, d}
Step 2: Count all adjacent pairs across all words, weighted by word
frequency. The pairs in each word are:
"l o w" (freq 5): pairs (l,o), (o,w)
"l o w e r" (freq 2): pairs (l,o), (o,w), (w,e), (e,r)
"n e w e s t" (freq 6): pairs (n,e), (e,w), (w,e), (e,s), (s,t)
"w i d e s t" (freq 3): pairs (w,i), (i,d), (d,e), (e,s), (s,t)
Pair frequencies:
(l,o): 5+2 = 7
(o,w): 5+2 = 7
(w,e): 2+6 = 8
(e,r): 2
(n,e): 6
(e,w): 6
(e,s): 6+3 = 9 <-- most frequent (tied)
(s,t): 6+3 = 9 <-- most frequent (tied)
(w,i): 3
(i,d): 3
(d,e): 3
The most frequent pairs are (e,s) and (s,t), both with frequency 9. We pick
one (say, (e,s)) and merge it into a new token "es":
l o w : 5
l o w e r : 2
n e w es t : 6
w i d es t : 3
New vocabulary: {l, o, w, e, r, n, s, t, i, d, es}
Merge rule 1: (e, s) -> es
Step 3: Recount pairs and find the next most frequent:
(s,t) no longer exists as a pair (s was merged with e).
(es,t): 6+3 = 9 <-- most frequent!
Merge rule 2: (es, t) -> est
l o w : 5
l o w e r : 2
n e w est : 6
w i d est : 3
And so on. We continue until we have performed the desired number of merges.
In practice, the training corpus contains billions of words, and we perform
tens of thousands of merges. The algorithm is the same, just at much larger
scale.
Now let us look at the actual training code. This is the core of our
running example:
from collections import defaultdict
from typing import Iterator
def build_byte_vocab() -> dict[int, str]:
"""
Build the initial byte-level vocabulary.
GPT-2 maps each of the 256 possible byte values to a printable Unicode
    character. Bytes that are already printable, non-whitespace characters
    in Latin-1 (ASCII 33-126, plus 161-172 and 174-255) map to themselves.
    The remaining bytes map to Unicode characters starting at U+0100 (the
    Latin Extended-A block), chosen to avoid control characters and
    whitespace.
This mapping ensures that every token in the vocabulary is a printable
string, which makes the vocabulary human-readable and avoids issues with
null bytes, control characters, and whitespace in token strings.
Returns a dict mapping byte value (0-255) to its string representation.
"""
# Bytes that are already "nice" printable ASCII characters.
# These are: printable ASCII (33-126) plus a few extras (161-172, 174-255).
bs = (
list(range(ord('!'), ord('~') + 1)) # ! through ~ (33-126)
+ list(range(ord('\xa1'), ord('\xac') + 1)) # 161-172
+ list(range(ord('\xae'), ord('\xff') + 1)) # 174-255
)
cs = bs[:] # These bytes map to themselves (as Unicode code points).
# The remaining bytes (0-32, 127-160, 173) need to be mapped to
# printable characters. We use code points starting at 256.
n = 0
for b in range(256):
if b not in bs:
bs.append(b)
cs.append(256 + n)
n += 1
# Build the mapping: byte value -> single Unicode character string.
return {b: chr(c) for b, c in zip(bs, cs)}
def get_stats(
vocab: dict[tuple[str, ...], int]
) -> dict[tuple[str, str], int]:
"""
Count the frequency of every adjacent pair of tokens across all words
in the vocabulary. Each word is represented as a tuple of token strings,
and has an associated frequency count.
This is the inner loop of BPE training and must be efficient.
We use a defaultdict to accumulate counts.
"""
pairs = defaultdict(int)
for word, freq in vocab.items():
# Iterate over adjacent pairs in the token sequence for this word.
for i in range(len(word) - 1):
pairs[(word[i], word[i + 1])] += freq
return pairs
def merge_vocab(
pair: tuple[str, str],
vocab: dict[tuple[str, ...], int],
) -> dict[tuple[str, ...], int]:
"""
Apply a single BPE merge to the entire vocabulary.
For every word in the vocabulary, replace every occurrence of the
adjacent pair `pair` with the merged token (the concatenation of the
two strings in the pair). Return the updated vocabulary.
This function creates a new vocabulary dict rather than modifying
the input in place, which makes it easier to reason about correctness.
"""
new_vocab: dict[tuple[str, ...], int] = {}
merged_token = pair[0] + pair[1]
for word, freq in vocab.items():
new_word: list[str] = []
i = 0
while i < len(word):
# Check if the current position starts with the target pair.
if i < len(word) - 1 and word[i] == pair[0] and word[i + 1] == pair[1]:
new_word.append(merged_token)
i += 2 # Skip both tokens in the pair.
else:
new_word.append(word[i])
i += 1
new_vocab[tuple(new_word)] = freq
return new_vocab
The three functions above form the building blocks of BPE training. The
build_byte_vocab function implements the GPT-2 byte-to-character mapping,
which is a crucial detail that many tutorials gloss over. The get_stats
function counts all adjacent pairs in the current vocabulary state. The
merge_vocab function applies a single merge rule to the entire vocabulary.
Now we can write the main training loop that ties these together:
def train_bpe(
corpus_iterator: Iterator[str],
vocab_size: int,
min_frequency: int = 2,
verbose: bool = False,
) -> tuple[dict[str, int], list[tuple[str, str]]]:
"""
Train a byte-level BPE tokenizer on the given corpus.
Parameters
----------
corpus_iterator : Iterator[str]
An iterator that yields text strings (e.g., lines from a file,
or documents from a dataset). The corpus can be arbitrarily large
because we process it in chunks.
vocab_size : int
The desired final vocabulary size, including the 256 base byte tokens
and any special tokens. Must be greater than 256.
min_frequency : int
Minimum frequency for a pair to be merged. Pairs that appear fewer
than this many times are not merged. Default is 2.
verbose : bool
If True, print progress information during training.
Returns
-------
token_to_id : dict[str, int]
The final vocabulary mapping token strings to integer IDs.
merges : list[tuple[str, str]]
The ordered list of merge rules. The order is critical: merges must
be applied in this order during encoding.
"""
import regex
    # The tiktoken-style pre-tokenization pattern (see Section 7).
split_pattern = regex.compile(
r"(?i:'s|'t|'re|'ve|'m|'ll|'d)"
r"|[^\r\n\p{L}\p{N}]?\p{L}+"
r"|\p{N}{1,3}"
r"|\s?[^\s\p{L}\p{N}]+[\r\n]*"
r"|\s*[\r\n]+"
r"|\s+(?!\S)"
r"|\s+"
)
# Build the byte-to-character mapping.
byte_to_char = build_byte_vocab()
# Step 1: Build the initial word frequency table from the corpus.
# Each word is represented as a tuple of single-character byte tokens.
word_freqs: dict[tuple[str, ...], int] = defaultdict(int)
for text in corpus_iterator:
# Pre-tokenize the text into word-level chunks.
pre_tokens = split_pattern.findall(text)
for pre_token in pre_tokens:
# Encode the pre-token as UTF-8 bytes, then map each byte
# to its printable character representation.
byte_seq = pre_token.encode("utf-8")
char_seq = tuple(byte_to_char[b] for b in byte_seq)
word_freqs[char_seq] += 1
if verbose:
print(f"Corpus processed. Unique pre-token types: {len(word_freqs)}")
# Step 2: Initialize the vocabulary with the 256 base byte tokens.
# We assign IDs 0-255 to the byte tokens in the order defined by
# build_byte_vocab (which matches the GPT-2 ordering).
char_to_byte = {v: k for k, v in byte_to_char.items()}
# Sort by byte value to get a consistent ordering.
initial_tokens = sorted(byte_to_char.values(), key=lambda c: char_to_byte[c])
token_to_id: dict[str, int] = {tok: i for i, tok in enumerate(initial_tokens)}
# The number of merges we need to perform.
num_merges = vocab_size - len(token_to_id)
if num_merges <= 0:
raise ValueError(
f"vocab_size ({vocab_size}) must be greater than the number of "
f"base tokens ({len(token_to_id)})."
)
merges: list[tuple[str, str]] = []
vocab = dict(word_freqs) # Working copy of the word frequency table.
# Step 3: Iteratively find and apply the most frequent merge.
for merge_idx in range(num_merges):
# Count all adjacent pairs in the current vocabulary state.
pairs = get_stats(vocab)
if not pairs:
if verbose:
print(f"No more pairs to merge after {merge_idx} merges.")
break
# Find the most frequent pair. In case of a tie, we use lexicographic
# ordering of the pair as a tiebreaker to ensure determinism.
best_pair = max(pairs, key=lambda p: (pairs[p], p))
best_freq = pairs[best_pair]
if best_freq < min_frequency:
if verbose:
print(
f"Stopping: best pair frequency {best_freq} "
f"< min_frequency {min_frequency}"
)
break
# Apply the merge to the vocabulary.
vocab = merge_vocab(best_pair, vocab)
# Record the merge rule and add the new token to the vocabulary.
merges.append(best_pair)
new_token = best_pair[0] + best_pair[1]
token_to_id[new_token] = len(token_to_id)
if verbose and merge_idx % 100 == 0:
print(
f"Merge {merge_idx + 1}/{num_merges}: "
f"'{best_pair[0]}' + '{best_pair[1]}' -> '{new_token}' "
f"(freq={best_freq})"
)
return token_to_id, merges
The training loop is the heart of BPE. Notice several important design
decisions embedded in this code.
First, we process the corpus lazily using an iterator. This means we can
train on corpora that are far too large to fit in memory, as long as we
can iterate over them line by line or document by document.
Second, we use a tiebreaker when selecting the best pair: when two pairs
have the same frequency, we break the tie by comparing the pairs themselves
(the max() call compares (frequency, pair) tuples, so the lexicographically
greater pair wins). Any deterministic rule works; what matters is that
running the same training loop twice on the same corpus always produces the
same merge rules.
Third, we track both the token_to_id dictionary (which maps token strings
to IDs) and the merges list (which records the ordered merge rules). Both
are needed: token_to_id converts token strings to IDs during encoding, while
the ranks derived from merges determine how text is split into tokens; both
are also what we save to disk.
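As a quick usage sketch, we can run train_bpe on the toy corpus from Section 8.2 (which merges get learned first depends on the tiebreaker, so treat the output as illustrative):

```python
# The toy corpus from Section 8.2, with each word repeated by its frequency.
toy_corpus = ["low"] * 5 + ["lower"] * 2 + ["newest"] * 6 + ["widest"] * 3

token_to_id, merges = train_bpe(
    iter(toy_corpus),
    vocab_size=256 + 10,  # 256 base byte tokens plus up to 10 learned merges
    min_frequency=2,
    verbose=True,
)

print("Learned merges:", merges)
print("Vocabulary size:", len(token_to_id))
```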
8.3 ENCODING WITH A TRAINED BPE VOCABULARY
Once we have trained the BPE tokenizer (or loaded a pre-trained one), we
need to encode new text. The encoding algorithm works as follows:
First, pre-tokenize the input text using the same regex pattern used during
training. This splits the text into pre-tokens (word-level chunks).
Second, for each pre-token, convert it to a sequence of byte tokens using
the byte-to-character mapping.
Third, apply the BPE merge rules to each pre-token's byte sequence. The
rules must be applied in the exact order they were learned during training.
The naive approach to applying merge rules is to iterate through all merge
rules for each pre-token, which is O(num_merges * len(pre_token)) per
pre-token. This is too slow for production use.
The better approach, used by OpenAI's tiktoken library, is rank-based greedy
merging. Assign each learned merge a rank equal to its index in the merge
list, then repeatedly find the adjacent pair in the current token sequence
with the lowest rank and apply that merge, stopping when no adjacent pair
has a known rank. Production implementations accelerate the "find the
lowest-rank pair" step with specialized data structures, but the algorithm
is the same.
Let us implement this encoding algorithm:
def encode_pre_token(
    pre_token_bytes: bytes,
    byte_to_char: dict[int, str],
    merge_ranks: dict[tuple[str, str], int],
) -> list[str]:
    """
    Encode a single pre-token (given as raw bytes) into a list of token
    strings using the BPE merge rules. The caller converts these strings
    to integer IDs via token_to_id.
    This function applies merges greedily by rank: it repeatedly finds the
    adjacent pair with the lowest merge rank (the merge learned earliest
    during training) and applies it, until no adjacent pair in the sequence
    has a known rank.
Parameters
----------
pre_token_bytes : bytes
The raw UTF-8 bytes of the pre-token to encode.
byte_to_char : dict[int, str]
Mapping from byte values to their printable character representations.
merge_ranks : dict[tuple[str, str], int]
Mapping from (left_token, right_token) pairs to their merge rank
(the index in the ordered merge list, starting from 0).
Lower rank means higher priority (was learned earlier).
Returns
-------
    list[str]
        The list of token strings for this pre-token.
"""
# Convert bytes to initial token sequence (one token per byte).
tokens: list[str] = [byte_to_char[b] for b in pre_token_bytes]
if len(tokens) == 1:
# Single-byte pre-token: no merges possible.
return tokens
    # At each step we scan the current token list for the adjacent pair with
    # the lowest merge rank and splice the two tokens into one. The list
    # slicing makes each merge O(n), so the loop below is O(n^2) in the
    # worst case, which is acceptable because pre-tokens are short. The
    # production implementation in the Addendum uses a more efficient
    # data structure.
while True:
best_rank = None
best_i = -1
# Find the adjacent pair with the lowest merge rank.
for i in range(len(tokens) - 1):
pair = (tokens[i], tokens[i + 1])
rank = merge_ranks.get(pair)
if rank is not None:
if best_rank is None or rank < best_rank:
best_rank = rank
best_i = i
# If no mergeable pair was found, we are done.
if best_i == -1:
break
# Apply the merge at position best_i.
merged = tokens[best_i] + tokens[best_i + 1]
tokens = tokens[:best_i] + [merged] + tokens[best_i + 2:]
return tokens
The encoding function above is correct but not maximally efficient for very
long sequences. The inner loop is O(n) where n is the current number of
tokens, and we may perform up to n-1 merges, giving O(n^2) overall. For
typical pre-tokens (which are at most a few hundred bytes), this is
perfectly acceptable. The full production implementation in the Addendum
uses a more efficient approach for very long sequences.
Now let us write the full encoding function that handles an entire string:
def encode(
text: str,
token_to_id: dict[str, int],
merges: list[tuple[str, str]],
special_tokens: dict[str, int],
split_pattern: "regex.Pattern",
byte_to_char: dict[int, str],
) -> list[int]:
"""
Encode a text string into a list of token IDs.
Special tokens are handled first: the text is split at special token
boundaries, and special tokens are mapped directly to their IDs without
going through the BPE algorithm. The remaining text chunks are then
pre-tokenized and BPE-encoded.
Parameters
----------
text : str
The input text to encode.
token_to_id : dict[str, int]
The vocabulary mapping token strings to integer IDs.
merges : list[tuple[str, str]]
The ordered list of BPE merge rules.
special_tokens : dict[str, int]
Mapping from special token strings to their IDs.
Special tokens are not split by the BPE algorithm.
split_pattern : regex.Pattern
The compiled pre-tokenization regex pattern.
byte_to_char : dict[int, str]
Mapping from byte values to their character representations.
Returns
-------
list[int]
The list of token IDs for the input text.
"""
import re
# Build the merge rank lookup: pair -> rank (index in merges list).
merge_ranks: dict[tuple[str, str], int] = {
pair: rank for rank, pair in enumerate(merges)
}
ids: list[int] = []
# Handle special tokens by splitting the text at special token boundaries.
# We process text chunks between special tokens with BPE, and map special
# tokens directly to their IDs.
if special_tokens:
# Build a regex that matches any special token.
# Sort by length descending to match longer tokens first.
sorted_specials = sorted(special_tokens.keys(), key=len, reverse=True)
special_pattern = re.compile(
"(" + "|".join(re.escape(s) for s in sorted_specials) + ")"
)
chunks = special_pattern.split(text)
else:
chunks = [text]
for chunk in chunks:
if not chunk:
continue
if chunk in special_tokens:
# This chunk is a special token; map it directly.
ids.append(special_tokens[chunk])
else:
# Pre-tokenize and BPE-encode this chunk.
pre_tokens = split_pattern.findall(chunk)
for pre_token in pre_tokens:
pre_token_bytes = pre_token.encode("utf-8")
token_strings = encode_pre_token(
pre_token_bytes, byte_to_char, merge_ranks
)
for tok_str in token_strings:
ids.append(token_to_id[tok_str])
return ids
Let us trace through our running example to make sure we understand what
is happening. Suppose we have a trained tokenizer and we want to encode
the text "Hello, world!". The pre-tokenizer splits this into:
["Hello", ",", " world", "!"]
For the pre-token "Hello", we first convert to UTF-8 bytes:
H -> 72, e -> 101, l -> 108, l -> 108, o -> 111
Then we map each byte to its printable character (in this case, all these
bytes are in the printable ASCII range, so they map to themselves):
["H", "e", "l", "l", "o"]
Then we apply BPE merges. If the tokenizer has learned (among others):
merge 50: ("H", "e") -> "He"
merge 120: ("He", "l") -> "Hel"
merge 340: ("Hel", "l") -> "Hell"
merge 890: ("Hell", "o") -> "Hello"
Then the sequence evolves as:
["H", "e", "l", "l", "o"]
-> ["He", "l", "l", "o"] (apply merge 50)
-> ["Hel", "l", "o"] (apply merge 120)
-> ["Hell", "o"] (apply merge 340)
-> ["Hello"] (apply merge 890)
And "Hello" maps to some token ID, say 15496. The encoding of the full
text produces a list of such IDs.
8.4 DECODING
Decoding is the reverse process: given a list of token IDs, reconstruct
the original text. This is simpler than encoding.
For each token ID, look up the token string in the id-to-token mapping.
Concatenate all token strings. The result is a string of printable Unicode
characters. But remember: these characters are not the original text -- they
are the byte-level representation. We need to convert them back to bytes
using the inverse of the byte-to-character mapping, and then decode the
bytes as UTF-8.
def decode(
ids: list[int],
id_to_token: list[str],
char_to_byte: dict[str, int],
special_token_ids: set[int],
) -> str:
"""
Decode a list of token IDs back into the original text string.
This function handles both regular BPE tokens (which are decoded via
the byte-level mapping) and special tokens (which are decoded directly
to their string representation).
Parameters
----------
ids : list[int]
The list of token IDs to decode.
id_to_token : list[str]
The vocabulary as a list, where id_to_token[i] is the token string
for token ID i.
char_to_byte : dict[str, int]
The inverse of byte_to_char: maps printable characters back to bytes.
This is used to convert token strings back to raw bytes.
special_token_ids : set[int]
The set of token IDs that correspond to special tokens. Special tokens
are decoded directly to their string representation, not through the
byte mapping.
Returns
-------
str
The decoded text string.
"""
byte_buffer: list[int] = []
result_parts: list[str] = []
for token_id in ids:
token_str = id_to_token[token_id]
if token_id in special_token_ids:
# Flush any accumulated bytes before the special token.
if byte_buffer:
result_parts.append(
bytes(byte_buffer).decode("utf-8", errors="replace")
)
byte_buffer = []
# Append the special token string directly.
result_parts.append(token_str)
else:
# Convert each character in the token string back to a byte.
for char in token_str:
byte_buffer.append(char_to_byte[char])
# Flush any remaining bytes.
if byte_buffer:
result_parts.append(
bytes(byte_buffer).decode("utf-8", errors="replace")
)
return "".join(result_parts)
The use of errors="replace" in the UTF-8 decoding is important. In theory,
a correctly trained and used tokenizer should never produce invalid UTF-8
sequences. In practice, edge cases can arise (for example, if the model
generates a sequence of tokens that, when concatenated at the byte level,
form an incomplete UTF-8 sequence). Using errors="replace" ensures that
decoding never raises an exception, replacing invalid bytes with the Unicode
replacement character (U+FFFD).
Note that we accumulate bytes in a buffer and only decode when we encounter
a special token or reach the end of the sequence. This is necessary because
a single UTF-8 character may be split across multiple tokens. For example,
the emoji character U+1F600 (grinning face, 0xF0 0x9F 0x98 0x80 in UTF-8)
might be tokenized as four separate byte tokens. We must accumulate all four
bytes before attempting to decode them as UTF-8.
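Putting encoding and decoding together, here is a round-trip sketch; it assumes the functions defined above are in scope, uses the tokenizer trained earlier, and skips special tokens for simplicity:

```python
import regex

# Assumes build_byte_vocab, encode, decode, GPT2_SPLIT_PATTERN, and a trained
# (token_to_id, merges) pair from the previous sections are available.
byte_to_char = build_byte_vocab()
char_to_byte = {c: b for b, c in byte_to_char.items()}
split_pattern = regex.compile(GPT2_SPLIT_PATTERN)

id_to_token = [None] * len(token_to_id)
for tok, tok_id in token_to_id.items():
    id_to_token[tok_id] = tok

text = "Hello, world! I'm learning tokenization."
ids = encode(text, token_to_id, merges, special_tokens={},
             split_pattern=split_pattern, byte_to_char=byte_to_char)
roundtrip = decode(ids, id_to_token, char_to_byte, special_token_ids=set())

print(ids)
print(roundtrip == text)  # A lossless tokenizer must print True.
```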
9. SPECIAL TOKENS AND CHAT TEMPLATES
Modern LLMs use special tokens to structure their inputs. These tokens serve
as delimiters, markers, and control signals that the model has been trained
to recognize and respond to. Understanding special tokens is essential for
using LLMs correctly.
The most universal special token is the beginning-of-sequence token, often
written as <|bos|>, <s>, or <|begin_of_text|>. It is prepended to every
input to signal the start of a new sequence. Similarly, the end-of-sequence
token (<|eos|>, </s>, or <|end_of_text|>) signals the end of a sequence and
causes the model to stop generating.
Chat models use additional special tokens to structure conversations. The
LLaMA 3 chat format uses tokens like <|start_header_id|>, <|end_header_id|>,
and <|eot_id|> to delimit message headers and the end of turns. The ChatML
format used by OpenAI (and adopted by models such as Qwen) uses <|im_start|>
and <|im_end|> to delimit messages.
Here is what a typical LLaMA 3 chat-formatted input looks like:
<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a helpful assistant.<|eot_id|><|start_header_id|>user<|end_header_id|>
What is tokenization?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
The tokenizer must handle these special tokens correctly. They must never be
split by the BPE algorithm -- "<|begin_of_text|>" must always map to a single
token ID, not be split into "<", "|", "begin", "_", "of", "_", "text", "|", ">".
The following code demonstrates how to apply a chat template to a list of
messages:
from typing import Literal
MessageRole = Literal["system", "user", "assistant"]
def apply_llama3_chat_template(
messages: list[dict[str, str]],
add_generation_prompt: bool = True,
) -> str:
"""
Apply the LLaMA 3 chat template to a list of messages.
Each message is a dict with keys "role" (one of "system", "user",
"assistant") and "content" (the message text).
The LLaMA 3 template format is:
<|begin_of_text|>
<|start_header_id|>{role}<|end_header_id|>
{content}<|eot_id|>
... (repeated for each message)
<|start_header_id|>assistant<|end_header_id|> (if add_generation_prompt)
Parameters
----------
messages : list[dict[str, str]]
The conversation history as a list of message dicts.
add_generation_prompt : bool
If True, append the assistant header to prompt the model to generate
a response. Set to False when encoding a complete conversation for
training.
Returns
-------
str
The formatted text ready to be passed to the tokenizer's encode().
"""
result = "<|begin_of_text|>"
for message in messages:
role = message["role"]
content = message["content"]
result += f"<|start_header_id|>{role}<|end_header_id|>\n\n"
result += content
result += "<|eot_id|>"
if add_generation_prompt:
result += "<|start_header_id|>assistant<|end_header_id|>\n\n"
return result
def apply_chatml_template(
messages: list[dict[str, str]],
add_generation_prompt: bool = True,
) -> str:
"""
    Apply the ChatML template, used by many models including Qwen and
    numerous community fine-tunes.
The ChatML format is:
<|im_start|>{role}
{content}<|im_end|>
... (repeated for each message)
<|im_start|>assistant (if add_generation_prompt)
Parameters
----------
messages : list[dict[str, str]]
The conversation history.
add_generation_prompt : bool
If True, append the assistant prompt.
Returns
-------
str
The formatted text.
"""
result = ""
for message in messages:
role = message["role"]
content = message["content"]
result += f"<|im_start|>{role}\n{content}<|im_end|>\n"
if add_generation_prompt:
result += "<|im_start|>assistant\n"
return result
The chat template functions above produce the correctly formatted text that
you then pass to the tokenizer's encode() function. The special tokens
in the template (<|begin_of_text|>, <|start_header_id|>, etc.) are handled
by the special token logic in the encoder, ensuring they are mapped to their
designated IDs without being split.
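A quick usage example of the LLaMA 3 template above (the messages are invented):

```python
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is tokenization?"},
]

prompt = apply_llama3_chat_template(messages, add_generation_prompt=True)
print(prompt)
# <|begin_of_text|><|start_header_id|>system<|end_header_id|>
#
# You are a helpful assistant.<|eot_id|><|start_header_id|>user<|end_header_id|>
#
# What is tokenization?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
#
```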
10. SAVING, LOADING, AND PORTABILITY
A tokenizer is only useful if it can be saved to disk and loaded back
identically. The standard format for modern tokenizers is JSON, which is
human-readable, language-agnostic, and widely supported.
The tokenizer file should contain:
The vocabulary (token_to_id mapping).
The ordered list of merge rules.
The special tokens and their IDs.
Metadata (tokenizer type, version, model name, etc.).
The layout is modeled on the tokenizer.json format of the HuggingFace
tokenizers library, which is the de facto standard for sharing tokenizers.
import json
import os
from pathlib import Path
def save_tokenizer(
token_to_id: dict[str, int],
merges: list[tuple[str, str]],
special_tokens: dict[str, int],
save_directory: str | Path,
tokenizer_name: str = "bpe_tokenizer",
) -> None:
"""
Save a trained BPE tokenizer to a directory in a portable JSON format.
The tokenizer is saved as two files:
- tokenizer.json: The main tokenizer file containing vocabulary,
merges, and special tokens.
- tokenizer_config.json: Metadata about the tokenizer.
This format is compatible with the HuggingFace tokenizers library.
Parameters
----------
token_to_id : dict[str, int]
The vocabulary mapping token strings to integer IDs.
merges : list[tuple[str, str]]
The ordered list of BPE merge rules.
special_tokens : dict[str, int]
Mapping from special token strings to their IDs.
save_directory : str or Path
The directory to save the tokenizer files in.
Will be created if it does not exist.
tokenizer_name : str
A name for the tokenizer, used in the config file.
"""
save_dir = Path(save_directory)
save_dir.mkdir(parents=True, exist_ok=True)
# Build the tokenizer.json structure.
tokenizer_data = {
"version": "1.0",
"type": "BPE",
"model": {
"type": "BPE",
"vocab": token_to_id,
# Merges are stored as "token1 token2" strings, one per line.
"merges": [f"{a} {b}" for a, b in merges],
},
"special_tokens": {
token: {"id": token_id, "content": token}
for token, token_id in special_tokens.items()
},
"added_tokens": [
{
"id": token_id,
"content": token,
"single_word": False,
"lstrip": False,
"rstrip": False,
"normalized": False,
"special": True,
}
for token, token_id in sorted(
special_tokens.items(), key=lambda x: x[1]
)
],
}
tokenizer_path = save_dir / "tokenizer.json"
with open(tokenizer_path, "w", encoding="utf-8") as f:
json.dump(tokenizer_data, f, ensure_ascii=False, indent=2)
# Build the tokenizer_config.json.
config_data = {
"tokenizer_class": "BPETokenizer",
"model_max_length": 131072,
"tokenizer_name": tokenizer_name,
"vocab_size": len(token_to_id),
"num_merges": len(merges),
"bos_token": next(
(t for t in special_tokens if "bos" in t.lower() or "begin" in t.lower()),
None,
),
"eos_token": next(
(t for t in special_tokens if "eos" in t.lower() or "end" in t.lower()),
None,
),
"unk_token": next(
(t for t in special_tokens if "unk" in t.lower()), None
),
"pad_token": next(
(t for t in special_tokens if "pad" in t.lower()), None
),
}
config_path = save_dir / "tokenizer_config.json"
with open(config_path, "w", encoding="utf-8") as f:
json.dump(config_data, f, ensure_ascii=False, indent=2)
print(f"Tokenizer saved to {save_dir}")
print(f" Vocabulary size: {len(token_to_id)}")
print(f" Number of merges: {len(merges)}")
print(f" Special tokens: {list(special_tokens.keys())}")
def load_tokenizer(
load_directory: str | Path,
) -> tuple[dict[str, int], list[tuple[str, str]], dict[str, int]]:
"""
Load a BPE tokenizer from a directory.
Parameters
----------
load_directory : str or Path
The directory containing the tokenizer files.
Returns
-------
token_to_id : dict[str, int]
The vocabulary mapping token strings to integer IDs.
merges : list[tuple[str, str]]
The ordered list of BPE merge rules.
special_tokens : dict[str, int]
Mapping from special token strings to their IDs.
"""
load_dir = Path(load_directory)
tokenizer_path = load_dir / "tokenizer.json"
if not tokenizer_path.exists():
raise FileNotFoundError(
f"tokenizer.json not found in {load_dir}. "
"Make sure you are pointing to a directory saved by save_tokenizer()."
)
with open(tokenizer_path, "r", encoding="utf-8") as f:
data = json.load(f)
token_to_id: dict[str, int] = data["model"]["vocab"]
# Parse merges from "token1 token2" format back to tuples.
merges: list[tuple[str, str]] = []
for merge_str in data["model"]["merges"]:
parts = merge_str.split(" ", 1)
if len(parts) == 2:
merges.append((parts[0], parts[1]))
special_tokens: dict[str, int] = {
token: info["id"]
for token, info in data.get("special_tokens", {}).items()
}
return token_to_id, merges, special_tokens
The save and load functions use a JSON format that is human-readable and
closely follows the conventions of the HuggingFace ecosystem. It contains
everything the HuggingFace tokenizers library needs (vocabulary, ordered
merges, special tokens), although loading it there may require small
structural adjustments.
One important subtlety in the merge serialization: we store merges as
"token1 token2" strings (with a space separator). When loading, we split on
the first space only (using split(" ", 1)) to handle the case where one
of the tokens itself contains a space. In byte-level BPE, token strings
never contain spaces (because spaces are encoded as the byte 0x20, which
maps to a different printable character), so this is not an issue in
practice, but it is good defensive programming.
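A short round-trip check of the save and load functions (the path is illustrative, and the variables are assumed to come from a trained tokenizer):

```python
# Save the trained tokenizer and load it back, verifying nothing changed.
save_tokenizer(
    token_to_id,
    merges,
    special_tokens,
    save_directory="./my_tokenizer",
    tokenizer_name="demo_bpe",
)

loaded_vocab, loaded_merges, loaded_specials = load_tokenizer("./my_tokenizer")

assert loaded_vocab == token_to_id
assert loaded_merges == merges
assert loaded_specials == special_tokens
```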
11. INTEGRATING YOUR TOKENIZER WITH LLM INFERENCE BACKENDS
A tokenizer is only useful in the context of a model. In this section, we
show how to connect our tokenizer to real LLM inference backends, with
automatic hardware detection to use the best available accelerator.
The inference backends we support are:
Apple MLX is Apple's machine learning framework for Apple Silicon (M1, M2,
M3, M4 chips). It uses the unified memory architecture of Apple Silicon to
run models efficiently on both the CPU and GPU without copying data between
them. MLX is the best choice for Mac users.
NVIDIA CUDA via llama-cpp-python is the most common setup for users with
NVIDIA GPUs. llama.cpp is a highly optimized C++ inference engine that
supports GGUF model files. The Python bindings (llama-cpp-python) make it
easy to use from Python.
AMD ROCm is AMD's GPU computing platform, analogous to NVIDIA CUDA. It is
supported by llama.cpp (via HIP) and by PyTorch.
Intel OpenVINO is Intel's inference optimization toolkit, which can
accelerate models on Intel CPUs, integrated GPUs, and discrete GPUs.
CPU fallback via llama.cpp works on any platform and is the fallback when
no GPU is available.
11.1 DETECTING AVAILABLE HARDWARE
The first step is to detect what hardware is available and choose the best
backend:
import platform
import subprocess
import sys
from enum import Enum, auto
class InferenceBackend(Enum):
"""Supported inference backends, in order of preference."""
APPLE_MLX = auto()
NVIDIA_CUDA = auto()
AMD_ROCM = auto()
INTEL_OPENVINO = auto()
CPU_LLAMA_CPP = auto()
def detect_best_backend() -> InferenceBackend:
"""
Detect the best available inference backend for the current hardware.
Detection order:
1. Apple MLX (Apple Silicon Macs)
2. NVIDIA CUDA (NVIDIA GPUs)
3. AMD ROCm (AMD GPUs)
4. Intel OpenVINO (Intel hardware)
5. CPU via llama.cpp (fallback)
Returns
-------
InferenceBackend
The best available backend.
"""
# Check for Apple Silicon (M-series chips).
if platform.system() == "Darwin" and platform.machine() == "arm64":
try:
import mlx.core as mx
# Verify that MLX can actually use the GPU.
_ = mx.array([1.0])
print("Detected: Apple Silicon with MLX support.")
return InferenceBackend.APPLE_MLX
except ImportError:
print("Apple Silicon detected but MLX not installed. "
"Install with: pip install mlx mlx-lm")
    # Check for NVIDIA CUDA and AMD ROCm via PyTorch.
    # Note: ROCm builds of PyTorch also report torch.cuda.is_available() as
    # True, so we distinguish the two via torch.version.hip, which is a
    # version string on ROCm builds and None on CUDA builds.
    try:
        import torch
        if torch.cuda.is_available():
            device_name = torch.cuda.get_device_name(0)
            if getattr(torch.version, "hip", None) is not None:
                print(f"Detected: AMD GPU via ROCm: {device_name}")
                return InferenceBackend.AMD_ROCM
            print(f"Detected: NVIDIA CUDA GPU: {device_name}")
            return InferenceBackend.NVIDIA_CUDA
    except ImportError:
        pass
# Check for Intel OpenVINO.
try:
from openvino.runtime import Core
core = Core()
available_devices = core.available_devices
if "GPU" in available_devices:
print(f"Detected: Intel OpenVINO with GPU support. "
f"Devices: {available_devices}")
return InferenceBackend.INTEL_OPENVINO
except ImportError:
pass
# Fallback: CPU via llama.cpp.
print("No GPU detected. Falling back to CPU inference via llama.cpp.")
return InferenceBackend.CPU_LLAMA_CPP
The hardware detection function tries each backend in order of preference
and returns the first one that is available and functional. The function
is defensive: it catches ImportError for each optional dependency so that
it works even if some backends are not installed.
11.2 APPLE MLX
For Apple Silicon users, MLX provides excellent performance. The mlx-lm
library provides a high-level interface for running LLMs with MLX:
def run_with_mlx(
prompt: str,
model_path: str,
tokenizer: "BPETokenizer",
max_new_tokens: int = 512,
temperature: float = 0.7,
top_p: float = 0.9,
) -> str:
"""
Run inference using Apple MLX.
This function uses the mlx-lm library, which provides optimized LLM
inference for Apple Silicon. The model must be in MLX format (either
downloaded directly or converted from safetensors/GGUF).
Parameters
----------
prompt : str
The formatted prompt text (after applying a chat template).
model_path : str
Path to the MLX model directory (containing config.json,
model.safetensors or model.npz, and tokenizer files).
tokenizer : BPETokenizer
Our custom tokenizer instance. Note: mlx-lm has its own tokenizer
loading, but we demonstrate integration with our custom tokenizer
for the encoding step.
max_new_tokens : int
Maximum number of tokens to generate.
temperature : float
Sampling temperature. Higher values produce more random outputs.
top_p : float
Top-p (nucleus) sampling parameter.
Returns
-------
str
The generated text (not including the prompt).
"""
try:
        from mlx_lm import load, generate
except ImportError:
raise ImportError(
"mlx-lm is not installed. Install with: pip install mlx-lm"
)
# Load the model and its built-in tokenizer using mlx-lm.
# mlx-lm handles the model loading, weight conversion, and GPU placement.
model, mlx_tokenizer = load(model_path)
# Use mlx-lm's generate function for inference.
# This handles the autoregressive generation loop efficiently on MLX.
response = generate(
model,
mlx_tokenizer,
prompt=prompt,
max_tokens=max_new_tokens,
temp=temperature,
top_p=top_p,
verbose=False,
)
return response
11.3 NVIDIA CUDA VIA LLAMA-CPP-PYTHON
For NVIDIA GPU users, llama-cpp-python with CUDA support is a highly
efficient option. GGUF models (quantized models in the GGUF format) can
be run with very low memory usage while maintaining good quality:
def run_with_llama_cpp(
prompt: str,
model_path: str,
tokenizer: "BPETokenizer",
max_new_tokens: int = 512,
temperature: float = 0.7,
top_p: float = 0.9,
n_gpu_layers: int = -1,
n_ctx: int = 4096,
) -> str:
"""
Run inference using llama-cpp-python.
This backend works for NVIDIA CUDA, AMD ROCm (via HIP), and CPU.
The model must be in GGUF format.
Parameters
----------
prompt : str
The formatted prompt text.
model_path : str
Path to the GGUF model file.
tokenizer : BPETokenizer
Our custom tokenizer (used for encoding the prompt to count tokens).
max_new_tokens : int
Maximum number of tokens to generate.
temperature : float
Sampling temperature.
top_p : float
Top-p sampling parameter.
n_gpu_layers : int
Number of model layers to offload to GPU. Use -1 to offload all
layers (recommended for NVIDIA/AMD GPUs with sufficient VRAM).
Use 0 for CPU-only inference.
n_ctx : int
Context window size (maximum sequence length).
Returns
-------
str
The generated text.
"""
try:
from llama_cpp import Llama
except ImportError:
raise ImportError(
"llama-cpp-python is not installed.\n"
"For NVIDIA CUDA: CMAKE_ARGS='-DGGML_CUDA=on' pip install llama-cpp-python\n"
"For AMD ROCm: CMAKE_ARGS='-DGGML_HIPBLAS=on' pip install llama-cpp-python\n"
"For CPU only: pip install llama-cpp-python"
)
    # Use our tokenizer to count prompt tokens so we can warn when the prompt
    # plus the requested completion cannot fit in the context window.
    prompt_token_count = len(tokenizer.encode(prompt, allowed_special="all"))
    if prompt_token_count + max_new_tokens > n_ctx:
        print(
            f"Warning: prompt ({prompt_token_count} tokens) plus max_new_tokens "
            f"({max_new_tokens}) exceeds n_ctx ({n_ctx}); output may be cut short."
        )
    # Initialize the Llama model.
    # n_gpu_layers=-1 means offload all layers to GPU.
    # verbose=False suppresses llama.cpp's internal logging.
    llm = Llama(
        model_path=model_path,
        n_gpu_layers=n_gpu_layers,
        n_ctx=n_ctx,
        verbose=False,
    )
# Run inference using llama.cpp's built-in generation.
# We use the raw completion API to have full control over the prompt.
output = llm(
prompt,
max_tokens=max_new_tokens,
temperature=temperature,
top_p=top_p,
echo=False, # Do not include the prompt in the output.
stop=["<|eot_id|>", "<|im_end|>", "</s>"], # Common stop tokens.
)
return output["choices"][0]["text"]
11.4 INTEL OPENVINO
For Intel hardware (including Intel Arc GPUs and Intel integrated graphics),
OpenVINO provides optimized inference:
def run_with_openvino(
prompt: str,
model_path: str,
tokenizer: "BPETokenizer",
max_new_tokens: int = 512,
temperature: float = 0.7,
device: str = "GPU",
) -> str:
"""
Run inference using Intel OpenVINO.
The model must be in OpenVINO IR format (XML + BIN files) or in a format
that can be converted by the optimum-intel library.
Parameters
----------
prompt : str
The formatted prompt text.
model_path : str
Path to the OpenVINO model directory or IR files.
tokenizer : BPETokenizer
        Our custom tokenizer instance, accepted for interface consistency;
        the HuggingFace tokenizer bundled with the model performs the actual
        encoding and decoding below.
max_new_tokens : int
Maximum number of tokens to generate.
temperature : float
        Sampling temperature passed to the model's generate() call.
        A value of 0 selects greedy decoding (sampling disabled).
device : str
The OpenVINO device to use: "GPU", "CPU", "AUTO", or "NPU".
Returns
-------
str
The generated text.
"""
try:
from optimum.intel import OVModelForCausalLM
from transformers import AutoTokenizer as HFTokenizer
except ImportError:
raise ImportError(
"optimum-intel is not installed. "
"Install with: pip install optimum[openvino] optimum-intel"
)
# Load the OpenVINO model using optimum-intel.
# This handles the OpenVINO IR loading and device placement.
ov_model = OVModelForCausalLM.from_pretrained(
model_path,
device=device,
ov_config={"PERFORMANCE_HINT": "LATENCY"},
)
# Use the HuggingFace tokenizer that comes with the model for
# encoding/decoding, as it is pre-configured for the specific model.
hf_tokenizer = HFTokenizer.from_pretrained(model_path)
# Encode the prompt.
inputs = hf_tokenizer(prompt, return_tensors="pt")
# Generate.
outputs = ov_model.generate(
**inputs,
max_new_tokens=max_new_tokens,
do_sample=temperature > 0,
temperature=temperature if temperature > 0 else 1.0,
pad_token_id=hf_tokenizer.eos_token_id,
)
# Decode only the newly generated tokens (not the prompt).
new_token_ids = outputs[0][inputs["input_ids"].shape[1]:]
return hf_tokenizer.decode(new_token_ids, skip_special_tokens=True)
12. PERFORMANCE, BENCHMARKING, AND PITFALLS
Building a correct tokenizer is one thing. Building a fast one is another.
In this section, we discuss performance considerations and common pitfalls.
PERFORMANCE CONSIDERATIONS
The most computationally expensive part of BPE encoding is the inner loop
that finds the best merge to apply. For a pre-token of length n, the naive
implementation is O(n^2) in the worst case (n merges, each requiring a scan
of the current token list). For typical English words (5-15 characters),
this is negligible. For very long sequences (code, URLs, base64-encoded
data), it can become a bottleneck.
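If you want to see this effect on your own machine, time the per-pre-token
encoder from the earlier sections on a short word versus a long unbroken run
of characters. This is only a rough sketch: it assumes encode_pre_token,
byte_to_char, and merge_ranks from the earlier sections are in scope, and the
absolute numbers vary from machine to machine; only the scaling matters:
import time
def time_encode(chunk: bytes, repeats: int = 100) -> float:
    """Average wall-clock seconds to BPE-encode one pre-token chunk."""
    start = time.perf_counter()
    for _ in range(repeats):
        encode_pre_token(chunk, byte_to_char, merge_ranks)
    return (time.perf_counter() - start) / repeats
print(f"{time_encode(b'hello'):.2e} s")      # a typical English word
print(f"{time_encode(b'A' * 2000):.2e} s")   # a long unbroken run, e.g. base64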
The tiktoken library (OpenAI's tokenizer) achieves very high throughput with
an optimized Rust core. Driving the merge loop with a priority queue (heap)
brings encoding down to O(n log n), which is the approach our running example
uses. For pure-Python implementations, the main optimizations are minimizing
Python object creation and using efficient data structures.
Parallelism is another important optimization. During training, the corpus
processing and pair counting can be parallelized across multiple CPU cores.
During inference, tokenization is typically fast enough that parallelism
is not needed, but for batch processing of many documents, multiprocessing
can provide significant speedups.
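As a concrete sketch of that batch case, the standard library's
multiprocessing module is enough. The example below assumes a tokenizer saved
at ./tok (a placeholder path) and loads one tokenizer per worker process via
the pool initializer, so the tokenizer object itself never needs to be pickled:
import multiprocessing as mp
_worker_tokenizer = None  # one tokenizer instance per worker process
def _init_worker(tokenizer_path: str) -> None:
    """Load the tokenizer once when each worker process starts."""
    global _worker_tokenizer
    _worker_tokenizer = BPETokenizer.from_pretrained(tokenizer_path)
def _encode_one(doc: str) -> list:
    """Encode a single document inside a worker process."""
    return _worker_tokenizer.encode(doc)
def encode_corpus_parallel(docs, tokenizer_path="./tok"):
    """Encode many documents across all CPU cores."""
    # On platforms that spawn workers (macOS, Windows), call this from a
    # script guarded by `if __name__ == "__main__":`.
    with mp.Pool(initializer=_init_worker, initargs=(tokenizer_path,)) as pool:
        return pool.map(_encode_one, docs, chunksize=64)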
Caching is a powerful optimization for the encoding step. If the same
pre-token appears many times (which is common for frequent words), we can
cache the result of encoding it and avoid recomputing the BPE merges. A
simple LRU cache on the encode_pre_token function can dramatically speed
up tokenization of repetitive text.
The following snippet shows how to add caching to the encoding function:
from functools import lru_cache
def make_cached_encoder(
byte_to_char: dict[int, str],
merge_ranks: dict[tuple[str, str], int],
max_cache_size: int = 65536,
):
"""
Create a cached version of the pre-token encoder.
The cache stores the encoded token strings for each unique pre-token
byte sequence. This avoids recomputing BPE merges for frequently
occurring pre-tokens (like common words).
Parameters
----------
byte_to_char : dict[int, str]
Byte-to-character mapping.
merge_ranks : dict[tuple[str, str], int]
Merge rank lookup.
max_cache_size : int
Maximum number of entries in the LRU cache.
Returns
-------
callable
A cached encoding function that takes bytes and returns a tuple
of token strings.
"""
@lru_cache(maxsize=max_cache_size)
def cached_encode(pre_token_bytes: bytes) -> tuple[str, ...]:
"""
Encode a pre-token (as bytes) to a tuple of token strings.
The result is cached by the pre-token bytes.
"""
return tuple(
encode_pre_token(pre_token_bytes, byte_to_char, merge_ranks)
)
return cached_encode
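Usage is straightforward: build the encoder once, then call it with raw
pre-token bytes. Here byte_to_char and merge_ranks stand for the mapping and
merge table built in the earlier sections:
cached_encode = make_cached_encoder(byte_to_char, merge_ranks)
first = cached_encode(b"tokenization")    # computed via BPE merges (cache miss)
second = cached_encode(b"tokenization")   # served from the LRU cache (cache hit)
print(cached_encode.cache_info())         # e.g. CacheInfo(hits=1, misses=1, ...)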
COMMON PITFALLS
The most common pitfall is applying merge rules in the wrong order. The
order of merges is fundamental to BPE: the same set of merge rules applied
in a different order produces different tokenizations. Always store and
apply merges in the exact order they were learned.
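A tiny worked example makes the point concrete. The helper below is purely
illustrative (it is not part of our tokenizer); it applies each merge rule,
in list order, everywhere it occurs:
def apply_merges_in_order(tokens, merges):
    """Apply each (left, right) merge rule, in order, wherever it occurs."""
    for left, right in merges:
        out, i = [], 0
        while i < len(tokens):
            if i + 1 < len(tokens) and tokens[i] == left and tokens[i + 1] == right:
                out.append(left + right)
                i += 2
            else:
                out.append(tokens[i])
                i += 1
        tokens = out
    return tokens
word = list("abc")
print(apply_merges_in_order(word, [("a", "b"), ("b", "c")]))   # ['ab', 'c']
print(apply_merges_in_order(word, [("b", "c"), ("a", "b")]))   # ['a', 'bc']
The same two rules yield ['ab', 'c'] in one order and ['a', 'bc'] in the
other, which is why the merge list must be stored as an ordered sequence,
never as an unordered set.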
Another pitfall is incorrect handling of the byte-to-character mapping.
The GPT-2 mapping is specific and must be implemented exactly. Using a
different mapping (for example, mapping bytes directly to their hex
representation) will produce a different vocabulary and incompatible
tokenizations.
A subtle pitfall is the handling of text that contains special token strings
as literal text. For example, if a user sends the message "Please output
<|end_of_text|> when you are done," the literal string "<|end_of_text|>"
should be treated as regular text, not as the special end-of-text token.
This is a security concern: a malicious user could inject special tokens
to manipulate the model's behavior. The solution is to escape or strip
special tokens from user input before encoding.
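With the tokenizer built in this tutorial, the simplest defence is the default
allowed_special="none", which forces special-token strings in untrusted input
to be split like ordinary text. The snippet below is a sketch that assumes a
trained tokenizer with the LLaMA 3 special tokens registered:
user_text = "Please output <|end_of_text|> when you are done."
safe_ids = tokenizer.encode(user_text)                           # allowed_special="none" (default)
unsafe_ids = tokenizer.encode(user_text, allowed_special="all")  # trusts the input
# safe_ids spells "<|end_of_text|>" out as ordinary byte-level tokens, while
# unsafe_ids contains the single special-token ID (128001 for LLaMA 3) --
# exactly the injection we want to prevent for untrusted input.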
Unicode normalization inconsistencies can cause subtle bugs. If the training
corpus was normalized with NFC but the inference input is not normalized,
the same visual text may produce different token IDs. Always apply the same
normalization at inference time as was applied during training.
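The failure mode is easy to demonstrate: the NFC and NFD forms of the same
visual string are different code point (and byte) sequences, so an
un-normalized input would be pre-tokenized differently:
import unicodedata
nfc = unicodedata.normalize("NFC", "café")   # 'é' as one code point (U+00E9)
nfd = unicodedata.normalize("NFD", "café")   # 'e' plus a combining accent (U+0301)
print(nfc == nfd)                  # False -- different code point sequences
print(nfc.encode("utf-8"))         # b'caf\xc3\xa9'
print(nfd.encode("utf-8"))         # b'cafe\xcc\x81'
print(unicodedata.normalize("NFC", nfd) == nfc)   # True once both are normalized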
Off-by-one errors in context length handling are common. The model has a
maximum context length (e.g., 4096 or 8192 tokens). If you encode a prompt
that is longer than this limit, the model will either truncate it (losing
information) or crash. Always check the encoded length before sending to
the model.
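A simple guard before every model call avoids this class of bug. The sketch
below assumes a trained tokenizer; long_prompt and the 4096 limit are
placeholders, and it truncates from the left so the most recent text survives:
def fit_to_context(ids, max_context: int, max_new_tokens: int):
    """Truncate token IDs so that prompt plus generation fits in the context window."""
    budget = max_context - max_new_tokens
    if budget <= 0:
        raise ValueError("max_new_tokens leaves no room for the prompt.")
    if len(ids) > budget:
        ids = ids[-budget:]   # keep only the most recent tokens
    return ids
prompt_ids = tokenizer.encode(long_prompt)
prompt_ids = fit_to_context(prompt_ids, max_context=4096, max_new_tokens=512)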
RUNNING EXAMPLE
#!/usr/bin/env python3
"""
bpe_tokenizer.py
================
A production-ready Byte-Pair Encoding (BPE) tokenizer for Large Language Models.
This module implements a complete BPE tokenizer compatible with GPT-2, GPT-3,
GPT-4, LLaMA 3, Mistral, Qwen, and other models that use byte-level BPE
tokenization.
Features
--------
- Byte-level BPE encoding and decoding (compatible with GPT-2/tiktoken format).
- Training from a text corpus (file path, iterable of strings, or raw string).
- Loading pre-trained tokenizers from local files or HuggingFace Hub.
- Special token handling (BOS, EOS, PAD, UNK, and arbitrary custom tokens).
- Chat template application (LLaMA 3, ChatML, Alpaca, and custom templates).
- Batch encoding and decoding with truncation and padding.
- Efficient encoding with a heap-based O(n log n) BPE algorithm and LRU caching.
- Cached compiled regex patterns for high-throughput encoding.
- Unicode NFC normalisation applied consistently at train and encode time.
- Multi-backend inference integration:
* Apple MLX (Apple Silicon M-series) with model caching
* NVIDIA CUDA via llama-cpp-python
* AMD ROCm via llama-cpp-python (HIP)
* Intel OpenVINO (CPU, iGPU, Arc GPU, NPU)
* HuggingFace Transformers (universal CPU/GPU fallback)
- Saving and loading in HuggingFace-compatible JSON format.
- Streaming tokenization for large texts and LLM output streams.
- __call__ interface for HuggingFace-style usage.
- Comprehensive input validation and error handling.
- Full type annotations throughout.
Requirements
------------
Python >= 3.9
regex >= 2023.0.0 (pip install regex)
Optional (for inference backends):
mlx >= 0.12.0 (pip install mlx)
mlx-lm >= 0.12.0 (pip install mlx-lm)
llama-cpp-python >= 0.2.0
CPU only : pip install llama-cpp-python
NVIDIA : CMAKE_ARGS="-DGGML_CUDA=on" pip install llama-cpp-python
AMD ROCm : CMAKE_ARGS="-DGGML_HIPBLAS=on" pip install llama-cpp-python
torch >= 2.0.0 (pip install torch)
optimum[openvino] (pip install "optimum[openvino]")
optimum-intel (pip install optimum-intel)
transformers >= 4.35.0 (pip install transformers)
huggingface_hub >= 0.20 (pip install huggingface_hub)
Installation (minimal)
-----------------------
pip install regex
Installation (full, all backends)
----------------------------------
pip install regex huggingface_hub transformers torch mlx mlx-lm \\
"optimum[openvino]" optimum-intel
# For NVIDIA GPU support in llama-cpp-python:
CMAKE_ARGS="-DGGML_CUDA=on" pip install llama-cpp-python
# For AMD ROCm support in llama-cpp-python:
CMAKE_ARGS="-DGGML_HIPBLAS=on" pip install llama-cpp-python
Usage
-----
# Train a new tokenizer:
tokenizer = BPETokenizer()
tokenizer.train(corpus_iterator, vocab_size=32000)
tokenizer.save("./my_tokenizer")
# Load a pre-trained tokenizer from a local directory:
tokenizer = BPETokenizer.from_pretrained("./my_tokenizer")
# Load from HuggingFace Hub:
tokenizer = BPETokenizer.from_huggingface("meta-llama/Meta-Llama-3-8B")
# Encode text:
ids = tokenizer.encode("Hello, world!")
# Decode token IDs:
text = tokenizer.decode(ids)
# HuggingFace-style __call__:
result = tokenizer("Hello, world!", padding=True, truncation=True, max_length=64)
# Apply a chat template and run inference:
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is tokenization?"},
]
response = tokenizer.chat(
messages,
model_path="/path/to/model.gguf",
max_new_tokens=512,
)
# Command-line interface:
python bpe_tokenizer.py train --corpus corpus.txt --vocab-size 32000 --output ./tok
python bpe_tokenizer.py encode --tokenizer ./tok --text "Hello, world!"
python bpe_tokenizer.py decode --tokenizer ./tok --ids 9906 11 1917 0
python bpe_tokenizer.py chat --tokenizer ./tok --model model.gguf
python bpe_tokenizer.py info --tokenizer ./tok
"""
from __future__ import annotations
import argparse
import heapq
import json
import os
import platform
import re
import sys
import time
import unicodedata
from collections import defaultdict
from enum import Enum, auto
from pathlib import Path
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Literal,
Optional,
Set,
Tuple,
Union,
)
# ---------------------------------------------------------------------------
# The `regex` library is required for Unicode property escapes (\p{L}, \p{N}).
# The standard `re` module does not support these, making it unsuitable for
# the GPT-2 pre-tokenisation pattern.
# ---------------------------------------------------------------------------
try:
import regex
except ImportError as _regex_import_error:
raise ImportError(
"The 'regex' library is required. Install it with:\n"
" pip install regex"
) from _regex_import_error
# ===========================================================================
# CONSTANTS
# ===========================================================================
# The GPT-2 / LLaMA 3 / Mistral pre-tokenisation regex pattern.
# Handles English contractions, letter sequences (optionally space-prefixed),
# digit runs, punctuation/symbol runs, and whitespace carefully.
GPT2_SPLIT_PATTERN: str = (
r"(?i:'s|'t|'re|'ve|'m|'ll|'d)"
r"|[^\r\n\p{L}\p{N}]?\p{L}+"
r"|\p{N}{1,3}"
r"|\s?[^\s\p{L}\p{N}]+[\r\n]*"
r"|\s*[\r\n]+"
r"|\s+(?!\S)"
r"|\s+"
)
# The tiktoken cl100k_base pattern used by GPT-4 and related models.
# Adds Unicode-aware apostrophe variants alongside ASCII ones.
CL100K_SPLIT_PATTERN: str = (
r"(?i:'s|'t|'re|'ve|'m|'ll|'d|\u2019s|\u2019t|\u2019re"
r"|\u2019ve|\u2019m|\u2019ll|\u2019d)"
r"|[^\r\n\p{L}\p{N}]?\p{L}+"
r"|\p{N}{1,3}"
r"|\s?[^\s\p{L}\p{N}]+[\r\n]*"
r"|\s*[\r\n]+"
r"|\s+(?!\S)"
r"|\s+"
)
# Named pattern registry for CLI and config use.
SPLIT_PATTERNS: Dict[str, str] = {
"gpt2": GPT2_SPLIT_PATTERN,
"cl100k": CL100K_SPLIT_PATTERN,
}
# Default special-token sets for common model families.
# IDs are placed above the standard BPE range so they never collide with
# learned merge tokens.
LLAMA3_SPECIAL_TOKENS: Dict[str, int] = {
"<|begin_of_text|>": 128000,
"<|end_of_text|>": 128001,
"<|reserved_special_token_0|>": 128002,
"<|reserved_special_token_1|>": 128003,
"<|finetune_right_pad_id|>": 128004,
"<|reserved_special_token_2|>": 128005,
"<|start_header_id|>": 128006,
"<|end_header_id|>": 128007,
"<|eom_id|>": 128008,
"<|eot_id|>": 128009,
"<|python_tag|>": 128010,
}
CHATML_SPECIAL_TOKENS: Dict[str, int] = {
"<|im_start|>": 32001,
"<|im_end|>": 32002,
}
# Maximum entries kept in the per-instance encoding cache.
_ENCODE_CACHE_MAX_SIZE: int = 65_536
# ===========================================================================
# INFERENCE BACKEND DETECTION
# ===========================================================================
class InferenceBackend(Enum):
"""Supported inference backends, listed in preferred order."""
APPLE_MLX = auto()
NVIDIA_CUDA = auto()
AMD_ROCM = auto()
INTEL_OPENVINO = auto()
CPU_LLAMA_CPP = auto()
HUGGINGFACE_TRANSFORMERS = auto()
def detect_best_backend(verbose: bool = True) -> InferenceBackend:
"""
Detect the best available inference backend for the current hardware.
Checks available hardware and installed libraries in order of performance
preference and returns the first fully functional backend found.
Detection order
---------------
1. Apple MLX -- Apple Silicon (M1/M2/M3/M4) with mlx-lm installed.
2. NVIDIA CUDA -- NVIDIA GPU detected via PyTorch with CUDA build.
3. AMD ROCm -- AMD GPU detected via PyTorch ROCm build or environment.
4. Intel OpenVINO -- Intel GPU/NPU/CPU via openvino runtime.
5. CPU llama.cpp -- llama-cpp-python installed (any platform).
6. HuggingFace Transformers -- last resort, works everywhere.
Parameters
----------
verbose : bool
If True, print detection progress to stdout.
Returns
-------
InferenceBackend
The best available backend for this machine.
"""
def _log(msg: str) -> None:
if verbose:
print(f"[Backend] {msg}")
# ------------------------------------------------------------------
# 1. Apple MLX (Apple Silicon only)
# ------------------------------------------------------------------
if platform.system() == "Darwin" and platform.machine() == "arm64":
try:
import mlx.core as mx # type: ignore[import]
import mlx_lm # type: ignore[import] # noqa: F401
# Smoke-test: create and evaluate a tiny array to confirm GPU works.
_t = mx.array([1.0, 2.0])
mx.eval(_t)
_log("Apple Silicon detected. MLX available. Using Apple MLX backend.")
return InferenceBackend.APPLE_MLX
except ImportError:
_log(
"Apple Silicon detected but mlx / mlx-lm not installed. "
"Install: pip install mlx mlx-lm"
)
except Exception as _e:
_log(f"Apple Silicon detected but MLX initialisation failed: {_e}")
# ------------------------------------------------------------------
# 2. NVIDIA CUDA / 3. AMD ROCm (both surface via torch.cuda)
# ------------------------------------------------------------------
try:
import torch # type: ignore[import]
if torch.cuda.is_available():
device_name: str = torch.cuda.get_device_name(0)
device_count: int = torch.cuda.device_count()
# ROCm builds of PyTorch expose torch.version.hip.
hip_version: Optional[str] = getattr(torch.version, "hip", None)
is_rocm = (
hip_version is not None
or "AMD" in device_name
or "Radeon" in device_name
)
if is_rocm:
_log(
f"AMD GPU detected via ROCm: {device_name} "
f"({device_count} device(s)). Using AMD ROCm backend."
)
return InferenceBackend.AMD_ROCM
else:
_log(
f"NVIDIA GPU detected: {device_name} "
f"({device_count} device(s)). Using NVIDIA CUDA backend."
)
return InferenceBackend.NVIDIA_CUDA
except ImportError:
pass # torch not installed; continue to next check.
# ------------------------------------------------------------------
# 4. Intel OpenVINO
# ------------------------------------------------------------------
try:
from openvino.runtime import Core # type: ignore[import]
_ov_core = Core()
_ov_devices: List[str] = _ov_core.available_devices
_log(
f"Intel OpenVINO available. Devices: {_ov_devices}. "
"Using Intel OpenVINO backend."
)
return InferenceBackend.INTEL_OPENVINO
except ImportError:
pass
# ------------------------------------------------------------------
# 5. CPU via llama-cpp-python
# ------------------------------------------------------------------
try:
import llama_cpp # type: ignore[import] # noqa: F401
_log(
"No GPU detected. llama-cpp-python available. "
"Using CPU llama.cpp backend."
)
return InferenceBackend.CPU_LLAMA_CPP
except ImportError:
pass
# ------------------------------------------------------------------
# 6. HuggingFace Transformers (universal fallback)
# ------------------------------------------------------------------
try:
import transformers # type: ignore[import] # noqa: F401
_log(
"No GPU or llama.cpp detected. "
"Using HuggingFace Transformers backend (CPU)."
)
return InferenceBackend.HUGGINGFACE_TRANSFORMERS
except ImportError:
pass
_log(
"WARNING: No inference backend found. "
"Install at least one of: mlx-lm, llama-cpp-python, torch, transformers."
)
# Return CPU_LLAMA_CPP as the nominal default; the actual call will raise
# an ImportError with installation instructions when invoked.
return InferenceBackend.CPU_LLAMA_CPP
# ===========================================================================
# BYTE-LEVEL VOCABULARY HELPERS
# ===========================================================================
def build_byte_to_char() -> Dict[int, str]:
"""
Build the GPT-2 byte-to-character mapping.
Maps each of the 256 possible byte values (0-255) to a unique, printable
Unicode character. Bytes that are already printable, non-whitespace ASCII
characters (and a handful of Latin-1 supplement characters) map to
themselves. The remaining 68 bytes -- control characters, whitespace,
and a few Latin-1 specials -- map to Unicode code points starting at
U+0100 (Latin Extended-A block).
This mapping guarantees that every token string in the vocabulary consists
entirely of printable characters, making the vocabulary human-readable and
safe to embed in JSON files without escaping issues.
Returns
-------
Dict[int, str]
Mapping from byte value (0-255) to its single-character Unicode string.
"""
# Bytes that are already "nice": printable ASCII (33-126) plus two ranges
# of printable Latin-1 Supplement characters (161-172 and 174-255).
nice_set: Set[int] = (
set(range(33, 127)) # '!' through '~' (94 values)
| set(range(161, 173)) # U+00A1 .. U+00AC (12 values)
| set(range(174, 256)) # U+00AE .. U+00FF (82 values)
) # Total: 188 "nice" bytes
byte_to_char: Dict[int, str] = {}
# "Nice" bytes map to the Unicode character with the same code point.
for b in range(256):
if b in nice_set:
byte_to_char[b] = chr(b)
    # The remaining 68 bytes (0-32, 127-160, and 173) map to code points
# starting at U+0100, chosen to be printable and unambiguous.
extra_cp = 256
for b in range(256):
if b not in nice_set:
byte_to_char[b] = chr(extra_cp)
extra_cp += 1
return byte_to_char
def build_char_to_byte(byte_to_char: Dict[int, str]) -> Dict[str, int]:
"""
Build the inverse of the byte-to-character mapping.
Parameters
----------
byte_to_char : Dict[int, str]
The forward mapping produced by :func:`build_byte_to_char`.
Returns
-------
Dict[str, int]
Mapping from the single-character Unicode string back to its byte value.
"""
return {char: byte_val for byte_val, char in byte_to_char.items()}
# ===========================================================================
# BPE TRAINING UTILITIES (module-level, stateless)
# ===========================================================================
def _get_pair_stats(
word_freqs: Dict[Tuple[str, ...], int],
) -> Dict[Tuple[str, str], int]:
"""
Count the frequency of every adjacent token pair across all words.
Each word in *word_freqs* is a tuple of token strings with an associated
corpus frequency. We accumulate pair counts weighted by that frequency.
Parameters
----------
word_freqs : Dict[Tuple[str, ...], int]
Current word-frequency table: token-sequence -> corpus count.
Returns
-------
Dict[Tuple[str, str], int]
Mapping from (left_token, right_token) to total weighted frequency.
"""
pair_counts: Dict[Tuple[str, str], int] = defaultdict(int)
for token_seq, freq in word_freqs.items():
for i in range(len(token_seq) - 1):
pair_counts[(token_seq[i], token_seq[i + 1])] += freq
return pair_counts
def _apply_merge(
pair: Tuple[str, str],
word_freqs: Dict[Tuple[str, ...], int],
) -> Dict[Tuple[str, ...], int]:
"""
Apply a single BPE merge rule to the word-frequency table.
Every occurrence of the adjacent pair *pair* in every token sequence is
replaced by the concatenation of the two tokens. A new dict is returned;
the input is not modified.
Parameters
----------
pair : Tuple[str, str]
The (left_token, right_token) pair to merge.
word_freqs : Dict[Tuple[str, ...], int]
The current word-frequency table.
Returns
-------
Dict[Tuple[str, ...], int]
Updated word-frequency table with the merge applied everywhere.
"""
merged_token = pair[0] + pair[1]
new_word_freqs: Dict[Tuple[str, ...], int] = {}
for token_seq, freq in word_freqs.items():
new_seq: List[str] = []
i = 0
while i < len(token_seq):
if (
i < len(token_seq) - 1
and token_seq[i] == pair[0]
and token_seq[i + 1] == pair[1]
):
new_seq.append(merged_token)
i += 2
else:
new_seq.append(token_seq[i])
i += 1
new_word_freqs[tuple(new_seq)] = freq
return new_word_freqs
# ===========================================================================
# EFFICIENT BPE ENCODING (heap-based, O(n log n))
# ===========================================================================
def _encode_chunk_bpe(
chunk_bytes: bytes,
byte_to_char: Dict[int, str],
merge_ranks: Dict[Tuple[str, str], int],
) -> Tuple[str, ...]:
"""
Encode a single pre-token chunk (raw UTF-8 bytes) using BPE merge rules.
Algorithm
---------
We maintain the token sequence as a doubly-linked list represented by
parallel ``prev`` / ``next_`` index arrays. A min-heap of
``(rank, position, left_tok, right_tok)`` tuples drives merge selection.
Stale heap entries -- where the tokens at the recorded position have
already changed due to an earlier merge -- are detected and skipped by
comparing the stored token strings against the current ``tokens`` array.
This gives O(n log n) time in the number of initial tokens, which is
far better than the naive O(n^2) scan for typical inputs.
Parameters
----------
chunk_bytes : bytes
Raw UTF-8 bytes of the pre-token to encode.
byte_to_char : Dict[int, str]
Mapping from byte values to their printable character representations.
merge_ranks : Dict[Tuple[str, str], int]
Mapping from token pairs to their merge rank (index in the ordered
merge list). Lower rank == higher priority.
Returns
-------
Tuple[str, ...]
The BPE-encoded token strings for this chunk.
"""
n = len(chunk_bytes)
if n == 0:
return ()
# Initialise the token sequence: one entry per byte.
# Elements are set to None when a position is deleted by a merge.
tokens: List[Optional[str]] = [byte_to_char[b] for b in chunk_bytes]
if n == 1:
return (tokens[0],) # type: ignore[return-value]
# Doubly-linked list over active token positions.
# prev[i] = index of the previous active token (-1 if none).
# next_[i] = index of the next active token (n if none / sentinel).
prev: List[int] = list(range(-1, n - 1)) # [-1, 0, 1, ..., n-2]
next_: List[int] = list(range(1, n + 1)) # [ 1, 2, 3, ..., n ]
# Build the initial heap: (rank, position, left_tok, right_tok).
heap: List[Tuple[int, int, str, str]] = []
for i in range(n - 1):
pair = (tokens[i], tokens[i + 1])
rank = merge_ranks.get(pair) # type: ignore[arg-type]
if rank is not None:
heapq.heappush(heap, (rank, i, tokens[i], tokens[i + 1])) # type: ignore[arg-type]
# Process merges in priority order (lowest rank first).
while heap:
rank, pos, left_tok, right_tok = heapq.heappop(heap)
# Skip stale entries: the left token at this position has changed.
if tokens[pos] != left_tok:
continue
# Skip stale entries: the right token (next active after pos) has changed.
next_pos = next_[pos]
if next_pos >= n or tokens[next_pos] != right_tok:
continue
# Apply the merge: write the merged token into `pos` and delete `next_pos`.
merged = left_tok + right_tok
tokens[pos] = merged
tokens[next_pos] = None
# Update the linked list to skip the now-deleted position.
next_after = next_[next_pos]
next_[pos] = next_after
if next_after < n:
prev[next_after] = pos
# Check whether the merged token can form a new pair with its left neighbour.
left_pos = prev[pos]
if left_pos >= 0 and tokens[left_pos] is not None:
new_pair = (tokens[left_pos], merged)
new_rank = merge_ranks.get(new_pair) # type: ignore[arg-type]
if new_rank is not None:
heapq.heappush(
heap,
(new_rank, left_pos, tokens[left_pos], merged), # type: ignore[arg-type]
)
# Check whether the merged token can form a new pair with its right neighbour.
right_pos = next_[pos]
if right_pos < n and tokens[right_pos] is not None:
new_pair = (merged, tokens[right_pos])
new_rank = merge_ranks.get(new_pair) # type: ignore[arg-type]
if new_rank is not None:
heapq.heappush(
heap,
(new_rank, pos, merged, tokens[right_pos]), # type: ignore[arg-type]
)
# Collect surviving (non-None) tokens in their original left-to-right order.
return tuple(t for t in tokens if t is not None)
# ===========================================================================
# CHAT TEMPLATES
# ===========================================================================
class ChatTemplate:
"""
Static factory for chat-template formatting functions.
Each static method accepts a list of message dicts (with ``"role"`` and
``"content"`` keys) and returns a formatted prompt string ready to be
passed to the tokenizer's :meth:`BPETokenizer.encode` method.
"""
@staticmethod
def llama3(
messages: List[Dict[str, str]],
add_generation_prompt: bool = True,
system_prompt: Optional[str] = None,
) -> str:
"""
Apply the LLaMA 3 / LLaMA 3.1 / LLaMA 3.2 chat template.
Format::
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>\\n\\n{content}<|eot_id|>
<|start_header_id|>user<|end_header_id|>\\n\\n{content}<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>\\n\\n
Parameters
----------
messages : List[Dict[str, str]]
Conversation history. Each dict must have ``"role"`` (one of
``"system"``, ``"user"``, ``"assistant"``) and ``"content"``.
add_generation_prompt : bool
If True, append the assistant header to prompt the model to
generate a response. Set to False when encoding a complete
conversation for supervised fine-tuning.
system_prompt : Optional[str]
If provided and no system message is already present in
*messages*, prepend this text as a system message.
Returns
-------
str
The fully formatted prompt string.
Raises
------
ValueError
If any message has an unrecognised role.
"""
all_messages = list(messages)
if system_prompt and not any(m["role"] == "system" for m in all_messages):
all_messages = [{"role": "system", "content": system_prompt}] + all_messages
result = "<|begin_of_text|>"
for msg in all_messages:
role = msg["role"]
content = msg["content"]
if role not in ("system", "user", "assistant"):
raise ValueError(
f"Invalid role '{role}'. "
"Must be 'system', 'user', or 'assistant'."
)
result += f"<|start_header_id|>{role}<|end_header_id|>\n\n"
result += content
result += "<|eot_id|>"
if add_generation_prompt:
result += "<|start_header_id|>assistant<|end_header_id|>\n\n"
return result
@staticmethod
def chatml(
messages: List[Dict[str, str]],
add_generation_prompt: bool = True,
system_prompt: Optional[str] = None,
) -> str:
"""
Apply the ChatML template (Mistral, Qwen, Phi-3, and many others).
Format::
<|im_start|>system
{content}<|im_end|>
<|im_start|>user
{content}<|im_end|>
<|im_start|>assistant
Parameters
----------
messages : List[Dict[str, str]]
Conversation history.
add_generation_prompt : bool
If True, append ``<|im_start|>assistant\\n`` to prompt generation.
system_prompt : Optional[str]
Optional system prompt to prepend if none is present.
Returns
-------
str
The formatted prompt string.
"""
all_messages = list(messages)
if system_prompt and not any(m["role"] == "system" for m in all_messages):
all_messages = [{"role": "system", "content": system_prompt}] + all_messages
result = ""
for msg in all_messages:
result += f"<|im_start|>{msg['role']}\n{msg['content']}<|im_end|>\n"
if add_generation_prompt:
result += "<|im_start|>assistant\n"
return result
@staticmethod
def alpaca(
messages: List[Dict[str, str]],
add_generation_prompt: bool = True,
system_prompt: Optional[str] = None,
) -> str:
"""
Apply the Alpaca instruction-following template.
Format::
{system}
### Instruction:
{user_content}
### Response:
Only the last user message is used as the instruction.
Parameters
----------
messages : List[Dict[str, str]]
Conversation history.
add_generation_prompt : bool
If True, append ``### Response:\\n``.
system_prompt : Optional[str]
Optional system prompt.
Returns
-------
str
The formatted prompt string.
"""
sys_content = system_prompt or ""
for m in messages:
if m["role"] == "system":
sys_content = m["content"]
break
user_content = ""
for m in reversed(messages):
if m["role"] == "user":
user_content = m["content"]
break
result = ""
if sys_content:
result += sys_content + "\n\n"
result += f"### Instruction:\n{user_content}\n\n"
if add_generation_prompt:
result += "### Response:\n"
return result
@staticmethod
def get_template(name: str) -> Callable[..., str]:
"""
Return a chat-template function by name.
Parameters
----------
name : str
One of ``"llama3"``, ``"chatml"``, ``"alpaca"``.
Returns
-------
Callable[..., str]
The corresponding static template method.
Raises
------
ValueError
If *name* is not recognised.
"""
_registry: Dict[str, Callable[..., str]] = {
"llama3": ChatTemplate.llama3,
"chatml": ChatTemplate.chatml,
"alpaca": ChatTemplate.alpaca,
}
if name not in _registry:
raise ValueError(
f"Unknown chat template '{name}'. "
f"Available templates: {sorted(_registry.keys())}"
)
return _registry[name]
# ===========================================================================
# ENCODING RESULT
# ===========================================================================
class EncodingResult:
"""
Container for the output of :meth:`BPETokenizer.encode_batch`.
Attributes
----------
input_ids : List[int]
Token IDs for this sequence (including any padding).
attention_mask : List[int]
Binary mask: 1 for real tokens, 0 for padding tokens.
token_type_ids : None
Always None for BPE tokenizers (provided for HuggingFace API
compatibility).
tokens : List[str]
Token strings corresponding to each ID (useful for debugging).
"""
__slots__ = ("input_ids", "attention_mask", "token_type_ids", "tokens")
def __init__(
self,
input_ids: List[int],
attention_mask: List[int],
tokens: List[str],
) -> None:
self.input_ids: List[int] = input_ids
self.attention_mask: List[int] = attention_mask
self.token_type_ids: Optional[List[int]] = None # Not used for BPE.
self.tokens: List[str] = tokens
def __len__(self) -> int:
return len(self.input_ids)
def __repr__(self) -> str:
preview = self.input_ids[:8]
ellipsis = "..." if len(self.input_ids) > 8 else ""
return (
f"EncodingResult("
f"input_ids={preview}{ellipsis}, "
f"length={len(self.input_ids)})"
)
# ===========================================================================
# MAIN TOKENIZER CLASS
# ===========================================================================
class BPETokenizer:
"""
A production-ready Byte-Pair Encoding (BPE) tokenizer for LLMs.
Complete tokenization pipeline
--------------------------------
1. Unicode NFC normalisation for input consistency.
2. Pre-tokenisation via a configurable regex pattern (GPT-2 or cl100k).
3. Byte-level encoding using the GPT-2 byte-to-char mapping.
4. BPE merge application using an efficient heap-based algorithm.
5. Special-token handling (never split, always map to designated IDs).
6. Result packaging with optional truncation, padding, and tensor output.
Compatibility
-------------
The tokenizer is wire-compatible with GPT-2, GPT-3, GPT-4, LLaMA 3,
Mistral, Qwen, and any other model that uses byte-level BPE. Tokenizer
files are saved in the HuggingFace ``tokenizer.json`` format and can be
loaded by the HuggingFace ``tokenizers`` library and vice-versa.
Attributes
----------
bos_token : Optional[str]
Beginning-of-sequence special token string.
eos_token : Optional[str]
End-of-sequence special token string.
pad_token : Optional[str]
Padding special token string.
unk_token : Optional[str]
Unknown token string (rarely used in byte-level BPE).
"""
def __init__(
self,
split_pattern: str = GPT2_SPLIT_PATTERN,
) -> None:
"""
Initialise an empty BPETokenizer.
Call :meth:`train` to learn a vocabulary from a corpus, or use
:meth:`from_pretrained` / :meth:`from_huggingface` to load an
existing tokenizer.
Parameters
----------
split_pattern : str
Pre-tokenisation regex pattern. Use :data:`GPT2_SPLIT_PATTERN`
for GPT-2 / LLaMA 3 compatibility (default) or
:data:`CL100K_SPLIT_PATTERN` for GPT-4 / tiktoken compatibility.
"""
self._split_pattern: str = split_pattern
self._compiled_pattern: regex.Pattern = regex.compile(split_pattern)
# Byte-level mappings -- built once, immutable.
self._byte_to_char: Dict[int, str] = build_byte_to_char()
self._char_to_byte: Dict[str, int] = build_char_to_byte(self._byte_to_char)
# Vocabulary.
self._token_to_id: Dict[str, int] = {}
self._id_to_token: List[str] = []
# BPE merge rules (ordered; order is semantically significant).
self._merges: List[Tuple[str, str]] = []
self._merge_ranks: Dict[Tuple[str, str], int] = {}
# Special tokens.
self._special_tokens: Dict[str, int] = {}
self._special_token_ids: Set[int] = set()
# Cached compiled regex patterns for special-token splitting.
# _all_special_pattern : matches any special token (for allowed_special="all").
# _special_pattern_cache: maps frozenset of token strings to compiled pattern.
self._all_special_pattern: Optional[re.Pattern] = None
self._special_pattern_cache: Dict[frozenset, re.Pattern] = {}
# Encoding cache: pre-token bytes -> encoded token strings.
self._encode_cache: Dict[bytes, Tuple[str, ...]] = {}
self._cache_hits: int = 0
self._cache_misses: int = 0
# Convenience token-string properties (populated by add_special_tokens).
self.bos_token: Optional[str] = None
self.eos_token: Optional[str] = None
self.pad_token: Optional[str] = None
self.unk_token: Optional[str] = None
# Inference backend (detected lazily on first chat() call).
self._backend: Optional[InferenceBackend] = None
# Per-backend model caches.
self._mlx_cache: Dict[str, Any] = {}
self._llama_cpp_cache: Dict[Tuple, Any] = {}
self._openvino_cache: Dict[str, Any] = {}
self._transformers_cache: Dict[str, Any] = {}
# -----------------------------------------------------------------------
# TRAINING
# -----------------------------------------------------------------------
def train(
self,
corpus: Union[Iterable[str], str, Path],
vocab_size: int = 32_000,
min_frequency: int = 2,
special_tokens: Optional[Dict[str, int]] = None,
verbose: bool = True,
) -> "BPETokenizer":
"""
Train the BPE tokenizer on a text corpus.
The method processes the corpus, builds the initial 256-entry byte
vocabulary, and iteratively applies BPE merges until the vocabulary
reaches *vocab_size*.
Parameters
----------
corpus : Iterable[str] or str or Path
Training data. Accepted forms:
* An iterable of strings (e.g. a list of documents or a
generator that yields lines).
* A raw string (treated as a single document).
* A :class:`pathlib.Path` or string path to a UTF-8 text file
(read line by line, so arbitrarily large files are supported).
vocab_size : int
Target vocabulary size including the 256 base byte tokens and any
special tokens. Must be greater than 256. Typical values:
32 000 (LLaMA 1/2), 50 257 (GPT-2), 100 277 (GPT-4),
128 256 (LLaMA 3).
min_frequency : int
Minimum corpus frequency for a pair to be merged. Pairs that
appear fewer times are never merged. Increase this value for
very large corpora to speed up training and avoid merging
extremely rare pairs.
special_tokens : Optional[Dict[str, int]]
Special tokens to add after training. These are not subject to
BPE splitting. Their IDs should be >= *vocab_size* to avoid
collisions with regular BPE tokens.
verbose : bool
If True, print training progress (corpus stats, merge progress,
and final summary) to stdout.
Returns
-------
BPETokenizer
``self``, enabling method chaining.
Raises
------
ValueError
If *vocab_size* <= 256 or if the corpus is empty.
"""
if vocab_size <= 256:
raise ValueError(
f"vocab_size must be > 256 (got {vocab_size}). "
"The first 256 IDs are reserved for base byte tokens."
)
# Normalise the corpus input into a uniform text iterator.
text_iter: Iterable[str]
if isinstance(corpus, Path):
corpus_path = corpus
def _file_lines_path() -> Iterator[str]:
with open(corpus_path, "r", encoding="utf-8") as _fh:
yield from _fh
text_iter = _file_lines_path()
elif isinstance(corpus, str):
str_path = Path(corpus)
if str_path.exists() and str_path.is_file():
# It's a path string pointing to an existing file.
def _file_lines_str() -> Iterator[str]:
with open(str_path, "r", encoding="utf-8") as _fh:
yield from _fh
text_iter = _file_lines_str()
else:
# Treat the string itself as the corpus text.
if not Path(corpus).exists():
# Only warn if it looks like a path (contains path separators).
if os.sep in corpus or "/" in corpus:
import warnings
warnings.warn(
f"corpus string '{corpus[:80]}...' looks like a file "
"path but the file does not exist. Treating it as "
"raw text. Pass a pathlib.Path object to force "
"file-reading mode.",
UserWarning,
stacklevel=2,
)
text_iter = [corpus]
else:
text_iter = corpus
if verbose:
print("[BPETokenizer.train] Starting BPE training.")
print(f" Target vocab size : {vocab_size:,}")
print(f" Min pair frequency: {min_frequency}")
# ------------------------------------------------------------------
# Step 1: Build the word-frequency table from the corpus.
# Each word is stored as a tuple of single-character byte tokens.
# ------------------------------------------------------------------
t0 = time.monotonic()
word_freqs: Dict[Tuple[str, ...], int] = defaultdict(int)
doc_count = 0
token_count = 0
for text in text_iter:
# Apply Unicode NFC normalisation for deterministic tokenisation.
text = unicodedata.normalize("NFC", text)
pre_tokens = self._compiled_pattern.findall(text)
for pt in pre_tokens:
byte_seq = pt.encode("utf-8")
char_seq = tuple(self._byte_to_char[b] for b in byte_seq)
word_freqs[char_seq] += 1
token_count += 1
doc_count += 1
if not word_freqs:
raise ValueError("The corpus is empty. Cannot train on empty input.")
elapsed = time.monotonic() - t0
if verbose:
print(f" Corpus processed in {elapsed:.2f}s.")
print(f" Documents : {doc_count:,}")
print(f" Pre-token instances : {token_count:,}")
print(f" Unique pre-token types: {len(word_freqs):,}")
# ------------------------------------------------------------------
# Step 2: Initialise the vocabulary with the 256 base byte tokens.
# Sort by byte value (0-255) for a stable, reproducible ordering.
# ------------------------------------------------------------------
sorted_byte_pairs = sorted(self._byte_to_char.items(), key=lambda x: x[0])
self._token_to_id = {char: idx for idx, (_, char) in enumerate(sorted_byte_pairs)}
self._id_to_token = [char for _, char in sorted_byte_pairs]
# ------------------------------------------------------------------
# Step 3: Iteratively find and apply the most frequent merge.
# ------------------------------------------------------------------
num_merges = vocab_size - len(self._token_to_id)
self._merges = []
current_word_freqs: Dict[Tuple[str, ...], int] = dict(word_freqs)
if verbose:
print(f" Performing up to {num_merges:,} merges...")
for merge_idx in range(num_merges):
pair_stats = _get_pair_stats(current_word_freqs)
if not pair_stats:
if verbose:
print(f" No more pairs after {merge_idx} merges.")
break
# Select the most frequent pair; use lexicographic order as a
# tiebreaker to guarantee deterministic training runs.
best_pair = max(pair_stats, key=lambda p: (pair_stats[p], p))
best_freq = pair_stats[best_pair]
if best_freq < min_frequency:
if verbose:
print(
f" Stopping: best pair frequency {best_freq} "
f"< min_frequency {min_frequency} "
f"after {merge_idx} merges."
)
break
current_word_freqs = _apply_merge(best_pair, current_word_freqs)
self._merges.append(best_pair)
new_token = best_pair[0] + best_pair[1]
new_id = len(self._token_to_id)
self._token_to_id[new_token] = new_id
self._id_to_token.append(new_token)
if verbose and (merge_idx + 1) % 500 == 0:
elapsed = time.monotonic() - t0
print(
f" Merge {merge_idx + 1:,}/{num_merges:,}: "
f"'{best_pair[0]}' + '{best_pair[1]}' -> '{new_token}' "
f"(freq={best_freq:,}, elapsed={elapsed:.1f}s)"
)
# ------------------------------------------------------------------
# Step 4: Build the merge-rank lookup for fast encoding.
# ------------------------------------------------------------------
self._merge_ranks = {pair: rank for rank, pair in enumerate(self._merges)}
# ------------------------------------------------------------------
# Step 5: Add special tokens (if any).
# ------------------------------------------------------------------
if special_tokens:
self.add_special_tokens(special_tokens)
# Invalidate the encoding cache since the vocabulary has changed.
self._encode_cache.clear()
elapsed = time.monotonic() - t0
if verbose:
print(f"[BPETokenizer.train] Done in {elapsed:.2f}s.")
print(f" Final vocab size: {len(self._token_to_id):,}")
print(f" Merges performed: {len(self._merges):,}")
return self
# -----------------------------------------------------------------------
# SPECIAL TOKENS
# -----------------------------------------------------------------------
def add_special_tokens(self, special_tokens: Dict[str, int]) -> None:
"""
Add special tokens to the vocabulary.
Special tokens are never split by the BPE algorithm. They are mapped
directly to their specified IDs. If a specified ID already exists in
the vocabulary, the existing token at that position is overwritten.
This method also updates the convenience properties
(:attr:`bos_token`, :attr:`eos_token`, :attr:`pad_token`,
:attr:`unk_token`) based on common naming conventions, but only if
those properties have not already been set.
Parameters
----------
special_tokens : Dict[str, int]
Mapping from special token strings to their integer IDs.
"""
for token, token_id in special_tokens.items():
self._special_tokens[token] = token_id
self._special_token_ids.add(token_id)
# Extend _id_to_token if the new ID is beyond the current list.
while len(self._id_to_token) <= token_id:
self._id_to_token.append("")
self._id_to_token[token_id] = token
self._token_to_id[token] = token_id
# Update convenience properties using conservative name matching.
# Guard with "is None" so that the first matching token wins and
# subsequent calls do not overwrite an already-assigned property.
for token in special_tokens:
tl = token.lower()
if self.bos_token is None and (
"bos" in tl
or "begin_of_text" in tl
or tl == "<s>"
):
self.bos_token = token
if self.eos_token is None and (
"eos" in tl
or "end_of_text" in tl
or tl == "</s>"
):
self.eos_token = token
if self.pad_token is None and "pad" in tl:
self.pad_token = token
if self.unk_token is None and "unk" in tl:
self.unk_token = token
# Rebuild the cached special-token split patterns.
self._rebuild_special_patterns()
# Invalidate the encoding cache.
self._encode_cache.clear()
self._special_pattern_cache.clear()
def _rebuild_special_patterns(self) -> None:
"""
Rebuild the cached compiled regex patterns for special-token splitting.
Called automatically by :meth:`add_special_tokens`. Builds
``_all_special_pattern`` (used when ``allowed_special="all"``) by
sorting special tokens longest-first so that longer tokens are always
matched before shorter prefixes.
"""
if self._special_tokens:
sorted_specials = sorted(
self._special_tokens.keys(), key=len, reverse=True
)
self._all_special_pattern = re.compile(
"(" + "|".join(re.escape(s) for s in sorted_specials) + ")"
)
else:
self._all_special_pattern = None
def _get_special_pattern(
self,
active_specials: Dict[str, int],
) -> Optional[re.Pattern]:
"""
Return a compiled regex pattern for the given set of active special tokens.
Results are cached by the frozenset of active token strings so that
repeated calls with the same set (the common case) do not recompile.
Parameters
----------
active_specials : Dict[str, int]
The special tokens that should be recognised in this encode call.
Returns
-------
Optional[re.Pattern]
Compiled pattern, or None if *active_specials* is empty.
"""
if not active_specials:
return None
# Fast path: if active_specials is exactly self._special_tokens,
# use the pre-built pattern.
if active_specials is self._special_tokens:
return self._all_special_pattern
cache_key = frozenset(active_specials.keys())
# Also fast-path when all special tokens are active (same keys).
if cache_key == frozenset(self._special_tokens.keys()):
return self._all_special_pattern
cached = self._special_pattern_cache.get(cache_key)
if cached is not None:
return cached
sorted_specials = sorted(active_specials.keys(), key=len, reverse=True)
pattern = re.compile(
"(" + "|".join(re.escape(s) for s in sorted_specials) + ")"
)
self._special_pattern_cache[cache_key] = pattern
return pattern
# -----------------------------------------------------------------------
# CONVENIENCE PROPERTIES
# -----------------------------------------------------------------------
@property
def bos_token_id(self) -> Optional[int]:
"""Integer ID of the BOS token, or None if not set."""
return self._special_tokens.get(self.bos_token) if self.bos_token else None
@property
def eos_token_id(self) -> Optional[int]:
"""Integer ID of the EOS token, or None if not set."""
return self._special_tokens.get(self.eos_token) if self.eos_token else None
@property
def pad_token_id(self) -> Optional[int]:
"""Integer ID of the PAD token, or None if not set."""
return self._special_tokens.get(self.pad_token) if self.pad_token else None
@property
def unk_token_id(self) -> Optional[int]:
"""Integer ID of the UNK token, or None if not set."""
return self._special_tokens.get(self.unk_token) if self.unk_token else None
@property
def vocab_size(self) -> int:
"""Total number of tokens in the vocabulary (including special tokens)."""
return len(self._token_to_id)
# -----------------------------------------------------------------------
# INTERNAL ENCODING HELPER
# -----------------------------------------------------------------------
def _encode_chunk(self, chunk_bytes: bytes) -> Tuple[str, ...]:
"""
Encode a pre-token chunk (raw bytes) to token strings, with caching.
Results are cached by the raw byte sequence. Common pre-tokens such
as frequent English words are encoded only once per tokenizer lifetime,
dramatically reducing CPU time for repetitive text.
Parameters
----------
chunk_bytes : bytes
Raw UTF-8 bytes of the pre-token.
Returns
-------
Tuple[str, ...]
BPE-encoded token strings.
"""
cached = self._encode_cache.get(chunk_bytes)
if cached is not None:
self._cache_hits += 1
return cached
self._cache_misses += 1
result = _encode_chunk_bpe(chunk_bytes, self._byte_to_char, self._merge_ranks)
if len(self._encode_cache) < _ENCODE_CACHE_MAX_SIZE:
self._encode_cache[chunk_bytes] = result
return result
# -----------------------------------------------------------------------
# ENCODING
# -----------------------------------------------------------------------
def encode(
self,
text: str,
add_special_tokens: bool = False,
allowed_special: Union[Set[str], Literal["all", "none"]] = "none",
) -> List[int]:
"""
Encode a text string into a list of token IDs.
Unicode NFC normalisation is applied to *text* before tokenisation,
consistent with the normalisation applied during :meth:`train`. This
guarantees that semantically identical strings (e.g. the same word in
NFC vs NFD form) always produce the same token IDs.
Parameters
----------
text : str
The input text to encode.
add_special_tokens : bool
If True, prepend the BOS token ID and append the EOS token ID
(when those tokens are defined in the vocabulary).
allowed_special : Set[str] or "all" or "none"
Controls which special token strings are recognised inside *text*.
``"all"``
Every special token in the vocabulary is recognised and mapped
to its designated ID without BPE splitting.
``"none"`` (default)
No special tokens are recognised. Any special token string
that appears in *text* is encoded as regular text. This is
the safe default that prevents prompt-injection attacks.
``Set[str]``
Only the specified special tokens are recognised.
Returns
-------
List[int]
Ordered list of token IDs.
Raises
------
ValueError
If the tokenizer has no vocabulary (not yet trained or loaded).
RuntimeError
If the BPE algorithm produces a token string not present in the
vocabulary (indicates an internal bug).
"""
if not self._token_to_id:
raise ValueError(
"Tokenizer has no vocabulary. "
"Call train() or from_pretrained() first."
)
# Apply NFC normalisation for consistency with training.
text = unicodedata.normalize("NFC", text)
# Resolve the set of active special tokens and get the cached pattern.
active_specials: Dict[str, int]
special_re: Optional[re.Pattern]
if allowed_special == "all":
active_specials = self._special_tokens
special_re = self._all_special_pattern
elif allowed_special == "none":
active_specials = {}
special_re = None
else:
# allowed_special is a Set[str].
active_specials = {
k: v
for k, v in self._special_tokens.items()
if k in allowed_special
}
special_re = self._get_special_pattern(active_specials)
ids: List[int] = []
if add_special_tokens and self.bos_token_id is not None:
ids.append(self.bos_token_id)
# Split the text at special-token boundaries (if any are active).
chunks: List[str]
if special_re is not None:
chunks = special_re.split(text)
else:
chunks = [text]
for chunk in chunks:
if not chunk:
continue
if chunk in active_specials:
ids.append(active_specials[chunk])
else:
for pre_token in self._compiled_pattern.findall(chunk):
chunk_bytes = pre_token.encode("utf-8")
token_strings = self._encode_chunk(chunk_bytes)
for tok_str in token_strings:
tok_id = self._token_to_id.get(tok_str)
if tok_id is None:
raise RuntimeError(
f"BPE produced token string {tok_str!r} "
"that is not in the vocabulary. "
"This is an internal bug; please report it."
)
ids.append(tok_id)
if add_special_tokens and self.eos_token_id is not None:
ids.append(self.eos_token_id)
return ids
def encode_batch(
self,
texts: List[str],
add_special_tokens: bool = False,
allowed_special: Union[Set[str], Literal["all", "none"]] = "none",
padding: bool = False,
truncation: bool = False,
max_length: Optional[int] = None,
return_tensors: Optional[str] = None,
) -> Union[List[EncodingResult], Dict[str, Any]]:
"""
Encode a batch of text strings.
Parameters
----------
texts : List[str]
The texts to encode.
add_special_tokens : bool
If True, add BOS/EOS tokens to each sequence.
allowed_special : Set[str] or "all" or "none"
Special token handling (see :meth:`encode`).
padding : bool
If True, pad all sequences to the same length. The target length
is the length of the longest sequence in the batch, capped at
*max_length* if *max_length* is specified.
truncation : bool
If True, truncate sequences longer than *max_length*. Requires
*max_length* to be set.
max_length : Optional[int]
Maximum sequence length for truncation and/or padding.
return_tensors : Optional[str]
``"pt"`` -- return a dict of PyTorch ``LongTensor`` objects.
``"np"`` -- return a dict of NumPy ``int64`` arrays.
``None`` -- return a list of :class:`EncodingResult` objects.
Returns
-------
List[EncodingResult] or Dict[str, tensor]
Encoded batch. When *return_tensors* is set, all sequences must
have the same length (use *padding=True* to ensure this).
Raises
------
ValueError
If *truncation=True* but *max_length* is not specified.
If *return_tensors* is set but sequences have different lengths
and *padding=False*.
"""
if not texts:
if return_tensors is None:
return []
return {"input_ids": [], "attention_mask": []}
if truncation and max_length is None:
raise ValueError(
"truncation=True requires max_length to be specified. "
"Example: encode_batch(texts, truncation=True, max_length=512)"
)
# Encode each text individually.
all_ids: List[List[int]] = []
for text in texts:
ids = self.encode(
text,
add_special_tokens=add_special_tokens,
allowed_special=allowed_special,
)
if truncation and max_length is not None:
ids = ids[:max_length]
all_ids.append(ids)
# Determine the target length for padding.
target_length: Optional[int] = None
if padding:
target_length = max(len(ids) for ids in all_ids)
if max_length is not None:
target_length = min(target_length, max_length)
# Validate that tensor output is possible when sequences differ in length.
if return_tensors is not None and target_length is None:
lengths = {len(ids) for ids in all_ids}
if len(lengths) > 1:
raise ValueError(
"Cannot return tensors when sequences have different lengths "
"and padding=False. Set padding=True or ensure all inputs "
"have the same length."
)
pad_id = self.pad_token_id if self.pad_token_id is not None else 0
results: List[EncodingResult] = []
for ids in all_ids:
attn_mask = [1] * len(ids)
if target_length is not None and len(ids) < target_length:
pad_len = target_length - len(ids)
ids = ids + [pad_id] * pad_len
attn_mask = attn_mask + [0] * pad_len
tok_strings = [
self._id_to_token[i] if 0 <= i < len(self._id_to_token) else ""
for i in ids
]
results.append(EncodingResult(ids, attn_mask, tok_strings))
if return_tensors == "pt":
try:
import torch # type: ignore[import]
except ImportError:
raise ImportError(
"PyTorch is required for return_tensors='pt'. "
"Install with: pip install torch"
)
return {
"input_ids": torch.tensor(
[r.input_ids for r in results], dtype=torch.long
),
"attention_mask": torch.tensor(
[r.attention_mask for r in results], dtype=torch.long
),
}
if return_tensors == "np":
try:
import numpy as np # type: ignore[import]
except ImportError:
raise ImportError(
"NumPy is required for return_tensors='np'. "
"Install with: pip install numpy"
)
return {
"input_ids": np.array(
[r.input_ids for r in results], dtype=np.int64
),
"attention_mask": np.array(
[r.attention_mask for r in results], dtype=np.int64
),
}
return results
def __call__(
self,
text: Union[str, List[str]],
add_special_tokens: bool = False,
allowed_special: Union[Set[str], Literal["all", "none"]] = "none",
padding: bool = False,
truncation: bool = False,
max_length: Optional[int] = None,
return_tensors: Optional[str] = None,
) -> Union[List[int], List[EncodingResult], Dict[str, Any]]:
"""
HuggingFace-style callable interface.
Calling the tokenizer directly is equivalent to calling
:meth:`encode` for a single string or :meth:`encode_batch` for a
list of strings.
Parameters
----------
text : str or List[str]
A single text string or a list of text strings.
add_special_tokens : bool
Add BOS/EOS tokens.
allowed_special : Set[str] or "all" or "none"
Special token handling.
padding : bool
Pad to the longest sequence in the batch (batch mode only).
truncation : bool
Truncate to *max_length* (batch mode only). Requires *max_length*.
max_length : Optional[int]
Maximum sequence length.
return_tensors : Optional[str]
``"pt"`` or ``"np"`` for tensor output (batch mode only).
Returns
-------
List[int] or List[EncodingResult] or Dict[str, tensor]
Encoded output.
"""
if isinstance(text, str):
return self.encode(
text,
add_special_tokens=add_special_tokens,
allowed_special=allowed_special,
)
return self.encode_batch(
text,
add_special_tokens=add_special_tokens,
allowed_special=allowed_special,
padding=padding,
truncation=truncation,
max_length=max_length,
return_tensors=return_tensors,
)
# -----------------------------------------------------------------------
# DECODING
# -----------------------------------------------------------------------
def decode(
self,
ids: List[int],
skip_special_tokens: bool = False,
errors: str = "replace",
) -> str:
"""
Decode a list of token IDs back into a text string.
The method accumulates byte values from regular BPE tokens into a
buffer and flushes the buffer as UTF-8 text whenever a special token
is encountered or the sequence ends. This correctly handles multi-byte
UTF-8 characters that span multiple tokens.
Parameters
----------
ids : List[int]
Token IDs to decode.
skip_special_tokens : bool
If True, special tokens are omitted from the output.
If False (default), special token strings are included verbatim.
errors : str
Error handling for UTF-8 decoding: ``"strict"``, ``"ignore"``,
or ``"replace"`` (default). ``"replace"`` substitutes the
Unicode replacement character (U+FFFD) for invalid byte sequences,
which can arise when the model generates a truncated multi-byte
character.
Returns
-------
str
The decoded text.
"""
byte_buffer: List[int] = []
result_parts: List[str] = []
for token_id in ids:
if token_id < 0 or token_id >= len(self._id_to_token):
continue # Silently skip out-of-range IDs.
token_str = self._id_to_token[token_id]
if token_id in self._special_token_ids:
# Flush accumulated bytes before inserting the special token.
if byte_buffer:
result_parts.append(
bytes(byte_buffer).decode("utf-8", errors=errors)
)
byte_buffer = []
if not skip_special_tokens:
result_parts.append(token_str)
else:
# Convert each character in the token string back to a byte.
for char in token_str:
bval = self._char_to_byte.get(char)
if bval is not None:
byte_buffer.append(bval)
# Characters not in the mapping are silently skipped;
# this should never happen with a correctly trained tokenizer.
# Flush any remaining bytes.
if byte_buffer:
result_parts.append(
bytes(byte_buffer).decode("utf-8", errors=errors)
)
return "".join(result_parts)
def decode_batch(
self,
batch_ids: List[List[int]],
skip_special_tokens: bool = False,
skip_padding: bool = True,
) -> List[str]:
"""
Decode a batch of token ID lists.
Parameters
----------
batch_ids : List[List[int]]
A list of token ID sequences.
skip_special_tokens : bool
If True, omit special tokens from the output.
skip_padding : bool
If True (default) and the pad token is defined, strip trailing
pad tokens from each sequence before decoding. This prevents
padding tokens from appearing as null bytes or other artefacts
in the decoded output when the pad token is not a special token.
Returns
-------
List[str]
The decoded strings, one per input sequence.
"""
pad_id = self.pad_token_id
def _strip_padding(ids: List[int]) -> List[int]:
if not skip_padding or pad_id is None:
return ids
# Strip trailing pad tokens.
end = len(ids)
while end > 0 and ids[end - 1] == pad_id:
end -= 1
return ids[:end]
return [
self.decode(
_strip_padding(ids),
skip_special_tokens=skip_special_tokens,
)
for ids in batch_ids
]
# -----------------------------------------------------------------------
# VOCABULARY UTILITIES
# -----------------------------------------------------------------------
def get_vocab(self) -> Dict[str, int]:
"""Return a copy of the full vocabulary mapping (token -> ID)."""
return dict(self._token_to_id)
def tokenize(self, text: str) -> List[str]:
"""
Tokenize text and return token strings instead of IDs.
Useful for debugging and for inspecting how the tokenizer splits text.
Parameters
----------
text : str
Input text.
Returns
-------
List[str]
Token strings in the order they appear in the encoded sequence.
"""
ids = self.encode(text, allowed_special="all")
return [
self._id_to_token[i]
for i in ids
if 0 <= i < len(self._id_to_token)
]
def convert_tokens_to_ids(
self, tokens: Union[str, List[str]]
) -> Union[int, List[int]]:
"""
Convert token string(s) to integer ID(s).
Parameters
----------
tokens : str or List[str]
A single token string or a list of token strings.
Returns
-------
int or List[int]
The corresponding ID(s). Returns -1 for unknown tokens.
"""
if isinstance(tokens, str):
return self._token_to_id.get(tokens, -1)
return [self._token_to_id.get(t, -1) for t in tokens]
def convert_ids_to_tokens(
self, ids: Union[int, List[int]]
) -> Union[str, List[str]]:
"""
Convert token ID(s) to token string(s).
Parameters
----------
ids : int or List[int]
A single token ID or a list of token IDs.
Returns
-------
str or List[str]
The corresponding token string(s). Returns ``""`` for
out-of-range IDs.
"""
if isinstance(ids, int):
return (
self._id_to_token[ids]
if 0 <= ids < len(self._id_to_token)
else ""
)
return [
self._id_to_token[i] if 0 <= i < len(self._id_to_token) else ""
for i in ids
]
def convert_tokens_to_string(self, tokens: List[str]) -> str:
"""
Convert a list of token strings to a decoded text string.
Parameters
----------
tokens : List[str]
Token strings as they appear in the vocabulary.
Returns
-------
str
The decoded text.
"""
ids = [self._token_to_id[t] for t in tokens if t in self._token_to_id]
return self.decode(ids)
def count_tokens(self, text: str) -> int:
"""
Count the number of tokens that *text* encodes to.
Parameters
----------
text : str
Input text.
Returns
-------
int
Token count.
"""
return len(self.encode(text, allowed_special="all"))
def truncate(
self,
text: str,
max_tokens: int,
add_special_tokens: bool = False,
) -> str:
"""
Truncate *text* so that it encodes to at most *max_tokens* tokens.
        The method encodes the text, truncates the token ID list, and decodes
        back to a string. The result usually re-encodes to *max_tokens* tokens
        (or fewer if the original text is shorter), but the exact count is not
        strictly guaranteed: re-encoding the decoded prefix can merge
        differently around the cut point, and a cut in the middle of a
        multi-byte character introduces a replacement character.
Parameters
----------
text : str
Input text to truncate.
max_tokens : int
Maximum allowed token count.
add_special_tokens : bool
If True, BOS/EOS tokens are included in the count and will be
present in the returned string.
Returns
-------
str
The truncated text.
"""
ids = self.encode(
text,
add_special_tokens=add_special_tokens,
allowed_special="all",
)
if len(ids) <= max_tokens:
return text
return self.decode(
ids[:max_tokens],
skip_special_tokens=not add_special_tokens,
)
# -----------------------------------------------------------------------
# SAVING AND LOADING
# -----------------------------------------------------------------------
def save(
self,
directory: Union[str, Path],
name: str = "tokenizer",
verbose: bool = True,
) -> None:
"""
Save the tokenizer to a directory in HuggingFace-compatible JSON format.
Two files are written:
``tokenizer.json``
Contains the vocabulary, merge rules, and special tokens.
Compatible with the HuggingFace ``tokenizers`` library.
``tokenizer_config.json``
Contains metadata: tokenizer class, vocabulary size, number of
merges, BOS/EOS/PAD/UNK token strings, and the split-pattern name.
Parameters
----------
directory : str or Path
Output directory. Created (including parents) if it does not exist.
name : str
A human-readable name for this tokenizer, stored in the config.
verbose : bool
If True, print a summary after saving.
"""
save_dir = Path(directory)
save_dir.mkdir(parents=True, exist_ok=True)
tokenizer_data: Dict[str, Any] = {
"version": "1.0",
"type": "BPE",
"model": {
"type": "BPE",
"vocab": self._token_to_id,
# Merges are stored as "left right" strings (space-separated).
# Token strings in byte-level BPE never contain literal spaces
# (the space byte maps to a non-space Unicode character), so
# splitting on the first space is always unambiguous.
"merges": [f"{a} {b}" for a, b in self._merges],
},
"split_pattern": self._split_pattern,
"special_tokens": {
token: {"id": token_id, "content": token}
for token, token_id in self._special_tokens.items()
},
"added_tokens": [
{
"id": token_id,
"content": token,
"single_word": False,
"lstrip": False,
"rstrip": False,
"normalized": False,
"special": True,
}
for token, token_id in sorted(
self._special_tokens.items(), key=lambda x: x[1]
)
],
}
with open(save_dir / "tokenizer.json", "w", encoding="utf-8") as fh:
json.dump(tokenizer_data, fh, ensure_ascii=False, indent=2)
# Determine the pattern name for the config.
if self._split_pattern == GPT2_SPLIT_PATTERN:
pattern_name = "gpt2"
elif self._split_pattern == CL100K_SPLIT_PATTERN:
pattern_name = "cl100k"
else:
pattern_name = "custom"
config_data: Dict[str, Any] = {
"tokenizer_class": "BPETokenizer",
"tokenizer_name": name,
"vocab_size": self.vocab_size,
"num_merges": len(self._merges),
"model_max_length": 131_072,
"bos_token": self.bos_token,
"eos_token": self.eos_token,
"pad_token": self.pad_token,
"unk_token": self.unk_token,
"split_pattern_name": pattern_name,
}
with open(save_dir / "tokenizer_config.json", "w", encoding="utf-8") as fh:
json.dump(config_data, fh, ensure_ascii=False, indent=2)
if verbose:
print(f"Tokenizer saved to '{save_dir}'.")
print(f" Vocabulary size : {self.vocab_size:,}")
print(f" Merges : {len(self._merges):,}")
print(f" Special tokens : {len(self._special_tokens)}")
@classmethod
def from_pretrained(
cls,
directory: Union[str, Path],
verbose: bool = True,
) -> "BPETokenizer":
"""
Load a :class:`BPETokenizer` from a local directory.
The directory must contain a ``tokenizer.json`` file as written by
:meth:`save`. A ``tokenizer_config.json`` file is optional but
provides BOS/EOS/PAD/UNK token information.
Parameters
----------
directory : str or Path
Directory containing the tokenizer files.
verbose : bool
If True, print a summary after loading.
Returns
-------
BPETokenizer
The loaded tokenizer, ready to use.
Raises
------
FileNotFoundError
If ``tokenizer.json`` is not found in *directory*.
"""
load_dir = Path(directory)
tok_path = load_dir / "tokenizer.json"
if not tok_path.exists():
raise FileNotFoundError(
f"tokenizer.json not found in '{load_dir}'. "
"Make sure the directory was created by BPETokenizer.save()."
)
with open(tok_path, "r", encoding="utf-8") as fh:
data: Dict[str, Any] = json.load(fh)
split_pattern = data.get("split_pattern", GPT2_SPLIT_PATTERN)
tokenizer = cls(split_pattern=split_pattern)
# Load vocabulary (cast to Dict[str, int] for type safety).
raw_vocab = data["model"]["vocab"]
tokenizer._token_to_id = {str(k): int(v) for k, v in raw_vocab.items()}
max_id = max(tokenizer._token_to_id.values()) if tokenizer._token_to_id else -1
tokenizer._id_to_token = [""] * (max_id + 1)
for tok_str, tok_id in tokenizer._token_to_id.items():
tokenizer._id_to_token[tok_id] = tok_str
# Load merges.
for merge_str in data["model"].get("merges", []):
parts = merge_str.split(" ", 1)
if len(parts) == 2:
tokenizer._merges.append((parts[0], parts[1]))
tokenizer._merge_ranks = {
pair: rank for rank, pair in enumerate(tokenizer._merges)
}
# Load special tokens.
special_tokens: Dict[str, int] = {}
for tok_str, info in data.get("special_tokens", {}).items():
if isinstance(info, dict):
special_tokens[str(tok_str)] = int(info["id"])
if special_tokens:
tokenizer.add_special_tokens(special_tokens)
# Load config for convenience properties (only if not already set
# by add_special_tokens above).
cfg_path = load_dir / "tokenizer_config.json"
if cfg_path.exists():
with open(cfg_path, "r", encoding="utf-8") as fh:
cfg: Dict[str, Any] = json.load(fh)
if tokenizer.bos_token is None:
tokenizer.bos_token = cfg.get("bos_token")
if tokenizer.eos_token is None:
tokenizer.eos_token = cfg.get("eos_token")
if tokenizer.pad_token is None:
tokenizer.pad_token = cfg.get("pad_token")
if tokenizer.unk_token is None:
tokenizer.unk_token = cfg.get("unk_token")
if verbose:
print(f"Tokenizer loaded from '{load_dir}'.")
print(f" Vocabulary size: {tokenizer.vocab_size:,}")
print(f" Merges : {len(tokenizer._merges):,}")
return tokenizer
@classmethod
def from_huggingface(
cls,
model_name_or_path: str,
cache_dir: Optional[str] = None,
token: Optional[str] = None,
verbose: bool = True,
) -> "BPETokenizer":
"""
Load a :class:`BPETokenizer` from the HuggingFace Hub or a local
HuggingFace model directory.
Downloads ``tokenizer.json`` (and optionally ``tokenizer_config.json``)
from the Hub and constructs a fully functional tokenizer. Supports
any model that uses byte-level BPE (GPT-2, LLaMA, Mistral, Qwen, etc.).
Parameters
----------
model_name_or_path : str
HuggingFace model identifier (e.g. ``"meta-llama/Meta-Llama-3-8B"``)
or a local path to a HuggingFace model directory.
cache_dir : Optional[str]
Directory for caching downloaded files. Defaults to
``~/.cache/huggingface/hub``.
token : Optional[str]
HuggingFace API token for accessing gated / private models.
verbose : bool
If True, print download progress and a summary after loading.
Returns
-------
BPETokenizer
The loaded tokenizer.
Raises
------
ImportError
If ``huggingface_hub`` is not installed.
RuntimeError
If the tokenizer files cannot be downloaded.
ValueError
If the downloaded ``tokenizer.json`` contains no vocabulary.
"""
try:
from huggingface_hub import hf_hub_download # type: ignore[import]
except ImportError:
raise ImportError(
"huggingface_hub is required for from_huggingface(). "
"Install with: pip install huggingface_hub"
)
# If it looks like a local directory, load directly.
local_path = Path(model_name_or_path)
if local_path.exists() and local_path.is_dir():
return cls.from_pretrained(local_path, verbose=verbose)
if verbose:
print(f"Downloading tokenizer from HuggingFace Hub: {model_name_or_path}")
try:
tok_json_path = hf_hub_download(
repo_id=model_name_or_path,
filename="tokenizer.json",
cache_dir=cache_dir,
token=token,
)
except Exception as exc:
raise RuntimeError(
f"Failed to download tokenizer.json "
f"from '{model_name_or_path}': {exc}"
) from exc
with open(tok_json_path, "r", encoding="utf-8") as fh:
data: Dict[str, Any] = json.load(fh)
tokenizer = cls()
model_data: Dict[str, Any] = data.get("model", {})
# Vocabulary (cast for type safety).
raw_vocab = model_data.get("vocab", {})
if not raw_vocab:
raise ValueError(
f"No vocabulary found in tokenizer.json "
f"from '{model_name_or_path}'."
)
tokenizer._token_to_id = {str(k): int(v) for k, v in raw_vocab.items()}
max_id = max(tokenizer._token_to_id.values())
tokenizer._id_to_token = [""] * (max_id + 1)
for tok_str, tok_id in tokenizer._token_to_id.items():
tokenizer._id_to_token[tok_id] = tok_str
# Merges.
for merge_entry in model_data.get("merges", []):
if isinstance(merge_entry, str):
parts = merge_entry.split(" ", 1)
if len(parts) == 2:
tokenizer._merges.append((parts[0], parts[1]))
elif isinstance(merge_entry, (list, tuple)) and len(merge_entry) == 2:
tokenizer._merges.append((str(merge_entry[0]), str(merge_entry[1])))
tokenizer._merge_ranks = {
pair: rank for rank, pair in enumerate(tokenizer._merges)
}
# Special tokens from "added_tokens" (HuggingFace standard location).
special_tokens: Dict[str, int] = {}
for added in data.get("added_tokens", []):
if added.get("special", False):
special_tokens[str(added["content"])] = int(added["id"])
# Also check the "special_tokens" field (our own save format).
for tok_str, info in data.get("special_tokens", {}).items():
if isinstance(info, dict):
tok_id_val = info.get("id", info.get("ids", [None])[0])
if tok_id_val is not None:
special_tokens[str(tok_str)] = int(tok_id_val)
if special_tokens:
tokenizer.add_special_tokens(special_tokens)
# Optionally download tokenizer_config.json for BOS/EOS info.
try:
cfg_path = hf_hub_download(
repo_id=model_name_or_path,
filename="tokenizer_config.json",
cache_dir=cache_dir,
token=token,
)
with open(cfg_path, "r", encoding="utf-8") as fh:
cfg: Dict[str, Any] = json.load(fh)
def _tok_str_from_cfg(val: Any) -> Optional[str]:
"""Extract a token string from a config value (str or dict)."""
if isinstance(val, str):
return val
if isinstance(val, dict):
return val.get("content")
return None
if tokenizer.bos_token is None:
tokenizer.bos_token = _tok_str_from_cfg(cfg.get("bos_token"))
if tokenizer.eos_token is None:
tokenizer.eos_token = _tok_str_from_cfg(cfg.get("eos_token"))
if tokenizer.pad_token is None:
tokenizer.pad_token = _tok_str_from_cfg(cfg.get("pad_token"))
if tokenizer.unk_token is None:
tokenizer.unk_token = _tok_str_from_cfg(cfg.get("unk_token"))
except Exception:
pass # Config is optional; continue without it.
if verbose:
print(
f"Tokenizer loaded from HuggingFace Hub: {model_name_or_path}"
)
print(f" Vocabulary size: {tokenizer.vocab_size:,}")
print(f" Merges : {len(tokenizer._merges):,}")
return tokenizer
# -----------------------------------------------------------------------
# CHAT TEMPLATES
# -----------------------------------------------------------------------
def apply_chat_template(
self,
messages: List[Dict[str, str]],
template: str = "llama3",
add_generation_prompt: bool = True,
system_prompt: Optional[str] = None,
tokenize: bool = True,
allowed_special: Union[Set[str], Literal["all", "none"]] = "all",
) -> Union[str, List[int]]:
"""
Apply a chat template to a list of messages and optionally tokenize.
Parameters
----------
messages : List[Dict[str, str]]
Conversation history (dicts with ``"role"`` and ``"content"``).
template : str
Chat template name: ``"llama3"``, ``"chatml"``, or ``"alpaca"``.
add_generation_prompt : bool
If True, append the assistant generation prompt.
system_prompt : Optional[str]
Optional system prompt to prepend if none is present in *messages*.
tokenize : bool
If True (default), return token IDs.
If False, return the formatted string.
allowed_special : Set[str] or "all" or "none"
Special token handling (only relevant when *tokenize=True*).
Returns
-------
str or List[int]
Formatted prompt string (if *tokenize=False*) or token IDs.
"""
template_fn = ChatTemplate.get_template(template)
formatted: str = template_fn(
messages,
add_generation_prompt=add_generation_prompt,
system_prompt=system_prompt,
)
if not tokenize:
return formatted
return self.encode(formatted, allowed_special=allowed_special)
# -----------------------------------------------------------------------
# INFERENCE INTEGRATION
# -----------------------------------------------------------------------
def chat(
self,
messages: List[Dict[str, str]],
model_path: str,
template: str = "llama3",
system_prompt: Optional[str] = None,
max_new_tokens: int = 512,
temperature: float = 0.7,
top_p: float = 0.9,
backend: Optional[InferenceBackend] = None,
n_gpu_layers: int = -1,
n_ctx: int = 8_192,
verbose: bool = False,
) -> str:
"""
Apply a chat template and run LLM inference with the best available backend.
The method automatically detects the best inference backend for the
current hardware (Apple MLX, NVIDIA CUDA, AMD ROCm, Intel OpenVINO,
or CPU via llama.cpp), applies the specified chat template, and
generates a response.
Parameters
----------
messages : List[Dict[str, str]]
Conversation history.
model_path : str
Path to the model.
* **Apple MLX**: directory containing MLX model files
(``config.json``, ``*.safetensors`` or ``*.npz``).
* **llama.cpp backends** (NVIDIA, AMD, CPU): path to a GGUF file.
* **Intel OpenVINO**: directory with OpenVINO IR files or a
HuggingFace model directory (converted on first use).
* **HuggingFace Transformers**: HuggingFace model directory or
Hub identifier.
template : str
Chat template name: ``"llama3"``, ``"chatml"``, or ``"alpaca"``.
system_prompt : Optional[str]
Optional system prompt.
max_new_tokens : int
Maximum number of tokens to generate.
temperature : float
Sampling temperature. 0 = greedy decoding.
top_p : float
Top-p (nucleus) sampling parameter.
backend : Optional[InferenceBackend]
Force a specific backend. If None, auto-detect on first call and
persist the detected backend for subsequent calls.
n_gpu_layers : int
For llama.cpp backends: number of transformer layers to offload
to the GPU. ``-1`` offloads all layers. ``0`` is CPU-only.
n_ctx : int
For llama.cpp backends: maximum context window size in tokens.
verbose : bool
If True, print backend selection, prompt token count, and
generation throughput.
Returns
-------
str
The generated response text (not including the prompt).
"""
# Resolve backend: use explicit override, or detect and persist.
if backend is not None:
# Persist the explicitly chosen backend for future calls.
self._backend = backend
else:
if self._backend is None:
self._backend = detect_best_backend(verbose=verbose)
backend = self._backend
# Format the prompt (tokenize=False returns str).
prompt: str = self.apply_chat_template( # type: ignore[assignment]
messages,
template=template,
add_generation_prompt=True,
system_prompt=system_prompt,
tokenize=False,
)
if verbose:
prompt_ids = self.encode(prompt, allowed_special="all")
print(f"[chat] Backend : {backend.name}")
print(f"[chat] Prompt tokens: {len(prompt_ids):,}")
t0 = time.monotonic()
if backend == InferenceBackend.APPLE_MLX:
response = self._run_mlx(
prompt, model_path, max_new_tokens, temperature, top_p
)
elif backend in (
InferenceBackend.NVIDIA_CUDA,
InferenceBackend.AMD_ROCM,
InferenceBackend.CPU_LLAMA_CPP,
):
response = self._run_llama_cpp(
prompt, model_path,
max_new_tokens, temperature, top_p,
n_gpu_layers, n_ctx,
)
elif backend == InferenceBackend.INTEL_OPENVINO:
response = self._run_openvino(
prompt, model_path, max_new_tokens, temperature
)
elif backend == InferenceBackend.HUGGINGFACE_TRANSFORMERS:
response = self._run_transformers(
prompt, model_path, max_new_tokens, temperature, top_p
)
else:
raise ValueError(f"Unsupported backend: {backend}")
if verbose:
elapsed = time.monotonic() - t0
resp_ids = self.encode(response, allowed_special="all")
tps = len(resp_ids) / elapsed if elapsed > 0 else 0.0
print(f"[chat] Response tokens: {len(resp_ids):,}")
print(f"[chat] Time : {elapsed:.2f}s ({tps:.1f} tok/s)")
return response
# -----------------------------------------------------------------------
# BACKEND IMPLEMENTATIONS
# -----------------------------------------------------------------------
def _run_mlx(
self,
prompt: str,
model_path: str,
max_new_tokens: int,
temperature: float,
top_p: float,
) -> str:
"""
Run inference using Apple MLX via the mlx-lm library.
The loaded model and tokenizer are cached in ``self._mlx_cache``
keyed by *model_path*, so repeated calls do not reload from disk.
"""
try:
from mlx_lm import load, generate # type: ignore[import]
except ImportError:
raise ImportError(
"mlx-lm is not installed. Install with: pip install mlx-lm"
)
if model_path not in self._mlx_cache:
self._mlx_cache[model_path] = load(model_path)
model, mlx_tokenizer = self._mlx_cache[model_path]
response: str = generate(
model,
mlx_tokenizer,
prompt=prompt,
max_tokens=max_new_tokens,
temp=temperature,
top_p=top_p,
verbose=False,
)
return response
def _run_llama_cpp(
self,
prompt: str,
model_path: str,
max_new_tokens: int,
temperature: float,
top_p: float,
n_gpu_layers: int,
n_ctx: int,
) -> str:
"""
Run inference using llama-cpp-python.
Supports NVIDIA CUDA (compiled with ``-DGGML_CUDA=on``),
AMD ROCm (compiled with ``-DGGML_HIPBLAS=on``), and CPU.
The model must be in GGUF format.
The loaded ``Llama`` instance is cached in ``self._llama_cpp_cache``
keyed by ``(model_path, n_gpu_layers, n_ctx)``.
"""
try:
from llama_cpp import Llama # type: ignore[import]
except ImportError:
raise ImportError(
"llama-cpp-python is not installed.\n"
" CPU only : pip install llama-cpp-python\n"
" NVIDIA : CMAKE_ARGS='-DGGML_CUDA=on' "
"pip install llama-cpp-python\n"
" AMD ROCm : CMAKE_ARGS='-DGGML_HIPBLAS=on' "
"pip install llama-cpp-python"
)
cache_key: Tuple = (model_path, n_gpu_layers, n_ctx)
if cache_key not in self._llama_cpp_cache:
self._llama_cpp_cache[cache_key] = Llama(
model_path=model_path,
n_gpu_layers=n_gpu_layers,
n_ctx=n_ctx,
verbose=False,
)
llm: Any = self._llama_cpp_cache[cache_key]
# Build stop tokens from the tokenizer's own special tokens.
stop_tokens: List[str] = [
tok for tok in self._special_tokens
if any(
kw in tok.lower()
for kw in ("eot", "eos", "end", "im_end", "</s>")
)
]
if not stop_tokens:
# Sensible defaults covering the most common model families.
stop_tokens = ["<|eot_id|>", "<|im_end|>", "</s>", "<|end_of_text|>"]
output: Any = llm(
prompt,
max_tokens=max_new_tokens,
temperature=temperature,
top_p=top_p,
echo=False,
stop=stop_tokens,
)
return output["choices"][0]["text"]
def _run_openvino(
self,
prompt: str,
model_path: str,
max_new_tokens: int,
temperature: float,
) -> str:
"""
Run inference using Intel OpenVINO via the optimum-intel library.
The model can be a HuggingFace model directory (converted to OpenVINO
IR on first use by optimum-intel) or a pre-converted OpenVINO IR
directory.
The loaded model and tokenizer are cached in ``self._openvino_cache``
keyed by *model_path*.
"""
try:
from optimum.intel import OVModelForCausalLM # type: ignore[import]
from transformers import AutoTokenizer as _HFTok # type: ignore[import]
except ImportError:
raise ImportError(
"optimum-intel is not installed. "
'Install with: pip install "optimum[openvino]" optimum-intel'
)
if model_path not in self._openvino_cache:
ov_model = OVModelForCausalLM.from_pretrained(
model_path,
device="AUTO",
ov_config={"PERFORMANCE_HINT": "LATENCY"},
)
hf_tok = _HFTok.from_pretrained(model_path)
self._openvino_cache[model_path] = (ov_model, hf_tok)
ov_model, hf_tok = self._openvino_cache[model_path]
inputs = hf_tok(prompt, return_tensors="pt")
outputs = ov_model.generate(
**inputs,
max_new_tokens=max_new_tokens,
do_sample=temperature > 0.0,
temperature=temperature if temperature > 0.0 else 1.0,
pad_token_id=hf_tok.eos_token_id,
)
new_ids = outputs[0][inputs["input_ids"].shape[1]:]
return hf_tok.decode(new_ids, skip_special_tokens=True)
def _run_transformers(
self,
prompt: str,
model_path: str,
max_new_tokens: int,
temperature: float,
top_p: float,
) -> str:
"""
Run inference using HuggingFace Transformers (universal CPU/GPU fallback).
Loads the model with ``device_map="auto"`` so it uses any available
GPU (CUDA or ROCm via PyTorch) or falls back to CPU.
The loaded model and tokenizer are cached in
``self._transformers_cache`` keyed by *model_path*.
"""
try:
import torch # type: ignore[import]
from transformers import ( # type: ignore[import]
AutoModelForCausalLM,
AutoTokenizer as _HFTok,
)
except ImportError:
raise ImportError(
"transformers and torch are required for the HuggingFace backend. "
"Install with: pip install transformers torch"
)
if model_path not in self._transformers_cache:
hf_tok = _HFTok.from_pretrained(model_path)
model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=(
torch.float16 if torch.cuda.is_available() else torch.float32
),
device_map="auto",
)
self._transformers_cache[model_path] = (model, hf_tok)
model, hf_tok = self._transformers_cache[model_path]
inputs = hf_tok(prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=max_new_tokens,
do_sample=temperature > 0.0,
temperature=temperature if temperature > 0.0 else 1.0,
top_p=top_p,
pad_token_id=hf_tok.eos_token_id,
)
new_ids = outputs[0][inputs["input_ids"].shape[1]:]
return hf_tok.decode(new_ids, skip_special_tokens=True)
# -----------------------------------------------------------------------
# STREAMING TOKENIZATION
# -----------------------------------------------------------------------
def encode_streaming(
self,
text_stream: Iterable[str],
allowed_special: Union[Set[str], Literal["all", "none"]] = "none",
) -> Iterator[List[int]]:
"""
Encode a stream of text chunks, yielding token IDs for each chunk.
Because BPE operates on pre-tokens (word-level chunks), a token
boundary may not align with an arbitrary chunk boundary. This method
buffers text across chunk boundaries and only encodes complete
pre-tokens, ensuring that the concatenation of all yielded ID lists
is identical to encoding the full text at once.
The boundary detection uses ``regex.finditer`` on the accumulated
buffer to find exact pre-token span positions, avoiding the ambiguity
of substring searches when the same pre-token appears multiple times.
Parameters
----------
text_stream : Iterable[str]
An iterable of text chunks (e.g. from a streaming LLM response).
allowed_special : Set[str] or "all" or "none"
Special token handling (see :meth:`encode`).
Yields
------
List[int]
Token IDs for the encodable portion of the current buffer.
The final yield flushes any remaining buffered text.
"""
buffer = ""
for chunk in text_stream:
buffer += chunk
# Find all pre-token match spans in the current buffer.
matches = list(self._compiled_pattern.finditer(buffer))
if len(matches) < 2:
# Fewer than two pre-tokens: the last one may be incomplete.
# Keep everything in the buffer and wait for more input.
continue
# Everything up to (but not including) the start of the last
# pre-token is safe to encode: the last pre-token might be
# extended by the next chunk.
last_match_start = matches[-1].start()
safe_text = buffer[:last_match_start]
buffer = buffer[last_match_start:]
if safe_text:
ids = self.encode(safe_text, allowed_special=allowed_special)
if ids:
yield ids
# Flush the remaining buffer.
if buffer:
ids = self.encode(buffer, allowed_special=allowed_special)
if ids:
yield ids
# -----------------------------------------------------------------------
# STATISTICS AND DIAGNOSTICS
# -----------------------------------------------------------------------
def get_cache_stats(self) -> Dict[str, Any]:
"""
Return statistics about the encoding cache.
Returns
-------
Dict[str, Any]
A dict with keys:
``"hits"``
Number of cache hits since the last :meth:`clear_cache` call.
``"misses"``
Number of cache misses.
``"size"``
Current number of entries in the cache.
``"hit_rate_pct"``
Cache hit rate as a percentage (float, rounded to one decimal
place).
"""
total = self._cache_hits + self._cache_misses
hit_rate = round(100.0 * self._cache_hits / total, 1) if total > 0 else 0.0
return {
"hits": self._cache_hits,
"misses": self._cache_misses,
"size": len(self._encode_cache),
"hit_rate_pct": hit_rate,
}
def clear_cache(self) -> None:
"""Clear the encoding cache and reset hit/miss counters."""
self._encode_cache.clear()
self._cache_hits = 0
self._cache_misses = 0
# -----------------------------------------------------------------------
# DUNDER METHODS
# -----------------------------------------------------------------------
def __len__(self) -> int:
return self.vocab_size
def __repr__(self) -> str:
return (
f"BPETokenizer("
f"vocab_size={self.vocab_size:,}, "
f"merges={len(self._merges):,}, "
f"special_tokens={len(self._special_tokens)})"
)
# ===========================================================================
# COMMAND-LINE INTERFACE
# ===========================================================================
def _cli_main() -> None:
"""
Entry point for the command-line interface.
Subcommands
-----------
train Train a new tokenizer from a text file.
encode Encode text (or a file) to token IDs.
decode Decode a sequence of token IDs to text.
chat Start an interactive multi-turn chat session with an LLM.
info Display detailed information about a saved tokenizer.
"""
parser = argparse.ArgumentParser(
prog="bpe_tokenizer",
description="BPE Tokenizer for Large Language Models",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples
--------
Train a tokenizer on a text file:
python bpe_tokenizer.py train \\
--corpus corpus.txt --vocab-size 32000 --output ./my_tokenizer
Encode a string:
python bpe_tokenizer.py encode \\
--tokenizer ./my_tokenizer --text "Hello, world!" --show-tokens
Encode a file:
python bpe_tokenizer.py encode \\
--tokenizer ./my_tokenizer --file document.txt
Decode token IDs:
python bpe_tokenizer.py decode \\
--tokenizer ./my_tokenizer --ids 9906 11 1917 0
Interactive chat (auto-detects best backend):
python bpe_tokenizer.py chat \\
--tokenizer ./my_tokenizer --model /path/to/model.gguf
Show tokenizer info:
python bpe_tokenizer.py info --tokenizer ./my_tokenizer
""",
)
subparsers = parser.add_subparsers(dest="command", required=True)
# ------------------------------------------------------------------
# train
# ------------------------------------------------------------------
p_train = subparsers.add_parser("train", help="Train a new BPE tokenizer.")
p_train.add_argument(
"--corpus", required=True,
help="Path to the training corpus (UTF-8 text file).",
)
p_train.add_argument(
"--vocab-size", type=int, default=32_000,
help="Target vocabulary size (default: 32000).",
)
p_train.add_argument(
"--min-frequency", type=int, default=2,
help="Minimum pair frequency for merging (default: 2).",
)
p_train.add_argument(
"--output", required=True,
help="Output directory for the saved tokenizer.",
)
p_train.add_argument(
"--pattern", choices=["gpt2", "cl100k"], default="gpt2",
help="Pre-tokenisation pattern (default: gpt2).",
)
# ------------------------------------------------------------------
# encode
# ------------------------------------------------------------------
p_encode = subparsers.add_parser("encode", help="Encode text to token IDs.")
p_encode.add_argument(
"--tokenizer", required=True,
help="Path to a saved tokenizer directory.",
)
encode_input = p_encode.add_mutually_exclusive_group(required=True)
encode_input.add_argument("--text", help="Text string to encode.")
encode_input.add_argument(
"--file", help="Path to a UTF-8 text file to encode."
)
p_encode.add_argument(
"--show-tokens", action="store_true",
help="Print token strings alongside IDs.",
)
# ------------------------------------------------------------------
# decode
# ------------------------------------------------------------------
p_decode = subparsers.add_parser("decode", help="Decode token IDs to text.")
p_decode.add_argument(
"--tokenizer", required=True,
help="Path to a saved tokenizer directory.",
)
p_decode.add_argument(
"--ids", nargs="+", type=int, required=True,
help="Space-separated list of token IDs to decode.",
)
# ------------------------------------------------------------------
# chat
# ------------------------------------------------------------------
p_chat = subparsers.add_parser(
"chat", help="Interactive multi-turn chat session with an LLM."
)
p_chat.add_argument(
"--tokenizer", required=True,
help="Path to a saved tokenizer directory.",
)
p_chat.add_argument(
"--model", required=True,
help="Path to the model file or directory.",
)
p_chat.add_argument(
"--template", choices=["llama3", "chatml", "alpaca"], default="llama3",
help="Chat template to use (default: llama3).",
)
p_chat.add_argument("--system", help="System prompt text.")
p_chat.add_argument(
"--max-tokens", type=int, default=512,
help="Maximum tokens to generate per turn (default: 512).",
)
p_chat.add_argument(
"--temperature", type=float, default=0.7,
help="Sampling temperature (default: 0.7).",
)
p_chat.add_argument(
"--top-p", type=float, default=0.9,
help="Top-p sampling parameter (default: 0.9).",
)
p_chat.add_argument(
"--n-gpu-layers", type=int, default=-1,
help="GPU layers for llama.cpp; -1 = all (default: -1).",
)
p_chat.add_argument(
"--n-ctx", type=int, default=8_192,
help="Context window size for llama.cpp (default: 8192).",
)
# ------------------------------------------------------------------
# info
# ------------------------------------------------------------------
p_info = subparsers.add_parser(
"info", help="Display information about a saved tokenizer."
)
p_info.add_argument(
"--tokenizer", required=True,
help="Path to a saved tokenizer directory.",
)
p_info.add_argument(
"--sample-text",
default="Hello, world! This is a tokenization test.",
help="Sample text for the encoding demonstration.",
)
args = parser.parse_args()
# ------------------------------------------------------------------
# Dispatch
# ------------------------------------------------------------------
if args.command == "train":
pattern = SPLIT_PATTERNS.get(args.pattern, GPT2_SPLIT_PATTERN)
tokenizer = BPETokenizer(split_pattern=pattern)
tokenizer.train(
corpus=Path(args.corpus),
vocab_size=args.vocab_size,
min_frequency=args.min_frequency,
verbose=True,
)
tokenizer.save(args.output)
elif args.command == "encode":
tokenizer = BPETokenizer.from_pretrained(args.tokenizer)
        if args.text is not None:
text = args.text
else:
with open(args.file, "r", encoding="utf-8") as fh:
text = fh.read()
ids = tokenizer.encode(text, allowed_special="all")
print(f"Token count : {len(ids)}")
print(f"Token IDs : {ids}")
if args.show_tokens:
tokens = tokenizer.convert_ids_to_tokens(ids)
print(f"Tokens : {tokens}")
elif args.command == "decode":
tokenizer = BPETokenizer.from_pretrained(args.tokenizer)
text = tokenizer.decode(args.ids)
print(f"Decoded text: {text!r}")
elif args.command == "chat":
tokenizer = BPETokenizer.from_pretrained(args.tokenizer)
messages: List[Dict[str, str]] = []
print("=" * 60)
print("BPETokenizer Chat Session")
print(f"Model : {args.model}")
print(f"Template : {args.template}")
print("Type 'quit' or press Ctrl-C to exit.")
print("=" * 60)
while True:
try:
user_input = input("\nYou: ").strip()
except (EOFError, KeyboardInterrupt):
print("\nExiting.")
break
if user_input.lower() in ("quit", "exit", "q"):
break
if not user_input:
continue
messages.append({"role": "user", "content": user_input})
try:
response = tokenizer.chat(
messages=messages,
model_path=args.model,
template=args.template,
system_prompt=args.system,
max_new_tokens=args.max_tokens,
temperature=args.temperature,
top_p=args.top_p,
n_gpu_layers=args.n_gpu_layers,
n_ctx=args.n_ctx,
verbose=True,
)
except Exception as exc:
print(f"[Error] {exc}")
# Remove the failed user message so the conversation stays
# consistent and the user can try again.
messages.pop()
continue
print(f"\nAssistant: {response}")
messages.append({"role": "assistant", "content": response})
elif args.command == "info":
tokenizer = BPETokenizer.from_pretrained(args.tokenizer)
print()
print("=" * 60)
print("TOKENIZER INFORMATION")
print("=" * 60)
print(f"Vocabulary size : {tokenizer.vocab_size:,}")
print(f"Number of merges : {len(tokenizer._merges):,}")
print(f"Special tokens : {len(tokenizer._special_tokens)}")
if tokenizer._special_tokens:
for tok, tok_id in sorted(
tokenizer._special_tokens.items(), key=lambda x: x[1]
):
print(f" {tok_id:8d} {tok}")
print(f"BOS token : {tokenizer.bos_token!r}")
print(f"EOS token : {tokenizer.eos_token!r}")
print(f"PAD token : {tokenizer.pad_token!r}")
print(f"UNK token : {tokenizer.unk_token!r}")
print()
sample = args.sample_text
ids = tokenizer.encode(sample, allowed_special="all")
tokens = tokenizer.convert_ids_to_tokens(ids)
decoded = tokenizer.decode(ids)
print(f"Sample text : {sample!r}")
print(f"Token count : {len(ids)}")
print(f"Token IDs : {ids}")
print(f"Tokens : {tokens}")
print(f"Decoded : {decoded!r}")
print(f"Lossless : {decoded == sample}")
print()
print("First 10 vocabulary entries:")
for i in range(min(10, len(tokenizer._id_to_token))):
tok = tokenizer._id_to_token[i]
print(f" {i:6d} {tok!r}")
print("Last 10 non-empty vocabulary entries:")
shown = 0
for i in range(len(tokenizer._id_to_token) - 1, -1, -1):
tok = tokenizer._id_to_token[i]
if tok:
print(f" {i:6d} {tok!r}")
shown += 1
if shown >= 10:
break
# ===========================================================================
# MODULE ENTRY POINT
# ===========================================================================
if __name__ == "__main__":
_cli_main()
With the implementation complete, here are the supporting artifacts that make it an installable package:
requirements.txt
# bpe_tokenizer.py — runtime requirements
# Python >= 3.9 is required.
# ── Required ──────────────────────────────────────────────────────────────
regex>=2023.6.3
# ── Optional: HuggingFace Hub integration (from_huggingface) ──────────────
huggingface_hub>=0.20.0
# ── Optional: batch tensor output (return_tensors="np") ───────────────────
numpy>=1.24.0
# ── Optional: PyTorch tensor output + HuggingFace Transformers backend ────
torch>=2.0.0
transformers>=4.35.0
# ── Optional: Intel OpenVINO backend ──────────────────────────────────────
# Uncomment to enable:
# optimum[openvino]>=1.14.0
# optimum-intel>=1.14.0
# ── Optional: Apple MLX backend (Apple Silicon only) ──────────────────────
# Uncomment to enable:
# mlx>=0.12.0
# mlx-lm>=0.12.0
# ── Optional: llama-cpp-python (CPU / NVIDIA / AMD) ───────────────────────
# See README.md for GPU-specific build instructions.
# llama-cpp-python>=0.2.56
setup.py
"""
setup.py
--------
Install bpe_tokenizer as a Python package.
Usage
-----
pip install -e . # editable / development install
pip install . # regular install
pip install ".[hub]" # + HuggingFace Hub support
pip install ".[transformers]" # + HuggingFace Transformers backend
pip install ".[apple]" # + Apple MLX backend
pip install ".[openvino]" # + Intel OpenVINO backend
pip install ".[full]" # + all optional CPU/GPU dependencies
"""
from setuptools import setup, find_packages
setup(
name="bpe_tokenizer",
version="1.0.0",
description=(
"A production-ready Byte-Pair Encoding tokenizer "
"for Large Language Models"
),
long_description=open("README.md", encoding="utf-8").read(),
long_description_content_type="text/markdown",
author="Your Name",
python_requires=">=3.9",
py_modules=["bpe_tokenizer"],
install_requires=[
"regex>=2023.6.3",
],
extras_require={
"hub": [
"huggingface_hub>=0.20.0",
],
"transformers": [
"torch>=2.0.0",
"transformers>=4.35.0",
"huggingface_hub>=0.20.0",
],
"openvino": [
"optimum[openvino]>=1.14.0",
"optimum-intel>=1.14.0",
"transformers>=4.35.0",
],
"apple": [
"mlx>=0.12.0",
"mlx-lm>=0.12.0",
],
"full": [
"huggingface_hub>=0.20.0",
"numpy>=1.24.0",
"torch>=2.0.0",
"transformers>=4.35.0",
],
},
entry_points={
"console_scripts": [
"bpe-tokenizer=bpe_tokenizer:_cli_main",
],
},
classifiers=[
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Intended Audience :: Developers",
"Intended Audience :: Science/Research",
],
)
README.md
# BPE Tokenizer for Large Language Models
A production-ready Byte-Pair Encoding (BPE) tokenizer compatible with
GPT-2, GPT-3, GPT-4, LLaMA 3, Mistral, Qwen, and any model that uses
byte-level BPE tokenization.
## Requirements
- Python >= 3.9
- `regex` library (`pip install regex`)
## Installation
### Minimal (tokenizer only, no inference)
```bash
pip install regex
```
### With HuggingFace Hub support
```bash
pip install regex huggingface_hub
```
### With all CPU/GPU inference backends
```bash
pip install regex huggingface_hub transformers torch numpy
```
### Apple Silicon (MLX backend)
```bash
pip install regex huggingface_hub mlx mlx-lm
```
### NVIDIA GPU (llama-cpp-python with CUDA)
```bash
pip install regex huggingface_hub
CMAKE_ARGS="-DGGML_CUDA=on" pip install llama-cpp-python
```
### AMD GPU (llama-cpp-python with ROCm/HIP)
```bash
pip install regex huggingface_hub
CMAKE_ARGS="-DGGML_HIPBLAS=on" pip install llama-cpp-python
```
### Intel OpenVINO
```bash
pip install regex huggingface_hub "optimum[openvino]" optimum-intel transformers
```
## Quick Start
### Train a tokenizer
```python
from bpe_tokenizer import BPETokenizer
tokenizer = BPETokenizer()
tokenizer.train(
corpus="path/to/corpus.txt", # or an iterable of strings
vocab_size=32000,
verbose=True,
)
tokenizer.save("./my_tokenizer")
```
### Load a pre-trained tokenizer
```python
# From a local directory saved by BPETokenizer.save():
tokenizer = BPETokenizer.from_pretrained("./my_tokenizer")
# From the HuggingFace Hub:
tokenizer = BPETokenizer.from_huggingface(
"meta-llama/Meta-Llama-3-8B",
token="hf_...", # required for gated models
)
```
### Encode and decode
```python
ids = tokenizer.encode("Hello, world!")
text = tokenizer.decode(ids)
tokens = tokenizer.tokenize("Hello, world!")
count = tokenizer.count_tokens("Hello, world!")
```
### HuggingFace-style `__call__`
```python
# Single string -> List[int]
ids = tokenizer("Hello, world!")
# Batch with padding -> Dict[str, tensor]
batch = tokenizer(
["Short.", "A longer sentence here."],
padding=True,
truncation=True,
max_length=64,
return_tensors="pt",
)
```
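### Decode a padded batch
`decode_batch` reverses a padded batch and strips trailing pad tokens by default. A minimal sketch, assuming a pad token is configured on the tokenizer:
```python
results = tokenizer.encode_batch(
    ["Short.", "A longer sentence here."],
    padding=True,
)
texts = tokenizer.decode_batch(
    [r.input_ids for r in results],
    skip_padding=True,          # default: drop trailing pad tokens
    skip_special_tokens=True,   # omit BOS/EOS if they were added
)
print(texts)
```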
### Chat with an LLM (auto-detects best backend)
```python
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain byte-pair encoding."},
]
response = tokenizer.chat(
messages=messages,
model_path="/path/to/model.gguf", # GGUF for llama.cpp backends
template="llama3",
max_new_tokens=512,
temperature=0.7,
verbose=True,
)
print(response)
```
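### Streaming tokenization
`encode_streaming` tokenizes an iterable of text chunks incrementally, buffering across chunk boundaries so that the concatenation of the yielded ID lists matches a single `encode` call on the full text. A small sketch:
```python
chunks = ["Byte-pair ", "encoding merges ", "frequent byte pairs."]

streamed_ids = []
for ids in tokenizer.encode_streaming(iter(chunks)):
    streamed_ids.extend(ids)

# The streamed result matches encoding the whole string at once.
assert streamed_ids == tokenizer.encode("".join(chunks))
```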
## Command-Line Interface
```bash
# Train
python bpe_tokenizer.py train \
--corpus corpus.txt --vocab-size 32000 --output ./my_tokenizer
# Encode
python bpe_tokenizer.py encode \
--tokenizer ./my_tokenizer --text "Hello, world!" --show-tokens
# Decode
python bpe_tokenizer.py decode \
--tokenizer ./my_tokenizer --ids 9906 11 1917 0
# Interactive chat
python bpe_tokenizer.py chat \
--tokenizer ./my_tokenizer \
--model /path/to/model.gguf \
--template llama3 \
--system "You are a helpful assistant."
# Tokenizer info
python bpe_tokenizer.py info --tokenizer ./my_tokenizer
```
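When the package is installed with `pip install -e .` (see `setup.py`), the same
subcommands are also available through the `bpe-tokenizer` console script:
```bash
bpe-tokenizer encode --tokenizer ./my_tokenizer --text "Hello, world!" --show-tokens
bpe-tokenizer info --tokenizer ./my_tokenizer
```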
## Supported Inference Backends
| Backend | Hardware | Model Format | Notes |
|---|---|---|---|
| Apple MLX | Apple Silicon (M1/M2/M3/M4) | MLX safetensors | Fastest on Mac |
| NVIDIA CUDA | NVIDIA GPU | GGUF | Compile llama-cpp-python with `-DGGML_CUDA=on` |
| AMD ROCm | AMD GPU | GGUF | Compile llama-cpp-python with `-DGGML_HIPBLAS=on` |
| Intel OpenVINO | Intel CPU/iGPU/Arc/NPU | HF or IR | Via optimum-intel |
| HuggingFace Transformers | Any (CPU/GPU) | HF safetensors | Universal fallback |
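Backend selection is automatic, but `chat()` also accepts an explicit `backend` argument. A sketch, assuming `InferenceBackend` is importable from the module alongside `detect_best_backend`:
```python
from bpe_tokenizer import BPETokenizer, InferenceBackend, detect_best_backend

tokenizer = BPETokenizer.from_pretrained("./my_tokenizer")
print(detect_best_backend(verbose=True))   # what auto-detection would pick

response = tokenizer.chat(
    messages=[{"role": "user", "content": "Hello!"}],
    model_path="/path/to/model.gguf",
    backend=InferenceBackend.CPU_LLAMA_CPP,  # force the CPU llama.cpp backend
)
```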
## Key Design Decisions
- **NFC normalisation** is applied at both train and encode time, ensuring
  that semantically identical Unicode strings always produce the same tokens
  (demonstrated in the snippet below).
- **Cached regex patterns** for special-token splitting avoid recompiling
the same pattern on every `encode()` call.
- **Per-backend model caches** prevent reloading model weights on every
`chat()` call.
- **Heap-based BPE** runs in O(n log n) per pre-token vs O(n²) for naive
scanning.
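The NFC guarantee is easy to verify directly, and the encoding cache exposes its hit rate through `get_cache_stats()`. A quick check using the tokenizer from the Quick Start:
```python
# NFC (single code point) and NFD (base + combining accent) spellings of "café"
# encode to identical token IDs.
assert tokenizer.encode("caf\u00e9") == tokenizer.encode("cafe\u0301")

# Repeated encodes of the same text exercise the encoding cache.
tokenizer.clear_cache()
for _ in range(3):
    tokenizer.encode("the same text, three times")
print(tokenizer.get_cache_stats())  # {'hits': ..., 'misses': ..., 'size': ..., 'hit_rate_pct': ...}
```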
## License
MIT
example_usage.py
#!/usr/bin/env python3
"""
example_usage.py
================
Demonstrates all major features of BPETokenizer.
Run with:
python example_usage.py
No GPU or internet connection is required for the tokenizer training and
encode/decode demonstrations. The chat() demonstration requires a model
file and will be skipped if BPE_MODEL_PATH is not set.
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from bpe_tokenizer import (
BPETokenizer,
ChatTemplate,
LLAMA3_SPECIAL_TOKENS,
CHATML_SPECIAL_TOKENS,
GPT2_SPLIT_PATTERN,
CL100K_SPLIT_PATTERN,
detect_best_backend,
)
def section(title: str) -> None:
print()
print("=" * 70)
print(f" {title}")
print("=" * 70)
# ===========================================================================
# 1. TRAINING FROM SCRATCH
# ===========================================================================
section("1. Training a BPE tokenizer from scratch")
corpus_lines = [
"The quick brown fox jumps over the lazy dog.\n",
"Tokenization is the process of splitting text into tokens.\n",
"Large language models use byte-pair encoding for tokenization.\n",
"Python is a great programming language for machine learning.\n",
"The transformer architecture revolutionized natural language processing.\n",
"Byte-pair encoding starts with individual bytes and merges frequent pairs.\n",
"Special tokens mark the beginning and end of sequences.\n",
"The vocabulary size is a critical hyperparameter for LLMs.\n",
"Subword tokenization balances vocabulary size and sequence length.\n",
"Unicode normalization ensures consistent tokenization across encodings.\n",
] * 300
tokenizer = BPETokenizer(split_pattern=GPT2_SPLIT_PATTERN)
tokenizer.train(
corpus=iter(corpus_lines),
vocab_size=512,
min_frequency=2,
special_tokens={
"<|begin_of_text|>": 256,
"<|end_of_text|>": 257,
"<|eot_id|>": 258,
},
verbose=True,
)
print(f"\nTokenizer: {tokenizer!r}")
# ===========================================================================
# 2. ENCODE AND DECODE
# ===========================================================================
section("2. Encoding and decoding")
test_sentences = [
"Hello, world!",
"Tokenization is fascinating.",
"Unicode: café, naïve, 日本語, 🎉",
"Code: def hello(): print('world')",
"Numbers: 42, 3.14159, 1_000_000",
]
for sentence in test_sentences:
ids = tokenizer.encode(sentence, allowed_special="all")
decoded = tokenizer.decode(ids)
tokens = tokenizer.tokenize(sentence)
lossless = decoded == sentence
print(f"\n Input : {sentence!r}")
print(f" IDs : {ids}")
print(f" Tokens : {tokens}")
print(f" Decoded : {decoded!r}")
print(f" Lossless: {lossless}")
assert lossless, f"LOSSLESS CHECK FAILED for: {sentence!r}"
print("\nAll lossless checks passed.")
# ===========================================================================
# 3. NFC NORMALISATION CONSISTENCY
# ===========================================================================
section("3. NFC normalisation consistency")
# 'é' can be represented as U+00E9 (NFC) or U+0065 U+0301 (NFD).
nfc_text = "caf\u00e9" # NFC: single code point
nfd_text = "cafe\u0301" # NFD: base + combining accent
ids_nfc = tokenizer.encode(nfc_text)
ids_nfd = tokenizer.encode(nfd_text)
print(f" NFC input : {nfc_text!r} -> IDs: {ids_nfc}")
print(f" NFD input : {nfd_text!r} -> IDs: {ids_nfd}")
print(f" Same IDs : {ids_nfc == ids_nfd}")
assert ids_nfc == ids_nfd, "NFC/NFD normalisation is not consistent!"
print(" NFC normalisation check passed.")
# ===========================================================================
# 4. SPECIAL TOKENS
# ===========================================================================
section("4. Special token handling")
text_with_specials = "<|begin_of_text|>Hello, world!<|end_of_text|>"
ids_no_specials = tokenizer.encode(text_with_specials, allowed_special="none")
ids_with_specials = tokenizer.encode(text_with_specials, allowed_special="all")
print(f" allowed_special='none' -> {len(ids_no_specials)} tokens")
print(f" allowed_special='all' -> {len(ids_with_specials)} tokens")
print(f" IDs: {ids_with_specials}")
decoded_skip = tokenizer.decode(ids_with_specials, skip_special_tokens=True)
decoded_keep = tokenizer.decode(ids_with_specials, skip_special_tokens=False)
print(f" Decoded (skip specials): {decoded_skip!r}")
print(f" Decoded (keep specials): {decoded_keep!r}")
# ===========================================================================
# 5. BATCH ENCODING
# ===========================================================================
section("5. Batch encoding with padding and truncation")
batch_texts = [
"Short.",
"This is a medium-length sentence for testing.",
"Tokenization is the process of splitting text into smaller units called tokens.",
]
results = tokenizer.encode_batch(
batch_texts,
padding=True,
truncation=True,
max_length=32,
)
for i, result in enumerate(results):
print(f"\n [{i}] {batch_texts[i]!r}")
print(f" input_ids : {result.input_ids}")
print(f" attention_mask: {result.attention_mask}")
print(f" length : {len(result)}")
# ===========================================================================
# 6. DECODE BATCH WITH PADDING STRIPPING
# ===========================================================================
section("6. decode_batch with padding stripping")
padded_batch = [r.input_ids for r in results]
decoded_batch = tokenizer.decode_batch(padded_batch, skip_padding=True)
for i, text in enumerate(decoded_batch):
print(f" [{i}] {text!r}")
# ===========================================================================
# 7. __CALL__ INTERFACE
# ===========================================================================
section("7. HuggingFace-style __call__ interface")
ids_single = tokenizer("Hello from __call__!", allowed_special="all")
print(f" Single string -> {ids_single}")
batch_result = tokenizer(
["First sentence.", "Second, longer sentence here."],
padding=True,
truncation=True,
max_length=16,
)
print(f" Batch result type: {type(batch_result)}")
for r in batch_result:
print(f" {r}")
# ===========================================================================
# 8. CHAT TEMPLATES
# ===========================================================================
section("8. Chat template formatting")
messages = [
{"role": "system", "content": "You are a helpful AI assistant."},
{"role": "user", "content": "What is byte-pair encoding?"},
{"role": "assistant", "content": "BPE is a subword tokenization algorithm."},
{"role": "user", "content": "How does it work?"},
]
for tmpl_name in ("llama3", "chatml", "alpaca"):
formatted = tokenizer.apply_chat_template(
messages,
template=tmpl_name,
add_generation_prompt=True,
tokenize=False,
)
print(f"\n Template: {tmpl_name}")
print(f" {'─' * 50}")
preview = str(formatted)[:300]
print(f" {preview}{'...' if len(str(formatted)) > 300 else ''}")
# ===========================================================================
# 9. STREAMING TOKENIZATION
# ===========================================================================
section("9. Streaming tokenization")
def simulated_stream():
chunks = [
"The ", "quick ", "brown ", "fox ", "jumps ",
"over ", "the ", "lazy ", "dog. ",
"Tokenization ", "is ", "fascinating!",
]
for chunk in chunks:
yield chunk
all_ids = []
chunk_count = 0
for ids_chunk in tokenizer.encode_streaming(
simulated_stream(), allowed_special="all"
):
all_ids.extend(ids_chunk)
chunk_count += 1
print(f" Chunk {chunk_count}: {ids_chunk}")
full_text = "The quick brown fox jumps over the lazy dog. Tokenization is fascinating!"
full_ids = tokenizer.encode(full_text, allowed_special="all")
print(f"\n Streaming total IDs : {all_ids}")
print(f" Full encode IDs : {full_ids}")
print(f" Match : {all_ids == full_ids}")
# ===========================================================================
# 10. SAVE AND LOAD
# ===========================================================================
section("10. Save and load")
save_dir = Path("./demo_tokenizer_output")
tokenizer.save(save_dir, name="demo_tokenizer")
loaded = BPETokenizer.from_pretrained(save_dir)
original_ids = tokenizer.encode("Hello, tokenization!", allowed_special="all")
loaded_ids = loaded.encode("Hello, tokenization!", allowed_special="all")
print(f"\n Original IDs : {original_ids}")
print(f" Loaded IDs : {loaded_ids}")
print(f" Identical : {original_ids == loaded_ids}")
assert original_ids == loaded_ids, "Loaded tokenizer produces different results!"
# ===========================================================================
# 11. VOCABULARY UTILITIES
# ===========================================================================
section("11. Vocabulary utilities")
print(f" vocab_size : {tokenizer.vocab_size}")
print(f" count_tokens('Hi!') : {tokenizer.count_tokens('Hi!')}")
print(f" tokenize('Hi!') : {tokenizer.tokenize('Hi!')}")
print(f" convert_tokens_to_ids(['H', 'i']): "
f"{tokenizer.convert_tokens_to_ids(['H', 'i'])}")
print(f" convert_ids_to_tokens([72, 105]) : "
f"{tokenizer.convert_ids_to_tokens([72, 105])}")
truncated = tokenizer.truncate("Hello, world! This is a test.", max_tokens=5)
print(f" truncate to 5 tokens: {truncated!r}")
stats = tokenizer.get_cache_stats()
print(f" Cache stats: {stats}")
# ===========================================================================
# 12. ENCODE_BATCH VALIDATION
# ===========================================================================
section("12. encode_batch validation")
# Verify that truncation=True without max_length raises ValueError.
try:
tokenizer.encode_batch(["test"], truncation=True)
print(" ERROR: Should have raised ValueError!")
except ValueError as e:
print(f" Correctly raised ValueError: {e}")
# Verify that return_tensors with jagged sequences raises ValueError.
try:
tokenizer.encode_batch(
["short", "much longer sentence here"],
return_tensors="pt",
)
print(" ERROR: Should have raised ValueError!")
except (ValueError, ImportError) as e:
print(f" Correctly raised error: {type(e).__name__}: {e}")
# ===========================================================================
# 13. BACKEND DETECTION
# ===========================================================================
section("13. Hardware / backend detection")
backend = detect_best_backend(verbose=True)
print(f"\n Selected backend: {backend.name}")
# ===========================================================================
# 14. CHAT INFERENCE (optional)
# ===========================================================================
section("14. Chat inference (optional)")
model_path = os.environ.get("BPE_MODEL_PATH", "")
if not model_path:
print(
" Skipped: set the BPE_MODEL_PATH environment variable to the path\n"
" of a GGUF model file (for llama.cpp) or an MLX model directory\n"
" to run this demonstration.\n"
"\n"
" Example:\n"
" export BPE_MODEL_PATH=/path/to/llama-3-8b.Q4_K_M.gguf\n"
" python example_usage.py"
)
else:
hf_model = os.environ.get("BPE_HF_MODEL", "")
if hf_model:
try:
chat_tokenizer = BPETokenizer.from_huggingface(
hf_model,
token=os.environ.get("HF_TOKEN"),
)
except Exception as exc:
print(f" Could not load HF tokenizer ({exc}); using demo tokenizer.")
chat_tokenizer = tokenizer
else:
chat_tokenizer = tokenizer
chat_messages = [
{
"role": "user",
"content": "In one sentence, what is byte-pair encoding?",
}
]
try:
response = chat_tokenizer.chat(
messages=chat_messages,
model_path=model_path,
template=os.environ.get("BPE_TEMPLATE", "llama3"),
max_new_tokens=128,
temperature=0.7,
verbose=True,
)
print(f"\n Response: {response}")
except Exception as exc:
print(f" Chat failed: {exc}")
# ===========================================================================
# DONE
# ===========================================================================
section("All demonstrations complete")
print(" The BPETokenizer is working correctly on this system.")
print()
CONCLUSION
You have now traveled the complete journey from raw text to token IDs and
back again. You understand why tokenizers exist, how the major approaches
differ, and what makes byte-level BPE the dominant choice for modern LLMs.
More importantly, you have built a production-ready implementation from
scratch, with efficient encoding, multi-backend inference support, and a
clean, extensible architecture.
The key insights to carry with you are these. Tokenization is not a trivial
preprocessing step -- it fundamentally shapes what the model can and cannot
learn. The byte-level approach guarantees complete coverage of any input
without unknown tokens. BPE merge rules must be applied in exactly the order
they were learned, and this order is the heart of the tokenizer's identity.
Special tokens are not just conveniences; they are the grammar of the
human-model interface. And finally, a tokenizer is only as good as its
integration with the rest of the system -- which is why we built ours to
work seamlessly with each of the major inference backends covered in this
tutorial.
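To see the coverage guarantee in action, here is a minimal sketch using the
BPETokenizer built in the Addendum. The import path is illustrative (adjust
it to wherever you saved the class), and the round-trip check assumes the
usual decode() counterpart to encode():

from bpe_tokenizer import BPETokenizer  # illustrative module name

tok = BPETokenizer.from_pretrained("./demo_tokenizer_output")

# Byte-level BPE falls back to single-byte tokens for anything it has never
# merged, so accented text, CJK scripts, and emoji all encode without ever
# needing an unknown token -- and decoding restores the input exactly.
for text in ("plain ASCII", "naïve café", "日本語のテキスト", "🚀"):
    ids = tok.encode(text, allowed_special="all")
    assert tok.decode(ids) == text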
The code in the Addendum is not a toy. It handles edge cases, caches
efficiently, validates inputs, and integrates with real hardware. You can
use it today, extend it for your specific needs, and trust it in production.
Happy tokenizing.