-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathbuffer-peek-stream.js
88 lines (62 loc) · 1.83 KB
/
buffer-peek-stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
var stream = require('stream');
var util = require('util');
function peek(source, bytes, callback) {
if (!callback) return peek(source, undefined, bytes);
var dest = new BufferPeekStream({peekBytes: bytes});
dest.once('peek', function (buffer) {
callback(null, buffer, dest);
});
return source.pipe(dest);
}
peek.BufferPeekStream = BufferPeekStream;
peek.promise = promise;
module.exports = peek;
function promise(source, bytes) {
return new Promise((resolve, reject) => {
peek(source, bytes, (err, buffer, dest) => {
if (err) return reject(err);
resolve([buffer, dest]);
})
})
}
function BufferPeekStream(opts) {
if (!opts) opts = {};
opts.highWaterMark = opts.peekBytes || 65536;
stream.Transform.call(this, opts);
this._peekState = {
buffer: [],
bytes: 0,
maxBytes: opts.peekBytes || 65536,
peeked: false
};
}
util.inherits(BufferPeekStream, stream.Transform);
BufferPeekStream.prototype._transform = function _transform(chunk, enc, callback) {
var state = this._peekState;
// buffer incoming chunks until we have enough for our peek
state.buffer.push(chunk);
state.bytes += chunk.length;
// get more?
if (state.bytes >= state.maxBytes) _peek(this, callback);
else callback();
};
BufferPeekStream.prototype._flush = function _flush(callback) {
if (this._peekState.peeked) callback();
else _peek(this, callback);
};
function _peek(stream, callback) {
var state = stream._peekState;
var buffer = Buffer.concat(state.buffer);
// emit exactly the number of bytes we wanted to peek
stream.emit('peek', buffer.slice(0, state.maxBytes));
stream.push(buffer);
state.buffer = null;
state.bytes = null;
state.peeked = true;
stream._transform = passthrough;
callback();
}
function passthrough(chunk, enc, callback) {
this.push(chunk);
callback();
}