import os
import re
import time
import traceback
from datetime import datetime
from typing import Optional, Union

from huggingface_hub import fetch_job_logs, inspect_job, run_job

import globals
from utils.io import load_models_providers, save_results


def extract_score_from_job(job_id: str) -> Optional[float]:
    """Extract the average score from a job's logs.

    Scans the log lines for rows of the results table, i.e. lines that split
    on ``|`` into exactly eight fields:

        | Task | Version | Metric | Value | | Stderr |

    Only rows whose task column matches one of the tasks configured in
    ``globals.TASKS`` (with ``|`` replaced by ``:``) are used; continuation
    rows with an empty task column are therefore ignored.

    Args:
        job_id: Identifier of the job whose logs are fetched.

    Returns:
        The arithmetic mean of the matched values, or ``None`` when no row
        matched or when any error occurred (the error is printed together
        with its traceback).
    """
    try:
        logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)

        # The set of accepted task names does not change while scanning, so
        # build it once instead of once per log line.
        expected_tasks = {t.replace("|", ":") for t in globals.TASKS.split(",")}

        scores = []
        for line in logs:
            if '|' not in line:
                continue

            parts = [p.strip() for p in line.split('|')]
            # A table row has 6 columns plus the empty strings produced by
            # the leading and trailing pipe, hence 8 fields.
            if len(parts) != 8:
                continue

            _, task, _, metric, value, _, _, _ = parts
            # Header and separator rows fail this check, as do rows whose
            # task column is empty.
            if task and task in expected_tasks:
                score = float(value)
                scores.append(score)
                print(f"Extracted score {score} for task '{task}' metric '{metric}'")

        if scores:
            average_score = sum(scores) / len(scores)
            print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
            return average_score

        print("No scores found in job logs")
        return None

    except Exception as e:
        print(f"Error extracting score for job {job_id}: {e}")
        traceback.print_exc()
        return None


def run_single_job(model: str, provider: str, tasks: str) -> Union[str, int]:
    """Run a single job for a model-provider combination.

    Args:
        model: Model name passed to the evaluation command.
        provider: Provider name passed to the evaluation command.
        tasks: Task specification forwarded verbatim to the command.

    Returns:
        The id of the launched job, or the sentinel ``-1`` when nothing was
        launched (missing argument, combination absent from the config file,
        or a job for this combination already marked as running).
        ``launch_jobs`` relies on this sentinel.
    """
    if not model or not provider:
        print("Missing model or provider")
        return -1

    # Verify the model-provider combination exists in the config
    models_providers = load_models_providers(globals.LOCAL_CONFIG_FILE)
    if (model, provider) not in models_providers:
        print(
            f"Error: {model} with {provider} not found in {globals.LOCAL_CONFIG_FILE}")
        return -1

    # Check if job is already running
    key = globals.get_model_provider_key(model, provider)
    with globals.results_lock:
        if key in globals.job_results:
            current_status = globals.job_results[key].get("status")
            if current_status == "running":
                print(
                    f"Job for {model} on {provider} is already running. Please wait for it to complete.")
                return -1

    print(f"Starting job for model={model}, provider={provider}")

    job = run_job(
        image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
        command=[
            "lighteval", "endpoint", "inference-providers",
            f"model_name={model},provider={provider}",
            tasks,
            "--push-to-hub", "--save-details",
            "--results-org", "IPTesting",
            "--max-samples", "10"
        ],
        namespace=globals.NAMESPACE,
        secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
        token=os.getenv("HF_TOKEN")
    )

    job_id = job.id

    with globals.results_lock:
        # Move current score to previous score if it exists (relaunching)
        previous_score = None
        if key in globals.job_results and globals.job_results[key].get("current_score"):
            previous_score = globals.job_results[key]["current_score"]

        globals.job_results[key] = {
            "model": model,
            "provider": provider,
            "last_run": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "status": "running",
            "current_score": None,
            "previous_score": previous_score,
            "job_id": job_id
        }

    save_results()
    print(f"Job launched: ID={job_id}, model={model}, provider={provider}")
    return job_id


def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
    """Launch jobs for all models and providers.

    Args:
        tasks: Task specification forwarded to every job.
        config_file: Path handed to ``load_models_providers`` to obtain the
            (model, provider) pairs.

    Returns:
        A human-readable summary string.
    """
    models_providers = load_models_providers(config_file)

    if not models_providers:
        print("No valid model-provider combinations found in config file")
        return "No valid model-provider combinations found"

    print(f"Found {len(models_providers)} model-provider combinations")

    launched_count = 0
    for model, provider in models_providers:
        job_id = run_single_job(model, provider, tasks)
        # run_single_job signals "not launched" with -1.
        if job_id != -1:
            launched_count += 1
        # Small delay between launches to avoid rate limiting
        time.sleep(2)

    print(f"Launched {launched_count}/{len(models_providers)} jobs successfully")
    return f"Launched {launched_count} jobs"


def update_job_statuses() -> None:
    """Check and update the status of active jobs.

    For every entry in ``globals.job_results`` the job is inspected and its
    stored status refreshed. When the job is ``COMPLETED`` and either the
    status just changed or no score is stored yet, the score is extracted
    from the logs (at most once per entry per call). Results are saved after
    all entries were processed. Errors are printed, never raised.
    """
    try:
        # Snapshot the keys so the dict is not iterated while other threads
        # may modify it.
        with globals.results_lock:
            keys = list(globals.job_results.keys())

        for key in keys:
            try:
                with globals.results_lock:
                    job_id = globals.job_results[key]["job_id"]

                # Jobs are launched (and their logs fetched) in
                # globals.NAMESPACE, so inspect them in that namespace too.
                job_info = inspect_job(job_id=job_id, namespace=globals.NAMESPACE)
                new_status = job_info.status.stage

                with globals.results_lock:
                    entry = globals.job_results[key]
                    old_status = entry["status"]
                    status_changed = old_status != new_status

                    if status_changed:
                        entry["status"] = new_status
                        print(f"Job {job_id} status changed: {old_status} -> {new_status}")

                    # Extract on the transition to COMPLETED, or retry on a
                    # later pass while the score is still missing. A single
                    # guarded call avoids fetching the logs twice in a row
                    # when the first attempt yields nothing.
                    needs_score = status_changed or entry.get("current_score") is None
                    if new_status == "COMPLETED" and needs_score:
                        score = extract_score_from_job(job_id)
                        if score is not None:
                            entry["current_score"] = score

            except Exception as e:
                print(f"Error checking job: {str(e)}")

        save_results()

    except Exception as e:
        print(f"Error in update_job_statuses: {str(e)}")