Skip to content

Commit

Permalink
destroy the ongoing stream in case of a disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Aug 25, 2018
1 parent f41ea10 commit 59fd16e
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
25 changes: 20 additions & 5 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,28 @@ function MqttClient (streamBuilder, options) {
}

this.connected = true
var outStore = null
outStore = this.outgoingStore.createStream()
var outStore = this.outgoingStore.createStream()

this.once('close', remove)
outStore.on('end', function () {
that.removeListener('close', remove)
})
outStore.on('error', function (err) {
that.removeListener('close', remove)
that.emit('error', err)
})

function remove () {
outStore.destroy()
outStore = null
}

function storeDeliver () {
// edge case, we wrapped this twice
if (!outStore) {
return
}

var packet = outStore.read(1)
var cb

Expand All @@ -166,9 +184,6 @@ function MqttClient (streamBuilder, options) {
}
}

// Control of stored messages
outStore.on('error', this.emit.bind(this, 'error'))

// start flowing
storeDeliver()
})
Expand Down
26 changes: 26 additions & 0 deletions test/abstract_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,30 @@ module.exports = function abstractStoreTest (build) {
})
})
})

it('should replace a packet when doing put with the same messageId', function (done) {
var packet1 = {
cmd: 'publish', // added
topic: 'hello',
payload: 'world',
qos: 2,
messageId: 42
}
var packet2 = {
cmd: 'pubrel', // added
qos: 2,
messageId: 42
}

store.put(packet1, function () {
store.put(packet2, function () {
store
.createStream()
.on('data', function (data) {
data.should.eql(packet2)
done()
})
})
})
})
}

0 comments on commit 59fd16e

Please sign in to comment.