Spaces:
Running
Running
| # /// script | |
| # requires-python = ">=3.11" | |
| # dependencies = [ | |
| # "httpx", | |
| # "huggingface_hub", | |
| # ] | |
| # /// | |
| """ | |
| Scheduled job: regenerate data.json and upload to the benchmark-race Space. | |
| Run locally: | |
| uv run update_data.py | |
| Schedule on HF Jobs (twice daily): | |
| hf jobs scheduled uv run "0 8,20 * * *" \ | |
| --secrets HF_TOKEN \ | |
| https://huggingface.co/spaces/davanstrien/benchmark-race/resolve/main/update_data.py | |
| """ | |
| import json | |
| import os | |
| import re | |
| import tempfile | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| import httpx | |
| from huggingface_hub import HfApi | |
# Space repository that receives the regenerated data.json (see main()).
SPACE_REPO = "davanstrien/benchmark-race"

# Benchmarks are auto-discovered from datasets tagged `benchmark:official` on
# the Hub. The originals get keys preserved so the UI's hardcoded default
# (`sweVerified` in index.html) keeps working; new benchmarks get
# slugified keys and a name from cardData.pretty_name (or basename).
#
# Maps a dataset id to the (output key, display name) pair used for it.
OVERRIDES = {
    "SWE-bench/SWE-bench_Verified": ("sweVerified", "SWE-bench Verified"),
    "ScaleAI/SWE-bench_Pro": ("swePro", "SWE-bench Pro"),
    "TIGER-Lab/MMLU-Pro": ("mmluPro", "MMLU-Pro"),
    "Idavidrein/gpqa": ("gpqa", "GPQA Diamond"),
    "cais/hle": ("hle", "HLE"),
    "MathArena/aime_2026": ("aime2026", "AIME 2026"),
    "MathArena/hmmt_feb_2026": ("hmmt2026", "HMMT Feb 2026"),
    "allenai/olmOCR-bench": ("olmOcr", "olmOCR-bench"),
    "harborframework/terminal-bench-2.0": ("terminalBench", "Terminal-Bench 2.0"),
    "FutureMa/EvasionBench": ("evasionBench", "EvasionBench"),
}

# Minimum number of leaderboard rows (during discovery) and of dated,
# non-quantized models (in main()) a benchmark needs in order to be kept.
MIN_MODELS = 2
def slugify(dataset_id: str) -> str:
    """Turn a Hub dataset id into an identifier-like key.

    The part after the last ``/`` is used; every run of characters outside
    ``[a-zA-Z0-9]`` becomes a single ``_`` and leading/trailing underscores
    are dropped. If that leaves nothing (e.g. the basename is only
    punctuation), the whole id is slugified the same way instead, so the
    fallback never leaks raw punctuation into the key.
    """

    def _clean(text: str) -> str:
        return re.sub(r"[^a-zA-Z0-9]+", "_", text).strip("_")

    return _clean(dataset_id.split("/")[-1]) or _clean(dataset_id)
def discover_benchmarks(hf_token: str | None) -> list[dict]:
    """Fetch every benchmark:official dataset with a usable leaderboard.

    Lists datasets tagged ``benchmark:official``, then for each one fetches
    its info and its leaderboard. A dataset is kept when the leaderboard
    request returns 200 with a JSON list of at least ``MIN_MODELS`` rows.

    Args:
        hf_token: Bearer token sent only for gated datasets; gated datasets
            are skipped when it is missing.

    Returns:
        One dict per usable benchmark with the keys ``dataset``, ``key``,
        ``name``, ``gated`` and ``lower_is_better``. Keys are unique within
        the returned list.

    Raises:
        httpx.HTTPStatusError: if the initial dataset listing fails.
    """
    print("Discovering official benchmarks...")
    resp = httpx.get(
        "https://huggingface.co/api/datasets",
        params={"filter": "benchmark:official", "limit": 500},
        timeout=30,
    )
    resp.raise_for_status()
    datasets = resp.json()
    print(f" found {len(datasets)} datasets with benchmark:official tag")

    # Keys pinned in OVERRIDES must never be taken by an auto-slugified
    # dataset, whatever order the listing returns them in.
    reserved_keys = {key for key, _ in OVERRIDES.values()}
    used_keys: set[str] = set()

    configs = []
    for d in datasets:
        did = d["id"]
        try:
            info = httpx.get(f"https://huggingface.co/api/datasets/{did}", timeout=15).json()
        except Exception as e:
            print(f" {did}: skipped (info fetch failed: {e})")
            continue
        if not isinstance(info, dict):
            print(f" {did}: skipped (info fetch failed: unexpected payload)")
            continue

        gated = bool(info.get("gated"))
        card = info.get("cardData") or {}

        if gated and not hf_token:
            print(f" {did}: skipped (gated, no token)")
            continue
        headers = {"Authorization": f"Bearer {hf_token}"} if gated else {}

        try:
            lb = httpx.get(
                f"https://huggingface.co/api/datasets/{did}/leaderboard",
                headers=headers,
                timeout=30,
            )
        except Exception as e:
            print(f" {did}: skipped (leaderboard fetch failed: {e})")
            continue
        if lb.status_code != 200:
            print(f" {did}: skipped (status {lb.status_code})")
            continue
        try:
            rows = lb.json()
        except ValueError as e:
            # A 200 with a non-JSON body must not abort the whole run.
            print(f" {did}: skipped (leaderboard fetch failed: {e})")
            continue
        if not isinstance(rows, list) or len(rows) < MIN_MODELS:
            print(f" {did}: skipped (only {len(rows) if isinstance(rows, list) else '?'} rows)")
            continue

        # The first row that carries the flag decides the sort direction.
        lower_is_better = False
        for r in rows:
            if isinstance(r, dict) and "lower_is_better" in r:
                lower_is_better = bool(r["lower_is_better"])
                break

        if did in OVERRIDES:
            key, pretty = OVERRIDES[did]
        else:
            pretty = card.get("pretty_name") or did.split("/")[-1]
            key = slugify(did)
            if key in reserved_keys or key in used_keys:
                # Same basename under another owner: slug the full id instead.
                key = re.sub(r"[^a-zA-Z0-9]+", "_", did).strip("_")
            base_key, suffix = key, 2
            while key in reserved_keys or key in used_keys:
                key = f"{base_key}_{suffix}"
                suffix += 1
        used_keys.add(key)

        configs.append({
            "dataset": did,
            "key": key,
            "name": pretty,
            "gated": gated,
            "lower_is_better": lower_is_better,
        })
        print(f" {did} -> {key} ({len(rows)} rows, lower_is_better={lower_is_better})")
    return configs
# Hex colours handed out to providers in sorted-name order by main();
# the list is cycled when there are more providers than colours.
PALETTE = [
    "#6366f1", "#0d9488", "#d97706", "#e11d48", "#7c3aed",
    "#16a34a", "#2563eb", "#ea580c", "#8b5cf6", "#0891b2",
    "#c026d3", "#65a30d", "#dc2626", "#0284c7", "#a21caf",
    "#059669", "#9333ea", "#ca8a04", "#be185d", "#0369a1",
]
| def fetch_leaderboard(config: dict, hf_token: str | None) -> list[dict]: | |
| url = f"https://huggingface.co/api/datasets/{config['dataset']}/leaderboard" | |
| headers = {} | |
| if config["gated"] and hf_token: | |
| headers["Authorization"] = f"Bearer {hf_token}" | |
| elif config["gated"]: | |
| print(f" {config['name']}: skipped (gated, no token)") | |
| return [] | |
| print(f" {config['name']}: fetching scores...") | |
| try: | |
| resp = httpx.get(url, headers=headers, timeout=30) | |
| if resp.status_code != 200: | |
| print(f" skip (status {resp.status_code})") | |
| return [] | |
| data = resp.json() | |
| if not isinstance(data, list): | |
| return [] | |
| except Exception as e: | |
| print(f" error: {e}") | |
| return [] | |
| lower = config.get("lower_is_better", False) | |
| seen: dict[str, float] = {} | |
| for entry in data: | |
| if not isinstance(entry, dict): | |
| continue | |
| model_id = entry.get("modelId") | |
| score = entry.get("value") | |
| if model_id and score is not None: | |
| try: | |
| score = float(score) | |
| except (TypeError, ValueError): | |
| continue | |
| if model_id not in seen: | |
| seen[model_id] = score | |
| elif (lower and score < seen[model_id]) or (not lower and score > seen[model_id]): | |
| seen[model_id] = score | |
| print(f" {len(seen)} models") | |
| return [{"model_id": mid, "score": s} for mid, s in seen.items()] | |
| def fetch_model_dates(model_ids: list[str], hf_token: str | None) -> dict[str, dict]: | |
| api = HfApi() | |
| results = {} | |
| def _get_info(mid): | |
| try: | |
| info = api.model_info(mid, token=hf_token) | |
| params_b = None | |
| if info.safetensors and hasattr(info.safetensors, "total"): | |
| params_b = round(info.safetensors.total / 1_000_000_000, 1) | |
| if params_b is None: | |
| m = re.findall(r"[-_/](\d+\.?\d*)[Bb](?:[-_/]|$)", mid) | |
| if m: | |
| params_b = max(float(x) for x in m) | |
| is_quantized = any(t.startswith("base_model:quantized:") for t in (info.tags or [])) | |
| return mid, info.created_at.strftime("%Y-%m-%d"), params_b, is_quantized | |
| except Exception: | |
| return mid, None, None, False | |
| with ThreadPoolExecutor(max_workers=8) as pool: | |
| futures = {pool.submit(_get_info, mid): mid for mid in model_ids} | |
| for f in as_completed(futures): | |
| mid, date, params, is_quantized = f.result() | |
| if date: | |
| results[mid] = {"date": date, "parameters_b": params, "is_quantized": is_quantized} | |
| return results | |
| def fetch_logo(provider: str) -> str | None: | |
| try: | |
| resp = httpx.get( | |
| f"https://huggingface.co/api/organizations/{provider}/avatar", | |
| timeout=5, | |
| ) | |
| if resp.status_code == 200: | |
| return resp.json().get("avatarUrl") | |
| except Exception: | |
| pass | |
| return None | |
def fetch_all_logos(providers: set[str]) -> dict[str, str]:
    """Fetch avatar URLs for all *providers* on a pool of 8 threads.

    Returns a ``{provider: url}`` mapping containing only the providers for
    which ``fetch_logo`` produced a truthy URL.
    """
    with ThreadPoolExecutor(max_workers=8) as executor:
        pending = {executor.submit(fetch_logo, name): name for name in providers}
        finished = ((pending[job], job.result()) for job in as_completed(pending))
        return {name: avatar for name, avatar in finished if avatar}
def main():
    """Regenerate data.json and upload it to the Space.

    Steps: discover benchmarks, fetch each leaderboard, look up model
    metadata, drop models without a date and quantized models, keep
    benchmarks that still have at least ``MIN_MODELS`` models, attach
    provider logos and colours, then upload the JSON as ``data.json`` to
    ``SPACE_REPO``.

    Reads ``HF_TOKEN`` from the environment.

    Raises:
        SystemExit: if no benchmark survives filtering, so that a failed
            run does not replace the published data.json with an empty one.
    """
    hf_token = os.environ.get("HF_TOKEN")
    print("Generating data.json for bar chart race\n")

    benchmark_configs = discover_benchmarks(hf_token)
    print(f"\n{len(benchmark_configs)} usable benchmarks\n")

    all_scores: dict[str, dict] = {}
    all_model_ids: set[str] = set()
    for config in benchmark_configs:
        rows = fetch_leaderboard(config, hf_token)
        if rows:
            all_scores[config["key"]] = {
                "name": config["name"],
                "dataset": config["dataset"],
                "lower_is_better": config["lower_is_better"],
                "rows": rows,
            }
            all_model_ids.update(r["model_id"] for r in rows)
    print(f"\n{len(all_model_ids)} unique models across {len(all_scores)} benchmarks")

    print("Fetching model dates...")
    model_dates = fetch_model_dates(list(all_model_ids), hf_token)
    print(f" got dates for {len(model_dates)}/{len(all_model_ids)} models")

    all_providers: set[str] = set()
    benchmarks = {}
    for key, info in all_scores.items():
        models = []
        for row in info["rows"]:
            mid = row["model_id"]
            meta = model_dates.get(mid)
            # Skip models with no known date and quantized variants.
            if meta is None or meta.get("is_quantized"):
                continue
            # Ids without an owner prefix use the id itself as the provider.
            provider = mid.split("/")[0] if "/" in mid else mid
            all_providers.add(provider)
            models.append({
                "model_id": mid,
                "short_name": mid.split("/")[-1],
                "provider": provider,
                "score": round(row["score"], 2),
                "date": meta["date"],
            })
        if len(models) >= MIN_MODELS:
            benchmarks[key] = {
                "name": info["name"],
                "dataset": info["dataset"],
                "lower_is_better": info["lower_is_better"],
                "models": models,
            }

    if not benchmarks:
        # Nothing usable was produced (e.g. every request failed); uploading
        # now would overwrite the existing data.json with an empty dataset.
        raise SystemExit("No usable benchmarks found; not uploading data.json")

    print(f"\nFetching logos for {len(all_providers)} providers...")
    logos = fetch_all_logos(all_providers)
    print(f" got {len(logos)} logos")

    color_map = {
        provider: PALETTE[i % len(PALETTE)]
        for i, provider in enumerate(sorted(all_providers))
    }

    # One timestamp so generated_at and the commit message agree.
    now = datetime.now(timezone.utc)
    output = {
        "benchmarks": benchmarks,
        "logos": logos,
        "colors": color_map,
        "generated_at": now.isoformat(),
    }
    data_json = json.dumps(output, indent=2)
    print(f"\nGenerated {len(data_json) / 1024:.1f} KB")
    for bm in benchmarks.values():
        print(f" {bm['name']}: {len(bm['models'])} models")

    # Upload to Space. upload_file accepts raw bytes, so no temp file is needed.
    print(f"\nUploading data.json to {SPACE_REPO}...")
    api = HfApi()
    api.upload_file(
        path_or_fileobj=data_json.encode("utf-8"),
        path_in_repo="data.json",
        repo_id=SPACE_REPO,
        repo_type="space",
        commit_message=f"Update data.json ({now.strftime('%Y-%m-%d %H:%M UTC')})",
    )
    print("Done!")
# Run the update only when executed as a script, not when imported.
if __name__ == "__main__":
    main()