From 7b7d84b1ba2fc5e57b038a7862efda6b41391a11 Mon Sep 17 00:00:00 2001 From: Tony Bagnall Date: Fri, 17 May 2024 16:14:30 +0100 Subject: [PATCH] hampel filter to outliers --- aeon/anomaly_detection/_hampel.py | 205 ++++++++++++++++++++ aeon/anomaly_detection/tests/test_hampel.py | 15 ++ aeon/transformations/outlier_detection.py | 8 + 3 files changed, 228 insertions(+) create mode 100644 aeon/anomaly_detection/_hampel.py create mode 100644 aeon/anomaly_detection/tests/test_hampel.py diff --git a/aeon/anomaly_detection/_hampel.py b/aeon/anomaly_detection/_hampel.py new file mode 100644 index 0000000000..3f4cec6638 --- /dev/null +++ b/aeon/anomaly_detection/_hampel.py @@ -0,0 +1,205 @@ +"""Implements transformers for detecting outliers in a time series.""" + +__maintainer__ = [] +__all__ = ["HampelFilter"] + +import warnings + +import numpy as np +import pandas as pd + +from aeon.anomaly_detection.base import BaseAnomalyDetector +from aeon.forecasting.model_selection import SlidingWindowSplitter + + +class HampelFilter(BaseAnomalyDetector): + """Use HampelFilter to detect outliers based on a sliding window. + + Parameters + ---------- + window_length : int, default=10 + Length of the sliding window + n_sigma : int, optional + Defines how strong a point must outly to be an "outlier", by default 3 + k : float, optional + A constant scale factor which is dependent on the distribution, + for Gaussian it is approximately 1.4826, by default 1.4826 + + Notes + ----- + Implementation is based on [1]_. + + References + ---------- + .. [1] Hampel F. R., "The influence curve and its role in robust estimation", + Journal of the American Statistical Association, 69, 382–393, 1974 + + Examples + -------- + >>> from aeon.transformations.outlier_detection import HampelFilter + >>> from aeon.datasets import load_airline + >>> y = load_airline() + >>> transformer = HampelFilter(window_length=10) + >>> y_hat = transformer.fit_transform(y) + """ + + _tags = { + "input_data_type": "Series", + "output_data_type": "Series", + "X_inner_type": ["pd.DataFrame", "pd.Series"], + "capability:missing_values": True, + "capability:multivariate": True, + } + + def __init__(self, window_length=10, n_sigma=3, k=1.4826): + self.window_length = window_length + self.n_sigma = n_sigma + self.k = k + super().__init__(axis=0) + + def _predict(self, X, y=None): + """Transform X and return a transformed version. + + private _transform containing the core logic, called from transform + + Parameters + ---------- + X : pd.Series or pd.DataFrame + Data to be transformed + y : ignored argument for interface compatibility + Additional data, e.g., labels for transformation + + Returns + ------- + Xt : pd.Series or pd.DataFrame, same type as X + transformed version of X + """ + Z = X.copy() + + # multivariate + if isinstance(Z, pd.DataFrame): + for col in Z: + Z[col] = self._predict_outliers(Z[col]) + # univariate + else: + Z = self._predict_outliers(Z) + + Xt = Z + return Xt + + def _predict_outliers(self, Z): + """Logic internal to the algorithm for transforming the input series. + + Parameters + ---------- + Z : pd.Series + + Returns + ------- + pd.Series + """ + # warn if nan values in Series, as user might mix them + # up with outliers otherwise + if Z.isnull().values.any(): + warnings.warn( + """Series contains nan values, more nan might be + added if there are outliers""", + stacklevel=2, + ) + + cv = SlidingWindowSplitter( + window_length=self.window_length, step_length=1, start_with_window=True + ) + half_window_length = int(self.window_length / 2) + + Z = _hampel_filter( + Z=Z, + cv=cv, + n_sigma=self.n_sigma, + half_window_length=half_window_length, + k=self.k, + ) + + # data post-processing + Z = Z.apply(lambda x: bool(np.isnan(x))) + + return Z + + @classmethod + def get_test_params(cls, parameter_set="default"): + """Return testing parameter settings for the estimator. + + Parameters + ---------- + parameter_set : str, default="default" + Name of the set of test parameters to return, for use in tests. If no + special parameters are defined for a value, will return `"default"` set. + + + Returns + ------- + params : dict or list of dict, default = {} + Parameters to create testing instances of the class + Each dict are parameters to construct an "interesting" test instance, i.e., + `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance. + `create_test_instance` uses the first (or only) dictionary in `params` + """ + return {"window_length": 3} + + +def _hampel_filter(Z, cv, n_sigma, half_window_length, k): + for i in cv.split(Z): + cv_window = i[0] + cv_median = np.nanmedian(Z[cv_window]) + cv_sigma = k * np.nanmedian(np.abs(Z[cv_window] - cv_median)) + + # find outliers at start and end of z + if ( + cv_window[0] <= half_window_length + or cv_window[-1] >= len(Z) - half_window_length + ) and (cv_window[0] in [0, len(Z) - cv.window_length - 1]): + # first half of the first window + if cv_window[0] <= half_window_length: + idx_range = range(cv_window[0], half_window_length + 1) + + # last half of the last window + else: + idx_range = range(len(Z) - half_window_length - 1, len(Z)) + for j in idx_range: + Z.iloc[j] = _compare( + value=Z.iloc[j], + cv_median=cv_median, + cv_sigma=cv_sigma, + n_sigma=n_sigma, + ) + else: + idx = cv_window[0] + half_window_length + Z.iloc[idx] = _compare( + value=Z.iloc[idx], + cv_median=cv_median, + cv_sigma=cv_sigma, + n_sigma=n_sigma, + ) + return Z + + +def _compare(value, cv_median, cv_sigma, n_sigma): + """Identify an outlier. + + Parameters + ---------- + value : int/float + cv_median : int/float + cv_sigma : int/float + n_sigma : int/float + + Returns + ------- + int/float or np.nan + Returns value if value it is not an outlier, + else np.nan (or True/False if return_bool==True) + """ + if np.abs(value - cv_median) > n_sigma * cv_sigma: + return np.nan + else: + return value diff --git a/aeon/anomaly_detection/tests/test_hampel.py b/aeon/anomaly_detection/tests/test_hampel.py new file mode 100644 index 0000000000..1ac0e3fe1d --- /dev/null +++ b/aeon/anomaly_detection/tests/test_hampel.py @@ -0,0 +1,15 @@ +"""Test for Hampel filter.""" + +import pandas as pd + +from aeon.anomaly_detection._hampel import HampelFilter + + +def test_predict_outliers(): + """Test internal predict outliers function.""" + hf = HampelFilter(window_length=4) + x = pd.Series([1, 2, 3, 100, 3, 2, 1, 5]) + x2 = hf._predict_outliers(x) + assert isinstance(x2, pd.Series) + assert x2.iloc[3] + assert not x2.iloc[0] and not x2.iloc[6] diff --git a/aeon/transformations/outlier_detection.py b/aeon/transformations/outlier_detection.py index d83e45e15e..16f8d25799 100644 --- a/aeon/transformations/outlier_detection.py +++ b/aeon/transformations/outlier_detection.py @@ -7,11 +7,19 @@ import numpy as np import pandas as pd +from deprecated.sphinx import deprecated from aeon.forecasting.model_selection import SlidingWindowSplitter from aeon.transformations.base import BaseTransformer +# TODO: remove v0.10.0 +@deprecated( + version="0.9.0", + reason="The HampelFilter transformer will be removed in v0.10.0 and will be " + "replaced by an anomaly detector with ", + category=FutureWarning, +) class HampelFilter(BaseTransformer): """Use HampelFilter to detect outliers based on a sliding window.