import os
import re
import traceback
import unicodedata

import tiktoken
from transformers import AutoTokenizer, XGLMTokenizerFast

from mappings import MODEL_MAP, TOKENIZER_INFO

TOKENIZER_CACHE = {}


class TokenMonsterTokenizer:
    def __init__(self, name):
        import tokenmonster

        self.name = name
        self.vocab = tokenmonster.load(name.split("/")[-1])

    def __call__(self, text, **kwargs):
        ids = list(self.vocab.tokenize(text))
        return {"input_ids": ids}

    def convert_ids_to_tokens(self, ids):
        return [self.vocab.decode(id_) for id_ in ids]
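
# Usage sketch for the wrapper above, using the vocab name hard-coded later in this file:
#   tok = TokenMonsterTokenizer("englishcode-32000-consistent-v1")
#   ids = tok("hello world")["input_ids"]
#   pieces = tok.convert_ids_to_tokens(ids)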


def get_token_type(token_text):
    if re.match(r"^\s+$", token_text):
        return "whitespace"
    elif re.match(r"^[a-zA-Z]+$", token_text):
        return "word"
    elif re.match(r"^\d+$", token_text):
        return "number"
    elif re.match(r"^[^\w\s]+$", token_text):
        return "punctuation"
    elif token_text.startswith("<") and token_text.endswith(">"):
        return "special"
    else:
        return "mixed"


def is_subword(token_text, model, is_first):
    if not token_text or token_text.isspace():
        return False
    if token_text.startswith("<") and token_text.endswith(">"):
        return False  # special token
    if model in {
        "llama-2",
        "llama-3",
        "gemma-2",
        "bloom",
        "aya-expanse",
        "comma",
        "qwen3",
        "qwen2.5",
    }:
        return (
            not (token_text.startswith("▁") or token_text.startswith("Ġ"))
            and not is_first
        )
    elif model == "bert":
        return token_text.startswith("##")
    elif model in {"gpt-4", "gpt-2", "byt5"}:
        return not token_text.startswith(" ") and not is_first
    else:
        return not is_first
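
# Rough illustration of the heuristic above (hypothetical tokens): for "bert",
# "##ing" counts as a subword; for GPT-2-style models ("gpt-2", "gpt-4", "byt5"),
# "world" in the middle of a text is a subword continuation while " world"
# (leading space) starts a new word; SentencePiece-style models are handled the
# same way via the "▁"/"Ġ" markers.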


def tokenize_with_tiktoken(text, model):
    enc = tiktoken.encoding_for_model(model)
    # Process the entire text at once, not line by line
    token_ids = enc.encode(text)
    token_data = []
    current_text_pos = 0
    # Build character-to-token mapping
    char_to_tokens = {}
    # Decode each token and find its position in the original text
    for i, token_id in enumerate(token_ids):
        token_text = enc.decode([token_id])
        # Find where this token appears in the remaining text
        remaining_text = text[current_text_pos:]
        if token_text in remaining_text:
            # Find the position of this token in the original text
            local_pos = remaining_text.find(token_text)
            actual_start = current_text_pos + local_pos
            actual_end = actual_start + len(token_text)
            # Map each character position to this token
            for char_pos in range(actual_start, actual_end):
                if char_pos not in char_to_tokens:
                    char_to_tokens[char_pos] = []
                char_to_tokens[char_pos].append(token_id)
            current_text_pos = actual_end
    # Group consecutive characters that have the same token ID sets
    processed_chars = set()
    text_pos = 0
    while text_pos < len(text):
        if text_pos in processed_chars:
            text_pos += 1
            continue
        # Get tokens for current character
        current_tokens = char_to_tokens.get(text_pos, [])
        if not current_tokens:
            # Handle characters not covered by any token
            token_data.append(
                {
                    "text": text[text_pos],
                    "id": None,
                    "type": get_token_type(text[text_pos]),
                    "is_subword": False,
                    "bytes": len(text[text_pos].encode("utf-8")),
                    "position": len(token_data),
                }
            )
            processed_chars.add(text_pos)
            text_pos += 1
            continue
        # Find the span of characters that share the same token ID set
        span_start = text_pos
        span_end = text_pos + 1
        # Extend span while characters have the same token set
        while (
            span_end < len(text)
            and span_end in char_to_tokens
            and char_to_tokens[span_end] == current_tokens
        ):
            span_end += 1
        # Get the text for this span
        span_text = text[span_start:span_end]
        # Create token data entry
        token_data.append(
            {
                "text": span_text,
                "id": current_tokens if len(current_tokens) > 1 else current_tokens[0],
                "type": get_token_type(span_text),
                "is_subword": is_subword(span_text, model, len(token_data) == 0),
                "bytes": len(span_text.encode("utf-8")),
                "position": len(token_data),
            }
        )
        # Mark all characters in this span as processed
        for pos in range(span_start, span_end):
            processed_chars.add(pos)
        text_pos = span_end
    return {
        "model": TOKENIZER_INFO[model]["name"],
        "token_count": len(token_ids),
        "tokens": token_data,
        "compression_ratio": len(text) / len(token_data) if token_data else 0,
        "encoding": TOKENIZER_INFO[model]["encoding"],
        "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
    }


def tokenize_with_tiktoken_by_line(text, model):
    # Legacy line-by-line variant kept for reference; tokenize_with_tiktoken above is
    # the version that preserves exact character offsets across the whole text.
    encoding_name = "cl100k_base" if model == "gpt-4" else "gpt2"
    enc = tiktoken.get_encoding(encoding_name)
    token_data = []
    total_tokens = 0
    for line in text.split("\n"):
        token_ids = enc.encode(line + "\n")
        total_tokens += len(token_ids)
        for token_id in token_ids:
            token_text = enc.decode([token_id])
            token_data.append(
                {
                    "text": token_text,
                    "id": token_id,
                    "type": get_token_type(token_text),
                    "is_subword": is_subword(token_text, model, len(token_data) == 0),
                    "bytes": len(token_text.encode("utf-8")),
                    "position": len(token_data),
                }
            )
    return {
        "model": TOKENIZER_INFO[model]["name"],
        "token_count": total_tokens,
        "tokens": token_data,
        "compression_ratio": len(text) / len(token_data) if token_data else 0,
        "encoding": TOKENIZER_INFO[model]["encoding"],
        "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
    }


def get_hf_tokenizer(model):
    model_name = MODEL_MAP.get(model, "gpt2")
    if model_name in TOKENIZER_CACHE:
        return TOKENIZER_CACHE[model_name]
    # Get token from environment
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": 0,
            "tokens": [],
            "error": "HF_TOKEN not found in environment. Please add your HuggingFace token to Space secrets.",
        }
    if "tokenmonster" in model_name:
        tokenizer = TokenMonsterTokenizer("englishcode-32000-consistent-v1")
    else:
        tokenizer = AutoTokenizer.from_pretrained(
            model_name, token=hf_token, trust_remote_code=True
        )
    TOKENIZER_CACHE[model_name] = tokenizer
    return tokenizer


def get_tokenizer(model):
    model_name = MODEL_MAP.get(model, None)
    if model_name is None:
        raise ValueError(f"Unknown tokenizer code {model}")
    if model_name in TOKENIZER_CACHE:
        return TOKENIZER_CACHE[model_name]
    # Get token from environment
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": 0,
            "tokens": [],
            "error": "HF_TOKEN not found in environment. Please add your HuggingFace token to Space secrets.",
        }
    if "tekken" in model_name:
        from mistral_common.tokens.tokenizers.mistral import MistralTokenizer

        tok = MistralTokenizer.v3(is_tekken=True)
        tokenizer = tok.instruct_tokenizer.tokenizer
    elif "tokenmonster" in model_name:
        tokenizer = TokenMonsterTokenizer("englishcode-32000-consistent-v1")
    elif "xglm" in model_name.lower():
        # Use the dedicated XGLM fast tokenizer class instead of AutoTokenizer
        tokenizer = XGLMTokenizerFast.from_pretrained(
            model_name, token=hf_token, trust_remote_code=True
        )
    else:
        tokenizer = AutoTokenizer.from_pretrained(
            model_name, token=hf_token, trust_remote_code=True
        )
    TOKENIZER_CACHE[model_name] = tokenizer
    return tokenizer


def tokenize_w_tekken(text, model):
    tokenizer = get_tokenizer(model)
    # Process the entire text at once, not line by line
    token_ids = tokenizer.encode(text, bos=False, eos=False)
    token_data = []
    current_text_pos = 0
    # Build character-to-token mapping
    char_to_tokens = {}
    # Decode each token and find its position in the original text
    for i, token_id in enumerate(token_ids):
        token_text = tokenizer.decode([token_id])
        # Find where this token appears in the remaining text
        remaining_text = text[current_text_pos:]
        if token_text in remaining_text:
            # Find the position of this token in the original text
            local_pos = remaining_text.find(token_text)
            actual_start = current_text_pos + local_pos
            actual_end = actual_start + len(token_text)
            # Map each character position to this token
            for char_pos in range(actual_start, actual_end):
                if char_pos not in char_to_tokens:
                    char_to_tokens[char_pos] = []
                char_to_tokens[char_pos].append(token_id)
            current_text_pos = actual_end
    # Group consecutive characters that have the same token ID sets
    processed_chars = set()
    text_pos = 0
    while text_pos < len(text):
        if text_pos in processed_chars:
            text_pos += 1
            continue
        # Get tokens for current character
        current_tokens = char_to_tokens.get(text_pos, [])
        if not current_tokens:
            # Handle characters not covered by any token
            token_data.append(
                {
                    "text": text[text_pos],
                    "id": None,
                    "type": get_token_type(text[text_pos]),
                    "is_subword": False,
                    "bytes": len(text[text_pos].encode("utf-8")),
                    "position": len(token_data),
                }
            )
            processed_chars.add(text_pos)
            text_pos += 1
            continue
        # Find the span of characters that share the same token ID set
        span_start = text_pos
        span_end = text_pos + 1
        # Extend span while characters have the same token set
        while (
            span_end < len(text)
            and span_end in char_to_tokens
            and char_to_tokens[span_end] == current_tokens
        ):
            span_end += 1
        # Get the text for this span
        span_text = text[span_start:span_end]
        # Create token data entry
        token_data.append(
            {
                "text": span_text,
                "id": current_tokens if len(current_tokens) > 1 else current_tokens[0],
                "type": get_token_type(span_text),
                "is_subword": is_subword(span_text, model, len(token_data) == 0),
                "bytes": len(span_text.encode("utf-8")),
                "position": len(token_data),
            }
        )
        # Mark all characters in this span as processed
        for pos in range(span_start, span_end):
            processed_chars.add(pos)
        text_pos = span_end
    return {
        "model": TOKENIZER_INFO[model]["name"],
        "token_count": len(token_ids),
        "tokens": token_data,
        "compression_ratio": len(text) / len(token_data) if token_data else 0,
        "encoding": TOKENIZER_INFO[model]["encoding"],
        "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
    }


def tokenize_w_tekken1(text, model):
    # Legacy line-by-line variant of tokenize_w_tekken, kept for reference.
    try:
        tokenizer = get_tokenizer(model)
        index = 0
        token_data = []
        for line in text.split("\n"):
            line += "\n"
            token_ids = tokenizer.encode(line, bos=False, eos=False)
            tokens = [tokenizer.decode([tok]) for tok in token_ids]
            for i, tok in enumerate(tokens):
                token_data.append(
                    {
                        "text": tok,
                        "id": token_ids[i],
                        "type": get_token_type(tok),
                        "is_subword": is_subword(tok, model, index == 0),
                        "bytes": len(tok.encode("utf-8")),
                        "position": index,
                    }
                )
                index += 1
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": index,
            "tokens": token_data,
            "compression_ratio": len(text) / len(token_data) if token_data else 0,
            "encoding": TOKENIZER_INFO[model]["encoding"],
            "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
        }
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        return None


# Alternative version if you really need line-by-line processing:
def tokenize_with_hf(text, model):
    try:
        tokenizer = get_tokenizer(model)
        all_token_data = []
        global_position = 0
        # Process line by line but accumulate results
        for line in text.split("\n"):
            line_with_newline = line + "\n"
            encoding = tokenizer(
                line_with_newline,
                return_offsets_mapping=True,
                return_tensors=None,
                add_special_tokens=False,
            )
            token_ids = encoding["input_ids"]
            tokens = tokenizer.convert_ids_to_tokens(token_ids)
            offsets = encoding.get("offset_mapping", [])
            # Process tokens for this line
            for i in range(len(token_ids)):
                if i < len(offsets) and offsets[i] is not None:
                    start, end = offsets[i]
                    actual_text = line_with_newline[start:end]
                else:
                    actual_text = tokens[i] if i < len(tokens) else ""
                if not actual_text:
                    continue
                token_type = get_token_type(actual_text)
                subword = is_subword(actual_text, model, global_position == 0)
                all_token_data.append(
                    {
                        # Display the raw token string; byte counts come from the original text span
                        "text": tokens[i],
                        "id": [token_ids[i]],
                        "type": token_type,
                        "is_subword": subword,
                        "bytes": len(actual_text.encode("utf-8")),
                        "position": global_position,
                    }
                )
                global_position += 1
        # Token count for the text tokenized as a whole (may differ slightly from the
        # per-line count because of the added newlines)
        total_tokens = len(
            tokenizer(text, return_tensors=None, add_special_tokens=False)["input_ids"]
        )
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": total_tokens,
            "tokens": all_token_data,
            "compression_ratio": len(text) / len(all_token_data) if all_token_data else 0,
            "encoding": TOKENIZER_INFO[model]["encoding"],
            "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
        }
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        return None


def tokenize_with_hfold(text, model):
    try:
        tokenizer = get_hf_tokenizer(model)
        # Process the ENTIRE text at once, not line by line
        encoding = tokenizer(
            text,
            return_offsets_mapping=True,
            return_tensors=None,
            add_special_tokens=False,
        )
        token_ids = encoding["input_ids"]
        tokens = tokenizer.convert_ids_to_tokens(token_ids)
        # Offset in the text for each token, i.e. token i covers text[offsets[i][0]:offsets[i][1]]
        offsets = encoding.get("offset_mapping", [])
        token_data = []
        curr_tok_id = 0
        current_text_pos = 0
        token_id = []
        while curr_tok_id < len(token_ids) and curr_tok_id < len(tokens):
            if offsets and curr_tok_id < len(offsets):
                start, end = offsets[curr_tok_id]
                actual_text = text[start:end]
                if current_text_pos == end:
                    # Token adds no new text: merge its id into the previous entry's id list
                    token_id.append(token_ids[curr_tok_id])
                else:
                    token_id = [token_ids[curr_tok_id]]
                token_type = get_token_type(actual_text)
                subword = is_subword(actual_text, model, curr_tok_id == 0)
                if current_text_pos != end:
                    token_data.append(
                        {
                            "text": actual_text,
                            "id": token_id,
                            "type": token_type,
                            "is_subword": subword,
                            "bytes": len(actual_text.encode("utf-8")),
                            "position": curr_tok_id,
                        }
                    )
                current_text_pos = end
            else:
                token_data.append(
                    {
                        "text": tokens[curr_tok_id],
                        "id": [token_ids[curr_tok_id]],
                        "type": get_token_type(tokens[curr_tok_id]),
                        "is_subword": is_subword(tokens[curr_tok_id], model, curr_tok_id == 0),
                        "bytes": len(tokens[curr_tok_id].encode("utf-8")),
                        "position": curr_tok_id,
                    }
                )
            curr_tok_id += 1
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": len(token_ids),
            "tokens": token_data,
            "compression_ratio": len(text) / len(token_data) if token_data else 0,
            "encoding": TOKENIZER_INFO[model]["encoding"],
            "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
        }
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        return None


def tokenize_with_byt5(text, model):
    """Special handling for ByT5 byte-level tokenizer"""
    try:
        tokenizer = get_hf_tokenizer(model)
        # ByT5 doesn't support offset_mapping, so we handle it differently
        encoding = tokenizer(
            text,
            return_tensors=None,
            add_special_tokens=False,
        )
        token_ids = encoding["input_ids"]
        # For ByT5, each token represents a byte
        text_bytes = text.encode("utf-8")
        token_data = []
        for i, token_id in enumerate(token_ids):
            try:
                # Decode individual token
                token_text = tokenizer.decode([token_id])
                # For ByT5, tokens often correspond to individual bytes/characters
                if i < len(text_bytes):
                    # Get the actual byte this token represents
                    byte_val = text_bytes[i]
                    actual_char = (
                        chr(byte_val)
                        if byte_val < 128
                        else text_bytes[i : i + 1].decode("utf-8", errors="replace")
                    )
                else:
                    actual_char = token_text
                token_type = get_token_type(actual_char)
                subword = is_subword(actual_char, model, i == 0)
                token_data.append(
                    {
                        "text": actual_char,
                        "id": [token_id],
                        "type": token_type,
                        "is_subword": subword,
                        "bytes": len(actual_char.encode("utf-8")),
                        "position": i,
                    }
                )
            except Exception:
                # Handle special tokens or decoding issues
                token_data.append(
                    {
                        "text": f"<special_token_{token_id}>",
                        "id": [token_id],
                        "type": "special",
                        "is_subword": False,
                        "bytes": 0,
                        "position": i,
                    }
                )
        return {
            "model": TOKENIZER_INFO[model]["name"],
            "token_count": len(token_ids),
            "tokens": token_data,
            "compression_ratio": len(text) / len(token_data) if token_data else 0,
            "encoding": TOKENIZER_INFO[model]["encoding"],
            "vocab_size": TOKENIZER_INFO[model]["vocab_size"],
        }
    except Exception as e:
        print(f"Error in ByT5 tokenization: {e}")
        return None


def normalize_text(text, method):
    """Apply normalization method to text"""
    if method == "none":
        return text
    elif method == "lowercase":
        return text.lower()
    elif method == "nfc":
        return unicodedata.normalize("NFC", text)
    elif method == "nfd":
        return unicodedata.normalize("NFD", text)
    elif method == "nfkc":
        return unicodedata.normalize("NFKC", text)
    elif method == "nfkd":
        return unicodedata.normalize("NFKD", text)
    elif method == "strip_accents":
        return "".join(
            c
            for c in unicodedata.normalize("NFD", text)
            if unicodedata.category(c) != "Mn"
        )
    elif method == "strip_punctuation":
        return re.sub(r"[^\w\s]", "", text)
    elif method == "whitespace_normalize":
        return " ".join(text.split())
    return text
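
# A few illustrative cases (hypothetical inputs): normalize_text("Café", "strip_accents")
# returns "Cafe", normalize_text("①", "nfkc") returns "1", and
# normalize_text("a  b\tc", "whitespace_normalize") returns "a b c".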


def get_normalization_methods():
    """Return available normalization methods"""
    return [
        ("none", "No normalization"),
        ("lowercase", "Lowercase"),
        ("nfc", "Unicode NFC (Canonical)"),
        ("nfd", "Unicode NFD (Decomposed)"),
        ("nfkc", "Unicode NFKC (Compatible)"),
        ("nfkd", "Unicode NFKD (Compatible Decomposed)"),
        ("strip_accents", "Remove Accents"),
        ("strip_punctuation", "Remove Punctuation"),
        ("whitespace_normalize", "Normalize Whitespace"),
    ]


def clean_token_display(token_text, tokenizer=None):
    """Clean up token display to avoid ? characters"""
    if token_text == "\n" or token_text == "<newline> ":
        return "<newline>"
    # Handle common prefixes
    if token_text.startswith("Ġ"):  # GPT-2 style
        return " " + token_text[1:]
    elif token_text.startswith("▁"):  # SentencePiece style
        return " " + token_text[1:]
    # Handle byte-level representations
    if token_text.startswith("<0x") and token_text.endswith(">"):
        try:
            # Convert hex byte to character
            hex_val = token_text[3:-1]
            byte_val = int(hex_val, 16)
            return chr(byte_val) if 32 <= byte_val <= 126 else f"[{hex_val}]"
        except ValueError:
            return token_text
    # Handle other special cases
    if "�" in token_text:  # Unicode replacement character
        return token_text.replace("�", "?")
    return token_text
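

if __name__ == "__main__":
    # Minimal smoke test (a sketch, not part of the Space's UI): it assumes "gpt-4" is a
    # valid key in TOKENIZER_INFO, as the tiktoken paths above suggest; adjust the key to
    # whatever is actually defined in mappings.py.
    sample = normalize_text("Tokenizers split   text into pieces.", "whitespace_normalize")
    result = tokenize_with_tiktoken(sample, "gpt-4")
    if result:
        print(result["model"], result["token_count"])
        for tok in result["tokens"][:5]:
            print(repr(clean_token_display(tok["text"])), tok["id"], tok["type"])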