
Commit

Moved calc_rqa. Formatting, Linting, Typing.
zeyus committed Mar 8, 2024
1 parent fae72a2 commit 6bda7fc
Showing 6 changed files with 152 additions and 86 deletions.
1 change: 1 addition & 0 deletions src/mopipe/core/analysis/__init__.py
@@ -1 +1,2 @@
from .pipeline import Pipeline # noqa: F401, TID252
from .rqa import calc_rqa # noqa: F401, TID252
1 change: 0 additions & 1 deletion src/mopipe/core/analysis/pipeline.py
@@ -96,7 +96,6 @@ def __setitem__(self, index: t.Union[int, slice], value: t.Union[Segment, t.Iter
raise ValueError(msg)
self._segments[index] = list(value)


@t.overload
def __delitem__(self, index: int) -> None: ...

65 changes: 65 additions & 0 deletions src/mopipe/core/analysis/rqa.py
@@ -0,0 +1,65 @@
import numpy as np
import scipy # type: ignore
from pandas.api.extensions import ExtensionArray


def calc_rqa(
x: ExtensionArray | np.ndarray,
y: ExtensionArray | np.ndarray,
dim: int = 1,
tau: int = 1,
threshold: float = 0.1,
lmin: int = 2,
) -> list[float]:
embed_data_x: list[np.ndarray] | np.ndarray = []
embed_data_y: list[np.ndarray] | np.ndarray = []
for i in range(dim):
embed_data_x.append(x[i * tau : x.shape[0] - (dim - i - 1) * tau]) # type: ignore
embed_data_y.append(y[i * tau : y.shape[0] - (dim - i - 1) * tau]) # type: ignore
embed_data_x, embed_data_y = np.array(embed_data_x), np.array(embed_data_y)

distance_matrix = scipy.spatial.distance_matrix(embed_data_x.T, embed_data_y.T)
recurrence_matrix = distance_matrix < threshold
msize = recurrence_matrix.shape[0]

d_line_dist = np.zeros(msize + 1)
for i in range(-msize + 1, msize):
cline = 0
for e in np.diagonal(recurrence_matrix, i):
if e:
cline += 1
else:
d_line_dist[cline] += 1
cline = 0
d_line_dist[cline] += 1

v_line_dist = np.zeros(msize + 1)
for i in range(msize):
cline = 0
for e in recurrence_matrix[:, i]:
if e:
cline += 1
else:
v_line_dist[cline] += 1
cline = 0
v_line_dist[cline] += 1

rr_sum = recurrence_matrix.sum()
rr = rr_sum / msize**2
det = (d_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / rr_sum if rr_sum > 0 else 0
lam = (v_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / rr_sum if rr_sum > 0 else 0

d_sum = d_line_dist[lmin:].sum()
avg_diag_length = (d_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / d_sum if d_sum > 0 else 0
v_sum = v_line_dist[lmin:].sum()
avg_vert_length = (v_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / v_sum if v_sum > 0 else 0

d_probs = d_line_dist[lmin:][d_line_dist[lmin:] > 0]
d_probs /= d_probs.sum()
d_entropy = -(d_probs * np.log(d_probs)).sum()

v_probs = v_line_dist[lmin:][v_line_dist[lmin:] > 0]
v_probs /= v_probs.sum()
v_entropy = -(v_probs * np.log(v_probs)).sum()

return [rr, det, lam, avg_diag_length, avg_vert_length, d_entropy, v_entropy]
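
For context, a minimal usage sketch of the relocated function (the sine signal and the dim/tau values below are illustrative assumptions, not part of this commit). calc_rqa returns, in order: recurrence rate, determinism, laminarity, average diagonal line length, average vertical line length, diagonal entropy, and vertical entropy.

import numpy as np

from mopipe.core.analysis import calc_rqa

# Illustrative 1-D signal; any equal-length numeric arrays work for x and y.
signal = np.sin(np.linspace(0, 8 * np.pi, 200))

# Auto-recurrence: compare the series against itself.
rr, det, lam, avg_diag, avg_vert, d_ent, v_ent = calc_rqa(
    signal, signal, dim=2, tau=3, threshold=0.1, lmin=2
)
print(f"RR={rr:.3f} DET={det:.3f} LAM={lam:.3f}")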
156 changes: 78 additions & 78 deletions src/mopipe/segment.py
@@ -2,8 +2,8 @@

import numpy as np
import pandas as pd
import scipy

from mopipe.core.analysis import calc_rqa
from mopipe.core.common.util import int_or_str_slice
from mopipe.core.segments.inputs import AnySeriesInput, MultivariateSeriesInput, UnivariateSeriesInput
from mopipe.core.segments.outputs import MultivariateSeriesOutput, SingleNumericValueOutput, UnivariateSeriesOutput
@@ -21,7 +21,10 @@ def process(self, x: t.Union[pd.Series, pd.DataFrame], **kwargs) -> float: # no

class ColMeans(SummaryType, MultivariateSeriesInput, UnivariateSeriesOutput, Segment):
def process(
self, x: pd.DataFrame, col: t.Union[str, int, slice, None] = None, **kwargs # noqa: ARG002
self,
x: pd.DataFrame,
col: t.Union[str, int, slice, None] = None,
**kwargs, # noqa: ARG002
) -> pd.Series:
slice_type = None
if x.empty:
@@ -40,103 +43,83 @@ def process

class CalcShift(TransformType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
def process(
self, x: pd.DataFrame, cols: t.Union[list[str], None] = None, shift: int = 1, **kwargs
self,
x: pd.DataFrame,
cols: pd.Index | None = None,
shift: int = 1,
**kwargs, # noqa: ARG002
) -> pd.DataFrame:
if cols is None:
cols = x.columns
for col_name in cols:
col_data = x[col_name].values
new_col_name = col_name + "_shift"
new_col_data = np.concatenate((np.zeros(shift),
col_data[shift:] - col_data[:-shift]))
new_col_data = np.concatenate((np.zeros(shift), col_data[shift:] - col_data[:-shift]))
x[new_col_name] = new_col_data
return x


class SimpleGapFilling(TransformType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
def process(
self, x: pd.DataFrame, **kwargs
self,
x: pd.DataFrame,
**kwargs, # noqa: ARG002
) -> pd.DataFrame:
return x.interpolate(method="linear")


def calc_rqa(x: np.array, y: np.array, dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2):
embed_data_x, embed_data_y = [], []
for i in range(dim):
embed_data_x.append(x[i*tau:x.shape[0]-(dim-i-1)*tau])
embed_data_y.append(y[i*tau:y.shape[0]-(dim-i-1)*tau])
embed_data_x, embed_data_y = np.array(embed_data_x), np.array(embed_data_y)

distance_matrix = scipy.spatial.distance_matrix(embed_data_x.T, embed_data_y.T)
recurrence_matrix = distance_matrix < threshold
msize = recurrence_matrix.shape[0]

d_line_dist = np.zeros(msize+1)
for i in range(-msize+1, msize):
cline = 0
for e in np.diagonal(recurrence_matrix, i):
if e:
cline += 1
else:
d_line_dist[cline] += 1
cline = 0
d_line_dist[cline] += 1

v_line_dist = np.zeros(msize+1)
for i in range(msize):
cline = 0
for e in recurrence_matrix[:,i]:
if e:
cline += 1
else:
v_line_dist[cline] += 1
cline = 0
v_line_dist[cline] += 1

rr_sum = recurrence_matrix.sum()
rr = rr_sum / msize**2
det = (d_line_dist[lmin:] * np.arange(msize+1)[lmin:]).sum() / rr_sum if rr_sum > 0 else 0
lam = (v_line_dist[lmin:] * np.arange(msize+1)[lmin:]).sum() / rr_sum if rr_sum > 0 else 0

d_sum = d_line_dist[lmin:].sum()
avg_diag_length = (d_line_dist[lmin:] * np.arange(msize+1)[lmin:]).sum() / d_sum if d_sum > 0 else 0
v_sum = d_line_dist[lmin:].sum()
avg_vert_length = (v_line_dist[lmin:] * np.arange(msize+1)[lmin:]).sum() / v_sum if v_sum > 0 else 0

d_line_dist[lmin:] > 0
d_probs = d_line_dist[lmin:][d_line_dist[lmin:] > 0]
d_probs /= d_probs.sum()
d_entropy = -(d_probs * np.log(d_probs)).sum()

v_line_dist[lmin:] > 0
v_probs = v_line_dist[lmin:][v_line_dist[lmin:] > 0]
v_probs /= v_probs.sum()
v_entropy = -(v_probs * np.log(v_probs)).sum()

return rr, det, lam, avg_diag_length, avg_vert_length, d_entropy, v_entropy


class RQAStats(AnalysisType, UnivariateSeriesInput, MultivariateSeriesOutput, Segment):
def process(
self, x: pd.Series, dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2, **kwargs
self,
x: pd.Series,
dim: int = 1,
tau: int = 1,
threshold: float = 0.1,
lmin: int = 2,
**kwargs, # noqa: ARG002
) -> pd.DataFrame:
out = pd.DataFrame(columns=["recurrence_rate", "determinism", "laminarity",
"avg_diag_length", "avg_vert_length", "d_entropy", "v_entropy"])
out = pd.DataFrame(
columns=[
"recurrence_rate",
"determinism",
"laminarity",
"avg_diag_length",
"avg_vert_length",
"d_entropy",
"v_entropy",
]
)
if x.empty:
return out

x = x.values
out.loc[len(out)] = calc_rqa(x, x, dim, tau, threshold, lmin)
xv = x.values
out.loc[len(out)] = calc_rqa(xv, xv, dim, tau, threshold, lmin)
return out


class CrossRQAStats(AnalysisType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
def process(
self, x: pd.DataFrame, col_a: t.Union[str, int] = 0, col_b: t.Union[str, int] = 0,
dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2, **kwargs
self,
x: pd.DataFrame,
col_a: t.Union[str, int] = 0,
col_b: t.Union[str, int] = 0,
dim: int = 1,
tau: int = 1,
threshold: float = 0.1,
lmin: int = 2,
**kwargs, # noqa: ARG002
) -> pd.DataFrame:
out = pd.DataFrame(columns=["recurrence_rate", "determinism", "laminarity",
"avg_diag_length", "avg_vert_length", "d_entropy", "v_entropy"])
out = pd.DataFrame(
columns=[
"recurrence_rate",
"determinism",
"laminarity",
"avg_diag_length",
"avg_vert_length",
"d_entropy",
"v_entropy",
]
)
if x.empty:
return out
if isinstance(col_a, int):
@@ -154,12 +137,29 @@ def process

class WindowedCrossRQAStats(AnalysisType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
def process(
self, x: pd.DataFrame, col_a: t.Union[str, int] = 0, col_b: t.Union[str, int] = 0,
dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2, window: int = 100,
step: int = 10, **kwargs
self,
x: pd.DataFrame,
col_a: t.Union[str, int] = 0,
col_b: t.Union[str, int] = 0,
dim: int = 1,
tau: int = 1,
threshold: float = 0.1,
lmin: int = 2,
window: int = 100,
step: int = 10,
**kwargs, # noqa: ARG002
) -> pd.DataFrame:
out = pd.DataFrame(columns=["recurrence_rate", "determinism", "laminarity",
"avg_diag_length", "avg_vert_length", "d_entropy", "v_entropy"])
out = pd.DataFrame(
columns=[
"recurrence_rate",
"determinism",
"laminarity",
"avg_diag_length",
"avg_vert_length",
"d_entropy",
"v_entropy",
]
)
if x.empty:
return out
if isinstance(col_a, int):
Expand All @@ -171,6 +171,6 @@ def process(
if isinstance(col_b, str):
xb = x.loc[:, col_b].values

for w in range(0, xa.shape[0]-window+1, step):
out.loc[len(out)] = calc_rqa(xa[w:w+window], xb[w:w+window], dim, tau, threshold, lmin)
for w in range(0, xa.shape[0] - window + 1, step):
out.loc[len(out)] = calc_rqa(xa[w : w + window], xb[w : w + window], dim, tau, threshold, lmin)
return out
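
A hedged sketch of how the updated segments can be driven directly (mirroring the patterns in tests/test_segments.py further down; the instance names are placeholders, and the data are the same toy series used in the tests):

import pandas as pd

from mopipe.segment import CrossRQAStats, WindowedCrossRQAStats

x = pd.DataFrame({"a": [1, 1, 2, 2, 1, 1, 2, 2], "b": [3, 3, 2, 2, 3, 3, 2, 2]})

# Whole-series cross-recurrence between columns "a" and "b".
cross = CrossRQAStats("ExampleCrossRQA").process(x, col_a="a", col_b="b")

# Sliding-window variant: one row of RQA measures per window.
windowed = WindowedCrossRQAStats("ExampleWindowedRQA").process(
    x, col_a="a", col_b="b", window=4, step=2
)
print(cross.loc[0, "recurrence_rate"], windowed.shape[0])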
1 change: 1 addition & 0 deletions tests/core/analysis/test_pipeline.py
@@ -20,6 +20,7 @@ def test_run_with_multiple_segments(self):
output = pipeline.run(x=1)
assert output == segment2.process_output


class MockSegment:
def __init__(self):
self.process_output = None
14 changes: 7 additions & 7 deletions tests/test_segments.py
@@ -55,7 +55,7 @@ def segment(self) -> Segment:
return RQAStats("TestRQAStats")

def test_recurrence_measures(self, segment: Segment) -> None:
x = pd.Series([1,1,2,2])
x = pd.Series([1, 1, 2, 2])
res = segment.process(x)
assert res.loc[0, "recurrence_rate"] == 0.5
assert res.loc[0, "determinism"] == 0.5
@@ -71,7 +71,7 @@ def segment(self) -> Segment:
return CrossRQAStats("TestCrossRQAStats")

def test_cross_recurrence_measures(self, segment: Segment) -> None:
x = pd.DataFrame({"a": [1,1,2,2,1,1,2,2], "b": [3,3,2,2,3,3,2,2]})
x = pd.DataFrame({"a": [1, 1, 2, 2, 1, 1, 2, 2], "b": [3, 3, 2, 2, 3, 3, 2, 2]})
res = segment.process(x, col_a=0, col_b=1)
assert res.loc[0, "recurrence_rate"] == 0.25
res = segment.process(x, col_a="a", col_b="b")
@@ -88,12 +88,12 @@ def segment(self) -> Segment:
return WindowedCrossRQAStats("TestWindowedCrossRQAStats")

def test_cross_recurrence_measures(self, segment: Segment) -> None:
x = pd.DataFrame({"a": [1,1,2,2,1,1,1,1], "b": [3,3,2,2,3,3,2,2]})
x = pd.DataFrame({"a": [1, 1, 2, 2, 1, 1, 1, 1], "b": [3, 3, 2, 2, 3, 3, 2, 2]})
res = segment.process(x, col_a=0, col_b=1, window=4, step=2)
assert res.shape[0] == 3
assert res.loc[0, "recurrence_rate"] == 0.25
assert res.loc[1, "recurrence_rate"] == 0.25
assert res.loc[2, "recurrence_rate"] == 0.
assert res.loc[2, "recurrence_rate"] == 0.0


class TestCalcShift:
@@ -102,10 +102,10 @@ def segment(self) -> Segment:
return CalcShift("TestCalcShift")

def test_calc_shift(self, segment: Segment) -> None:
x = pd.DataFrame({"a": [1,1,2,2,1,1,1,1], "b": [3,3,2,2,3,3,2,2]})
x = pd.DataFrame({"a": [1, 1, 2, 2, 1, 1, 1, 1], "b": [3, 3, 2, 2, 3, 3, 2, 2]})
res = segment.process(x, cols=["a"], shift=2)
assert res.shape[1] == 3
assert (res["a_shift"].values == [0,0,1,1,-1,-1,0,0]).mean() == 1.0
assert (res["a_shift"].values == [0, 0, 1, 1, -1, -1, 0, 0]).mean() == 1.0


class TestSimpleGapFilling:
@@ -114,7 +114,7 @@ def segment(self) -> Segment:
return SimpleGapFilling("TestGapFilling")

def test_gap_filling(self, segment: Segment) -> None:
x = pd.DataFrame({"a": [1,1,2,np.nan,1,1,1,1], "b": [3,3,2,2,np.nan,np.nan,2,2]})
x = pd.DataFrame({"a": [1, 1, 2, np.nan, 1, 1, 1, 1], "b": [3, 3, 2, 2, np.nan, np.nan, 2, 2]})
res = segment.process(x)
assert res["a"][3] == 1.5
assert res["b"][4] == 2
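
As a sanity check on the expected value in test_cross_recurrence_measures above: with dim=1 and tau=1, a point (i, j) recurs only when a[i] and b[j] differ by less than the 0.1 threshold, i.e. when both equal 2; a has four 2s and b has four 2s, giving 16 recurrent points in an 8×8 matrix and a recurrence rate of 16/64 = 0.25. A small standalone check of that arithmetic (independent of mopipe):

import numpy as np

a = np.array([1, 1, 2, 2, 1, 1, 2, 2])
b = np.array([3, 3, 2, 2, 3, 3, 2, 2])

# Cross-recurrence with embedding dimension 1: |a[i] - b[j]| < threshold.
recurrence = np.abs(a[:, None] - b[None, :]) < 0.1
print(recurrence.sum() / recurrence.size)  # 0.25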
