Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] KNN n_jobs and updated kneighbours method #2578

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ac185e2
added numba prange to all pairwise distances
chrisholder Feb 17, 2025
c406623
sfa update
chrisholder Feb 18, 2025
cbf5e19
changed warning
chrisholder Feb 18, 2025
2b7acac
Merge branch 'main' into distance-module-n-jobs
baraline Feb 18, 2025
2efb8ce
Merge branch 'distance-module-n-jobs' into knn-update
chrisholder Feb 28, 2025
5f74e4e
thread kneighbors
chrisholder Feb 28, 2025
4c95abb
fix test
chrisholder Feb 28, 2025
d35954d
Merge branch 'main' into distance-module-n-jobs
chrisholder Feb 28, 2025
3ff96b1
custom pairwise threaded
chrisholder Mar 3, 2025
2a74496
fixed
chrisholder Mar 3, 2025
4afc8df
added threaded decorator
chrisholder Mar 4, 2025
331bf6f
Merge branch 'distance-module-n-jobs' into knn-update
chrisholder Mar 4, 2025
3b6233e
merge changes and fixed call
chrisholder Mar 4, 2025
b603ddb
fix
chrisholder Mar 4, 2025
24b46b0
fix
chrisholder Mar 4, 2025
3ee0152
Merge branch 'distance-module-n-jobs' into knn-update
chrisholder Mar 4, 2025
85dfdab
Merge branch 'main' into distance-module-n-jobs
MatthewMiddlehurst Mar 6, 2025
26c309d
Merge branch 'distance-module-n-jobs' into knn-update
chrisholder Mar 7, 2025
9f2e010
fixed
chrisholder Mar 7, 2025
541e7fd
expanded threaded decorator to work with classes
chrisholder Mar 7, 2025
405bdda
Merge branch 'distance-module-n-jobs' into knn-update
chrisholder Mar 7, 2025
32e93eb
fixed
chrisholder Mar 7, 2025
add714e
fix
chrisholder Mar 7, 2025
ac6eabe
Merge branch 'distance-module-n-jobs' into knn-update
chrisholder Mar 7, 2025
7b02683
merge
chrisholder Mar 7, 2025
a94cc8d
added test for kneighbors
chrisholder Mar 7, 2025
4470989
added test for kneighbors
chrisholder Mar 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 69 additions & 50 deletions aeon/classification/distance_based/_time_series_neighbors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
distances in aeon.distances.
"""

import numbers
from typing import Optional

__maintainer__ = []
Expand All @@ -15,7 +16,8 @@
import numpy as np

from aeon.classification.base import BaseClassifier
from aeon.distances import get_distance_function
from aeon.distances import pairwise_distance
from aeon.utils._threading import threaded

WEIGHTS_SUPPORTED = ["uniform", "distance"]

Expand Down Expand Up @@ -46,11 +48,10 @@ class KNeighborsTimeSeriesClassifier(BaseClassifier):
n_timepoints)`` as input and returns a float.
distance_params : dict, default = None
Dictionary for metric parameters for the case that distance is a str.
n_jobs : int, default = None
The number of parallel jobs to run for neighbors search.
``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
``-1`` means using all processors.
for more details. Parameter for compatibility purposes, still unimplemented.
n_jobs : int, default=1
The number of jobs to run in parallel. If -1, then the number of jobs is set
to the number of CPU cores. If 1, then the function is executed in a single
thread. If greater than 1, then the function is executed in parallel.

Examples
--------
Expand Down Expand Up @@ -111,7 +112,6 @@ def _fit(self, X, y):
y : array-like, shape = (n_cases)
The class labels.
"""
self.metric_ = get_distance_function(method=self.distance)
self.X_ = X
self.classes_, self.y_ = np.unique(y, return_inverse=True)
return self
Expand All @@ -136,7 +136,7 @@ def _predict_proba(self, X):
"""
preds = np.zeros((len(X), len(self.classes_)))
for i in range(len(X)):
idx, weights = self._kneighbors(X[i])
idx, weights = self.kneighbors(X[i])
for id, w in zip(idx, weights):
predicted_class = self.y_[id]
preds[i, predicted_class] += w
Expand All @@ -163,61 +163,80 @@ def _predict(self, X):
"""
self._check_is_fitted()

preds = np.empty(len(X), dtype=self.classes_.dtype)
for i in range(len(X)):
scores = np.zeros(len(self.classes_))
idx, weights = self._kneighbors(X[i])
for id, w in zip(idx, weights):
predicted_class = self.y_[id]
scores[predicted_class] += w

preds[i] = self.classes_[np.argmax(scores)]
indexes = self.kneighbors(X, return_distance=False)[:, 0]
return self.classes_[self.y_[indexes]]

return preds

def _kneighbors(self, X):
"""
Find the K-neighbors of a point.
@threaded
def kneighbors(self, X=None, n_neighbors=None, return_distance=True):
"""Find the K-neighbors of a point.

Returns indices and weights of each point.
Returns indices of and distances to the neighbors of each point.

Parameters
----------
X : np.ndarray
A single time series instance if shape = (n_channels, n_timepoints)
X : 3D np.ndarray of shape = (n_cases, n_channels, n_timepoints) or list of
shape [n_cases] of 2D arrays shape (n_channels,n_timepoints_i)
The query point or points.
If not provided, neighbors of each indexed point are returned.
In this case, the query point is not considered its own neighbor.
n_neighbors : int, default=None
Number of neighbors required for each sample. The default is the value
passed to the constructor.
return_distance : bool, default=True
Whether or not to return the distances.

Returns
-------
ind : array
neigh_dist : ndarray of shape (n_queries, n_neighbors)
Array representing the distances to points, only present if
return_distance=True.
neigh_ind : ndarray of shape (n_queries, n_neighbors)
Indices of the nearest points in the population matrix.
ws : array
Array representing the weights of each neighbor.
"""
distances = np.array(
[
self.metric_(X, self.X_[j], **self._distance_params)
for j in range(len(self.X_))
]
)
self._check_is_fitted()
if n_neighbors is None:
n_neighbors = self.n_neighbors
elif n_neighbors <= 0:
raise ValueError(f"Expected n_neighbors > 0. Got {n_neighbors}")
elif not isinstance(n_neighbors, numbers.Integral):
raise TypeError(
f"n_neighbors does not take {type(n_neighbors)} value, "
"enter integer value"
)

# Find indices of k nearest neighbors using partitioning:
# [0..k-1], [k], [k+1..n-1]
# They might not be ordered within themselves,
# but it is not necessary and partitioning is
# O(n) while sorting is O(nlogn)
closest_idx = np.argpartition(distances, self.n_neighbors)
closest_idx = closest_idx[: self.n_neighbors]

if self.weights == "distance":
ws = distances[closest_idx]
# Using epsilon ~= 0 to avoid division by zero
ws = 1 / (ws + np.finfo(float).eps)
elif self.weights == "uniform":
ws = np.repeat(1.0, self.n_neighbors)
query_is_train = X is None
if query_is_train:
X = self.X_
n_neighbors += 1
else:
raise Exception(f"Invalid kNN weights: {self.weights}")
X = self._preprocess_collection(X, store_metadata=False)
self._check_shape(X)

distances = pairwise_distance(
X,
self.X_ if not query_is_train else None,
method=self.distance,
n_jobs=self.n_jobs,
**self._distance_params,
)

sample_range = np.arange(distances.shape[0])[:, None]
neigh_ind = np.argpartition(distances, n_neighbors - 1, axis=1)
neigh_ind = neigh_ind[:, :n_neighbors]
neigh_ind = neigh_ind[
sample_range, np.argsort(distances[sample_range, neigh_ind])
]

if query_is_train:
neigh_ind = neigh_ind[:, 1:]

if return_distance:
if query_is_train:
neigh_dist = distances[sample_range, neigh_ind]
return neigh_dist, neigh_ind
return distances[sample_range, neigh_ind], neigh_ind

return closest_idx, ws
return neigh_ind

@classmethod
def _get_test_params(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for KNeighborsTimeSeriesClassifier."""

import numpy as np
import pytest

from aeon.classification.distance_based import KNeighborsTimeSeriesClassifier
Expand Down Expand Up @@ -42,12 +43,9 @@
@pytest.mark.parametrize("distance_key", distance_functions)
def test_knn_on_unit_test(distance_key):
"""Test function for elastic knn, to be reinstated soon."""
# load arrowhead data for unit tests
X_train, y_train = load_unit_test(split="train")
X_test, y_test = load_unit_test(split="test")
knn = KNeighborsTimeSeriesClassifier(
distance=distance_key,
)
knn = KNeighborsTimeSeriesClassifier(distance=distance_key)
knn.fit(X_train, y_train)
pred = knn.predict(X_test)
correct = 0
Expand Down Expand Up @@ -77,3 +75,26 @@ def test_knn_bounding_matrix(distance_key):
if pred[j] == y_test[j]:
correct = correct + 1
assert correct == expected_correct_window[distance_key]


@pytest.mark.parametrize("distance_key", distance_functions)
def test_knn_kneighbors(distance_key):
"""Test knn kneighbors."""
X_train, y_train = load_unit_test(split="train")
X_test, y_test = load_unit_test(split="test")

knn = KNeighborsTimeSeriesClassifier(distance=distance_key)
knn.fit(X_train, y_train)
dists, ind = knn.kneighbors(X_test, n_neighbors=3)
assert isinstance(dists, np.ndarray)
assert isinstance(ind, np.ndarray)
assert dists.shape == (X_test.shape[0], 3)
assert ind.shape == (X_test.shape[0], 3)
indexes = ind[:, 0]
classes, y = np.unique(y_train, return_inverse=True)
pred = classes[y[indexes]]
correct = 0
for j in range(0, len(pred)):
if pred[j] == y_test[j]:
correct = correct + 1
assert correct == expected_correct_window[distance_key]
67 changes: 57 additions & 10 deletions aeon/distances/_distance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Callable, Optional, TypedDict, Union

import numpy as np
from joblib import Parallel, delayed
from typing_extensions import Unpack

from aeon.distances._mpdist import mp_distance, mp_pairwise_distance
Expand Down Expand Up @@ -82,6 +83,7 @@
squared_distance,
squared_pairwise_distance,
)
from aeon.utils._threading import threaded
from aeon.utils.conversion._convert_collection import _convert_collection_to_numba_list
from aeon.utils.validation.collection import _is_numpy_list_multivariate

Expand Down Expand Up @@ -173,6 +175,7 @@ def pairwise_distance(
y: Optional[np.ndarray] = None,
method: Union[str, DistanceFunction, None] = None,
symmetric: bool = True,
n_jobs: int = 1,
**kwargs: Unpack[DistanceKwargs],
) -> np.ndarray:
"""Compute the pairwise distance matrix between two time series.
Expand All @@ -197,6 +200,10 @@ def pairwise_distance(
function is provided as the "method" parameter, then it will compute an
asymmetric distance matrix, and the entire matrix (including both upper and
lower triangles) is returned.
n_jobs : int, default=1
The number of jobs to run in parallel. If -1, then the number of jobs is set
to the number of CPU cores. If 1, then the function is executed in a single
thread. If greater than 1, then the function is executed in parallel.
kwargs : Any
Extra arguments for distance. Refer to each distance documentation for a list of
possible arguments.
Expand Down Expand Up @@ -240,45 +247,71 @@ def pairwise_distance(
[ 48.]])
"""
if method in PAIRWISE_DISTANCE:
return DISTANCES_DICT[method]["pairwise_distance"](x, y, **kwargs)
return DISTANCES_DICT[method]["pairwise_distance"](
x, y, n_jobs=n_jobs, **kwargs
)
elif isinstance(method, Callable):
if y is None and not symmetric:
return _custom_func_pairwise(x, x, method, **kwargs)
return _custom_func_pairwise(x, y, method, **kwargs)
return _custom_func_pairwise(x, x, method, n_jobs=n_jobs, **kwargs)
return _custom_func_pairwise(x, y, method, n_jobs=n_jobs, **kwargs)
else:
raise ValueError("Method must be one of the supported strings or a callable")


@threaded
def _custom_func_pairwise(
X: Optional[Union[np.ndarray, list[np.ndarray]]],
y: Optional[Union[np.ndarray, list[np.ndarray]]] = None,
dist_func: Union[DistanceFunction, None] = None,
n_jobs: int = 1,
**kwargs: Unpack[DistanceKwargs],
) -> np.ndarray:
if dist_func is None:
raise ValueError("dist_func must be a callable")

multivariate_conversion = _is_numpy_list_multivariate(X, y)
X, _ = _convert_collection_to_numba_list(X, "X", multivariate_conversion)

if n_jobs > 1:
X = np.array(X)

if y is None:
# To self
return _custom_pairwise_distance(X, dist_func, **kwargs)
return _custom_pairwise_distance(X, dist_func, n_jobs=n_jobs, **kwargs)
y, _ = _convert_collection_to_numba_list(y, "y", multivariate_conversion)
return _custom_from_multiple_to_multiple_distance(X, y, dist_func, **kwargs)
if n_jobs > 1:
y = np.array(y)
return _custom_from_multiple_to_multiple_distance(
X, y, dist_func, n_jobs=n_jobs, **kwargs
)


def _custom_pairwise_distance(
X: Union[np.ndarray, list[np.ndarray]],
dist_func: DistanceFunction,
n_jobs: int = 1,
**kwargs: Unpack[DistanceKwargs],
) -> np.ndarray:
n_cases = len(X)
distances = np.zeros((n_cases, n_cases))

for i in range(n_cases):
for j in range(i + 1, n_cases):
def compute_single_distance(i, j):
return i, j, dist_func(X[i], X[j], **kwargs)

indices = [(i, j) for i in range(n_cases) for j in range(i + 1, n_cases)]

if n_jobs == 1:
for i, j in indices:
distances[i, j] = dist_func(X[i], X[j], **kwargs)
distances[j, i] = distances[i, j]
distances[j, i] = distances[i, j] # Mirror for symmetry
else:
results = Parallel(n_jobs=n_jobs)(
delayed(compute_single_distance)(i, j) for i, j in indices
)

for i, j, dist in results:
distances[i, j] = dist
distances[j, i] = dist # Mirror for symmetry

return distances

Expand All @@ -287,15 +320,29 @@ def _custom_from_multiple_to_multiple_distance(
x: Union[np.ndarray, list[np.ndarray]],
y: Union[np.ndarray, list[np.ndarray]],
dist_func: DistanceFunction,
n_jobs: int = 1,
**kwargs: Unpack[DistanceKwargs],
) -> np.ndarray:
n_cases = len(x)
m_cases = len(y)
distances = np.zeros((n_cases, m_cases))

for i in range(n_cases):
for j in range(m_cases):
def compute_single_distance(i, j):
return i, j, dist_func(x[i], y[j], **kwargs)

indices = [(i, j) for i in range(n_cases) for j in range(m_cases)]

if n_jobs == 1:
for i, j in indices:
distances[i, j] = dist_func(x[i], y[j], **kwargs)
else:
results = Parallel(n_jobs=n_jobs)(
delayed(compute_single_distance)(i, j) for i, j in indices
)

for i, j, dist in results:
distances[i, j] = dist

return distances


Expand Down
Loading
Loading