diff --git a/aeon/classification/distance_based/_time_series_neighbors.py b/aeon/classification/distance_based/_time_series_neighbors.py index f89b1be636..6792ea0423 100644 --- a/aeon/classification/distance_based/_time_series_neighbors.py +++ b/aeon/classification/distance_based/_time_series_neighbors.py @@ -5,6 +5,7 @@ distances in aeon.distances. """ +import numbers from typing import Optional __maintainer__ = [] @@ -15,7 +16,8 @@ import numpy as np from aeon.classification.base import BaseClassifier -from aeon.distances import get_distance_function +from aeon.distances import pairwise_distance +from aeon.utils._threading import threaded WEIGHTS_SUPPORTED = ["uniform", "distance"] @@ -46,11 +48,10 @@ class KNeighborsTimeSeriesClassifier(BaseClassifier): n_timepoints)`` as input and returns a float. distance_params : dict, default = None Dictionary for metric parameters for the case that distance is a str. - n_jobs : int, default = None - The number of parallel jobs to run for neighbors search. - ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. - ``-1`` means using all processors. - for more details. Parameter for compatibility purposes, still unimplemented. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Examples -------- @@ -111,7 +112,6 @@ def _fit(self, X, y): y : array-like, shape = (n_cases) The class labels. """ - self.metric_ = get_distance_function(method=self.distance) self.X_ = X self.classes_, self.y_ = np.unique(y, return_inverse=True) return self @@ -136,7 +136,7 @@ def _predict_proba(self, X): """ preds = np.zeros((len(X), len(self.classes_))) for i in range(len(X)): - idx, weights = self._kneighbors(X[i]) + idx, weights = self.kneighbors(X[i]) for id, w in zip(idx, weights): predicted_class = self.y_[id] preds[i, predicted_class] += w @@ -163,61 +163,80 @@ def _predict(self, X): """ self._check_is_fitted() - preds = np.empty(len(X), dtype=self.classes_.dtype) - for i in range(len(X)): - scores = np.zeros(len(self.classes_)) - idx, weights = self._kneighbors(X[i]) - for id, w in zip(idx, weights): - predicted_class = self.y_[id] - scores[predicted_class] += w - - preds[i] = self.classes_[np.argmax(scores)] + indexes = self.kneighbors(X, return_distance=False)[:, 0] + return self.classes_[self.y_[indexes]] - return preds - - def _kneighbors(self, X): - """ - Find the K-neighbors of a point. + @threaded + def kneighbors(self, X=None, n_neighbors=None, return_distance=True): + """Find the K-neighbors of a point. - Returns indices and weights of each point. + Returns indices of and distances to the neighbors of each point. Parameters ---------- - X : np.ndarray - A single time series instance if shape = (n_channels, n_timepoints) + X : 3D np.ndarray of shape = (n_cases, n_channels, n_timepoints) or list of + shape [n_cases] of 2D arrays shape (n_channels,n_timepoints_i) + The query point or points. + If not provided, neighbors of each indexed point are returned. + In this case, the query point is not considered its own neighbor. + n_neighbors : int, default=None + Number of neighbors required for each sample. The default is the value + passed to the constructor. + return_distance : bool, default=True + Whether or not to return the distances. Returns ------- - ind : array + neigh_dist : ndarray of shape (n_queries, n_neighbors) + Array representing the distances to points, only present if + return_distance=True. + neigh_ind : ndarray of shape (n_queries, n_neighbors) Indices of the nearest points in the population matrix. - ws : array - Array representing the weights of each neighbor. """ - distances = np.array( - [ - self.metric_(X, self.X_[j], **self._distance_params) - for j in range(len(self.X_)) - ] - ) + self._check_is_fitted() + if n_neighbors is None: + n_neighbors = self.n_neighbors + elif n_neighbors <= 0: + raise ValueError(f"Expected n_neighbors > 0. Got {n_neighbors}") + elif not isinstance(n_neighbors, numbers.Integral): + raise TypeError( + f"n_neighbors does not take {type(n_neighbors)} value, " + "enter integer value" + ) - # Find indices of k nearest neighbors using partitioning: - # [0..k-1], [k], [k+1..n-1] - # They might not be ordered within themselves, - # but it is not necessary and partitioning is - # O(n) while sorting is O(nlogn) - closest_idx = np.argpartition(distances, self.n_neighbors) - closest_idx = closest_idx[: self.n_neighbors] - - if self.weights == "distance": - ws = distances[closest_idx] - # Using epsilon ~= 0 to avoid division by zero - ws = 1 / (ws + np.finfo(float).eps) - elif self.weights == "uniform": - ws = np.repeat(1.0, self.n_neighbors) + query_is_train = X is None + if query_is_train: + X = self.X_ + n_neighbors += 1 else: - raise Exception(f"Invalid kNN weights: {self.weights}") + X = self._preprocess_collection(X, store_metadata=False) + self._check_shape(X) + + distances = pairwise_distance( + X, + self.X_ if not query_is_train else None, + method=self.distance, + n_jobs=self.n_jobs, + **self._distance_params, + ) + + sample_range = np.arange(distances.shape[0])[:, None] + neigh_ind = np.argpartition(distances, n_neighbors - 1, axis=1) + neigh_ind = neigh_ind[:, :n_neighbors] + neigh_ind = neigh_ind[ + sample_range, np.argsort(distances[sample_range, neigh_ind]) + ] + + if query_is_train: + neigh_ind = neigh_ind[:, 1:] + + if return_distance: + if query_is_train: + neigh_dist = distances[sample_range, neigh_ind] + return neigh_dist, neigh_ind + return distances[sample_range, neigh_ind], neigh_ind - return closest_idx, ws + return neigh_ind @classmethod def _get_test_params( diff --git a/aeon/classification/distance_based/tests/test_time_series_neighbors.py b/aeon/classification/distance_based/tests/test_time_series_neighbors.py index 7746439d94..e68cad0019 100644 --- a/aeon/classification/distance_based/tests/test_time_series_neighbors.py +++ b/aeon/classification/distance_based/tests/test_time_series_neighbors.py @@ -1,5 +1,6 @@ """Tests for KNeighborsTimeSeriesClassifier.""" +import numpy as np import pytest from aeon.classification.distance_based import KNeighborsTimeSeriesClassifier @@ -42,12 +43,9 @@ @pytest.mark.parametrize("distance_key", distance_functions) def test_knn_on_unit_test(distance_key): """Test function for elastic knn, to be reinstated soon.""" - # load arrowhead data for unit tests X_train, y_train = load_unit_test(split="train") X_test, y_test = load_unit_test(split="test") - knn = KNeighborsTimeSeriesClassifier( - distance=distance_key, - ) + knn = KNeighborsTimeSeriesClassifier(distance=distance_key) knn.fit(X_train, y_train) pred = knn.predict(X_test) correct = 0 @@ -77,3 +75,26 @@ def test_knn_bounding_matrix(distance_key): if pred[j] == y_test[j]: correct = correct + 1 assert correct == expected_correct_window[distance_key] + + +@pytest.mark.parametrize("distance_key", distance_functions) +def test_knn_kneighbors(distance_key): + """Test knn kneighbors.""" + X_train, y_train = load_unit_test(split="train") + X_test, y_test = load_unit_test(split="test") + + knn = KNeighborsTimeSeriesClassifier(distance=distance_key) + knn.fit(X_train, y_train) + dists, ind = knn.kneighbors(X_test, n_neighbors=3) + assert isinstance(dists, np.ndarray) + assert isinstance(ind, np.ndarray) + assert dists.shape == (X_test.shape[0], 3) + assert ind.shape == (X_test.shape[0], 3) + indexes = ind[:, 0] + classes, y = np.unique(y_train, return_inverse=True) + pred = classes[y[indexes]] + correct = 0 + for j in range(0, len(pred)): + if pred[j] == y_test[j]: + correct = correct + 1 + assert correct == expected_correct_window[distance_key] diff --git a/aeon/distances/_distance.py b/aeon/distances/_distance.py index 1263e11cb4..8a5a2fb369 100644 --- a/aeon/distances/_distance.py +++ b/aeon/distances/_distance.py @@ -4,6 +4,7 @@ from typing import Any, Callable, Optional, TypedDict, Union import numpy as np +from joblib import Parallel, delayed from typing_extensions import Unpack from aeon.distances._mpdist import mp_distance, mp_pairwise_distance @@ -82,6 +83,7 @@ squared_distance, squared_pairwise_distance, ) +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -173,6 +175,7 @@ def pairwise_distance( y: Optional[np.ndarray] = None, method: Union[str, DistanceFunction, None] = None, symmetric: bool = True, + n_jobs: int = 1, **kwargs: Unpack[DistanceKwargs], ) -> np.ndarray: """Compute the pairwise distance matrix between two time series. @@ -197,6 +200,10 @@ def pairwise_distance( function is provided as the "method" parameter, then it will compute an asymmetric distance matrix, and the entire matrix (including both upper and lower triangles) is returned. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. kwargs : Any Extra arguments for distance. Refer to each distance documentation for a list of possible arguments. @@ -240,19 +247,23 @@ def pairwise_distance( [ 48.]]) """ if method in PAIRWISE_DISTANCE: - return DISTANCES_DICT[method]["pairwise_distance"](x, y, **kwargs) + return DISTANCES_DICT[method]["pairwise_distance"]( + x, y, n_jobs=n_jobs, **kwargs + ) elif isinstance(method, Callable): if y is None and not symmetric: - return _custom_func_pairwise(x, x, method, **kwargs) - return _custom_func_pairwise(x, y, method, **kwargs) + return _custom_func_pairwise(x, x, method, n_jobs=n_jobs, **kwargs) + return _custom_func_pairwise(x, y, method, n_jobs=n_jobs, **kwargs) else: raise ValueError("Method must be one of the supported strings or a callable") +@threaded def _custom_func_pairwise( X: Optional[Union[np.ndarray, list[np.ndarray]]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, dist_func: Union[DistanceFunction, None] = None, + n_jobs: int = 1, **kwargs: Unpack[DistanceKwargs], ) -> np.ndarray: if dist_func is None: @@ -260,25 +271,47 @@ def _custom_func_pairwise( multivariate_conversion = _is_numpy_list_multivariate(X, y) X, _ = _convert_collection_to_numba_list(X, "X", multivariate_conversion) + + if n_jobs > 1: + X = np.array(X) + if y is None: # To self - return _custom_pairwise_distance(X, dist_func, **kwargs) + return _custom_pairwise_distance(X, dist_func, n_jobs=n_jobs, **kwargs) y, _ = _convert_collection_to_numba_list(y, "y", multivariate_conversion) - return _custom_from_multiple_to_multiple_distance(X, y, dist_func, **kwargs) + if n_jobs > 1: + y = np.array(y) + return _custom_from_multiple_to_multiple_distance( + X, y, dist_func, n_jobs=n_jobs, **kwargs + ) def _custom_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], dist_func: DistanceFunction, + n_jobs: int = 1, **kwargs: Unpack[DistanceKwargs], ) -> np.ndarray: n_cases = len(X) distances = np.zeros((n_cases, n_cases)) - for i in range(n_cases): - for j in range(i + 1, n_cases): + def compute_single_distance(i, j): + return i, j, dist_func(X[i], X[j], **kwargs) + + indices = [(i, j) for i in range(n_cases) for j in range(i + 1, n_cases)] + + if n_jobs == 1: + for i, j in indices: distances[i, j] = dist_func(X[i], X[j], **kwargs) - distances[j, i] = distances[i, j] + distances[j, i] = distances[i, j] # Mirror for symmetry + else: + results = Parallel(n_jobs=n_jobs)( + delayed(compute_single_distance)(i, j) for i, j in indices + ) + + for i, j, dist in results: + distances[i, j] = dist + distances[j, i] = dist # Mirror for symmetry return distances @@ -287,15 +320,29 @@ def _custom_from_multiple_to_multiple_distance( x: Union[np.ndarray, list[np.ndarray]], y: Union[np.ndarray, list[np.ndarray]], dist_func: DistanceFunction, + n_jobs: int = 1, **kwargs: Unpack[DistanceKwargs], ) -> np.ndarray: n_cases = len(x) m_cases = len(y) distances = np.zeros((n_cases, m_cases)) - for i in range(n_cases): - for j in range(m_cases): + def compute_single_distance(i, j): + return i, j, dist_func(x[i], y[j], **kwargs) + + indices = [(i, j) for i in range(n_cases) for j in range(m_cases)] + + if n_jobs == 1: + for i, j in indices: distances[i, j] = dist_func(x[i], y[j], **kwargs) + else: + results = Parallel(n_jobs=n_jobs)( + delayed(compute_single_distance)(i, j) for i, j in indices + ) + + for i, j, dist in results: + distances[i, j] = dist + return distances diff --git a/aeon/distances/_mpdist.py b/aeon/distances/_mpdist.py index c679daef5c..7bfab4526c 100644 --- a/aeon/distances/_mpdist.py +++ b/aeon/distances/_mpdist.py @@ -4,10 +4,6 @@ import numpy as np from numba import njit -from numba.typed import List as NumbaList - -from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list -from aeon.utils.validation.collection import _is_numpy_list_multivariate def mp_distance(x: np.ndarray, y: np.ndarray, m: int = 0) -> float: @@ -287,6 +283,8 @@ def mp_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, m: int = 0, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the mpdist pairwise distance between a set of time series. @@ -339,45 +337,8 @@ def mp_pairwise_distance( [2.82842712], [2.82842712]]) """ - multivariate_conversion = _is_numpy_list_multivariate(X, y) - _X, unequal_length = _convert_collection_to_numba_list( - X, "X", multivariate_conversion - ) - if m == 0: - m = int(_X.shape[2] / 4) - - if y is None: - return _mpdist_pairwise_distance_single(_X, m) - - _y, unequal_length = _convert_collection_to_numba_list( - y, "y", multivariate_conversion - ) - - return _mpdist_pairwise_distance(_X, _y, m) - - -def _mpdist_pairwise_distance_single(x: NumbaList[np.ndarray], m: int) -> np.ndarray: - n_cases = len(x) - distances = np.zeros((n_cases, n_cases)) - - for i in range(n_cases): - for j in range(i + 1, n_cases): - distances[i, j] = mp_distance(x[i], x[j], m) - distances[j, i] = distances[i, j] - - return distances - - -def _mpdist_pairwise_distance( - x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], m: int -) -> np.ndarray: - n_cases = len(x) - m_cases = len(y) - - distances = np.zeros((n_cases, m_cases)) + m = int(X.shape[2] / 4) + from aeon.distances._distance import pairwise_distance - for i in range(n_cases): - for j in range(m_cases): - distances[i, j] = mp_distance(x[i], y[j], m) - return distances + return pairwise_distance(X, y, method=mp_distance, m=m, n_jobs=n_jobs, **kwargs) diff --git a/aeon/distances/_sbd.py b/aeon/distances/_sbd.py index 1097f27b5a..1e72d4eca6 100644 --- a/aeon/distances/_sbd.py +++ b/aeon/distances/_sbd.py @@ -2,13 +2,15 @@ __maintainer__ = ["SebastianSchmidl"] +import warnings from typing import Optional, Union import numpy as np -from numba import njit, objmode +from numba import njit, objmode, prange from numba.typed import List as NumbaList from scipy.signal import correlate +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -113,10 +115,13 @@ def sbd_distance(x: np.ndarray, y: np.ndarray, standardize: bool = True) -> floa raise ValueError("x and y must be 1D or 2D") +@threaded def sbd_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, standardize: bool = True, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """ Compute the shape-based distance (SBD) between all pairs of time series. @@ -138,6 +143,10 @@ def sbd_pairwise_distance( standardize : bool, default=True Apply z-score to both input time series for standardization before computing the distance. This makes SBD scaling invariant. Default is True. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -188,6 +197,15 @@ def sbd_pairwise_distance( [0.36754447, 0. , 0.29289322], [0.5527864 , 0.29289322, 0. ]]) """ + if n_jobs > 1: + warnings.warn( + "You have set n_jobs > 1. For this distance function " + "unless your data has a large number of time points, it is " + "recommended to use n_jobs=1. If this function is slower than " + "expected try setting n_jobs=1.", + UserWarning, + stacklevel=2, + ) multivariate_conversion = _is_numpy_list_multivariate(X, y) _X, _ = _convert_collection_to_numba_list(X, "", multivariate_conversion) @@ -199,14 +217,14 @@ def sbd_pairwise_distance( return _sbd_pairwise_distance(_X, _y, standardize) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _sbd_pairwise_distance_single( x: NumbaList[np.ndarray], standardize: bool ) -> np.ndarray: n_cases = len(x) distances = np.zeros((n_cases, n_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): distances[i, j] = sbd_distance(x[i], x[j], standardize) distances[j, i] = distances[i, j] @@ -214,7 +232,7 @@ def _sbd_pairwise_distance_single( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _sbd_pairwise_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], standardize: bool ) -> np.ndarray: @@ -222,7 +240,7 @@ def _sbd_pairwise_distance( m_cases = len(y) distances = np.zeros((n_cases, m_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): distances[i, j] = sbd_distance(x[i], y[j], standardize) return distances diff --git a/aeon/distances/_shift_scale_invariant.py b/aeon/distances/_shift_scale_invariant.py index 951b7ac560..e425ca702a 100644 --- a/aeon/distances/_shift_scale_invariant.py +++ b/aeon/distances/_shift_scale_invariant.py @@ -3,9 +3,10 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -156,10 +157,13 @@ def _univariate_shift_scale_invariant_distance( return min_dist, best_shifted_y +@threaded def shift_scale_invariant_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, max_shift: Optional[int] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: r"""Compute the shift-scale invariant pairwise distance between time series. @@ -193,6 +197,10 @@ def shift_scale_invariant_pairwise_distance( Maximum shift allowed in the alignment path. If None, then max_shift is set to min(X.shape[-1], y.shape[-1]) or if y is None, max_shift is set to X.shape[-1]. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -308,7 +316,7 @@ def shift_scale_invariant_best_shift( raise ValueError("x and y must be 1D or 2D") -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _shift_invariant_pairwise_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], max_shift: int ) -> np.ndarray: @@ -316,7 +324,7 @@ def _shift_invariant_pairwise_distance( m_cases = len(y) distances = np.zeros((n_cases, m_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): distances[i, j] = shift_scale_invariant_distance(x[i], y[j], max_shift) return distances diff --git a/aeon/distances/elastic/_adtw.py b/aeon/distances/elastic/_adtw.py index feab2b4c18..f061f479ff 100644 --- a/aeon/distances/elastic/_adtw.py +++ b/aeon/distances/elastic/_adtw.py @@ -5,12 +5,13 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.pointwise._squared import _univariate_squared_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -197,12 +198,15 @@ def _adtw_cost_matrix( return cost_matrix[1:, 1:] +@threaded def adtw_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, window: Optional[float] = None, itakura_max_slope: Optional[float] = None, warp_penalty: float = 1.0, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: r"""Compute the ADTW pairwise distance between a set of time series. @@ -226,6 +230,10 @@ def adtw_pairwise_distance( Penalty for warping. A high value will mean less warping. warp less and if value is low then will encourage algorithm to warp more. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -290,7 +298,7 @@ def adtw_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _adtw_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -306,7 +314,7 @@ def _adtw_pairwise_distance( bounding_matrix = create_bounding_matrix( n_timepoints, n_timepoints, window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = X[i], X[j] if unequal_length: @@ -319,7 +327,7 @@ def _adtw_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _adtw_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -336,7 +344,7 @@ def _adtw_from_multiple_to_multiple_distance( bounding_matrix = create_bounding_matrix( x[0].shape[1], y[0].shape[1], window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = x[i], y[j] if unequal_length: diff --git a/aeon/distances/elastic/_ddtw.py b/aeon/distances/elastic/_ddtw.py index 50007185fd..a31745f2f0 100644 --- a/aeon/distances/elastic/_ddtw.py +++ b/aeon/distances/elastic/_ddtw.py @@ -5,7 +5,7 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path @@ -14,6 +14,7 @@ _dtw_distance, create_bounding_matrix, ) +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -167,11 +168,14 @@ def ddtw_cost_matrix( raise ValueError("x and y must be 1D or 2D") +@threaded def ddtw_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, window: Optional[float] = None, itakura_max_slope: Optional[float] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the DDTW pairwise distance between a set of time series. @@ -191,6 +195,10 @@ def ddtw_pairwise_distance( itakura_max_slope : float, default=None Maximum slope as a proportion of the number of time points used to create Itakura parallelogram on the bounding matrix. Must be between 0. and 1. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -254,7 +262,7 @@ def ddtw_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _ddtw_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -274,7 +282,7 @@ def _ddtw_pairwise_distance( for i in range(n_cases): X_average_of_slope.append(average_of_slope(X[i])) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = X_average_of_slope[i], X_average_of_slope[j] if unequal_length: @@ -287,7 +295,7 @@ def _ddtw_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _ddtw_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -313,7 +321,7 @@ def _ddtw_from_multiple_to_multiple_distance( for i in range(m_cases): y_average_of_slope.append(average_of_slope(y[i])) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = x_average_of_slope[i], y_average_of_slope[j] if unequal_length: diff --git a/aeon/distances/elastic/_dtw.py b/aeon/distances/elastic/_dtw.py index 73cce697ab..26add187dd 100644 --- a/aeon/distances/elastic/_dtw.py +++ b/aeon/distances/elastic/_dtw.py @@ -5,12 +5,13 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.pointwise._squared import _univariate_squared_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -228,11 +229,14 @@ def _dtw_cost_matrix( return cost_matrix[1:, 1:] +@threaded def dtw_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, window: Optional[float] = None, itakura_max_slope: Optional[float] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: r"""Compute the DTW pairwise distance between a set of time series. @@ -268,6 +272,10 @@ def dtw_pairwise_distance( itakura_max_slope : float, default=None Maximum slope as a proportion of the number of time points used to create Itakura parallelogram on the bounding matrix. Must be between 0. and 1. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -330,7 +338,7 @@ def dtw_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _dtw_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -345,7 +353,7 @@ def _dtw_pairwise_distance( bounding_matrix = create_bounding_matrix( n_timepoints, n_timepoints, window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = X[i], X[j] if unequal_length: @@ -358,7 +366,7 @@ def _dtw_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _dtw_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -374,7 +382,7 @@ def _dtw_from_multiple_to_multiple_distance( bounding_matrix = create_bounding_matrix( x[0].shape[1], y[0].shape[1], window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = x[i], y[j] if unequal_length: diff --git a/aeon/distances/elastic/_edr.py b/aeon/distances/elastic/_edr.py index e14996ef7a..19a4ec483b 100644 --- a/aeon/distances/elastic/_edr.py +++ b/aeon/distances/elastic/_edr.py @@ -5,12 +5,13 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.pointwise._euclidean import _univariate_euclidean_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -229,12 +230,15 @@ def _edr_cost_matrix( return cost_matrix[1:, 1:] +@threaded def edr_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, window: Optional[float] = None, epsilon: Optional[float] = None, itakura_max_slope: Optional[float] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the pairwise EDR distance between a set of time series. @@ -258,6 +262,10 @@ def edr_pairwise_distance( itakura_max_slope : float, default=None Maximum slope as a proportion of the number of time points used to create Itakura parallelogram on the bounding matrix. Must be between 0. and 1. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -322,7 +330,7 @@ def edr_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _edr_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -338,7 +346,7 @@ def _edr_pairwise_distance( bounding_matrix = create_bounding_matrix( n_timepoints, n_timepoints, window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = X[i], X[j] if unequal_length: @@ -351,7 +359,7 @@ def _edr_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _edr_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -368,7 +376,7 @@ def _edr_from_multiple_to_multiple_distance( bounding_matrix = create_bounding_matrix( x[0].shape[1], y[0].shape[1], window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = x[i], y[j] if unequal_length: diff --git a/aeon/distances/elastic/_erp.py b/aeon/distances/elastic/_erp.py index 179b2f24f4..b1a7071a7b 100644 --- a/aeon/distances/elastic/_erp.py +++ b/aeon/distances/elastic/_erp.py @@ -5,12 +5,13 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.pointwise._euclidean import _univariate_euclidean_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -248,6 +249,7 @@ def _precompute_g( return gx_distance, x_sum +@threaded def erp_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, @@ -255,6 +257,8 @@ def erp_pairwise_distance( g: float = 0.0, g_arr: Optional[np.ndarray] = None, itakura_max_slope: Optional[float] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the ERP pairwise distance between a set of time series. @@ -283,6 +287,10 @@ def erp_pairwise_distance( itakura_max_slope : float, default=None Maximum slope as a proportion of the number of time points used to create Itakura parallelogram on the bounding matrix. Must be between 0. and 1. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -343,7 +351,7 @@ def erp_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _erp_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -361,7 +369,7 @@ def _erp_pairwise_distance( n_timepoints, n_timepoints, window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = X[i], X[j] if unequal_length: @@ -374,7 +382,7 @@ def _erp_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _erp_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -392,7 +400,7 @@ def _erp_from_multiple_to_multiple_distance( bounding_matrix = create_bounding_matrix( x[0].shape[1], y[0].shape[1], window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = x[i], y[j] if unequal_length: diff --git a/aeon/distances/elastic/_lcss.py b/aeon/distances/elastic/_lcss.py index 23e1eb9fe2..0cddbd9b0f 100644 --- a/aeon/distances/elastic/_lcss.py +++ b/aeon/distances/elastic/_lcss.py @@ -5,12 +5,13 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_lcss_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.pointwise._euclidean import _univariate_euclidean_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -222,12 +223,15 @@ def _lcss_cost_matrix( return cost_matrix +@threaded def lcss_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, window: Optional[float] = None, epsilon: float = 1.0, itakura_max_slope: Optional[float] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the LCSS pairwise distance between a set of time series. @@ -250,6 +254,10 @@ def lcss_pairwise_distance( itakura_max_slope : float, default=None Maximum slope as a proportion of the number of time points used to create Itakura parallelogram on the bounding matrix. Must be between 0. and 1. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -312,7 +320,7 @@ def lcss_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _lcss_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -327,7 +335,7 @@ def _lcss_pairwise_distance( bounding_matrix = create_bounding_matrix( n_timepoints, n_timepoints, window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = X[i], X[j] if unequal_length: @@ -340,7 +348,7 @@ def _lcss_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _lcss_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -357,7 +365,7 @@ def _lcss_from_multiple_to_multiple_distance( bounding_matrix = create_bounding_matrix( x[0].shape[1], y[0].shape[1], window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = x[i], y[j] if unequal_length: diff --git a/aeon/distances/elastic/_msm.py b/aeon/distances/elastic/_msm.py index c51eca3ab6..f75ab5daf8 100644 --- a/aeon/distances/elastic/_msm.py +++ b/aeon/distances/elastic/_msm.py @@ -5,12 +5,13 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.pointwise._squared import _univariate_squared_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -343,6 +344,7 @@ def _cost_independent(x: float, y: float, z: float, c: float) -> float: return c + min(abs(x - y), abs(x - z)) +@threaded def msm_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, @@ -350,6 +352,8 @@ def msm_pairwise_distance( independent: bool = True, c: float = 1.0, itakura_max_slope: Optional[float] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the msm pairwise distance between a set of time series. @@ -374,6 +378,10 @@ def msm_pairwise_distance( itakura_max_slope : float, default=None Maximum slope as a proportion of the number of time points used to create Itakura parallelogram on the bounding matrix. Must be between 0. and 1. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -438,7 +446,7 @@ def msm_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _msm_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -455,7 +463,7 @@ def _msm_pairwise_distance( bounding_matrix = create_bounding_matrix( n_timepoints, n_timepoints, window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = X[i], X[j] if unequal_length: @@ -468,7 +476,7 @@ def _msm_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _msm_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -486,7 +494,7 @@ def _msm_from_multiple_to_multiple_distance( bounding_matrix = create_bounding_matrix( x[0].shape[1], y[0].shape[1], window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = x[i], y[j] if unequal_length: diff --git a/aeon/distances/elastic/_shape_dtw.py b/aeon/distances/elastic/_shape_dtw.py index 25a72cef10..894db087c3 100644 --- a/aeon/distances/elastic/_shape_dtw.py +++ b/aeon/distances/elastic/_shape_dtw.py @@ -5,13 +5,14 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.elastic._dtw import _dtw_cost_matrix from aeon.distances.pointwise._squared import _univariate_squared_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -515,6 +516,7 @@ def shape_dtw_alignment_path( return (compute_min_return_path(cost_matrix), shapedtw_dist) +@threaded def shape_dtw_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, @@ -525,6 +527,8 @@ def shape_dtw_pairwise_distance( transformation_precomputed: bool = False, transformed_x: Optional[np.ndarray] = None, transformed_y: Optional[np.ndarray] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the ShapeDTW pairwise distance among a set of series. @@ -563,6 +567,10 @@ def shape_dtw_pairwise_distance( The transformation of X, ignored if transformation_precomputed is False. transformed_y : np.ndarray, default = None The transformation of y, ignored if transformation_precomputed is False. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -644,7 +652,7 @@ def shape_dtw_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _shape_dtw_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -663,7 +671,7 @@ def _shape_dtw_pairwise_distance( bounding_matrix = create_bounding_matrix( n_timepoints, n_timepoints, window, itakura_max_slope ) - for i in range(len(X)): + for i in prange(len(X)): for j in range(i + 1, n_cases): x1_, x2_ = X[i], X[j] x1 = _pad_ts_edges(x=x1_, reach=reach) @@ -695,7 +703,7 @@ def _shape_dtw_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _shape_dtw_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -716,7 +724,7 @@ def _shape_dtw_from_multiple_to_multiple_distance( bounding_matrix = create_bounding_matrix( x[0].shape[1], y[0].shape[1], window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1_, y1_ = x[i], y[j] x1 = _pad_ts_edges(x=x1_, reach=reach) diff --git a/aeon/distances/elastic/_soft_dtw.py b/aeon/distances/elastic/_soft_dtw.py index 31b8743599..861b9cf428 100644 --- a/aeon/distances/elastic/_soft_dtw.py +++ b/aeon/distances/elastic/_soft_dtw.py @@ -5,13 +5,14 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.elastic._dtw import _dtw_cost_matrix from aeon.distances.pointwise._squared import _univariate_squared_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -243,12 +244,15 @@ def _soft_dtw_cost_matrix( return cost_matrix[1:, 1:] +@threaded def soft_dtw_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, gamma: float = 1.0, window: Optional[float] = None, itakura_max_slope: Optional[float] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: r"""Compute the soft-DTW pairwise distance between a set of time series. @@ -270,6 +274,10 @@ def soft_dtw_pairwise_distance( itakura_max_slope : float, default=None Maximum slope as a proportion of the number of time points used to create Itakura parallelogram on the bounding matrix. Must be between 0. and 1. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -334,7 +342,7 @@ def soft_dtw_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _soft_dtw_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -350,7 +358,7 @@ def _soft_dtw_pairwise_distance( bounding_matrix = create_bounding_matrix( n_timepoints, n_timepoints, window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = X[i], X[j] if unequal_length: @@ -363,7 +371,7 @@ def _soft_dtw_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _soft_dtw_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -380,7 +388,7 @@ def _soft_dtw_from_multiple_to_multiple_distance( bounding_matrix = create_bounding_matrix( x[0].shape[1], y[0].shape[1], window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = x[i], y[j] if unequal_length: diff --git a/aeon/distances/elastic/_twe.py b/aeon/distances/elastic/_twe.py index fa4a3f4dea..077172e0a2 100644 --- a/aeon/distances/elastic/_twe.py +++ b/aeon/distances/elastic/_twe.py @@ -5,12 +5,13 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.pointwise._euclidean import _univariate_euclidean_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -243,6 +244,7 @@ def _pad_arrs(x: np.ndarray) -> np.ndarray: return padded_x +@threaded def twe_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, @@ -250,6 +252,8 @@ def twe_pairwise_distance( nu: float = 0.001, lmbda: float = 1.0, itakura_max_slope: Optional[float] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the TWE pairwise distance between a set of time series. @@ -274,6 +278,10 @@ def twe_pairwise_distance( itakura_max_slope : float, default=None Maximum slope as a proportion of the number of time points used to create Itakura parallelogram on the bounding matrix. Must be between 0. and 1. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -336,7 +344,7 @@ def twe_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _twe_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -359,7 +367,7 @@ def _twe_pairwise_distance( for i in range(n_cases): padded_X.append(_pad_arrs(X[i])) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = padded_X[i], padded_X[j] if unequal_length: @@ -372,7 +380,7 @@ def _twe_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _twe_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -399,7 +407,7 @@ def _twe_from_multiple_to_multiple_distance( for i in range(m_cases): padded_y.append(_pad_arrs(y[i])) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = padded_x[i], padded_y[j] if unequal_length: diff --git a/aeon/distances/elastic/_wddtw.py b/aeon/distances/elastic/_wddtw.py index 9a49728c30..5e064a8162 100644 --- a/aeon/distances/elastic/_wddtw.py +++ b/aeon/distances/elastic/_wddtw.py @@ -5,13 +5,14 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.elastic._ddtw import average_of_slope from aeon.distances.elastic._wdtw import _wdtw_cost_matrix, _wdtw_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -171,12 +172,15 @@ def wddtw_cost_matrix( raise ValueError("x and y must be 1D or 2D") +@threaded def wddtw_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, window: Optional[float] = None, g: float = 0.05, itakura_max_slope: Optional[float] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the WDDTW pairwise distance between a set of time series. @@ -199,6 +203,10 @@ def wddtw_pairwise_distance( itakura_max_slope : float, default=None Maximum slope as a proportion of the number of time points used to create Itakura parallelogram on the bounding matrix. Must be between 0. and 1. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Raises ------ @@ -258,7 +266,7 @@ def wddtw_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _wddtw_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -278,7 +286,7 @@ def _wddtw_pairwise_distance( for i in range(n_cases): X_average_of_slope.append(average_of_slope(X[i])) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = X_average_of_slope[i], X_average_of_slope[j] if unequal_length: @@ -291,7 +299,7 @@ def _wddtw_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _wddtw_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -318,7 +326,7 @@ def _wddtw_from_multiple_to_multiple_distance( for i in range(m_cases): y_average_of_slope.append(average_of_slope(y[i])) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = x_average_of_slope[i], y_average_of_slope[j] if unequal_length: diff --git a/aeon/distances/elastic/_wdtw.py b/aeon/distances/elastic/_wdtw.py index 3ad1767c9e..3573cedfc3 100644 --- a/aeon/distances/elastic/_wdtw.py +++ b/aeon/distances/elastic/_wdtw.py @@ -5,12 +5,13 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.elastic._alignment_paths import compute_min_return_path from aeon.distances.elastic._bounding_matrix import create_bounding_matrix from aeon.distances.pointwise._squared import _univariate_squared_distance +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -235,12 +236,15 @@ def _wdtw_cost_matrix( return cost_matrix[1:, 1:] +@threaded def wdtw_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, window: Optional[float] = None, g: float = 0.05, itakura_max_slope: Optional[float] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the WDTW pairwise distance between a set of time series. @@ -263,6 +267,10 @@ def wdtw_pairwise_distance( itakura_max_slope : float, default=None Maximum slope as a proportion of the number of time points used to create Itakura parallelogram on the bounding matrix. Must be between 0. and 1. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -324,7 +332,7 @@ def wdtw_pairwise_distance( ) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _wdtw_pairwise_distance( X: NumbaList[np.ndarray], window: Optional[float], @@ -340,7 +348,7 @@ def _wdtw_pairwise_distance( bounding_matrix = create_bounding_matrix( n_timepoints, n_timepoints, window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): x1, x2 = X[i], X[j] if unequal_length: @@ -353,7 +361,7 @@ def _wdtw_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _wdtw_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -370,7 +378,7 @@ def _wdtw_from_multiple_to_multiple_distance( bounding_matrix = create_bounding_matrix( x[0].shape[1], y[0].shape[1], window, itakura_max_slope ) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): x1, y1 = x[i], y[j] if unequal_length: diff --git a/aeon/distances/mindist/_dft_sfa.py b/aeon/distances/mindist/_dft_sfa.py index 5f6e856260..9d8bd80794 100644 --- a/aeon/distances/mindist/_dft_sfa.py +++ b/aeon/distances/mindist/_dft_sfa.py @@ -5,6 +5,7 @@ import numpy as np from numba import njit, prange +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -85,8 +86,9 @@ def _univariate_dft_sfa_distance( return np.sqrt(2 * dist) +@threaded def mindist_dft_sfa_pairwise_distance( - X: np.ndarray, y: np.ndarray, breakpoints: np.ndarray + X: np.ndarray, y: np.ndarray, breakpoints: np.ndarray, n_jobs: int = 1, **kwargs ) -> np.ndarray: """Compute the DFT SFA pairwise distance between a set of SFA representations. @@ -98,6 +100,10 @@ def mindist_dft_sfa_pairwise_distance( A collection of SFA instances of shape ``(n_instances, n_timepoints)``. breakpoints: np.ndarray The breakpoints of the SAX transformation + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -132,7 +138,7 @@ def _dft_sfa_from_multiple_to_multiple_distance( distances = np.zeros((n_instances, n_instances)) for i in prange(n_instances): - for j in prange(i + 1, n_instances): + for j in range(i + 1, n_instances): distances[i, j] = _univariate_dft_sfa_distance(X[i], X[j], breakpoints) distances[j, i] = distances[i, j] else: @@ -141,7 +147,7 @@ def _dft_sfa_from_multiple_to_multiple_distance( distances = np.zeros((n_instances, m_instances)) for i in prange(n_instances): - for j in prange(m_instances): + for j in range(m_instances): distances[i, j] = _univariate_dft_sfa_distance(X[i], y[j], breakpoints) return distances diff --git a/aeon/distances/mindist/_paa_sax.py b/aeon/distances/mindist/_paa_sax.py index a53a8b35aa..8d7fb42350 100644 --- a/aeon/distances/mindist/_paa_sax.py +++ b/aeon/distances/mindist/_paa_sax.py @@ -3,6 +3,7 @@ import numpy as np from numba import njit, prange +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -90,8 +91,14 @@ def _univariate_paa_sax_distance( return np.sqrt(dist) +@threaded def mindist_paa_sax_pairwise_distance( - X: np.ndarray, y: np.ndarray, breakpoints: np.ndarray, n: int + X: np.ndarray, + y: np.ndarray, + breakpoints: np.ndarray, + n: int, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the PAA SAX pairwise distance between a set of SAX representations. @@ -105,6 +112,10 @@ def mindist_paa_sax_pairwise_distance( The breakpoints of the SAX transformation n : int The original size of the time series + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -139,7 +150,7 @@ def _paa_sax_from_multiple_to_multiple_distance( distances = np.zeros((n_instances, n_instances)) for i in prange(n_instances): - for j in prange(i + 1, n_instances): + for j in range(i + 1, n_instances): distances[i, j] = _univariate_paa_sax_distance( X[i], X[j], breakpoints, n ) @@ -150,7 +161,7 @@ def _paa_sax_from_multiple_to_multiple_distance( distances = np.zeros((n_instances, m_instances)) for i in prange(n_instances): - for j in prange(m_instances): + for j in range(m_instances): distances[i, j] = _univariate_paa_sax_distance( X[i], y[j], breakpoints, n ) diff --git a/aeon/distances/mindist/_sax.py b/aeon/distances/mindist/_sax.py index cdecfb2ebc..8313de26bd 100644 --- a/aeon/distances/mindist/_sax.py +++ b/aeon/distances/mindist/_sax.py @@ -5,6 +5,7 @@ import numpy as np from numba import njit, prange +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -84,8 +85,14 @@ def _univariate_sax_distance( return np.sqrt(dist) +@threaded def mindist_sax_pairwise_distance( - X: np.ndarray, y: np.ndarray, breakpoints: np.ndarray, n: int + X: np.ndarray, + y: np.ndarray, + breakpoints: np.ndarray, + n: int, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the SAX pairwise distance between a set of SAX representations. @@ -99,6 +106,10 @@ def mindist_sax_pairwise_distance( The breakpoints of the SAX transformation n : int The original size of the time series + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -134,7 +145,7 @@ def _sax_from_multiple_to_multiple_distance( distances = np.zeros((n_instances, n_instances)) for i in prange(n_instances): - for j in prange(i + 1, n_instances): + for j in range(i + 1, n_instances): distances[i, j] = _univariate_sax_distance(X[i], X[j], breakpoints, n) distances[j, i] = distances[i, j] else: @@ -143,7 +154,7 @@ def _sax_from_multiple_to_multiple_distance( distances = np.zeros((n_instances, m_instances)) for i in prange(n_instances): - for j in prange(m_instances): + for j in range(m_instances): distances[i, j] = _univariate_sax_distance(X[i], y[j], breakpoints, n) return distances diff --git a/aeon/distances/mindist/_sfa.py b/aeon/distances/mindist/_sfa.py index e9c6cf8638..95dabd9cb6 100644 --- a/aeon/distances/mindist/_sfa.py +++ b/aeon/distances/mindist/_sfa.py @@ -5,6 +5,7 @@ import numpy as np from numba import njit, prange +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -77,8 +78,9 @@ def _univariate_sfa_distance( return np.sqrt(2 * dist) +@threaded def mindist_sfa_pairwise_distance( - X: np.ndarray, y: np.ndarray, breakpoints: np.ndarray + X: np.ndarray, y: np.ndarray, breakpoints: np.ndarray, n_jobs: int = 1, **kwargs ) -> np.ndarray: """Compute the SFA mindist pairwise distance between a set of SFA representations. @@ -90,6 +92,10 @@ def mindist_sfa_pairwise_distance( A collection of SFA instances of shape ``(n_instances, n_timepoints)``. breakpoints: np.ndarray The breakpoints of the SAX transformation + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -125,7 +131,7 @@ def _sfa_from_multiple_to_multiple_distance( distances = np.zeros((n_instances, n_instances)) for i in prange(n_instances): - for j in prange(i + 1, n_instances): + for j in range(i + 1, n_instances): distances[i, j] = _univariate_sfa_distance(X[i], X[j], breakpoints) distances[j, i] = distances[i, j] else: @@ -134,7 +140,7 @@ def _sfa_from_multiple_to_multiple_distance( distances = np.zeros((n_instances, m_instances)) for i in prange(n_instances): - for j in prange(m_instances): + for j in range(m_instances): distances[i, j] = _univariate_sfa_distance(X[i], y[j], breakpoints) return distances diff --git a/aeon/distances/pointwise/_euclidean.py b/aeon/distances/pointwise/_euclidean.py index f7f0a640d4..607032b887 100644 --- a/aeon/distances/pointwise/_euclidean.py +++ b/aeon/distances/pointwise/_euclidean.py @@ -1,15 +1,17 @@ __maintainer__ = [] +import warnings from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList from aeon.distances.pointwise._squared import ( _univariate_squared_distance, squared_distance, ) +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -69,9 +71,12 @@ def _univariate_euclidean_distance(x: np.ndarray, y: np.ndarray) -> float: return np.sqrt(_univariate_squared_distance(x, y)) +@threaded def euclidean_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the Euclidean pairwise distance between a set of time series. @@ -85,6 +90,10 @@ def euclidean_pairwise_distance( ``(m_cases, m_timepoints)`` or ``(m_cases, m_channels, m_timepoints)``. If None, then the euclidean pairwise distance between the instances of X is calculated. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -128,6 +137,16 @@ def euclidean_pairwise_distance( [ 5.19615242, 0. , 8. ], [12.12435565, 8. , 0. ]]) """ + if n_jobs > 1: + warnings.warn( + "You have set n_jobs > 1. For this distance function " + "unless your data has a large number of time points, it is " + "recommended to use n_jobs=1. If this function is slower than " + "expected try setting n_jobs=1.", + UserWarning, + stacklevel=2, + ) + multivariate_conversion = _is_numpy_list_multivariate(X, y) _X, _ = _convert_collection_to_numba_list(X, "X", multivariate_conversion) if y is None: @@ -138,12 +157,12 @@ def euclidean_pairwise_distance( return _euclidean_from_multiple_to_multiple_distance(_X, _y) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _euclidean_pairwise_distance(X: NumbaList[np.ndarray]) -> np.ndarray: n_cases = len(X) distances = np.zeros((n_cases, n_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): distances[i, j] = euclidean_distance(X[i], X[j]) distances[j, i] = distances[i, j] @@ -151,7 +170,7 @@ def _euclidean_pairwise_distance(X: NumbaList[np.ndarray]) -> np.ndarray: return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _euclidean_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray] ) -> np.ndarray: @@ -159,7 +178,7 @@ def _euclidean_from_multiple_to_multiple_distance( m_cases = len(y) distances = np.zeros((n_cases, m_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): distances[i, j] = euclidean_distance(x[i], y[j]) return distances diff --git a/aeon/distances/pointwise/_manhattan.py b/aeon/distances/pointwise/_manhattan.py index 5c4a80e7a2..4d3892aed4 100644 --- a/aeon/distances/pointwise/_manhattan.py +++ b/aeon/distances/pointwise/_manhattan.py @@ -1,11 +1,13 @@ __maintainer__ = [] +import warnings from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -74,9 +76,12 @@ def _univariate_manhattan_distance(x: np.ndarray, y: np.ndarray) -> float: return distance +@threaded def manhattan_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the manhattan pairwise distance between a set of time series. @@ -90,6 +95,10 @@ def manhattan_pairwise_distance( ``(m_cases, m_timepoints)`` or ``(m_cases, m_channels, m_timepoints)``. If None, then the manhattan pairwise distance between the instances of X is calculated. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -133,6 +142,16 @@ def manhattan_pairwise_distance( [ 9., 0., 16.], [21., 16., 0.]]) """ + if n_jobs > 1: + warnings.warn( + "You have set n_jobs > 1. For this distance function " + "unless your data has a large number of time points, it is " + "recommended to use n_jobs=1. If this function is slower than " + "expected try setting n_jobs=1.", + UserWarning, + stacklevel=2, + ) + multivariate_conversion = _is_numpy_list_multivariate(X, y) _X, _ = _convert_collection_to_numba_list(X, "X", multivariate_conversion) if y is None: @@ -142,12 +161,12 @@ def manhattan_pairwise_distance( return _manhattan_from_multiple_to_multiple_distance(_X, _y) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _manhattan_pairwise_distance(X: NumbaList[np.ndarray]) -> np.ndarray: n_cases = len(X) distances = np.zeros((n_cases, n_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): distances[i, j] = manhattan_distance(X[i], X[j]) distances[j, i] = distances[i, j] @@ -155,7 +174,7 @@ def _manhattan_pairwise_distance(X: NumbaList[np.ndarray]) -> np.ndarray: return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _manhattan_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray] ) -> np.ndarray: @@ -163,7 +182,7 @@ def _manhattan_from_multiple_to_multiple_distance( m_cases = len(y) distances = np.zeros((n_cases, m_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): distances[i, j] = manhattan_distance(x[i], y[j]) return distances diff --git a/aeon/distances/pointwise/_minkowski.py b/aeon/distances/pointwise/_minkowski.py index d25b504403..232e383eb1 100644 --- a/aeon/distances/pointwise/_minkowski.py +++ b/aeon/distances/pointwise/_minkowski.py @@ -3,9 +3,10 @@ from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -126,11 +127,14 @@ def _multivariate_minkowski_distance( return dist ** (1.0 / p) +@threaded def minkowski_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, p: float = 2.0, w: Optional[np.ndarray] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the Minkowski pairwise distance between a set of time series. @@ -150,6 +154,10 @@ def minkowski_pairwise_distance( w : np.ndarray, default=None An array of weights, applied to each pairwise calculation. The weights should match the shape of the time series in X and y. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -211,14 +219,14 @@ def minkowski_pairwise_distance( return _minkowski_from_multiple_to_multiple_distance(_X, _y, p, w) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _minkowski_pairwise_distance( X: NumbaList[np.ndarray], p: float, w: Optional[np.ndarray] = None ) -> np.ndarray: n_cases = len(X) distances = np.zeros((n_cases, n_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): if w is None: distances[i, j] = minkowski_distance(X[i], X[j], p) @@ -232,7 +240,7 @@ def _minkowski_pairwise_distance( return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _minkowski_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], @@ -243,7 +251,7 @@ def _minkowski_from_multiple_to_multiple_distance( m_cases = len(y) distances = np.zeros((n_cases, m_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): if w is None: distances[i, j] = minkowski_distance(x[i], y[j], p) diff --git a/aeon/distances/pointwise/_squared.py b/aeon/distances/pointwise/_squared.py index 045466ef51..f7391c8565 100644 --- a/aeon/distances/pointwise/_squared.py +++ b/aeon/distances/pointwise/_squared.py @@ -1,11 +1,13 @@ __maintainer__ = [] +import warnings from typing import Optional, Union import numpy as np -from numba import njit +from numba import njit, prange from numba.typed import List as NumbaList +from aeon.utils._threading import threaded from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list from aeon.utils.validation.collection import _is_numpy_list_multivariate @@ -73,9 +75,12 @@ def _univariate_squared_distance(x: np.ndarray, y: np.ndarray) -> float: return distance +@threaded def squared_pairwise_distance( X: Union[np.ndarray, list[np.ndarray]], y: Optional[Union[np.ndarray, list[np.ndarray]]] = None, + n_jobs: int = 1, + **kwargs, ) -> np.ndarray: """Compute the squared pairwise distance between a set of time series. @@ -89,6 +94,10 @@ def squared_pairwise_distance( ``(m_cases, m_timepoints)`` or ``(m_cases, m_channels, m_timepoints)``. If None, then the squared pairwise distance between the instances of X is calculated. + n_jobs : int, default=1 + The number of jobs to run in parallel. If -1, then the number of jobs is set + to the number of CPU cores. If 1, then the function is executed in a single + thread. If greater than 1, then the function is executed in parallel. Returns ------- @@ -132,6 +141,15 @@ def squared_pairwise_distance( [ 27., 0., 64.], [147., 64., 0.]]) """ + if n_jobs > 1: + warnings.warn( + "You have set n_jobs > 1. For this distance function " + "unless your data has a large number of time points, it is " + "recommended to use n_jobs=1. If this function is slower than " + "expected try setting n_jobs=1.", + UserWarning, + stacklevel=2, + ) multivariate_conversion = _is_numpy_list_multivariate(X, y) _X, _ = _convert_collection_to_numba_list(X, "X", multivariate_conversion) @@ -143,12 +161,12 @@ def squared_pairwise_distance( return _squared_from_multiple_to_multiple_distance(_X, _y) -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _squared_pairwise_distance(X: NumbaList[np.ndarray]) -> np.ndarray: n_cases = len(X) distances = np.zeros((n_cases, n_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(i + 1, n_cases): distances[i, j] = squared_distance(X[i], X[j]) distances[j, i] = distances[i, j] @@ -156,7 +174,7 @@ def _squared_pairwise_distance(X: NumbaList[np.ndarray]) -> np.ndarray: return distances -@njit(cache=True, fastmath=True) +@njit(cache=True, fastmath=True, parallel=True) def _squared_from_multiple_to_multiple_distance( x: NumbaList[np.ndarray], y: NumbaList[np.ndarray] ) -> np.ndarray: @@ -164,7 +182,7 @@ def _squared_from_multiple_to_multiple_distance( m_cases = len(y) distances = np.zeros((n_cases, m_cases)) - for i in range(n_cases): + for i in prange(n_cases): for j in range(m_cases): distances[i, j] = squared_distance(x[i], y[j]) return distances diff --git a/aeon/regression/distance_based/_time_series_neighbors.py b/aeon/regression/distance_based/_time_series_neighbors.py index 9981e2dc12..1a2eff0163 100644 --- a/aeon/regression/distance_based/_time_series_neighbors.py +++ b/aeon/regression/distance_based/_time_series_neighbors.py @@ -134,7 +134,7 @@ def _predict(self, X): """ preds = np.empty(len(X)) for i in range(len(X)): - idx, weights = self._kneighbors(X[i]) + idx, weights = self.kneighbors(X[i]) preds[i] = np.average(self.y_[idx], weights=weights) return preds diff --git a/aeon/utils/_threading.py b/aeon/utils/_threading.py new file mode 100644 index 0000000000..adf32df0a6 --- /dev/null +++ b/aeon/utils/_threading.py @@ -0,0 +1,62 @@ +import functools +import inspect +import os +import threading +from typing import Any, Callable + +from numba import set_num_threads + +from aeon.utils.validation import check_n_jobs + + +def threaded(func: Callable) -> Callable: + """Set thread count based on n_jobs parameter and restore it afterward. + + A decorator that sets the number of threads based on the n_jobs parameter + passed to the function, and restores the original thread count afterward. + + The decorated function is expected to have a 'n_jobs' parameter. + """ + + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + numba_env_threads = os.environ.get("NUMBA_NUM_THREADS") + + if numba_env_threads is not None and numba_env_threads.isdigit(): + original_thread_count = int(numba_env_threads) + else: + active_count = threading.active_count() + if isinstance(active_count, int): + original_thread_count = threading.active_count() + else: + original_thread_count = 1 + + n_jobs = None + if "n_jobs" in kwargs: + n_jobs = kwargs["n_jobs"] + else: + sig = inspect.signature(func) + param_names = list(sig.parameters.keys()) + + if "n_jobs" in param_names: + n_jobs_index = param_names.index("n_jobs") + if n_jobs_index < len(args): + n_jobs = args[n_jobs_index] + else: + default = sig.parameters["n_jobs"].default + n_jobs = default if default is not inspect.Parameter.empty else None + + if n_jobs is None and args and hasattr(args[0], "n_jobs"): + # This gets n_jobs if it belongs to a object (i.e. self.n_jobs) + n_jobs = args[0].n_jobs + + adjusted_n_jobs = check_n_jobs(n_jobs) + set_num_threads(adjusted_n_jobs) + + try: + result = func(*args, **kwargs) + return result + finally: + set_num_threads(original_thread_count) + + return wrapper diff --git a/aeon/utils/tests/test_threading_decorator.py b/aeon/utils/tests/test_threading_decorator.py new file mode 100644 index 0000000000..76c0d80503 --- /dev/null +++ b/aeon/utils/tests/test_threading_decorator.py @@ -0,0 +1,282 @@ +"""Test threading util decorator.""" + +import os +from unittest.mock import MagicMock, patch + +import pytest + +from aeon.utils._threading import threaded + + +def check_n_jobs(n_jobs): + """Mock implementation of check_n_jobs.""" + return n_jobs if n_jobs is not None else 1 + + +def set_num_threads(n_threads): + """Mock implementation of set_num_threads.""" + pass + + +@pytest.fixture +def clean_env(): + """Save and restore environment variables between tests.""" + original_env = os.environ.copy() + yield + os.environ.clear() + os.environ.update(original_env) + + +def test_basic_functionality(): + """Test that the decorator correctly sets and restores thread count.""" + check_jobs_mock = MagicMock(side_effect=lambda x: x if x is not None else 1) + set_threads_mock = MagicMock() + + with patch("aeon.utils._threading.check_n_jobs", check_jobs_mock): + with patch("aeon.utils._threading.set_num_threads", set_threads_mock): + + @threaded + def sample_func(n_jobs=None): + return "executed" + + result = sample_func(n_jobs=4) + + assert result == "executed" + check_jobs_mock.assert_called_once_with(4) + assert set_threads_mock.call_count == 2 + + +def test_numba_env_variable(clean_env): + """Test that the decorator respects NUMBA_NUM_THREADS environment variable.""" + os.environ["NUMBA_NUM_THREADS"] = "8" + + check_jobs_mock = MagicMock(side_effect=lambda x: x if x is not None else 1) + set_threads_mock = MagicMock() + + with patch("aeon.utils._threading.check_n_jobs", check_jobs_mock): + with patch("aeon.utils._threading.set_num_threads", set_threads_mock): + + @threaded + def sample_func(n_jobs=None): + return "executed" + + sample_func(n_jobs=4) + + assert set_threads_mock.call_args_list[0][0][0] == 4 + assert set_threads_mock.call_args_list[1][0][0] == 8 + + +def test_fallback_to_threading_count(clean_env): + """ + Test the fallback mechanism to the system's active thread count. + + When the NUMBA_NUM_THREADS environment variable is not set or is invalid, + the decorator should use the system's active thread count as the baseline. + This ensures proper thread management even when no explicit configuration is + provided. + """ + check_jobs_mock = MagicMock(side_effect=lambda x: x if x is not None else 1) + set_threads_mock = MagicMock() + thread_count_mock = MagicMock(return_value=3) + + with patch("aeon.utils._threading.check_n_jobs", check_jobs_mock): + with patch("aeon.utils._threading.set_num_threads", set_threads_mock): + with patch("threading.active_count", thread_count_mock): + + @threaded + def sample_func(n_jobs=None): + return "executed" + + sample_func(n_jobs=4) + + assert set_threads_mock.call_args_list[1][0][0] == 3 + + +def test_positional_argument(): + """ + Test the extraction of n_jobs when passed as a positional argument. + + The threaded decorator needs to correctly identify the n_jobs parameter + regardless of how it's passed to the function. This test verifies that + when n_jobs is passed as a positional argument, the decorator correctly + extracts its value and uses it to configure the thread count. + """ + check_jobs_mock = MagicMock(side_effect=lambda x: x if x is not None else 1) + set_threads_mock = MagicMock() + + with patch("aeon.utils._threading.check_n_jobs", check_jobs_mock): + with patch("aeon.utils._threading.set_num_threads", set_threads_mock): + + @threaded + def sample_func(data, n_jobs=None): + return data + + sample_func("test_data", 4) + + check_jobs_mock.assert_called_once_with(4) + + +def test_keyword_argument(): + """ + Test the extraction of n_jobs when passed as a keyword argument. + + Functions decorated with the threaded decorator can receive the n_jobs + parameter as a keyword argument. This test ensures that the decorator + correctly identifies and extracts the n_jobs value when passed this way, + demonstrating the decorator's flexibility in handling different calling styles. + """ + check_jobs_mock = MagicMock(side_effect=lambda x: x if x is not None else 1) + set_threads_mock = MagicMock() + + with patch("aeon.utils._threading.check_n_jobs", check_jobs_mock): + with patch("aeon.utils._threading.set_num_threads", set_threads_mock): + + @threaded + def sample_func(data, n_jobs=None): + return data + + sample_func(data="test_data", n_jobs=4) + + check_jobs_mock.assert_called_once_with(4) + + +def test_default_value(): + """ + Test the use of default n_jobs value when not explicitly provided. + + When a function has a default value for the n_jobs parameter and is called + without specifying this parameter, the threaded decorator should use the + function's default value. This test verifies this behavior, ensuring that + default function parameters are properly respected by the decorator. + """ + check_jobs_mock = MagicMock(side_effect=lambda x: x if x is not None else 1) + set_threads_mock = MagicMock() + + with patch("aeon.utils._threading.check_n_jobs", check_jobs_mock): + with patch("aeon.utils._threading.set_num_threads", set_threads_mock): + + @threaded + def sample_func(data, n_jobs=2): + return data + + sample_func("test_data") + + check_jobs_mock.assert_called_once_with(2) + + +def test_exception_handling(): + """ + Test resource cleanup when exceptions occur in the decorated function. + + A robust decorator must ensure resources are properly managed even when + the decorated function raises an exception. This test verifies that the + threaded decorator correctly restores the original thread count even when + the function execution fails with an exception, preventing resource leaks. + """ + check_jobs_mock = MagicMock(side_effect=lambda x: x if x is not None else 1) + set_threads_mock = MagicMock() + + with patch("aeon.utils._threading.check_n_jobs", check_jobs_mock): + with patch("aeon.utils._threading.set_num_threads", set_threads_mock): + + @threaded + def sample_func(n_jobs=None): + raise ValueError("Test exception") + + with pytest.raises(ValueError, match="Test exception"): + sample_func(n_jobs=4) + + assert set_threads_mock.call_count == 2 + + +def test_class_attribute(): + """ + Test the extraction of n_jobs from a class attribute. + + The threaded decorator should be able to extract the n_jobs value from + the first argument (typically 'self' in class methods) when it has an + n_jobs attribute. This test verifies that the decorator correctly identifies + and uses this attribute when the n_jobs parameter is not explicitly passed. + """ + check_jobs_mock = MagicMock(side_effect=lambda x: x if x is not None else 1) + set_threads_mock = MagicMock() + + with patch("aeon.utils._threading.check_n_jobs", check_jobs_mock): + with patch("aeon.utils._threading.set_num_threads", set_threads_mock): + + class TestClass: + def __init__(self, n_jobs): + self.n_jobs = n_jobs + + @threaded + def process_data(self, data): + return data + + test_instance = TestClass(n_jobs=5) + + test_instance.process_data("test_data") + + check_jobs_mock.assert_called_once_with(5) + assert set_threads_mock.call_count == 2 + + +def test_parameter_precedence_over_attribute(): + """ + Test that n_jobs parameter takes precedence over class attribute. + + When both a class attribute and a method parameter for n_jobs exist, + the parameter value should take precedence. This test verifies this + precedence rule, ensuring that explicit parameter values override + attribute values. + """ + check_jobs_mock = MagicMock(side_effect=lambda x: x if x is not None else 1) + set_threads_mock = MagicMock() + + with patch("aeon.utils._threading.check_n_jobs", check_jobs_mock): + with patch("aeon.utils._threading.set_num_threads", set_threads_mock): + + class TestClass: + def __init__(self, n_jobs): + self.n_jobs = n_jobs + + @threaded + def process_data(self, data, n_jobs=None): + return data + + test_instance = TestClass(n_jobs=5) + + test_instance.process_data("test_data", n_jobs=7) + + check_jobs_mock.assert_called_once_with(7) + assert set_threads_mock.call_count == 2 + + +def test_fallback_when_no_attribute(): + """ + Test fallback behavior when neither parameter nor attribute is available. + + When a class doesn't have an n_jobs attribute and the method doesn't + have an n_jobs parameter, the decorator should fall back to using None, + which will be converted to 1 by check_n_jobs. This test verifies this + fallback behavior. + """ + check_jobs_mock = MagicMock(side_effect=lambda x: x if x is not None else 1) + set_threads_mock = MagicMock() + + with patch("aeon.utils._threading.check_n_jobs", check_jobs_mock): + with patch("aeon.utils._threading.set_num_threads", set_threads_mock): + + class TestClass: + # No n_jobs attribute + pass + + @threaded + def process_data(self, data): + return data + + test_instance = TestClass() + + test_instance.process_data("test_data") + + check_jobs_mock.assert_called_once_with(None) + assert set_threads_mock.call_count == 2