Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Distance module n_jobs support #2545

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
67 changes: 57 additions & 10 deletions aeon/distances/_distance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Callable, Optional, TypedDict, Union

import numpy as np
from joblib import Parallel, delayed
from typing_extensions import Unpack

from aeon.distances._mpdist import mp_distance, mp_pairwise_distance
Expand Down Expand Up @@ -82,6 +83,7 @@
squared_distance,
squared_pairwise_distance,
)
from aeon.utils._threading import threaded
from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list
from aeon.utils.validation.collection import _is_numpy_list_multivariate

Expand Down Expand Up @@ -173,6 +175,7 @@ def pairwise_distance(
y: Optional[np.ndarray] = None,
method: Union[str, DistanceFunction, None] = None,
symmetric: bool = True,
n_jobs: int = 1,
**kwargs: Unpack[DistanceKwargs],
) -> np.ndarray:
"""Compute the pairwise distance matrix between two time series.
Expand All @@ -197,6 +200,10 @@ def pairwise_distance(
function is provided as the "method" parameter, then it will compute an
asymmetric distance matrix, and the entire matrix (including both upper and
lower triangles) is returned.
n_jobs : int, default=1
The number of jobs to run in parallel. If -1, then the number of jobs is set
to the number of CPU cores. If 1, then the function is executed in a single
thread. If greater than 1, then the function is executed in parallel.
kwargs : Any
Extra arguments for distance. Refer to each distance documentation for a list of
possible arguments.
Expand Down Expand Up @@ -240,45 +247,71 @@ def pairwise_distance(
[ 48.]])
"""
if method in PAIRWISE_DISTANCE:
return DISTANCES_DICT[method]["pairwise_distance"](x, y, **kwargs)
return DISTANCES_DICT[method]["pairwise_distance"](
x, y, n_jobs=n_jobs, **kwargs
)
elif isinstance(method, Callable):
if y is None and not symmetric:
return _custom_func_pairwise(x, x, method, **kwargs)
return _custom_func_pairwise(x, y, method, **kwargs)
return _custom_func_pairwise(x, x, method, n_jobs=n_jobs, **kwargs)
return _custom_func_pairwise(x, y, method, n_jobs=n_jobs, **kwargs)
else:
raise ValueError("Method must be one of the supported strings or a callable")


@threaded
def _custom_func_pairwise(
X: Optional[Union[np.ndarray, list[np.ndarray]]],
y: Optional[Union[np.ndarray, list[np.ndarray]]] = None,
dist_func: Union[DistanceFunction, None] = None,
n_jobs: int = 1,
**kwargs: Unpack[DistanceKwargs],
) -> np.ndarray:
if dist_func is None:
raise ValueError("dist_func must be a callable")

multivariate_conversion = _is_numpy_list_multivariate(X, y)
X, _ = _convert_collection_to_numba_list(X, "X", multivariate_conversion)

if n_jobs > 1:
X = np.array(X)

if y is None:
# To self
return _custom_pairwise_distance(X, dist_func, **kwargs)
return _custom_pairwise_distance(X, dist_func, n_jobs=n_jobs, **kwargs)
y, _ = _convert_collection_to_numba_list(y, "y", multivariate_conversion)
return _custom_from_multiple_to_multiple_distance(X, y, dist_func, **kwargs)
if n_jobs > 1:
y = np.array(y)
return _custom_from_multiple_to_multiple_distance(
X, y, dist_func, n_jobs=n_jobs, **kwargs
)


def _custom_pairwise_distance(
X: Union[np.ndarray, list[np.ndarray]],
dist_func: DistanceFunction,
n_jobs: int = 1,
**kwargs: Unpack[DistanceKwargs],
) -> np.ndarray:
n_cases = len(X)
distances = np.zeros((n_cases, n_cases))

for i in range(n_cases):
for j in range(i + 1, n_cases):
def compute_single_distance(i, j):
return i, j, dist_func(X[i], X[j], **kwargs)

indices = [(i, j) for i in range(n_cases) for j in range(i + 1, n_cases)]

if n_jobs == 1:
for i, j in indices:
distances[i, j] = dist_func(X[i], X[j], **kwargs)
distances[j, i] = distances[i, j]
distances[j, i] = distances[i, j] # Mirror for symmetry
else:
results = Parallel(n_jobs=n_jobs)(
delayed(compute_single_distance)(i, j) for i, j in indices
)

for i, j, dist in results:
distances[i, j] = dist
distances[j, i] = dist # Mirror for symmetry

return distances

Expand All @@ -287,15 +320,29 @@ def _custom_from_multiple_to_multiple_distance(
x: Union[np.ndarray, list[np.ndarray]],
y: Union[np.ndarray, list[np.ndarray]],
dist_func: DistanceFunction,
n_jobs: int = 1,
**kwargs: Unpack[DistanceKwargs],
) -> np.ndarray:
n_cases = len(x)
m_cases = len(y)
distances = np.zeros((n_cases, m_cases))

for i in range(n_cases):
for j in range(m_cases):
def compute_single_distance(i, j):
return i, j, dist_func(x[i], y[j], **kwargs)

indices = [(i, j) for i in range(n_cases) for j in range(m_cases)]

if n_jobs == 1:
for i, j in indices:
distances[i, j] = dist_func(x[i], y[j], **kwargs)
else:
results = Parallel(n_jobs=n_jobs)(
delayed(compute_single_distance)(i, j) for i, j in indices
)

for i, j, dist in results:
distances[i, j] = dist

return distances


Expand Down
49 changes: 5 additions & 44 deletions aeon/distances/_mpdist.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@

import numpy as np
from numba import njit
from numba.typed import List as NumbaList

from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list
from aeon.utils.validation.collection import _is_numpy_list_multivariate


def mp_distance(x: np.ndarray, y: np.ndarray, m: int = 0) -> float:
Expand Down Expand Up @@ -287,6 +283,8 @@ def mp_pairwise_distance(
X: Union[np.ndarray, list[np.ndarray]],
y: Optional[Union[np.ndarray, list[np.ndarray]]] = None,
m: int = 0,
n_jobs: int = 1,
**kwargs,
) -> np.ndarray:
"""Compute the mpdist pairwise distance between a set of time series.

Expand Down Expand Up @@ -339,45 +337,8 @@ def mp_pairwise_distance(
[2.82842712],
[2.82842712]])
"""
multivariate_conversion = _is_numpy_list_multivariate(X, y)
_X, unequal_length = _convert_collection_to_numba_list(
X, "X", multivariate_conversion
)

if m == 0:
m = int(_X.shape[2] / 4)

if y is None:
return _mpdist_pairwise_distance_single(_X, m)

_y, unequal_length = _convert_collection_to_numba_list(
y, "y", multivariate_conversion
)

return _mpdist_pairwise_distance(_X, _y, m)


def _mpdist_pairwise_distance_single(x: NumbaList[np.ndarray], m: int) -> np.ndarray:
n_cases = len(x)
distances = np.zeros((n_cases, n_cases))

for i in range(n_cases):
for j in range(i + 1, n_cases):
distances[i, j] = mp_distance(x[i], x[j], m)
distances[j, i] = distances[i, j]

return distances


def _mpdist_pairwise_distance(
x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], m: int
) -> np.ndarray:
n_cases = len(x)
m_cases = len(y)

distances = np.zeros((n_cases, m_cases))
m = int(X.shape[2] / 4)
from aeon.distances._distance import pairwise_distance

for i in range(n_cases):
for j in range(m_cases):
distances[i, j] = mp_distance(x[i], y[j], m)
return distances
return pairwise_distance(X, y, method=mp_distance, m=m, n_jobs=n_jobs, **kwargs)
28 changes: 23 additions & 5 deletions aeon/distances/_sbd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

__maintainer__ = ["SebastianSchmidl"]

import warnings
from typing import Optional, Union

import numpy as np
from numba import njit, objmode
from numba import njit, objmode, prange
from numba.typed import List as NumbaList
from scipy.signal import correlate

from aeon.utils._threading import threaded
from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list
from aeon.utils.validation.collection import _is_numpy_list_multivariate

Expand Down Expand Up @@ -113,10 +115,13 @@ def sbd_distance(x: np.ndarray, y: np.ndarray, standardize: bool = True) -> floa
raise ValueError("x and y must be 1D or 2D")


@threaded
def sbd_pairwise_distance(
X: Union[np.ndarray, list[np.ndarray]],
y: Optional[Union[np.ndarray, list[np.ndarray]]] = None,
standardize: bool = True,
n_jobs: int = 1,
**kwargs,
) -> np.ndarray:
"""
Compute the shape-based distance (SBD) between all pairs of time series.
Expand All @@ -138,6 +143,10 @@ def sbd_pairwise_distance(
standardize : bool, default=True
Apply z-score to both input time series for standardization before
computing the distance. This makes SBD scaling invariant. Default is True.
n_jobs : int, default=1
The number of jobs to run in parallel. If -1, then the number of jobs is set
to the number of CPU cores. If 1, then the function is executed in a single
thread. If greater than 1, then the function is executed in parallel.

Returns
-------
Expand Down Expand Up @@ -188,6 +197,15 @@ def sbd_pairwise_distance(
[0.36754447, 0. , 0.29289322],
[0.5527864 , 0.29289322, 0. ]])
"""
if n_jobs > 1:
warnings.warn(
"You have set n_jobs > 1. For this distance function "
"unless your data has a large number of time points, it is "
"recommended to use n_jobs=1. If this function is slower than "
"expected try setting n_jobs=1.",
UserWarning,
stacklevel=2,
)
multivariate_conversion = _is_numpy_list_multivariate(X, y)
_X, _ = _convert_collection_to_numba_list(X, "", multivariate_conversion)

Expand All @@ -199,30 +217,30 @@ def sbd_pairwise_distance(
return _sbd_pairwise_distance(_X, _y, standardize)


@njit(cache=True, fastmath=True)
@njit(cache=True, fastmath=True, parallel=True)
def _sbd_pairwise_distance_single(
x: NumbaList[np.ndarray], standardize: bool
) -> np.ndarray:
n_cases = len(x)
distances = np.zeros((n_cases, n_cases))

for i in range(n_cases):
for i in prange(n_cases):
for j in range(i + 1, n_cases):
distances[i, j] = sbd_distance(x[i], x[j], standardize)
distances[j, i] = distances[i, j]

return distances


@njit(cache=True, fastmath=True)
@njit(cache=True, fastmath=True, parallel=True)
def _sbd_pairwise_distance(
x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], standardize: bool
) -> np.ndarray:
n_cases = len(x)
m_cases = len(y)
distances = np.zeros((n_cases, m_cases))

for i in range(n_cases):
for i in prange(n_cases):
for j in range(m_cases):
distances[i, j] = sbd_distance(x[i], y[j], standardize)
return distances
Expand Down
14 changes: 11 additions & 3 deletions aeon/distances/_shift_scale_invariant.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from typing import Optional, Union

import numpy as np
from numba import njit
from numba import njit, prange
from numba.typed import List as NumbaList

from aeon.utils._threading import threaded
from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list
from aeon.utils.validation.collection import _is_numpy_list_multivariate

Expand Down Expand Up @@ -156,10 +157,13 @@ def _univariate_shift_scale_invariant_distance(
return min_dist, best_shifted_y


@threaded
def shift_scale_invariant_pairwise_distance(
X: Union[np.ndarray, list[np.ndarray]],
y: Optional[Union[np.ndarray, list[np.ndarray]]] = None,
max_shift: Optional[int] = None,
n_jobs: int = 1,
**kwargs,
) -> np.ndarray:
r"""Compute the shift-scale invariant pairwise distance between time series.

Expand Down Expand Up @@ -193,6 +197,10 @@ def shift_scale_invariant_pairwise_distance(
Maximum shift allowed in the alignment path. If None, then max_shift is set
to min(X.shape[-1], y.shape[-1]) or if y is None, max_shift is set to
X.shape[-1].
n_jobs : int, default=1
The number of jobs to run in parallel. If -1, then the number of jobs is set
to the number of CPU cores. If 1, then the function is executed in a single
thread. If greater than 1, then the function is executed in parallel.

Returns
-------
Expand Down Expand Up @@ -308,15 +316,15 @@ def shift_scale_invariant_best_shift(
raise ValueError("x and y must be 1D or 2D")


@njit(cache=True, fastmath=True)
@njit(cache=True, fastmath=True, parallel=True)
def _shift_invariant_pairwise_distance(
x: NumbaList[np.ndarray], y: NumbaList[np.ndarray], max_shift: int
) -> np.ndarray:
n_cases = len(x)
m_cases = len(y)
distances = np.zeros((n_cases, m_cases))

for i in range(n_cases):
for i in prange(n_cases):
for j in range(m_cases):
distances[i, j] = shift_scale_invariant_distance(x[i], y[j], max_shift)
return distances
Loading
Loading