From f41ea1026ad39f470169a03c352004220780e591 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 25 Aug 2018 12:32:27 +0200 Subject: [PATCH 1/2] Proper use of `'readable'` event. --- lib/client.js | 51 ++++++++++++++++++++++------------------- test/abstract_client.js | 25 ++++++++++++++++++++ 2 files changed, 52 insertions(+), 24 deletions(-) diff --git a/lib/client.js b/lib/client.js index 2dfbc18d2..4d23c4f7e 100644 --- a/lib/client.js +++ b/lib/client.js @@ -139,35 +139,38 @@ function MqttClient (streamBuilder, options) { var outStore = null outStore = this.outgoingStore.createStream() - // Control of stored messages - outStore.once('readable', function () { - function storeDeliver () { - var packet = outStore.read(1) - var cb + function storeDeliver () { + var packet = outStore.read(1) + var cb - if (!packet) { - return - } + if (!packet) { + // read when data is available in the future + outStore.once('readable', storeDeliver) + return + } - // Avoid unnecessary stream read operations when disconnected - if (!that.disconnecting && !that.reconnectTimer) { - outStore.read(0) - cb = that.outgoing[packet.messageId] - that.outgoing[packet.messageId] = function (err, status) { - // Ensure that the original callback passed in to publish gets invoked - if (cb) { - cb(err, status) - } - - storeDeliver() + // Avoid unnecessary stream read operations when disconnected + if (!that.disconnecting && !that.reconnectTimer) { + cb = that.outgoing[packet.messageId] + that.outgoing[packet.messageId] = function (err, status) { + // Ensure that the original callback passed in to publish gets invoked + if (cb) { + cb(err, status) } - that._sendPacket(packet) - } else if (outStore.destroy) { - outStore.destroy() + + storeDeliver() } + that._sendPacket(packet) + } else if (outStore.destroy) { + outStore.destroy() } - storeDeliver() - }).on('error', this.emit.bind(this, 'error')) + } + + // Control of stored messages + outStore.on('error', this.emit.bind(this, 'error')) + + // start flowing + storeDeliver() }) // Mark disconnected on stream close diff --git a/test/abstract_client.js b/test/abstract_client.js index e97854d6c..bcab00410 100644 --- a/test/abstract_client.js +++ b/test/abstract_client.js @@ -1911,6 +1911,31 @@ module.exports = function (server, config) { } }) + it('should not resend in-flight publish messages if disconnecting', function (done) { + var client = connect({reconnectPeriod: 200}) + var serverPublished = false + var clientCalledBack = false + server.once('client', function (serverClient) { + serverClient.on('connect', function () { + setImmediate(function () { + serverClient.stream.destroy() + client.end() + serverPublished.should.be.false() + clientCalledBack.should.be.false() + done() + }) + }) + server.once('client', function (serverClientNew) { + serverClientNew.on('publish', function () { + serverPublished = true + }) + }) + }) + client.publish('hello', 'world', { qos: 1 }, function () { + clientCalledBack = true + }) + }) + it('should resend in-flight QoS 2 publish messages from the client', function (done) { var client = connect({reconnectPeriod: 200}) var serverPublished = false From 59fd16e5868e9d293df571f90aec352fe29a84e9 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 25 Aug 2018 14:48:47 +0200 Subject: [PATCH 2/2] destroy the ongoing stream in case of a disconnect --- lib/client.js | 25 ++++++++++++++++++++----- test/abstract_store.js | 26 ++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/lib/client.js b/lib/client.js index 4d23c4f7e..83819db36 100644 --- a/lib/client.js +++ b/lib/client.js @@ -136,10 +136,28 @@ function MqttClient (streamBuilder, options) { } this.connected = true - var outStore = null - outStore = this.outgoingStore.createStream() + var outStore = this.outgoingStore.createStream() + + this.once('close', remove) + outStore.on('end', function () { + that.removeListener('close', remove) + }) + outStore.on('error', function (err) { + that.removeListener('close', remove) + that.emit('error', err) + }) + + function remove () { + outStore.destroy() + outStore = null + } function storeDeliver () { + // edge case, we wrapped this twice + if (!outStore) { + return + } + var packet = outStore.read(1) var cb @@ -166,9 +184,6 @@ function MqttClient (streamBuilder, options) { } } - // Control of stored messages - outStore.on('error', this.emit.bind(this, 'error')) - // start flowing storeDeliver() }) diff --git a/test/abstract_store.js b/test/abstract_store.js index 28f4dda50..faea25142 100644 --- a/test/abstract_store.js +++ b/test/abstract_store.js @@ -130,4 +130,30 @@ module.exports = function abstractStoreTest (build) { }) }) }) + + it('should replace a packet when doing put with the same messageId', function (done) { + var packet1 = { + cmd: 'publish', // added + topic: 'hello', + payload: 'world', + qos: 2, + messageId: 42 + } + var packet2 = { + cmd: 'pubrel', // added + qos: 2, + messageId: 42 + } + + store.put(packet1, function () { + store.put(packet2, function () { + store + .createStream() + .on('data', function (data) { + data.should.eql(packet2) + done() + }) + }) + }) + }) }