Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sync_timestamps #28

Closed
wants to merge 12 commits into from
Prev Previous commit
Next Next commit
Interpolate also integer channel_formats
  • Loading branch information
agricolab committed Dec 20, 2018
commit f55ad95874e67bb8dee67e808ee6d3543b8142a0
16 changes: 14 additions & 2 deletions Python/pyxdf/pyxdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,9 @@ def _jitter_removal(streams,


def _sync_timestamps(streams, kind='linear'):
'''syncs all streams to the fastest sampling rate by shifting or
upsampling
'''

# selecting the stream with the highest effective sampling rate
srate_key = 'effective_srate'
Expand Down Expand Up @@ -653,8 +656,10 @@ def _sync_timestamps(streams, kind='linear'):
stream['time_series'] = np.asanyarray((shifted_y))
stream['time_stamps'] = new_timestamps

elif channel_format == 'float32':
#a float32 can be interpolated with interp1d
elif channel_format in ['float32', 'double64', 'int8', 'int16',
'int32', 'int64']:
# continuous interpolation is possible using interp1d
# discrete interpolation requires some finetuning
# bounds_error=False replaces everything outside of interpolation
# zone with NaNs
y = stream['time_series']
Expand All @@ -666,6 +671,13 @@ def _sync_timestamps(streams, kind='linear'):
stream['time_series'] = f(new_timestamps)
stream['time_stamps'] = new_timestamps
stream['info']['effective_srate'] = max_fs

if channel_format in ['int8', 'int16', 'int32', 'int64']:
# i am stuck with float64s, as integers have no nans
# therefore i round to the nearest integer instead
stream['time_series'] = np.around(stream['time_series'], 0)


else:
raise NotImplementedError("Don't know how to sync sampling for " +
'channel_format={}'.format(
Expand Down
14 changes: 13 additions & 1 deletion Python/pyxdf/test/test_sync_timestamps.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ def __init__(self, timestamps, timeseries, effective_srate, channel_format):
streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5],
['mark_' + str(n) for n in range(0,5,1)],
0, ['string'])
# integer datatype stream,
streams[4] = MockStream(np.linspace(0.1,1.1,251),
np.linspace(0, 250, 251, dtype='int32'),
250, ['int32'])

return streams

@pytest.fixture(scope='session')
Expand Down Expand Up @@ -70,9 +75,16 @@ def test_interpolation(synced):
idx = np.where(~np.isnan(synced[2]['time_series']))[0]
assert np.all(np.isclose(synced[2]['time_series'][idx],
np.linspace(2, 1, 1001)))



def test_identity_after_interpolation(synced, streams):
'the data for stream 1 should be identical to original'
idx = np.where(~np.isnan(synced[1]['time_series']))[0]
assert np.all(np.isclose(synced[1]['time_series'][idx],
streams[1]['time_series']))

def test_integer_interpolation(synced, streams):
'the data of stream 4 should be all integers'
u = np.unique(synced[4]['time_series'])
u = np.int64(np.compress(~np.isnan(u), u))
assert np.all(streams[4]['time_series'] == u)