Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sync_timestamps #28

Closed
wants to merge 12 commits into from
209 changes: 206 additions & 3 deletions Python/pyxdf/pyxdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import xml.etree.ElementTree as ET
from collections import OrderedDict, defaultdict
import logging

import numpy as np

__all__ = ['load_xdf']
Expand All @@ -26,6 +25,8 @@ def load_xdf(filename,
synchronize_clocks=True,
handle_clock_resets=True,
dejitter_timestamps=True,
sync_timestamps=False,
overlap_timestamps=False,
jitter_break_threshold_seconds=1,
jitter_break_threshold_samples=500,
clock_reset_threshold_seconds=5,
Expand Down Expand Up @@ -55,6 +56,28 @@ def load_xdf(filename,
dejitter_timestamps : Whether to perform jitter removal for regularly
sampled streams. (default: true)

sync_timestamps: {bool str}
sync timestamps of all streams sample-wise with the stream that has the
highest effective sampling rate. Using sync_timestamps with any
method other than linear has dependency on scipy, which is not a
hard requirement of pyxdf. If scipy is not installed in your
environment, the method supports linear interpolation with
numpy.

False -> no syncing
True -> linear syncing
str: one of 'linear', 'nearest', 'zero', 'slinear', 'quadratic',
'cubic', 'previous', 'next' for the method inherited from
scipy.interpolate.interp1d.

overlap_timestamps: bool
If true, return only overlapping streams, i.e. all streams
are limited to periods where all streams have data. (default: False)
If false, the result depends on whether sync_timestamps is set. If set,
all streams are expanded to include the earliest and latest timestamp of
any stream; if not set, the streams are simply returned independently.


on_chunk : Function that is called for each chunk of data as it is
being retrieved from the file; the function is allowed to modify
the data (for example, sub-sample it). The four input arguments
Expand Down Expand Up @@ -346,7 +369,8 @@ def __init__(self, xml):
else:
stream.time_series = np.zeros((stream.nchns, 0))

# perform (fault-tolerant) clock synchronization if requested

# perform (fault-tolerant) clock synchronization if requested
if synchronize_clocks:
logger.info(' performing clock synchronization...')
temp = _clock_sync(temp, handle_clock_resets,
Expand All @@ -372,7 +396,21 @@ def __init__(self, xml):
stream['info']['effective_srate'] = tmp.effective_srate
stream['time_series'] = tmp.time_series
stream['time_stamps'] = tmp.time_stamps



# sync sampling with the fastest timeseries by interpolation / shifting
if sync_timestamps:
if type(sync_timestamps) is not str:
sync_timestamps = 'linear'
logger.warning('sync_timestamps defaults to "linear"')
streams = _sync_timestamps(streams, kind=sync_timestamps)


# limit streams to their overlapping periods
if overlap_timestamps:
streams = _limit_streams_to_overlap(streams)


streams = [s for s in streams.values()]
sort_data = [s['info']['name'][0] for s in streams]
streams = [x for _, x in sorted(zip(sort_data, streams))]
Expand Down Expand Up @@ -552,6 +590,171 @@ def _jitter_removal(streams,
stream.effective_srate = 0
return streams

def _interpolate(x:np.ndarray, y:np.ndarray, new_x:np.ndarray,
kind='linear') -> np.ndarray:
try:
from scipy.interpolate import interp1d
f = interp1d(x, y, kind=kind, axis=0,
assume_sorted=True, #speed up
bounds_error=False)
return f(new_x)
except ImportError as e:
if kind != 'linear':
raise e
else:
return np.interp(new_x, xp=x, fp=y, left=np.NaN, right=np.NaN)

def _sync_timestamps(streams, kind='linear'):
    """Sync all streams to the fastest sampling rate by shifting or
    upsampling.

    Depending on a stream's channel format, extrapolation is performed
    with NaNs (numerical formats) or with [''] (string format).

    Interpolation is only performed for numeric values, using the ``kind``
    argument which is inherited from scipy.interpolate.interp1d. If the
    channel format is an integer type (i.e. 'int8', 'int16', 'int32', or
    'int64'), integer output is enforced by rounding the values.
    Consider that not all interpolation methods are convex, i.e. for some
    kinds you might receive values above or below the desired integer
    range; no correction is applied for this.

    For string formats, events are shifted towards the nearest feasible
    timepoint. Any timestamp without a marker is then assigned an empty
    marker, i.e. [''].

    Parameters
    ----------
    streams : dict of stream dicts with 'time_stamps', 'time_series'
        and 'info' keys
    kind : interpolation method passed on to _interpolate

    Returns
    -------
    streams : the same dict, with timestamps synced in place
    """
    # select the stream with the highest effective sampling rate
    srate_key = 'effective_srate'
    srates = [stream['info'][srate_key] for stream in streams.values()]
    max_fs = max(srates, default=0)

    if max_fs == 0:  # either no valid stream or all streams are async
        return streams
    if srates.count(max_fs) > 1:
        # highly unlikely, with floating point precision and sampling noise
        # but anyways: better safe than sorry
        logger.warning('I found at least two streams with identical effective '
                       'srate. I select one at random for syncing timestamps.')

    # select the maximal time range of the whole recording.
    # streams with fs=0 are not dejittered by default, so indexing first
    # and last might miss the earliest/latest sample; take min/max instead.
    # skip empty streams: min()/max() of an empty sequence raises ValueError
    stamps = [stream['time_stamps'] for stream in streams.values()]
    nonempty = [s for s in stamps if len(s) > 0]
    if not nonempty:  # nothing recorded anywhere -> nothing to sync
        return streams
    ts_first = min(min(s) for s in nonempty)
    ts_last = max(max(s) for s in nonempty)

    # generate new timestamps based on extrapolating the fastest stream's
    # timestamps towards the maximal time range of the whole recording
    fs_step = 1.0 / max_fs
    new_timestamps = stamps[srates.index(max_fs)]

    num_steps = int((new_timestamps[0] - ts_first) / fs_step) + 1
    front_stamps = np.linspace(ts_first, new_timestamps[0], num_steps)
    num_steps = int((ts_last - new_timestamps[-1]) / fs_step) + 1
    end_stamps = np.linspace(new_timestamps[-1], ts_last, num_steps)

    new_timestamps = np.concatenate((front_stamps,
                                     new_timestamps[1:-1],
                                     end_stamps),
                                    axis=0)

    # interpolate or shift all streams to the new timestamps
    for stream in streams.values():
        if len(stream['time_stamps']) == 0:
            # nothing to interpolate or shift; leave the stream untouched
            continue
        channel_format = stream['info']['channel_format'][0]

        if channel_format == 'string' and stream['info'][srate_key] == 0:
            # you can't really interpolate strings, and streams with
            # srate=0 don't have a real sampling rate. Sync them by
            # shifting their events to the nearest new timestamp.
            shifted_x = []
            for x in stream['time_stamps']:
                argmin = (abs(new_timestamps - x)).argmin()
                shifted_x.append(new_timestamps[argmin])

            shifted_y = []
            for x in new_timestamps:
                try:
                    idx = shifted_x.index(x)
                    y = stream['time_series'][idx]
                    shifted_y.append([y])
                except ValueError:
                    # no marker was shifted onto this timestamp
                    shifted_y.append([''])

            stream['time_series'] = np.asanyarray(shifted_y)
            stream['time_stamps'] = new_timestamps

        elif channel_format in ['float32', 'double64', 'int8', 'int16',
                                'int32', 'int64']:
            # continuous interpolation is possible using interp1d;
            # bounds_error=False fills everything outside the
            # interpolation zone with NaNs
            y = stream['time_series']
            x = stream['time_stamps']

            stream['time_series'] = _interpolate(x, y, new_timestamps, kind)
            stream['time_stamps'] = new_timestamps

            if channel_format in ['int8', 'int16', 'int32', 'int64']:
                # stuck with float64s, as integers have no NaN;
                # round to the nearest integer instead
                stream['time_series'] = np.around(stream['time_series'], 0)

        else:
            raise NotImplementedError("Don't know how to sync sampling for " +
                                      'channel_format={}'.format(
                                          channel_format))
        stream['info'][srate_key] = max_fs

    return streams


def _limit_streams_to_overlap(streams):
'''takes streams, returns streams limited to time periods overlapping
between all streams

The overlapping periods start and end for each streams with the first and
last sample completely within the overlapping period.
If time_stamps have been synced, these are the same time-points for all
streams. Consider that in the case of unsynced time-stamps, the time-stamps
can not be exactly equal!
'''
ts_first, ts_last = [], []
for stream in streams.values():
# skip streams with fs=0 or if they send strings, because they might
# just not yet have send anything on purpose (i.e. markers)
# while other data was already being recorded.
if (stream['info']['effective_srate'] !=0 and
stream['info']['channel_format'][0] != 'string'):
# extrapolation in _sync_timestamps is done with NaNs
not_extrapolated = np.where(~np.isnan(stream['time_series']))[0]
ts_first.append(min(stream['time_stamps'][not_extrapolated]))
ts_last.append(max(stream['time_stamps'][not_extrapolated]))

ts_first = max(ts_first)
ts_last = min(ts_last)
for stream in streams.values():
# use np.around to prevent floating point hickups
around = np.around(stream['time_stamps'], 15)
a = np.where(around>=ts_first)[0]
b = np.where(around<=ts_last)[0]
select = np.intersect1d(a,b)
if type(stream['time_stamps']) is list:
stream['time_stamps'] = [stream['time_stamps'][s] for s in select]
else:
stream['time_stamps'] = stream['time_stamps'][select]

if type(stream['time_series']) is list:
stream['time_series'] = [stream['time_series'][s] for s in select]
else:
stream['time_series'] = stream['time_series'][select]

return streams

# noinspection PyTypeChecker
def _robust_fit(A, y, rho=1, iters=1000):
Expand Down
71 changes: 71 additions & 0 deletions Python/pyxdf/test/test_limit_on_synced_stamps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from pyxdf.pyxdf import _sync_timestamps, _limit_streams_to_overlap
import numpy as np
# %%
def streams():
    """Build four mock streams: fast float, slow float, marker, integer."""
    class MockStream(dict):

        def __init__(self, timestamps, timeseries, effective_srate,
                     channel_format):
            super().__init__()
            self['time_stamps'] = timestamps
            self['time_series'] = timeseries
            self['info'] = {'effective_srate': effective_srate,
                            'channel_format': channel_format}

    return {
        # fastest stream, latest timestamp
        1: MockStream(np.linspace(1, 2, 1001),
                      np.linspace(1, 2, 1001),
                      1000, ['float32']),
        # slowest stream, earliest timestamp
        2: MockStream(np.linspace(0.5, 1.5, 251),
                      np.linspace(.5, 1.5, 251),
                      250, ['float32']),
        # marker stream
        3: MockStream([0.2, 1.1071, 1.2, 1.9, 2.5],
                      ['mark_' + str(n) for n in range(5)],
                      0, ['string']),
        # integer datatype stream
        4: MockStream(np.linspace(0.4, 1.4, 251),
                      np.linspace(4, 140, 251, dtype='int32'),
                      250, ['int32']),
    }

def mock():
    """Return the mock streams after syncing and limiting to overlap."""
    return _limit_streams_to_overlap(_sync_timestamps(streams()))

#%% test
# earliest overlapping timestamp is 1.0 from stream 1
# latest overlapping timestamp is 1.4 from stream 4
# fastest stream is stream 1 with fs 1000
def test_timestamps():
    """All synced and limited streams share timestamps linspace(1, 1.4, 401)."""
    expected = np.linspace(1, 1.4, 401)
    for stream in mock().values():
        assert np.all(np.isclose(stream['time_stamps'], expected))

def test_first_timeseries():
    """Stream 1 is the interpolation seed, so its values match its stamps."""
    series = mock()[1]['time_series']
    assert np.all(np.isclose(series, np.linspace(1, 1.4, 401)))

def test_second_timeseries():
    """Stream 2 is linear, so interpolation reproduces the ramp exactly."""
    series = mock()[2]['time_series']
    assert np.all(np.isclose(series, np.linspace(1, 1.4, 401)))

def test_third_timeseries():
    """Markers are shifted to the nearest feasible timestamps."""
    limited = mock()[3]
    marker_idx = np.where(limited['time_series'] != '')[0]
    assert np.all(marker_idx == [107, 200])
    assert limited['time_stamps'][marker_idx[0]] == 1.107  # shifted to closest fit
    assert limited['time_stamps'][marker_idx[1]] == 1.2  # fits with fs 1000

def test_fourth_timeseries():
    """Integer interpolation: only endpoints are checked.

    Interpolation is tricky, as it depends on np.around and linear
    interpolation, which can not be approximated with np.linspace;
    therefore only the first and last value of the series are tested.
    """
    series = mock()[4]['time_series']
    assert np.isclose(series[0], 85)
    assert np.isclose(series[-1], 140)
53 changes: 53 additions & 0 deletions Python/pyxdf/test/test_limit_streams_to_overlap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from pyxdf.pyxdf import _limit_streams_to_overlap
import numpy as np
# %%
def streams():
    """Build three mock streams: fast float, slow float, string markers."""
    class MockStream(dict):

        def __init__(self, timestamps, timeseries, effective_srate,
                     channel_format):
            super().__init__()
            self['time_stamps'] = timestamps
            self['time_series'] = timeseries
            self['info'] = {'effective_srate': effective_srate,
                            'channel_format': channel_format}

    return {
        # fastest stream, latest timestamp
        1: MockStream(np.linspace(1, 2, 1001),
                      np.linspace(1, 2, 1001),
                      1000, ['float32']),
        # slowest stream, earliest timestamp
        2: MockStream(np.linspace(0.4, 1.4, 251),
                      np.linspace(0.4, 1.4, 251),
                      250, ['float32']),
        # marker stream
        3: MockStream([0.2, 1.1071, 1.2, 1.9, 2.5],
                      ['mark_' + str(n) for n in range(5)],
                      0, ['string']),
    }

# %% test

def test_timestamps():
    """First and last timestamps must match the expected overlap bounds."""
    expected = [(1, 1.4), (1, 1.4), (1.1071, 1.2)]
    limited = _limit_streams_to_overlap(streams())
    for stream, (first, last) in zip(limited.values(), expected):
        assert np.isclose(stream['time_stamps'][0], first)
        assert np.isclose(stream['time_stamps'][-1], last)

def test_timeseries():
    """First and last values must match the expected overlap bounds."""
    expected = [(1, 1.4), (1, 1.4), ('mark_1', 'mark_2')]
    limited = _limit_streams_to_overlap(streams())
    for stream, (first, last) in zip(limited.values(), expected):
        if stream['info']['channel_format'] == ['string']:
            assert stream['time_series'][0] == first
            assert stream['time_series'][-1] == last
        else:
            assert np.isclose(stream['time_series'][0], first)
            assert np.isclose(stream['time_series'][-1], last)

Loading