from huggingface_hub import HfFileSystem
import pandas as pd
from utils import logger
from datetime import datetime, timedelta
import threading
import traceback
import json
import re
import random
from typing import List, Tuple, Optional, Dict

fs = HfFileSystem()

IMPORTANT_MODELS = [
    "auto",
    "bert",
    "gpt2",
    "t5",
    "modernbert",
    "vit",
    "clip",
    "detr",
    "table_transformer",
    "got_ocr2",
    "whisper",
    "wav2vec2",
    "qwen2_audio",
    "speech_t5",
    "csm",
    "llama",
    "gemma3",
    "qwen2",
    "mistral3",
    "qwen2_5_vl",
    "llava",
    "smolvlm",
    "internvl",
    "gemma3n",
    "qwen2_5_omni",
]

KEYS_TO_KEEP = [
    "success_amd",
    "success_nvidia",
    "skipped_amd",
    "skipped_nvidia",
    "failed_multi_no_amd",
    "failed_multi_no_nvidia",
    "failed_single_no_amd",
    "failed_single_no_nvidia",
    "failures_amd",
    "failures_nvidia",
    "job_link_amd",
    "job_link_nvidia",
]


def generate_fake_dates(num_days: int = 7) -> List[str]:
    """Generate fake dates for the last N days."""
    today = datetime.now()
    return [(today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(num_days)]
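
# Illustrative example (doctest-style sketch; actual values depend on today's date):
#     >>> generate_fake_dates(3)   # e.g. if today were 2024-05-03
#     ['2024-05-03', '2024-05-02', '2024-05-01']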


def parse_json_field(value) -> dict:
    """Safely parse a JSON field that might be a string or dict."""
    if value is None or pd.isna(value):
        return {}
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return {}

    if isinstance(value, dict):
        return value

    try:
        return dict(value) if hasattr(value, '__iter__') else {}
    except (TypeError, ValueError):
        return {}
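
# Example (doctest-style sketch with made-up inputs):
#     >>> parse_json_field('{"single": [], "multi": []}')
#     {'single': [], 'multi': []}
#     >>> parse_json_field(None)
#     {}
#     >>> parse_json_field("not json")
#     {}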


def extract_date_from_path(path: str, pattern: str) -> Optional[str]:
    """Extract date from file path using regex pattern."""
    match = re.search(pattern, path)
    return match.group(1) if match else None
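
# Example (sketch; the path is shortened for illustration):
#     >>> extract_date_from_path(
#     ...     "transformers_daily_ci/2024-05-01/ci_results_run_models_gpu/model_results.json",
#     ...     r"transformers_daily_ci/(\d{4}-\d{2}-\d{2})",
#     ... )
#     '2024-05-01'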


def get_test_names(tests: list) -> set:
    """Extract test names from a list of test dictionaries."""
    return {test.get('line', '') for test in tests}
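
# Example (hypothetical test entry mirroring the CI report format):
#     >>> get_test_names([{'line': 'tests/models/bert/test_modeling_bert.py::BertModelTest::test_forward'}])
#     {'tests/models/bert/test_modeling_bert.py::BertModelTest::test_forward'}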


def safe_extract(row: pd.Series, key: str) -> int:
    """Safely extract an integer value from a DataFrame row."""
    return int(row.get(key, 0)) if pd.notna(row.get(key, 0)) else 0
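
# Example (sketch): missing keys and NaN both fall back to 0.
#     >>> safe_extract(pd.Series({"success_amd": 12.0}), "success_amd")
#     12
#     >>> safe_extract(pd.Series({"success_amd": float("nan")}), "success_amd")
#     0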


def log_dataframe_link(link: str) -> str:
    """
    Adds the link to the dataset in the logs, modifies it to get a clickable link, and then returns the date of
    the report.
    """
    if link.startswith("sample_"):
        return "9999-99-99"
    logger.info(f"Reading df located at {link}")

    if link.startswith("hf://"):
        link = "https://huggingface.co/" + link.removeprefix("hf://")

    pattern = r'transformers_daily_ci(.*?)/(\d{4}-\d{2}-\d{2})'
    match = re.search(pattern, link)

    if not match:
        logger.error("Could not find transformers_daily_ci and/or date in the link")
        return "9999-99-99"

    path_between = match.group(1)
    link = link.replace("transformers_daily_ci" + path_between, "transformers_daily_ci/blob/main")
    logger.info(f"Link to data source: {link}")

    return match.group(2)
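
# Example of the rewrite performed above (sketch with a made-up repo path):
#     "hf://datasets/org/transformers_daily_ci/2024-05-01/file.json"
# becomes the clickable
#     "https://huggingface.co/datasets/org/transformers_daily_ci/blob/main/2024-05-01/file.json"
# and the returned report date is "2024-05-01".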


def infer_latest_update_msg(date_df_amd: str, date_df_nvidia: str) -> str:
    """Infer a "last updated" message from the AMD and NVIDIA report dates."""
    if date_df_amd.startswith("9999") and date_df_nvidia.startswith("9999"):
        return "could not find last update time"

    if date_df_amd != date_df_nvidia:
        logger.warning(f"Different dates found: {date_df_amd} (AMD) vs {date_df_nvidia} (NVIDIA)")

    try:
        latest_date = max(date_df_amd, date_df_nvidia)
        yyyy, mm, dd = latest_date.split("-")
        return f"last updated {mm}/{dd}/{yyyy}"
    except Exception as e:
        logger.error(f"When trying to infer latest date, got error {e}")
        return "could not find last update time"


def read_one_dataframe(json_path: str, device_label: str) -> tuple[pd.DataFrame, str]:
    """Read one model_results.json into a DataFrame and return it with the report date."""
    df_upload_date = log_dataframe_link(json_path)
    df = pd.read_json(json_path, orient="index")
    df.index.name = "model_name"
    df[f"failed_multi_no_{device_label}"] = df["failures"].apply(lambda x: len(x["multi"]) if "multi" in x else 0)
    df[f"failed_single_no_{device_label}"] = df["failures"].apply(lambda x: len(x["single"]) if "single" in x else 0)
    return df, df_upload_date
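
# Shape of model_results.json as assumed by the code above (a sketch inferred
# from the fields accessed here and listed in KEYS_TO_KEEP, not an official schema):
# {
#     "models_bert": {
#         "success": 120,
#         "skipped": 12,
#         "failures": {"single": [{"line": "tests/..."}], "multi": []},
#         "job_link": {...}
#     },
#     ...
# }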


def get_available_dates() -> List[str]:
    """Get list of available dates from both AMD and NVIDIA datasets."""
    try:
        amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json"
        nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json"

        files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True)
        files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True)

        logger.info(f"Found {len(files_amd)} AMD files, {len(files_nvidia)} NVIDIA files")

        amd_pattern = r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/runs/[^/]+/ci_results_run_models_gpu/model_results\.json'
        nvidia_pattern = r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/ci_results_run_models_gpu/model_results\.json'

        amd_dates = {extract_date_from_path(f, amd_pattern) for f in files_amd}
        amd_dates.discard(None)

        nvidia_dates = {extract_date_from_path(f, nvidia_pattern) for f in files_nvidia}
        nvidia_dates.discard(None)

        logger.info(f"AMD dates: {sorted(amd_dates, reverse=True)[:5]}...")
        logger.info(f"NVIDIA dates: {sorted(nvidia_dates, reverse=True)[:5]}...")

        # Keep only dates where both platforms have data
        common_dates = sorted(amd_dates.intersection(nvidia_dates), reverse=True)
        logger.info(f"Common dates: {len(common_dates)} dates where both AMD and NVIDIA have data")

        if common_dates:
            # Limit to the 30 most recent dates
            return common_dates[:30]

        logger.warning("No common dates found between AMD and NVIDIA datasets")
        return []

    except Exception as e:
        logger.error(f"Error getting available dates: {e}")
        return []


def get_data_for_date(target_date: str) -> tuple[pd.DataFrame, str]:
    """Get data for a specific date."""
    try:
        amd_src = f"hf://datasets/optimum-amd/transformers_daily_ci/{target_date}/runs/*/ci_results_run_models_gpu/model_results.json"
        amd_files = fs.glob(amd_src, refresh=True)

        if not amd_files:
            raise FileNotFoundError(f"No AMD data found for date {target_date}")

        # Take the first matching run for that date
        amd_file = amd_files[0]
        if not amd_file.startswith("hf://"):
            amd_file = f"hf://{amd_file}"

        nvidia_src = f"hf://datasets/hf-internal-testing/transformers_daily_ci/{target_date}/ci_results_run_models_gpu/model_results.json"

        df_amd = pd.DataFrame()
        df_nvidia = pd.DataFrame()

        try:
            df_amd, _ = read_one_dataframe(amd_file, "amd")
            logger.info(f"Successfully loaded AMD data for {target_date}")
        except Exception as e:
            logger.warning(f"Failed to load AMD data for {target_date}: {e}")

        try:
            df_nvidia, _ = read_one_dataframe(nvidia_src, "nvidia")
            logger.info(f"Successfully loaded NVIDIA data for {target_date}")
        except Exception as e:
            logger.warning(f"Failed to load NVIDIA data for {target_date}: {e}")

        if df_amd.empty and df_nvidia.empty:
            logger.warning(f"No data available for either platform on {target_date}")
            return pd.DataFrame(), target_date

        if not df_amd.empty and not df_nvidia.empty:
            joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")
        elif not df_amd.empty:
            joined = df_amd.copy()
        else:
            joined = df_nvidia.copy()

        # reindex (rather than joined[KEYS_TO_KEEP]) fills missing columns with
        # NaN instead of raising a KeyError when only one platform loaded
        joined = joined.reindex(columns=KEYS_TO_KEEP)
        joined.index = joined.index.str.replace("^models_", "", regex=True)

        important_models_lower = [model.lower() for model in IMPORTANT_MODELS]
        filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)]

        return filtered_joined, target_date

    except Exception as e:
        logger.error(f"Error getting data for date {target_date}: {e}")
        return pd.DataFrame(), target_date
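
# Usage sketch (assumes the HF datasets are reachable and the date exists):
#     df, date = get_data_for_date("2024-05-01")
#     if not df.empty:
#         print(df[["success_amd", "success_nvidia"]].head())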


def get_historical_data(start_date: str, end_date: str, sample_data: bool = False) -> pd.DataFrame:
    """Get historical data for a date range."""
    if sample_data:
        return get_fake_historical_data(start_date, end_date)

    try:
        start_dt = datetime.strptime(start_date, "%Y-%m-%d")
        end_dt = datetime.strptime(end_date, "%Y-%m-%d")
        historical_data = []

        current_dt = start_dt
        while current_dt <= end_dt:
            date_str = current_dt.strftime("%Y-%m-%d")
            try:
                df, _ = get_data_for_date(date_str)
                if not df.empty:
                    df['date'] = date_str
                    historical_data.append(df)
                    logger.info(f"Loaded data for {date_str}")
            except Exception as e:
                logger.warning(f"Could not load data for {date_str}: {e}")
            current_dt += timedelta(days=1)

        return pd.concat(historical_data, ignore_index=False) if historical_data else pd.DataFrame()

    except Exception as e:
        logger.error(f"Error getting historical data: {e}")
        return get_fake_historical_data(start_date, end_date)


def get_distant_data() -> tuple[pd.DataFrame, str]:
    """Load the latest AMD and NVIDIA CI reports and join them into one DataFrame."""
    amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json"
    files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True)
    df_amd, date_df_amd = read_one_dataframe(f"hf://{files_amd[0]}", "amd")

    nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json"
    files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True)

    # NOTE: removeprefix, not lstrip: lstrip strips *characters* from the set,
    # which can eat into the remainder of the path
    nvidia_path = files_nvidia[0].removeprefix('datasets/hf-internal-testing/transformers_daily_ci/')
    nvidia_path = "https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/" + nvidia_path
    df_nvidia, date_df_nvidia = read_one_dataframe(nvidia_path, "nvidia")

    latest_update_msg = infer_latest_update_msg(date_df_amd, date_df_nvidia)

    joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")
    joined = joined[KEYS_TO_KEEP]
    joined.index = joined.index.str.replace("^models_", "", regex=True)

    important_models_lower = [model.lower() for model in IMPORTANT_MODELS]
    filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)]

    for model in IMPORTANT_MODELS:
        if model not in filtered_joined.index:
            logger.warning(f"Model {model} was missing from index.")
    return filtered_joined, latest_update_msg


def get_sample_data() -> tuple[pd.DataFrame, str]:
    """Load the bundled sample reports, used as a fallback when remote data is unavailable."""
    df_amd, _ = read_one_dataframe("sample_amd.json", "amd")
    df_nvidia, _ = read_one_dataframe("sample_nvidia.json", "nvidia")

    joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")
    joined = joined[KEYS_TO_KEEP]
    joined.index = joined.index.str.replace("^models_", "", regex=True)

    important_models_lower = [model.lower() for model in IMPORTANT_MODELS]
    filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)]

    filtered_joined.index = "sample_" + filtered_joined.index
    return filtered_joined, "sample data was loaded"


def get_fake_historical_data(start_date: str, end_date: str) -> pd.DataFrame:
    """Generate fake historical data for a date range when real data loading fails."""
    try:
        start_dt = datetime.strptime(start_date, "%Y-%m-%d")
        end_dt = datetime.strptime(end_date, "%Y-%m-%d")
        sample_df, _ = get_sample_data()
        historical_data = []

        current_dt = start_dt
        while current_dt <= end_dt:
            date_df = sample_df.copy()
            date_df['date'] = current_dt.strftime("%Y-%m-%d")

            for idx in date_df.index:
                # Jitter success/skipped counts by up to ±20%
                for col in ['success_amd', 'success_nvidia', 'skipped_amd', 'skipped_nvidia']:
                    if col in date_df.columns and pd.notna(date_df.loc[idx, col]):
                        val = date_df.loc[idx, col]
                        if val > 0:
                            date_df.loc[idx, col] = max(0, int(val * random.uniform(0.8, 1.2)))

                # Jitter failure counts between 0.5x and 2x
                for col in ['failed_multi_no_amd', 'failed_multi_no_nvidia', 'failed_single_no_amd', 'failed_single_no_nvidia']:
                    if col in date_df.columns and pd.notna(date_df.loc[idx, col]):
                        val = date_df.loc[idx, col]
                        date_df.loc[idx, col] = max(0, int(val * random.uniform(0.5, 2.0)))

            historical_data.append(date_df)
            current_dt += timedelta(days=1)

        if not historical_data:
            return pd.DataFrame()

        combined_df = pd.concat(historical_data, ignore_index=False)
        logger.info(f"Generated fake historical data: {len(combined_df)} records from {start_date} to {end_date}")
        return combined_df

    except Exception as e:
        logger.error(f"Error generating fake historical data: {e}")
        return pd.DataFrame()


def find_failure_first_seen(historical_df: pd.DataFrame, model_name: str, test_name: str, device: str, gpu_type: str) -> Optional[str]:
    """Find the first date when a specific test failure appeared in historical data."""
    if historical_df is None or historical_df.empty:
        return None

    try:
        model_name_lower = model_name.lower()
        model_data = historical_df[historical_df.index.str.lower() == model_name_lower].copy()
        if model_data.empty:
            return None

        if 'date' not in model_data.columns:
            return None

        # Walk the dates in ascending order and return the first hit
        for _, row in model_data.sort_values('date').iterrows():
            failures_raw = row.get(f'failures_{device}')
            if failures_raw is None or pd.isna(failures_raw):
                continue

            failures = parse_json_field(failures_raw)
            if not isinstance(failures, dict) or gpu_type not in failures:
                continue

            for test in failures.get(gpu_type, []):
                if isinstance(test, dict) and test.get('line', '') == test_name:
                    date_value = row.get('date')
                    return date_value if date_value else None

        return None

    except Exception as e:
        logger.error(f"Error finding first seen date for {test_name}: {e}")
        return None


def _find_device_regressions(model_name: str, current_failures: dict, yesterday_failures: dict, device: str) -> list[dict]:
    """Helper to find regressions for a specific device."""
    regressions = []
    for gpu_type in ['single', 'multi']:
        current_tests = get_test_names(current_failures.get(gpu_type, []))
        yesterday_tests = get_test_names(yesterday_failures.get(gpu_type, []))

        # Tests failing now that were not failing yesterday
        new_tests = current_tests - yesterday_tests
        for test_name in new_tests:
            if test_name:
                regressions.append({
                    'model': model_name,
                    'test': test_name.split('::')[-1],
                    'test_full': test_name,
                    'device': device,
                    'gpu_type': gpu_type
                })
    return regressions
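
# Shape of one returned regression entry (illustrative values):
# {
#     'model': 'bert',
#     'test': 'test_forward',
#     'test_full': 'tests/models/bert/test_modeling_bert.py::BertModelTest::test_forward',
#     'device': 'amd',
#     'gpu_type': 'single',
# }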


def find_new_regressions(current_df: pd.DataFrame, historical_df: pd.DataFrame) -> list[dict]:
    """Compare current failures against previous day's failures to find new regressions."""
    if current_df.empty or historical_df.empty:
        return []

    # The most recent historical date serves as "yesterday"
    available_dates = sorted(historical_df['date'].unique(), reverse=True)
    if not available_dates:
        return []

    yesterday_data = historical_df[historical_df['date'] == available_dates[0]]
    new_regressions = []

    for model_name in current_df.index:
        current_row = current_df.loc[model_name]
        # Match case-insensitively, consistent with find_failure_first_seen
        yesterday_row = yesterday_data[yesterday_data.index.str.lower() == model_name.lower()]

        current_amd = parse_json_field(current_row.get('failures_amd', {}))
        current_nvidia = parse_json_field(current_row.get('failures_nvidia', {}))

        yesterday_amd = {}
        yesterday_nvidia = {}
        if not yesterday_row.empty:
            yesterday_row = yesterday_row.iloc[0]
            yesterday_amd = parse_json_field(yesterday_row.get('failures_amd', {}))
            yesterday_nvidia = parse_json_field(yesterday_row.get('failures_nvidia', {}))

        new_regressions.extend(_find_device_regressions(model_name, current_amd, yesterday_amd, 'amd'))
        new_regressions.extend(_find_device_regressions(model_name, current_nvidia, yesterday_nvidia, 'nvidia'))

    return new_regressions


def extract_model_data(row: pd.Series) -> tuple[dict[str, int], dict[str, int], int, int, int, int]:
    """Extract and process model data from DataFrame row."""
    counts = {key: safe_extract(row, key) for key in [
        'success_amd', 'success_nvidia', 'skipped_amd', 'skipped_nvidia',
        'failed_multi_no_amd', 'failed_multi_no_nvidia',
        'failed_single_no_amd', 'failed_single_no_nvidia'
    ]}

    amd_stats = {
        'passed': counts['success_amd'],
        'failed': counts['failed_multi_no_amd'] + counts['failed_single_no_amd'],
        'skipped': counts['skipped_amd'],
        'error': 0
    }
    nvidia_stats = {
        'passed': counts['success_nvidia'],
        'failed': counts['failed_multi_no_nvidia'] + counts['failed_single_no_nvidia'],
        'skipped': counts['skipped_nvidia'],
        'error': 0
    }

    return (amd_stats, nvidia_stats, counts['failed_multi_no_amd'],
            counts['failed_single_no_amd'], counts['failed_multi_no_nvidia'],
            counts['failed_single_no_nvidia'])
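
# Example (sketch with made-up counts; missing keys default to 0 via safe_extract):
#     row = pd.Series({'success_amd': 10, 'failed_single_no_amd': 2, 'success_nvidia': 11})
#     amd_stats, nvidia_stats, *_ = extract_model_data(row)
#     # amd_stats == {'passed': 10, 'failed': 2, 'skipped': 0, 'error': 0}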


class CIResults:
    """Holds the current and historical CI results and keeps them refreshed."""

    def __init__(self):
        self.df = pd.DataFrame()
        self.available_models = []
        self.latest_update_msg = ""
        self.available_dates = []
        self.historical_df = pd.DataFrame()
        self.all_historical_data = pd.DataFrame()
        self.sample_data = False

    def load_data(self) -> None:
        """Load data from the data source."""
        try:
            logger.info("Loading distant data...")
            new_df, latest_update_msg = get_distant_data()
            self.latest_update_msg = latest_update_msg
            self.sample_data = False
        except Exception as e:
            error_msg = [
                "Loading data failed:",
                "-" * 120,
                traceback.format_exc(),
                "-" * 120,
                "Falling back on sample data."
            ]
            logger.error("\n".join(error_msg))
            self.sample_data = True
            new_df, latest_update_msg = get_sample_data()
            self.latest_update_msg = latest_update_msg

        try:
            if not self.sample_data:
                self.available_dates = get_available_dates()
                logger.info(f"Available dates: {len(self.available_dates)} dates")
                if self.available_dates:
                    logger.info(f"Date range: {self.available_dates[-1]} to {self.available_dates[0]}")
                else:
                    logger.warning("No available dates found")
                    self.available_dates = []
            else:
                self.available_dates = generate_fake_dates()
        except Exception as e:
            logger.warning(f"Failed to get available dates: {e}")
            if self.sample_data:
                self.available_dates = generate_fake_dates()
            else:
                self.available_dates = []

        self.df = new_df
        self.available_models = new_df.index.tolist()

        self.load_all_historical_data()

        if self.available_dates:
            start_date_val = self.available_dates[-1]
            end_date_val = self.available_dates[0]
            self.load_historical_data(start_date_val, end_date_val)
            logger.info(f"Updated historical_df with {len(self.historical_df)} records")

        logger.info(f"Data loaded successfully: {len(self.available_models)} models")
        logger.info(f"Models: {self.available_models[:5]}{'...' if len(self.available_models) > 5 else ''}")
        logger.info(f"Latest update message: {self.latest_update_msg}")

        # Log a preview of the first few models with long values truncated
        msg = {}
        for model in self.available_models[:3]:
            msg[model] = {}
            for col in self.df.columns:
                value = self.df.loc[model, col]
                if not isinstance(value, int):
                    value = str(value)
                    if len(value) > 10:
                        value = value[:10] + "..."
                msg[model][col] = value
        logger.info(json.dumps(msg, indent=4))

    def load_all_historical_data(self) -> None:
        """Load all available historical data. Replaces existing data to ensure latest dates are included."""
        try:
            if not self.available_dates:
                logger.warning("No available dates found, skipping historical data load")
                self.all_historical_data = pd.DataFrame()
                return

            logger.info(f"Loading all historical data for {len(self.available_dates)} dates...")
            start_date, end_date = self.available_dates[-1], self.available_dates[0]
            logger.info(f"Date range: {start_date} to {end_date}")
            self.all_historical_data = get_historical_data(start_date, end_date, self.sample_data)
            logger.info(f"All historical data loaded: {len(self.all_historical_data)} records")
            if not self.all_historical_data.empty:
                unique_dates = sorted(self.all_historical_data['date'].unique())
                logger.info(f"Loaded dates: {unique_dates[0]} to {unique_dates[-1]} ({len(unique_dates)} unique dates)")
        except Exception as e:
            logger.error(f"Error loading all historical data: {e}")
            self.all_historical_data = pd.DataFrame()

    def load_historical_data(self, start_date: str, end_date: str) -> None:
        """Load historical data for a date range from pre-loaded data."""
        try:
            logger.info(f"Filtering historical data from {start_date} to {end_date}")

            if self.all_historical_data.empty:
                logger.warning("No pre-loaded historical data available")
                self.historical_df = pd.DataFrame()
                return

            start_dt = datetime.strptime(start_date, "%Y-%m-%d")
            end_dt = datetime.strptime(end_date, "%Y-%m-%d")

            filtered_data = [
                self.all_historical_data[self.all_historical_data['date'] == date_str]
                for date_str in self.all_historical_data['date'].unique()
                if start_dt <= datetime.strptime(date_str, "%Y-%m-%d") <= end_dt
            ]

            if filtered_data:
                self.historical_df = pd.concat(filtered_data, ignore_index=False)
                logger.info(f"Historical data filtered: {len(self.historical_df)} records for {start_date} to {end_date}")
            else:
                self.historical_df = pd.DataFrame()
                logger.warning(f"No historical data found for date range {start_date} to {end_date}")

        except Exception as e:
            logger.error(f"Error filtering historical data: {e}")
            self.historical_df = pd.DataFrame()

    def schedule_data_reload(self):
        """Schedule the next data reload."""
        def reload_data():
            self.load_data()
            # Re-arm the timer so the reload keeps recurring
            timer = threading.Timer(900.0, reload_data)
            timer.daemon = True
            timer.start()
            logger.info("Next data reload scheduled in 15 minutes")

        timer = threading.Timer(900.0, reload_data)
        timer.daemon = True
        timer.start()
        logger.info("Data auto-reload scheduled every 15 minutes")