Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to load only specific streams #24

Merged
merged 14 commits into from
Jul 8, 2019
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## [1.15.2] - 2019-06-07
### Added
- Store unique stream ID inside the `["info"]["stream_id"]` dict value ([#19](https://github.com/xdf-modules/xdf-Python/pull/19) by [Clemens Brunner](https://github.com/cbrnr)).
- Add option to load only specific streams ([#24](https://github.com/xdf-modules/xdf-Python/pull/24) by [Clemens Brunner](https://github.com/cbrnr)).

## [1.15.1] - 2019-04-26
### Added
Expand Down
6 changes: 3 additions & 3 deletions pyxdf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Authors: Christian Kothe & the Intheon pyxdf team
# Clemens Brunner
#
# License: BSD (2-clause)

from pkg_resources import get_distribution, DistributionNotFound
try:
__version__ = get_distribution(__name__).version
except DistributionNotFound:
# package is not installed
except DistributionNotFound: # package is not installed
__version__ = None
from .pyxdf import load_xdf
from .pyxdf import load_xdf, resolve_streams, match_streaminfos

206 changes: 184 additions & 22 deletions pyxdf/pyxdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(self, xml):


def load_xdf(filename,
select_streams=None,
on_chunk=None,
synchronize_clocks=True,
handle_clock_resets=True,
Expand Down Expand Up @@ -94,6 +95,20 @@ def load_xdf(filename,
Args:
filename : name of the file to import (*.xdf or *.xdfz)

select_streams : int | list[int] | list[dict] | None
One or more stream IDs to load. Accepted values are:
- int or list[int]: load only specified stream IDs, e.g.
select_streams=5 loads only the stream with stream ID 5, whereas
select_streams=[2, 4] loads only streams with stream IDs 2 and 4.
- list[dict]: load only streams matching the query, e.g.
select_streams=[{'type': 'EEG'}] loads all streams of type 'EEG'.
Entries within a dict must all match a stream, e.g.
select_streams=[{'type': 'EEG', 'name': 'TestAMP'}] matches streams
with both type 'EEG' *and* name 'TestAMP'. If
select_streams=[{'type': 'EEG'}, {'name': 'TestAMP'}], streams
matching either the type *or* the name will be loaded.
- None: load all streams (default).

synchronize_clocks : Whether to enable clock synchronization based on
ClockOffset chunks. (default: true)

Expand Down Expand Up @@ -155,10 +170,10 @@ def load_xdf(filename,
streams : list of dicts, one for each stream; the dicts
have the following content:
['time_series'] entry: contains the stream's time series
[#Channels x #Samples] this matrix is of the type declared in
['info']['channel_format']
['time_stamps'] entry: contains the time stamps for each sample
(synced across streams)
[#Channels x #Samples] this matrix is of the type declared
in ['info']['channel_format']
['time_stamps'] entry: contains the time stamps for each
sample (synced across streams)

['info'] field: contains the meta-data of the stream
(all values are strings)
Expand All @@ -179,14 +194,31 @@ def load_xdf(filename,

Examples:
load the streams contained in a given XDF file
>>> streams, fileheader = load_xdf('C:\Recordings\myrecording.xdf')
>>> streams, fileheader = load_xdf('myrecording.xdf')
"""

logger.info('Importing XDF file %s...' % filename)
if not os.path.exists(filename):
raise Exception('file %s does not exist.' % filename)

# dict of returned streams, in order of apparance, indexed by stream id
# if select_streams is an int or a list of int, load only streams
# associated with the corresponding stream IDs
# if select_streams is a list of dicts, use this to query and load streams
# associated with these properties
if select_streams is None:
pass
elif isinstance(select_streams, int):
select_streams = [select_streams]
elif all([isinstance(elem, dict) for elem in select_streams]):
select_streams = match_streaminfos(resolve_streams(filename),
select_streams)
if not select_streams: # no streams found
raise ValueError("No matching streams found.")
elif not all([isinstance(elem, int) for elem in select_streams]):
raise ValueError("Argument 'select_streams' must be an int, a list of "
"ints or a list of dicts.")

# dict of returned streams, in order of appearance, indexed by stream id
streams = OrderedDict()
# dict of per-stream temporary data (StreamData), indexed by stream id
temp = {}
Expand All @@ -195,22 +227,9 @@ def load_xdf(filename,
# number of bytes in the file for fault tolerance
filesize = os.path.getsize(filename)

# read file contents ([SomeText] below refers to items in the XDF Spec)
filename = Path(filename) # convert to pathlib object
if filename.suffix == '.xdfz' or filename.suffixes == ['.xdf', '.gz']:
f_open = gzip.open
else:
f_open = open

with f_open(filename, 'rb') as f:
# read [MagicCode]
if f.read(4) != b'XDF:':
raise Exception('not a valid XDF file: %s' % filename)

# for each chunk...
StreamId = None
with open_xdf(filename) as f:
# for each chunk
while True:

# noinspection PyBroadException
try:
# read [NumLengthBytes], [Length]
Expand All @@ -231,9 +250,16 @@ def load_xdf(filename,
if tag in [2, 3, 4, 6]:
StreamId = struct.unpack('<I', f.read(4))[0]
log_str += ', StreamId={}'.format(StreamId)
else:
StreamId = None

logger.debug(log_str)

if StreamId is not None and select_streams is not None:
if StreamId not in select_streams:
f.read(chunklen - 2 - 4) # skip remaining chunk contents
continue

# read the chunk's [Content]...
if tag == 1:
# read [FileHeader] chunk
Expand All @@ -260,7 +286,7 @@ def load_xdf(filename,
# optionally send through the on_chunk function
if on_chunk is not None:
values, stamps, streams[StreamId] = on_chunk(values, stamps,
streams[StreamId], StreamId)
streams[StreamId], StreamId)
# append to the time series...
temp[StreamId].time_series.append(values)
temp[StreamId].time_stamps.append(stamps)
Expand Down Expand Up @@ -336,6 +362,18 @@ def load_xdf(filename,
return streams, fileheader


def open_xdf(filename):
"""Open XDF file for reading."""
filename = Path(filename) # convert to pathlib object
if filename.suffix == '.xdfz' or filename.suffixes == ['.xdf', '.gz']:
f = gzip.open(filename, 'rb')
else:
f = open(filename, 'rb')
if f.read(4) != b'XDF:': # magic bytes
raise IOError('Invalid XDF file {}'.format(filename))
return f


def _read_chunk3(f, s):
# read [NumSampleBytes], [NumSamples]
nsamples = _read_varlen_int(f)
Expand Down Expand Up @@ -387,6 +425,8 @@ def _read_varlen_int(f):
return struct.unpack('<I', f.read(4))[0]
elif nbytes == b'\x08':
return struct.unpack('<Q', f.read(8))[0]
elif not nbytes: # EOF
raise EOFError
else:
raise RuntimeError('invalid variable-length integer encountered.')

Expand Down Expand Up @@ -584,3 +624,125 @@ def _robust_fit(A, y, rho=1, iters=1000):
z = rho / (1 + rho) * d + 1 / (1 + rho) * tmp * d
u = d - z
return x


def match_streaminfos(stream_infos, parameters):
"""Find stream IDs matching specified criteria.

Parameters
----------
stream_infos : list of dicts
List of dicts containing information on each stream. This information
can be obtained using the function resolve_streams.
parameters : list of dicts
List of dicts containing key/values that should be present in streams.
Examples: [{"name": "Keyboard"}] matches all streams with a "name"
field equal to "Keyboard".
[{"name": "Keyboard"}, {"type": "EEG"}] matches all streams
with a "name" field equal to "Keyboard" and all streams with
a "type" field equal to "EEG".
"""
matches = []
for request in parameters:
for info in stream_infos:
for key in request.keys():
match = info[key] == request[key]
if not match:
break
if match:
matches.append(info['stream_id'])

return list(set(matches)) # return unique values


def resolve_streams(fname):
"""Resolve streams in given XDF file.

Parameters
----------
fname : str
Name of the XDF file.

Returns
-------
stream_infos : list of dicts
List of dicts containing information on each stream.
"""
return parse_chunks(parse_xdf(fname))


def parse_xdf(fname):
"""Parse and return chunks contained in an XDF file.

Parameters
----------
fname : str
Name of the XDF file.

Returns
-------
chunks : list
List of all chunks contained in the XDF file.
"""
chunks = []
with open_xdf(fname) as f:
for chunk in _read_chunks(f):
chunks.append(chunk)
return chunks


def parse_chunks(chunks):
"""Parse chunks and extract information on individual streams."""
streams = []
for chunk in chunks:
if chunk["tag"] == 2: # stream header chunk
streams.append(dict(stream_id=chunk["stream_id"],
name=chunk.get("name"), # optional
type=chunk.get("type"), # optional
source_id=chunk.get("source_id"), # optional
created_at=chunk.get("created_at"), # optional
uid=chunk.get("uid"), # optional
session_id=chunk.get("session_id"), # optional
hostname=chunk.get("hostname"), # optional
channel_count=int(chunk["channel_count"]),
channel_format=chunk["channel_format"],
nominal_srate=int(chunk["nominal_srate"])))
return streams


def _read_chunks(f):
"""Read and yield XDF chunks.

Parameters
----------
f : file handle
File handle of XDF file.


Yields
------
chunk : dict
XDF chunk.
"""
while True:
chunk = dict()
try:
cbrnr marked this conversation as resolved.
Show resolved Hide resolved
chunk["nbytes"] = _read_varlen_int(f)
except EOFError:
return
chunk["tag"] = struct.unpack('<H', f.read(2))[0]
if chunk["tag"] in [2, 3, 4, 6]:
chunk["stream_id"] = struct.unpack("<I", f.read(4))[0]
if chunk["tag"] == 2: # parse StreamHeader chunk
xml = ET.fromstring(f.read(chunk["nbytes"] - 6).decode())
chunk = {**chunk, **_parse_streamheader(xml)}
else: # skip remaining chunk contents
f.seek(chunk["nbytes"] - 6, 1)
else:
f.seek(chunk["nbytes"] - 2, 1) # skip remaining chunk contents
yield chunk


def _parse_streamheader(xml):
"""Parse stream header XML."""
return {el.tag: el.text for el in xml if el.tag != "desc"}