Uses such libraries as xstream and RxJS.
npm install
Limits a stream to N concurrent events, incorporating a feedback loop to free up new events. Useful for batching asynchronous requests of varying duration.
import concurrent from 'concurrent/concurrent'
// create a proxy to close the circular dependency
const responsesProxy$ = xs.create();
// limits the requests$ to 3 at most
const 3requests$ = concurrent(requests$, responsesProxy$, 3);
// do something with the requests
const responses$ = 3requests$.addListener(/* ... */);
// closes the circular dependency by passing our completed responses back in order to free up more requests
responsesProxy$.imitate(responses$);
With a concurrency of 3:
requests: -a-b-c-d-e--------f-g-h-i---|
responses: ----------T---T-T-T----T--T--
output: -a-b-c----d---e---f-g--h--i-|
Requests demo (max of 3)
npm run-script concurrent
Zips multiple streams together, emits arrays of values if and when all streams have something new to offer.
import zip from 'zip/zip'
zip(stream1$, stream2$);
Marble diagram (RxMarbles)
input1: -1--2------3--4---|
input2: ---a--b--c--d-----|
output: ---1a-2b---3c-4d--|
input1: -a---b---c-----d----|
input2: --1-----2----3---4--|
input3: -j----k-------l-----|
output: --a1j---b2k---c3l---|