-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement sync_timestamps #28
Closed
Closed
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
b4a9028
Implement sync_timestamps
agricolab 860f44c
Extrapolate timestamps for sync
agricolab ead7568
Implement limit to overlap
agricolab 9a8692a
Write test for _sync_timestamps
agricolab ed8fe6d
Write test for _limit_to_overlap
agricolab f55ad95
Interpolate also integer channel_formats
agricolab b66c924
Write tests for overlapping synced streams
agricolab 22b4beb
Refactor tests and add extrapolation test
agricolab 94bcb11
Add docstrings to private functions
agricolab 2a60647
Move scipy import into sync_timestamp
agricolab 91547cb
Move interpolation into private function
agricolab cf8e3f7
Update docstrings for conditional interpolation
agricolab File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
from pyxdf.pyxdf import _sync_timestamps, _limit_streams_to_overlap | ||
import numpy as np | ||
# %% | ||
def streams(): | ||
#generate mock-streams | ||
class MockStream(dict): | ||
|
||
def __init__(self, timestamps, timeseries, effective_srate, channel_format): | ||
self['time_series'] = timeseries | ||
self['time_stamps'] = timestamps | ||
self['info'] = {} | ||
self['info']['effective_srate'] = effective_srate | ||
self['info']['channel_format'] = channel_format | ||
|
||
streams = {} | ||
# fastest stream, latest timestamp | ||
streams[1] = MockStream(np.linspace(1,2,1001), | ||
np.linspace(1,2,1001), | ||
1000, ['float32']) | ||
|
||
# slowest stream, earliest timestamp | ||
streams[2] = MockStream(np.linspace(0.5,1.5,251), | ||
np.linspace(.5, 1.5, 251), | ||
250, ['float32']) | ||
|
||
# marker stream | ||
streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], | ||
['mark_' + str(n) for n in range(0,5,1)], | ||
0, ['string']) | ||
# integer datatype stream, | ||
streams[4] = MockStream(np.linspace(0.4,1.4,251), | ||
np.linspace(4, 140, 251, dtype='int32'), | ||
250, ['int32']) | ||
|
||
return streams | ||
|
||
def mock(): | ||
synced = _sync_timestamps(streams()) | ||
return _limit_streams_to_overlap(synced) | ||
|
||
#%% test | ||
# earliest overlapping timestamp is 1.0 from stream 1 | ||
# latest overlapping timestamp is 1.4 from stream 4 | ||
# fastest strteam is stream 1 with fs 1000 | ||
def test_timestamps(): | ||
'check timestamps streams' | ||
for s in mock().values(): | ||
assert np.all(np.isclose(s['time_stamps'], np.linspace(1, 1.4, 401))) | ||
|
||
def test_first_timeseries(): | ||
assert np.all(np.isclose(mock()[1]['time_series'], | ||
np.linspace(1, 1.4, 401))) | ||
|
||
def test_second_timeseries(): | ||
assert np.all(np.isclose(mock()[2]['time_series'], | ||
np.linspace(1, 1.4, 401))) | ||
|
||
def test_third_timeseries(): | ||
s = mock()[3]['time_series'] | ||
idx = np.where(s!='')[0] | ||
assert np.all(idx == [107, 200]) | ||
assert mock()[3]['time_stamps'][idx[0]] == 1.107 # shifted to closest fit | ||
assert mock()[3]['time_stamps'][idx[1]] == 1.2 # fits with fs 1000 | ||
|
||
def test_fourth_timeseries(): | ||
# interpolation is tricky, as it depends on np.around and linear | ||
# interpolation, which can not be approximated with np.linspace | ||
# we therefore only test first and last value of the series | ||
s = mock()[4]['time_series'] | ||
assert np.isclose(s[0], 85) | ||
assert np.isclose(s[-1], 140) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
from pyxdf.pyxdf import _limit_streams_to_overlap | ||
import numpy as np | ||
# %% | ||
def streams(): | ||
#generate mock-streams | ||
class MockStream(dict): | ||
|
||
def __init__(self, timestamps, timeseries, effective_srate, channel_format): | ||
self['time_series'] = timeseries | ||
self['time_stamps'] = timestamps | ||
self['info'] = {} | ||
self['info']['effective_srate'] = effective_srate | ||
self['info']['channel_format'] = channel_format | ||
|
||
streams = {} | ||
# fastest stream, latest timestamp | ||
streams[1] = MockStream(np.linspace(1,2,1001), | ||
np.linspace(1,2,1001), | ||
1000, ['float32']) | ||
|
||
# slowest stream, earliest timestamp | ||
streams[2] = MockStream(np.linspace(0.4,1.4,251), | ||
np.linspace(0.4,1.4,251), | ||
250, ['float32']) | ||
|
||
# marker stream | ||
streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], | ||
['mark_' + str(n) for n in range(0,5,1)], | ||
0, ['string']) | ||
return streams | ||
|
||
# %% test | ||
|
||
def test_timestamps(): | ||
'test whether the first and last timestamps have been selected as expected' | ||
olap = _limit_streams_to_overlap(streams()) | ||
for s,v in zip(olap.values(), | ||
[(1, 1.4), (1, 1.4), (1.1071, 1.2)]): | ||
assert np.isclose(s['time_stamps'][0], v[0]) | ||
assert np.isclose(s['time_stamps'][-1], v[-1]) | ||
|
||
def test_timeseries(): | ||
'test whether the first and last value are as expected' | ||
olap = _limit_streams_to_overlap(streams()) | ||
for s,v in zip(olap.values(), | ||
[(1, 1.4), (1, 1.4), ('mark_1', 'mark_2')]): | ||
if s['info']['channel_format'] != ['string']: | ||
assert np.isclose(s['time_series'][0], v[0]) | ||
assert np.isclose(s['time_series'][-1], v[-1]) | ||
else: | ||
assert s['time_series'][0] == v[0] | ||
assert s['time_series'][-1] == v[-1] | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Streams are not guaranteed to start and stop at the same time. i.e. the first sample for the fastest stream might not come until several minutes into a recording. What do do then?
I think we can extrapolate timestamps (but not data!) assuming a constant interval.
So maybe for each stream we need to create a new timestamp vector that uses new_timestamps for the overlapping periods, and then extrapolates timestamps for the non-overlapping periods assuming even intervals.
For data processing streams that require all sources to have the same number of samples, we could then have a separate option to trim all streams to smallest overlapping regions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good points. I implemented that new_timestamps is now based on an extrapolation to the earliest and latest timestamp of any stream using the timestamps of the fastest stream as seed. An alternative would have been to generate a completely new timestamp vector based only on start and endpoint and an arbitrary sampling rate. But i did want to reduce the amount of interpolation necessary, and not implement downsampling. Additionally, i added a function to limit all streams to overlapping time-periods. The latter is supposed to work independently of whether timestamps have been synced before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I expanded interpolation to also deal with integer values. If a channels format is any integer type, interpolation now enforces integers as output with np.around. I also improved the coumentation, added unit tests to be run with pytest, and fixed a couple of bugs detected with the tests. Feel more or less finished, and am open for feedback.