From 37be5b97571cb14e20cd806246327e1a00927b93 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 11 Sep 2024 16:59:51 -0400 Subject: [PATCH] fix: do the maxMessages and maxBytes checks _before_ adding --- src/message-queues.ts | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/message-queues.ts b/src/message-queues.ts index b660012a4..a08330a36 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -202,7 +202,18 @@ export abstract class MessageQueue { } const {maxMessages, maxMilliseconds} = this._options; + const size = Buffer.byteLength(message.ackId, 'utf8'); + // If we will go over maxMessages or MAX_BATCH_BYTES by adding this + // message, flush first. (maxMilliseconds is handled by timers.) + if ( + this._requests.length + 1 >= maxMessages! || + this.bytes + size >= MAX_BATCH_BYTES + ) { + this.flush(); + } + + // Add the message to the current batch. const responsePromise = defer(); this._requests.push({ message: { @@ -215,14 +226,10 @@ export abstract class MessageQueue { }); this.numPendingRequests++; this.numInFlightRequests++; - this.bytes += Buffer.byteLength(message.ackId, 'utf8'); + this.bytes += size; - if ( - this._requests.length >= maxMessages! || - this.bytes >= MAX_BATCH_BYTES - ) { - this.flush(); - } else if (!this._timer) { + // Ensure that we are counting toward maxMilliseconds by timer. + if (!this._timer) { this._timer = setTimeout(() => this.flush(), maxMilliseconds!); }