Skip to content

Commit

Permalink
Merge pull request #859 from mqttjs/fix-use-of-readable
Browse files Browse the repository at this point in the history
Proper use of `'readable'` event.
  • Loading branch information
mcollina authored Aug 25, 2018
2 parents 797f9ea + 59fd16e commit 9a3ff89
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 26 deletions.
70 changes: 44 additions & 26 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,38 +136,56 @@ function MqttClient (streamBuilder, options) {
}

this.connected = true
var outStore = null
outStore = this.outgoingStore.createStream()
var outStore = this.outgoingStore.createStream()

// Control of stored messages
outStore.once('readable', function () {
function storeDeliver () {
var packet = outStore.read(1)
var cb
this.once('close', remove)
outStore.on('end', function () {
that.removeListener('close', remove)
})
outStore.on('error', function (err) {
that.removeListener('close', remove)
that.emit('error', err)
})

if (!packet) {
return
}
function remove () {
outStore.destroy()
outStore = null
}

function storeDeliver () {
// edge case, we wrapped this twice
if (!outStore) {
return
}

// Avoid unnecessary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer) {
outStore.read(0)
cb = that.outgoing[packet.messageId]
that.outgoing[packet.messageId] = function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}

storeDeliver()
var packet = outStore.read(1)
var cb

if (!packet) {
// read when data is available in the future
outStore.once('readable', storeDeliver)
return
}

// Avoid unnecessary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer) {
cb = that.outgoing[packet.messageId]
that.outgoing[packet.messageId] = function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}
that._sendPacket(packet)
} else if (outStore.destroy) {
outStore.destroy()

storeDeliver()
}
that._sendPacket(packet)
} else if (outStore.destroy) {
outStore.destroy()
}
storeDeliver()
}).on('error', this.emit.bind(this, 'error'))
}

// start flowing
storeDeliver()
})

// Mark disconnected on stream close
Expand Down
25 changes: 25 additions & 0 deletions test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,31 @@ module.exports = function (server, config) {
}
})

it('should not resend in-flight publish messages if disconnecting', function (done) {
var client = connect({reconnectPeriod: 200})
var serverPublished = false
var clientCalledBack = false
server.once('client', function (serverClient) {
serverClient.on('connect', function () {
setImmediate(function () {
serverClient.stream.destroy()
client.end()
serverPublished.should.be.false()
clientCalledBack.should.be.false()
done()
})
})
server.once('client', function (serverClientNew) {
serverClientNew.on('publish', function () {
serverPublished = true
})
})
})
client.publish('hello', 'world', { qos: 1 }, function () {
clientCalledBack = true
})
})

it('should resend in-flight QoS 2 publish messages from the client', function (done) {
var client = connect({reconnectPeriod: 200})
var serverPublished = false
Expand Down
26 changes: 26 additions & 0 deletions test/abstract_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,30 @@ module.exports = function abstractStoreTest (build) {
})
})
})

it('should replace a packet when doing put with the same messageId', function (done) {
var packet1 = {
cmd: 'publish', // added
topic: 'hello',
payload: 'world',
qos: 2,
messageId: 42
}
var packet2 = {
cmd: 'pubrel', // added
qos: 2,
messageId: 42
}

store.put(packet1, function () {
store.put(packet2, function () {
store
.createStream()
.on('data', function (data) {
data.should.eql(packet2)
done()
})
})
})
})
}

0 comments on commit 9a3ff89

Please sign in to comment.