diff --git a/src/lease-manager.ts b/src/lease-manager.ts index 084f4b759..e543a290b 100644 --- a/src/lease-manager.ts +++ b/src/lease-manager.ts @@ -153,9 +153,10 @@ export class LeaseManager extends EventEmitter { * @fires LeaseManager#free * * @param {Message} message The message to remove. + * @param {boolean} runNextInQueue Should dispense next awaiting message * @private */ - remove(message: Message): void { + remove(message: Message, runNextInQueue: boolean): void { if (!this._messages.has(message)) { return; } @@ -170,7 +171,7 @@ export class LeaseManager extends EventEmitter { } else if (this._pending.includes(message)) { const index = this._pending.indexOf(message); this._pending.splice(index, 1); - } else if (this.pending > 0) { + } else if (runNextInQueue && this.pending > 0) { this._dispense(this._pending.shift()!); } @@ -262,13 +263,13 @@ export class LeaseManager extends EventEmitter { // In the case of a permanent failure (temporary failures are retried), // we need to stop trying to lease-manage the message. message.ackFailed(e as AckError); - this.remove(message); + this.remove(message, false); }); } else { message.modAck(deadline); } } else { - this.remove(message); + this.remove(message, false); } } diff --git a/src/subscriber.ts b/src/subscriber.ts index fe5defc8e..b909ae7dc 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -571,7 +571,7 @@ export class Subscriber extends EventEmitter { resultPromise.catch(() => {}); await this._acks.onFlush(); - this._inventory.remove(message); + this._inventory.remove(message, true); } /** @@ -686,7 +686,7 @@ export class Subscriber extends EventEmitter { */ async nack(message: Message): Promise { await this.modAck(message, 0); - this._inventory.remove(message); + this._inventory.remove(message, true); } /** diff --git a/test/lease-manager.ts b/test/lease-manager.ts index 2ceb17cf7..49c3ea141 100644 --- a/test/lease-manager.ts +++ b/test/lease-manager.ts @@ -425,7 +425,7 @@ describe('LeaseManager', () => { const message = new FakeMessage(); leaseManager.add(message as {} as Message); - leaseManager.remove(new FakeMessage() as {} as Message); + leaseManager.remove(new FakeMessage() as {} as Message, false); assert.strictEqual(leaseManager.size, 1); assert.strictEqual(leaseManager.bytes, message.length); @@ -435,7 +435,7 @@ describe('LeaseManager', () => { const message = new FakeMessage() as {} as Message; leaseManager.add(message); - leaseManager.remove(message); + leaseManager.remove(message, false); assert.strictEqual(leaseManager.size, 0); assert.strictEqual(leaseManager.bytes, 0); @@ -446,7 +446,7 @@ describe('LeaseManager', () => { leaseManager.setOptions({maxMessages: 1}); leaseManager.add(message); - setImmediate(() => leaseManager.remove(message)); + setImmediate(() => leaseManager.remove(message, false)); leaseManager.on('free', () => { assert.strictEqual(leaseManager.size, 0); @@ -467,7 +467,7 @@ describe('LeaseManager', () => { leaseManager.add(new FakeMessage() as {} as Message); leaseManager.add(pending); - leaseManager.remove(pending); + leaseManager.remove(pending, true); assert.strictEqual(leaseManager.pending, 0); setImmediate(done); @@ -491,7 +491,7 @@ describe('LeaseManager', () => { leaseManager.add(temp); leaseManager.add(pending); - leaseManager.remove(temp); + leaseManager.remove(temp, true); }); it('should cancel any extensions if no messages are left', () => { @@ -500,7 +500,7 @@ describe('LeaseManager', () => { const stub = sandbox.stub(subscriber, 'modAck').resolves(); leaseManager.add(message); - leaseManager.remove(message); + leaseManager.remove(message, true); clock.tick(subscriber.ackDeadline * 1000 * 2); @@ -518,7 +518,7 @@ describe('LeaseManager', () => { leaseManager.add(littleMessage); assert.strictEqual(leaseManager.isFull(), false); - leaseManager.remove(littleMessage); + leaseManager.remove(littleMessage, true); bigMessage.length = defaultOptions.subscription.maxOutstandingBytes * 2; leaseManager.add(bigMessage as {} as Message); assert.strictEqual(leaseManager.isFull(), true);