""" Cache manager for storing predictions and uploaded data """ import logging from typing import Dict, Optional from datetime import datetime, timedelta import pandas as pd from config.constants import MAX_PREDICTION_HISTORY logger = logging.getLogger(__name__) class CacheManager: """ Manages caching of predictions and data to improve performance """ def __init__(self): self.predictions = [] # List of prediction results self.uploaded_data = {} # Dict of uploaded datasets self.max_predictions = MAX_PREDICTION_HISTORY def store_prediction( self, data_hash: str, horizon: int, confidence_levels: list, result: Dict ): """ Store a prediction result Args: data_hash: Hash of the input data horizon: Forecast horizon used confidence_levels: Confidence levels used result: Prediction result dictionary """ prediction_entry = { 'data_hash': data_hash, 'horizon': horizon, 'confidence_levels': confidence_levels, 'result': result, 'timestamp': datetime.now() } self.predictions.append(prediction_entry) # Keep only the most recent predictions if len(self.predictions) > self.max_predictions: self.predictions = self.predictions[-self.max_predictions:] logger.debug(f"Stored prediction, cache size: {len(self.predictions)}") def get_prediction( self, data_hash: str, horizon: int, confidence_levels: list ) -> Optional[Dict]: """ Retrieve a cached prediction if available Args: data_hash: Hash of the input data horizon: Forecast horizon confidence_levels: Confidence levels Returns: Cached prediction result or None """ for entry in reversed(self.predictions): if (entry['data_hash'] == data_hash and entry['horizon'] == horizon and entry['confidence_levels'] == confidence_levels): logger.info("Cache hit for prediction") return entry['result'] logger.debug("Cache miss for prediction") return None def store_data(self, filename: str, data: pd.DataFrame): """ Store uploaded data Args: filename: Name of the uploaded file data: DataFrame containing the data """ self.uploaded_data[filename] = { 'data': data, 'timestamp': datetime.now() } logger.info(f"Stored data for {filename}") def get_data(self, filename: str) -> Optional[pd.DataFrame]: """ Retrieve uploaded data Args: filename: Name of the file Returns: DataFrame or None """ if filename in self.uploaded_data: return self.uploaded_data[filename]['data'] return None def clear_old_data(self, max_age_hours: int = 24): """ Clear data older than specified hours Args: max_age_hours: Maximum age in hours """ cutoff = datetime.now() - timedelta(hours=max_age_hours) # Clear old uploaded data old_files = [ filename for filename, entry in self.uploaded_data.items() if entry['timestamp'] < cutoff ] for filename in old_files: del self.uploaded_data[filename] if old_files: logger.info(f"Cleared {len(old_files)} old data entries") def clear_all(self): """Clear all cached data""" self.predictions.clear() self.uploaded_data.clear() logger.info("Cleared all cache") def get_stats(self) -> Dict: """Get cache statistics""" return { 'num_predictions': len(self.predictions), 'num_datasets': len(self.uploaded_data), 'total_memory_mb': self._estimate_memory() } def _estimate_memory(self) -> float: """Estimate memory usage in MB (rough estimate)""" try: total_bytes = 0 # Estimate prediction cache size for entry in self.predictions: if 'forecast' in entry['result']: total_bytes += entry['result']['forecast'].memory_usage(deep=True).sum() # Estimate data cache size for entry in self.uploaded_data.values(): total_bytes += entry['data'].memory_usage(deep=True).sum() return total_bytes / (1024 * 1024) 
except Exception as e: logger.warning(f"Failed to estimate memory: {str(e)}") return 0.0 # Global cache instance cache_manager = CacheManager()
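

# A minimal usage sketch (illustrative only): the DataFrame contents and the
# md5-of-CSV hashing scheme below are assumptions for the demo, not part of
# the module's API; real callers should hash uploads however the application
# does elsewhere.
if __name__ == "__main__":
    import hashlib

    df = pd.DataFrame({'value': [1.0, 2.0, 3.0]})
    data_hash = hashlib.md5(df.to_csv().encode()).hexdigest()

    # Store an upload and a prediction keyed by (hash, horizon, levels)
    cache_manager.store_data('example.csv', df)
    cache_manager.store_prediction(
        data_hash,
        horizon=12,
        confidence_levels=[0.8, 0.95],
        result={'forecast': df}
    )

    # The same key yields a cache hit; changing any component misses
    assert cache_manager.get_prediction(data_hash, 12, [0.8, 0.95]) is not None
    assert cache_manager.get_prediction(data_hash, 24, [0.8, 0.95]) is None
    print(cache_manager.get_stats())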