diff --git a/Python/pyxdf/pyxdf.py b/Python/pyxdf/pyxdf.py index 57cdc2e..23471ce 100644 --- a/Python/pyxdf/pyxdf.py +++ b/Python/pyxdf/pyxdf.py @@ -12,7 +12,6 @@ import xml.etree.ElementTree as ET from collections import OrderedDict, defaultdict import logging - import numpy as np __all__ = ['load_xdf'] @@ -26,6 +25,8 @@ def load_xdf(filename, synchronize_clocks=True, handle_clock_resets=True, dejitter_timestamps=True, + sync_timestamps=False, + overlap_timestamps=False, jitter_break_threshold_seconds=1, jitter_break_threshold_samples=500, clock_reset_threshold_seconds=5, @@ -55,6 +56,28 @@ def load_xdf(filename, dejitter_timestamps : Whether to perform jitter removal for regularly sampled streams. (default: true) + sync_timestamps: {bool str} + sync timestamps of all streams sample-wise with the stream with the + highest effective sampling rate. Using sync_timestamps with any + method other than linear has a dependency on scipy, which is not a + hard requirement of pyxdf. If scipy is not installed in your + environment, the method supports linear interpolation with + numpy. + + False -> no syncing + True -> linear syncing + str:<'linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', + 'previous', 'next'> for method inherited from + scipy.interpolate.interp1d. + + overlap_timestamps: bool + If true, returns only overlapping streams, i.e. all streams + are limited to periods where all streams have data. (default: False) + If false, depends on whether sync_timestamps is set. If set, it + expands all streams to include the earliest and latest timestamp of + any stream, if not set it simply returns streams independently. + + on_chunk : Function that is called for each chunk of data as it is being retrieved from the file; the function is allowed to modify the data (for example, sub-sample it). 
The four input arguments @@ -346,7 +369,8 @@ def __init__(self, xml): else: stream.time_series = np.zeros((stream.nchns, 0)) - # perform (fault-tolerant) clock synchronization if requested + + # perform (fault-tolerant) clock synchronization if requested if synchronize_clocks: logger.info(' performing clock synchronization...') temp = _clock_sync(temp, handle_clock_resets, @@ -372,7 +396,21 @@ def __init__(self, xml): stream['info']['effective_srate'] = tmp.effective_srate stream['time_series'] = tmp.time_series stream['time_stamps'] = tmp.time_stamps - + + + # sync sampling with the fastest timeseries by interpolation / shifting + if sync_timestamps: + if type(sync_timestamps) is not str: + sync_timestamps = 'linear' + logger.warning('sync_timestamps defaults to "linear"') + streams = _sync_timestamps(streams, kind=sync_timestamps) + + + # limit streams to their overlapping periods + if overlap_timestamps: + streams = _limit_streams_to_overlap(streams) + + streams = [s for s in streams.values()] sort_data = [s['info']['name'][0] for s in streams] streams = [x for _, x in sorted(zip(sort_data, streams))] @@ -552,6 +590,171 @@ def _jitter_removal(streams, stream.effective_srate = 0 return streams +def _interpolate(x:np.ndarray, y:np.ndarray, new_x:np.ndarray, + kind='linear') -> np.ndarray: + try: + from scipy.interpolate import interp1d + f = interp1d(x, y, kind=kind, axis=0, + assume_sorted=True, #speed up + bounds_error=False) + return f(new_x) + except ImportError as e: + if kind != 'linear': + raise e + else: + return np.interp(new_x, xp=x, fp=y, left=np.NaN, right=np.NaN) + +def _sync_timestamps(streams, kind='linear'): + '''syncs all streams to the fastest sampling rate by shifting or + upsampling + + Depending on a streams channel-format, extrapolation is performed using + with NaNs (numerical formats) or with [''] (string format). 
+ + Interpolation is only performed for numeric values, and depending on the + kind argument which is inherited from scipy.interpolate.interp1d. Consider + that If the channel format is an integer type (i.e. 'int8', 'int16', + 'int32', or 'int64'), integer output is enforced by rounding the values. + Additionally, consider that not all interpolation methods are convex, i.e. + for some kinds, you might receive values above or below the desired + integer type. There is no correction implemented for this, as it is assumed + this is a desired behavior if you give the appropriate argument. + + For string formats, events are shifted towards the nearest feasible + timepoint. Any time-stamps without a marker get then assigned an empty + marker, i.e. ['']. + ''' + # selecting the stream with the highest effective sampling rate + srate_key = 'effective_srate' + srates = [stream['info'][srate_key] for stream in streams.values()] + max_fs = max(srates, default=0) + + if max_fs == 0: #either no valid stream or all streams are async + return streams + if srates.count(max_fs) > 1: + # highly unlikely, with floating point precision and sampling noise + # but anyways: better safe than sorry + logger.warning('I found at least two streams with identical effective ' + 'srate. 
I select one at random for syncing timestamps.') + + # selecting maximal time range of the whole recording + # streams with fs=0 might not be dejittered by default, and therefore + # indexing first and last might miss the earliest/latest + # we therefore take the min and max timestamp + stamps = [stream['time_stamps'] for stream in streams.values()] + ts_first = min((min(s) for s in stamps)) + ts_last = max((max(s) for s in stamps)) + + # generate new timestamps + # based on extrapolation of the fastest timestamps towards the maximal + # time range of the whole recording + fs_step = 1.0/max_fs + new_timestamps = stamps[srates.index(max_fs)] + num_steps = int((new_timestamps[0]-ts_first)/fs_step) + 1 + front_stamps = np.linspace(ts_first, new_timestamps[0], num_steps) + num_steps = int((ts_last-new_timestamps[-1])/fs_step) + 1 + end_stamps = np.linspace(new_timestamps[-1], ts_last, num_steps) + + new_timestamps = np.concatenate((front_stamps, + new_timestamps[1:-1], + end_stamps), + axis=0) + + # interpolate or shift all streams to the new timestamps + for stream in streams.values(): + channel_format = stream['info']['channel_format'][0] + + if ( (channel_format == 'string') and + (stream['info'][srate_key]== 0) ): + # you can't really interpolate strings; and streams with srate=0 + # don't have a real sampling rate. 
One approach to sync them is to + # shift their events to the nearest timestamp of the new + # timestamps + shifted_x = [] + for x in stream['time_stamps']: + argmin = (abs(new_timestamps-x)).argmin() + shifted_x.append(new_timestamps[argmin]) + + shifted_y = [] + for x in new_timestamps: + try: + idx = shifted_x.index(x) + y = stream['time_series'][idx] + shifted_y.append([y]) + except ValueError: + shifted_y.append(['']) + + stream['time_series'] = np.asanyarray((shifted_y)) + stream['time_stamps'] = new_timestamps + + elif channel_format in ['float32', 'double64', 'int8', 'int16', + 'int32', 'int64']: + # continuous interpolation is possible using interp1d + # discrete interpolation requires some finetuning + # bounds_error=False replaces everything outside of interpolation + # zone with NaNs + y = stream['time_series'] + x = stream['time_stamps'] + + stream['time_series'] = _interpolate(x, y, new_timestamps, kind) + stream['time_stamps'] = new_timestamps + + if channel_format in ['int8', 'int16', 'int32', 'int64']: + # i am stuck with float64s, as integers have no nans + # therefore i round to the nearest integer instead + stream['time_series'] = np.around(stream['time_series'], 0) + + + else: + raise NotImplementedError("Don't know how to sync sampling for " + + 'channel_format={}'.format( + channel_format)) + stream['info']['effective_srate'] = max_fs + + return streams + + +def _limit_streams_to_overlap(streams): + '''takes streams, returns streams limited to time periods overlapping + between all streams + + The overlapping periods start and end for each streams with the first and + last sample completely within the overlapping period. + If time_stamps have been synced, these are the same time-points for all + streams. Consider that in the case of unsynced time-stamps, the time-stamps + can not be exactly equal! 
+ ''' + ts_first, ts_last = [], [] + for stream in streams.values(): + # skip streams with fs=0 or if they send strings, because they might + # just not yet have sent anything on purpose (i.e. markers) + # while other data was already being recorded. + if (stream['info']['effective_srate'] !=0 and + stream['info']['channel_format'][0] != 'string'): + # extrapolation in _sync_timestamps is done with NaNs + not_extrapolated = np.where(~np.isnan(stream['time_series']))[0] + ts_first.append(min(stream['time_stamps'][not_extrapolated])) + ts_last.append(max(stream['time_stamps'][not_extrapolated])) + + ts_first = max(ts_first) + ts_last = min(ts_last) + for stream in streams.values(): + # use np.around to prevent floating point hiccups + around = np.around(stream['time_stamps'], 15) + a = np.where(around>=ts_first)[0] + b = np.where(around<=ts_last)[0] + select = np.intersect1d(a,b) + if type(stream['time_stamps']) is list: + stream['time_stamps'] = [stream['time_stamps'][s] for s in select] + else: + stream['time_stamps'] = stream['time_stamps'][select] + + if type(stream['time_series']) is list: + stream['time_series'] = [stream['time_series'][s] for s in select] + else: + stream['time_series'] = stream['time_series'][select] + + return streams # noinspection PyTypeChecker def _robust_fit(A, y, rho=1, iters=1000): diff --git a/Python/pyxdf/test/test_limit_on_synced_stamps.py b/Python/pyxdf/test/test_limit_on_synced_stamps.py new file mode 100644 index 0000000..514473b --- /dev/null +++ b/Python/pyxdf/test/test_limit_on_synced_stamps.py @@ -0,0 +1,71 @@ +from pyxdf.pyxdf import _sync_timestamps, _limit_streams_to_overlap +import numpy as np +# %% +def streams(): + #generate mock-streams + class MockStream(dict): + + def __init__(self, timestamps, timeseries, effective_srate, channel_format): + self['time_series'] = timeseries + self['time_stamps'] = timestamps + self['info'] = {} + self['info']['effective_srate'] = effective_srate + self['info']['channel_format'] = 
channel_format + + streams = {} + # fastest stream, latest timestamp + streams[1] = MockStream(np.linspace(1,2,1001), + np.linspace(1,2,1001), + 1000, ['float32']) + + # slowest stream, earliest timestamp + streams[2] = MockStream(np.linspace(0.5,1.5,251), + np.linspace(.5, 1.5, 251), + 250, ['float32']) + + # marker stream + streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], + ['mark_' + str(n) for n in range(0,5,1)], + 0, ['string']) + # integer datatype stream, + streams[4] = MockStream(np.linspace(0.4,1.4,251), + np.linspace(4, 140, 251, dtype='int32'), + 250, ['int32']) + + return streams + +def mock(): + synced = _sync_timestamps(streams()) + return _limit_streams_to_overlap(synced) + +#%% test +# earliest overlapping timestamp is 1.0 from stream 1 +# latest overlapping timestamp is 1.4 from stream 4 +# fastest strteam is stream 1 with fs 1000 +def test_timestamps(): + 'check timestamps streams' + for s in mock().values(): + assert np.all(np.isclose(s['time_stamps'], np.linspace(1, 1.4, 401))) + +def test_first_timeseries(): + assert np.all(np.isclose(mock()[1]['time_series'], + np.linspace(1, 1.4, 401))) + +def test_second_timeseries(): + assert np.all(np.isclose(mock()[2]['time_series'], + np.linspace(1, 1.4, 401))) + +def test_third_timeseries(): + s = mock()[3]['time_series'] + idx = np.where(s!='')[0] + assert np.all(idx == [107, 200]) + assert mock()[3]['time_stamps'][idx[0]] == 1.107 # shifted to closest fit + assert mock()[3]['time_stamps'][idx[1]] == 1.2 # fits with fs 1000 + +def test_fourth_timeseries(): + # interpolation is tricky, as it depends on np.around and linear + # interpolation, which can not be approximated with np.linspace + # we therefore only test first and last value of the series + s = mock()[4]['time_series'] + assert np.isclose(s[0], 85) + assert np.isclose(s[-1], 140) \ No newline at end of file diff --git a/Python/pyxdf/test/test_limit_streams_to_overlap.py b/Python/pyxdf/test/test_limit_streams_to_overlap.py new file mode 
100644 index 0000000..791cc10 --- /dev/null +++ b/Python/pyxdf/test/test_limit_streams_to_overlap.py @@ -0,0 +1,53 @@ +from pyxdf.pyxdf import _limit_streams_to_overlap +import numpy as np +# %% +def streams(): + #generate mock-streams + class MockStream(dict): + + def __init__(self, timestamps, timeseries, effective_srate, channel_format): + self['time_series'] = timeseries + self['time_stamps'] = timestamps + self['info'] = {} + self['info']['effective_srate'] = effective_srate + self['info']['channel_format'] = channel_format + + streams = {} + # fastest stream, latest timestamp + streams[1] = MockStream(np.linspace(1,2,1001), + np.linspace(1,2,1001), + 1000, ['float32']) + + # slowest stream, earliest timestamp + streams[2] = MockStream(np.linspace(0.4,1.4,251), + np.linspace(0.4,1.4,251), + 250, ['float32']) + + # marker stream + streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], + ['mark_' + str(n) for n in range(0,5,1)], + 0, ['string']) + return streams + +# %% test + +def test_timestamps(): + 'test whether the first and last timestamps have been selected as expected' + olap = _limit_streams_to_overlap(streams()) + for s,v in zip(olap.values(), + [(1, 1.4), (1, 1.4), (1.1071, 1.2)]): + assert np.isclose(s['time_stamps'][0], v[0]) + assert np.isclose(s['time_stamps'][-1], v[-1]) + +def test_timeseries(): + 'test whether the first and last value are as expected' + olap = _limit_streams_to_overlap(streams()) + for s,v in zip(olap.values(), + [(1, 1.4), (1, 1.4), ('mark_1', 'mark_2')]): + if s['info']['channel_format'] != ['string']: + assert np.isclose(s['time_series'][0], v[0]) + assert np.isclose(s['time_series'][-1], v[-1]) + else: + assert s['time_series'][0] == v[0] + assert s['time_series'][-1] == v[-1] + diff --git a/Python/pyxdf/test/test_sync_timestamps.py b/Python/pyxdf/test/test_sync_timestamps.py new file mode 100644 index 0000000..fd00602 --- /dev/null +++ b/Python/pyxdf/test/test_sync_timestamps.py @@ -0,0 +1,128 @@ +from pyxdf.pyxdf import 
_sync_timestamps +import numpy as np +from copy import deepcopy +import pytest +# %% +@pytest.fixture(scope='session') +def streams(): + #generate mock-streams + class MockStream(dict): + + def __init__(self, timestamps, timeseries, effective_srate, channel_format): + self['time_series'] = timeseries + self['time_stamps'] = timestamps + self['info'] = {} + self['info']['effective_srate'] = effective_srate + self['info']['channel_format'] = channel_format + + streams = {} + # fastest stream, latest timestamp + streams[1] = MockStream(np.linspace(1,2,1001), + np.linspace(1,2,1001), + 1000, ['float32']) + + # slowest stream, earliest timestamp + streams[2] = MockStream(np.linspace(0.1,1.1,251), + np.linspace(2,1,251), + 250, ['float32']) + + # marker stream + streams[3] = MockStream([0.2, 1.1071, 1.2, 1.9, 2.5], + ['mark_' + str(n) for n in range(0,5,1)], + 0, ['string']) + # integer datatype stream, + streams[4] = MockStream(np.linspace(0.1,1.1,251), + np.linspace(0, 250, 251, dtype='int32'), + 250, ['int32']) + + return streams + +@pytest.fixture(scope='session') +def synced(streams): + synced = _sync_timestamps(deepcopy(streams)) + return synced + +#%% test +def test_samplingrate(synced): + 'check that for all streams the sampling rate is identical to the fastest' + for s in synced.values(): + assert(s['info']['effective_srate']==1000) + +def test_timestamps(synced): + '''test whether all timestamps are identical and as expected + earliest timestamp is 0.1 from stream 1,4; latest is 2.5 from stream 3''' + for s in synced.values(): + assert np.all(np.isclose(s['time_stamps'], np.linspace(.1, 2.5, 2401))) + +def test_identical_timestamp_steps(synced): + 'all steps between samples should be identical within float precision' + for stream in synced.values(): + uq = np.diff(stream['time_stamps']) + assert np.all([np.isclose(u, uq) for u in uq]) + +def test_markerstream(synced, streams): + '''check markers, they should be identical to the original but the second, + which 
should be shifted towards the next valid sample of the fastest + stream''' + idx = [] + for marker in streams[3]['time_series']: + idx.append(synced[3]['time_series'].tolist().index([marker])) + assert np.all(np.equal(synced[3]['time_stamps'][idx], + np.array([0.2 , #identical + 1.107, #shifted + 1.2 , 1.9 , 2.5 ]#shifted + ))) + +def test_interpolation(synced): + '''the linearly to 1000 Hz interpolated data from stream 2 should + be identical with a linspace with 1001 samples.''' + idx = np.where(~np.isnan(synced[2]['time_series']))[0] + assert np.all(np.isclose(synced[2]['time_series'][idx], + np.linspace(2, 1, 1001))) + +def test_extrapolation(synced, streams): + '''extrapolated data should be nans or "" for markers ''' + # stream 1 is extrapolated from 0.1 up to 1 and after 2 to 2.5 + idx = np.where(np.isnan(synced[1]['time_series']))[0] + values = synced[1]['time_stamps'][idx] + expectation = np.hstack((np.linspace(.1, 1, 900, endpoint=False), + np.linspace(2.001, 2.5, 500, endpoint=True))) + assert np.all(np.isclose(values, expectation)) + + # stream 2 is extrapolated from after 1.1 to 2.5 + idx = np.where(np.isnan(synced[2]['time_series']))[0] + values = synced[2]['time_stamps'][idx] + expectation = np.linspace(1.101, 2.5, num=1400, endpoint=True) + assert np.all(np.isclose(values, expectation)) + + # stream 3 is extrapolated everywhere but at the marker stamps + # can be tested easier by checking whether the count of [''] + # equals the total samples minus the original markers + values = np.where(synced[3]['time_series']==[''])[0].shape[0] + expectation = (synced[3]['time_series'].shape[0] - + len(streams[3]['time_series'])) + assert values == expectation + + # stream 4 is extrapolated from after 1.1 to 2.5 + idx = np.where(np.isnan(synced[4]['time_series']))[0] + values = synced[4]['time_stamps'][idx] + expectation = np.linspace(1.101, 2.5, num=1400, endpoint=True) + assert np.all(np.isclose(values, expectation)) + +def 
test_identity_after_interpolation(synced, streams): + 'the data for stream 1 should be identical to original' + idx = np.where(~np.isnan(synced[1]['time_series']))[0] + assert np.all(np.isclose(synced[1]['time_series'][idx], + streams[1]['time_series'])) + +def test_integer_interpolation(synced, streams): + '''the data of stream 4 should be all integers. + + .. note:: + interpolation is tricky, as it depends on np.around and linear + interpolation, which can not be approximated with np.linspace + ''' + u = np.unique(synced[4]['time_series']) + u = np.int64(np.compress(~np.isnan(u), u)) + assert np.all(streams[4]['time_series'] == u) + \ No newline at end of file