Skip to content

Commit

Permalink
fix: do the maxMessages and maxBytes checks _before_ adding
Browse files Browse the repository at this point in the history
  • Loading branch information
feywind committed Sep 11, 2024
1 parent 9687928 commit 37be5b9
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,18 @@ export abstract class MessageQueue {
}

const {maxMessages, maxMilliseconds} = this._options;
const size = Buffer.byteLength(message.ackId, 'utf8');

// If we will go over maxMessages or MAX_BATCH_BYTES by adding this
// message, flush first. (maxMilliseconds is handled by timers.)
if (
this._requests.length + 1 >= maxMessages! ||
this.bytes + size >= MAX_BATCH_BYTES
) {
this.flush();
}

// Add the message to the current batch.
const responsePromise = defer<void>();
this._requests.push({
message: {
Expand All @@ -215,14 +226,10 @@ export abstract class MessageQueue {
});
this.numPendingRequests++;
this.numInFlightRequests++;
this.bytes += Buffer.byteLength(message.ackId, 'utf8');
this.bytes += size;

if (
this._requests.length >= maxMessages! ||
this.bytes >= MAX_BATCH_BYTES
) {
this.flush();
} else if (!this._timer) {
// Ensure that we are counting toward maxMilliseconds by timer.
if (!this._timer) {
this._timer = setTimeout(() => this.flush(), maxMilliseconds!);
}

Expand Down

0 comments on commit 37be5b9

Please sign in to comment.