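"""Launch and track lighteval evaluation jobs on Hugging Face Jobs.

Each model-provider combination is run several times to reduce variance;
per-run status and scores are tracked in globals.job_results, and the mean
score and variance are aggregated once all runs complete.
"""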
import os
import statistics
import time
import traceback
from datetime import datetime
from typing import Optional

from huggingface_hub import run_job, inspect_job, fetch_job_logs

import globals
from utils.io import save_results, load_models_providers

def extract_score_from_job(job_id: str) -> Optional[float]:
    """Extract the average score from a completed job's logs.

    Parses the results table and averages the main metric for each task
    (the metric on the same line as the task name).
    """
    try:
        # Fetch the job logs so we can parse the results table out of them
        logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)
        scores = []
        expected_tasks = [t.replace("|", ":") for t in globals.TASKS.split(",")]
        for line in logs:
            # Results-table rows look like: | task | version | metric | value | | stderr |
            if '|' not in line:
                continue
            parts = [p.strip() for p in line.split('|')]
            # A well-formed row splits into exactly 8 parts (empty strings at
            # both ends plus the 6 columns); header and separator rows are
            # filtered out below by the task-name check.
            if len(parts) != 8:
                continue
            _, task, _, metric, value, _, _, _ = parts
            # Keep only rows whose task name matches one of the configured tasks
            if task and task in expected_tasks:
                try:
                    score = float(value)
                except ValueError:
                    # Not a numeric value - skip the row
                    continue
                scores.append(score)
                print(f"Extracted score {score} for task '{task}' metric '{metric}'")

        # Average the per-task scores
        if scores:
            average_score = sum(scores) / len(scores)
            print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
            return average_score
        print("No scores found in job logs")
        return None
    except Exception as e:
        print(f"Error extracting score for job {job_id}: {e}")
        traceback.print_exc()
        return None

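# Illustrative only: the kind of results-table row the parser above matches
# (the task name and the numbers are made up for the example):
#
#     | gsm8k:5 | 1 | acc | 0.8532 |   | 0.0097 |
#
# splits on '|' into 8 parts, with the task name at index 1 and the score
# (the Value column) at index 4.
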
def run_single_job(model: str, provider: str, tasks: str = globals.TASKS, run_number: int = 1) -> Optional[str]:
    """Run a single job for a model-provider combination.

    Args:
        model: Model ID.
        provider: Provider name.
        tasks: Tasks to run.
        run_number: Which run this is (1 to NUM_RUNS_PER_JOB when doing multiple runs).

    Returns:
        The launched job ID, or None if the job was not launched.
    """
    if not model or not provider:
        print("Missing model or provider")
        return None

    # Skip if this specific run number is already running for this model-provider pair
    key = globals.get_model_provider_key(model, provider)
    if key in globals.job_results:
        runs = globals.job_results[key].get("runs", [])
        for run in runs:
            if run.get("run_number") == run_number and run.get("status") == "RUNNING":
                print(f"Run {run_number} for {model} on {provider} is already running. Please wait for it to complete.")
                return None

    print(f"Starting job for model={model}, provider={provider}, run {run_number}/{globals.NUM_RUNS_PER_JOB}")
    job = run_job(
        image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
        command=[
            "lighteval", "endpoint", "inference-providers",
            f"model_name={model},provider={provider}",
            tasks,
            "--push-to-hub", "--save-details",
            "--results-org", "IPTesting",
        ],
        namespace=globals.NAMESPACE,
        secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
        token=os.getenv("HF_TOKEN"),
    )
    job_id = job.id
    start_time = datetime.now()

    with globals.results_lock:
        # Initialize or update the aggregate record for this model-provider pair
        if key not in globals.job_results:
            # First run: initialize the structure
            previous_score = None
            globals.job_results[key] = {
                "model": model,
                "provider": provider,
                "last_run": start_time.strftime("%Y-%m-%d %H:%M:%S"),
                "status": "RUNNING",
                "current_score": None,
                "previous_score": None,
                "job_id": job_id,
                "start_time": start_time.isoformat(),
                "duration": None,
                "completed_at": None,
                "runs": []
            }
        else:
            # Subsequent run or relaunch: keep the old score as the previous score
            previous_score = globals.job_results[key].get("current_score")
            globals.job_results[key]["status"] = "RUNNING"
            globals.job_results[key]["last_run"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
            globals.job_results[key]["start_time"] = start_time.isoformat()
        globals.job_results[key]["previous_score"] = previous_score
        # Add this run to the runs list
        globals.job_results[key]["runs"].append({
            "run_number": run_number,
            "job_id": job_id,
            "status": "RUNNING",
            "score": None,
            "start_time": start_time.isoformat(),
            "duration": None,
            "completed_at": None
        })
    # Don't save immediately - the periodic save handles persistence

    print(f"Job launched: ID={job_id}, model={model}, provider={provider}, run {run_number}")
    return job_id

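# For reference, the shape of a globals.job_results entry as built above
# (update_job_statuses later adds "score_variance" alongside "current_score"):
#
#     {
#         "model": ..., "provider": ...,
#         "status": "RUNNING" | "COMPLETED" | "ERROR",
#         "current_score": ..., "previous_score": ...,
#         "runs": [
#             {"run_number", "job_id", "status", "score",
#              "start_time", "duration", "completed_at"},
#             ...
#         ],
#     }
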
def run_multiple_jobs(model: str, provider: str, tasks: str = globals.TASKS, num_runs: int = globals.NUM_RUNS_PER_JOB) -> list:
    """Run multiple jobs for a model-provider combination to reduce variance.

    Returns:
        List of job IDs launched.
    """
    job_ids = []
    for run_number in range(1, num_runs + 1):
        job_id = run_single_job(model, provider, tasks, run_number=run_number)
        if job_id is not None:
            job_ids.append(job_id)
    return job_ids

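# Illustrative usage (the model and provider names are made up for the example):
#
#     job_ids = run_multiple_jobs("meta-llama/Llama-3.1-8B-Instruct", "novita")
#
# would launch NUM_RUNS_PER_JOB evaluation jobs and return their job IDs.
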
# TODO: factorize the two functions below
def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
    """Launch jobs for all models and providers, with multiple runs per combination."""
    models_providers = load_models_providers(config_file)
    if not models_providers:
        print("No valid model-provider combinations found in config file")
        return "No valid model-provider combinations found"

    print(f"Found {len(models_providers)} model-provider combinations")
    print(f"Will launch {globals.NUM_RUNS_PER_JOB} runs per combination")

    launched_count = 0
    for model, provider in models_providers:
        job_ids = run_multiple_jobs(model, provider, tasks)
        if job_ids:
            launched_count += len(job_ids)

    # Save all results once after launching all jobs
    save_results()

    total_expected = len(models_providers) * globals.NUM_RUNS_PER_JOB
    print(f"Launched {launched_count}/{total_expected} jobs successfully")
    return f"Launched {launched_count}/{total_expected} jobs ({len(models_providers)} model-provider combinations × {globals.NUM_RUNS_PER_JOB} runs each)"

def relaunch_failed_jobs():
    """Relaunch only the failed model-provider combinations from the job results."""
    if not globals.job_results:
        return "No existing jobs to relaunch"

    failed_jobs = [(key, info) for key, info in globals.job_results.items()
                   if info.get("status") in ["ERROR", "FAILED"]]
    if not failed_jobs:
        return "No failed jobs to relaunch"

    relaunched_count = 0
    for key, info in failed_jobs:
        model = info["model"]
        provider = info["provider"]
        job_id = run_single_job(model, provider, globals.TASKS)
        if job_id is not None:
            relaunched_count += 1

    # Save all results once after relaunching all failed jobs
    save_results()
    return f"Relaunched {relaunched_count}/{len(failed_jobs)} failed jobs"

def update_job_statuses() -> None:
    """Check and update the status of active jobs and aggregate scores from multiple runs."""
    try:
        keys = list(globals.job_results.keys())
        for key in keys:
            try:
                with globals.results_lock:
                    runs = globals.job_results[key].get("runs", [])
                    if not runs:
                        # Legacy format without a runs list - nothing to update
                        continue

                    # Check the status of each run
                    all_completed = True
                    all_failed = True
                    any_running = False
                    for run in runs:
                        if run["status"] == "RUNNING":
                            # Ask the Hub whether this run's job is still running
                            try:
                                job_info = inspect_job(job_id=run["job_id"], namespace=globals.NAMESPACE)
                                new_status = job_info.status.stage
                                if new_status != run["status"]:
                                    print(f"Run {run['run_number']} job {run['job_id']} status changed: {run['status']} -> {new_status}")
                                    run["status"] = new_status
                                    if new_status == "COMPLETED":
                                        completed_time = datetime.now()
                                        run["completed_at"] = completed_time.strftime("%Y-%m-%d %H:%M:%S")
                                        # Compute this run's duration
                                        if run.get("start_time"):
                                            start_time = datetime.fromisoformat(run["start_time"])
                                            run["duration"] = (completed_time - start_time).total_seconds()
                                        # Extract the run's score from its logs
                                        score = extract_score_from_job(run["job_id"])
                                        if score is not None:
                                            run["score"] = score
                                            print(f"Run {run['run_number']}: extracted score {score:.4f}")
                            except Exception as e:
                                print(f"Error checking run {run['run_number']}: {e}")

                        # Update the aggregate status flags
                        if run["status"] == "RUNNING":
                            any_running = True
                            all_completed = False
                            all_failed = False
                        elif run["status"] == "COMPLETED":
                            all_failed = False
                        elif run["status"] in ["ERROR", "FAILED"]:
                            all_completed = False

                    # Update the overall status for this model-provider pair
                    if any_running:
                        globals.job_results[key]["status"] = "RUNNING"
                    elif all_completed:
                        globals.job_results[key]["status"] = "COMPLETED"
                        # Aggregate statistics over the completed runs
                        completed_scores = [run["score"] for run in runs if run["status"] == "COMPLETED" and run["score"] is not None]
                        completed_durations = [run["duration"] for run in runs if run["status"] == "COMPLETED" and run.get("duration") is not None]
                        if completed_scores:
                            mean_score = statistics.mean(completed_scores)
                            variance = statistics.variance(completed_scores) if len(completed_scores) > 1 else 0.0
                            globals.job_results[key]["current_score"] = mean_score
                            globals.job_results[key]["score_variance"] = variance
                            print(f"Aggregated {len(completed_scores)} runs: mean={mean_score:.4f}, variance={variance:.6f}")
                        # Compute the average duration across completed runs
                        if completed_durations:
                            mean_duration = statistics.mean(completed_durations)
                            globals.job_results[key]["duration"] = mean_duration
                            print(f"Average duration: {mean_duration:.2f} seconds")
                        # Record the completion time of the latest run
                        latest_completion = max([run["completed_at"] for run in runs if run.get("completed_at")], default=None)
                        if latest_completion:
                            globals.job_results[key]["completed_at"] = latest_completion
                    elif all_failed:
                        globals.job_results[key]["status"] = "ERROR"
            except Exception as e:
                print(f"Error checking job: {e}")
                traceback.print_exc()

        save_results()
    except Exception as e:
        print(f"Error in update_job_statuses: {e}")
        traceback.print_exc()
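
if __name__ == "__main__":
    # Minimal sketch of how these pieces might be wired together, assuming
    # HF_TOKEN is set in the environment and globals/utils.io are configured.
    # The 60-second polling interval is an arbitrary choice for illustration;
    # in practice the Space drives these functions from its own scheduler/UI.
    print(launch_jobs())
    while any(info.get("status") == "RUNNING" for info in globals.job_results.values()):
        time.sleep(60)
        update_job_statuses()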