File size: 13,068 Bytes
8dafde0
7f5506e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
096bf86
7f5506e
8dafde0
7f5506e
8dafde0
7f5506e
 
 
8dafde0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7f5506e
 
 
 
 
 
 
 
 
 
0b4b222
 
 
 
 
 
 
 
 
7f5506e
 
 
 
 
cc714d4
7f5506e
32de2c3
cc714d4
 
 
 
 
 
7f5506e
0b4b222
7f5506e
 
 
 
 
 
 
 
 
 
8580b64
7f5506e
 
 
 
 
0b4b222
7f5506e
 
0b4b222
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9252209
0b4b222
 
9252209
 
 
0b4b222
7f5506e
9252209
0b4b222
7f5506e
 
0b4b222
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80d548a
7f5506e
0b4b222
7f5506e
 
 
 
 
 
 
0b4b222
7f5506e
 
 
0b4b222
 
 
7f5506e
9252209
 
0b4b222
 
 
7f5506e
80d548a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9252209
 
80d548a
 
 
7f5506e
 
0b4b222
7f5506e
32de2c3
7f5506e
 
 
 
0b4b222
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71ff8d6
0b4b222
 
 
 
 
 
 
 
 
 
 
71ff8d6
 
 
 
 
 
0b4b222
 
 
 
 
 
 
8dafde0
7f5506e
 
0b4b222
 
7f5506e
 
 
 
 
0b4b222
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
from huggingface_hub import run_job, inspect_job, fetch_job_logs
import os
import re
import time
from datetime import datetime
import globals
from utils.io import save_results, load_models_providers
from typing import Optional


def extract_score_from_job(job_id: str) -> Optional[float]:
    """Extract the average score from a completed job's logs.

    Scans the results table in the job's log stream and averages the main
    metric for each task in ``globals.TASKS`` (the metric printed on the
    same table row as the task name).

    Args:
        job_id: Identifier of the job whose logs should be parsed.

    Returns:
        Mean of the per-task scores, or None when no score was found or
        any error occurred while fetching/parsing the logs.
    """
    try:
        logs = fetch_job_logs(job_id=job_id, namespace=globals.NAMESPACE)

        # Hoist the valid-task computation out of the per-line loop.
        # Table task names use ":" where the task config uses "|".
        valid_tasks = {t.replace("|", ":") for t in globals.TASKS.split(",")}

        scores = []
        for line in logs:
            # Result rows split into exactly 8 cells:
            # | Task | Version | Metric | Value | | Stderr |
            if '|' not in line:
                continue
            parts = [p.strip() for p in line.split('|')]
            if len(parts) != 8:
                continue

            _, task, _, metric, value, _, _, _ = parts
            if task and task in valid_tasks:
                try:
                    score = float(value)
                except ValueError:
                    # A non-numeric value cell (e.g. a stray header row)
                    # should skip this row, not abort the whole parse.
                    continue
                scores.append(score)
                print(f"Extracted score {score} for task '{task}' metric '{metric}'")

        if scores:
            average_score = sum(scores) / len(scores)
            print(f"Calculated average score: {average_score:.4f} from {len(scores)} tasks")
            return average_score

        print("No scores found in job logs")
        return None

    except Exception as e:
        print(f"Error extracting score for job {job_id}: {e}")
        import traceback
        traceback.print_exc()
        return None


def run_single_job(model: str, provider: str, tasks: str = globals.TASKS, run_number: int = 1) -> "str | int":
    """Run a single evaluation job for a model-provider combination.

    Args:
        model: Model ID.
        provider: Provider name.
        tasks: Tasks to run (lighteval task specification).
        run_number: Which run this is (1..NUM_RUNS_PER_JOB for multiple runs).

    Returns:
        The launched job's ID (str) on success, or the sentinel ``-1``
        (int) when the job was not launched — missing arguments, or this
        run number is already RUNNING for the combination. Callers check
        ``job_id != -1``; the previous ``Optional[str]`` annotation did
        not reflect this.
    """

    if not model or not provider:
        print("Missing model or provider")
        return -1

    # Check if this specific run number is already running for this model-provider
    key = globals.get_model_provider_key(model, provider)
    if key in globals.job_results:
        runs = globals.job_results[key].get("runs", [])
        # Check if this specific run number is already running
        for run in runs:
            if run.get("run_number") == run_number and run.get("status") == "RUNNING":
                print(f"Run {run_number} for {model} on {provider} is already running. Please wait for it to complete.")
                return -1

    print(f"Starting job for model={model}, provider={provider}, run {run_number}/{globals.NUM_RUNS_PER_JOB}")

    job = run_job(
        image="hf.co/spaces/OpenEvals/EvalsOnTheHub",
        command=[
            "lighteval", "endpoint", "inference-providers",
            f"model_name={model},provider={provider}",
            tasks,
            "--push-to-hub", "--save-details",
            "--results-org", "IPTesting",
        ],
        namespace=globals.NAMESPACE,
        secrets={"HF_TOKEN": os.getenv("HF_TOKEN")},
        token=os.getenv("HF_TOKEN")
    )

    job_id = job.id
    start_time = datetime.now()

    with globals.results_lock:
        # Initialize or update the job result
        if key not in globals.job_results:
            # First run - initialize the structure
            previous_score = None
            globals.job_results[key] = {
                "model": model,
                "provider": provider,
                "last_run": start_time.strftime("%Y-%m-%d %H:%M:%S"),
                "status": "RUNNING",
                "current_score": None,
                "previous_score": None,
                "job_id": job_id,
                "start_time": start_time.isoformat(),
                "duration": None,
                "completed_at": None,
                "runs": []
            }
        else:
            # Subsequent run or relaunch: preserve the prior score so the
            # UI can show score deltas after this run completes.
            previous_score = globals.job_results[key].get("current_score")
            globals.job_results[key]["status"] = "RUNNING"
            globals.job_results[key]["last_run"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
            globals.job_results[key]["start_time"] = start_time.isoformat()
            globals.job_results[key]["previous_score"] = previous_score

        # Add this run to the runs list
        globals.job_results[key]["runs"].append({
            "run_number": run_number,
            "job_id": job_id,
            "status": "RUNNING",
            "score": None,
            "start_time": start_time.isoformat(),
            "duration": None,
            "completed_at": None
        })

    # Don't save immediately - let the periodic save handle it
    print(f"Job launched: ID={job_id}, model={model}, provider={provider}, run {run_number}")
    return job_id


def run_multiple_jobs(model: str, provider: str, tasks: str = globals.TASKS, num_runs: int = globals.NUM_RUNS_PER_JOB) -> list:
    """Run multiple jobs for a model-provider combination to reduce variance.

    Returns:
        List of job IDs launched (failed launches, signalled by -1, are
        excluded).
    """
    launches = (
        run_single_job(model, provider, tasks, run_number=n)
        for n in range(1, num_runs + 1)
    )
    return [job_id for job_id in launches if job_id != -1]

# Todo: factorize both following functions
def launch_jobs(tasks: str = globals.TASKS, config_file: str = globals.LOCAL_CONFIG_FILE):
    """Launch jobs for all models and providers with multiple runs per combination."""
    models_providers = load_models_providers(config_file)

    if not models_providers:
        print("No valid model-provider combinations found in config file")
        return "No valid model-provider combinations found"

    print(f"Found {len(models_providers)} model-provider combinations")
    print(f"Will launch {globals.NUM_RUNS_PER_JOB} runs per combination")

    # Tally how many runs actually launched across all combinations.
    launched_count = sum(
        len(run_multiple_jobs(model, provider, tasks))
        for model, provider in models_providers
    )

    # Persist state once, after every launch attempt.
    save_results()
    total_expected = len(models_providers) * globals.NUM_RUNS_PER_JOB
    print(f"Launched {launched_count}/{total_expected} jobs successfully")
    return f"Launched {launched_count}/{total_expected} jobs ({len(models_providers)} model-provider combinations × {globals.NUM_RUNS_PER_JOB} runs each)"

def relaunch_failed_jobs():
    """Relaunch only failed model-provider combinations from job results."""
    if not globals.job_results:
        return "No existing jobs to relaunch"

    # Combinations whose last overall status indicates failure.
    failed_jobs = [
        (key, info)
        for key, info in globals.job_results.items()
        if info.get("status") in ["ERROR", "FAILED"]
    ]

    if not failed_jobs:
        return "No failed jobs to relaunch"

    relaunched_count = 0
    for _key, info in failed_jobs:
        # run_single_job returns -1 when the launch was refused.
        if run_single_job(info["model"], info["provider"], globals.TASKS) != -1:
            relaunched_count += 1

    # Persist state once, after all relaunch attempts.
    save_results()
    return f"Relaunched {relaunched_count}/{len(failed_jobs)} failed jobs"



def update_job_statuses() -> None:
    """Check and update the status of active jobs and aggregate scores from multiple runs.

    For every model-provider entry in ``globals.job_results``:
      * poll each RUNNING run's job via ``inspect_job``,
      * on completion, record duration and extract the run's score,
      * roll per-run statuses up into the entry's overall status,
      * once all runs completed, store mean score, variance and duration.

    Results are saved to disk once at the end via ``save_results()``.
    """
    try:
        keys = list(globals.job_results.keys())

        for key in keys:
            try:
                # NOTE(review): the lock is held across network calls
                # (inspect_job / log fetching) — confirm this is acceptable
                # for other threads' responsiveness before changing.
                with globals.results_lock:
                    runs = globals.job_results[key].get("runs", [])

                    if not runs:
                        # Legacy format - no runs list, nothing to poll
                        continue

                    # Aggregate status flags across all runs of this entry
                    all_completed = True
                    all_failed = True
                    any_running = False

                    for run in runs:
                        if run["status"] == "RUNNING":
                            # Check if this run's job is still running
                            try:
                                job_info = inspect_job(job_id=run["job_id"], namespace=globals.NAMESPACE)
                                new_status = job_info.status.stage

                                # Capture the status before overwriting it so the
                                # transition logs as "old -> new" (previously the
                                # field was updated first and the log always
                                # printed "new -> new").
                                old_status = run["status"]
                                if old_status != new_status:
                                    run["status"] = new_status
                                    print(f"Run {run['run_number']} job {run['job_id']} status changed: {old_status} -> {new_status}")

                                    if new_status == "COMPLETED":
                                        completed_time = datetime.now()
                                        run["completed_at"] = completed_time.strftime("%Y-%m-%d %H:%M:%S")

                                        # Wall-clock duration in seconds
                                        if run.get("start_time"):
                                            start_time = datetime.fromisoformat(run["start_time"])
                                            run["duration"] = (completed_time - start_time).total_seconds()

                                        # Pull this run's score out of the job logs
                                        score = extract_score_from_job(run["job_id"])
                                        if score is not None:
                                            run["score"] = score
                                            print(f"Run {run['run_number']}: extracted score {score:.4f}")

                            except Exception as e:
                                print(f"Error checking run {run['run_number']}: {e}")

                        # Fold this run's (possibly updated) status into the flags
                        if run["status"] == "RUNNING":
                            any_running = True
                            all_completed = False
                            all_failed = False
                        elif run["status"] == "COMPLETED":
                            all_failed = False
                        elif run["status"] in ["ERROR", "FAILED"]:
                            all_completed = False

                    # Roll the flags up into the entry's overall status
                    if any_running:
                        globals.job_results[key]["status"] = "RUNNING"
                    elif all_completed:
                        globals.job_results[key]["status"] = "COMPLETED"

                        # Aggregate statistics from the completed runs
                        completed_scores = [run["score"] for run in runs if run["status"] == "COMPLETED" and run["score"] is not None]
                        completed_durations = [run["duration"] for run in runs if run["status"] == "COMPLETED" and run.get("duration") is not None]

                        if completed_scores:
                            import statistics
                            mean_score = statistics.mean(completed_scores)
                            # Sample variance needs at least two data points
                            variance = statistics.variance(completed_scores) if len(completed_scores) > 1 else 0.0

                            globals.job_results[key]["current_score"] = mean_score
                            globals.job_results[key]["score_variance"] = variance

                            print(f"Aggregated {len(completed_scores)} runs: mean={mean_score:.4f}, variance={variance:.6f}")

                            # Average duration across completed runs
                            if completed_durations:
                                mean_duration = statistics.mean(completed_durations)
                                globals.job_results[key]["duration"] = mean_duration
                                print(f"Average duration: {mean_duration:.2f} seconds")

                            # Entry completion time = latest run completion
                            latest_completion = max([run["completed_at"] for run in runs if run.get("completed_at")], default=None)
                            if latest_completion:
                                globals.job_results[key]["completed_at"] = latest_completion

                    elif all_failed:
                        globals.job_results[key]["status"] = "ERROR"

            except Exception as e:
                print(f"Error checking job: {str(e)}")
                import traceback
                traceback.print_exc()

        save_results()

    except Exception as e:
        print(f"Error in update_job_statuses: {str(e)}")
        import traceback
        traceback.print_exc()