Skip to content

Commit

Permalink
Immediately trigger a batch send on root run end in JS (#441)
Browse files Browse the repository at this point in the history
@nfcampos @hinthornw

---------

Co-authored-by: William FH <[email protected]>
  • Loading branch information
jacoblee93 and hinthornw authored Feb 15, 2024
1 parent ebb61b6 commit fbdcbd9
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 66 deletions.
124 changes: 81 additions & 43 deletions js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,38 @@ function assertUuid(str: string): void {
}
}

/**
 * FIFO queue whose push() returns a promise that resolves only after the
 * consumer that popped the item acknowledges it via the "done" callback.
 * Used to let producers await delivery of batched run payloads.
 */
export class Queue<T> {
  // Each entry pairs a queued item with the resolver of the promise
  // returned by push(); invoking the resolver marks the item as handled.
  items: [T, () => void][] = [];

  /** Number of items currently waiting in the queue. */
  get size() {
    return this.items.length;
  }

  /**
   * Enqueue an item.
   *
   * @param item - Value to append to the queue.
   * @returns A promise that resolves once a consumer pops this item and
   *   invokes the "done" callback returned by {@link pop}.
   */
  push(item: T): Promise<void> {
    // this.items.push is synchronous with promise creation:
    // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/Promise
    return new Promise<void>((resolve) => {
      this.items.push([item, resolve]);
    });
  }

  /**
   * Dequeue up to `upToN` items (fewer if the queue holds fewer).
   *
   * @param upToN - Maximum number of items to remove; must be at least 1.
   * @returns A tuple of the popped items and a "done" callback that
   *   resolves the push() promise of every popped item.
   * @throws Error if `upToN` is less than 1.
   */
  pop(upToN: number): [T[], () => void] {
    if (upToN < 1) {
      throw new Error("Number of items to pop off may not be less than 1.");
    }
    // splice removes the first upToN entries in a single pass, replacing
    // the original shift() loop whose undefined-check branch was dead code
    // (shift() only returns undefined when the length guard already fails).
    const popped = this.items.splice(0, upToN);
    return [popped.map((it) => it[0]), () => popped.forEach((it) => it[1]())];
  }
}

export class Client {
private apiKey?: string;

Expand All @@ -241,11 +273,11 @@ export class Client {

private sampledPostUuids = new Set();

private autoBatchTracing = false;
private autoBatchTracing = true;

private batchEndpointSupported?: boolean;

private pendingAutoBatchedRuns: AutoBatchQueueItem[] = [];
private autoBatchQueue = new Queue<AutoBatchQueueItem>();

private pendingAutoBatchedRunLimit = 100;

Expand Down Expand Up @@ -487,57 +519,56 @@ export class Client {
}
}

private async triggerAutoBatchSend(runs?: AutoBatchQueueItem[]) {
let batch = runs;
if (batch === undefined) {
batch = this.pendingAutoBatchedRuns.slice(
0,
this.pendingAutoBatchedRunLimit
);
this.pendingAutoBatchedRuns = this.pendingAutoBatchedRuns.slice(
private async drainAutoBatchQueue() {
while (this.autoBatchQueue.size >= 0) {
const [batch, done] = this.autoBatchQueue.pop(
this.pendingAutoBatchedRunLimit
);
if (!batch.length) {
done();
return;
}
try {
await this.batchIngestRuns({
runCreates: batch
.filter((item) => item.action === "create")
.map((item) => item.item) as RunCreate[],
runUpdates: batch
.filter((item) => item.action === "update")
.map((item) => item.item) as RunUpdate[],
});
} finally {
done();
}
}
await this.batchIngestRuns({
runCreates: batch
.filter((item) => item.action === "create")
.map((item) => item.item) as RunCreate[],
runUpdates: batch
.filter((item) => item.action === "update")
.map((item) => item.item) as RunUpdate[],
});
}

private appendRunCreateToAutoBatchQueue(item: AutoBatchQueueItem) {
private async processRunOperation(
item: AutoBatchQueueItem,
immediatelyTriggerBatch?: boolean
) {
const oldTimeout = this.autoBatchTimeout;
clearTimeout(this.autoBatchTimeout);
this.autoBatchTimeout = undefined;
this.pendingAutoBatchedRuns.push(item);
while (
this.pendingAutoBatchedRuns.length >= this.pendingAutoBatchedRunLimit
const itemPromise = this.autoBatchQueue.push(item);
if (
immediatelyTriggerBatch ||
this.autoBatchQueue.size > this.pendingAutoBatchedRunLimit
) {
const batch = this.pendingAutoBatchedRuns.slice(
0,
this.pendingAutoBatchedRunLimit
);
this.pendingAutoBatchedRuns = this.pendingAutoBatchedRuns.slice(
this.pendingAutoBatchedRunLimit
);
void this.triggerAutoBatchSend(batch);
await this.drainAutoBatchQueue();
}
if (this.pendingAutoBatchedRuns.length > 0) {
if (!oldTimeout) {
this.autoBatchTimeout = setTimeout(() => {
this.autoBatchTimeout = undefined;
void this.triggerAutoBatchSend();
}, this.autoBatchInitialDelayMs);
} else {
this.autoBatchTimeout = setTimeout(() => {
if (this.autoBatchQueue.size > 0) {
this.autoBatchTimeout = setTimeout(
() => {
this.autoBatchTimeout = undefined;
void this.triggerAutoBatchSend();
}, this.autoBatchAggregationDelayMs);
}
void this.drainAutoBatchQueue();
},
oldTimeout
? this.autoBatchAggregationDelayMs
: this.autoBatchInitialDelayMs
);
}
return itemPromise;
}

protected async batchEndpointIsSupported() {
Expand Down Expand Up @@ -573,7 +604,7 @@ export class Client {
runCreate.trace_id !== undefined &&
runCreate.dotted_order !== undefined
) {
this.appendRunCreateToAutoBatchQueue({
void this.processRunOperation({
action: "create",
item: runCreate,
});
Expand Down Expand Up @@ -705,7 +736,14 @@ export class Client {
data.trace_id !== undefined &&
data.dotted_order !== undefined
) {
this.appendRunCreateToAutoBatchQueue({ action: "update", item: data });
if (run.end_time !== undefined && data.parent_run_id === undefined) {
// Trigger a batch as soon as a root trace ends and block to ensure trace finishes
// in serverless environments.
await this.processRunOperation({ action: "update", item: data }, true);
return;
} else {
void this.processRunOperation({ action: "update", item: data });
}
return;
}
const headers = { ...this.headers, "Content-Type": "application/json" };
Expand Down
2 changes: 1 addition & 1 deletion js/src/tests/batch_client.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async function waitUntilRunFound(
}
},
30_000,
1_000
5_000
);
}

Expand Down
44 changes: 24 additions & 20 deletions js/src/tests/batch_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ describe("Batch client tracing", () => {
);
});

it("should create an example with the given input and generation", async () => {
it("should immediately trigger a batch on root run end", async () => {
const client = new Client({
apiKey: "test-api-key",
autoBatchTracing: true,
Expand Down Expand Up @@ -160,10 +160,12 @@ describe("Batch client tracing", () => {
dotted_order: dottedOrder,
});

// Wait for first batch to send
await new Promise((resolve) => setTimeout(resolve, 300));

const endTime = Math.floor(new Date().getTime() / 1000);

// A root run finishing triggers the second batch
await client.updateRun(runId, {
outputs: { output: ["Hi"] },
dotted_order: dottedOrder,
Expand All @@ -177,6 +179,7 @@ describe("Batch client tracing", () => {
runId2
);

// Will send in a third batch, even though it's triggered around the same time as the update
await client.createRun({
id: runId2,
project_name: projectName,
Expand All @@ -191,6 +194,7 @@ describe("Batch client tracing", () => {

const calledRequestParam: any = callSpy.mock.calls[0][2];
const calledRequestParam2: any = callSpy.mock.calls[1][2];
const calledRequestParam3: any = callSpy.mock.calls[2][2];
expect(JSON.parse(calledRequestParam?.body)).toEqual({
post: [
expect.objectContaining({
Expand All @@ -207,17 +211,7 @@ describe("Batch client tracing", () => {
});

expect(JSON.parse(calledRequestParam2?.body)).toEqual({
post: [
expect.objectContaining({
id: runId2,
run_type: "llm",
inputs: {
text: "hello world 2",
},
trace_id: runId2,
dotted_order: dottedOrder2,
}),
],
post: [],
patch: [
expect.objectContaining({
id: runId,
Expand All @@ -230,6 +224,20 @@ describe("Batch client tracing", () => {
}),
],
});
expect(JSON.parse(calledRequestParam3?.body)).toEqual({
post: [
expect.objectContaining({
id: runId2,
run_type: "llm",
inputs: {
text: "hello world 2",
},
trace_id: runId2,
dotted_order: dottedOrder2,
}),
],
patch: [],
});
});

it("should send traces above the batch size and see even batches", async () => {
Expand Down Expand Up @@ -272,9 +280,10 @@ describe("Batch client tracing", () => {
await new Promise((resolve) => setTimeout(resolve, 10));

const calledRequestParam: any = callSpy.mock.calls[0][2];
// Second batch should still be pending
expect(callSpy.mock.calls[1]).toBeUndefined();
// First batch should fire as soon as it hits 10
const calledRequestParam2: any = callSpy.mock.calls[1][2];

// Queue should drain as soon as size limit is reached,
// sending both batches
expect(JSON.parse(calledRequestParam?.body)).toEqual({
post: runIds.slice(0, 10).map((runId, i) =>
expect.objectContaining({
Expand All @@ -289,11 +298,6 @@ describe("Batch client tracing", () => {
patch: [],
});

// Wait for the aggregation delay
await new Promise((resolve) => setTimeout(resolve, 100));

const calledRequestParam2: any = callSpy.mock.calls[1][2];

expect(JSON.parse(calledRequestParam2?.body)).toEqual({
post: runIds.slice(10).map((runId, i) =>
expect.objectContaining({
Expand Down
2 changes: 1 addition & 1 deletion js/src/tests/client.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async function waitUntilRunFound(
}
},
180_000,
1_000
5_000
);
}

Expand Down
2 changes: 1 addition & 1 deletion js/src/tests/run_trees.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async function pollRunsUntilCount(
}
},
120_000, // Wait up to 120 seconds
3000 // every 3 second
5000 // every 5 second
);
}

Expand Down

0 comments on commit fbdcbd9

Please sign in to comment.