From 830c40ef07daafdfac8c84ca06b06fa9806356af Mon Sep 17 00:00:00 2001 From: Maksim Norkin Date: Fri, 22 Mar 2019 13:34:56 +0200 Subject: [PATCH] Use new buffer --- lib/connection.js | 9 +++++---- lib/protocol/common.js | 5 +++-- lib/protocol/misc/compression.js | 4 +++- package.json | 7 ++++--- test/07.compression.js | 7 ++++--- test/08.connection.js | 3 ++- 6 files changed, 21 insertions(+), 14 deletions(-) diff --git a/lib/connection.js b/lib/connection.js index 3708623..f1d23c4 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -5,6 +5,7 @@ var Promise = require('./bluebird-configured'); var NoKafkaConnectionError = require('./errors').NoKafkaConnectionError; var tls = require('tls'); var _ = require('lodash'); +var Buffer = require('safer-buffer').Buffer; function Connection(options) { this.options = _.defaults(options || {}, { @@ -19,7 +20,7 @@ function Connection(options) { // internal state this.connected = false; this.closed = false; // raised if close() was called - this.buffer = new Buffer(this.options.initialBufferSize); + this.buffer = Buffer.alloc(this.options.initialBufferSize); this.offset = 0; this.queue = {}; @@ -115,7 +116,7 @@ Connection.prototype._disconnect = function (err) { }; Connection.prototype._growBuffer = function (newLength) { - var _b = new Buffer(newLength); + var _b = Buffer.alloc(newLength); this.buffer.copy(_b, 0, 0, this.offset); this.buffer = _b; }; @@ -135,7 +136,7 @@ Connection.prototype.close = function () { * @return {Promise} Promise resolved with a Kafka response message */ Connection.prototype.send = function (correlationId, data, noresponse) { - var self = this, buffer = new Buffer(4 + data.length); + var self = this, buffer = Buffer.alloc(4 + data.length); buffer.writeInt32BE(data.length, 0); data.copy(buffer, 4); @@ -199,7 +200,7 @@ Connection.prototype._receive = function (data) { correlationId = data.readInt32BE(4); if (this.queue.hasOwnProperty(correlationId)) { - this.queue[correlationId].resolve(new Buffer(data.slice(4, length + 4))); + this.queue[correlationId].resolve(Buffer.from(data.slice(4, length + 4))); delete this.queue[correlationId]; } diff --git a/lib/protocol/common.js b/lib/protocol/common.js index eadb9e1..f59e208 100644 --- a/lib/protocol/common.js +++ b/lib/protocol/common.js @@ -4,6 +4,7 @@ var Protocol = require('./index'); var errors = require('../errors'); var crc32 = require('buffer-crc32'); var _ = require('lodash'); +var Buffer = require('safer-buffer').Buffer; /* jshint bitwise: false */ @@ -28,7 +29,7 @@ Protocol.define('bytes', { this.Int32BE(-1); } else { if (!Buffer.isBuffer(value)) { - value = new Buffer(_(value).toString(), 'utf8'); + value = Buffer.from(_(value).toString(), 'utf8'); } this .Int32BE(value.length) @@ -51,7 +52,7 @@ Protocol.define('string', { if (value === undefined || value === null) { this.Int16BE(-1); } else { - value = new Buffer(_(value).toString(), 'utf8'); + value = Buffer.from(_(value).toString(), 'utf8'); this .Int16BE(value.length) .raw(value); diff --git a/lib/protocol/misc/compression.js b/lib/protocol/misc/compression.js index 5864b8d..5d2348c 100644 --- a/lib/protocol/misc/compression.js +++ b/lib/protocol/misc/compression.js @@ -1,5 +1,7 @@ 'use strict'; +var Buffer = require('safer-buffer').Buffer; + var requireSafe = function requireSafe(moduleName) { try { require.resolve(moduleName); @@ -13,7 +15,7 @@ var snappy = requireSafe('snappy'); var zlib = require('zlib'); var _ = require('lodash'); -var SNAPPY_MAGIC_HEADER = new Buffer([-126, 83, 78, 65, 80, 80, 89, 0,]); // '\x82SNAPPY\00' +var SNAPPY_MAGIC_HEADER = Buffer.from([-126, 83, 78, 65, 80, 80, 89, 0,]); // '\x82SNAPPY\00' /*var SNAPPY_BLOCK_SIZE = 32 * 1024; var SNAPPY_DEFAULT_VERSION = 1; var SNAPPY_MINIMUM_COMPATIBLE_VERSION = 1;*/ diff --git a/package.json b/package.json index 7c69d12..f794879 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,8 @@ "kafka" ], "dependencies": { + "@types/bluebird": "3.5.0", + "@types/lodash": "^4.14.55", "bin-protocol": "^3.1.1", "bluebird": "^3.3.3", "buffer-crc32": "^0.2.5", @@ -26,9 +28,8 @@ "lodash": "=4.17.11", "murmur-hash-js": "^1.0.0", "nice-simple-logger": "^1.0.1", - "wrr-pool": "^1.0.3", - "@types/lodash": "^4.14.55", - "@types/bluebird": "3.5.0" + "safer-buffer": "^2.1.2", + "wrr-pool": "^1.0.3" }, "devDependencies": { "chai": "^3.5.0", diff --git a/test/07.compression.js b/test/07.compression.js index d526c8a..60b4ef4 100644 --- a/test/07.compression.js +++ b/test/07.compression.js @@ -5,6 +5,7 @@ var crc32 = require('buffer-crc32'); var Kafka = require('../lib/index'); var kafkaTestkit = require('./testkit/kafka'); +var Buffer = require('safer-buffer').Buffer; describe('Compression', function () { describe('sync', function () { @@ -79,7 +80,7 @@ describe('Compression', function () { }); it('should send/receive with Snappy compression (>32kb)', async () => { - const buf = new Buffer(90 * 1024); + const buf = Buffer.alloc(90 * 1024) const crc = crc32.signed(buf); dataHandlerSpy.reset(); @@ -230,7 +231,7 @@ describe('Compression', function () { }); it('should send/receive with async Snappy compression (>32kb)', async () => { - const buf = new Buffer(90 * 1024); + const buf = Buffer.alloc(90 * 1024); const crc = crc32.signed(buf); dataHandlerSpy.reset(); @@ -277,7 +278,7 @@ describe('Compression', function () { }); it('should send/receive with async Gzip compression (>32kb)', async () => { - const buf = new Buffer(90 * 1024); + const buf = Buffer.alloc(90 * 1024); const crc = crc32.signed(buf); dataHandlerSpy.reset(); diff --git a/test/08.connection.js b/test/08.connection.js index 1bab226..adb4c9a 100644 --- a/test/08.connection.js +++ b/test/08.connection.js @@ -7,6 +7,7 @@ var fs = require('fs'); var crc32 = require('buffer-crc32'); var Kafka = require('../lib/index'); var kafkaTestkit = require('./testkit/kafka'); +var Buffer = require('safer-buffer').Buffer; describe('Connection', function () { var KAFKA_TOPIC = 'kafka-test-topic'; @@ -38,7 +39,7 @@ describe('Connection', function () { }); it('should be able to grow receive buffer', function () { - var buf = new Buffer(384 * 1024), crc = crc32.signed(buf); + var buf = Buffer.alloc(384 * 1024), crc = crc32.signed(buf); dataHandlerSpy.reset();