From 6bda7fc6bfc5f06185b5cff5698c8806d8763d35 Mon Sep 17 00:00:00 2001 From: zeyus Date: Fri, 8 Mar 2024 13:06:27 +0100 Subject: [PATCH] Moved calc_rqa. Formatting, Linting, Typing. --- src/mopipe/core/analysis/__init__.py | 1 + src/mopipe/core/analysis/pipeline.py | 1 - src/mopipe/core/analysis/rqa.py | 65 +++++++++++ src/mopipe/segment.py | 156 +++++++++++++-------------- tests/core/analysis/test_pipeline.py | 1 + tests/test_segments.py | 14 +-- 6 files changed, 152 insertions(+), 86 deletions(-) create mode 100644 src/mopipe/core/analysis/rqa.py diff --git a/src/mopipe/core/analysis/__init__.py b/src/mopipe/core/analysis/__init__.py index dcb7cd8..692dd6d 100644 --- a/src/mopipe/core/analysis/__init__.py +++ b/src/mopipe/core/analysis/__init__.py @@ -1 +1,2 @@ from .pipeline import Pipeline # noqa: F401, TID252 +from .rqa import calc_rqa # noqa: F401, TID252 diff --git a/src/mopipe/core/analysis/pipeline.py b/src/mopipe/core/analysis/pipeline.py index eb50b52..66635ee 100644 --- a/src/mopipe/core/analysis/pipeline.py +++ b/src/mopipe/core/analysis/pipeline.py @@ -96,7 +96,6 @@ def __setitem__(self, index: t.Union[int, slice], value: t.Union[Segment, t.Iter raise ValueError(msg) self._segments[index] = list(value) - @t.overload def __delitem__(self, index: int) -> None: ... diff --git a/src/mopipe/core/analysis/rqa.py b/src/mopipe/core/analysis/rqa.py new file mode 100644 index 0000000..2af5397 --- /dev/null +++ b/src/mopipe/core/analysis/rqa.py @@ -0,0 +1,65 @@ +import numpy as np +import scipy # type: ignore +from pandas.api.extensions import ExtensionArray + + +def calc_rqa( + x: ExtensionArray | np.ndarray, + y: ExtensionArray | np.ndarray, + dim: int = 1, + tau: int = 1, + threshold: float = 0.1, + lmin: int = 2, +) -> list[float]: + embed_data_x: list[np.ndarray] | np.ndarray = [] + embed_data_y: list[np.ndarray] | np.ndarray = [] + for i in range(dim): + embed_data_x.append(x[i * tau : x.shape[0] - (dim - i - 1) * tau]) # type: ignore + embed_data_y.append(y[i * tau : y.shape[0] - (dim - i - 1) * tau]) # type: ignore + embed_data_x, embed_data_y = np.array(embed_data_x), np.array(embed_data_y) + + distance_matrix = scipy.spatial.distance_matrix(embed_data_x.T, embed_data_y.T) + recurrence_matrix = distance_matrix < threshold + msize = recurrence_matrix.shape[0] + + d_line_dist = np.zeros(msize + 1) + for i in range(-msize + 1, msize): + cline = 0 + for e in np.diagonal(recurrence_matrix, i): + if e: + cline += 1 + else: + d_line_dist[cline] += 1 + cline = 0 + d_line_dist[cline] += 1 + + v_line_dist = np.zeros(msize + 1) + for i in range(msize): + cline = 0 + for e in recurrence_matrix[:, i]: + if e: + cline += 1 + else: + v_line_dist[cline] += 1 + cline = 0 + v_line_dist[cline] += 1 + + rr_sum = recurrence_matrix.sum() + rr = rr_sum / msize**2 + det = (d_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / rr_sum if rr_sum > 0 else 0 + lam = (v_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / rr_sum if rr_sum > 0 else 0 + + d_sum = d_line_dist[lmin:].sum() + avg_diag_length = (d_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / d_sum if d_sum > 0 else 0 + v_sum = d_line_dist[lmin:].sum() + avg_vert_length = (v_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / v_sum if v_sum > 0 else 0 + + d_probs = d_line_dist[lmin:][d_line_dist[lmin:] > 0] + d_probs /= d_probs.sum() + d_entropy = -(d_probs * np.log(d_probs)).sum() + + v_probs = v_line_dist[lmin:][v_line_dist[lmin:] > 0] + v_probs /= v_probs.sum() + v_entropy = -(v_probs * np.log(v_probs)).sum() + + return [rr, det, lam, avg_diag_length, avg_vert_length, d_entropy, v_entropy] diff --git a/src/mopipe/segment.py b/src/mopipe/segment.py index 443b8e8..78015d2 100644 --- a/src/mopipe/segment.py +++ b/src/mopipe/segment.py @@ -2,8 +2,8 @@ import numpy as np import pandas as pd -import scipy +from mopipe.core.analysis import calc_rqa from mopipe.core.common.util import int_or_str_slice from mopipe.core.segments.inputs import AnySeriesInput, MultivariateSeriesInput, UnivariateSeriesInput from mopipe.core.segments.outputs import MultivariateSeriesOutput, SingleNumericValueOutput, UnivariateSeriesOutput @@ -21,7 +21,10 @@ def process(self, x: t.Union[pd.Series, pd.DataFrame], **kwargs) -> float: # no class ColMeans(SummaryType, MultivariateSeriesInput, UnivariateSeriesOutput, Segment): def process( - self, x: pd.DataFrame, col: t.Union[str, int, slice, None] = None, **kwargs # noqa: ARG002 + self, + x: pd.DataFrame, + col: t.Union[str, int, slice, None] = None, + **kwargs, # noqa: ARG002 ) -> pd.Series: slice_type = None if x.empty: @@ -40,103 +43,83 @@ def process( class CalcShift(TransformType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment): def process( - self, x: pd.DataFrame, cols: t.Union[list[str], None] = None, shift: int = 1, **kwargs + self, + x: pd.DataFrame, + cols: pd.Index | None = None, + shift: int = 1, + **kwargs, # noqa: ARG002 ) -> pd.DataFrame: if cols is None: cols = x.columns for col_name in cols: col_data = x[col_name].values new_col_name = col_name + "_shift" - new_col_data = np.concatenate((np.zeros(shift), - col_data[shift:] - col_data[:-shift])) + new_col_data = np.concatenate((np.zeros(shift), col_data[shift:] - col_data[:-shift])) x[new_col_name] = new_col_data return x class SimpleGapFilling(TransformType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment): def process( - self, x: pd.DataFrame, **kwargs + self, + x: pd.DataFrame, + **kwargs, # noqa: ARG002 ) -> pd.DataFrame: return x.interpolate(method="linear") -def calc_rqa(x: np.array, y: np.array, dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2): - embed_data_x, embed_data_y = [], [] - for i in range(dim): - embed_data_x.append(x[i*tau:x.shape[0]-(dim-i-1)*tau]) - embed_data_y.append(y[i*tau:y.shape[0]-(dim-i-1)*tau]) - embed_data_x, embed_data_y = np.array(embed_data_x), np.array(embed_data_y) - - distance_matrix = scipy.spatial.distance_matrix(embed_data_x.T, embed_data_y.T) - recurrence_matrix = distance_matrix < threshold - msize = recurrence_matrix.shape[0] - - d_line_dist = np.zeros(msize+1) - for i in range(-msize+1, msize): - cline = 0 - for e in np.diagonal(recurrence_matrix, i): - if e: - cline += 1 - else: - d_line_dist[cline] += 1 - cline = 0 - d_line_dist[cline] += 1 - - v_line_dist = np.zeros(msize+1) - for i in range(msize): - cline = 0 - for e in recurrence_matrix[:,i]: - if e: - cline += 1 - else: - v_line_dist[cline] += 1 - cline = 0 - v_line_dist[cline] += 1 - - rr_sum = recurrence_matrix.sum() - rr = rr_sum / msize**2 - det = (d_line_dist[lmin:] * np.arange(msize+1)[lmin:]).sum() / rr_sum if rr_sum > 0 else 0 - lam = (v_line_dist[lmin:] * np.arange(msize+1)[lmin:]).sum() / rr_sum if rr_sum > 0 else 0 - - d_sum = d_line_dist[lmin:].sum() - avg_diag_length = (d_line_dist[lmin:] * np.arange(msize+1)[lmin:]).sum() / d_sum if d_sum > 0 else 0 - v_sum = d_line_dist[lmin:].sum() - avg_vert_length = (v_line_dist[lmin:] * np.arange(msize+1)[lmin:]).sum() / v_sum if v_sum > 0 else 0 - - d_line_dist[lmin:] > 0 - d_probs = d_line_dist[lmin:][d_line_dist[lmin:] > 0] - d_probs /= d_probs.sum() - d_entropy = -(d_probs * np.log(d_probs)).sum() - - v_line_dist[lmin:] > 0 - v_probs = v_line_dist[lmin:][v_line_dist[lmin:] > 0] - v_probs /= v_probs.sum() - v_entropy = -(v_probs * np.log(v_probs)).sum() - - return rr, det, lam, avg_diag_length, avg_vert_length, d_entropy, v_entropy - - class RQAStats(AnalysisType, UnivariateSeriesInput, MultivariateSeriesOutput, Segment): def process( - self, x: pd.Series, dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2, **kwargs + self, + x: pd.Series, + dim: int = 1, + tau: int = 1, + threshold: float = 0.1, + lmin: int = 2, + **kwargs, # noqa: ARG002 ) -> pd.DataFrame: - out = pd.DataFrame(columns=["recurrence_rate", "determinism", "laminarity", - "avg_diag_length", "avg_vert_length", "d_entropy", "v_entropy"]) + out = pd.DataFrame( + columns=[ + "recurrence_rate", + "determinism", + "laminarity", + "avg_diag_length", + "avg_vert_length", + "d_entropy", + "v_entropy", + ] + ) if x.empty: return out - x = x.values - out.loc[len(out)] = calc_rqa(x, x, dim, tau, threshold, lmin) + xv = x.values + out.loc[len(out)] = calc_rqa(xv, xv, dim, tau, threshold, lmin) return out class CrossRQAStats(AnalysisType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment): def process( - self, x: pd.DataFrame, col_a: t.Union[str, int] = 0, col_b: t.Union[str, int] = 0, - dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2, **kwargs + self, + x: pd.DataFrame, + col_a: t.Union[str, int] = 0, + col_b: t.Union[str, int] = 0, + dim: int = 1, + tau: int = 1, + threshold: float = 0.1, + lmin: int = 2, + **kwargs, # noqa: ARG002 ) -> pd.DataFrame: - out = pd.DataFrame(columns=["recurrence_rate", "determinism", "laminarity", - "avg_diag_length", "avg_vert_length", "d_entropy", "v_entropy"]) + out = pd.DataFrame( + columns=[ + "recurrence_rate", + "determinism", + "laminarity", + "avg_diag_length", + "avg_vert_length", + "d_entropy", + "v_entropy", + ] + ) if x.empty: return out if isinstance(col_a, int): @@ -154,12 +137,29 @@ def process( class WindowedCrossRQAStats(AnalysisType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment): def process( - self, x: pd.DataFrame, col_a: t.Union[str, int] = 0, col_b: t.Union[str, int] = 0, - dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2, window: int = 100, - step: int = 10, **kwargs + self, + x: pd.DataFrame, + col_a: t.Union[str, int] = 0, + col_b: t.Union[str, int] = 0, + dim: int = 1, + tau: int = 1, + threshold: float = 0.1, + lmin: int = 2, + window: int = 100, + step: int = 10, + **kwargs, # noqa: ARG002 ) -> pd.DataFrame: - out = pd.DataFrame(columns=["recurrence_rate", "determinism", "laminarity", - "avg_diag_length", "avg_vert_length", "d_entropy", "v_entropy"]) + out = pd.DataFrame( + columns=[ + "recurrence_rate", + "determinism", + "laminarity", + "avg_diag_length", + "avg_vert_length", + "d_entropy", + "v_entropy", + ] + ) if x.empty: return out if isinstance(col_a, int): @@ -171,6 +171,6 @@ def process( if isinstance(col_b, str): xb = x.loc[:, col_b].values - for w in range(0, xa.shape[0]-window+1, step): - out.loc[len(out)] = calc_rqa(xa[w:w+window], xb[w:w+window], dim, tau, threshold, lmin) + for w in range(0, xa.shape[0] - window + 1, step): + out.loc[len(out)] = calc_rqa(xa[w : w + window], xb[w : w + window], dim, tau, threshold, lmin) return out diff --git a/tests/core/analysis/test_pipeline.py b/tests/core/analysis/test_pipeline.py index 9b94b12..26caa2d 100644 --- a/tests/core/analysis/test_pipeline.py +++ b/tests/core/analysis/test_pipeline.py @@ -20,6 +20,7 @@ def test_run_with_multiple_segments(self): output = pipeline.run(x=1) assert output == segment2.process_output + class MockSegment: def __init__(self): self.process_output = None diff --git a/tests/test_segments.py b/tests/test_segments.py index ab4dfe2..e138460 100644 --- a/tests/test_segments.py +++ b/tests/test_segments.py @@ -55,7 +55,7 @@ def segment(self) -> Segment: return RQAStats("TestRQAStats") def test_recurrence_measures(self, segment: Segment) -> None: - x = pd.Series([1,1,2,2]) + x = pd.Series([1, 1, 2, 2]) res = segment.process(x) assert res.loc[0, "recurrence_rate"] == 0.5 assert res.loc[0, "determinism"] == 0.5 @@ -71,7 +71,7 @@ def segment(self) -> Segment: return CrossRQAStats("TestCrossRQAStats") def test_cross_recurrence_measures(self, segment: Segment) -> None: - x = pd.DataFrame({"a": [1,1,2,2,1,1,2,2], "b": [3,3,2,2,3,3,2,2]}) + x = pd.DataFrame({"a": [1, 1, 2, 2, 1, 1, 2, 2], "b": [3, 3, 2, 2, 3, 3, 2, 2]}) res = segment.process(x, col_a=0, col_b=1) assert res.loc[0, "recurrence_rate"] == 0.25 res = segment.process(x, col_a="a", col_b="b") @@ -88,12 +88,12 @@ def segment(self) -> Segment: return WindowedCrossRQAStats("TestWindowedCrossRQAStats") def test_cross_recurrence_measures(self, segment: Segment) -> None: - x = pd.DataFrame({"a": [1,1,2,2,1,1,1,1], "b": [3,3,2,2,3,3,2,2]}) + x = pd.DataFrame({"a": [1, 1, 2, 2, 1, 1, 1, 1], "b": [3, 3, 2, 2, 3, 3, 2, 2]}) res = segment.process(x, col_a=0, col_b=1, window=4, step=2) assert res.shape[0] == 3 assert res.loc[0, "recurrence_rate"] == 0.25 assert res.loc[1, "recurrence_rate"] == 0.25 - assert res.loc[2, "recurrence_rate"] == 0. + assert res.loc[2, "recurrence_rate"] == 0.0 class TestCalcShift: @@ -102,10 +102,10 @@ def segment(self) -> Segment: return CalcShift("TestCalcShift") def test_calc_shift(self, segment: Segment) -> None: - x = pd.DataFrame({"a": [1,1,2,2,1,1,1,1], "b": [3,3,2,2,3,3,2,2]}) + x = pd.DataFrame({"a": [1, 1, 2, 2, 1, 1, 1, 1], "b": [3, 3, 2, 2, 3, 3, 2, 2]}) res = segment.process(x, cols=["a"], shift=2) assert res.shape[1] == 3 - assert (res["a_shift"].values == [0,0,1,1,-1,-1,0,0]).mean() == 1.0 + assert (res["a_shift"].values == [0, 0, 1, 1, -1, -1, 0, 0]).mean() == 1.0 class TestSimpleGapFilling: @@ -114,7 +114,7 @@ def segment(self) -> Segment: return SimpleGapFilling("TestGapFilling") def test_gap_filling(self, segment: Segment) -> None: - x = pd.DataFrame({"a": [1,1,2,np.nan,1,1,1,1], "b": [3,3,2,2,np.nan,np.nan,2,2]}) + x = pd.DataFrame({"a": [1, 1, 2, np.nan, 1, 1, 1, 1], "b": [3, 3, 2, 2, np.nan, np.nan, 2, 2]}) res = segment.process(x) assert res["a"][3] == 1.5 assert res["b"][4] == 2