""" Main Dash application for Chronos 2 Time Series Forecasting """ import base64 import io import logging from pathlib import Path from dash import Dash, html, dcc, Input, Output, State, callback_context import dash_bootstrap_components as dbc import pandas as pd # Import components from components.upload import ( create_upload_component, create_column_selector, create_sample_data_loader, format_upload_status, create_data_preview_table, create_quality_report ) from components.chart import ( create_forecast_chart, create_empty_chart, create_metrics_display, create_backtest_metrics_display, decimate_data ) from components.controls import ( create_forecast_controls, create_model_status_bar, create_results_section, create_app_header, create_footer ) # Import services from services.model_service import model_service from services.data_processor import data_processor from services.cache_manager import cache_manager # Import utilities from utils.validators import ( validate_file_upload, validate_column_selection, validate_forecast_parameters ) from utils.metrics import calculate_metrics # Import configuration from config.settings import CONFIG, APP_METADATA, LOG_LEVEL, LOG_FORMAT, LOG_FILE, setup_directories from config.constants import MAX_CHART_POINTS # Setup logging with both file and console handlers def setup_logging(): """Configure logging to write to both file and console""" # Create logs directory first Path(LOG_FILE).parent.mkdir(parents=True, exist_ok=True) # Get root logger root_logger = logging.getLogger() root_logger.setLevel(LOG_LEVEL) # Remove any existing handlers root_logger.handlers = [] # Create formatters formatter = logging.Formatter(LOG_FORMAT) # File handler - writes all logs to file file_handler = logging.FileHandler(LOG_FILE, mode='a', encoding='utf-8') file_handler.setLevel(LOG_LEVEL) file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) # Console handler - writes to stderr console_handler = logging.StreamHandler() console_handler.setLevel(LOG_LEVEL) console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) logger = logging.getLogger(__name__) logger.info(f"Logging configured - writing to {LOG_FILE}") return logger logger = setup_logging() # Initialize Dash app app = Dash( __name__, external_stylesheets=[ dbc.themes.BOOTSTRAP, 'https://use.fontawesome.com/releases/v5.15.4/css/all.css' ], suppress_callback_exceptions=True, title=APP_METADATA['title'] ) # App layout app.layout = dbc.Container([ # Header create_app_header(), # Model status html.Div(id='model-status-bar'), # Stores for data dcc.Store(id='uploaded-data-store'), dcc.Store(id='processed-data-store'), dcc.Store(id='forecast-results-store'), # Sample data loader create_sample_data_loader(), # Upload section create_upload_component(), # Column selector (hidden initially) create_column_selector(), # Forecast controls create_forecast_controls(), # Results section (hidden initially) create_results_section(), # Footer create_footer() ], fluid=True, className="py-4") # Callback: Load model on startup @app.callback( Output('model-status-bar', 'children'), Input('model-status-bar', 'id') ) def load_model_on_startup(_): """Load the model when the app starts""" logger.info("=" * 80) logger.info("CALLBACK: load_model_on_startup - ENTRY") logger.info("=" * 80) try: logger.info("Attempting to load Chronos-2 model...") result = model_service.load_model() logger.info(f"Model loading result: {result}") if result['status'] == 'success': logger.info("✓ Model loaded successfully - returning 'ready' status bar") status_bar = create_model_status_bar('ready') logger.info(f"Status bar created: {type(status_bar)}") return status_bar else: logger.error(f"✗ Model loading failed: {result.get('error')}") return create_model_status_bar('error') except Exception as e: logger.error(f"✗ EXCEPTION in load_model_on_startup: {str(e)}", exc_info=True) return create_model_status_bar('error') finally: logger.info("CALLBACK: load_model_on_startup - EXIT") logger.info("=" * 80) # Callback: Handle file upload @app.callback( [Output('uploaded-data-store', 'data'), Output('upload-status', 'children'), Output('column-selector-card', 'style'), Output('date-column-dropdown', 'options'), Output('target-column-dropdown', 'options'), Output('id-column-dropdown', 'options'), Output('covariate-columns-dropdown', 'options')], Input('upload-data', 'contents'), State('upload-data', 'filename') ) def handle_file_upload(contents, filename): """Handle file upload and extract column information""" logger.info("=" * 80) logger.info("CALLBACK: handle_file_upload - ENTRY") logger.info(f"Filename: {filename}") logger.info(f"Contents received: {contents is not None}") logger.info("=" * 80) if contents is None: logger.warning("No contents provided - returning empty response") return None, '', {'display': 'none'}, [], [], [], [] try: # Parse uploaded file content_type, content_string = contents.split(',') decoded = base64.b64decode(content_string) # Server-side validation validation = validate_file_upload(filename, len(decoded)) if not validation['valid']: error_msg = ' '.join(validation['issues']) logger.warning(f"File upload validation failed: {error_msg}") return None, format_upload_status('error', error_msg, True), {'display': 'none'}, [], [], [], [] # Additional security: Sanitize filename import re safe_filename = re.sub(r'[^\w\-\.]', '_', filename) if safe_filename != filename: logger.info(f"Sanitized filename from '{filename}' to '{safe_filename}'") # Load file logger.info(f"Loading file with data_processor: {len(decoded)} bytes") result = data_processor.load_file(decoded, filename) logger.info(f"Load result status: {result['status']}") if result['status'] == 'error': logger.error(f"✗ File loading error: {result['error']}") return None, format_upload_status('error', result['error'], True), {'display': 'none'}, [], [], [], [] # Get column information logger.info("Getting column information from data_processor") col_info = data_processor.get_column_info() logger.info(f"Column info: date_cols={col_info['date_columns']}, numeric_cols={col_info['numeric_columns'][:5]}...") # Create dropdown options date_options = [{'label': col, 'value': col} for col in col_info['date_columns']] target_options = [{'label': col, 'value': col} for col in col_info['numeric_columns']] id_options = [{'label': col, 'value': col} for col in col_info['all_columns']] # Covariates can be any numeric column covariate_options = [{'label': col, 'value': col} for col in col_info['numeric_columns']] logger.info(f"Created dropdown options: {len(date_options)} date, {len(target_options)} target, {len(id_options)} id, {len(covariate_options)} covariate") success_msg = f"Successfully loaded {filename} ({len(result['data'])} rows, {len(result['data'].columns)} columns)" logger.info(f"✓ {success_msg}") logger.info("CALLBACK: handle_file_upload - EXIT (success)") logger.info("=" * 80) return ( result['metadata'], format_upload_status('success', success_msg), {'display': 'block'}, date_options, target_options, id_options, covariate_options ) except Exception as e: logger.error(f"✗ EXCEPTION in handle_file_upload: {str(e)}", exc_info=True) logger.info("CALLBACK: handle_file_upload - EXIT (exception)") logger.info("=" * 80) return None, format_upload_status('error', f"Error: {str(e)}", True), {'display': 'none'}, [], [], [], [] # Callback: Load sample data @app.callback( [Output('uploaded-data-store', 'data', allow_duplicate=True), Output('upload-status', 'children', allow_duplicate=True), Output('column-selector-card', 'style', allow_duplicate=True), Output('date-column-dropdown', 'options', allow_duplicate=True), Output('target-column-dropdown', 'options', allow_duplicate=True), Output('id-column-dropdown', 'options', allow_duplicate=True), Output('covariate-columns-dropdown', 'options', allow_duplicate=True)], [Input('load-weather', 'n_clicks'), Input('load-airquality', 'n_clicks'), Input('load-bitcoin', 'n_clicks'), Input('load-stock', 'n_clicks'), Input('load-traffic', 'n_clicks'), Input('load-electricity', 'n_clicks')], prevent_initial_call=True ) def load_sample_data(weather_clicks, airquality_clicks, bitcoin_clicks, stock_clicks, traffic_clicks, electricity_clicks): """Load sample datasets""" ctx = callback_context if not ctx.triggered: return None, '', {'display': 'none'}, [], [], [], [] button_id = ctx.triggered[0]['prop_id'].split('.')[0] # Map button to filename sample_files = { 'load-weather': 'weather_stations.csv', 'load-airquality': 'air_quality_uci.csv', 'load-bitcoin': 'bitcoin_price.csv', 'load-stock': 'stock_sp500.csv', 'load-traffic': 'traffic_speeds.csv', 'load-electricity': 'electricity_consumption.csv' } filename = sample_files.get(button_id) if not filename: return None, '', {'display': 'none'}, [], [], [], [] try: # Load sample file filepath = f"{CONFIG['datasets_folder']}/{filename}" with open(filepath, 'rb') as f: contents = f.read() result = data_processor.load_file(contents, filename) if result['status'] == 'error': return None, format_upload_status('error', result['error'], True), {'display': 'none'}, [], [], [], [] # Get column information col_info = data_processor.get_column_info() date_options = [{'label': col, 'value': col} for col in col_info['date_columns']] target_options = [{'label': col, 'value': col} for col in col_info['numeric_columns']] id_options = [{'label': col, 'value': col} for col in col_info['all_columns']] covariate_options = [{'label': col, 'value': col} for col in col_info['numeric_columns']] success_msg = f"Loaded sample dataset: {filename}" return ( result['metadata'], format_upload_status('success', success_msg), {'display': 'block'}, date_options, target_options, id_options, covariate_options ) except Exception as e: logger.error(f"Error loading sample data: {str(e)}", exc_info=True) error_msg = f"Sample data not found. Please ensure datasets folder exists: {CONFIG['datasets_folder']}" return None, format_upload_status('warning', error_msg), {'display': 'none'}, [], [], [], [] # Callback: Handle forecasting mode changes @app.callback( [Output('covariate-section', 'style'), Output('target-help-text', 'children')], Input('forecasting-mode', 'value') ) def update_forecasting_mode(mode): """Update UI based on selected forecasting mode""" if mode == 'univariate': return ( {'display': 'none'}, 'Select ONE target variable (multi-select available, but use only one for univariate)' ) elif mode == 'multivariate': return ( {'display': 'none'}, 'Select MULTIPLE target variables to forecast together' ) else: # covariate-informed return ( {'display': 'block'}, 'Select target variable(s) to forecast (can select multiple)' ) # Callback: Handle backtest enable/disable @app.callback( Output('backtest-controls', 'style'), Input('backtest-enable', 'value') ) def toggle_backtest_controls(backtest_enabled): """Show/hide backtest controls based on checkbox""" if 'enabled' in backtest_enabled: return {'display': 'block'} return {'display': 'none'} # Callback: Update data preview and quality report @app.callback( [Output('data-preview-container', 'children'), Output('data-quality-report', 'children'), Output('processed-data-store', 'data'), Output('generate-forecast-btn', 'disabled')], [Input('date-column-dropdown', 'value'), Input('target-column-dropdown', 'value'), Input('forecasting-mode', 'value'), Input('covariate-columns-dropdown', 'value')], State('id-column-dropdown', 'value') ) def update_preview_and_process(date_col, target_col, mode, covariate_cols, id_col): """Update data preview and process data when columns are selected""" logger.info("=" * 80) logger.info("CALLBACK: update_preview_and_process - ENTRY") logger.info(f"date_col: {date_col}") logger.info(f"target_col: {target_col}") logger.info(f"mode: {mode}") logger.info(f"covariate_cols: {covariate_cols}") logger.info(f"id_col: {id_col}") logger.info("=" * 80) if not date_col or not target_col: logger.warning(f"Missing required columns - date_col: {date_col}, target_col: {target_col}") return '', '', None, True try: # Ensure target_col is a list for consistency if not isinstance(target_col, list): target_col = [target_col] if target_col else [] # Ensure covariate_cols is a list if covariate_cols and not isinstance(covariate_cols, list): covariate_cols = [covariate_cols] # Validate column selection # For multivariate, validate each target column for t_col in target_col: validation = validate_column_selection(data_processor.data, date_col, t_col) if not validation['valid']: error_msg = ' '.join(validation['issues']) return format_upload_status('error', error_msg, True), '', None, True # Show preview preview = create_data_preview_table(data_processor.data) # Process data - pass target columns based on mode # For univariate: single target, for multivariate: list of targets if mode == 'univariate': target_to_process = target_col[0] # Single target string else: target_to_process = target_col # List of targets for multivariate result = data_processor.preprocess( date_column=date_col, target_column=target_to_process, id_column=id_col, forecast_horizon=30 ) if result['status'] == 'error': return preview, format_upload_status('error', result['error'], True), None, True # Show quality report quality_report = create_quality_report(result['quality_report']) # Store processed data with forecasting mode and columns processed_data = { 'data': result['data'].to_json(date_format='iso'), 'quality_report': result['quality_report'], 'forecasting_mode': mode, 'target_columns': target_col, 'covariate_columns': covariate_cols if covariate_cols else [], 'date_column': date_col, 'id_column': id_col } return preview, quality_report, processed_data, False except Exception as e: logger.error(f"Error in preview/process: {str(e)}", exc_info=True) return '', format_upload_status('error', f"Error: {str(e)}", True), None, True # Callback: Generate forecast @app.callback( [Output('forecast-chart', 'figure'), Output('metrics-display', 'children'), Output('results-card', 'style'), Output('loading-output', 'children')], Input('generate-forecast-btn', 'n_clicks'), [State('processed-data-store', 'data'), State('horizon-slider', 'value'), State('confidence-checklist', 'value'), State('backtest-enable', 'value'), State('backtest-size-slider', 'value')], prevent_initial_call=True ) def generate_forecast(n_clicks, processed_data, horizon, confidence_levels, backtest_enabled, backtest_size): """Generate forecast using the Chronos model, optionally with backtesting""" logger.info("=" * 80) logger.info("CALLBACK: generate_forecast - ENTRY") logger.info(f"n_clicks: {n_clicks}") logger.info(f"horizon: {horizon}") logger.info(f"confidence_levels: {confidence_levels}") logger.info(f"processed_data is None: {processed_data is None}") logger.info("=" * 80) if not processed_data or not n_clicks: logger.warning(f"Early return - processed_data exists: {processed_data is not None}, n_clicks: {n_clicks}") return create_empty_chart(), '', {'display': 'none'}, '' try: # Load processed data logger.info("Loading processed data from JSON...") df = pd.read_json(processed_data['data']) logger.info(f"Loaded DataFrame: shape={df.shape}, columns={df.columns.tolist()}") # Get forecasting mode and metadata mode = processed_data.get('forecasting_mode', 'univariate') target_columns = processed_data.get('target_columns', []) covariate_columns = processed_data.get('covariate_columns', []) logger.info(f"Forecasting mode: {mode}") logger.info(f"Target columns: {target_columns}") logger.info(f"Covariate columns: {covariate_columns}") # Validate parameters logger.info("Validating forecast parameters...") validation = validate_forecast_parameters(horizon, confidence_levels, len(df)) logger.info(f"Validation result: {validation}") if not validation['valid']: error_msg = ' '.join(validation['issues']) logger.error(f"✗ Validation failed: {error_msg}") return create_empty_chart(error_msg), '', {'display': 'none'}, '' # Perform backtesting if enabled backtest_df = None backtest_metrics = None if backtest_enabled and 'enabled' in backtest_enabled: logger.info(f"Backtesting enabled with test_size={backtest_size}") backtest_result = model_service.backtest( data=df, test_size=min(backtest_size, len(df) // 3), # Ensure we have enough training data forecast_horizon=horizon, confidence_levels=confidence_levels ) if backtest_result['status'] == 'success': backtest_df = backtest_result['backtest_data'] backtest_metrics = backtest_result['metrics'] logger.info(f"✓ Backtest completed: {backtest_metrics}") else: logger.warning(f"Backtest failed: {backtest_result.get('error', 'Unknown error')}") # Generate forecast logger.info(f"Calling model_service.predict() - horizon={horizon}, confidence={confidence_levels}, mode={mode}") logger.info(f"Model service state: is_loaded={model_service.is_loaded}, variant={model_service.model_variant}") forecast_result = model_service.predict( data=df, horizon=horizon, confidence_levels=confidence_levels ) logger.info(f"Forecast result status: {forecast_result['status']}") if forecast_result['status'] == 'error': logger.error(f"✗ Forecast generation failed: {forecast_result['error']}") return create_empty_chart(f"Forecast failed: {forecast_result['error']}"), '', {'display': 'none'}, '' # Get forecast data forecast_df = forecast_result['forecast'] logger.info(f"Forecast DataFrame shape: {forecast_df.shape}, columns: {forecast_df.columns.tolist()}") # Decimate data if too large logger.info("Decimating data for chart...") historical_decimated = decimate_data(df, MAX_CHART_POINTS // 2) forecast_decimated = decimate_data(forecast_df, MAX_CHART_POINTS // 2) logger.info(f"Decimated - historical: {len(historical_decimated)}, forecast: {len(forecast_decimated)}") # Prepare data for chart (rename Chronos 2 columns to chart format) logger.info("Renaming columns for chart...") historical_for_chart = historical_decimated.rename(columns={ 'timestamp': 'ds', 'target': 'y' }) logger.info(f"Historical chart data columns: {historical_for_chart.columns.tolist()}") # Create chart title and labels based on target columns logger.info("Creating forecast chart...") primary_target = target_columns[0] if target_columns else 'Target' if mode == 'multivariate' and len(target_columns) > 1: chart_title = f"Forecast: {primary_target} (with {', '.join(target_columns[1:])} as covariates)" y_label = primary_target elif covariate_columns: chart_title = f"Forecast: {primary_target} (with covariates)" y_label = primary_target else: chart_title = f"Forecast: {primary_target}" y_label = primary_target fig = create_forecast_chart( historical_data=historical_for_chart, forecast_data=forecast_decimated, confidence_levels=confidence_levels, title=chart_title, y_axis_label=y_label, backtest_data=backtest_df ) logger.info(f"Chart created: {type(fig)}") # Create metrics display metrics = { 'inference_time': forecast_result['inference_time'], 'data_points': len(df), 'horizon': horizon } logger.info(f"Creating metrics display: {metrics}") # Add backtest metrics if available if backtest_metrics: metrics_components = dbc.Row([ dbc.Col(create_metrics_display(metrics, forecast_result['inference_time']), md=6), dbc.Col(create_backtest_metrics_display(backtest_metrics), md=6) ]) else: metrics_components = dbc.Row(create_metrics_display( metrics, forecast_result['inference_time'] )) logger.info("✓ Forecast generation successful - returning chart and metrics") logger.info("CALLBACK: generate_forecast - EXIT (success)") logger.info("=" * 80) return fig, metrics_components, {'display': 'block'}, '' except Exception as e: logger.error(f"✗ EXCEPTION in generate_forecast: {str(e)}", exc_info=True) logger.info("CALLBACK: generate_forecast - EXIT (exception)") logger.info("=" * 80) return create_empty_chart(f"Error: {str(e)}"), '', {'display': 'none'}, '' # Health check endpoint @app.server.route('/health') def health_check(): """Health check endpoint for deployment monitoring""" status = { 'status': 'healthy' if model_service.is_loaded else 'degraded', 'model_loaded': model_service.is_loaded, 'model_variant': model_service.model_variant, 'device': model_service.device } return status # Run the app if __name__ == '__main__': # Setup directories setup_directories() logger.info(f"Starting Chronos 2 Forecasting App") logger.info(f"Configuration: {CONFIG}") # Get host and port from environment variables (for HuggingFace Spaces, Render, etc.) import os host = os.getenv('HOST', '127.0.0.1') port = int(os.getenv('PORT', '7860')) # 7860 is HuggingFace Spaces default debug = os.getenv('DEBUG', 'True').lower() == 'true' # Run the app app.run_server( host=host, port=port, debug=debug )