# benchmark-race / update_data.py
# davanstrien's picture
# davanstrien HF Staff
# Auto-discover all benchmark:official leaderboards on the Hub
# bedf956 verified
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "httpx",
# "huggingface_hub",
# ]
# ///
"""
Scheduled job: regenerate data.json and upload to the benchmark-race Space.
Run locally:
uv run update_data.py
Schedule on HF Jobs (twice daily):
hf jobs scheduled uv run "0 8,20 * * *" \
--secrets HF_TOKEN \
https://huggingface.co/spaces/davanstrien/benchmark-race/resolve/main/update_data.py
"""
import json
import math
import os
import re
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path

import httpx
from huggingface_hub import HfApi
# Space repository that receives the generated data.json.
SPACE_REPO = "davanstrien/benchmark-race"
# Benchmarks are auto-discovered from datasets tagged `benchmark:official` on
# the Hub. The datasets listed here keep fixed keys so the UI's hardcoded
# default (`sweVerified` in index.html) keeps working; every other benchmark
# gets a slugified key and a name from cardData.pretty_name (or the repo
# basename).
# Maps dataset id -> (key used in data.json, display name).
OVERRIDES = {
    "SWE-bench/SWE-bench_Verified": ("sweVerified", "SWE-bench Verified"),
    "ScaleAI/SWE-bench_Pro": ("swePro", "SWE-bench Pro"),
    "TIGER-Lab/MMLU-Pro": ("mmluPro", "MMLU-Pro"),
    "Idavidrein/gpqa": ("gpqa", "GPQA Diamond"),
    "cais/hle": ("hle", "HLE"),
    "MathArena/aime_2026": ("aime2026", "AIME 2026"),
    "MathArena/hmmt_feb_2026": ("hmmt2026", "HMMT Feb 2026"),
    "allenai/olmOCR-bench": ("olmOcr", "olmOCR-bench"),
    "harborframework/terminal-bench-2.0": ("terminalBench", "Terminal-Bench 2.0"),
    "FutureMa/EvasionBench": ("evasionBench", "EvasionBench"),
}
# Minimum size for a benchmark to be kept: applied to the raw leaderboard row
# count during discovery and to the number of dated models when building the
# final output.
MIN_MODELS = 2
def slugify(dataset_id: str) -> str:
    """Derive a benchmark key from a Hub dataset id.

    The repo basename (the text after the last ``/``) has every run of
    non-alphanumeric characters collapsed into a single underscore, and
    leading/trailing underscores are trimmed. When nothing is left, the full
    id with ``/`` replaced by ``_`` is returned instead.
    """
    basename = dataset_id.rsplit("/", 1)[-1]
    slug = re.sub(r"[^a-zA-Z0-9]+", "_", basename).strip("_")
    if slug:
        return slug
    return dataset_id.replace("/", "_")
def discover_benchmarks(hf_token: str | None) -> list[dict]:
    """Fetch every benchmark:official dataset with a usable leaderboard.

    Lists the datasets tagged ``benchmark:official``, then probes each one's
    info and leaderboard endpoints. A dataset is kept when its leaderboard
    responds with status 200 and a JSON list of at least ``MIN_MODELS`` rows.
    Per-dataset failures are reported on stdout and skipped so that one bad
    dataset does not abort the whole run.

    Args:
        hf_token: Hub token sent as a Bearer header for gated datasets.
            Gated datasets are skipped when it is missing.

    Returns:
        One config dict per usable benchmark with the keys ``dataset``,
        ``key``, ``name``, ``gated`` and ``lower_is_better``. Keys are unique
        within the returned list.

    Raises:
        httpx.HTTPStatusError: if the initial dataset listing request fails.
    """
    print("Discovering official benchmarks...")
    resp = httpx.get(
        "https://huggingface.co/api/datasets",
        params={"filter": "benchmark:official", "limit": 500},
        timeout=30,
    )
    resp.raise_for_status()
    datasets = resp.json()
    print(f" found {len(datasets)} datasets with benchmark:official tag")

    # Override keys are reserved up front so an auto-discovered dataset can
    # never take a key the UI depends on, regardless of listing order.
    reserved_keys = {key for key, _ in OVERRIDES.values()}
    taken_keys: set[str] = set()
    configs = []
    for d in datasets:
        did = d["id"]
        try:
            info = httpx.get(f"https://huggingface.co/api/datasets/{did}", timeout=15).json()
        except Exception as e:
            print(f" {did}: skipped (info fetch failed: {e})")
            continue
        if not isinstance(info, dict):
            print(f" {did}: skipped (unexpected info payload)")
            continue

        gated = bool(info.get("gated"))
        card = info.get("cardData") or {}
        if did in OVERRIDES:
            key, pretty = OVERRIDES[did]
        else:
            key = slugify(did)
            if key in reserved_keys or key in taken_keys:
                # Different owners can publish repos with the same basename;
                # qualify the key with the owner instead of letting one
                # benchmark silently overwrite the other downstream.
                key = re.sub(r"[^a-zA-Z0-9]+", "_", did).strip("_") or key
            pretty = card.get("pretty_name") or did.split("/")[-1]
        if key in taken_keys:
            print(f" {did}: skipped (key {key!r} already in use)")
            continue

        if gated and not hf_token:
            print(f" {did}: skipped (gated, no token)")
            continue
        headers = {"Authorization": f"Bearer {hf_token}"} if gated else {}

        try:
            lb = httpx.get(
                f"https://huggingface.co/api/datasets/{did}/leaderboard",
                headers=headers,
                timeout=30,
            )
        except Exception as e:
            print(f" {did}: skipped (leaderboard fetch failed: {e})")
            continue
        if lb.status_code != 200:
            print(f" {did}: skipped (status {lb.status_code})")
            continue
        try:
            rows = lb.json()
        except ValueError as e:
            # A 200 response with a non-JSON body must not kill the whole job.
            print(f" {did}: skipped (invalid leaderboard JSON: {e})")
            continue
        if not isinstance(rows, list) or len(rows) < MIN_MODELS:
            print(f" {did}: skipped (only {len(rows) if isinstance(rows, list) else '?'} rows)")
            continue

        # The first row that carries the flag decides the sort direction.
        lower_is_better = next(
            (
                bool(r["lower_is_better"])
                for r in rows
                if isinstance(r, dict) and "lower_is_better" in r
            ),
            False,
        )
        taken_keys.add(key)
        configs.append({
            "dataset": did,
            "key": key,
            "name": pretty,
            "gated": gated,
            "lower_is_better": lower_is_better,
        })
        print(f" {did} -> {key} ({len(rows)} rows, lower_is_better={lower_is_better})")
    return configs
# Hex colors assigned to providers in main(): providers are sorted
# alphabetically and take colors in order, wrapping around when there are
# more providers than entries.
PALETTE = [
    "#6366f1", "#0d9488", "#d97706", "#e11d48", "#7c3aed",
    "#16a34a", "#2563eb", "#ea580c", "#8b5cf6", "#0891b2",
    "#c026d3", "#65a30d", "#dc2626", "#0284c7", "#a21caf",
    "#059669", "#9333ea", "#ca8a04", "#be185d", "#0369a1",
]
def fetch_leaderboard(config: dict, hf_token: str | None) -> list[dict]:
url = f"https://huggingface.co/api/datasets/{config['dataset']}/leaderboard"
headers = {}
if config["gated"] and hf_token:
headers["Authorization"] = f"Bearer {hf_token}"
elif config["gated"]:
print(f" {config['name']}: skipped (gated, no token)")
return []
print(f" {config['name']}: fetching scores...")
try:
resp = httpx.get(url, headers=headers, timeout=30)
if resp.status_code != 200:
print(f" skip (status {resp.status_code})")
return []
data = resp.json()
if not isinstance(data, list):
return []
except Exception as e:
print(f" error: {e}")
return []
lower = config.get("lower_is_better", False)
seen: dict[str, float] = {}
for entry in data:
if not isinstance(entry, dict):
continue
model_id = entry.get("modelId")
score = entry.get("value")
if model_id and score is not None:
try:
score = float(score)
except (TypeError, ValueError):
continue
if model_id not in seen:
seen[model_id] = score
elif (lower and score < seen[model_id]) or (not lower and score > seen[model_id]):
seen[model_id] = score
print(f" {len(seen)} models")
return [{"model_id": mid, "score": s} for mid, s in seen.items()]
def fetch_model_dates(model_ids: list[str], hf_token: str | None) -> dict[str, dict]:
    """Look up creation date, size and quantization flag for each model.

    Model info is requested through ``HfApi.model_info`` on a pool of eight
    worker threads. Models whose lookup raises are left out of the result.

    Args:
        model_ids: Hub model ids to look up.
        hf_token: Token forwarded to ``model_info``.

    Returns:
        Mapping of model id to a dict with ``date`` (``YYYY-MM-DD``),
        ``parameters_b`` (size in billions of parameters, or None when it
        cannot be determined) and ``is_quantized``.
    """
    api = HfApi()

    def _describe(model_id):
        # Any failure yields a dateless tuple, which is filtered out below.
        try:
            info = api.model_info(model_id, token=hf_token)
            if info.safetensors and hasattr(info.safetensors, "total"):
                size_b = round(info.safetensors.total / 1_000_000_000, 1)
            else:
                # No safetensors metadata: fall back to a size embedded in
                # the model id (e.g. "-7B-"), taking the largest match.
                hits = re.findall(r"[-_/](\d+\.?\d*)[Bb](?:[-_/]|$)", model_id)
                size_b = max(float(hit) for hit in hits) if hits else None
            quantized = any(
                tag.startswith("base_model:quantized:") for tag in (info.tags or [])
            )
            created = info.created_at.strftime("%Y-%m-%d")
        except Exception:
            return model_id, None, None, False
        return model_id, created, size_b, quantized

    with ThreadPoolExecutor(max_workers=8) as pool:
        outcomes = list(pool.map(_describe, model_ids))

    return {
        model_id: {"date": created, "parameters_b": size_b, "is_quantized": quantized}
        for model_id, created, size_b, quantized in outcomes
        if created
    }
def fetch_logo(provider: str) -> str | None:
    """Return the avatar URL for a provider, or None when unavailable.

    Queries the Hub organization avatar endpoint with a 5 second timeout and
    reads ``avatarUrl`` from the JSON body of a 200 response. Any error is
    treated as "no logo".
    """
    endpoint = f"https://huggingface.co/api/organizations/{provider}/avatar"
    try:
        reply = httpx.get(endpoint, timeout=5)
        if reply.status_code != 200:
            return None
        return reply.json().get("avatarUrl")
    except Exception:
        # Logos are optional, so failures are deliberately swallowed.
        return None
def fetch_all_logos(providers: set[str]) -> dict[str, str]:
logos = {}
with ThreadPoolExecutor(max_workers=8) as pool:
futures = {pool.submit(fetch_logo, p): p for p in providers}
for f in as_completed(futures):
p = futures[f]
url = f.result()
if url:
logos[p] = url
return logos
def main():
    """Regenerate data.json from the Hub leaderboards and upload it to the Space.

    Steps: discover benchmarks, fetch each leaderboard, look up model
    metadata, build the per-benchmark model lists (dropping models without a
    date and quantized models), fetch provider logos, assign provider colors,
    then upload the JSON document to ``SPACE_REPO`` as ``data.json``.

    The Hub token is read from the ``HF_TOKEN`` environment variable.

    Raises:
        SystemExit: if no benchmark ends up with at least ``MIN_MODELS``
            models. Nothing is uploaded in that case, so a failed run cannot
            replace the existing data.json with an empty one.
    """
    hf_token = os.environ.get("HF_TOKEN")
    print("Generating data.json for bar chart race\n")
    benchmark_configs = discover_benchmarks(hf_token)
    print(f"\n{len(benchmark_configs)} usable benchmarks\n")

    all_scores: dict[str, dict] = {}
    all_model_ids: set[str] = set()
    for config in benchmark_configs:
        rows = fetch_leaderboard(config, hf_token)
        if not rows:
            continue
        all_scores[config["key"]] = {
            "name": config["name"],
            "dataset": config["dataset"],
            "lower_is_better": config["lower_is_better"],
            "rows": rows,
        }
        all_model_ids.update(r["model_id"] for r in rows)
    print(f"\n{len(all_model_ids)} unique models across {len(all_scores)} benchmarks")

    print("Fetching model dates...")
    model_dates = fetch_model_dates(list(all_model_ids), hf_token)
    print(f" got dates for {len(model_dates)}/{len(all_model_ids)} models")

    all_providers: set[str] = set()
    benchmarks = {}
    for key, info in all_scores.items():
        models = []
        for row in info["rows"]:
            mid = row["model_id"]
            meta = model_dates.get(mid)
            # Models without a known date or flagged as quantized are left out.
            if meta is None or meta.get("is_quantized"):
                continue
            models.append({
                "model_id": mid,
                "short_name": mid.split("/")[-1],
                # Text before the first "/", or the whole id when there is none.
                "provider": mid.split("/")[0],
                "score": round(row["score"], 2),
                "date": meta["date"],
            })
        if len(models) < MIN_MODELS:
            continue
        benchmarks[key] = {
            "name": info["name"],
            "dataset": info["dataset"],
            "lower_is_better": info["lower_is_better"],
            "models": models,
        }
        # Only providers that appear in a retained benchmark need a logo and
        # a color.
        all_providers.update(m["provider"] for m in models)

    if not benchmarks:
        # Typically means the Hub was unreachable or every leaderboard failed;
        # uploading now would wipe the last good data.json.
        raise SystemExit("No usable benchmarks were produced; not uploading data.json")

    print(f"\nFetching logos for {len(all_providers)} providers...")
    logos = fetch_all_logos(all_providers)
    print(f" got {len(logos)} logos")

    # Sorted order keeps the provider -> color assignment deterministic for a
    # given provider set.
    color_map = {
        provider: PALETTE[i % len(PALETTE)]
        for i, provider in enumerate(sorted(all_providers))
    }

    output = {
        "benchmarks": benchmarks,
        "logos": logos,
        "colors": color_map,
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
    data_json = json.dumps(output, indent=2)
    print(f"\nGenerated {len(data_json) / 1024:.1f} KB")
    for bm in benchmarks.values():
        print(f" {bm['name']}: {len(bm['models'])} models")

    # Upload to Space
    print(f"\nUploading data.json to {SPACE_REPO}...")
    api = HfApi()
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False, encoding="utf-8"
    ) as f:
        f.write(data_json)
        tmp_path = f.name
    try:
        api.upload_file(
            path_or_fileobj=tmp_path,
            path_in_repo="data.json",
            repo_id=SPACE_REPO,
            repo_type="space",
            commit_message=f"Update data.json ({datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')})",
        )
        print("Done!")
    finally:
        # delete=False above, so the temp file has to be removed by hand.
        Path(tmp_path).unlink(missing_ok=True)
# Run the update only when executed as a script, not when imported.
if __name__ == "__main__":
    main()