diff --git a/aeon/classification/shapelet_based/__init__.py b/aeon/classification/shapelet_based/__init__.py
index 7e27bc3d79..f8c45a242a 100644
--- a/aeon/classification/shapelet_based/__init__.py
+++ b/aeon/classification/shapelet_based/__init__.py
@@ -5,11 +5,13 @@
     "ShapeletTransformClassifier",
     "RDSTClassifier",
     "SASTClassifier",
+    "RSASTClassifier",
     "LearningShapeletClassifier",
 ]
 
 from aeon.classification.shapelet_based._ls import LearningShapeletClassifier
 from aeon.classification.shapelet_based._mrsqm import MrSQMClassifier
 from aeon.classification.shapelet_based._rdst import RDSTClassifier
+from aeon.classification.shapelet_based._rsast_classifier import RSASTClassifier
 from aeon.classification.shapelet_based._sast_classifier import SASTClassifier
 from aeon.classification.shapelet_based._stc import ShapeletTransformClassifier
diff --git a/aeon/classification/shapelet_based/_rsast_classifier.py b/aeon/classification/shapelet_based/_rsast_classifier.py
new file mode 100644
index 0000000000..5b591a48fe
--- /dev/null
+++ b/aeon/classification/shapelet_based/_rsast_classifier.py
@@ -0,0 +1,158 @@
+"""Random Scalable and Accurate Subsequence Transform (RSAST).
+
+Pipeline classifier using the RSAST transformer and an sklearn classifier.
+"""
+
+__maintainer__ = ["nirojasva"]
+__all__ = ["RSASTClassifier"]
+
+import numpy as np
+from sklearn.linear_model import RidgeClassifierCV
+from sklearn.pipeline import make_pipeline
+
+from aeon.base._base import _clone_estimator
+from aeon.classification import BaseClassifier
+from aeon.transformations.collection.shapelet_based import RSAST
+
+
+class RSASTClassifier(BaseClassifier):
+    """RSASTClassifier.
+
+    Classification pipeline using the Random Scalable and Accurate Subsequence
+    Transform (RSAST) [1]_ transformer and an sklearn classifier.
+
+    Parameters
+    ----------
+    n_random_points : int, default=10
+        The number of initial random points to extract.
+    len_method : str, default="both"
+        The type of statistical tool used to get the length of shapelets:
+        "both"=ACF&PACF, "ACF"=ACF, "PACF"=PACF, "all"=all possible lengths,
+        "None"=a random length from the time series.
+    nb_inst_per_class : int, default=10
+        The number of reference time series to select per class.
+    seed : int, default=None
+        Seed for the random number generator.
+    classifier : sklearn compatible classifier, default=None
+        If None, a RidgeClassifierCV(alphas=np.logspace(-3, 3, 10)) is used.
+    n_jobs : int, default=-1
+        Number of threads to use for the transform.
+
+    References
+    ----------
+    .. [1] Varela, N. R., Mbouopda, M. F., & Nguifo, E. M. (2023). RSAST:
+       Sampling Shapelets for Time Series Classification.
+       https://hal.science/hal-04311309/
+
+    Examples
+    --------
+    >>> from aeon.classification.shapelet_based import RSASTClassifier
+    >>> from aeon.datasets import load_unit_test
+    >>> X_train, y_train = load_unit_test(split="train")
+    >>> X_test, y_test = load_unit_test(split="test")
+    >>> clf = RSASTClassifier() # doctest: +SKIP
+    >>> clf.fit(X_train, y_train) # doctest: +SKIP
+    RSASTClassifier(...)
+    >>> y_pred = clf.predict(X_test) # doctest: +SKIP
+    """
+
+    _tags = {
+        "capability:multithreading": True,
+        "capability:multivariate": False,
+        "algorithm_type": "shapelet",
+        "python_dependencies": "statsmodels",
+    }
+
+    def __init__(
+        self,
+        n_random_points=10,
+        len_method="both",
+        nb_inst_per_class=10,
+        seed=None,
+        classifier=None,
+        n_jobs=-1,
+    ):
+        super().__init__()
+        self.n_random_points = n_random_points
+        self.len_method = len_method
+        self.nb_inst_per_class = nb_inst_per_class
+        self.n_jobs = n_jobs
+        self.seed = seed
+        self.classifier = classifier
+
+    def _fit(self, X, y):
+        """Fit RSASTClassifier to the training data.
+
+        Parameters
+        ----------
+        X: np.ndarray shape (n_cases, n_channels, n_timepoints)
+            The training input samples.
+        y: array-like or list
+            The class values for X.
+
+        Returns
+        -------
+        self : RSASTClassifier
+            This pipeline classifier.
+        """
+        self._transformer = RSAST(
+            self.n_random_points,
+            self.len_method,
+            self.nb_inst_per_class,
+            self.seed,
+            self.n_jobs,
+        )
+
+        self._classifier = _clone_estimator(
+            (
+                RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))
+                if self.classifier is None
+                else self.classifier
+            ),
+            self.seed,
+        )
+
+        self._pipeline = make_pipeline(self._transformer, self._classifier)
+
+        self._pipeline.fit(X, y)
+
+        return self
+
+    def _predict(self, X):
+        """Predict class labels for the input data.
+
+        Parameters
+        ----------
+        X: np.ndarray shape (n_cases, n_channels, n_timepoints)
+            The input samples to predict.
+
+        Returns
+        -------
+        array-like or list
+            Predicted class labels.
+        """
+        return self._pipeline.predict(X)
+
+    def _predict_proba(self, X):
+        """Predict class probabilities for the input data.
+
+        Parameters
+        ----------
+        X: np.ndarray shape (n_cases, n_channels, n_timepoints)
+            The input samples to predict.
+
+        Returns
+        -------
+        dists : np.ndarray shape (n_cases, n_classes)
+            Predicted class probabilities.
+ """ + m = getattr(self._classifier, "predict_proba", None) + if callable(m): + dists = self._pipeline.predict_proba(X) + else: + dists = np.zeros((X.shape[0], self.n_classes_)) + preds = self._pipeline.predict(X) + for i in range(0, X.shape[0]): + dists[i, np.where(self.classes_ == preds[i])] = 1 + return dists diff --git a/aeon/transformations/collection/shapelet_based/__init__.py b/aeon/transformations/collection/shapelet_based/__init__.py index 11ba7222a0..23990ad520 100644 --- a/aeon/transformations/collection/shapelet_based/__init__.py +++ b/aeon/transformations/collection/shapelet_based/__init__.py @@ -1,10 +1,11 @@ """Shapelet based transformers.""" -__all__ = ["RandomShapeletTransform", "RandomDilatedShapeletTransform", "SAST"] +__all__ = ["RandomShapeletTransform", "RandomDilatedShapeletTransform", "SAST", "RSAST"] from aeon.transformations.collection.shapelet_based._dilated_shapelet_transform import ( RandomDilatedShapeletTransform, ) +from aeon.transformations.collection.shapelet_based._rsast import RSAST from aeon.transformations.collection.shapelet_based._sast import SAST from aeon.transformations.collection.shapelet_based._shapelet_transform import ( RandomShapeletTransform, diff --git a/aeon/transformations/collection/shapelet_based/_rsast.py b/aeon/transformations/collection/shapelet_based/_rsast.py new file mode 100644 index 0000000000..e0e7f8abd2 --- /dev/null +++ b/aeon/transformations/collection/shapelet_based/_rsast.py @@ -0,0 +1,332 @@ +import numpy as np +import pandas as pd +from numba import get_num_threads, njit, prange, set_num_threads + +from aeon.transformations.collection import BaseCollectionTransformer +from aeon.utils.numba.general import z_normalise_series +from aeon.utils.validation import check_n_jobs + + +@njit(fastmath=False) +def _apply_kernel(ts, arr): + d_best = np.inf # sdist + m = ts.shape[0] + kernel = arr[~np.isnan(arr)] # ignore nan + + kernel_len = kernel.shape[0] + for i in range(m - kernel_len + 1): + d = np.sum((z_normalise_series(ts[i : i + kernel_len]) - kernel) ** 2) + if d < d_best: + d_best = d + return d_best + + +@njit(parallel=True, fastmath=True) +def _apply_kernels(X, kernels): + nbk = len(kernels) + out = np.zeros((X.shape[0], nbk), dtype=np.float32) + for i in prange(nbk): + k = kernels[i] + for t in range(X.shape[0]): + ts = X[t] + out[t][i] = _apply_kernel(ts, k) + return out + + +class RSAST(BaseCollectionTransformer): + """Random Scalable and Accurate Subsequence Transform (RSAST). + + RSAST [1] is based on SAST, it uses a stratified sampling strategy + for subsequences selection but additionally takes into account certain + statistical criteria such as ANOVA, ACF, and PACF to further reduce + the search space of shapelets. + + RSAST starts with the pre-computation of a list of weights, using ANOVA, + which helps in the selection of initial points for subsequences. Then + randomly select k time series per class, which are used with an ACF and PACF, + obtaining a set of highly correlated lagged values. These values are used as + potential lengths for the shapelets. Lastly, with a pre-defined number of + admissible starting points to sample, the shapelets are extracted and used to + transform the original dataset, replacing each time series by the vector of its + distance to each subsequence. + + Parameters + ---------- + n_random_points: int default = 10 the number of initial random points to extract + len_method: string default="both" the type of statistical tool used to get + the length of shapelets. 
"both"=ACF&PACF, "ACF"=ACF, "PACF"=PACF, + "None"=Extract randomly any length from the TS + + nb_inst_per_class : int default = 10 + the number of reference time series to select per class + seed : int, default = None + the seed of the random generator + classifier : sklearn compatible classifier, default = None + if None, a RidgeClassifierCV(alphas=np.logspace(-3, 3, 10)) is used. + n_jobs : int, default -1 + Number of threads to use for the transform. + + Reference + --------- + .. [1] Varela, N. R., Mbouopda, M. F., & Nguifo, E. M. (2023). + RSAST: Sampling Shapelets for Time Series Classification. + https://hal.science/hal-04311309/ + + + Examples + -------- + >>> from aeon.transformations.collection.shapelet_based import RSAST + >>> from aeon.datasets import load_unit_test + >>> X_train, y_train = load_unit_test(split="train") + >>> X_test, y_test = load_unit_test(split="test") + >>> rsast = RSAST() # doctest: +SKIP + >>> rsast.fit(X_train, y_train) # doctest: +SKIP + RSAST() + >>> X_train = rsast.transform(X_train) # doctest: +SKIP + >>> X_test = rsast.transform(X_test) # doctest: +SKIP + + """ + + _tags = { + "output_data_type": "Tabular", + "capability:multivariate": False, + "algorithm_type": "shapelet", + "python_dependencies": "statsmodels", + } + + def __init__( + self, + n_random_points=10, + len_method="both", + nb_inst_per_class=10, + seed=None, + n_jobs=-1, + ): + self.n_random_points = n_random_points + self.len_method = len_method + self.nb_inst_per_class = nb_inst_per_class + self.n_jobs = n_jobs + self.seed = seed + self._kernels = None # z-normalized subsequences + self._cand_length_list = {} + self._kernel_orig = [] + self._kernels_generators = {} # Reference time series + super().__init__() + + def _fit(self, X, y): + + from scipy.stats import ConstantInputWarning, DegenerateDataWarning, f_oneway + from statsmodels.tsa.stattools import acf, pacf + + """Select reference time series and generate subsequences from them. + + Parameters + ---------- + X: np.ndarray shape (n_cases, n_channels, n_timepoints) + The training input samples. + y: array-like or list + The class values for X. + + Return + ------ + self : RSAST + This transformer + + """ + + # 0- initialize variables and convert values in "y" to string + X_ = np.reshape(X, (X.shape[0], X.shape[-1])) + + self._random_state = ( + np.random.RandomState(self.seed) + if not isinstance(self.seed, np.random.RandomState) + else self.seed + ) + + classes = np.unique(y) + self._num_classes = classes.shape[0] + + y = np.asarray([str(x_s) for x_s in y]) + + n = [] + classes = np.unique(y) + self.num_classes = classes.shape[0] + m_kernel = 0 + + # 1--calculate ANOVA per each time t throught the lenght of the TS + for i in range(X_.shape[1]): + statistic_per_class = {} + for c in classes: + assert ( + len(X_[np.where(y == c)[0]][:, i]) > 0 + ), "Time t without values in TS" + statistic_per_class[c] = X_[np.where(y == c)[0]][:, i] + + statistic_per_class = pd.Series(statistic_per_class) + # Calculate t-statistic and p-value + try: + t_statistic, p_value = f_oneway(*statistic_per_class) + except (DegenerateDataWarning, ConstantInputWarning): + p_value = np.nan + + # Interpretation of the results + # if p_value < 0.05: " The means of the populations are + # significantly different." 
+            if np.isnan(p_value):
+                n.append(0)
+            else:
+                n.append(1 - p_value)
+
+        # 2 -- compute the ACF and PACF of each chosen TS in each class
+        for i, c in enumerate(classes):
+
+            X_c = X_[y == c]
+
+            cnt = np.min([self.nb_inst_per_class, X_c.shape[0]]).astype(int)
+
+            chosen = self._random_state.permutation(X_c.shape[0])[:cnt]
+
+            self._kernels_generators[c] = []
+
+            for rep, idx in enumerate(chosen):
+                # key identifying this reference TS in the candidate
+                # length list
+                idx_len_list = c + "," + str(idx) + "," + str(rep)
+
+                self._cand_length_list[idx_len_list] = []
+
+                non_zero_acf = []
+                if self.len_method == "both" or self.len_method == "ACF":
+                    # 2.1 -- compute the autocorrelation of the reference TS
+                    acf_val, acf_confint = acf(
+                        X_c[idx], nlags=len(X_c[idx]) - 1, alpha=0.05
+                    )
+
+                    # keep lags >= 3 whose confidence interval excludes zero
+                    for j in range(len(acf_confint)):
+                        if 3 <= j and (
+                            0 < acf_confint[j][0] <= acf_confint[j][1]
+                            or acf_confint[j][0] <= acf_confint[j][1] < 0
+                        ):
+                            non_zero_acf.append(j)
+                            self._cand_length_list[idx_len_list].append(j)
+
+                non_zero_pacf = []
+                if self.len_method == "both" or self.len_method == "PACF":
+                    # 2.2 -- compute the partial autocorrelation of the
+                    # reference TS
+                    pacf_val, pacf_confint = pacf(
+                        X_c[idx],
+                        method="ols",
+                        nlags=(len(X_c[idx]) // 2) - 1,
+                        alpha=0.05,
+                    )
+
+                    for j in range(len(pacf_confint)):
+                        if 3 <= j and (
+                            0 < pacf_confint[j][0] <= pacf_confint[j][1]
+                            or pacf_confint[j][0] <= pacf_confint[j][1] < 0
+                        ):
+                            non_zero_pacf.append(j)
+                            self._cand_length_list[idx_len_list].append(j)
+
+                if self.len_method == "all":
+                    self._cand_length_list[idx_len_list].extend(
+                        np.arange(3, 1 + len(X_c[idx]))
+                    )
+
+                # 2.3 -- if no significant lag was found, choose a random
+                # length from the length of the time series (add 1 since
+                # the range starts at 0)
+                if len(self._cand_length_list[idx_len_list]) == 0:
+                    rand_value = self._random_state.choice(len(X_c[idx]), 1)[0] + 1
+                    self._cand_length_list[idx_len_list].extend([max(3, rand_value)])
+
+                self._cand_length_list[idx_len_list] = list(
+                    set(self._cand_length_list[idx_len_list])
+                )
+
+                for max_shp_length in self._cand_length_list[idx_len_list]:
+                    # 2.4 -- randomly choose n_random_points starting points
+                    # 2.5 -- compute the sampling weights for the starting
+                    # points of the TS
+                    if sum(n) == 0:
+                        # use uniform weights when no time index is
+                        # significant
+                        weights = [1 / len(n) for _ in range(len(n))]
+                        weights = weights[
+                            : len(X_c[idx]) - max_shp_length + 1
+                        ] / np.sum(weights[: len(X_c[idx]) - max_shp_length + 1])
+                    else:
+                        # weight the starting points by their ANOVA score
+                        # (excluding points after n - l + 1)
+                        weights = n / np.sum(n)
+                        weights = weights[
+                            : len(X_c[idx]) - max_shp_length + 1
+                        ] / np.sum(weights[: len(X_c[idx]) - max_shp_length + 1])
+
+                    if self.n_random_points > len(X_c[idx]) - max_shp_length + 1:
+                        # cap the number of random points when sampling
+                        # without replacement
+                        limit_rpoint = len(X_c[idx]) - max_shp_length + 1
+                        rand_point_ts = self._random_state.choice(
+                            len(X_c[idx]) - max_shp_length + 1,
+                            limit_rpoint,
+                            p=weights,
+                            replace=False,
+                        )
+                    else:
+                        rand_point_ts = self._random_state.choice(
+                            len(X_c[idx]) - max_shp_length + 1,
+                            self.n_random_points,
+                            p=weights,
+                            replace=False,
+                        )
+
+                    for i in rand_point_ts:
+                        # 2.6 -- extract the subsequence starting at that point
+                        kernel = X_c[idx][i : i + max_shp_length].reshape(1, -1).copy()
+
+                        if m_kernel < max_shp_length:
+                            m_kernel = max_shp_length
+
+                        self._kernel_orig.append(np.squeeze(kernel))
+
+                        self._kernels_generators[c].extend(X_c[idx].reshape(1, -1))
+
+        # 3 -- store the z-normalised subsequences, padded with NaN
+        n_kernels = len(self._kernel_orig)
+
+        self._kernels = np.full(
+            (n_kernels, m_kernel), dtype=np.float32, fill_value=np.nan
+        )
+
+        for k, kernel in enumerate(self._kernel_orig):
+            self._kernels[k, : len(kernel)] = z_normalise_series(kernel)
+
+        return self
+
+    def _transform(self, X, y=None):
+        """Transform the input X using the generated subsequences.
+
+        Parameters
+        ----------
+        X: np.ndarray shape (n_cases, n_channels, n_timepoints)
+            The input samples.
+        y: array-like or list
+            Ignored argument, interface compatibility.
+
+        Returns
+        -------
+        X_transformed : np.ndarray shape (n_cases, n_kernels)
+            The transformed data.
+        """
+        X_ = np.reshape(X, (X.shape[0], X.shape[-1]))
+
+        prev_threads = get_num_threads()
+
+        n_jobs = check_n_jobs(self.n_jobs)
+
+        set_num_threads(n_jobs)
+
+        X_transformed = _apply_kernels(X_, self._kernels)  # subsequence transform
+        set_num_threads(prev_threads)
+
+        return X_transformed
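
Reviewer note: the value computed by _apply_kernel above is the "sdist" of the
SAST family, i.e. the smallest squared Euclidean distance between the
z-normalised shapelet and any z-normalised window of the series. A minimal
plain-Python sketch of the same computation, without the numba compilation or
NaN padding used in the patch (z_norm and sdist are illustrative names, not
part of this diff):

    import numpy as np

    def z_norm(a):
        # z-normalise a 1D array, guarding against zero variance
        s = a.std()
        return (a - a.mean()) / s if s > 0 else a - a.mean()

    def sdist(ts, kernel):
        # smallest squared Euclidean distance between the z-normalised
        # kernel and every z-normalised window of ts (cf. _apply_kernel)
        k = len(kernel)
        kernel = z_norm(kernel)
        return min(
            float(np.sum((z_norm(ts[i : i + k]) - kernel) ** 2))
            for i in range(len(ts) - k + 1)
        )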
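
For reference, RSASTClassifier with classifier=None is essentially equivalent
(up to the _clone_estimator call) to assembling the pipeline by hand from the
pieces added in this diff; the dataset loader is the one used in the docstring
examples:

    import numpy as np
    from sklearn.linear_model import RidgeClassifierCV
    from sklearn.pipeline import make_pipeline

    from aeon.datasets import load_unit_test
    from aeon.transformations.collection.shapelet_based import RSAST

    X_train, y_train = load_unit_test(split="train")
    X_test, y_test = load_unit_test(split="test")

    # the same transformer + classifier combination RSASTClassifier._fit
    # builds when classifier=None
    pipeline = make_pipeline(
        RSAST(n_random_points=10, len_method="both", nb_inst_per_class=10, seed=42),
        RidgeClassifierCV(alphas=np.logspace(-3, 3, 10)),
    )
    pipeline.fit(X_train, y_train)
    accuracy = pipeline.score(X_test, y_test)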