from huggingface_hub import run_job, inspect_job, fetch_job_logs
import os
import statistics
import time
import traceback
from datetime import datetime
from typing import Optional

import globals
from utils.io import save_results, load_models_providers


def extract_score_from_job(job_id: str) -> Optional[float]:
    """Extract the average score from the logs of a completed job.

    Parses the lighteval results table and averages the main metric of each
    configured task (the metric printed on the same line as the task name).
    """
    try:
        # Fetch the job logs to locate the results table
        logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)

        # Task names appear with ":" separators in the table, but with "|" in globals.TASKS
        expected_tasks = [t.replace("|", ":") for t in globals.TASKS.split(",")]

        scores = []
        for line in logs:
            # Results-table rows look like: |task_name|version|metric|value| |stderr|
            # Keep only rows whose task name matches one of the configured tasks.
            if '|' in line:
                parts = [p.strip() for p in line.split('|')]
                # Skip header and separator lines
                # Table format: | Task | Version | Metric | Value |   | Stderr |
                if len(parts) == 8:
                    _, task, _, metric, value, _, _, _ = parts
                    if task and task in expected_tasks:
                        # Convert the reported value to a float
                        score = float(value)
                        scores.append(score)
                        print(f"Extracted score {score} for task '{task}' metric '{metric}'")

        # Average the per-task scores
        if scores:
            average_score = sum(scores) / len(scores)
            print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
            return average_score
        else:
            print("No scores found in job logs")
            return None

    except Exception as e:
        print(f"Error extracting score for job {job_id}: {e}")
        traceback.print_exc()
        return None
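# Illustrative only: an assumed lighteval results-table line and how the parser
# above reads it. The exact layout comes from lighteval's log output, so treat
# this sample line as hypothetical rather than a captured log.
#
#   line = "|extended:ifeval:0|1|prompt_level_strict_acc|0.8133|±|0.0168|"
#   parts = [p.strip() for p in line.split("|")]   # -> 8 parts
#   task, metric, value = parts[1], parts[3], parts[4]
#   float(value)                                   # -> 0.8133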
def run_single_job(model: str, provider: str, tasks: str = globals.TASKS, run_number: int = 1) -> Optional[str]:
    """Run a single job for a model-provider combination.

    Args:
        model: Model ID
        provider: Provider name
        tasks: Tasks to run
        run_number: Which run this is (1 to globals.NUM_RUNS_PER_JOB)

    Returns:
        The launched job ID, or None if the job was not launched.
    """
    if not model or not provider:
        print("Missing model or provider")
        return None

    # Skip launching if this specific run number is already running for this model-provider
    key = globals.get_model_provider_key(model, provider)
    if key in globals.job_results:
        runs = globals.job_results[key].get("runs", [])
        for run in runs:
            if run.get("run_number") == run_number and run.get("status") == "RUNNING":
                print(f"Run {run_number} for {model} on {provider} is already running. "
                      "Please wait for it to complete.")
                return None

    print(f"Starting job for model={model}, provider={provider}, run {run_number}/{globals.NUM_RUNS_PER_JOB}")

    job = run_job(
        image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
        command=[
            "lighteval", "endpoint", "inference-providers",
            f"model_name={model},provider={provider}",
            tasks,
            "--push-to-hub",
            "--save-details",
            "--results-org", "IPTesting",
        ],
        namespace=globals.NAMESPACE,
        secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
        token=os.getenv("HF_TOKEN"),
    )

    job_id = job.id
    start_time = datetime.now()

    with globals.results_lock:
        # Initialize or update the job result
        if key not in globals.job_results:
            # First run - initialize the structure
            globals.job_results[key] = {
                "model": model,
                "provider": provider,
                "last_run": start_time.strftime("%Y-%m-%d %H:%M:%S"),
                "status": "RUNNING",
                "current_score": None,
                "previous_score": None,
                "job_id": job_id,
                "start_time": start_time.isoformat(),
                "duration": None,
                "completed_at": None,
                "runs": [],
            }
        else:
            # Subsequent run or relaunch: keep the last aggregated score as the previous score
            previous_score = globals.job_results[key].get("current_score")
            globals.job_results[key]["status"] = "RUNNING"
            globals.job_results[key]["last_run"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
            globals.job_results[key]["start_time"] = start_time.isoformat()
            globals.job_results[key]["previous_score"] = previous_score

        # Add this run to the runs list
        globals.job_results[key]["runs"].append({
            "run_number": run_number,
            "job_id": job_id,
            "status": "RUNNING",
            "score": None,
            "start_time": start_time.isoformat(),
            "duration": None,
            "completed_at": None,
        })
        # Don't save immediately - let the periodic save handle it

    print(f"Job launched: ID={job_id}, model={model}, provider={provider}, run {run_number}")
    return job_id


def run_multiple_jobs(model: str, provider: str, tasks: str = globals.TASKS, num_runs: int = globals.NUM_RUNS_PER_JOB) -> list:
    """Run multiple jobs for a model-provider combination to reduce variance.

    Returns:
        List of job IDs launched
    """
    job_ids = []
    for run_number in range(1, num_runs + 1):
        job_id = run_single_job(model, provider, tasks, run_number=run_number)
        if job_id is not None:
            job_ids.append(job_id)
    return job_ids
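# Shape of a globals.job_results entry after launching runs (field names taken from
# the code above; the concrete values shown here are placeholders):
#
#   {
#       "model": "<model id>", "provider": "<provider>",
#       "status": "RUNNING", "last_run": "2024-01-01 12:00:00",
#       "current_score": None, "previous_score": None,
#       "job_id": "<id of the first launched job>",
#       "start_time": "<ISO timestamp>", "duration": None, "completed_at": None,
#       "runs": [
#           {"run_number": 1, "job_id": "<id>", "status": "RUNNING", "score": None,
#            "start_time": "<ISO timestamp>", "duration": None, "completed_at": None},
#       ],
#   }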
# TODO: factor out the shared launch loop of the two following functions
def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
    """Launch jobs for all models and providers, with multiple runs per combination."""
    models_providers = load_models_providers(config_file)
    if not models_providers:
        print("No valid model-provider combinations found in config file")
        return "No valid model-provider combinations found"

    print(f"Found {len(models_providers)} model-provider combinations")
    print(f"Will launch {globals.NUM_RUNS_PER_JOB} runs per combination")

    launched_count = 0
    for model, provider in models_providers:
        job_ids = run_multiple_jobs(model, provider, tasks)
        launched_count += len(job_ids)

    # Save all results once after launching all jobs
    save_results()

    total_expected = len(models_providers) * globals.NUM_RUNS_PER_JOB
    print(f"Launched {launched_count}/{total_expected} jobs successfully")
    return (f"Launched {launched_count}/{total_expected} jobs "
            f"({len(models_providers)} model-provider combinations × {globals.NUM_RUNS_PER_JOB} runs each)")


def relaunch_failed_jobs():
    """Relaunch only the failed model-provider combinations from the job results."""
    if not globals.job_results:
        return "No existing jobs to relaunch"

    failed_jobs = [(key, info) for key, info in globals.job_results.items()
                   if info.get("status") in ["ERROR", "FAILED"]]
    if not failed_jobs:
        return "No failed jobs to relaunch"

    relaunched_count = 0
    for _, info in failed_jobs:
        model = info["model"]
        provider = info["provider"]
        job_id = run_single_job(model, provider, globals.TASKS)
        if job_id is not None:
            relaunched_count += 1

    # Save all results once after relaunching all failed jobs
    save_results()

    return f"Relaunched {relaunched_count}/{len(failed_jobs)} failed jobs"
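# A possible shape for the factorization flagged in the TODO above (a sketch only,
# not wired into the module): both launchers reduce to "run jobs for a list of
# (model, provider) pairs, then save once". The helper name is hypothetical.
#
#   def _launch_for_pairs(pairs, tasks=globals.TASKS, num_runs=globals.NUM_RUNS_PER_JOB):
#       launched = 0
#       for model, provider in pairs:
#           launched += len(run_multiple_jobs(model, provider, tasks, num_runs))
#       save_results()
#       return launched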
def update_job_statuses() -> None:
    """Check and update the status of active jobs and aggregate scores from multiple runs."""
    try:
        keys = list(globals.job_results.keys())

        for key in keys:
            try:
                with globals.results_lock:
                    runs = globals.job_results[key].get("runs", [])
                    if not runs:
                        # Legacy format - no runs list
                        continue

                    # Check the status of each run
                    all_completed = True
                    all_failed = True
                    any_running = False

                    for run in runs:
                        if run["status"] == "RUNNING":
                            # Check whether this run's job is still running
                            try:
                                job_info = inspect_job(job_id=run["job_id"], namespace=globals.NAMESPACE)
                                new_status = job_info.status.stage

                                if run["status"] != new_status:
                                    print(f"Run {run['run_number']} job {run['job_id']} status changed: "
                                          f"{run['status']} -> {new_status}")
                                    run["status"] = new_status

                                    if new_status == "COMPLETED":
                                        completed_time = datetime.now()
                                        run["completed_at"] = completed_time.strftime("%Y-%m-%d %H:%M:%S")

                                        # Calculate duration
                                        if run.get("start_time"):
                                            start_time = datetime.fromisoformat(run["start_time"])
                                            run["duration"] = (completed_time - start_time).total_seconds()

                                        # Extract score
                                        score = extract_score_from_job(run["job_id"])
                                        if score is not None:
                                            run["score"] = score
                                            print(f"Run {run['run_number']}: extracted score {score:.4f}")
                            except Exception as e:
                                print(f"Error checking run {run['run_number']}: {e}")

                        # Update aggregate status flags
                        if run["status"] == "RUNNING":
                            any_running = True
                            all_completed = False
                            all_failed = False
                        elif run["status"] == "COMPLETED":
                            all_failed = False
                        elif run["status"] in ["ERROR", "FAILED"]:
                            all_completed = False

                    # Update the overall status for this model-provider combination
                    if any_running:
                        globals.job_results[key]["status"] = "RUNNING"
                    elif all_completed:
                        globals.job_results[key]["status"] = "COMPLETED"

                        # Aggregate statistics from the completed runs
                        completed_scores = [run["score"] for run in runs
                                            if run["status"] == "COMPLETED" and run["score"] is not None]
                        completed_durations = [run["duration"] for run in runs
                                               if run["status"] == "COMPLETED" and run.get("duration") is not None]

                        if completed_scores:
                            mean_score = statistics.mean(completed_scores)
                            variance = statistics.variance(completed_scores) if len(completed_scores) > 1 else 0.0

                            globals.job_results[key]["current_score"] = mean_score
                            globals.job_results[key]["score_variance"] = variance
                            print(f"Aggregated {len(completed_scores)} runs: mean={mean_score:.4f}, variance={variance:.6f}")

                        # Average duration over the completed runs
                        if completed_durations:
                            mean_duration = statistics.mean(completed_durations)
                            globals.job_results[key]["duration"] = mean_duration
                            print(f"Average duration: {mean_duration:.2f} seconds")

                        # Completion time is that of the latest run
                        latest_completion = max([run["completed_at"] for run in runs if run.get("completed_at")],
                                                default=None)
                        if latest_completion:
                            globals.job_results[key]["completed_at"] = latest_completion

                    elif all_failed:
                        globals.job_results[key]["status"] = "ERROR"

            except Exception as e:
                print(f"Error checking job {key}: {e}")
                traceback.print_exc()

        save_results()

    except Exception as e:
        print(f"Error in update_job_statuses: {e}")
        traceback.print_exc()
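# A minimal driver sketch, assuming this module is run directly rather than from the
# app that normally imports it; the 60-second poll interval is an arbitrary choice.
if __name__ == "__main__":
    print(launch_jobs())
    while any(info.get("status") == "RUNNING" for info in globals.job_results.values()):
        time.sleep(60)          # wait between status polls
        update_job_statuses()   # refresh run statuses and aggregate scores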