Skip to content

Commit

Permalink
Drain batch queue immediately when limit is hit
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoblee93 committed Feb 14, 2024
1 parent 85dd64d commit bafc07a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 33 deletions.
54 changes: 29 additions & 25 deletions js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ export class Queue<T> {
}

push(item: T): Promise<void> {
// this.items.push is synchronous with promise creation:
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/Promise
return new Promise<void>((resolve) => {
this.items.push([item, resolve]);
});
Expand Down Expand Up @@ -516,25 +518,27 @@ export class Client {
}
}

private async triggerAutoBatchSend() {
const [batch, done] = this.autoBatchQueue.pop(
this.pendingAutoBatchedRunLimit
);
if (!batch.length) {
done();
return;
}
try {
await this.batchIngestRuns({
runCreates: batch
.filter((item) => item.action === "create")
.map((item) => item.item) as RunCreate[],
runUpdates: batch
.filter((item) => item.action === "update")
.map((item) => item.item) as RunUpdate[],
});
} finally {
done();
private async drainAutoBatchQueue() {
while (this.autoBatchQueue.size >= 0) {
const [batch, done] = this.autoBatchQueue.pop(
this.pendingAutoBatchedRunLimit
);
if (!batch.length) {
done();
return;
}
try {
await this.batchIngestRuns({
runCreates: batch
.filter((item) => item.action === "create")
.map((item) => item.item) as RunCreate[],
runUpdates: batch
.filter((item) => item.action === "update")
.map((item) => item.item) as RunUpdate[],
});
} finally {
done();
}
}
}

Expand All @@ -546,17 +550,17 @@ export class Client {
clearTimeout(this.autoBatchTimeout);
this.autoBatchTimeout = undefined;
const itemPromise = this.autoBatchQueue.push(item);
if (immediatelyTriggerBatch) {
await this.triggerAutoBatchSend();
}
while (this.autoBatchQueue.size >= this.pendingAutoBatchedRunLimit) {
await this.triggerAutoBatchSend();
if (
immediatelyTriggerBatch ||
this.autoBatchQueue.size > this.pendingAutoBatchedRunLimit
) {
await this.drainAutoBatchQueue();
}
if (this.autoBatchQueue.size > 0) {
this.autoBatchTimeout = setTimeout(
() => {
this.autoBatchTimeout = undefined;
void this.triggerAutoBatchSend();
void this.drainAutoBatchQueue();
},
oldTimeout
? this.autoBatchAggregationDelayMs
Expand Down
12 changes: 4 additions & 8 deletions js/src/tests/batch_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,10 @@ describe("Batch client tracing", () => {
await new Promise((resolve) => setTimeout(resolve, 10));

const calledRequestParam: any = callSpy.mock.calls[0][2];
// Second batch should still be pending
expect(callSpy.mock.calls[1]).toBeUndefined();
// First batch should fire as soon as it hits 10
const calledRequestParam2: any = callSpy.mock.calls[1][2];

// Queue should drain as soon as size limit is reached,
// sending both batches
expect(JSON.parse(calledRequestParam?.body)).toEqual({
post: runIds.slice(0, 10).map((runId, i) =>
expect.objectContaining({
Expand All @@ -297,11 +298,6 @@ describe("Batch client tracing", () => {
patch: [],
});

// Wait for the aggregation delay
await new Promise((resolve) => setTimeout(resolve, 100));

const calledRequestParam2: any = callSpy.mock.calls[1][2];

expect(JSON.parse(calledRequestParam2?.body)).toEqual({
post: runIds.slice(10).map((runId, i) =>
expect.objectContaining({
Expand Down

0 comments on commit bafc07a

Please sign in to comment.