from __future__ import annotations

import gradio as gr
from datasets import load_dataset, load_metric, Audio, concatenate_datasets, Dataset
from transformers import Wav2Vec2CTCTokenizer, Wav2Vec2FeatureExtractor, Wav2Vec2Processor, Wav2Vec2ForCTC, TrainingArguments, Trainer
import json
import torch
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
import random
import argparse
import pandas as pd
import os
import multiprocess
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers.tokenization_utils_base import AddedToken
# Needed by the custom _decode below (returned when offsets are requested)
from transformers.models.wav2vec2.tokenization_wav2vec2 import Wav2Vec2CTCTokenizerOutput
# Note: this subclass intentionally shadows the Wav2Vec2CTCTokenizer name imported above.
class Wav2Vec2CTCTokenizer(Wav2Vec2CTCTokenizer):

    def _decode(
        self,
        token_ids: list[int],
        skip_special_tokens: bool = False,
        clean_up_tokenization_spaces: Optional[bool] = None,
        group_tokens: bool = True,
        spaces_between_special_tokens: bool = False,
        output_word_offsets: Optional[bool] = False,
        output_char_offsets: Optional[bool] = False,
    ) -> str:
        """
        special _decode function is needed for Wav2Vec2Tokenizer because added tokens should be treated exactly the
        same as tokens of the base vocabulary and therefore the function `convert_tokens_to_string` has to be called on
        the whole token list and not individually on added tokens
        """
        filtered_tokens = self.convert_ids_to_tokens(token_ids, skip_special_tokens=skip_special_tokens)

        result = []
        for token in filtered_tokens:
            if skip_special_tokens and (
                token in self.all_special_ids or (token != self.pad_token and token in self.all_special_tokens)
            ):
                continue
            result.append(token)

        string_output = self.convert_tokens_to_string(
            result,
            group_tokens=group_tokens,
            spaces_between_special_tokens=spaces_between_special_tokens,
            output_word_offsets=output_word_offsets,
            output_char_offsets=output_char_offsets,
        )

        text = string_output["text"]

        clean_up_tokenization_spaces = (
            clean_up_tokenization_spaces
            if clean_up_tokenization_spaces is not None
            else self.clean_up_tokenization_spaces
        )
        if clean_up_tokenization_spaces:
            text = self.clean_up_tokenization(text)

        if output_word_offsets or output_char_offsets:
            return Wav2Vec2CTCTokenizerOutput(
                text=text,
                char_offsets=string_output["char_offsets"],
                word_offsets=string_output["word_offsets"],
            )
        else:
            return text
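# Illustrative note (not from the original file): with group_tokens=True the decode path above
# collapses repeated ids and drops the pad/blank token before mapping to a string. For example,
# assuming a hypothetical vocab where id 0 is "[PAD]" (the CTC blank) and ids 5 / 6 map to the
# tokens "1" / "-1", the id sequence [5, 5, 0, 6, 6] would decode to "1-1" rather than "11-1-1".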
import torch
import warnings
from torch import nn  # needed only if you add extra layers
from transformers import (
    Wav2Vec2ForCTC,      # base model we extend
    Wav2Vec2Config,      # type hinting & standalone instantiation
    Wav2Vec2Model,
    logging as hf_logging,  # optional: nicer error messages
)
from transformers.utils import (
    auto_docstring,
)
from transformers.modeling_outputs import (
    CausalLMOutput,
)

# Position of hidden_states in the tuple returned when return_dict=False
# (same constant as in transformers' modeling_wav2vec2.py); used in forward() below.
_HIDDEN_STATES_START_POSITION = 2
class Wav2Vec2ForCTC24Heads(Wav2Vec2ForCTC):
    """
    Same encoder as Wav2Vec2ForCTC but with 24 parallel lm-heads and
    an aggregated CTC loss.

    Expected `labels` shape : (batch, 24, target_len)
    Returned `logits` shape : (batch, 24, time, vocab_size)
    """

    def __init__(self, config, num_heads: int = 24, target_lang: Optional[str] = None):
        super().__init__(config)

        self.wav2vec2 = Wav2Vec2Model(config)
        self.dropout = nn.Dropout(config.final_dropout)

        self.target_lang = target_lang

        if config.vocab_size is None:
            raise ValueError(
                f"You are trying to instantiate {self.__class__} with a configuration that "
                "does not define the vocabulary size of the language model head. Please "
                "instantiate the model as follows: `Wav2Vec2ForCTC.from_pretrained(..., vocab_size=vocab_size)`. "
                "or define `vocab_size` of your model's configuration."
            )
        output_hidden_size = (
            config.output_hidden_size if hasattr(config, "add_adapter") and config.add_adapter else config.hidden_size
        )

        self.num_heads = num_heads
        # Replace the single head with a ModuleList of heads
        self.lm_head = nn.ModuleList(
            [nn.Linear(output_hidden_size, config.vocab_size) for _ in range(num_heads)]
        )

    def freeze_feature_extractor(self):
        """
        Calling this function will disable the gradient computation for the feature encoder so that its parameters will
        not be updated during training.
        """
        warnings.warn(
            "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. "
            "Please use the equivalent `freeze_feature_encoder` method instead.",
            FutureWarning,
        )
        self.freeze_feature_encoder()
    def forward(
        self,
        input_values: Optional[torch.Tensor],
        attention_mask: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        labels: Optional[torch.Tensor] = None,
    ) -> Union[tuple, CausalLMOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, num_heads, target_length)`, *optional*):
            Labels for connectionist temporal classification, one label sequence per head. Note that
            `target_length` has to be smaller or equal to the sequence length of the output logits. Indices are
            selected in `[-100, 0, ..., config.vocab_size - 1]`. All labels set to `-100` are ignored (masked),
            the loss is only computed for labels in `[0, ..., config.vocab_size - 1]`.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if labels is not None and labels.max() >= self.config.vocab_size:
            raise ValueError(f"Label values must be <= vocab_size: {self.config.vocab_size}")

        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = outputs[0]
        hidden_states = self.dropout(hidden_states)

        logits = torch.stack(
            [head(hidden_states) for head in self.lm_head],  # list of (B, T, V)
            dim=1,  # -> (B, 24, T, V)
        )

        loss = None
        if labels is not None:
            # retrieve loss input_lengths from attention_mask
            attention_mask = (
                attention_mask if attention_mask is not None else torch.ones_like(input_values, dtype=torch.long)
            )
            input_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(-1)).to(torch.long)

            loss_list = []
            for h in range(self.num_heads):
                # grab labels for this head: (B, target_len)
                lab = labels[:, h]

                # mask – targets for CTC must be 1-D
                # assuming that padded tokens are filled with -100
                # when not being attended to
                lab_mask = lab >= 0
                target_lengths = lab_mask.sum(-1)
                flat_targets = lab.masked_select(lab_mask)

                log_probs = nn.functional.log_softmax(logits[:, h], dim=-1).transpose(0, 1)  # (T, B, V)

                with torch.backends.cudnn.flags(enabled=False):
                    head_loss = nn.functional.ctc_loss(
                        log_probs,
                        flat_targets,
                        input_lengths,
                        target_lengths,
                        blank=self.config.pad_token_id,
                        reduction="mean",  # per-head loss
                        zero_infinity=self.config.ctc_zero_infinity,
                    )
                loss_list.append(head_loss)

            loss = torch.stack(loss_list).mean()  # aggregate

        # Greedy per-head predictions (kept for inspection; not returned)
        batch_preds = []  # will become length B
        for b in range(logits.size(0)):
            head_preds = []  # will become length 24
            for h in range(logits.size(1)):
                ids = logits[b, h].argmax(dim=-1)  # (T,)
                head_preds.append(ids)  # accumulate each head
            head_preds = torch.stack(head_preds)  # (24, T)  ← "vector" of heads
            batch_preds.append(head_preds)
        batch_preds = torch.stack(batch_preds)  # (B, 24, T)

        if not return_dict:
            output = (logits,) + outputs[_HIDDEN_STATES_START_POSITION:]
            return ((loss,) + output) if loss is not None else output

        return CausalLMOutput(
            loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions
        )
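# Minimal usage sketch for the 24-head model (hypothetical checkpoint and sizes, shown only to make
# the expected shapes concrete; the real checkpoint is loaded further below as `ckpt_dir`):
#   model = Wav2Vec2ForCTC24Heads.from_pretrained("facebook/wav2vec2-base", vocab_size=5, pad_token_id=0)
#   wav = torch.randn(2, 16_000)                  # (batch, samples), one second of 16 kHz audio
#   labels = torch.randint(1, 5, (2, 24, 30))     # (batch, heads, target_len); -100 would mark padding
#   out = model(wav, labels=labels)
#   # out.logits has shape (2, 24, T, 5) with T ≈ 49 frames for 16 000 samples,
#   # and out.loss is the mean of the 24 per-head CTC losses.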
from dataclasses import dataclass
from typing import Dict, List, Union

import torch
from transformers import Wav2Vec2Processor
@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for processing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence is provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None
    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Split inputs and labels since they have to be of different lengths
        # and need different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # Replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels
        return batch
@dataclass
class DataCollator24CTC(DataCollatorCTCWithPadding):
    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None
    num_heads: int = 24
    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Split inputs and labels since they have to be of different lengths
        # and need different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        all_labels = []
        for h in range(self.num_heads):
            label_features_h = [{"input_ids": feature["labels"][h]} for feature in features]
            with self.processor.as_target_processor():
                labels_batch = self.processor.pad(
                    label_features_h,
                    padding=self.padding,
                    max_length=self.max_length_labels,
                    pad_to_multiple_of=self.pad_to_multiple_of_labels,
                    return_tensors="pt",
                )
            padded_ids = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)
            all_labels.append(padded_ids)

        # Stack to (num_heads, batch, seq_len) -> then permute to (batch, num_heads, seq_len)
        labels = torch.stack(all_labels).permute(1, 0, 2)
        batch["labels"] = labels
        return batch
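# Sketch of how the 24-head collator would typically be wired up (illustrative only; `processor_ipa`
# is created further below in this file, while `train_ds` is a hypothetical cached dataset and the
# training loop itself is not part of this app):
#   collator = DataCollator24CTC(processor=processor_ipa, padding=True, num_heads=24)
#   batch = collator([train_ds[i] for i in range(4)])
#   # batch["input_values"]: (4, T_max)    padded waveforms
#   # batch["labels"]      : (4, 24, L_max) with -100 where labels were padded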
import os
import json
import random
from pathlib import Path
from typing import List

import numpy as np
import torchaudio, torchaudio.transforms as T
from datasets import Dataset, Features, Sequence, Value, load_from_disk, concatenate_datasets

# ------------------------------------------------------------------
# 1) Audio helpers
# ------------------------------------------------------------------
def load_and_standardise(path: str | Path, target_sr: int = 16_000) -> list[float]:
    """
    • Loads `path` with torchaudio
    • Resamples to `target_sr` if necessary
    • Converts to mono (mean over channels)
    • Returns a *Python list* of floats so it is JSON-serialisable
      (zero-mean / unit-variance normalisation is left to the feature extractor,
       which is created with `do_normalize=True`)
    """
    try:
        torchaudio.set_audio_backend("sox_io")
    except RuntimeError:
        raise ImportError("To support decoding 'mp3' audio files, please install 'sox'.")

    array, sampling_rate = torchaudio.load(path)
    if sampling_rate != target_sr:
        array = T.Resample(sampling_rate, target_sr)(array)
    array = array.numpy()
    array = array.mean(axis=0)
    return array.tolist()
# --------------------------------------------------------------
# 2) Streaming readers (JSON array or NDJSON)
# --------------------------------------------------------------
def iter_entries(json_path: str | Path):
    """
    Yield entries from either a single JSON array file or an NDJSON file
    (one JSON object per line, parsed line by line if whole-file parsing fails).
    """
    p = Path(json_path)
    txt = p.read_text(encoding="utf-8")
    try:
        data = json.loads(txt)
        if isinstance(data, list):
            for obj in data:
                yield obj
        else:
            yield data
    except json.JSONDecodeError:
        for ln in txt.splitlines():
            ln = ln.strip()
            if ln:
                yield json.loads(ln)
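# iter_entries accepts either of these layouts (file names and values are made up; the
# "path"/"ipa" keys are what preprocess_source_to_cache below expects):
#   data.json   : [{"path": "clip_0.wav", "ipa": [[1, -1, 0], [0, 1, -1]]}, ...]
#   data.ndjson : one such object per line, e.g.
#                 {"path": "clip_0.wav", "ipa": [[1, -1, 0], [0, 1, -1]]}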
# --------------------------------------------------------------
# 3) Stage-1: process one source once and cache to disk (Arrow)
# --------------------------------------------------------------
def preprocess_source_to_cache(
    json_path: str | Path,
    processor: Wav2Vec2Processor,
    cache_root: str | Path,
    source_tag: str,  # any stable name (e.g. 'en', 'jp', 'doreco-an')
) -> Path:
    """
    Stream over entries in json_path, fully decode audio and convert labels to IDs.
    Save as a HuggingFace dataset to disk (memory-mapped Arrow).
    Returns the folder path created by `save_to_disk()`.
    """
    cache_root = Path(cache_root)
    cache_root.mkdir(parents=True, exist_ok=True)
    save_path = cache_root / f"cache_{source_tag}"
    save_path.mkdir(parents=True, exist_ok=True)

    # If cache already exists, skip reprocessing to save time.
    if (save_path / "dataset_info.json").exists():
        print(f"[cache] Using existing cache: {save_path}")
        return save_path
    else:
        if save_path.exists():
            import shutil
            shutil.rmtree(save_path)
        save_path.mkdir(parents=True, exist_ok=True)
    def row_generator():
        for obj in iter_entries(json_path):
            # Expect {"path": "...", "ipa": <matrix or whatever your build used>}
            ipa_matrix = obj.get("ipa", [])
            if not ipa_matrix:
                continue

            # your original: matrix was [segments x 22]; you transposed and stringified
            transpose = [list(row) for row in zip(*ipa_matrix)]
            transpose_str = [[str(tok) for tok in head] for head in transpose]

            # Decode audio once (as requested)
            audio = load_and_standardise(obj["path"])
            # Cast to float32 for Arrow efficiency
            audio = np.asarray(audio, dtype=np.float32)

            # Convert labels to IDs once (keep nested per-head if your collator expects it)
            label_ids: List[List[int]] = []
            for head in transpose_str:
                with processor.as_target_processor():
                    ids = processor(head).input_ids
                # ids might be [[id]]; unwrap if needed:
                ids = [tok[0] if isinstance(tok, list) else tok for tok in ids]
                label_ids.append(ids)

            yield {
                "input_values": audio,   # variable length float32
                "labels": label_ids,     # list[list[int]]
                "source": source_tag,    # keep origin
            }

    # Features: variable-length floats + nested variable-length ints
    features = Features({
        "input_values": Sequence(Value("float32")),
        "labels": Sequence(Sequence(Value("int32"))),
        "source": Value("string"),
    })
    rows, chunks = [], []
    for row in row_generator():  # <- your existing generator
        rows.append(row)
        if len(rows) >= 5_000:  # tune shard size to your RAM
            chunks.append(Dataset.from_list(rows, features=features))
            rows = []  # free current chunk
    if rows:  # tail of the stream
        chunks.append(Dataset.from_list(rows, features=features))

    ds = concatenate_datasets(chunks)      # single Dataset object
    ds.save_to_disk(save_path.as_posix())  # writes Arrow to local FS
    print(f"[cache] Wrote {len(ds)} rows → {save_path}")
    return save_path
# --------------------------------------------------------------
# 4) Stage-2: build a weighted dataset from cached sources
#    (no re-decoding, no in-RAM duplication)
# --------------------------------------------------------------
def build_weighted_dataset_from_cache(
    cache_paths: list[str | Path],
    percentages: list[float],
    *,
    seed: int = 42,
) -> Dataset:
    """
    For each cached source dataset:
        pct >= 100 → full copies n_full times + fractional random subset
        pct <  100 → fractional random subset only
    All operations are Arrow-backed (memory-mapped), so no RAM blow-ups.
    """
    assert len(cache_paths) == len(percentages)
    rng = random.Random(seed)

    per_source_weighted = []
    for cache_path, pct in zip(cache_paths, percentages):
        ds = load_from_disk(str(cache_path))
        N = len(ds)
        if N == 0 or pct <= 0:
            continue

        n_full = int(pct // 100)
        frac = (pct % 100) / 100.0
        n_frac = round(N * frac)

        parts = []
        # Full copies: concatenate the same dataset handle n_full times (no decode)
        if n_full > 0:
            parts.extend([ds] * n_full)
        # Fractional random subset (no decode)
        if n_frac > 0:
            idxs = rng.sample(range(N), n_frac)
            parts.append(ds.select(idxs))

        if not parts:
            continue
        ds_weighted = parts[0] if len(parts) == 1 else concatenate_datasets(parts)
        per_source_weighted.append(ds_weighted)
        print(f"[weight] {cache_path} → {len(ds_weighted)} rows "
              f"(full×{n_full} + frac {n_frac}/{N})")

    # Final training set = concat of all weighted sources
    if not per_source_weighted:
        raise RuntimeError("No data after weighting.")
    train_ds = per_source_weighted[0] if len(per_source_weighted) == 1 \
        else concatenate_datasets(per_source_weighted)

    # Optional: shuffle once for training
    train_ds = train_ds.shuffle(seed=seed)
    print(f"[train] Total rows: {len(train_ds)}")
    return train_ds
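# How the two stages fit together, plus a worked example of the weighting rule (paths, tags and
# percentages are illustrative, not the ones used to train the checkpoint; `processor_ipa` is the
# processor built further below):
#   cache_en = preprocess_source_to_cache("data/en.json", processor_ipa, "caches", "en")
#   cache_jp = preprocess_source_to_cache("data/jp.json", processor_ipa, "caches", "jp")
#   train_ds = build_weighted_dataset_from_cache([cache_en, cache_jp], [250, 30])
# With pct = 250 and a cached source of N = 1_000 rows: n_full = 250 // 100 = 2 full copies,
# frac = 50 / 100 = 0.5, so n_frac = 500 random rows, giving 2_500 rows from that source;
# with pct = 30, only round(0.30 * N) = 300 random rows are taken.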
vocab_file = "dummy_vocab.json"

feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1,
                                              sampling_rate=16_000,
                                              padding_value=0.0,
                                              do_normalize=True,
                                              return_attention_mask=True)
tokenizer_ipa = Wav2Vec2CTCTokenizer("./{}".format(vocab_file),
                                     unk_token="[UNK]",
                                     pad_token="[PAD]",
                                     word_delimiter_token="|")
processor_ipa = Wav2Vec2Processor(feature_extractor=feature_extractor,
                                  tokenizer=tokenizer_ipa)
import numpy as np
from phd_model.phonetics.ipa import symbol_to_descriptor, to_symbol
from phd_model.model.wav2vec2 import Wav2Vec2
from transformers import Wav2Vec2Processor
import torchaudio, torchaudio.transforms as T
from torchinfo import summary
import torch
import re

ckpt_dir = "anim400k_train_v2"

# Get device
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load model from Huggingface hub
wav2vec2 = Wav2Vec2ForCTC24Heads.from_pretrained(ckpt_dir)
processor = Wav2Vec2Processor.from_pretrained(ckpt_dir)
wav2vec2.to(device)
wav2vec2.eval()

# Print model summary for batch_size 1 and a single second of audio samples
summary(wav2vec2, input_size=(1, 16_000), depth=8, device=device)

# Create new random audio (you can load your own audio here to get actual predictions)
# rand_audio = np.random.rand(1, 16_000)
def generate_tensor(audio_path: str):
    # audio_path = "/workspace/F5-TTS/data/marrazki_custom/wavs/segment_3153.wav"
    # rand_audio = load_and_standardise(audio_path)
    # rand_audio, sr = torchaudio.load(audio_path)
    try:
        torchaudio.set_audio_backend("sox_io")
    except RuntimeError:
        raise ImportError("To support decoding 'mp3' audio files, please install 'sox'.")

    array, sampling_rate = torchaudio.load(audio_path)
    if sampling_rate != 16000:
        array = T.Resample(sampling_rate, 16000)(array)
    array = array.numpy()
    array = array.mean(axis=0, keepdims=True)

    # Create torch tensor, move to device and feed the model
    array = torch.tensor(
        array,
        dtype=torch.float,
        device=device,
    )
    print(array)

    with torch.no_grad():
        out = wav2vec2(array)
    logits = out.logits

    # regular expression that finds either the 2-char token "-1"
    # OR any single char in 0,1,|
    token_re = re.compile(r"-1|[01\|]")

    batch_tokens = []  # final matrix (B × 24)
    for b in range(logits.size(0)):
        head_tokens = []  # 24 rows for this utterance
        for h in range(logits.size(1)):
            # ---------- 1) arg-max & CTC collapse → string ----------
            ids = logits[b, h].argmax(dim=-1).cpu().tolist()
            # text = processor._decode(ids)
            text = tokenizer_ipa._decode(token_ids=ids)

            # ---------- 2) split the string into symbols ----------
            symbols = token_re.findall(text)  # e.g. ['-1', '1', '-1', '-1', …]
            head_tokens.append(symbols)
        batch_tokens.append(head_tokens)

    batch_data = [[[int(val) for val in row] for row in matrix] for matrix in batch_tokens]
    print(f"batch_data : {batch_data}")

    # Convert to a PyTorch tensor
    batch_tensor = torch.tensor(batch_data)
    return batch_tensor
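# Note on the tensor built above (an assumption, not something the model guarantees):
# torch.tensor(batch_data) only succeeds if all 24 heads decode to the same number of symbols for a
# given utterance; the result is then an integer tensor of shape (1, 24, T') whose columns are
# articulatory feature vectors with values in {-1, 0, 1}.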
| """ | |
| vector2ipa.py | |
| ============= | |
| Map articulatory feature vectors (shape ≡ [*, 22]) to IPA symbols. | |
| * If a row is an **exact** match for a symbol’s feature vector, | |
| return that symbol. | |
| * Otherwise compute the Levenshtein distance between the input | |
| vector and every known IPA vector and choose the symbol with | |
| the minimum distance. | |
| Requires: panphon (pip install panphon) | |
| numpy (only for dtype / convenience, but any tensor works) | |
| Author: <you> | |
| """ | |
| import numpy as np | |
| from typing import Iterable, List, Sequence, Tuple | |
| import panphon # -- main feature database | |
| from panphon.segment import Segment # convenient Segment wrapper | |
# --------------------------------------------------------------------
# helpers
# --------------------------------------------------------------------
def _levenshtein(a: Sequence[int], b: Sequence[int]) -> int:
    """Classic O(m·n) Levenshtein distance for two sequences of ints."""
    m, n = len(a), len(b)
    prev = list(range(n + 1))
    curr = [0] * (n + 1)
    for i in range(1, m + 1):
        curr[0] = i
        for j in range(1, n + 1):
            cost = 0 if a[i - 1] == b[j - 1] else 1
            curr[j] = min(
                curr[j - 1] + 1,    # insertion
                prev[j] + 1,        # deletion
                prev[j - 1] + cost  # substitution
            )
        prev, curr = curr, prev  # reuse buffers
    return prev[n]
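# Quick self-check of the distance on literal inputs (purely illustrative; safe to run at import time):
assert _levenshtein((1, 0, -1), (1, 0, -1)) == 0  # identical vectors
assert _levenshtein((1, 0, -1), (1, 1, -1)) == 1  # one substitution
assert _levenshtein((1, 0, -1), (1, 0)) == 1      # one deletion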
def _as_int_vector(raw):
    """Convert a PanPhon vector (numeric or ±0 string form) to a tuple of ints."""
    if isinstance(raw[0], int):
        return tuple(int(x) for x in raw)
    map_sym = {'+': 1, '-': -1, '0': 0}
    return tuple(map_sym[x] for x in raw)
def _build_inventory(ft):
    ipa_syms, ipa_vecs = [], []

    # ❶ Whatever version we’re on, get *something* iterable
    seg_iter = getattr(ft, "segments", None) or getattr(ft, "_segments", None)
    if seg_iter is None:
        raise RuntimeError("Can't locate segment inventory on this PanPhon version.")

    for item in seg_iter:
        # ❷ Newer PanPhon:  item = (symbol:str, Segment)
        #    Older PanPhon:  item = symbol:str
        symbol = item[0] if isinstance(item, tuple) else item

        # ❸ Grab the canonical 22-feature vector
        try:
            raw = ft.segment_to_vector(symbol)        # post-0.22
        except TypeError:
            raw = ft.segment_to_vector(symbol, True)  # ≤0.21 fallback
        if raw is None:  # skip tones, length marks…
            continue

        ipa_syms.append(symbol)
        ipa_vecs.append(_as_int_vector(raw))  # → tuple[int, …]

    return ipa_syms, ipa_vecs
# --------------------------------------------------------------------
# public API
# --------------------------------------------------------------------
def vectors_to_ipa(
    tensor: Iterable[Sequence[int]],
    ft: panphon.FeatureTable | None = None,
) -> str:
    """
    Parameters
    ----------
    tensor
        Any iterable yielding rows of 22 ints (values −1/0/+1).
        Works with:
            * list[list[int]]
            * numpy.ndarray  (shape [N,22] or [22])
            * torch.Tensor   (dtype=torch.int8 / int16 / int32)
            * etc.
    ft
        Optionally pass in a pre-constructed FeatureTable so you
        don’t pay the I/O cost repeatedly.

    Returns
    -------
    str
        The IPA symbols that best match the input rows, joined by spaces.
    """
    # 🗄️ Load feature database exactly once
    ft = ft or panphon.FeatureTable()
    ipa_syms, ipa_vecs = _build_inventory(ft)

    # ⚡ Small dict for constant-time exact look-ups
    exact_lookup = {v: s for s, v in zip(ipa_syms, ipa_vecs)}

    results: List[str] = []
    for row in tensor:
        vec = tuple(int(x) for x in row)  # normalise dtype

        # 1️⃣ Exact hit?
        if vec in exact_lookup:
            results.append(exact_lookup[vec])
            continue

        # 2️⃣ Nearest neighbour by Levenshtein distance
        best_sym, best_dist = None, float("inf")
        for ref_vec, sym in zip(ipa_vecs, ipa_syms):
            d = _levenshtein(vec, ref_vec)
            if d < best_dist:
                best_dist, best_sym = d, sym
                if d == 0:  # early exit
                    break
        results.append(f"{best_sym}")

    # Join into a single space-separated string and return it
    symbols_str = " ".join(results)
    # print(symbols_str)
    return symbols_str
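# Illustrative behaviour (feature values are hypothetical; real rows come from the 24 model heads):
# a row that exactly matches a PanPhon vector returns that symbol via the dict lookup, while any
# other row falls back to the symbol with the smallest Levenshtein distance, e.g.
#   vectors_to_ipa([vec_a, vec_b])  ->  "a b"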
def transcribe_to_ipa(audio_path):
    batch_tensor = generate_tensor(audio_path)
    batch_tensor = batch_tensor.squeeze(0)
    symbols = vectors_to_ipa(batch_tensor.t())
    return symbols


demo = gr.Interface(fn=transcribe_to_ipa, inputs=gr.Audio(type="filepath"), outputs="text")
demo.launch(share=True)