From 219c3e6fe2239249107a26f3e95320f1bc56a4fc Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Mon, 6 Jul 2020 15:36:12 +0200 Subject: [PATCH 1/4] WIP: mqtt stream parser --- aedes.d.ts | 1 - aedes.js | 2 - docs/Aedes.md | 1 - lib/client.js | 91 ++---- lib/mqtt-stream-parser.js | 27 ++ test/basic.js | 19 +- test/client-pub-sub.js | 6 +- test/close_socket_by_other_party.js | 3 +- test/connect.js | 445 +++------------------------- test/meta.js | 4 +- test/qos1.js | 15 +- 11 files changed, 115 insertions(+), 499 deletions(-) create mode 100644 lib/mqtt-stream-parser.js diff --git a/aedes.d.ts b/aedes.d.ts index 3f06ac95..3554b9fb 100644 --- a/aedes.d.ts +++ b/aedes.d.ts @@ -69,7 +69,6 @@ declare namespace aedes { authorizeSubscribe?: AuthorizeSubscribeHandler authorizeForward?: AuthorizeForwardHandler published?: PublishedHandler - queueLimit?: number maxClientsIdLength?: number } interface Client extends EventEmitter { diff --git a/aedes.js b/aedes.js index cf1caed6..5ded54bd 100644 --- a/aedes.js +++ b/aedes.js @@ -28,7 +28,6 @@ const defaultOptions = { published: defaultPublished, trustProxy: false, trustedProxies: [], - queueLimit: 42, maxClientsIdLength: 23 } @@ -45,7 +44,6 @@ function Aedes (opts) { // +1 when construct a new aedes-packet // internal track for last brokerCounter this.counter = 0 - this.queueLimit = opts.queueLimit this.connectTimeout = opts.connectTimeout this.maxClientsIdLength = opts.maxClientsIdLength this.mq = opts.mq || mqemitter({ diff --git a/docs/Aedes.md b/docs/Aedes.md index 9a169d32..56b86756 100644 --- a/docs/Aedes.md +++ b/docs/Aedes.md @@ -38,7 +38,6 @@ - `mq` [``](../README.md#mqemitter) middleware used to deliver messages to subscribed clients. In a cluster environment it is used also to share messages between brokers instances. __Default__: `mqemitter` - `concurrency` `` maximum number of concurrent messages delivered by `mq`. __Default__: `100` - `persistence` [``](../README.md#persistence) middleware that stores _QoS > 0, retained, will_ packets and _subscriptions_. __Default__: `aedes-persistence` (_in memory_) - - `queueLimit` `` maximum number of queued messages before client session is established. If number of queued items exceeds, `connectionError` throws an error `Client queue limit reached`. __Default__: `42` - `maxClientsIdLength` option to override MQTT 3.1.0 clients Id length limit. __Default__: `23` - `heartbeatInterval` `` an interval in millisconds at which server beats its health signal in `$SYS//heartbeat` topic. __Default__: `60000` - `id` `` aedes broker unique identifier. __Default__: `uuidv4()` diff --git a/lib/client.js b/lib/client.js index f0be5a20..fd057d46 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1,17 +1,16 @@ 'use strict' -const mqtt = require('mqtt-packet') const EventEmitter = require('events') const util = require('util') -const eos = require('end-of-stream') const Packet = require('aedes-packet') const write = require('./write') const QoSPacket = require('./qos-packet') const handleSubscribe = require('./handlers/subscribe') const handleUnsubscribe = require('./handlers/unsubscribe') const handle = require('./handlers') -const { pipeline } = require('readable-stream') +const { Writable, finished, pipeline } = require('readable-stream') const { through } = require('./utils') +const MqttStreamParser = require('./mqtt-stream-parser') module.exports = Client @@ -39,7 +38,7 @@ function Client (broker, conn, req) { this._disconnected = false this._authorized = false - this._parsingBatch = 1 + // for QoS > 0 this._nextId = Math.ceil(Math.random() * 65535) this.req = req @@ -51,50 +50,22 @@ function Client (broker, conn, req) { this.will = null this._will = null - this._parser = mqtt.parser() - this._parser.client = this - this._parser._queue = [] // queue packets received before client fires 'connect' event. Prevents memory leaks on 'connect' event - this._parser.on('packet', enqueue) - this.once('connected', dequeue) - - function nextBatch (err) { - if (err) { - that.emit('error', err) - return - } - - const client = that - - if (client._paused) { - return - } - - that._parsingBatch-- - if (that._parsingBatch <= 0) { - that._parsingBatch = 0 - var buf = client.conn.read(null) - if (!client.connackSent && client.broker.decodeProtocol && client.broker.trustProxy && buf) { - const { data } = client.broker.decodeProtocol(client, buf) - if (data) { - client._parser.parse(data) - } else { - client._parser.parse(buf) - } - } else if (buf) { - client._parser.parse(buf) - } - } + const mqttStream = new Writable({ objectMode: true, emitClose: false, encoding: 'utf8' }) + mqttStream._write = function (packet, encoding, done) { + handle(that, packet, done) } - this._nextBatch = nextBatch + mqttStream.on('error', this.emit.bind(this, 'error')) + + this._mqttStream = mqttStream + this._mqttParserStream = new MqttStreamParser(this) - conn.on('readable', nextBatch) + conn.pipe(this._mqttParserStream).pipe(mqttStream) this.on('error', onError) conn.on('error', this.emit.bind(this, 'error')) - this._parser.on('error', this.emit.bind(this, 'error')) conn.on('end', this.close.bind(this)) - this._eos = eos(this.conn, this.close.bind(this)) + this._eos = finished(this.conn, this.close.bind(this)) this.deliver0 = function deliverQoS0 (_packet, cb) { const toForward = dedupe(that, _packet) && @@ -261,10 +232,9 @@ Client.prototype.close = function (done) { this.closed = true - this._parser.removeAllListeners('packet') - conn.removeAllListeners('readable') - - this._parser._queue = null + conn.unpipe(this._mqttParserStream) + this._mqttParserStream.end() + this._mqttStream.end() if (this._keepaliveTimer) { this._keepaliveTimer.clear() @@ -334,38 +304,11 @@ Client.prototype.close = function (done) { } Client.prototype.pause = function () { - this._paused = true + this.conn.pause() } Client.prototype.resume = function () { - this._paused = false - this._nextBatch() -} - -function enqueue (packet) { - const client = this.client - client._parsingBatch++ - // already connected or it's the first packet - if (client.connackSent || client._parsingBatch === 1) { - handle(client, packet, client._nextBatch) - } else { - if (this._queue.length < client.broker.queueLimit) { - this._queue.push(packet) - } else { - this.emit('error', new Error('Client queue limit reached')) - } - } -} - -function dequeue () { - const q = this._parser._queue - if (q) { - for (var i = 0, len = q.length; i < len; i++) { - handle(this, q[i], this._nextBatch) - } - - this._parser._queue = null - } + this.conn.resume() } Client.prototype.emptyOutgoingQueue = function (done) { diff --git a/lib/mqtt-stream-parser.js b/lib/mqtt-stream-parser.js new file mode 100644 index 00000000..af1a068d --- /dev/null +++ b/lib/mqtt-stream-parser.js @@ -0,0 +1,27 @@ +'use strict' +const mqtt = require('mqtt-packet') +const { through } = require('./utils') + +module.exports = MqttStreamParser + +function MqttStreamParser (client, options) { + if (!options) options = {} + + const stream = through(process) + const parser = mqtt.parser(options) + + parser.on('packet', push) + parser.on('error', (err) => client.emit('error', err)) + this._parser = parser + + stream.on('error', stream.end.bind(stream)) + + function process (chunk, enc, cb) { + parser.parse(chunk) + cb() + } + function push (packet) { + stream.push(packet) + } + return stream +} diff --git a/test/basic.js b/test/basic.js index 893ffc04..b3208913 100644 --- a/test/basic.js +++ b/test/basic.js @@ -374,19 +374,22 @@ test('disconnect', function (t) { }) }) -test('disconnect client on wrong cmd', function (t) { +test('disconnect client on unknown cmd', function (t) { t.plan(1) - const s = noError(connect(setup()), t) - t.tearDown(s.broker.close.bind(s.broker)) + const broker = aedes() + t.tearDown(broker.close.bind(broker)) - s.broker.on('clientDisconnect', function () { - t.pass('closed stream') + broker.on('clientDisconnect', function (client) { + t.equal(client.id, 'abcde', 'client matches') }) - s.broker.on('clientReady', function (c) { - // don't use stream write here because it will throw an error on mqtt_packet genetete - c._parser.emit('packet', { cmd: 'pippo' }) + noError(connect(setup(broker), { clientId: 'abcde' }), t) + + broker.on('client', function (client) { + client._mqttStream.write({ + cmd: 'UNKNOWN' + }) }) }) diff --git a/test/client-pub-sub.js b/test/client-pub-sub.js index a01c21e3..7117e7fb 100644 --- a/test/client-pub-sub.js +++ b/test/client-pub-sub.js @@ -139,13 +139,13 @@ test('publish QoS 2 throws error in pubrel', function (t) { s.outStream.on('data', function (packet) { if (packet.cmd === 'publish') { + s.broker.persistence.outgoingUpdate = function (client, pubrel, cb) { + cb(new Error('error')) + } s.inStream.write({ cmd: 'pubrec', messageId: packet.messageId }) - s.broker.persistence.outgoingUpdate = function (client, pubrel, cb) { - cb(new Error('error')) - } } }) diff --git a/test/close_socket_by_other_party.js b/test/close_socket_by_other_party.js index da1530bb..5530fb74 100644 --- a/test/close_socket_by_other_party.js +++ b/test/close_socket_by_other_party.js @@ -152,7 +152,7 @@ test('multiple clients subscribe same topic, and all clients still receive messa client1.subscribe(_sameTopic, { qos: 0, retain: false }, () => { t.pass('client1 sub callback') - // stimulate closed socket by users + // simulate closed socket by users client1.stream.destroy() // client 2 @@ -166,6 +166,7 @@ test('multiple clients subscribe same topic, and all clients still receive messa // pubClient const pubClient = mqtt.connect('mqtt://localhost', { clientId: 'pubClient' }) + pubClient.publish(_sameTopic, 'world', { qos: 0, retain: false }, () => { t.pass('pubClient publish event') pubClient.end() diff --git a/test/connect.js b/test/connect.js index b5dba350..084186d2 100644 --- a/test/connect.js +++ b/test/connect.js @@ -4,11 +4,7 @@ const { test } = require('tap') const http = require('http') const ws = require('websocket-stream') const mqtt = require('mqtt') -const mqttPacket = require('mqtt-packet') -const net = require('net') -const proxyProtocol = require('proxy-protocol-js') -const { protocolDecoder } = require('aedes-protocol-decoder') -const { setup, connect, delay } = require('./helper') +const { setup, connect, delay, noError } = require('./helper') const aedes = require('../') ;[{ ver: 3, id: 'MQIsdp' }, { ver: 4, id: 'MQTT' }].forEach(function (ele) { @@ -551,86 +547,78 @@ test('reject clients with wrong protocol name', function (t) { }) }) -test('After first CONNECT Packet, others are queued until \'connect\' event', function (t) { - t.plan(2) - - const queueLimit = 50 - const broker = aedes({ queueLimit }) - t.tearDown(broker.close.bind(broker)) +test('Any queued messages after first CONNECT will be consumed once when broker emits clientReady event', function (t) { + const queued = 15 + t.plan(queued + 3) - const publishP = { + var published = 0 + var publishP = { cmd: 'publish', topic: 'hello', payload: Buffer.from('world'), qos: 0, - retain: false + retain: false, + dup: false } - const connectP = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'abcde', - keepalive: 0 - } + var broker = aedes({ + preConnect: (client, done) => { + t.ok(client.conn.isPaused()) + t.equal(published, queued, 'store all offlined messages') + done(null, true) + } + }) + t.tearDown(broker.close.bind(broker)) - const s = setup(broker) - s.inStream.write(connectP) + var s = noError(connect(setup(broker)), t) - process.once('warning', e => t.fail('Memory leak detected')) + broker.on('clientReady', function (client) { + t.notOk(client.conn.isPaused()) + }) + s.broker.mq.on('hello', function (packet, cb) { + delete packet.brokerId + delete packet.brokerCounter + t.deepEqual(packet, publishP, 'packet matches') + cb() + }) - for (let i = 0; i < queueLimit; i++) { + for (; published < queued; published++) { s.inStream.write(publishP) } - - broker.on('client', function (client) { - t.equal(client._parser._queue.length, queueLimit, 'Packets have been queued') - - client.once('connected', () => { - t.equal(client._parser._queue, null, 'Queue is empty') - s.conn.destroy() - }) - }) }) -test('Test queue limit', function (t) { - t.plan(1) - - const queueLimit = 50 - const broker = aedes({ queueLimit }) - t.tearDown(broker.close.bind(broker)) +test('Any queued messages after first CONNECT will be dropped once when authentication fails', function (t) { + t.plan(2) - const publishP = { + var published = 0 + var publishP = { cmd: 'publish', topic: 'hello', payload: Buffer.from('world'), qos: 0, - retain: false + retain: false, + dup: false } + const queued = 15 - const connectP = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'abcde', - keepalive: 0 - } + var broker = aedes({ + authenticate: (client, username, password, done) => { + t.ok(client.conn.isPaused()) + t.equal(published, queued, 'store all offlined messages') + done(null, false) + } + }) + t.tearDown(broker.close.bind(broker)) - const s = setup(broker) - s.inStream.write(connectP) + var s = connect(setup(broker)) - process.once('warning', e => t.fail('Memory leak detected')) + s.broker.mq.on('hello', function (packet, cb) { + t.fail('no queued messages are consumed') + }) - for (let i = 0; i < queueLimit + 1; i++) { + for (; published < queued; published++) { s.inStream.write(publishP) } - - broker.on('connectionError', function (conn, err) { - t.equal(err.message, 'Client queue limit reached', 'Queue error is thrown') - s.conn.destroy() - }) }) ;[['fail with no error msg', 3, null, false], ['succeed with no error msg', 9, null, true], ['fail with error msg', 6, new Error('connection banned'), false], ['succeed with error msg', 6, new Error('connection banned'), true]].forEach(function (ele, idx) { @@ -730,342 +718,3 @@ test('websocket clients have access to the request object', function (t) { server.close() }) }) - -// test ipAddress property presence when trustProxy is enabled -test('tcp clients have access to the ipAddress from the socket', function (t) { - t.plan(2) - - const port = 4883 - const broker = aedes({ - preConnect: function (client, done) { - if (client && client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal('::ffff:127.0.0.1', client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = net.createServer(broker.handle) - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = mqtt.connect({ - port, - keepalive: 0, - clientId: 'mqtt-client', - clean: false - }) - - t.tearDown(() => { - client.end(true) - broker.close() - server.close() - }) -}) - -test('tcp proxied (protocol v1) clients have access to the ipAddress(v4)', function (t) { - t.plan(2) - - const port = 4883 - const clientIp = '192.168.0.140' - const packet = { - cmd: 'connect', - protocolId: 'MQIsdp', - protocolVersion: 3, - clean: true, - clientId: 'my-client-proxyV1', - keepalive: 0 - } - - const buf = mqttPacket.generate(packet) - const src = new proxyProtocol.Peer(clientIp, 12345) - const dst = new proxyProtocol.Peer('127.0.0.1', port) - const protocol = new proxyProtocol.V1BinaryProxyProtocol( - proxyProtocol.INETProtocol.TCP4, - src, - dst, - buf - ).build() - - const broker = aedes({ - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = net.createServer(broker.handle) - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = net.connect({ - port, - timeout: 0 - }, function () { - client.write(protocol) - }) - - t.tearDown(() => { - client.end() - broker.close() - server.close() - }) -}) - -test('tcp proxied (protocol v2) clients have access to the ipAddress(v4)', function (t) { - t.plan(2) - - const port = 4883 - const clientIp = '192.168.0.140' - const packet = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client-proxyV2' - } - - const protocol = new proxyProtocol.V2ProxyProtocol( - proxyProtocol.Command.LOCAL, - proxyProtocol.TransportProtocol.DGRAM, - new proxyProtocol.IPv4ProxyAddress( - proxyProtocol.IPv4Address.createFrom(clientIp.split('.')), - 12345, - proxyProtocol.IPv4Address.createFrom([127, 0, 0, 1]), - port - ), - mqttPacket.generate(packet) - ).build() - - const broker = aedes({ - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = net.createServer(broker.handle) - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = net.createConnection( - { - port, - timeout: 0 - }, function () { - client.write(Buffer.from(protocol)) - } - ) - - t.tearDown(() => { - client.end() - broker.close() - server.close() - }) -}) - -test('tcp proxied (protocol v2) clients have access to the ipAddress(v6)', function (t) { - t.plan(2) - - const port = 4883 - const clientIpArray = [0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 1, 128] - const clientIp = '::ffff:c0a8:180:' - const packet = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client-proxyV2' - } - - const protocol = new proxyProtocol.V2ProxyProtocol( - proxyProtocol.Command.PROXY, - proxyProtocol.TransportProtocol.STREAM, - new proxyProtocol.IPv6ProxyAddress( - proxyProtocol.IPv6Address.createFrom(clientIpArray), - 12345, - proxyProtocol.IPv6Address.createWithEmptyAddress(), - port - ), - mqttPacket.generate(packet) - ).build() - - const broker = aedes({ - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = net.createServer(broker.handle) - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = net.createConnection( - { - port, - timeout: 0 - }, function () { - client.write(Buffer.from(protocol)) - } - ) - - t.tearDown(() => { - client.end() - broker.close() - server.close() - }) -}) - -test('websocket clients have access to the ipAddress from the socket (if no ip header)', function (t) { - t.plan(2) - - const clientIp = '::ffff:127.0.0.1' - const port = 4883 - const broker = aedes({ - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = http.createServer() - ws.createServer({ - server: server - }, broker.handle) - - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = mqtt.connect(`ws://localhost:${port}`) - - t.tearDown(() => { - client.end(true) - broker.close() - server.close() - }) -}) - -test('websocket proxied clients have access to the ipAddress from x-real-ip header', function (t) { - t.plan(2) - - const clientIp = '192.168.0.140' - const port = 4883 - const broker = aedes({ - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = http.createServer() - ws.createServer({ - server: server - }, broker.handle) - - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = mqtt.connect(`ws://localhost:${port}`, { - wsOptions: { - headers: { - 'X-Real-Ip': clientIp - } - } - }) - - t.tearDown(() => { - client.end(true) - broker.close() - server.close() - }) -}) - -test('websocket proxied clients have access to the ipAddress from x-forwarded-for header', function (t) { - t.plan(2) - - const clientIp = '192.168.0.140' - const port = 4883 - const broker = aedes({ - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = http.createServer() - ws.createServer({ - server: server - }, broker.handle) - - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = mqtt.connect(`ws://localhost:${port}`, { - wsOptions: { - headers: { - 'X-Forwarded-For': clientIp - } - } - }) - - t.tearDown(() => { - client.end(true) - broker.close() - server.close() - }) -}) diff --git a/test/meta.js b/test/meta.js index b822ed45..5ce182c5 100644 --- a/test/meta.js +++ b/test/meta.js @@ -259,9 +259,7 @@ test('emit clientDisconnect event', function (t) { const s = noError(connect(setup(broker), { clientId: 'abcde' }), t) - s.inStream.end({ - cmd: 'disconnect' - }) + s.inStream.end() s.outStream.resume() }) diff --git a/test/qos1.js b/test/qos1.js index 202271ae..2a6884de 100644 --- a/test/qos1.js +++ b/test/qos1.js @@ -771,7 +771,7 @@ test('downgrade QoS 0 publish on QoS 1 subsciption', function (t) { }) }) -test('subscribe and publish QoS 1 in parallel', function (t) { +test('subscribe and publish QoS 1 in a row', function (t) { t.plan(5) const broker = aedes() @@ -789,17 +789,16 @@ test('subscribe and publish QoS 1 in parallel', function (t) { } broker.on('clientError', function (client, err) { - console.log(err.stack) - // t.fail('no client error') + t.fail('no client error') }) s.outStream.once('data', function (packet) { - t.equal(packet.cmd, 'puback') - t.equal(packet.messageId, 42, 'messageId must match') + t.equal(packet.cmd, 'suback') + t.deepEqual(packet.granted, [1]) + t.equal(packet.messageId, 24) s.outStream.on('data', function (packet) { - if (packet.cmd === 'suback') { - t.deepEqual(packet.granted, [1]) - t.equal(packet.messageId, 24) + if (packet.cmd === 'puback') { + t.equal(packet.messageId, 42, 'messageId must match') } if (packet.cmd === 'publish') { s.inStream.write({ From 9aba85e73514bd4cbe35d556103e323ba4d8ebe2 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Mon, 6 Jul 2020 17:37:50 +0200 Subject: [PATCH 2/4] fixed some tests --- lib/client.js | 17 +++++-- lib/handlers/connect.js | 2 +- test/connect.js | 104 ++++++++++++++++++++-------------------- test/not-blocking.js | 11 ++++- test/retain.js | 8 ---- 5 files changed, 75 insertions(+), 67 deletions(-) diff --git a/lib/client.js b/lib/client.js index fd057d46..bef7ee09 100644 --- a/lib/client.js +++ b/lib/client.js @@ -50,11 +50,20 @@ function Client (broker, conn, req) { this.will = null this._will = null - const mqttStream = new Writable({ objectMode: true, emitClose: false, encoding: 'utf8' }) - mqttStream._write = function (packet, encoding, done) { + const mqttStream = new Writable({ + objectMode: true, + emitClose: false, + encoding: 'utf8', + write: handlePacket + }) + + function handlePacket (packet, enc, done) { handle(that, packet, done) } - mqttStream.on('error', this.emit.bind(this, 'error')) + + const emitError = this.emit.bind(this, 'error') + + mqttStream.on('error', emitError) this._mqttStream = mqttStream this._mqttParserStream = new MqttStreamParser(this) @@ -62,7 +71,7 @@ function Client (broker, conn, req) { conn.pipe(this._mqttParserStream).pipe(mqttStream) this.on('error', onError) - conn.on('error', this.emit.bind(this, 'error')) + conn.on('error', emitError) conn.on('end', this.close.bind(this)) this._eos = finished(this.conn, this.close.bind(this)) diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index 2d5dd1b7..c250aff5 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -43,6 +43,7 @@ function handleConnect (client, packet, done) { clearTimeout(client._connectTimer) client._connectTimer = null client.connecting = true + client.pause() client.broker.preConnect(client, negate) function negate (err, successful) { @@ -97,7 +98,6 @@ function init (client, packet, done) { function authenticate (arg, done) { const client = this.client - client.pause() client.broker.authenticate( client, this.packet.username, diff --git a/test/connect.js b/test/connect.js index 084186d2..5be10359 100644 --- a/test/connect.js +++ b/test/connect.js @@ -7,6 +7,58 @@ const mqtt = require('mqtt') const { setup, connect, delay, noError } = require('./helper') const aedes = require('../') +test('reject second CONNECT Packet sent while first CONNECT still in preConnect stage', function (t) { + t.plan(2) + + const packet1 = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client-1', + keepalive: 0 + } + const packet2 = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client-2', + keepalive: 0 + } + + var i = 0 + const broker = aedes({ + preConnect: function (client, done) { + var ms = i++ === 0 ? 2000 : 500 + setTimeout(function () { + done(null, true) + }, ms) + } + }) + t.tearDown(broker.close.bind(broker)) + + const s = setup(broker) + + broker.on('clientError', function (client, err) { + t.equal(err.info.clientId, 'my-client-2') + t.equal(err.message, 'Invalid protocol') + }) + + const msg = async (s, ms, msg) => { + await delay(ms) + s.inStream.write(msg) + } + + ;(async () => { + await Promise.all([msg(s, 100, packet1), msg(s, 200, packet2)]) + })().catch( + (error) => { + t.fail(error) + } + ) +}) + ;[{ ver: 3, id: 'MQIsdp' }, { ver: 4, id: 'MQTT' }].forEach(function (ele) { test('connect and connack (minimal)', function (t) { t.plan(2) @@ -469,58 +521,6 @@ test('handler calls done when disconnect or unknown packet cmd is received', fun }) }) -test('reject second CONNECT Packet sent while first CONNECT still in preConnect stage', function (t) { - t.plan(2) - - const packet1 = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client-1', - keepalive: 0 - } - const packet2 = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client-2', - keepalive: 0 - } - - var i = 0 - const broker = aedes({ - preConnect: function (client, done) { - var ms = i++ === 0 ? 2000 : 500 - setTimeout(function () { - done(null, true) - }, ms) - } - }) - t.tearDown(broker.close.bind(broker)) - - const s = setup(broker) - - broker.on('connectionError', function (client, err) { - t.equal(err.info.clientId, 'my-client-2') - t.equal(err.message, 'Invalid protocol') - }) - - const msg = async (s, ms, msg) => { - await delay(ms) - s.inStream.write(msg) - } - - ;(async () => { - await Promise.all([msg(s, 100, packet1), msg(s, 200, packet2)]) - })().catch( - (error) => { - t.fail(error) - } - ) -}) - // [MQTT-3.1.2-1], Guarded in mqtt-packet test('reject clients with wrong protocol name', function (t) { t.plan(2) diff --git a/test/not-blocking.js b/test/not-blocking.js index 2c8149a6..71d11001 100644 --- a/test/not-blocking.js +++ b/test/not-blocking.js @@ -5,6 +5,7 @@ const mqtt = require('mqtt') const net = require('net') const Faketimers = require('@sinonjs/fake-timers') const aedes = require('../') +const { noError } = require('./helper') test('connect 200 concurrent clients', function (t) { t.plan(3) @@ -59,6 +60,8 @@ test('do not block after a subscription', function (t) { var sent = 0 var received = 0 + noError({ broker }) + server.listen(0, function (err) { t.error(err, 'no error') @@ -71,7 +74,8 @@ test('do not block after a subscription', function (t) { const publisher = mqtt.connect({ port: port, - keepalive: 0 + keepalive: 0, + clientId: 'publisher' }).on('error', function (err) { clock.clearTimeout(clockId) t.fail(err) @@ -95,7 +99,8 @@ test('do not block after a subscription', function (t) { function startSubscriber () { subscriber = mqtt.connect({ port: port, - keepalive: 0 + keepalive: 0, + clientId: 'subscriber' }).on('error', function (err) { if (err.code !== 'ECONNRESET') { clock.clearTimeout(clockId) @@ -136,6 +141,8 @@ test('do not block with overlapping subscription', function (t) { var sent = 0 var received = 0 + noError({ broker }) + server.listen(0, function (err) { t.error(err, 'no error') diff --git a/test/retain.js b/test/retain.js index 134ed9d0..3fe025a8 100644 --- a/test/retain.js +++ b/test/retain.js @@ -519,10 +519,6 @@ test('disconnect and retain messages with QoS 1 [clean=false]', function (t) { cmd: 'disconnect' }) - subscriber.outStream.on('data', function (packet) { - console.log('original', packet) - }) - const publisher = noError(connect(setup(broker)), t) publisher.inStream.write({ @@ -578,10 +574,6 @@ test('disconnect and two retain messages with QoS 1 [clean=false]', function (t) cmd: 'disconnect' }) - subscriber.outStream.on('data', function (packet) { - console.log('original', packet) - }) - const publisher = noError(connect(setup(broker)), t) publisher.inStream.write({ From 0bbace53e0bef76fdb6502ed1a873d7832b0ec0d Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Mon, 6 Jul 2020 17:59:56 +0200 Subject: [PATCH 3/4] fixed not-blocking tests --- test/not-blocking.js | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/test/not-blocking.js b/test/not-blocking.js index 71d11001..b1e61498 100644 --- a/test/not-blocking.js +++ b/test/not-blocking.js @@ -88,9 +88,7 @@ test('do not block after a subscription', function (t) { } function publish () { - if (sent === total) { - publisher.end() - } else { + if (sent !== total) { sent++ publisher.publish('test', 'payload', immediatePublish) } @@ -168,9 +166,7 @@ test('do not block with overlapping subscription', function (t) { } function publish () { - if (sent === total) { - publisher.end() - } else { + if (sent !== total) { sent++ publisher.publish('test', 'payload', immediatePublish) } From 2d6de56e9af61fc01d8c1b68f8ffccb23bf1c286 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Mon, 6 Jul 2020 18:01:31 +0200 Subject: [PATCH 4/4] fixed close socket tests --- test/close_socket_by_other_party.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/close_socket_by_other_party.js b/test/close_socket_by_other_party.js index 5530fb74..2c3faf40 100644 --- a/test/close_socket_by_other_party.js +++ b/test/close_socket_by_other_party.js @@ -137,9 +137,6 @@ test('multiple clients subscribe same topic, and all clients still receive messa const server = require('net').createServer(broker.handle) const port = 1883 server.listen(port) - broker.on('clientError', function (client, err) { - t.error(err) - }) var client1, client2 const _sameTopic = 'hello'