-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
74 lines (71 loc) · 2.13 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
var through2 = require('through2');
/**
* @alias stream-sample
*
*
* Stream sampling uses a reservoir.
*
* This implements the [reservoir sampling](http://en.wikipedia.org/wiki/Reservoir_sampling)
* algorithm that allows you to glean a statistically valid fixed-size sample
* from an unknown-size input while keeping only the sample in memory.
*
* This module differs from the [reservoir-stream module](https://github.com/kesla/reservoir-stream),
* which requires the full input to be concatenated in memory.
*/
/**
 * Create an object-mode transform stream that keeps a fixed-size, uniformly
 * distributed random sample (a reservoir) of the items written to it.
 *
 * The first `sampleCount` items fill the reservoir in arrival order. Each
 * later item replaces a randomly chosen slot with probability
 * `sampleCount / (itemsSeenSoFar + 1)`, so every item seen so far is equally
 * likely to be in the sample.
 *
 * The stream always pushes the reservoir array itself (the same instance every
 * time, mutated in place), not a copy. Until `sampleCount` items have been
 * written, the unfilled slots keep whatever they initially held.
 *
 * @param {Number|Array} sampleCountOrArray either the number of elements to
 * keep in the sample, or an array to use as the reservoir. When an array is
 * given, its length is the sample size and the array is overwritten in place.
 * @param {Boolean} [lastOneOnly] when truthy, the sample is emitted once, when
 * the input ends; otherwise it is emitted after every item written. Defaults
 * to `true` when an array is passed and is left unset (falsy) otherwise.
 * @returns {Stream.Transform} a transform stream that samples the input.
 * @example
 * var streamSample = require('stream-sample');
 *
 * var sampler = streamSample(10);
 *
 * sampler.on('data', function(sample) {
 *   // sample is the current reservoir of up to 10 items from the stream
 * });
 *
 * for (var i = 0; i < 100; i++) sampler.write(Math.random());
 * sampler.end();
 */
function streamSample(sampleCountOrArray, lastOneOnly) {
    var sample,
        sampleCount;
    if (Array.isArray(sampleCountOrArray)) {
        // Array mode: the caller's array is the reservoir and is mutated.
        sample = sampleCountOrArray;
        sampleCount = sample.length;
        if (lastOneOnly === undefined) {
            lastOneOnly = true;
        }
    } else {
        sampleCount = sampleCountOrArray;
        sample = new Array(sampleCount);
    }
    // Number of items received so far (0-based index of the incoming item).
    var seen = 0;
    return through2.obj(function (data, enc, callback) {
        if (seen < sampleCount) {
            // Fill the initial sample.
            sample[seen] = data;
        } else {
            // Pick a uniform integer in [0, seen]. Math.floor over (seen + 1)
            // gives every index equal weight; Math.round over `seen` would
            // give the two end indices only half the weight of the others
            // and bias the sample.
            var randomIndex = Math.floor(Math.random() * (seen + 1));
            // With decreasing probability, replace a sampled item with the
            // new item from the stream.
            if (randomIndex < sampleCount) {
                sample[randomIndex] = data;
            }
        }
        seen++;
        if (!lastOneOnly) {
            this.push(sample);
        }
        callback();
    }, function flush(cb) {
        // In last-one-only mode the sample is emitted a single time, at end
        // of input.
        if (lastOneOnly) {
            this.push(sample);
        }
        cb();
    });
}
// Expose the streamSample factory as this module's sole CommonJS export.
module.exports = streamSample;