diff --git a/src/lease-manager.ts b/src/lease-manager.ts index d31c7111f..69e51ec24 100644 --- a/src/lease-manager.ts +++ b/src/lease-manager.ts @@ -17,6 +17,7 @@ import {EventEmitter} from 'events'; import {AckError, Message, Subscriber} from './subscriber'; import {defaultOptions} from './default-options'; +import defer = require('p-defer'); export interface FlowControlOptions { allowExcessMessages?: boolean; @@ -65,6 +66,7 @@ export class LeaseManager extends EventEmitter { private _pending: Message[]; private _subscriber: Subscriber; private _timer?: NodeJS.Timeout; + private _onDrain?: defer.DeferredPromise; constructor(sub: Subscriber, options = {}) { super(); @@ -119,6 +121,18 @@ export class LeaseManager extends EventEmitter { this.emit('full'); } } + + /** + * Only clear the messages that are not currently in-flight, helps with + * graceful shutdown. + */ + clearNonDispensedMessages(): void { + for (const message of this._messages) { + if (!message.dispensed) { + this.remove(message); + } + } + } /** * Removes ALL messages from inventory. * @private @@ -146,6 +160,15 @@ export class LeaseManager extends EventEmitter { const {maxBytes, maxMessages} = this._options; return this.size >= maxMessages! || this.bytes >= maxBytes!; } + /** + * Returns a promise that resolves when all messages have drained. + */ + onDrain(): Promise { + if (!this._onDrain) { + this._onDrain = defer(); + } + return this._onDrain.promise; + } /** * Removes a message from the inventory. Stopping the deadline extender if no * messages are left over. @@ -174,8 +197,15 @@ export class LeaseManager extends EventEmitter { this._dispense(this._pending.shift()!); } - if (this.size === 0 && this._isLeasing) { - this._cancelExtension(); + if (this.size === 0) { + if (this._isLeasing) { + this._cancelExtension(); + } + + if (this._onDrain) { + this._onDrain.resolve(); + delete this._onDrain; + } } } /** @@ -240,7 +270,10 @@ export class LeaseManager extends EventEmitter { */ private _dispense(message: Message): void { if (this._subscriber.isOpen) { - process.nextTick(() => this._subscriber.emit('message', message)); + message.dispensed = true; + process.nextTick(() => { + this._subscriber.emit('message', message); + }); } } /** diff --git a/src/subscriber.ts b/src/subscriber.ts index fe5defc8e..8256285c4 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -100,6 +100,7 @@ export class Message { orderingKey?: string; publishTime: PreciseDate; received: number; + dispensed: boolean; private _handled: boolean; private _length: number; private _subscriber: Subscriber; @@ -182,6 +183,11 @@ export class Message { */ this.received = Date.now(); + /** + * Indicates that a message is currently being processed by user code. + */ + this.dispensed = false; + this._handled = false; this._length = this.data.length; this._subscriber = sub; @@ -563,6 +569,7 @@ export class Subscriber extends EventEmitter { */ async ack(message: Message): Promise { const ackTimeSeconds = (Date.now() - message.received) / 1000; + this.updateAckDeadline(ackTimeSeconds); // Ignore this in this version of the method (but hook catch @@ -607,7 +614,9 @@ export class Subscriber extends EventEmitter { this.isOpen = false; this._stream.destroy(); - this._inventory.clear(); + + // Clear only the messages that have not begun processing + this._inventory.clearNonDispensedMessages(); await this._waitForFlush(); @@ -902,8 +911,13 @@ export class Subscriber extends EventEmitter { * @returns {Promise} */ private async _waitForFlush(): Promise { - const promises: Array> = []; + // If there are messages in flight, lets wait for them to drain so that we can then + // wait on their ACKs. + if (this._inventory.size) { + await this._inventory.onDrain(); + } + const promises: Array> = []; if (this._acks.numPendingRequests) { promises.push(this._acks.onFlush()); this._acks.flush(); diff --git a/test/lease-manager.ts b/test/lease-manager.ts index 2ceb17cf7..b9a8305f5 100644 --- a/test/lease-manager.ts +++ b/test/lease-manager.ts @@ -50,6 +50,7 @@ class FakeSubscriber extends EventEmitter { class FakeMessage { length = 20; received: number; + dispensed = false; constructor() { this.received = Date.now(); } @@ -169,6 +170,18 @@ describe('LeaseManager', () => { leaseManager.add(fakeMessage); }); + it('should set dispensed to true after dispatching message', done => { + const fakeMessage = new FakeMessage() as {} as Message; + + leaseManager.setOptions({allowExcessMessages: false}); + leaseManager.add(fakeMessage); + + subscriber.on('message', message => { + assert.strictEqual(message.dispensed, true); + done(); + }); + }); + it('should not dispatch the message if the inventory is full', done => { const fakeMessage = new FakeMessage() as {} as Message; @@ -387,6 +400,28 @@ describe('LeaseManager', () => { }); }); + describe('clearNonDispensedMessages', () => { + it('should only clear messages with dispensed=false', () => { + leaseManager.setOptions({ + maxMessages: 2, + maxBytes: 10, + allowExcessMessages: false, + }); + + const message = new FakeMessage(); + const message2 = new FakeMessage(); + + message.dispensed = true; + message2.dispensed = false; + leaseManager.add(message as {} as Message); + leaseManager.add(message2 as {} as Message); + + leaseManager.clearNonDispensedMessages(); + assert.strictEqual(leaseManager.size, 1); + assert.strictEqual(leaseManager.bytes, message.length); + }); + }); + describe('isFull', () => { it('should return true if the maxMessages threshold is hit', () => { const maxMessages = 1; @@ -506,6 +541,19 @@ describe('LeaseManager', () => { assert.strictEqual(stub.callCount, 0); }); + + it('should resolve onDrain if no messages are left', done => { + const message = new FakeMessage() as {} as Message; + + const onDrain = leaseManager.onDrain(); + + onDrain.then(() => { + done(); + }); + + leaseManager.add(message); + leaseManager.remove(message); + }); }); describe('setOptions', () => { diff --git a/test/subscriber.ts b/test/subscriber.ts index cab00e286..e4a778498 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -90,6 +90,11 @@ class FakeLeaseManager extends EventEmitter { // eslint-disable-next-line @typescript-eslint/no-unused-vars add(message: s.Message): void {} clear(): void {} + clearNonDispensedMessages(): void {} + async onDrain(): Promise {} + get size(): number { + return 0; + } // eslint-disable-next-line @typescript-eslint/no-unused-vars remove(message: s.Message): void {} } @@ -543,9 +548,9 @@ describe('Subscriber', () => { assert.strictEqual(stub.callCount, 1); }); - it('should clear the inventory', () => { + it('should clear the inventory of non-dispensed messages', () => { const inventory: FakeLeaseManager = stubs.get('inventory'); - const stub = sandbox.stub(inventory, 'clear'); + const stub = sandbox.stub(inventory, 'clearNonDispensedMessages'); subscriber.close(); assert.strictEqual(stub.callCount, 1); @@ -621,6 +626,26 @@ describe('Subscriber', () => { assert.strictEqual(ackOnDrain.callCount, 1); assert.strictEqual(modAckOnDrain.callCount, 1); }); + + it('should wait for dispensed messages to drain', async () => { + const inventory: FakeLeaseManager = stubs.get('inventory'); + const inventoryOnDrain = sandbox.stub(inventory, 'onDrain'); + + const ackQueue: FakeAckQueue = stubs.get('ackQueue'); + const modAckQueue: FakeModAckQueue = stubs.get('modAckQueue'); + const ackOnDrain = sandbox.stub(ackQueue, 'onDrain').resolves(); + const modAckOnDrain = sandbox.stub(modAckQueue, 'onDrain').resolves(); + + ackQueue.numInFlightRequests = 1; + modAckQueue.numInFlightRequests = 1; + sandbox.stub(inventory, 'size').get(() => 1); + + await subscriber.close(); + + assert.strictEqual(ackOnDrain.callCount, 1); + assert.strictEqual(modAckOnDrain.callCount, 1); + assert.strictEqual(inventoryOnDrain.callCount, 1); + }); }); });