Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DEP] Move HampelFilter to outliers #1538

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions aeon/anomaly_detection/_hampel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""Implements transformers for detecting outliers in a time series."""

__maintainer__ = []
__all__ = ["HampelFilter"]

import warnings

import numpy as np
import pandas as pd

from aeon.anomaly_detection.base import BaseAnomalyDetector
from aeon.forecasting.model_selection import SlidingWindowSplitter


class HampelFilter(BaseAnomalyDetector):
    """Flag outliers in a time series with a sliding-window Hampel filter.

    A window of ``window_length`` points is slid over the series. For each
    window the median and the median absolute deviation (MAD) scaled by ``k``
    are computed, and a point is flagged as an outlier when its distance from
    the window median exceeds ``n_sigma`` times the scaled MAD.

    Parameters
    ----------
    window_length : int, default=10
        Length of the sliding window.
    n_sigma : int, default=3
        Number of scaled MADs a point must deviate from the window median to
        be flagged as an outlier.
    k : float, default=1.4826
        Constant scale factor which depends on the distribution of the data;
        for Gaussian data it is approximately 1.4826.

    Notes
    -----
    Implementation is based on [1]_.

    References
    ----------
    .. [1] Hampel F. R., "The influence curve and its role in robust estimation",
       Journal of the American Statistical Association, 69, 382–393, 1974

    Examples
    --------
    >>> import pandas as pd
    >>> from aeon.anomaly_detection._hampel import HampelFilter
    >>> X = pd.Series([1.0, 2.0, 3.0, 100.0, 3.0, 2.0, 1.0, 5.0])
    >>> detector = HampelFilter(window_length=4)
    >>> is_outlier = detector._predict_outliers(X)
    >>> bool(is_outlier.iloc[3])
    True
    """

    # NOTE(review): these tags were carried over from the transformer version
    # and were flagged in review as outdated for an anomaly detector. They are
    # left unchanged here because the tag set expected by BaseAnomalyDetector
    # is not visible from this module -- confirm against the base class.
    _tags = {
        "input_data_type": "Series",
        "output_data_type": "Series",
        "X_inner_type": ["pd.DataFrame", "pd.Series"],
        "capability:missing_values": True,
        "capability:multivariate": True,
    }

    def __init__(self, window_length=10, n_sigma=3, k=1.4826):
        self.window_length = window_length
        self.n_sigma = n_sigma
        self.k = k
        super().__init__(axis=0)

    def _predict(self, X, y=None):
        """Flag the outliers in X.

        Parameters
        ----------
        X : pd.Series or pd.DataFrame
            Series to check for outliers. The columns of a DataFrame are
            processed independently of each other.
        y : ignored argument for interface compatibility

        Returns
        -------
        pd.Series or pd.DataFrame, same type as X
            Boolean mask in which True marks a point flagged as an outlier.
        """
        # work on a copy so the caller's data is never overwritten
        Z = X.copy()

        # multivariate
        # NOTE(review): a reviewer doubted that the multivariate path is
        # really exercised by the test suite -- confirm that returning a
        # boolean DataFrame is what the base class expects.
        if isinstance(Z, pd.DataFrame):
            for col in Z:
                Z[col] = self._predict_outliers(Z[col])
            return Z

        # univariate
        return self._predict_outliers(Z)

    def _predict_outliers(self, Z):
        """Compute the boolean outlier mask for a single series.

        Parameters
        ----------
        Z : pd.Series
            Univariate series to check. It is not modified.

        Returns
        -------
        pd.Series
            Boolean series, True where the point was flagged by the filter.
            Values that are already nan in ``Z`` are reported as True as well.
        """
        # the mask is derived from nan markers, so nan values that are already
        # present cannot be told apart from detected outliers; tell the user
        if Z.isnull().values.any():
            warnings.warn(
                "Series contains nan values; they cannot be distinguished "
                "from outliers and are flagged as outliers in the output.",
                stacklevel=2,
            )

        cv = SlidingWindowSplitter(
            window_length=self.window_length, step_length=1, start_with_window=True
        )
        half_window_length = int(self.window_length / 2)

        # _hampel_filter marks outliers by writing nan into the series it is
        # given, so hand it a copy to keep the caller's data intact
        filtered = _hampel_filter(
            Z=Z.copy(),
            cv=cv,
            n_sigma=self.n_sigma,
            half_window_length=half_window_length,
            k=self.k,
        )

        # every nan left in the filtered series marks an outlier
        return filtered.isna()

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return `"default"` set.

        Returns
        -------
        params : dict or list of dict, default = {}
            Parameters to create testing instances of the class
            Each dict are parameters to construct an "interesting" test instance, i.e.,
            `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance.
            `create_test_instance` uses the first (or only) dictionary in `params`
        """
        return {"window_length": 3}


def _hampel_filter(Z, cv, n_sigma, half_window_length, k):
    """Replace the outliers of a series with nan using a sliding window.

    For every training window produced by ``cv`` the median and the median
    absolute deviation scaled by ``k`` are computed while ignoring nan values.
    Points are then checked against these statistics with ``_compare``: the
    centre point for interior windows, and the leading or trailing
    half-window for the first and last window.

    Parameters
    ----------
    Z : pd.Series
        Series to filter. It is not modified; a float copy is filtered.
    cv : splitter object
        Must provide ``split(Z)``, yielding tuples whose first element holds
        the positional indices of a window, and a ``window_length`` attribute.
    n_sigma : int/float
        Number of scaled deviations beyond which a point is an outlier.
    half_window_length : int
        Offset of the window centre from the start of the window.
    k : float
        Scale factor applied to the median absolute deviation.

    Returns
    -------
    pd.Series
        Float copy of ``Z`` in which every detected outlier is set to nan.
    """
    # A float copy can always hold the nan marker (an integer series cannot)
    # and keeps the caller's series untouched.
    Z = Z.astype(float)
    n_timepoints = len(Z)

    for split in cv.split(Z):
        cv_window = split[0]
        # cv_window holds positions, so read with .iloc just like the writes
        # below; label-based Z[cv_window] only works for a default index.
        window_values = Z.iloc[cv_window]
        cv_median = np.nanmedian(window_values)
        cv_sigma = k * np.nanmedian(np.abs(window_values - cv_median))

        # find outliers at start and end of z
        # NOTE(review): the "- 1" presumably accounts for the splitter keeping
        # one trailing point for its forecasting horizon, so that the last
        # window starts at len(Z) - window_length - 1 -- TODO confirm.
        if (
            cv_window[0] <= half_window_length
            or cv_window[-1] >= n_timepoints - half_window_length
        ) and (cv_window[0] in [0, n_timepoints - cv.window_length - 1]):
            # first half of the first window
            # NOTE(review): for short series the last window can also start
            # within the first half-window and then takes this branch too --
            # confirm that this is intended.
            if cv_window[0] <= half_window_length:
                idx_range = range(cv_window[0], half_window_length + 1)

            # last half of the last window
            else:
                idx_range = range(
                    n_timepoints - half_window_length - 1, n_timepoints
                )
            for j in idx_range:
                Z.iloc[j] = _compare(
                    value=Z.iloc[j],
                    cv_median=cv_median,
                    cv_sigma=cv_sigma,
                    n_sigma=n_sigma,
                )
        else:
            # interior window: only the centre point is checked
            idx = cv_window[0] + half_window_length
            Z.iloc[idx] = _compare(
                value=Z.iloc[idx],
                cv_median=cv_median,
                cv_sigma=cv_sigma,
                n_sigma=n_sigma,
            )
    return Z


def _compare(value, cv_median, cv_sigma, n_sigma):
"""Identify an outlier.

Parameters
----------
value : int/float
cv_median : int/float
cv_sigma : int/float
n_sigma : int/float

Returns
-------
int/float or np.nan
Returns value if value it is not an outlier,
else np.nan (or True/False if return_bool==True)
"""
if np.abs(value - cv_median) > n_sigma * cv_sigma:
return np.nan
else:
return value
15 changes: 15 additions & 0 deletions aeon/anomaly_detection/tests/test_hampel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Test for Hampel filter."""

import pandas as pd

from aeon.anomaly_detection._hampel import HampelFilter


def test_predict_outliers():
    """Check that the internal outlier mask flags the spike but not regular points."""
    series = pd.Series([1, 2, 3, 100, 3, 2, 1, 5])
    detector = HampelFilter(window_length=4)
    mask = detector._predict_outliers(series)
    assert isinstance(mask, pd.Series)
    # the value 100 at position 3 is the only obvious spike
    assert mask.iloc[3]
    assert not mask.iloc[0]
    assert not mask.iloc[6]
8 changes: 8 additions & 0 deletions aeon/transformations/outlier_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,19 @@

import numpy as np
import pandas as pd
from deprecated.sphinx import deprecated

from aeon.forecasting.model_selection import SlidingWindowSplitter
from aeon.transformations.base import BaseTransformer


# TODO: remove v0.10.0
@deprecated(
version="0.9.0",
reason="The HampelFilter transformer will be removed in v0.10.0 and will be "
"replaced by an anomaly detector with ",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with what?

category=FutureWarning,
)
class HampelFilter(BaseTransformer):
"""Use HampelFilter to detect outliers based on a sliding window.

Expand Down