Creek: Design notes
See the design thoughts notes for more context and deliberations on the interface to streams.
`creek.infinite_sequence.InfiniteSeq` shows how one can get a slicing `[bt:tt]` interface to a stream of items, where `bt` and `tt` are in the "items counter" unit. Changing that to another unit can be done by mapping these to and from the unit we need.
We'd like to have a similar capability, but dealing with a stream of indexed segments. Of course, we can use `InfiniteSeq` as is for this, but what if the index we need can't be computed without looking into the streamed items? The item might carry its index itself (e.g. a json with a "timestamp" field). In other cases, the index can be computed by updating it based on some feature of the stream items (e.g. size of segments): we can use `creek.tools.DynamicIndexer` to compute the indices of our items.
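For instance, if the index is a running byte offset derived from segment sizes, a minimal sketch of the idea (not the actual `DynamicIndexer` API) could be:

```python
def cumulative_size_index(segments):
    """Yield (byte_offset, segment) pairs, where each segment's offset is the
    total size of all segments that came before it.

    A minimal sketch of the idea behind creek.tools.DynamicIndexer (whose actual
    API may differ): the index is updated from a feature of the items, here len().
    """
    offset = 0
    for segment in segments:
        yield offset, segment
        offset += len(segment)

assert list(cumulative_size_index([b'abc', b'de', b'fghi'])) == [
    (0, b'abc'), (3, b'de'), (5, b'fghi')
]
```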
If these items not only have their own, possibly irregular, index, but also carry information relevant not just to a point in time but to an interval in time, we now have a stream of `(interval, segment_data)` items. We then want not only to manage the `bt:tt` query with respect to the `interval` information, but possibly also to aggregate the matched `segment_data` items in some particular way.
Example: A stream of (non-fixed-size) segments of audio bytes coming from a microphone. We manage to maintain a byte offset (with `DynamicIndexer` for example), and with it, create a stream of `(bt, tt, audio_bytes)` triples which we want to query as `audio[qbt:qtt]`, getting all the bytes in one piece.
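A minimal sketch of what answering such a query could look like, assuming `bt`/`tt` are byte offsets and the triples come sorted (the function name and details are illustrative, not creek's API):

```python
def audio_slice(segments, qbt, qtt):
    """Return the bytes falling in [qbt, qtt) from a sorted iterable of
    (bt, tt, audio_bytes) triples, concatenated into one piece.

    Assumes bt/tt are byte offsets, so a timestamp difference maps directly
    to an index into audio_bytes.
    """
    pieces = []
    for bt, tt, audio_bytes in segments:
        if tt <= qbt:
            continue  # segment entirely before the query
        if bt >= qtt:
            break     # sorted input: no further segment can match
        lo = max(qbt, bt) - bt
        hi = min(qtt, tt) - bt
        pieces.append(audio_bytes[lo:hi])
    return b''.join(pieces)

segs = [(0, 4, b'abcd'), (4, 7, b'efg'), (7, 12, b'hijkl')]
assert audio_slice(segs, 2, 9) == b'cdefghi'
```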
Problem A: Given a `stream` of `(dbt, d)` items ordered by `dbt`, `s = Sliceable(stream)` is such that `s[qbt:qtt]` gives us the items that intersect with the `(qbt, qtt)` interval. Intersection is a configurable concept.

Problem B: Like Problem A, but we have `((dbt, dtt), d)` items. Intersection is a bit more clear here, but still configurable.

Problem A solves itself by computing `((dbt, dtt), d)` from `(dbt, d)`, and solving Problem B.
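A minimal sketch of that reduction, taking each item's `dtt` to be the next item's `dbt` (the handling of the last item's open end is an assumed convention):

```python
from itertools import tee, chain, islice

def point_to_interval_items(stream, last_tt=float('inf')):
    """Convert a (dbt, d) stream (sorted by dbt) into ((dbt, dtt), d) items,
    taking each item's dtt to be the next item's dbt (Problem A -> Problem B).

    last_tt is an assumed convention for the open-ended last item.
    """
    a, b = tee(stream)
    next_bts = chain((bt for bt, _ in islice(b, 1, None)), [last_tt])
    for (dbt, d), dtt in zip(a, next_bts):
        yield (dbt, dtt), d

items = [(0, 'x'), (3, 'y'), (7, 'z')]
assert list(point_to_interval_items(items, last_tt=10)) == [
    ((0, 3), 'x'), ((3, 7), 'y'), ((7, 10), 'z')
]
```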
We get from multi-dimensional timestamped streams of data to a single one via flattening (e.g. MergedStreams). Before or after the flattening, we need to normalize the time unit (multiple streams means multiple time units, but there's only one query/slicing time unit!).
From there, there are two main routes:
- iterator: creating an iterator of slabs (i.e. a stream of multi-dimensional data aggregated by time segments, of fixed or non-fixed size)
- matrix: allowing the data to be queried by "random-access" (i.e. not necessarily ordered) `bt:tt` slice queries

The difference between the two is that the first has a fixed interval query logic (think "a sequence of ordered intervals/windows/chunks"), whereas the second is "on demand": the data is buffered, and we give access to any data in that buffer, in any query order.
The matrix approach subsumes the iterator, but the iterator approach is frequently sufficient, and enables simpler optimized code.
As far as the iterator approach goes, one central function is:

`slabs_intervals, time_query_intervals -> slabs_intervals`

We assume here that (1) the data has been flattened, so we can ignore the multi-dimensional aspect, and (2) the `slabs_intervals` are linked to their data somehow (but we don't need the data for this function).
Central to this is the

```python
def intervals_match(slabs_intervals, time_query_intervals) -> bool: ...
```

function. The default would be (because we assume that both intervals are in the `[bt, tt)` language, i.e. including `bt` and excluding `tt`):

```python
def default_intervals_match(slabs_intervals, time_query_intervals) -> bool:
    slab_bt, slab_tt = slabs_intervals
    time_bt, time_tt = time_query_intervals
    return time_bt <= slab_bt and slab_tt <= time_tt
```
But other definitions are immediately useful. One comes to mind:

```python
return time_bt < slab_tt and slab_bt < time_tt
```
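Spelled out as runnable predicates (the function names below are mine, not creek's), the two definitions differ on slabs that straddle a query boundary:

```python
def contained(slab_interval, query_interval):
    """Default: the slab must lie entirely inside the query interval."""
    slab_bt, slab_tt = slab_interval
    time_bt, time_tt = query_interval
    return time_bt <= slab_bt and slab_tt <= time_tt

def overlaps(slab_interval, query_interval):
    """Alternative: the slab and the query share at least one point."""
    slab_bt, slab_tt = slab_interval
    time_bt, time_tt = query_interval
    return time_bt < slab_tt and slab_bt < time_tt

# A slab straddling the query's left boundary overlaps but is not contained
assert overlaps((8, 12), (10, 20)) and not contained((8, 12), (10, 20))
assert overlaps((10, 15), (10, 20)) and contained((10, 15), (10, 20))
```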
Namely, see the "matched segments" part of the image below.
source -> timestamped_segments -> intersect_with_slice_query -> aggregate -> output
`timestamped_segments` are streams of `(bt, tt, segment)` (or `(bt, segment)`, or just `segment`, with a way to get to the normalized form from there). We assume that the stream items are index-sorted already.
Which method is "best" depends on what assumptions we can make about the incoming data and how we measure "better" (tradeoffs of memory vs speed, average vs worst case, etc.). Here are a few ideas, though.
Use `filter(overlaps_with_query, timestamped_segments)` where `overlaps_with_query = mk_filter_func_for_query(query)` is a boolean function defined by the query interval.

The advantages here are that it:
- is simple
- makes minimal assumptions
- fits a common pattern, built into python (`filter`)

The disadvantage is that it doesn't take advantage of assumptions we could make in many cases; for example, that the `timestamped_segments` iterator is sorted. The filter way obliges, by design, a full scan of the segments at every query. We have no way here to skip earlier items or stop the scan early if we know we'll have no more matches.
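A minimal sketch of this filter approach, assuming the `(bt, tt, segment)` form and an "overlap" notion of intersection (the body of `mk_filter_func_for_query` here is my own illustration, not creek's code):

```python
def mk_filter_func_for_query(query_interval):
    """Make a boolean function that tells whether a (bt, tt, segment) item
    overlaps the query interval."""
    qbt, qtt = query_interval

    def overlaps_with_query(item):
        bt, tt, _segment = item
        return qbt < tt and bt < qtt

    return overlaps_with_query

timestamped_segments = [(0, 4, 'a'), (4, 7, 'b'), (7, 12, 'c')]
overlaps_with_query = mk_filter_func_for_query((5, 10))
assert list(filter(overlaps_with_query, timestamped_segments)) == [
    (4, 7, 'b'), (7, 12, 'c')
]
```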
`InfiniteSeq` solves our slicing problem in the case where timestamps are simple enumerators (`0, 1, 2, ...`). In that case, matching a query is direct, since the query (interval) is the definition of the slice we want: `segments[bt:tt]`.

If the assumptions on `segments` are such that we can map the segments' timestamps to the (simple enumerator) index, then we can use this conversion function to map our query to an explicit `bt:tt` slice of the `InfiniteSeq` buffer. Sometimes, these assumptions will allow the conversion function to be computed through an efficient formula. In other situations, we'll have to maintain an explicit mapping.
In general, we'll need a mix of a (maintained) explicit mapping `{query_bt: buffer_index, ...}` and some indexer to efficiently find the `query_bt`s of the mapping corresponding to the query. With point data, one could use the builtin `bisect` module to maintain a collection in sorted order.
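For instance, here's a minimal sketch (not creek's actual API) of a point-data buffer that keeps `(dbt, d)` items sorted and uses `bisect` to answer `[qbt:qtt)` queries without a full scan:

```python
from bisect import bisect_left

class PointIndexedBuffer:
    """Keep (dbt, d) items in dbt-sorted order and answer [qbt:qtt) queries
    with bisect instead of a full scan. A sketch, assuming appends arrive
    already sorted by dbt."""

    def __init__(self):
        self._bts = []   # sorted dbts
        self._data = []  # data, aligned with self._bts

    def append(self, item):
        dbt, d = item
        self._bts.append(dbt)
        self._data.append(d)

    def __getitem__(self, query):
        lo = bisect_left(self._bts, query.start)
        hi = bisect_left(self._bts, query.stop)
        return self._data[lo:hi]

buf = PointIndexedBuffer()
for item in [(0, 'a'), (3, 'b'), (7, 'c'), (9, 'd')]:
    buf.append(item)
assert buf[3:9] == ['b', 'c']  # [bt, tt): 3 included, 9 excluded
```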
For interval data you can go further and use an interval tree, implemented in python here. But again, one must keep an eye on tradeoffs: the expense of maintaining a sorted collection or interval tree may not always be worth the advantage it offers (namely, query-matching speed).
`creek.tools`: `Segmenter`, `BufferStats`
Sometimes all we need to do is iterate through the data.
- The data might be uni- or multi-dimensional.
- The data might be live, or persisted.
- The size of the data items might be fixed (e.g. video/audio frames) or not (e.g. files).
- Several layers of "structuring" (e.g. split, map, aggregate) may be needed to produce the wanted slabs from the source(s)

```python
slabs = make_iterator_of_streaming_data(...)
for slab in slabs:
    ...  # do stuff with slab
```
Things often involved in making slabs:
- Chunking data (gathering data until the total reaches some fixed size, and releasing that "chunk"). For example, for non-overlapping gapless fixed-size chunks: `yield from map(list, zip(*([iter(source_iterable)] * chk_size)))` (a runnable version is sketched below). See also taped's simple chunker or other chunkers (slang).
- Chunking time (generating intervals of time, then extracting the corresponding data for each interval)
- Smart chunking: sending base slabs through some computation pipeline to decide whether to include them in the iteration or not
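A runnable wrapping of that fixed-size chunking one-liner (note that any incomplete tail chunk is silently dropped by `zip`):

```python
def fixed_size_chunks(source_iterable, chk_size):
    """Non-overlapping, gapless, fixed-size chunks via the zip trick mentioned
    above; any tail shorter than chk_size is dropped."""
    yield from map(list, zip(*([iter(source_iterable)] * chk_size)))

assert list(fixed_size_chunks(range(7), 3)) == [[0, 1, 2], [3, 4, 5]]
```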
Some terminology proposals:

```python
from typing import Any, Iterable

from atypes.util import MyType

TimeIndexedItem = MyType(
    'TimeIndexedItem', Any,
    doc='Data that is (implicitly or explicitly) time-indexed.'
)
Slab = MyType(
    'Slab', Iterable[TimeIndexedItem],  # extra condition: all items within a same interval
    doc='A collection of (time-indexed) items of a same interval of time.',
)
```
Other terms proposed (also in `atypes`):
- `Hunk` (fixed size slab)
- `Segment` (uni-dimensional slab)
- `Chunk` (fixed size uni-dimensional slab)
Sometimes we need to get (channel, time_interval) slices of data from a buffer of live or persisted data. Sometimes we'd like to also write such slices (on a separate buffer).
![](https://user-images.githubusercontent.com/1906276/138754719-a768221a-4603-4c1d-abb0-45a40991b022.png)
Ideas of interface:

```python
s = MultiChannelStream(...)

# reading
channels = list(s)  # list of channels
ch = channels[0]
x = s[ch]  # time-sliceable iterable (i.e., we can do x[bt:tt])
s[ch][bt:tt]  # the data that happened between (timestamps) `bt` and `tt` (by default `bt` inclusive and `tt` exclusive)

# writing
s[ch].append(data)  # append to `ch` data (requires some implicit or explicit (e.g. data := ((bt, tt), array)) timestamps)
s[ch][bt:tt] = data  # insert `data` as the data of `ch` between `bt` and `tt` (requires time-alignment validation)
```

`s[ch][bt:tt]` is a default choice here, but different contexts could call for different variations of this interface.
For example:
- `s[ch, bt:tt]` (e.g. see `AudioSegment`, which does this where `ch` is a filename, or any key that leads to a sliceable waveform; note that `AudioSegment` also caches the waveform in memory so that multiple slices of a same `ch` (key) are optimized)
- `s[bt:tt][ch]`: if we (mostly) need all the channel data for a given slice
In some cases, it's easy (and not dangerous) to support multiple such interfaces. For example, `s` could support both `s[ch][bt:tt]` and `s[ch, bt:tt]`. Note that `s[ch][bt:tt]` and `s[ch, bt:tt]` could be equivalent, or even the code for one could just use the code for the other; but in some cases `s[ch, bt:tt]` could offer opportunities for optimization, since the code now has more visibility on the user's intentions (get data for a channel, and slice it; this may remind you of the optimization opportunities of GraphQL over REST).
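As a minimal sketch (names and internals are illustrative assumptions, not creek's code) of an `s` supporting both forms, where the tuple form could later be specialized for optimization:

```python
class ChannelView:
    """Time-sliceable view over one channel's (bt, tt, data) segments."""

    def __init__(self, segments):
        self._segments = segments  # assumed sorted by bt

    def __getitem__(self, query):
        qbt, qtt = query.start, query.stop
        return [d for bt, tt, d in self._segments if qbt < tt and bt < qtt]

class MultiChannelStream:
    """Sketch of an s[ch][bt:tt] / s[ch, bt:tt] interface."""

    def __init__(self, channel_segments):
        self._channels = channel_segments  # {ch: [(bt, tt, data), ...], ...}

    def __iter__(self):
        return iter(self._channels)

    def __getitem__(self, key):
        if isinstance(key, tuple):  # s[ch, bt:tt]
            ch, query = key
            return ChannelView(self._channels[ch])[query]
        return ChannelView(self._channels[key])  # s[ch], then [bt:tt]

s = MultiChannelStream({'mic': [(0, 4, 'a'), (4, 7, 'b'), (7, 12, 'c')]})
assert s['mic'][5:10] == s['mic', 5:10] == ['b', 'c']
```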
A few more ideas on the `[...]` interface of a stream. (Note that some of these are more relevant to persisted storage than a live stream, but again: we'd like the interface to reveal the difference minimally, only if necessary.)
Rectangular (explicit) selections:

```python
rectangle = (channel_list, bt:tt)
data = s[rectangle]

rectangles = [(channel_list, bt:tt), ...]
data = s[rectangles]
```
Logical selection:

```python
filt = ...  # a function filtering (in or out) over channels and/or time and/or the data itself
data = s[filt]
```
Let's not forget abstract annotations, namely naming collections of things, and possibly associating metadata with these:

```python
s[alias_of_channel]
s[named_collection_of_channels]  # for example, corresponding to a user, or project, or asset, or type of data...
s[collection_of_intervals]  # for example, sessions, phases, tags
s[named_collection_of_rectangles]  # basically, a named dataset (for example, train/test data)
s.meta[named_collection_of_rectangles]['user_permission'].update(bob={'read': True, 'write': False}, alice={})
```
More on the `collection_of_intervals` key:

```python
# let's say somehow we have the following mapping
tags = {
    'dog': [(bt, tt), ...],  # those intervals that have been tagged as "dog"
    'cat': [(bt, tt), ...],  # those intervals that have been tagged as "cat"
}
dog_waveforms = s[tags['dog']]['audio']
```
Note that the concerns expressed above are separate, but will need to play along with a few other related concerns: Index unit, sourcing, and caching.
What is the unit of the indexing (enumeration, utc time, size-based, etc.)? See `creek.tools.DynamicIndexer` for some ideas/code about that.
It's important that, when objects are timestamped, interval query responses be as time-aligned as possible. For example, this means that enumeration indices will only do if there's only one channel, or if all channels have the same item (time) size, step, and rate.
How is the source data actually generated?
In the case of persisted data, we usually want the data to be pulled from the source as it's needed by the consuming process. Usually you also want to batch and preprocess requests (e.g. sort them) and use caching to optimize IO.
Further, when persisted data is multi-dimensional (multi-channel) and/or is sourced from various places, one needs to decide how the iteration is actually being seeded. For example, if we have annotated audio, it is sometimes better to iterate through the annotations and retrieve the audio according to the references therein, but sometimes better to iterate through the audio source and pull in the relevant annotation accordingly.
In the case of live data, the iteration is (ultimately) being seeded by time itself. And time doesn't wait. This means that a buffer is required to keep live data available long enough for it to be sliced. The source(s) are responsible for pushing their data onto the buffer, and the buffer reader(s) then have enough time (if the buffer is large enough for the data and consumption rates) to slice it and do their job.
A buffer is just one instance of using a cache in the live data case. Other kinds of cache are also useful in persisted data cases to optimize IO etc.
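A minimal sketch of that live-data buffering idea (a bounded buffer that sources push onto and readers slice; concurrency concerns, which a real implementation would need to address, are omitted here):

```python
from collections import deque

class LiveBuffer:
    """Sketch of a buffer sitting between live sources and readers:
    sources push (bt, tt, data) items, readers slice whatever is still held.
    maxlen bounds memory; items older than the buffer window are dropped."""

    def __init__(self, maxlen=1024):
        self._items = deque(maxlen=maxlen)

    def push(self, item):
        self._items.append(item)  # called by the source(s)

    def __getitem__(self, query):
        qbt, qtt = query.start, query.stop
        return [d for bt, tt, d in self._items if qbt < tt and bt < qtt]

buf = LiveBuffer(maxlen=2)
for item in [(0, 4, 'a'), (4, 7, 'b'), (7, 12, 'c')]:
    buf.push(item)
assert buf[0:12] == ['b', 'c']  # 'a' was dropped: the buffer was too small for the consumption rate
```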
Below are a few older notes kept around for further information (but take with a grain of salt).
Bits that may be useful:
`InfiniteSeq`:
- https://github.com/i2mint/creek/blob/master/creek/infinite_sequence.py
- https://github.com/i2mint/creek/blob/master/creek/scrap/multi_streams.py

`AudioSegment`:
- https://github.com/otosense/hear/blob/master/hear/tools.py
- https://github.com/otosense/hear/blob/master/hear/session_block_stores.py (but old and too particular; read with a grain of salt, and extract ideas/code)
Problem A: Given a `stream` of `(dbt, d)` items ordered by `dbt`, `s = Sliceable(stream)` is such that `s[qbt:qtt]` gives us the items that intersect with the `(qbt, qtt)` interval. Intersection is a configurable concept.

Problem B: Like Problem A, but we have `((dbt, dtt), d)` items. Intersection is a bit more clear here, but still configurable.

Problem A solves itself by computing `((dbt, dtt), d)` from `(dbt, d)`, and solving Problem B.
Utils:
- `interval_intersection(interval_1, interval_2)`
- `slicer(data, data_interval=(dbt, dtt), query_interval=(qbt, qtt))` (just `data[intersection(...) - dbt]`)
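Minimal sketches of those two utils (the bodies are my own assumptions; in particular, `slicer` assumes that positions within `data` map one-to-one onto `data_interval`, e.g. byte offsets):

```python
def interval_intersection(interval_1, interval_2):
    """Return the intersection of two [bt, tt) intervals, or None if empty."""
    bt = max(interval_1[0], interval_2[0])
    tt = min(interval_1[1], interval_2[1])
    return (bt, tt) if bt < tt else None

def slicer(data, data_interval, query_interval):
    """Slice the part of data covered by query_interval, assuming data's
    positions map one-to-one onto data_interval (e.g. byte offsets)."""
    dbt, _dtt = data_interval
    intersection = interval_intersection(data_interval, query_interval)
    if intersection is None:
        return data[:0]  # empty, same type as data
    bt, tt = intersection
    return data[bt - dbt:tt - dbt]

assert interval_intersection((0, 10), (5, 20)) == (5, 10)
assert slicer(b'abcdefghij', (0, 10), (5, 20)) == b'fghij'
```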
More about this in python docs and this useful blog post
We all know `iter(iterable)` to get an iterator (i.e. something with a `next`) from an iterable (i.e. something you can loop on). But we (I at least) often forget the `iter(callable, sentinel)` form.

In a way, it's a strange interface choice: if you mention the second argument (the sentinel), your first argument is now expected to be a different type; no longer an iterable, but a callable!! (??!!?)

The idea of the sentinel is to stop "iterating" (read "calling that callable") as soon as it encounters that sentinel value.
But this doesn't work (it raises `TypeError: iter(v, w): v must be callable`):

```python
list(iter([1, 2, 3, None, 4], None))
```

Instead, you need to transform your `[1, 2, 3, None, 4]` into a callable returning successive values of the iterable. In python, that is `functools.partial(next, iter([1, 2, 3, None, 4]))`.

So:

```python
from functools import partial
list(iter(partial(next, iter([1, 2, 3, None, 4])), None))
```
They (the python gods) could have made the iter form be `iter(iterable, sentinel)`, but I guess they figured that something like `itertools.takewhile(lambda x: x != sentinel, iterable)` already covers that case? I guess?
Anyway, this code might be useful for creeking:

```python
from functools import partial

no_sentinel = object()

def iterable_to_reader(iterable, sentinel=no_sentinel):
    """Make an argument-less function that gives you the next item of an iterable when called."""
    if sentinel is no_sentinel:
        return partial(next, iter(iterable))
    else:
        return partial(next, iter(iterable), sentinel)

def finite_list(iterable, max_items=5):
    from itertools import islice
    return list(islice(iterable, max_items))

read = iterable_to_reader([1, 2, 3])
assert read() == 1
assert read() == 2
assert read() == 3
# Calling read() again would raise StopIteration

# Use of iter(callable, sentinel):
# here we ask that the reader not raise StopIteration, but return 'no_more_items' when there's no more data.
# If your reader uses the 'no_more_items' sentinel too, you'll get only the data:
read = iterable_to_reader([1, 2, 3], sentinel='no_more_items')
assert finite_list(iter(read, 'no_more_items')) == [1, 2, 3]

# If your reader uses a different sentinel, you'll get whatever that reader is giving you...
read = iterable_to_reader([1, 2, 3], sentinel='my_sentinel')
assert finite_list(iter(read, 'no_more_items')) == [1, 2, 3, 'my_sentinel', 'my_sentinel']
```
- one file per channel
- several gluable files per channel
- several non-gluable files per channel
- stream2py: several gluable timestamped segments
- `wf_store := {ch0: Iterable[wf1, wf2, wf3], ...}` where the wfs are gapless
- `wf_store := {ch0: ch0_wf_store, ...}` where `ch0_wf_store := {bt0: wf0, ...}`
```python
s = MultiChannelStream(...)
channels = list(s)  # list of channels
ch = channels[0]
s[ch]  # time-sliceable iterable
s[ch][bt:tt]  # the data that happened between (timestamps) `bt` and `tt` (by default `bt` inclusive and `tt` exclusive)
```

Then:

```python
s[bt, tt]
```

Future:

```python
s[bt, tt, :]
s[bt, tt, list_of_channels]
```