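"""Tokenization helpers: wrappers around tiktoken, Hugging Face tokenizers,
Mistral's tekken tokenizer, and TokenMonster, plus text-normalization and
token-display utilities."""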
import os
import re
import traceback
import unicodedata
import tiktoken
from transformers import AutoTokenizer, XGLMTokenizerFast
from mappings import MODEL_MAP, TOKENIZER_INFO
TOKENIZER_CACHE = {}
class TokenMonsterTokenizer:
    """Minimal wrapper giving TokenMonster vocabularies an HF-tokenizer-like interface."""

    def __init__(self, name):
        import tokenmonster
self.name = name
self.vocab = tokenmonster.load(name.split("/")[-1])
def __call__(self, text, **kwargs):
ids = list(self.vocab.tokenize(text))
return {"input_ids": ids}
def convert_ids_to_tokens(self, ids):
return [self.vocab.decode(id_) for id_ in ids]
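# Usage sketch (assumes the `tokenmonster` package is installed and can load
# the "englishcode-32000-consistent-v1" vocabulary):
#   tok = TokenMonsterTokenizer("englishcode-32000-consistent-v1")
#   ids = tok("hello world")["input_ids"]
#   pieces = tok.convert_ids_to_tokens(ids)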
def get_token_type(token_text):
    """Coarse classification of a token's surface form."""
    if re.match(r"^\s+$", token_text):
return "whitespace"
elif re.match(r"^[a-zA-Z]+$", token_text):
return "word"
elif re.match(r"^\d+$", token_text):
return "number"
elif re.match(r"^[^\w\s]+$", token_text):
return "punctuation"
elif token_text.startswith("<") and token_text.endswith(">"):
return "special"
else:
return "mixed"
def is_subword(token_text, model, is_first):
    """Heuristically decide whether a token continues the previous word."""
    if not token_text or token_text.isspace():
        return False
    if token_text.startswith("<") and token_text.endswith(">"):
        return False  # special token
    if model in {
        "llama-2",
        "llama-3",
        "gemma-2",
        "bloom",
        "aya-expanse",
        "comma",
        "qwen3",
        "qwen2.5",
    }:
        # SentencePiece / byte-level BPE: a new word starts with "▁" or "Ġ"
        return (
            not (token_text.startswith("▁") or token_text.startswith("Ġ"))
            and not is_first
        )
    elif model == "bert":
        return token_text.startswith("##")
    elif model in {"gpt-4", "gpt-2", "byt5"}:
        return not token_text.startswith(" ") and not is_first
    else:
        return not is_first
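# Illustrative examples (sketch):
#   is_subword("ing",   "gpt-2", is_first=False) -> True   (no leading space)
#   is_subword(" the",  "gpt-2", is_first=False) -> False
#   is_subword("##ing", "bert",  is_first=False) -> True   (WordPiece prefix)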
def tokenize_with_tiktoken(text, model):
    """Tokenize `text` with tiktoken and align each token to character spans in the input."""
    enc = tiktoken.encoding_for_model(model)
# Process the entire text at once, not line by line
token_ids = enc.encode(text)
token_data = []
current_text_pos = 0
# Build character-to-token mapping
char_to_tokens = {}
# Decode each token and find its position in the original text
for i, token_id in enumerate(token_ids):
token_text = enc.decode([token_id])
# Find where this token appears in the remaining text
remaining_text = text[current_text_pos:]
if token_text in remaining_text:
# Find the position of this token in the original text
local_pos = remaining_text.find(token_text)
actual_start = current_text_pos + local_pos
actual_end = actual_start + len(token_text)
# Map each character position to this token
for char_pos in range(actual_start, actual_end):
if char_pos not in char_to_tokens:
char_to_tokens[char_pos] = []
char_to_tokens[char_pos].append(token_id)
current_text_pos = actual_end
# Group consecutive characters that have the same token ID sets
processed_chars = set()
text_pos = 0
while text_pos < len(text):
if text_pos in processed_chars:
text_pos += 1
continue
# Get tokens for current character
current_tokens = char_to_tokens.get(text_pos, [])
if not current_tokens:
# Handle characters not covered by any token
token_data.append(
{
"text": text[text_pos],
"id": None,
"type": get_token_type(text[text_pos]),
"is_subword": False,
"bytes": len(text[text_pos].encode("utf-8")),
"position": len(token_data),
}
)
processed_chars.add(text_pos)
text_pos += 1
continue
# Find the span of characters that share the same token ID set
span_start = text_pos
span_end = text_pos + 1
# Extend span while characters have the same token set
while (
span_end < len(text)
and span_end in char_to_tokens
and char_to_tokens[span_end] == current_tokens
):
span_end += 1
# Get the text for this span
span_text = text[span_start:span_end]
# Create token data entry
token_data.append(
{
"text": span_text,
"id": current_tokens if len(current_tokens) > 1 else current_tokens[0],
"type": get_token_type(span_text),
"is_subword": is_subword(span_text, model, len(token_data) == 0),
"bytes": len(span_text.encode("utf-8")),
"position": len(token_data),
}
)
# Mark all characters in this span as processed
for pos in range(span_start, span_end):
processed_chars.add(pos)
text_pos = span_end
return {
"model": TOKENIZER_INFO[model]["name"],
"token_count": len(token_ids),
"tokens": token_data,
"compression_ratio": len(text) / len(token_data) if token_data else 0,
"encoding": TOKENIZER_INFO[model]["encoding"],
"vocab_size": TOKENIZER_INFO[model]["vocab_size"],
}
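# Usage sketch (assumes "gpt-4" has entries in TOKENIZER_INFO):
#   result = tokenize_with_tiktoken("Hello world", "gpt-4")
#   print(result["token_count"], [t["text"] for t in result["tokens"]])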
def tokenize_with_tiktoke1n(text, model):
    """Line-by-line variant of tokenize_with_tiktoken (simpler, no span grouping)."""
    encoding_name = "cl100k_base" if model == "gpt-4" else "gpt2"
    enc = tiktoken.get_encoding(encoding_name)
    token_data = []
    position = 0
    for line in text.split("\n"):
        token_ids = enc.encode(line + "\n")
        for token_id in token_ids:
            token_text = enc.decode([token_id])
            token_data.append(
                {
                    "text": token_text,
                    "id": token_id,
                    "type": get_token_type(token_text),
                    "is_subword": is_subword(token_text, model, position == 0),
                    "bytes": len(token_text.encode("utf-8")),
                    "position": position,
                }
            )
            position += 1
    return {
        "model": TOKENIZER_INFO[model]["name"],
        "token_count": len(token_data),
        "tokens": token_data,
        "compression_ratio": len(text) / len(token_data) if token_data else 0,
        "encoding": TOKENIZER_INFO[model]["encoding"],
        "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
    }
def get_hf_tokenizer(model):
    """Load and cache a Hugging Face (or TokenMonster) tokenizer for the given model key."""
    model_name = MODEL_MAP.get(model, "gpt2")
if model_name in TOKENIZER_CACHE:
return TOKENIZER_CACHE[model_name]
# Get token from environment
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
return {
"model": TOKENIZER_INFO[model]["name"],
"token_count": 0,
"tokens": [],
"error": "HF_TOKEN not found in environment. Please add your HuggingFace token to Space secrets.",
}
if "tokenmonster" in model_name:
tokenizer = TokenMonsterTokenizer("englishcode-32000-consistent-v1")
else:
tokenizer = AutoTokenizer.from_pretrained(
model_name, token=hf_token, trust_remote_code=True
)
TOKENIZER_CACHE[model_name] = tokenizer
return tokenizer
def get_tokenizer(model):
    """Load and cache the tokenizer for `model` (HF, tekken, XGLM, or TokenMonster)."""
    model_name = MODEL_MAP.get(model, None)
    if model_name is None:
        raise ValueError(f"Unknown tokenizer code {model}")
if model_name in TOKENIZER_CACHE:
return TOKENIZER_CACHE[model_name]
# Get token from environment
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
return {
"model": TOKENIZER_INFO[model]["name"],
"token_count": 0,
"tokens": [],
"error": "HF_TOKEN not found in environment. Please add your HuggingFace token to Space secrets.",
}
if "tekken" in model_name:
from mistral_common.tokens.tokenizers.mistral import MistralTokenizer
tok = MistralTokenizer.v3(is_tekken=True)
tokenizer = tok.instruct_tokenizer.tokenizer
elif "tokenmonster" in model_name:
tokenizer = TokenMonsterTokenizer("englishcode-32000-consistent-v1")
elif "xglm" in model_name.lower():
# tokenizer = AutoTokenizer.from_pretrained(
tokenizer = XGLMTokenizerFast.from_pretrained(
model_name, token=hf_token, trust_remote_code=True,# use_fast=False
)
else:
tokenizer = AutoTokenizer.from_pretrained(
model_name, token=hf_token, trust_remote_code=True
)
TOKENIZER_CACHE[model_name] = tokenizer
return tokenizer
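# Usage sketch (assumes HF_TOKEN is set and a hypothetical "gpt-2" key exists in MODEL_MAP):
#   tokenizer = get_tokenizer("gpt-2")
#   ids = tokenizer("Hello world")["input_ids"]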
def tokenize_w_tekken(text, model):
    """Tokenize `text` with Mistral's tekken tokenizer and align tokens to character spans."""
    tokenizer = get_tokenizer(model)
    # Process the entire text at once, not line by line
    token_ids = tokenizer.encode(text, bos=False, eos=False)
token_data = []
current_text_pos = 0
# Build character-to-token mapping
char_to_tokens = {}
# Decode each token and find its position in the original text
for i, token_id in enumerate(token_ids):
token_text = tokenizer.decode([token_id])
# Find where this token appears in the remaining text
remaining_text = text[current_text_pos:]
if token_text in remaining_text:
# Find the position of this token in the original text
local_pos = remaining_text.find(token_text)
actual_start = current_text_pos + local_pos
actual_end = actual_start + len(token_text)
# Map each character position to this token
for char_pos in range(actual_start, actual_end):
if char_pos not in char_to_tokens:
char_to_tokens[char_pos] = []
char_to_tokens[char_pos].append(token_id)
current_text_pos = actual_end
# Group consecutive characters that have the same token ID sets
processed_chars = set()
text_pos = 0
while text_pos < len(text):
if text_pos in processed_chars:
text_pos += 1
continue
# Get tokens for current character
current_tokens = char_to_tokens.get(text_pos, [])
if not current_tokens:
# Handle characters not covered by any token
token_data.append(
{
"text": text[text_pos],
"id": None,
"type": get_token_type(text[text_pos]),
"is_subword": False,
"bytes": len(text[text_pos].encode("utf-8")),
"position": len(token_data),
}
)
processed_chars.add(text_pos)
text_pos += 1
continue
# Find the span of characters that share the same token ID set
span_start = text_pos
span_end = text_pos + 1
# Extend span while characters have the same token set
while (
span_end < len(text)
and span_end in char_to_tokens
and char_to_tokens[span_end] == current_tokens
):
span_end += 1
# Get the text for this span
span_text = text[span_start:span_end]
# Create token data entry
token_data.append(
{
"text": span_text,
"id": current_tokens if len(current_tokens) > 1 else current_tokens[0],
"type": get_token_type(span_text),
"is_subword": is_subword(span_text, model, len(token_data) == 0),
"bytes": len(span_text.encode("utf-8")),
"position": len(token_data),
}
)
# Mark all characters in this span as processed
for pos in range(span_start, span_end):
processed_chars.add(pos)
text_pos = span_end
return {
"model": TOKENIZER_INFO[model]["name"],
"token_count": len(token_ids),
"tokens": token_data,
"compression_ratio": len(text) / len(token_data) if token_data else 0,
"encoding": TOKENIZER_INFO[model]["encoding"],
"vocab_size": TOKENIZER_INFO[model]["vocab_size"],
}
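# Usage sketch (assumes a tekken-backed entry, here hypothetically "mistral-nemo",
# exists in MODEL_MAP and TOKENIZER_INFO):
#   result = tokenize_w_tekken("Bonjour le monde", "mistral-nemo")
#   print(result["token_count"])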
def tokenize_w_tekken1(text, model):
    """Line-by-line variant of tokenize_w_tekken (simpler, no span grouping)."""
    try:
        tokenizer = get_tokenizer(model)
        token_data = []
        index = 0
        for line in text.split("\n"):
            token_ids = tokenizer.encode(line + "\n", bos=False, eos=False)
            for token_id in token_ids:
                token_text = tokenizer.decode([token_id])
                token_data.append(
                    {
                        "text": token_text,
                        "id": token_id,
                        "type": get_token_type(token_text),
                        "is_subword": is_subword(token_text, model, index == 0),
                        "bytes": len(token_text.encode("utf-8")),
                        "position": index,
                    }
                )
                index += 1
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": index,
            "tokens": token_data,
            "compression_ratio": len(text) / len(token_data) if token_data else 0,
            "encoding": TOKENIZER_INFO[model]["encoding"],
            "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
        }
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        return None
# Alternative version if you really need line-by-line processing:
def tokenize_with_hf(text, model):
    """Tokenize `text` line by line with a Hugging Face tokenizer, using offset mappings."""
    try:
        tokenizer = get_tokenizer(model)
all_token_data = []
global_position = 0
text_offset = 0
# Process line by line but accumulate results
for line in text.split("\n"):
line_with_newline = line + "\n"
encoding = tokenizer(
line_with_newline,
return_offsets_mapping=True,
return_tensors=None,
add_special_tokens=False,
)
token_ids = encoding["input_ids"]
tokens = tokenizer.convert_ids_to_tokens(token_ids)
offsets = encoding.get("offset_mapping", [])
# Process tokens for this line
for i in range(len(token_ids)):
if i < len(offsets) and offsets[i] is not None:
start, end = offsets[i]
actual_text = line_with_newline[start:end]
else:
actual_text = tokens[i] if i < len(tokens) else ""
if not actual_text:
continue
token_type = get_token_type(actual_text)
subword = is_subword(actual_text, model, global_position == 0)
                all_token_data.append({
                    "text": tokens[i],
                    "id": [token_ids[i]],
"type": token_type,
"is_subword": subword,
"bytes": len(actual_text.encode("utf-8")),
"position": global_position,
})
global_position += 1
text_offset += len(line_with_newline)
        # Token count for the whole text tokenized in one pass (without per-line "\n" padding)
        total_tokens = len(
            tokenizer(text, return_tensors=None, add_special_tokens=False)["input_ids"]
        )
return {
"model": TOKENIZER_INFO[model]["name"],
"token_count": total_tokens,
"tokens": all_token_data,
"compression_ratio": len(text) / len(all_token_data) if all_token_data else 0,
"encoding": TOKENIZER_INFO[model]["encoding"],
"vocab_size": TOKENIZER_INFO[model]["vocab_size"],
}
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        return None
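# Usage sketch (assumes the model key is mapped to an HF checkpoint in MODEL_MAP):
#   result = tokenize_with_hf("Hello world", "bert")
#   if result is not None:
#       print([t["text"] for t in result["tokens"]])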
def tokenize_with_hfold(text, model):
    """Whole-text HF tokenization via offset mappings (older variant of tokenize_with_hf)."""
    try:
        tokenizer = get_hf_tokenizer(model)
        # Process the ENTIRE text at once, not line by line
        encoding = tokenizer(
            text,
            return_offsets_mapping=True,
            return_tensors=None,
            add_special_tokens=False,
        )
        token_ids = encoding["input_ids"]
        tokens = tokenizer.convert_ids_to_tokens(token_ids)
        # Offset of each token in the text, i.e. token i covers text[offsets[i][0]:offsets[i][1]]
        offsets = encoding.get("offset_mapping", [])
        token_data = []
        curr_tok_id = 0
        current_text_pos = 0
        token_id = []
        while curr_tok_id < len(token_ids) and curr_tok_id < len(tokens):
            if offsets and curr_tok_id < len(offsets):
                start, end = offsets[curr_tok_id]
                actual_text = text[start:end]
                # Tokens whose offsets end at the same position (e.g. the bytes of one
                # multi-byte character) are merged into a single display entry.
                if current_text_pos == end:
                    token_id.append(token_ids[curr_tok_id])
                else:
                    token_id = [token_ids[curr_tok_id]]
                if current_text_pos != end:
                    token_data.append(
                        {
                            "text": actual_text,
                            "id": token_id,
                            "type": get_token_type(actual_text),
                            "is_subword": is_subword(actual_text, model, curr_tok_id == 0),
                            "bytes": len(actual_text.encode("utf-8")),
                            "position": curr_tok_id,
                        }
                    )
                current_text_pos = end
            else:
                # No offset mapping available: fall back to the raw token strings
                token_text = tokens[curr_tok_id]
                token_data.append(
                    {
                        "text": token_text,
                        "id": [token_ids[curr_tok_id]],
                        "type": get_token_type(token_text),
                        "is_subword": is_subword(token_text, model, curr_tok_id == 0),
                        "bytes": len(token_text.encode("utf-8")),
                        "position": curr_tok_id,
                    }
                )
            curr_tok_id += 1
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": len(token_ids),
            "tokens": token_data,
            "compression_ratio": len(text) / len(token_data) if token_data else 0,
            "encoding": TOKENIZER_INFO[model]["encoding"],
            "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
        }
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        return None
def tokenize_with_byt5(text, model):
"""Special handling for ByT5 byte-level tokenizer"""
try:
tokenizer = get_hf_tokenizer(model)
# ByT5 doesn't support offset_mapping, so we handle it differently
encoding = tokenizer(
text,
return_tensors=None,
add_special_tokens=False,
)
token_ids = encoding["input_ids"]
# For ByT5, each token represents a byte
text_bytes = text.encode('utf-8')
token_data = []
for i, token_id in enumerate(token_ids):
# Decode individual token
try:
token_text = tokenizer.decode([token_id])
# For ByT5, tokens often correspond to individual bytes/characters
if i < len(text_bytes):
# Get the actual byte this token represents
byte_val = text_bytes[i]
actual_char = chr(byte_val) if byte_val < 128 else text_bytes[i:i+1].decode('utf-8', errors='replace')
else:
actual_char = token_text
token_type = get_token_type(actual_char)
subword = is_subword(actual_char, model, i == 0)
token_data.append({
"text": actual_char,
"id": [token_id],
"type": token_type,
"is_subword": subword,
"bytes": len(actual_char.encode("utf-8")),
"position": i,
})
except Exception as e:
# Handle special tokens or decoding issues
token_data.append({
"text": f"<special_token_{token_id}>",
"id": [token_id],
"type": "special",
"is_subword": False,
"bytes": 0,
"position": i,
})
return {
"model": TOKENIZER_INFO[model]["name"],
"token_count": len(token_ids),
"tokens": token_data,
"compression_ratio": len(text) / len(token_data) if token_data else 0,
"encoding": TOKENIZER_INFO[model]["encoding"],
"vocab_size": TOKENIZER_INFO[model]["vocab_size"],
}
except Exception as e:
print(f"Error in ByT5 tokenization: {e}")
return None
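# Note (sketch): ByT5 token ids are byte values offset by its 3 special tokens
# (pad/eos/unk), so id 104 corresponds to byte 101 ("e"). Example, assuming a
# hypothetical "byt5" key in MODEL_MAP/TOKENIZER_INFO:
#   result = tokenize_with_byt5("héllo", "byt5")   # one entry per UTF-8 byte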
def normalize_text(text, method):
"""Apply normalization method to text"""
if method == "none":
return text
elif method == "lowercase":
return text.lower()
elif method == "nfc":
return unicodedata.normalize("NFC", text)
elif method == "nfd":
return unicodedata.normalize("NFD", text)
elif method == "nfk":
return unicodedata.normalize("NFK", text)
elif method == "nfkc":
return unicodedata.normalize("NFKC", text)
elif method == "nfkd":
return unicodedata.normalize("NFKD", text)
elif method == "strip_accents":
return "".join(
c
for c in unicodedata.normalize("NFD", text)
if unicodedata.category(c) != "Mn"
)
elif method == "strip_punctuation":
return re.sub(r"[^\w\s]", "", text)
elif method == "whitespace_normalize":
return " ".join(text.split())
return text
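# Illustrative examples (sketch):
#   normalize_text("Café", "strip_accents")              -> "Cafe"
#   normalize_text("Hello, world!", "strip_punctuation") -> "Hello world"
#   normalize_text("a   b\n c", "whitespace_normalize")  -> "a b c"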
def get_normalization_methods():
"""Return available normalization methods"""
return [
("none", "No normalization"),
("lowercase", "Lowercase"),
("nfc", "Unicode NFC (Canonical)"),
("nfd", "Unicode NFD (Decomposed)"),
("nfk", ""),
("nfkc", "Unicode NFKC (Compatible)"),
("nfkd", "Unicode NFKD (Compatible Decomposed)"),
("strip_accents", "Remove Accents"),
("strip_punctuation", "Remove Punctuation"),
("whitespace_normalize", "Normalize Whitespace"),
]
def clean_token_display(token_text, tokenizer=None):
"""Clean up token display to avoid ? characters"""
if token_text == "\n" or token_text == "<newline> ":
return "<newline>"
# Handle common prefixes
if token_text.startswith("Ġ"): # GPT-2 style
return " " + token_text[1:]
elif token_text.startswith("▁"): # SentencePiece style
return " " + token_text[1:]
# Handle byte-level representations
if token_text.startswith("<0x") and token_text.endswith(">"):
try:
# Convert hex byte to character
hex_val = token_text[3:-1]
byte_val = int(hex_val, 16)
return chr(byte_val) if 32 <= byte_val <= 126 else f"[{hex_val}]"
        except ValueError:
            return token_text
# Handle other special cases
if "�" in token_text: # Unicode replacement character
return token_text.replace("�", "?")
return token_text
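# Illustrative examples (sketch):
#   clean_token_display("Ġworld") -> " world"
#   clean_token_display("▁hola")  -> " hola"
#   clean_token_display("<0x41>") -> "A"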