Skip to content

Commit

Permalink
Merge pull request #1 from zaplify/1213-fix-expired-messages-breaking…
Browse files Browse the repository at this point in the history
…-max-messages-limit

fix: googleapis#1213 Pubsub emits more messages then allowed by maxMessages and allowExcessMessages after reaching maxExtension period
  • Loading branch information
adrian-borodziuk authored Aug 18, 2023
2 parents 1ed45ff + 6dd3997 commit f7a23c4
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 13 deletions.
9 changes: 5 additions & 4 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ export class LeaseManager extends EventEmitter {
* @fires LeaseManager#free
*
* @param {Message} message The message to remove.
* @param {boolean} runNextInQueue Should dispense next awaiting message
* @private
*/
remove(message: Message): void {
remove(message: Message, runNextInQueue: boolean): void {
if (!this._messages.has(message)) {
return;
}
Expand All @@ -170,7 +171,7 @@ export class LeaseManager extends EventEmitter {
} else if (this._pending.includes(message)) {
const index = this._pending.indexOf(message);
this._pending.splice(index, 1);
} else if (this.pending > 0) {
} else if (runNextInQueue && this.pending > 0) {
this._dispense(this._pending.shift()!);
}

Expand Down Expand Up @@ -262,13 +263,13 @@ export class LeaseManager extends EventEmitter {
// In the case of a permanent failure (temporary failures are retried),
// we need to stop trying to lease-manage the message.
message.ackFailed(e as AckError);
this.remove(message);
this.remove(message, false);
});
} else {
message.modAck(deadline);
}
} else {
this.remove(message);
this.remove(message, false);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ export class Subscriber extends EventEmitter {
resultPromise.catch(() => {});

await this._acks.onFlush();
this._inventory.remove(message);
this._inventory.remove(message, true);
}

/**
Expand Down Expand Up @@ -686,7 +686,7 @@ export class Subscriber extends EventEmitter {
*/
async nack(message: Message): Promise<void> {
await this.modAck(message, 0);
this._inventory.remove(message);
this._inventory.remove(message, true);
}

/**
Expand Down
14 changes: 7 additions & 7 deletions test/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ describe('LeaseManager', () => {
const message = new FakeMessage();

leaseManager.add(message as {} as Message);
leaseManager.remove(new FakeMessage() as {} as Message);
leaseManager.remove(new FakeMessage() as {} as Message, false);

assert.strictEqual(leaseManager.size, 1);
assert.strictEqual(leaseManager.bytes, message.length);
Expand All @@ -435,7 +435,7 @@ describe('LeaseManager', () => {
const message = new FakeMessage() as {} as Message;

leaseManager.add(message);
leaseManager.remove(message);
leaseManager.remove(message, false);

assert.strictEqual(leaseManager.size, 0);
assert.strictEqual(leaseManager.bytes, 0);
Expand All @@ -446,7 +446,7 @@ describe('LeaseManager', () => {

leaseManager.setOptions({maxMessages: 1});
leaseManager.add(message);
setImmediate(() => leaseManager.remove(message));
setImmediate(() => leaseManager.remove(message, false));

leaseManager.on('free', () => {
assert.strictEqual(leaseManager.size, 0);
Expand All @@ -467,7 +467,7 @@ describe('LeaseManager', () => {

leaseManager.add(new FakeMessage() as {} as Message);
leaseManager.add(pending);
leaseManager.remove(pending);
leaseManager.remove(pending, true);

assert.strictEqual(leaseManager.pending, 0);
setImmediate(done);
Expand All @@ -491,7 +491,7 @@ describe('LeaseManager', () => {

leaseManager.add(temp);
leaseManager.add(pending);
leaseManager.remove(temp);
leaseManager.remove(temp, true);
});

it('should cancel any extensions if no messages are left', () => {
Expand All @@ -500,7 +500,7 @@ describe('LeaseManager', () => {
const stub = sandbox.stub(subscriber, 'modAck').resolves();

leaseManager.add(message);
leaseManager.remove(message);
leaseManager.remove(message, true);

clock.tick(subscriber.ackDeadline * 1000 * 2);

Expand All @@ -518,7 +518,7 @@ describe('LeaseManager', () => {
leaseManager.add(littleMessage);
assert.strictEqual(leaseManager.isFull(), false);

leaseManager.remove(littleMessage);
leaseManager.remove(littleMessage, true);
bigMessage.length = defaultOptions.subscription.maxOutstandingBytes * 2;
leaseManager.add(bigMessage as {} as Message);
assert.strictEqual(leaseManager.isFull(), true);
Expand Down

0 comments on commit f7a23c4

Please sign in to comment.