Search for a command to run...
# fracttalix_sentinel v7.1 py
"""Fracttalix Sentinel v7.1 — Adaptive early detection with regime awareness.

Updated: Numba JIT (optional), parallel surrogates, tqdm progress/verbose flag.
Designed for finance, medical, infrastructure/IoT/security monitoring, and
research.
"""

from collections import deque
import math
import warnings
import json
from typing import Dict, Any, Optional, List, Tuple, Callable, Union
import multiprocessing as mp
from multiprocessing import Pool, cpu_count

# Optional imports (guarded)
try:
    from numba import njit
except ImportError:
    def njit(*args, **kwargs):
        """No-op stand-in for ``numba.njit`` when Numba is not installed.

        Supports both decorator spellings: bare ``@njit`` (the function is
        passed directly) and parameterised ``@njit(...)``.
        """
        if len(args) == 1 and callable(args[0]) and not kwargs:
            # Bare usage: hand the function back untouched.
            return args[0]

        def decorator(func):
            return func

        return decorator

try:
    from tqdm import tqdm
except ImportError:
    def tqdm(iterable, *args, **kwargs):
        """No-op stand-in for ``tqdm.tqdm``: returns the iterable unchanged."""
        return iterable

try:
    import numpy as np
except ImportError:
    np = None


# Floor used in place of a zero standard deviation (constant warm-up data).
_MIN_STD = 1e-6

# Below this many surrogates the serial path is used even when
# ``parallel_surrogates`` is enabled.
_PARALLEL_MIN_SURROGATES = 100


def _mean(values):
    """Return the arithmetic mean of *values* (default channel aggregator)."""
    return sum(values) / len(values)


def _mean_std(values):
    """Return ``(mean, population std)`` of *values*; std is floored at 1e-6."""
    n = len(values)
    mean = sum(values) / n
    variance = sum((x - mean) ** 2 for x in values) / n
    std = math.sqrt(variance) if variance > 0 else _MIN_STD
    return mean, std


def _surrogate_task(task):
    """Pool worker: unpack ``(func, data)`` and return ``func(data)``.

    Lives at module level because ``multiprocessing`` must pickle the
    callable it ships to worker processes; nested functions cannot be pickled.
    """
    func, data = task
    return func(data)


# Example JIT-decorated function (add to slow parts, e.g., Hurst if implemented)
if np is not None:
    @njit
    def example_slow_function(series):
        """Return the running (cumulative) sum of *series* as a NumPy array.

        Placeholder for any compute-heavy loop (e.g., DFA cumulative sum).
        """
        n = len(series)
        cumsum = np.zeros(n)
        if n == 0:
            return cumsum
        # Seed with the first sample so it is included in the running total.
        cumsum[0] = series[0]
        for i in range(1, n):
            cumsum[i] = cumsum[i - 1] + series[i]
        return cumsum
else:
    def example_slow_function(series):
        """Return the running (cumulative) sum of *series* as a list.

        Pure-Python variant used when NumPy is not installed.
        """
        cumsum = []
        total = 0.0
        for sample in series:
            total += sample
            cumsum.append(total)
        return cumsum


class Detector_7_1:
    """Enhanced lightweight, regime-aware anomaly detector.

    New in v7.1: Optional Numba JIT for speed, parallel surrogates, tqdm
    progress + verbose flag.
    Core: EWMA adaptive thresholding + warm-up + bidirectional CUSUM regime
    awareness.
    Optional: Aggregation, alerting, volatility mode, explainability.
    """

    # Kept reachable through the class as well as at module level; it takes
    # no ``self``, so it is exposed as a static method.
    example_slow_function = staticmethod(example_slow_function)

    def __init__(
        self,
        # === Core parameters ===
        alpha: float = 0.12,            # EWMA smoothing (0 < alpha ≤ 1)
        early_mult: float = 2.75,       # Sensitivity for early warning
        fixed_mult: float = 3.2,        # Specificity for confirmed alerts
        warm_up_period: int = 60,       # Points needed before adaptive mode
        use_fixed_during_warmup: bool = True,
        two_sided: bool = True,         # Detect both upper and lower anomalies
        # === Regime change detection ===
        cusum_threshold: float = 5.0,   # Sigma-multiplier to trigger regime change
        reset_after_regime_change: Union[bool, str] = "full",  # 'full', 'soft', or False
        # === Multivariate & aggregation ===
        multivariate: bool = False,
        per_channel_detection: bool = True,   # Detect anomalies per channel if True
        aggregation_func: Callable[[List[float]], float] = _mean,  # mean by default
        alert_if_any_channel: bool = True,    # In per-channel mode, alert if any triggers
        # === Volatility adaptation ===
        volatility_adaptive: bool = False,
        vol_min_factor: float = 0.6,
        vol_max_factor: float = 2.0,
        # === Extras ===
        verbose_explain: bool = False,
        alert_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
        impute_method: str = "skip",          # 'skip' or 'mean'
        parallel_surrogates: bool = False,    # New: parallel surrogate generation
        numba_enabled: bool = True,           # New: enable Numba JIT if available
    ):
        """Validate the configuration and initialise empty detection state.

        ``reset_after_regime_change`` accepts a bool for convenience:
        ``True`` is normalised to ``"full"`` and ``False`` stays ``False``.

        ``numba_enabled`` is stored on the instance; no code visible in this
        excerpt consults it (``@njit`` is applied at import time).

        Raises:
            ValueError: if ``alpha`` is outside ``(0, 1]``, if
                ``warm_up_period`` is below 10, or if
                ``reset_after_regime_change`` is not ``False``, ``'full'``
                or ``'soft'``.
        """
        if not (0 < alpha <= 1):
            raise ValueError("alpha must be between 0 and 1")
        if warm_up_period < 10:
            raise ValueError("warm_up_period should be at least 10")
        if isinstance(reset_after_regime_change, bool):
            reset_after_regime_change = "full" if reset_after_regime_change else False
        if reset_after_regime_change not in [False, "full", "soft"]:
            raise ValueError("reset_after_regime_change must be False, 'full', or 'soft'")

        self.alpha = alpha
        self.early_mult = early_mult
        self.fixed_mult = fixed_mult
        self.warm_up_period = warm_up_period
        self.use_fixed_during_warmup = use_fixed_during_warmup
        self.two_sided = two_sided
        self.cusum_threshold = cusum_threshold
        self.reset_after_regime_change = reset_after_regime_change
        self.multivariate = multivariate
        self.per_channel_detection = per_channel_detection
        self.aggregation_func = aggregation_func
        self.alert_if_any_channel = alert_if_any_channel
        self.volatility_adaptive = volatility_adaptive
        self.vol_min_factor = vol_min_factor
        self.vol_max_factor = vol_max_factor
        self.verbose_explain = verbose_explain
        self.alert_callback = alert_callback
        self.impute_method = impute_method
        self.parallel_surrogates = parallel_surrogates
        self.numba_enabled = numba_enabled

        # State
        self.count = 0
        # Rolling buffer of raw observations, capped at twice the warm-up length.
        self.values_deque: deque = deque(maxlen=warm_up_period * 2)
        self.ewma: Optional[float] = None
        self.dev_ewma: Optional[float] = None
        self.baseline_mean: Optional[float] = None
        self.baseline_std: Optional[float] = None
        self.cusum_pos = 0.0
        self.cusum_neg = 0.0

        # Multivariate state
        self.n_channels: int = 0
        self.channel_ewmas: Optional[List[float]] = None
        self.channel_dev_ewmas: Optional[List[float]] = None
        self.channel_baselines: Optional[List[Tuple[float, float]]] = None

    def reset(self, soft: bool = False) -> None:
        """Reset detection state.

        The sample counter, the observation buffer and both CUSUM
        accumulators are always cleared. With ``soft=True`` the EWMA and
        baseline statistics (aggregate and per-channel) are kept; otherwise
        they are set back to ``None``.
        """
        self.count = 0
        self.values_deque.clear()
        if not soft:
            self.ewma = None
            self.dev_ewma = None
            self.baseline_mean = None
            self.baseline_std = None
            self.channel_ewmas = None
            self.channel_dev_ewmas = None
            self.channel_baselines = None
        self.cusum_pos = 0.0
        self.cusum_neg = 0.0

    def save_state(self) -> str:
        """Save current state to a JSON string for persistence.

        The configuration and the raw observation buffer (``values_deque``)
        are not part of the saved state.
        """
        state = {
            "count": self.count,
            "ewma": self.ewma,
            "dev_ewma": self.dev_ewma,
            "baseline_mean": self.baseline_mean,
            "baseline_std": self.baseline_std,
            "cusum_pos": self.cusum_pos,
            "cusum_neg": self.cusum_neg,
            "channel_ewmas": self.channel_ewmas,
            "channel_dev_ewmas": self.channel_dev_ewmas,
            "channel_baselines": self.channel_baselines,
            "n_channels": self.n_channels,
        }
        return json.dumps(state)

    def load_state(self, state_json: str) -> None:
        """Load state from a JSON string produced by :meth:`save_state`.

        Raises:
            KeyError: if an expected key is missing from the decoded state.
        """
        state = json.loads(state_json)
        self.count = state["count"]
        self.ewma = state["ewma"]
        self.dev_ewma = state["dev_ewma"]
        self.baseline_mean = state["baseline_mean"]
        self.baseline_std = state["baseline_std"]
        self.cusum_pos = state["cusum_pos"]
        self.cusum_neg = state["cusum_neg"]
        self.channel_ewmas = state["channel_ewmas"]
        self.channel_dev_ewmas = state["channel_dev_ewmas"]
        # JSON has no tuple type, so the (mean, std) pairs come back as lists;
        # restore the tuple form the rest of the class is annotated with.
        baselines = state["channel_baselines"]
        self.channel_baselines = (
            None if baselines is None else [tuple(pair) for pair in baselines]
        )
        self.n_channels = state["n_channels"]

    def _initialize_from_warmup(self) -> None:
        """Seed baselines and EWMAs from the buffered observations.

        Multivariate mode (``multivariate`` set and ``n_channels > 0``) uses
        only buffered rows that are lists/tuples of exactly ``n_channels``
        values; each channel gets a ``(mean, std)`` baseline and the aggregate
        baseline is ``aggregation_func`` applied to the channel means and to
        the channel stds. Otherwise only ``int``/``float`` entries are used.
        Standard deviations are population values, floored at 1e-6.
        Does nothing when the buffer holds no usable values.
        """
        if len(self.values_deque) == 0:
            return

        if self.multivariate and self.n_channels > 0:
            rows = [
                v for v in self.values_deque
                if isinstance(v, (list, tuple)) and len(v) == self.n_channels
            ]
            self.channel_baselines = []
            self.channel_ewmas = []
            self.channel_dev_ewmas = []
            if not rows:
                return
            # Transpose rows -> one sequence of samples per channel.
            for channel_samples in zip(*rows):
                ch_mean, ch_std = _mean_std(channel_samples)
                self.channel_baselines.append((ch_mean, ch_std))
                self.channel_ewmas.append(ch_mean)
                self.channel_dev_ewmas.append(ch_std)
            self.baseline_mean = self.aggregation_func(list(self.channel_ewmas))
            self.baseline_std = self.aggregation_func(self.channel_dev_ewmas)
        else:
            samples = [v for v in self.values_deque if isinstance(v, (int, float))]
            if not samples:
                return
            self.baseline_mean, self.baseline_std = _mean_std(samples)

        self.ewma = self.baseline_mean
        self.dev_ewma = self.baseline_std

    def update_and_check(self, value: Union[float, List[float]]) -> Dict[str, Any]:
        """Ingest one observation and return the detection result.

        NOTE: the published v7.1 listing elides most of this method
        ("unchanged from v7.0"). Only the statements shown there are
        reproduced below; each elided section is marked with a TODO and the
        method raises ``NotImplementedError`` where the missing logic would
        have to produce a result. Restore the v7.0 sections before use.

        Raises:
            NotImplementedError: always, until the elided v7.0 logic is
                restored.
        """
        # TODO(v7.0): input validation and imputation (unchanged from v7.0)
        # ... [full imputation logic here, as in v7.0]

        self.count += 1
        self.values_deque.append(value)

        if self.count <= self.warm_up_period:
            # TODO(v7.0): warm-up phase (unchanged)
            # ... [full warm-up logic]
            raise NotImplementedError(
                "update_and_check: warm-up logic was elided from the v7.1 "
                "listing; restore it from v7.0"
            )

        if self.baseline_mean is None:
            self._initialize_from_warmup()

        # TODO(v7.0): normal operation (aggregate or per-channel)
        # ... [full per-channel and aggregate logic from v7.0]
        # TODO(v7.0): bidirectional CUSUM (unchanged)
        # ... [full CUSUM logic]
        # TODO(v7.0): build result (unchanged)
        # ... [full result dict]
        raise NotImplementedError(
            "update_and_check: detection, CUSUM and result-building logic was "
            "elided from the v7.1 listing; restore it from v7.0"
        )

    # Parallel surrogate example (add if you implement surrogate-based significance)
    def generate_surrogates(self, data, n_surrogates=1000, surrogate_func=None):
        """Return a list of ``n_surrogates`` surrogate series built from *data*.

        Args:
            data: the series handed to the surrogate generator on every call.
            n_surrogates: how many surrogates to produce.
            surrogate_func: callable mapping ``data`` to one surrogate. When
                omitted, a module-level ``phase_randomize`` is used. For the
                parallel path it must be picklable (a module-level function).

        A process pool with ``cpu_count()`` workers is used when
        ``parallel_surrogates`` is set and at least 100 surrogates are
        requested; otherwise surrogates are generated serially. The progress
        bar is shown only when ``verbose_explain`` is set.

        Raises:
            NotImplementedError: if ``surrogate_func`` is omitted and the
                module defines no ``phase_randomize``.
        """
        if surrogate_func is None:
            surrogate_func = globals().get("phase_randomize")
            if surrogate_func is None:
                raise NotImplementedError(
                    "generate_surrogates needs a surrogate_func or a "
                    "module-level phase_randomize(data)"
                )

        quiet = not self.verbose_explain
        if self.parallel_surrogates and n_surrogates >= _PARALLEL_MIN_SURROGATES:
            # NOTE(review): if surrogate_func draws from a global RNG, worker
            # processes may start from the same RNG state — confirm seeding.
            tasks = ((surrogate_func, data) for _ in range(n_surrogates))
            with Pool(cpu_count()) as pool:
                surrogates = list(
                    tqdm(
                        pool.imap(_surrogate_task, tasks),
                        total=n_surrogates,
                        desc="Surrogates",
                        disable=quiet,
                    )
                )
        else:
            surrogates = [
                surrogate_func(data)
                for _ in tqdm(range(n_surrogates), desc="Surrogates", disable=quiet)
            ]
        return surrogates

    # ... rest of class unchanged from v7.0