from huggingface_hub import run_job, inspect_job, fetch_job_logs
import os
import re
import time
from datetime import datetime
import globals
from utils.io import save_results, load_models_providers
from typing import Optional


def extract_score_from_job(job_id: str) -> Optional[float]:
    """Extract average score from completed job logs.

    Parses the results table and calculates the average of the main metric
    for each task (the metric on the same line as the task name).
    """
    try:
        logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)
        scores = []
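
        # The loop below looks for a markdown-style results table in the job logs.
        # Assumption (inferred from the unpacking that follows, not from a documented
        # format): a data row split on '|' yields exactly 8 cells, with the task name
        # in the 2nd cell, the metric name in the 4th, and its value in the 5th.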
        for line in logs:
            if '|' in line:
                parts = [p.strip() for p in line.split('|')]
                if len(parts) == 8:
                    _, task, _, metric, value, _, _, _ = parts
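                    # globals.TASKS lists task specs separated by commas; each spec is
                    # assumed to use '|' internally while the results table prints the
                    # same name with ':', hence the replace('|', ':') normalization.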
                    if task and task in [t.replace("|", ":") for t in globals.TASKS.split(",")]:
                        score = float(value)
                        scores.append(score)
                        print(f"Extracted score {score} for task '{task}' metric '{metric}'")
        if scores:
            average_score = sum(scores) / len(scores)
            print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
            return average_score
        else:
            print("No scores found in job logs")
            return None
    except Exception as e:
        print(f"Error extracting score for job {job_id}: {e}")
        import traceback
        traceback.print_exc()
        return None


def run_single_job(model: str, provider: str, tasks: str = globals.TASKS, run_number: int = 1) -> Optional[str]:
    """Run a single job for a model-provider combination.

    Args:
        model: Model ID
        provider: Provider name
        tasks: Tasks to run
        run_number: Which run this is (1 to NUM_RUNS_PER_JOB)

    Returns:
        The launched job ID, or -1 if the job was not started (missing
        arguments, or the same run is already in progress).
    """
    if not model or not provider:
        print("Missing model or provider")
        return -1

    key = globals.get_model_provider_key(model, provider)
    if key in globals.job_results:
        runs = globals.job_results[key].get("runs", [])
        for run in runs:
            if run.get("run_number") == run_number and run.get("status") == "RUNNING":
                print(f"Run {run_number} for {model} on {provider} is already running. Please wait for it to complete.")
                return -1

    print(f"Starting job for model={model}, provider={provider}, run {run_number}/{globals.NUM_RUNS_PER_JOB}")
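
    # Launch the evaluation as a Job on the Hub: the job image runs the lighteval
    # CLI against the selected inference provider and pushes results/details to
    # the IPTesting org. The HF_TOKEN from the environment is passed both as a
    # job secret and as the token used to submit the job.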
    job = run_job(
        image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
        command=[
            "lighteval", "endpoint", "inference-providers",
            f"model_name={model},provider={provider}",
            tasks,
            "--push-to-hub", "--save-details",
            "--results-org", "IPTesting",
        ],
        namespace=globals.NAMESPACE,
        secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
        token=os.getenv("HF_TOKEN")
    )

    job_id = job.id
    start_time = datetime.now()
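
    # Record this run in the shared job_results dict. results_lock guards the
    # dict because status updates can come from another thread; each
    # model/provider key keeps aggregate fields plus a "runs" list with one
    # entry per launched run.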
    with globals.results_lock:
        if key not in globals.job_results:
            previous_score = None
            globals.job_results[key] = {
                "model": model,
                "provider": provider,
                "last_run": start_time.strftime("%Y-%m-%d %H:%M:%S"),
                "status": "RUNNING",
                "current_score": None,
                "previous_score": None,
                "job_id": job_id,
                "start_time": start_time.isoformat(),
                "duration": None,
                "completed_at": None,
                "runs": []
            }
        else:
            previous_score = globals.job_results[key].get("current_score")
            globals.job_results[key]["status"] = "RUNNING"
            globals.job_results[key]["last_run"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
            globals.job_results[key]["start_time"] = start_time.isoformat()
            globals.job_results[key]["previous_score"] = previous_score

        globals.job_results[key]["runs"].append({
            "run_number": run_number,
            "job_id": job_id,
            "status": "RUNNING",
            "score": None,
            "start_time": start_time.isoformat(),
            "duration": None,
            "completed_at": None
        })

    print(f"Job launched: ID={job_id}, model={model}, provider={provider}, run {run_number}")
    return job_id


def run_multiple_jobs(model: str, provider: str, tasks: str = globals.TASKS, num_runs: int = globals.NUM_RUNS_PER_JOB) -> list:
    """Run multiple jobs for a model-provider combination to reduce variance.

    Returns:
        List of job IDs launched
    """
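    # Each run is an independent job; per-run scores are averaged later in
    # update_job_statuses(), so several runs smooth out run-to-run variance.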
    job_ids = []
    for run_number in range(1, num_runs + 1):
        job_id = run_single_job(model, provider, tasks, run_number=run_number)
        if job_id != -1:
            job_ids.append(job_id)

    return job_ids


def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
    """Launch jobs for all models and providers with multiple runs per combination."""
    models_providers = load_models_providers(config_file)

    if not models_providers:
        print("No valid model-provider combinations found in config file")
        return "No valid model-provider combinations found"

    print(f"Found {len(models_providers)} model-provider combinations")
    print(f"Will launch {globals.NUM_RUNS_PER_JOB} runs per combination")
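
    # Fan out: every (model, provider) pair from the config file gets
    # NUM_RUNS_PER_JOB jobs; launched_count tracks how many actually started.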
    launched_count = 0
    for model, provider in models_providers:
        job_ids = run_multiple_jobs(model, provider, tasks)
        if job_ids:
            launched_count += len(job_ids)

    save_results()
    total_expected = len(models_providers) * globals.NUM_RUNS_PER_JOB
    print(f"Launched {launched_count}/{total_expected} jobs successfully")
    return f"Launched {launched_count}/{total_expected} jobs ({len(models_providers)} model-provider combinations × {globals.NUM_RUNS_PER_JOB} runs each)"


def relaunch_failed_jobs():
    """Relaunch only failed model-provider combinations from job results."""
    if not globals.job_results:
        return "No existing jobs to relaunch"
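
    # Retry only combinations whose aggregated status is ERROR or FAILED.
    # Note that each one is relaunched as a single run (run_number defaults to 1).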
    failed_jobs = [(key, info) for key, info in globals.job_results.items()
                   if info.get("status") in ["ERROR", "FAILED"]]

    if not failed_jobs:
        return "No failed jobs to relaunch"

    relaunched_count = 0
    for key, info in failed_jobs:
        model = info["model"]
        provider = info["provider"]
        job_id = run_single_job(model, provider, globals.TASKS)
        if job_id != -1:
            relaunched_count += 1

    save_results()
    return f"Relaunched {relaunched_count}/{len(failed_jobs)} failed jobs"


def update_job_statuses() -> None:
    """Check and update the status of active jobs and aggregate scores from multiple runs."""
    try:
        keys = list(globals.job_results.keys())

        for key in keys:
            try:
                with globals.results_lock:
                    runs = globals.job_results[key].get("runs", [])

                    if not runs:
                        continue
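
                    # Roll up per-run states into one status for the combination:
                    # RUNNING if any run is still going, COMPLETED only if every
                    # run finished, ERROR only if every run failed.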
                    all_completed = True
                    all_failed = True
                    any_running = False

                    for run in runs:
                        if run["status"] == "RUNNING":
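                            # Poll the Hub for this job's current state; the stage
                            # string (e.g. RUNNING / COMPLETED / ERROR) is compared
                            # against the locally cached status.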
                            try:
                                job_info = inspect_job(job_id=run["job_id"], namespace=globals.NAMESPACE)
                                new_status = job_info.status.stage

                                if run["status"] != new_status:
                                    old_status = run["status"]
                                    run["status"] = new_status
                                    print(f"Run {run['run_number']} job {run['job_id']} status changed: {old_status} -> {new_status}")

                                    if new_status == "COMPLETED":
                                        completed_time = datetime.now()
                                        run["completed_at"] = completed_time.strftime("%Y-%m-%d %H:%M:%S")

                                        if run.get("start_time"):
                                            start_time = datetime.fromisoformat(run["start_time"])
                                            run["duration"] = (completed_time - start_time).total_seconds()

                                        score = extract_score_from_job(run["job_id"])
                                        if score is not None:
                                            run["score"] = score
                                            print(f"Run {run['run_number']}: extracted score {score:.4f}")

                            except Exception as e:
                                print(f"Error checking run {run['run_number']}: {e}")

                        if run["status"] == "RUNNING":
                            any_running = True
                            all_completed = False
                            all_failed = False
                        elif run["status"] == "COMPLETED":
                            all_failed = False
                        elif run["status"] in ["ERROR", "FAILED"]:
                            all_completed = False
                    if any_running:
                        globals.job_results[key]["status"] = "RUNNING"
                    elif all_completed:
                        globals.job_results[key]["status"] = "COMPLETED"
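
                        # All runs finished: aggregate per-run scores into a mean and
                        # variance, average the durations, and record the latest
                        # completion timestamp for the combination.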
                        completed_scores = [run["score"] for run in runs if run["status"] == "COMPLETED" and run["score"] is not None]
                        completed_durations = [run["duration"] for run in runs if run["status"] == "COMPLETED" and run.get("duration") is not None]

                        # Import before both aggregation branches so the duration average
                        # below still works when no scores could be extracted.
                        import statistics

                        if completed_scores:
                            mean_score = statistics.mean(completed_scores)
                            variance = statistics.variance(completed_scores) if len(completed_scores) > 1 else 0.0

                            globals.job_results[key]["current_score"] = mean_score
                            globals.job_results[key]["score_variance"] = variance

                            print(f"Aggregated {len(completed_scores)} runs: mean={mean_score:.4f}, variance={variance:.6f}")

                        if completed_durations:
                            mean_duration = statistics.mean(completed_durations)
                            globals.job_results[key]["duration"] = mean_duration
                            print(f"Average duration: {mean_duration:.2f} seconds")

                        latest_completion = max([run["completed_at"] for run in runs if run.get("completed_at")], default=None)
                        if latest_completion:
                            globals.job_results[key]["completed_at"] = latest_completion

                    elif all_failed:
                        globals.job_results[key]["status"] = "ERROR"
            except Exception as e:
                print(f"Error checking job for {key}: {str(e)}")
                import traceback
                traceback.print_exc()

        save_results()

    except Exception as e:
        print(f"Error in update_job_statuses: {str(e)}")
        import traceback
        traceback.print_exc()