From fbdcbd9fe826a01808479a13c85e27ab95e02974 Mon Sep 17 00:00:00 2001 From: Jacob Lee Date: Wed, 14 Feb 2024 23:31:38 -0800 Subject: [PATCH] Immediately trigger a batch send on root run end in JS (#441) @nfcampos @hinthornw --------- Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com> --- js/src/client.ts | 124 +++++++++++++++++--------- js/src/tests/batch_client.int.test.ts | 2 +- js/src/tests/batch_client.test.ts | 44 ++++----- js/src/tests/client.int.test.ts | 2 +- js/src/tests/run_trees.int.test.ts | 2 +- 5 files changed, 108 insertions(+), 66 deletions(-) diff --git a/js/src/client.ts b/js/src/client.ts index 743899c0d..2d4e98717 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -220,6 +220,38 @@ function assertUuid(str: string): void { } } +export class Queue { + items: [T, () => void][] = []; + + get size() { + return this.items.length; + } + + push(item: T): Promise { + // this.items.push is synchronous with promise creation: + // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/Promise + return new Promise((resolve) => { + this.items.push([item, resolve]); + }); + } + + pop(upToN: number): [T[], () => void] { + if (upToN < 1) { + throw new Error("Number of items to pop off may not be less than 1."); + } + const popped: typeof this.items = []; + while (popped.length < upToN && this.items.length) { + const item = this.items.shift(); + if (item) { + popped.push(item); + } else { + break; + } + } + return [popped.map((it) => it[0]), () => popped.forEach((it) => it[1]())]; + } +} + export class Client { private apiKey?: string; @@ -241,11 +273,11 @@ export class Client { private sampledPostUuids = new Set(); - private autoBatchTracing = false; + private autoBatchTracing = true; private batchEndpointSupported?: boolean; - private pendingAutoBatchedRuns: AutoBatchQueueItem[] = []; + private autoBatchQueue = new Queue(); private pendingAutoBatchedRunLimit = 100; @@ -487,57 +519,56 @@ export class Client { } } - private async triggerAutoBatchSend(runs?: AutoBatchQueueItem[]) { - let batch = runs; - if (batch === undefined) { - batch = this.pendingAutoBatchedRuns.slice( - 0, - this.pendingAutoBatchedRunLimit - ); - this.pendingAutoBatchedRuns = this.pendingAutoBatchedRuns.slice( + private async drainAutoBatchQueue() { + while (this.autoBatchQueue.size >= 0) { + const [batch, done] = this.autoBatchQueue.pop( this.pendingAutoBatchedRunLimit ); + if (!batch.length) { + done(); + return; + } + try { + await this.batchIngestRuns({ + runCreates: batch + .filter((item) => item.action === "create") + .map((item) => item.item) as RunCreate[], + runUpdates: batch + .filter((item) => item.action === "update") + .map((item) => item.item) as RunUpdate[], + }); + } finally { + done(); + } } - await this.batchIngestRuns({ - runCreates: batch - .filter((item) => item.action === "create") - .map((item) => item.item) as RunCreate[], - runUpdates: batch - .filter((item) => item.action === "update") - .map((item) => item.item) as RunUpdate[], - }); } - private appendRunCreateToAutoBatchQueue(item: AutoBatchQueueItem) { + private async processRunOperation( + item: AutoBatchQueueItem, + immediatelyTriggerBatch?: boolean + ) { const oldTimeout = this.autoBatchTimeout; clearTimeout(this.autoBatchTimeout); this.autoBatchTimeout = undefined; - this.pendingAutoBatchedRuns.push(item); - while ( - this.pendingAutoBatchedRuns.length >= this.pendingAutoBatchedRunLimit + const itemPromise = this.autoBatchQueue.push(item); + if ( + immediatelyTriggerBatch || + this.autoBatchQueue.size > this.pendingAutoBatchedRunLimit ) { - const batch = this.pendingAutoBatchedRuns.slice( - 0, - this.pendingAutoBatchedRunLimit - ); - this.pendingAutoBatchedRuns = this.pendingAutoBatchedRuns.slice( - this.pendingAutoBatchedRunLimit - ); - void this.triggerAutoBatchSend(batch); + await this.drainAutoBatchQueue(); } - if (this.pendingAutoBatchedRuns.length > 0) { - if (!oldTimeout) { - this.autoBatchTimeout = setTimeout(() => { - this.autoBatchTimeout = undefined; - void this.triggerAutoBatchSend(); - }, this.autoBatchInitialDelayMs); - } else { - this.autoBatchTimeout = setTimeout(() => { + if (this.autoBatchQueue.size > 0) { + this.autoBatchTimeout = setTimeout( + () => { this.autoBatchTimeout = undefined; - void this.triggerAutoBatchSend(); - }, this.autoBatchAggregationDelayMs); - } + void this.drainAutoBatchQueue(); + }, + oldTimeout + ? this.autoBatchAggregationDelayMs + : this.autoBatchInitialDelayMs + ); } + return itemPromise; } protected async batchEndpointIsSupported() { @@ -573,7 +604,7 @@ export class Client { runCreate.trace_id !== undefined && runCreate.dotted_order !== undefined ) { - this.appendRunCreateToAutoBatchQueue({ + void this.processRunOperation({ action: "create", item: runCreate, }); @@ -705,7 +736,14 @@ export class Client { data.trace_id !== undefined && data.dotted_order !== undefined ) { - this.appendRunCreateToAutoBatchQueue({ action: "update", item: data }); + if (run.end_time !== undefined && data.parent_run_id === undefined) { + // Trigger a batch as soon as a root trace ends and block to ensure trace finishes + // in serverless environments. + await this.processRunOperation({ action: "update", item: data }, true); + return; + } else { + void this.processRunOperation({ action: "update", item: data }); + } return; } const headers = { ...this.headers, "Content-Type": "application/json" }; diff --git a/js/src/tests/batch_client.int.test.ts b/js/src/tests/batch_client.int.test.ts index 51a86a6e9..095ff0d05 100644 --- a/js/src/tests/batch_client.int.test.ts +++ b/js/src/tests/batch_client.int.test.ts @@ -48,7 +48,7 @@ async function waitUntilRunFound( } }, 30_000, - 1_000 + 5_000 ); } diff --git a/js/src/tests/batch_client.test.ts b/js/src/tests/batch_client.test.ts index 9b7a10568..7f69b8e49 100644 --- a/js/src/tests/batch_client.test.ts +++ b/js/src/tests/batch_client.test.ts @@ -129,7 +129,7 @@ describe("Batch client tracing", () => { ); }); - it("should create an example with the given input and generation", async () => { + it("should immediately trigger a batch on root run end", async () => { const client = new Client({ apiKey: "test-api-key", autoBatchTracing: true, @@ -160,10 +160,12 @@ describe("Batch client tracing", () => { dotted_order: dottedOrder, }); + // Wait for first batch to send await new Promise((resolve) => setTimeout(resolve, 300)); const endTime = Math.floor(new Date().getTime() / 1000); + // A root run finishing triggers the second batch await client.updateRun(runId, { outputs: { output: ["Hi"] }, dotted_order: dottedOrder, @@ -177,6 +179,7 @@ describe("Batch client tracing", () => { runId2 ); + // Will send in a third batch, even though it's triggered around the same time as the update await client.createRun({ id: runId2, project_name: projectName, @@ -191,6 +194,7 @@ describe("Batch client tracing", () => { const calledRequestParam: any = callSpy.mock.calls[0][2]; const calledRequestParam2: any = callSpy.mock.calls[1][2]; + const calledRequestParam3: any = callSpy.mock.calls[2][2]; expect(JSON.parse(calledRequestParam?.body)).toEqual({ post: [ expect.objectContaining({ @@ -207,17 +211,7 @@ describe("Batch client tracing", () => { }); expect(JSON.parse(calledRequestParam2?.body)).toEqual({ - post: [ - expect.objectContaining({ - id: runId2, - run_type: "llm", - inputs: { - text: "hello world 2", - }, - trace_id: runId2, - dotted_order: dottedOrder2, - }), - ], + post: [], patch: [ expect.objectContaining({ id: runId, @@ -230,6 +224,20 @@ describe("Batch client tracing", () => { }), ], }); + expect(JSON.parse(calledRequestParam3?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId2, + run_type: "llm", + inputs: { + text: "hello world 2", + }, + trace_id: runId2, + dotted_order: dottedOrder2, + }), + ], + patch: [], + }); }); it("should send traces above the batch size and see even batches", async () => { @@ -272,9 +280,10 @@ describe("Batch client tracing", () => { await new Promise((resolve) => setTimeout(resolve, 10)); const calledRequestParam: any = callSpy.mock.calls[0][2]; - // Second batch should still be pending - expect(callSpy.mock.calls[1]).toBeUndefined(); - // First batch should fire as soon as it hits 10 + const calledRequestParam2: any = callSpy.mock.calls[1][2]; + + // Queue should drain as soon as size limit is reached, + // sending both batches expect(JSON.parse(calledRequestParam?.body)).toEqual({ post: runIds.slice(0, 10).map((runId, i) => expect.objectContaining({ @@ -289,11 +298,6 @@ describe("Batch client tracing", () => { patch: [], }); - // Wait for the aggregation delay - await new Promise((resolve) => setTimeout(resolve, 100)); - - const calledRequestParam2: any = callSpy.mock.calls[1][2]; - expect(JSON.parse(calledRequestParam2?.body)).toEqual({ post: runIds.slice(10).map((runId, i) => expect.objectContaining({ diff --git a/js/src/tests/client.int.test.ts b/js/src/tests/client.int.test.ts index bc5a97154..9509416b8 100644 --- a/js/src/tests/client.int.test.ts +++ b/js/src/tests/client.int.test.ts @@ -68,7 +68,7 @@ async function waitUntilRunFound( } }, 180_000, - 1_000 + 5_000 ); } diff --git a/js/src/tests/run_trees.int.test.ts b/js/src/tests/run_trees.int.test.ts index 749ebe2d7..2a0184d3b 100644 --- a/js/src/tests/run_trees.int.test.ts +++ b/js/src/tests/run_trees.int.test.ts @@ -43,7 +43,7 @@ async function pollRunsUntilCount( } }, 120_000, // Wait up to 120 seconds - 3000 // every 3 second + 5000 // every 5 second ); }