Skip to content

Commit

Permalink
Fixed #952. (#953)
Browse files Browse the repository at this point in the history
* Fixed #952.

Conditional flush `outgoing` on close.

Scenario:
1. The client connect to the server.
2. The client sends subscribe to the server.
3. The server destroys the client connection before suback sending.
4. The client detect `close` event, then reconnects to the server.

At the step4, `outgoing` still stored the callback for
subscribe. However, it has never called because server doen't send
corresponding suback.
The same thing happens on unsubscribe.

So I defined subscribe/unsubscribe as volatile.
The volatile type of `outgoing` entries should be cleared when `close`
from the server is detected.

On the contrary, QoS1 and QoS2 publish is not volatile. Because they are
resent after reconnection. And then, callback in the `store` is
called. This behavior shouldn't be changed.

So I added `volatile` flag to `outgoing` element.

* Fixed cb accessing code.

If `outgoing[mid]` doesn't match, then accessing `outgoing[mid].cb`
causes `Cannot read property` error. So added checking code.

Fixed outgoing assignment at `_onConnect` function.
  • Loading branch information
redboltz authored and mcollina committed May 27, 2019
1 parent 84ca344 commit 481e560
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 21 deletions.
66 changes: 45 additions & 21 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,19 @@ function sendPacket (client, packet, cb) {
function flush (queue) {
if (queue) {
Object.keys(queue).forEach(function (messageId) {
if (typeof queue[messageId] === 'function') {
queue[messageId](new Error('Connection closed'))
if (typeof queue[messageId].cb === 'function') {
queue[messageId].cb(new Error('Connection closed'))
delete queue[messageId]
}
})
}
}

function flushVolatile (queue) {
if (queue) {
Object.keys(queue).forEach(function (messageId) {
if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') {
queue[messageId].cb(new Error('Connection closed'))
delete queue[messageId]
}
})
Expand Down Expand Up @@ -290,6 +301,7 @@ MqttClient.prototype._setupStream = function () {

// Echo stream close
this.stream.on('close', function () {
flushVolatile(that.outgoing)
that.emit('close')
})

Expand Down Expand Up @@ -447,7 +459,10 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {
case 1:
case 2:
// Add to callbacks
this.outgoing[packet.messageId] = callback || nop
this.outgoing[packet.messageId] = {
volatile: false,
cb: callback || nop
}
if (this._storeProcessing) {
this._packetIdsDuringStoreProcessing[packet.messageId] = false
this._storePacket(packet, undefined, opts.cbStorePut)
Expand Down Expand Up @@ -606,15 +621,18 @@ MqttClient.prototype.subscribe = function () {
that.messageIdToTopic[packet.messageId] = topics
}

this.outgoing[packet.messageId] = function (err, packet) {
if (!err) {
var granted = packet.granted
for (var i = 0; i < granted.length; i += 1) {
subs[i].qos = granted[i]
this.outgoing[packet.messageId] = {
volatile: true,
cb: function (err, packet) {
if (!err) {
var granted = packet.granted
for (var i = 0; i < granted.length; i += 1) {
subs[i].qos = granted[i]
}
}
}

callback(err, subs)
callback(err, subs)
}
}

this._sendPacket(packet)
Expand Down Expand Up @@ -678,7 +696,10 @@ MqttClient.prototype.unsubscribe = function () {
packet.properties = opts.properties
}

this.outgoing[packet.messageId] = callback
this.outgoing[packet.messageId] = {
volatile: true,
cb: callback
}

this._sendPacket(packet)

Expand Down Expand Up @@ -772,7 +793,7 @@ MqttClient.prototype.end = function () {
* @example client.removeOutgoingMessage(client.getLastMessageId());
*/
MqttClient.prototype.removeOutgoingMessage = function (mid) {
var cb = this.outgoing[mid]
var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
delete this.outgoing[mid]
this.outgoingStore.del({messageId: mid}, function () {
cb(new Error('Message removed'))
Expand Down Expand Up @@ -957,7 +978,7 @@ MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
this.queue.push({ packet: packet, cb: cb })
} else if (packet.qos > 0) {
cb = this.outgoing[packet.messageId]
cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
this.outgoingStore.put(packet, function (err) {
if (err) {
return cb && cb(err)
Expand Down Expand Up @@ -1172,7 +1193,7 @@ MqttClient.prototype._handleAck = function (packet) {
var mid = packet.messageId
var type = packet.cmd
var response = null
var cb = this.outgoing[mid]
var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
var that = this
var err

Expand Down Expand Up @@ -1395,14 +1416,17 @@ MqttClient.prototype._onConnect = function (packet) {

// Avoid unnecessary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer) {
cb = that.outgoing[packet.messageId]
that.outgoing[packet.messageId] = function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
that.outgoing[packet.messageId] = {
volatile: false,
cb: function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}

storeDeliver()
}

storeDeliver()
}
that._packetIdsDuringStoreProcessing[packet.messageId] = true
that._sendPacket(packet)
Expand Down
51 changes: 51 additions & 0 deletions test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2682,6 +2682,57 @@ module.exports = function (server, config) {
})
})

it('should clear outgoing if close from server', function (done) {
var reconnect = false
var client = {}
var server2 = new Server(function (c) {
c.on('connect', function (packet) {
c.connack({returnCode: 0})
})
c.on('subscribe', function (packet) {
if (reconnect) {
c.suback({
messageId: packet.messageId,
granted: packet.subscriptions.map(function (e) {
return e.qos
})
})
} else {
c.destroy()
}
})
})

server2.listen(port + 50, function () {
client = mqtt.connect({
port: port + 50,
host: 'localhost',
clean: true,
clientId: 'cid1',
reconnectPeriod: 0
})

client.on('connect', function () {
client.subscribe('test', {qos: 2}, function (e) {
if (!e) {
client.end()
}
})
})

client.on('close', function () {
if (reconnect) {
server2.close()
done()
} else {
Object.keys(client.outgoing).length.should.equal(0)
reconnect = true
client.reconnect()
}
})
})
})

it('should resend in-flight QoS 1 publish messages from the client if clean is false', function (done) {
var reconnect = false
var client = {}
Expand Down

0 comments on commit 481e560

Please sign in to comment.