diff --git a/lib/client.js b/lib/client.js index 2dfbc18d2..83819db36 100644 --- a/lib/client.js +++ b/lib/client.js @@ -136,38 +136,56 @@ function MqttClient (streamBuilder, options) { } this.connected = true - var outStore = null - outStore = this.outgoingStore.createStream() + var outStore = this.outgoingStore.createStream() - // Control of stored messages - outStore.once('readable', function () { - function storeDeliver () { - var packet = outStore.read(1) - var cb + this.once('close', remove) + outStore.on('end', function () { + that.removeListener('close', remove) + }) + outStore.on('error', function (err) { + that.removeListener('close', remove) + that.emit('error', err) + }) - if (!packet) { - return - } + function remove () { + outStore.destroy() + outStore = null + } + + function storeDeliver () { + // edge case, we wrapped this twice + if (!outStore) { + return + } - // Avoid unnecessary stream read operations when disconnected - if (!that.disconnecting && !that.reconnectTimer) { - outStore.read(0) - cb = that.outgoing[packet.messageId] - that.outgoing[packet.messageId] = function (err, status) { - // Ensure that the original callback passed in to publish gets invoked - if (cb) { - cb(err, status) - } - - storeDeliver() + var packet = outStore.read(1) + var cb + + if (!packet) { + // read when data is available in the future + outStore.once('readable', storeDeliver) + return + } + + // Avoid unnecessary stream read operations when disconnected + if (!that.disconnecting && !that.reconnectTimer) { + cb = that.outgoing[packet.messageId] + that.outgoing[packet.messageId] = function (err, status) { + // Ensure that the original callback passed in to publish gets invoked + if (cb) { + cb(err, status) } - that._sendPacket(packet) - } else if (outStore.destroy) { - outStore.destroy() + + storeDeliver() } + that._sendPacket(packet) + } else if (outStore.destroy) { + outStore.destroy() } - storeDeliver() - }).on('error', this.emit.bind(this, 'error')) + } + + // start flowing + storeDeliver() }) // Mark disconnected on stream close diff --git a/test/abstract_client.js b/test/abstract_client.js index e97854d6c..bcab00410 100644 --- a/test/abstract_client.js +++ b/test/abstract_client.js @@ -1911,6 +1911,31 @@ module.exports = function (server, config) { } }) + it('should not resend in-flight publish messages if disconnecting', function (done) { + var client = connect({reconnectPeriod: 200}) + var serverPublished = false + var clientCalledBack = false + server.once('client', function (serverClient) { + serverClient.on('connect', function () { + setImmediate(function () { + serverClient.stream.destroy() + client.end() + serverPublished.should.be.false() + clientCalledBack.should.be.false() + done() + }) + }) + server.once('client', function (serverClientNew) { + serverClientNew.on('publish', function () { + serverPublished = true + }) + }) + }) + client.publish('hello', 'world', { qos: 1 }, function () { + clientCalledBack = true + }) + }) + it('should resend in-flight QoS 2 publish messages from the client', function (done) { var client = connect({reconnectPeriod: 200}) var serverPublished = false diff --git a/test/abstract_store.js b/test/abstract_store.js index 28f4dda50..faea25142 100644 --- a/test/abstract_store.js +++ b/test/abstract_store.js @@ -130,4 +130,30 @@ module.exports = function abstractStoreTest (build) { }) }) }) + + it('should replace a packet when doing put with the same messageId', function (done) { + var packet1 = { + cmd: 'publish', // added + topic: 'hello', + payload: 'world', + qos: 2, + messageId: 42 + } + var packet2 = { + cmd: 'pubrel', // added + qos: 2, + messageId: 42 + } + + store.put(packet1, function () { + store.put(packet2, function () { + store + .createStream() + .on('data', function (data) { + data.should.eql(packet2) + done() + }) + }) + }) + }) }