Skip to content

Commit

Permalink
Always segment at non-monotonic timestamps in _jitter_removal
Browse files Browse the repository at this point in the history
* Segment when timestamp intervals exceed threshold (previous
  behaviour)

* Always segment at non-positive timestamp intervals (irrespective of
  threshold)

* Additional tests for non-monotonicity
  • Loading branch information
jamieforth committed Dec 8, 2024
1 parent 0e41c5e commit 3d668f1
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/pyxdf/pyxdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,9 @@ def _detect_breaks(stream, threshold_seconds=1.0, threshold_samples=500):
"""Detect breaks in the time_stamps of a stream."""
# Identify breaks in the time_stamps
diffs = np.diff(stream.time_stamps)
b_breaks = np.abs(diffs) > np.max(
b_breaks = (diffs <= 0) | (diffs > np.max(
(threshold_seconds, threshold_samples * stream.tdiff)
)
))
# find indices (+ 1 to compensate for lost sample in np.diff)
break_inds = np.where(b_breaks)[0] + 1
return break_inds
Expand Down
80 changes: 70 additions & 10 deletions test/test_jitter_removal.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import pytest
from pyxdf.pyxdf import _detect_breaks


Expand All @@ -8,44 +7,105 @@ def __init__(self, time_stamps, tdiff):
self.tdiff = tdiff


def test_detect_no_breaks():
def test_detect_no_breaks_seconds():
timestamps = list(range(-5, 5))
stream = MockStreamData(timestamps, 1)
# if diff > 2 and larger 500 * tdiff -> 0
breaks = _detect_breaks(stream, threshold_seconds=2, threshold_samples=500)
# if diff > 2 and larger 0 * tdiff -> 0
breaks = _detect_breaks(stream, threshold_seconds=2, threshold_samples=0)
assert breaks.size == 0
# if diff > 0.1 and larger 1 * tdiff -> 0
breaks = _detect_breaks(stream, threshold_seconds=0.1, threshold_samples=1)
# if diff > 1 and larger 0 * tdiff -> 0
breaks = _detect_breaks(stream, threshold_seconds=1, threshold_samples=0)
assert breaks.size == 0


def test_detect_breaks_reverse():
timestamps = list(reversed(range(-5, 5)))
def test_detect_no_breaks_samples():
timestamps = list(range(-5, 5))
stream = MockStreamData(timestamps, 1)
# if diff > 0 and larger 2 * tdiff -> 0
breaks = _detect_breaks(stream, threshold_seconds=0, threshold_samples=2)
assert breaks.size == 0
# if diff > 0 and larger 1 * tdiff -> 0
breaks = _detect_breaks(stream, threshold_seconds=0, threshold_samples=1)
assert breaks.size == 0


def test_detect_breaks_seconds():
timestamps = list(range(-5, 5, 2))
stream = MockStreamData(timestamps, 1)
# if diff > 1 and larger 0 * tdiff -> 4
breaks = _detect_breaks(stream, threshold_seconds=0.1, threshold_samples=0)
assert breaks.size == len(timestamps) - 1


def test_detect_breaks_samples():
timestamps = list(range(-5, 5, 2))
stream = MockStreamData(timestamps, 1)
# if diff > 0 and larger 1 * tdiff -> 4
breaks = _detect_breaks(stream, threshold_seconds=0, threshold_samples=1)
assert breaks.size == len(timestamps) - 1


def test_detect_breaks_gap_in_negative():
timestamps = [-4, 1, 2, 3, 4]
stream = MockStreamData(timestamps, 1)
# if diff > 1 and larger 1 * tdiff -> 1
breaks = _detect_breaks(stream, threshold_seconds=1, threshold_samples=1)
assert breaks.size == 1
assert breaks[0] == 1
timestamps = [-4, -2, -1, 0, 1, 2, 3, 4]
stream = MockStreamData(timestamps, 1)
# if diff > 1 and larger 1 * tdiff -> 1
breaks = _detect_breaks(stream, threshold_seconds=1, threshold_samples=1)
assert breaks.size == 1
assert breaks[0] == 1
# if diff > 0.1 and larger 0 * tdiff -> 7
breaks = _detect_breaks(stream, threshold_seconds=0.1, threshold_samples=0)
assert breaks.size == len(timestamps) - 1


def test_detect_breaks_gap_in_positive():
timestamps = [1, 3, 4, 5, 6]
stream = MockStreamData(timestamps, 1)
# if diff > 1 and larger 1 * tdiff -> 1 -> 1
# if diff > 1 and larger 1 * tdiff -> 1
breaks = _detect_breaks(stream, threshold_seconds=1, threshold_samples=1)
assert breaks.size == 1
assert breaks[0] == 1
# if diff > 0.1 and larger 0 * tdiff ->
# if diff > 0.1 and larger 0 * tdiff -> 4
breaks = _detect_breaks(stream, threshold_seconds=0.1, threshold_samples=0)
assert breaks.size == len(timestamps) - 1


def test_detect_breaks_reverse():
timestamps = list(reversed(range(-5, 5)))
stream = MockStreamData(timestamps, 1)
# if diff <= 0 -> 9
breaks = _detect_breaks(stream, threshold_seconds=0, threshold_samples=0)
assert breaks.size == len(timestamps) - 1


def test_detect_breaks_gaps_non_monotonic():
timestamps = [-4, 1, -3, -2, -1, 1, 5, 1, 2]
stream = MockStreamData(timestamps, 1)
# if diff <= 0 or diff > 1 and larger 1 * tdiff -> 5
breaks = _detect_breaks(stream, threshold_seconds=1, threshold_samples=1)
assert list(breaks) == [1, 2, 5, 6, 7]
# if diff <= 0 or diff > 2 and larger 1 * tdiff -> 4
breaks = _detect_breaks(stream, threshold_seconds=2, threshold_samples=1)
assert list(breaks) == [1, 2, 6, 7]
# if diff <= 0 or diff > 0.1 and larger 0 * tdiff -> 8
breaks = _detect_breaks(stream, threshold_seconds=0.1, threshold_samples=0)
assert breaks.size == len(timestamps) - 1


def test_detect_breaks_strict_non_monotonic():
timestamps = [-4, -5, -3, -2, -1, 0, 0, 1, 2]
stream = MockStreamData(timestamps, 1)
# if diff <= 0 or diff > 1 and larger 1 * tdiff -> 3
breaks = _detect_breaks(stream, threshold_seconds=1, threshold_samples=1)
assert list(breaks) == [1, 2, 6]
# if diff <= 0 or diff > 2 and larger 2 * tdiff -> 2
breaks = _detect_breaks(stream, threshold_seconds=2, threshold_samples=2)
assert list(breaks) == [1, 6]
# if diff <= 0 or diff > 0.1 and larger 0 * tdiff -> 8
breaks = _detect_breaks(stream, threshold_seconds=0.1, threshold_samples=0)
assert breaks.size == len(timestamps) - 1

0 comments on commit 3d668f1

Please sign in to comment.