From cf733d4999961f5c37aab5edb66b8237599117fd Mon Sep 17 00:00:00 2001 From: Jacob Lee Date: Wed, 7 Feb 2024 01:06:59 -0800 Subject: [PATCH] Adds auto batch tracer support for JS (#406) CC @nfcampos @hinthornw Unit tests done. Supersedes #383 --- js/package.json | 1 + js/src/client.ts | 310 +++++++++++++++++++++++--- js/src/schemas.ts | 51 +++-- js/src/tests/batch_client.int.test.ts | 164 ++++++++++++++ js/src/tests/batch_client.test.ts | 289 ++++++++++++++++++++++++ js/src/tests/client.int.test.ts | 28 ++- 6 files changed, 788 insertions(+), 55 deletions(-) create mode 100644 js/src/tests/batch_client.int.test.ts create mode 100644 js/src/tests/batch_client.test.ts diff --git a/js/package.json b/js/package.json index a9506707d..a98eec527 100644 --- a/js/package.json +++ b/js/package.json @@ -2,6 +2,7 @@ "name": "langsmith", "version": "0.0.66", "description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.", + "packageManager": "yarn@1.22.19", "files": [ "dist/", "client.cjs", diff --git a/js/src/client.ts b/js/src/client.ts index 181a5cd4d..07ddbd108 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -40,6 +40,8 @@ interface ClientConfig { webUrl?: string; hideInputs?: boolean; hideOutputs?: boolean; + autoBatchTracing?: boolean; + pendingAutoBatchedRunLimit?: number; } interface ListRunsParams { @@ -56,6 +58,7 @@ interface ListRunsParams { query?: string; filter?: string; } + interface UploadCSVParams { csvFile: Blob; fileName: string; @@ -105,9 +108,15 @@ interface CreateRunParams { parent_run_id?: string; project_name?: string; revision_id?: string; + trace_id?: string; + dotted_order?: string; +} + +interface UpdateRunParams extends RunUpdate { + id?: string; } -interface projectOptions { +interface ProjectOptions { projectName?: string; projectId?: string; } @@ -121,6 +130,51 @@ export type CreateExampleOptions = { exampleId?: string; }; +type AutoBatchQueueItem = { + action: "create" | "update"; + item: RunCreate | RunUpdate; +}; + +async function mergeRuntimeEnvIntoRunCreates(runs: RunCreate[]) { + const runtimeEnv = await getRuntimeEnvironment(); + const envVars = getLangChainEnvVarsMetadata(); + return runs.map((run) => { + const extra = run.extra ?? {}; + const metadata = extra.metadata; + run.extra = { + ...extra, + runtime: { + ...runtimeEnv, + ...extra?.runtime, + }, + metadata: { + ...envVars, + ...(envVars.revision_id || run.revision_id + ? { revision_id: run.revision_id ?? envVars.revision_id } + : {}), + ...metadata, + }, + }; + return run; + }); +} + +const getTracingSamplingRate = () => { + const samplingRateStr = getEnvironmentVariable( + "LANGCHAIN_TRACING_SAMPLING_RATE" + ); + if (samplingRateStr === undefined) { + return undefined; + } + const samplingRate = parseFloat(samplingRateStr); + if (samplingRate < 0 || samplingRate > 1) { + throw new Error( + `LANGCHAIN_TRACING_SAMPLING_RATE must be between 0 and 1 if set. Got: ${samplingRate}` + ); + } + return samplingRate; +}; + // utility functions const isLocalhost = (url: string): boolean => { const strippedUrl = url.replace("http://", "").replace("https://", ""); @@ -182,9 +236,26 @@ export class Client { private hideOutputs?: boolean; + private tracingSampleRate?: number; + + private sampledPostUuids = new Set(); + + private autoBatchTracing = true; + + private pendingAutoBatchedRuns: AutoBatchQueueItem[] = []; + + private pendingAutoBatchedRunLimit = 100; + + private autoBatchTimeout: ReturnType | undefined; + + private autoBatchInitialDelayMs = 250; + + private autoBatchAggregationDelayMs = 50; + constructor(config: ClientConfig = {}) { const defaultConfig = Client.getDefaultClientConfig(); + this.tracingSampleRate = getTracingSamplingRate(); this.apiUrl = trimQuotes(config.apiUrl ?? defaultConfig.apiUrl) ?? ""; this.apiKey = trimQuotes(config.apiKey ?? defaultConfig.apiKey); this.webUrl = trimQuotes(config.webUrl ?? defaultConfig.webUrl); @@ -193,6 +264,9 @@ export class Client { this.caller = new AsyncCaller(config.callerOptions ?? {}); this.hideInputs = config.hideInputs ?? defaultConfig.hideInputs; this.hideOutputs = config.hideOutputs ?? defaultConfig.hideOutputs; + this.autoBatchTracing = config.autoBatchTracing ?? this.autoBatchTracing; + this.pendingAutoBatchedRunLimit = + config.pendingAutoBatchedRunLimit ?? this.pendingAutoBatchedRunLimit; } public static getDefaultClientConfig(): { @@ -273,6 +347,21 @@ export class Client { return outputs; } + private prepareRunCreateOrUpdateInputs(run: RunUpdate): RunUpdate; + private prepareRunCreateOrUpdateInputs(run: RunCreate): RunCreate; + private prepareRunCreateOrUpdateInputs( + run: RunCreate | RunUpdate + ): RunCreate | RunUpdate { + const runParams = { ...run }; + if (runParams.inputs !== undefined) { + runParams.inputs = this.processInputs(runParams.inputs); + } + if (runParams.outputs !== undefined) { + runParams.outputs = this.processOutputs(runParams.outputs); + } + return runParams; + } + private async _getResponse( path: string, queryParams?: URLSearchParams @@ -366,47 +455,201 @@ export class Client { } } + private _filterForSampling( + runs: CreateRunParams[] | UpdateRunParams[], + patch = false + ) { + if (this.tracingSampleRate === undefined) { + return runs; + } + + if (patch) { + const sampled = []; + for (const run of runs) { + if (this.sampledPostUuids.has(run.id)) { + sampled.push(run); + this.sampledPostUuids.delete(run.id); + } + } + return sampled; + } else { + const sampled = []; + for (const run of runs) { + if (Math.random() < this.tracingSampleRate) { + sampled.push(run); + this.sampledPostUuids.add(run.id); + } + } + return sampled; + } + } + + private async triggerAutoBatchSend(runs?: AutoBatchQueueItem[]) { + let batch = runs; + if (batch === undefined) { + batch = this.pendingAutoBatchedRuns.slice( + 0, + this.pendingAutoBatchedRunLimit + ); + this.pendingAutoBatchedRuns = this.pendingAutoBatchedRuns.slice( + this.pendingAutoBatchedRunLimit + ); + } + await this.batchIngestRuns({ + runCreates: batch + .filter((item) => item.action === "create") + .map((item) => item.item) as RunCreate[], + runUpdates: batch + .filter((item) => item.action === "update") + .map((item) => item.item) as RunUpdate[], + }); + } + + private appendRunCreateToAutoBatchQueue(item: AutoBatchQueueItem) { + const oldTimeout = this.autoBatchTimeout; + clearTimeout(this.autoBatchTimeout); + this.autoBatchTimeout = undefined; + this.pendingAutoBatchedRuns.push(item); + while ( + this.pendingAutoBatchedRuns.length >= this.pendingAutoBatchedRunLimit + ) { + const batch = this.pendingAutoBatchedRuns.slice( + 0, + this.pendingAutoBatchedRunLimit + ); + this.pendingAutoBatchedRuns = this.pendingAutoBatchedRuns.slice( + this.pendingAutoBatchedRunLimit + ); + void this.triggerAutoBatchSend(batch); + } + if (this.pendingAutoBatchedRuns.length > 0) { + if (!oldTimeout) { + this.autoBatchTimeout = setTimeout(() => { + this.autoBatchTimeout = undefined; + void this.triggerAutoBatchSend(); + }, this.autoBatchInitialDelayMs); + } else { + this.autoBatchTimeout = setTimeout(() => { + this.autoBatchTimeout = undefined; + void this.triggerAutoBatchSend(); + }, this.autoBatchAggregationDelayMs); + } + } + } + public async createRun(run: CreateRunParams): Promise { + if (!this._filterForSampling([run]).length) { + return; + } const headers = { ...this.headers, "Content-Type": "application/json" }; - const extra = run.extra ?? {}; - const metadata = extra.metadata; - const runtimeEnv = await getRuntimeEnvironment(); - const envVars = getLangChainEnvVarsMetadata(); const session_name = run.project_name; delete run.project_name; - const runCreate: RunCreate = { + + const runCreate: RunCreate = this.prepareRunCreateOrUpdateInputs({ session_name, ...run, - extra: { - ...run.extra, - runtime: { - ...runtimeEnv, - ...extra.runtime, - }, - metadata: { - ...envVars, - ...(envVars.revision_id || run.revision_id - ? { revision_id: run.revision_id ?? envVars.revision_id } - : {}), - ...metadata, - }, - }, - }; - runCreate.inputs = this.processInputs(runCreate.inputs); - if (runCreate.outputs) { - runCreate.outputs = this.processOutputs(runCreate.outputs); + start_time: run.start_time ?? Date.now(), + }); + if ( + this.autoBatchTracing && + runCreate.trace_id !== undefined && + runCreate.dotted_order !== undefined + ) { + this.appendRunCreateToAutoBatchQueue({ + action: "create", + item: runCreate, + }); + return; } - runCreate.start_time = run.start_time ?? Date.now(); + const mergedRunCreateParams = await mergeRuntimeEnvIntoRunCreates([ + runCreate, + ]); const response = await this.caller.call(fetch, `${this.apiUrl}/runs`, { method: "POST", headers, - body: JSON.stringify(runCreate), + body: JSON.stringify(mergedRunCreateParams[0]), signal: AbortSignal.timeout(this.timeout_ms), }); await raiseForStatus(response, "create run"); } + /** + * Batch ingest/upsert multiple runs in the Langsmith system. + * @param runs + */ + public async batchIngestRuns({ + runCreates, + runUpdates, + }: { + runCreates?: RunCreate[]; + runUpdates?: RunUpdate[]; + }) { + if (runCreates === undefined && runUpdates === undefined) { + return; + } + let preparedCreateParams = + runCreates?.map((create) => + this.prepareRunCreateOrUpdateInputs(create) + ) ?? []; + let preparedUpdateParams = + runUpdates?.map((update) => + this.prepareRunCreateOrUpdateInputs(update) + ) ?? []; + + if (preparedCreateParams.length > 0 && preparedUpdateParams.length > 0) { + const createById = preparedCreateParams.reduce( + (params: Record, run) => { + if (!run.id) { + return params; + } + params[run.id] = run; + return params; + }, + {} + ); + const standaloneUpdates = []; + for (const updateParam of preparedUpdateParams) { + if (updateParam.id !== undefined && createById[updateParam.id]) { + createById[updateParam.id] = { + ...createById[updateParam.id], + ...updateParam, + }; + } else { + standaloneUpdates.push(updateParam); + } + } + preparedCreateParams = Object.values(createById); + preparedUpdateParams = standaloneUpdates; + } + const body = { + post: this._filterForSampling(preparedCreateParams), + patch: this._filterForSampling(preparedUpdateParams, true), + }; + if (!body.post.length && !body.patch.length) { + return; + } + preparedCreateParams = await mergeRuntimeEnvIntoRunCreates( + preparedCreateParams + ); + const headers = { + ...this.headers, + "Content-Type": "application/json", + Accept: "application/json", + }; + const response = await this.caller.call( + fetch, + `${this.apiUrl}/runs/batch`, + { + method: "POST", + headers, + body: JSON.stringify(body), + signal: AbortSignal.timeout(this.timeout_ms), + } + ); + await raiseForStatus(response, "batch create run"); + } + public async updateRun(runId: string, run: RunUpdate): Promise { assertUuid(runId); if (run.inputs) { @@ -416,6 +659,19 @@ export class Client { if (run.outputs) { run.outputs = this.processOutputs(run.outputs); } + // TODO: Untangle types + const data: UpdateRunParams = { ...run, id: runId }; + if (!this._filterForSampling([data], true).length) { + return; + } + if ( + this.autoBatchTracing && + data.trace_id !== undefined && + data.dotted_order !== undefined + ) { + this.appendRunCreateToAutoBatchQueue({ action: "update", item: data }); + return; + } const headers = { ...this.headers, "Content-Type": "application/json" }; const response = await this.caller.call( fetch, @@ -449,7 +705,7 @@ export class Client { }: { runId?: string; run?: Run; - projectOpts?: projectOptions; + projectOpts?: ProjectOptions; }): Promise { if (run !== undefined) { let sessionId: string; diff --git a/js/src/schemas.ts b/js/src/schemas.ts index 78c33aed6..84bf0cc42 100644 --- a/js/src/schemas.ts +++ b/js/src/schemas.ts @@ -105,6 +105,23 @@ export interface BaseRun { /** Tags for further categorizing or annotating the run. */ tags?: string[]; + + /** Unique ID assigned to every run within this nested trace. **/ + trace_id?: string; + + /** + * The dotted order for the run. + * + * This is a string composed of {time}{run-uuid}.* so that a trace can be + * sorted in the order it was executed. + * + * Example: + * - Parent: 20230914T223155647Z1b64098b-4ab7-43f6-afee-992304f198d8 + * - Children: + * - 20230914T223155647Z1b64098b-4ab7-43f6-afee-992304f198d8.20230914T223155649Z809ed3a2-0172-4f4d-8a02-a64e9b7a0f8a + * - 20230915T223155647Z1b64098b-4ab7-43f6-afee-992304f198d8.20230914T223155650Zc8d9f4c5-6c5a-4b2d-9b1c-3d9d7a7c5c7c + */ + dotted_order?: string; } /** @@ -150,30 +167,16 @@ export interface Run extends BaseRun { /** IDs of parent runs, if multiple exist. */ parent_run_ids?: string[]; - /**Unique ID assigned to every run within this nested trace.**/ - trace_id?: string; - - /** - * The dotted order for the run. - * - * This is a string composed of {time}{run-uuid}.* so that a trace can be - * sorted in the order it was executed. - * - * Example: - * - Parent: 20230914T223155647Z1b64098b-4ab7-43f6-afee-992304f198d8 - * - Children: - * - 20230914T223155647Z1b64098b-4ab7-43f6-afee-992304f198d8.20230914T223155649Z809ed3a2-0172-4f4d-8a02-a64e9b7a0f8a - * - 20230915T223155647Z1b64098b-4ab7-43f6-afee-992304f198d8.20230914T223155650Zc8d9f4c5-6c5a-4b2d-9b1c-3d9d7a7c5c7c - */ - dotted_order?: string; } export interface RunCreate extends BaseRun { + revision_id?: string; child_runs?: this[]; session_name?: string; } export interface RunUpdate { + id?: string; end_time?: number; extra?: KVMap; error?: string; @@ -183,6 +186,22 @@ export interface RunUpdate { reference_example_id?: string; events?: KVMap[]; session_id?: string; + /** Unique ID assigned to every run within this nested trace. **/ + trace_id?: string; + + /** + * The dotted order for the run. + * + * This is a string composed of {time}{run-uuid}.* so that a trace can be + * sorted in the order it was executed. + * + * Example: + * - Parent: 20230914T223155647Z1b64098b-4ab7-43f6-afee-992304f198d8 + * - Children: + * - 20230914T223155647Z1b64098b-4ab7-43f6-afee-992304f198d8.20230914T223155649Z809ed3a2-0172-4f4d-8a02-a64e9b7a0f8a + * - 20230915T223155647Z1b64098b-4ab7-43f6-afee-992304f198d8.20230914T223155650Zc8d9f4c5-6c5a-4b2d-9b1c-3d9d7a7c5c7c + */ + dotted_order?: string; } export interface ExampleCreate extends BaseExample { diff --git a/js/src/tests/batch_client.int.test.ts b/js/src/tests/batch_client.int.test.ts new file mode 100644 index 000000000..bab6aa183 --- /dev/null +++ b/js/src/tests/batch_client.int.test.ts @@ -0,0 +1,164 @@ +import { Client } from "../client.js"; +import { convertToDottedOrderFormat } from "../run_trees.js"; +import { v4 as uuidv4 } from "uuid"; + +async function deleteProject(langchainClient: Client, projectName: string) { + try { + await langchainClient.readProject({ projectName }); + await langchainClient.deleteProject({ projectName }); + } catch (e) { + // Pass + } +} + +async function waitUntil( + condition: () => Promise, + timeout: number, + interval: number +): Promise { + const start = Date.now(); + while (Date.now() - start < timeout) { + if (await condition()) { + return; + } + await new Promise((resolve) => setTimeout(resolve, interval)); + } + throw new Error("Timeout"); +} + +async function waitUntilRunFound( + client: Client, + runId: string, + checkOutputs = false +) { + return waitUntil( + async () => { + try { + const run = await client.readRun(runId); + if (checkOutputs) { + return ( + run.outputs !== null && + run.outputs !== undefined && + Object.keys(run.outputs).length !== 0 + ); + } + return true; + } catch (e) { + return false; + } + }, + 30_000, + 1_000 + ); +} + +test("Test persist update run", async () => { + const langchainClient = new Client({ + autoBatchTracing: true, + callerOptions: { maxRetries: 0 }, + }); + const projectName = "__test_persist_update_run_batch"; + await deleteProject(langchainClient, projectName); + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await langchainClient.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); + + await langchainClient.updateRun(runId, { + outputs: { output: ["Hi"] }, + dotted_order: dottedOrder, + trace_id: runId, + }); + await waitUntilRunFound(langchainClient, runId, true); + const storedRun = await langchainClient.readRun(runId); + expect(storedRun.id).toEqual(runId); + await langchainClient.deleteProject({ projectName }); +}); + +test("Test persist update runs above the batch size limit", async () => { + const langchainClient = new Client({ + autoBatchTracing: true, + callerOptions: { maxRetries: 0 }, + pendingAutoBatchedRunLimit: 2, + }); + const projectName = "__test_persist_update_run_batch"; + await deleteProject(langchainClient, projectName); + + const createRun = async () => { + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await langchainClient.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); + + await langchainClient.updateRun(runId, { + outputs: { output: ["Hi"] }, + dotted_order: dottedOrder, + trace_id: runId, + end_time: Math.floor(new Date().getTime() / 1000), + }); + await waitUntilRunFound(langchainClient, runId, true); + const storedRun = await langchainClient.readRun(runId); + expect(storedRun.id).toEqual(runId); + }; + + await Promise.all([createRun(), createRun(), createRun()]); + + await langchainClient.deleteProject({ projectName }); +}); + +test("Test persist update run with delay", async () => { + const langchainClient = new Client({ + autoBatchTracing: true, + callerOptions: { maxRetries: 0 }, + }); + const projectName = "__test_persist_update_run_batch"; + await deleteProject(langchainClient, projectName); + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await langchainClient.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + await langchainClient.updateRun(runId, { + outputs: { output: ["Hi"] }, + dotted_order: dottedOrder, + trace_id: runId, + end_time: Math.floor(new Date().getTime() / 1000), + }); + await waitUntilRunFound(langchainClient, runId, true); + const storedRun = await langchainClient.readRun(runId); + expect(storedRun.id).toEqual(runId); + await langchainClient.deleteProject({ projectName }); +}); diff --git a/js/src/tests/batch_client.test.ts b/js/src/tests/batch_client.test.ts new file mode 100644 index 000000000..48c4e7f06 --- /dev/null +++ b/js/src/tests/batch_client.test.ts @@ -0,0 +1,289 @@ +import { jest } from "@jest/globals"; +import { v4 as uuidv4 } from "uuid"; +import { Client } from "../client.js"; +import { convertToDottedOrderFormat } from "../run_trees.js"; + +describe("Batch client tracing", () => { + it("should create a batched run with the given input", async () => { + const client = new Client({ apiKey: "test-api-key" }); + const callSpy = jest + .spyOn((client as any).caller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + expect(JSON.parse(calledRequestParam?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: "hello world", + }, + trace_id: runId, + dotted_order: dottedOrder, + }), + ], + patch: [], + }); + + expect(callSpy).toHaveBeenCalledWith( + fetch, + "https://api.smith.langchain.com/runs/batch", + expect.objectContaining({ body: expect.any(String) }) + ); + }); + + it("Create + update batching should merge into a single call", async () => { + const client = new Client({ apiKey: "test-api-key" }); + const callSpy = jest + .spyOn((client as any).caller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); + + const endTime = Math.floor(new Date().getTime() / 1000); + + await client.updateRun(runId, { + outputs: { output: ["Hi"] }, + dotted_order: dottedOrder, + trace_id: runId, + end_time: endTime, + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + expect(JSON.parse(calledRequestParam?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: "hello world", + }, + outputs: { + output: ["Hi"], + }, + end_time: endTime, + trace_id: runId, + dotted_order: dottedOrder, + }), + ], + patch: [], + }); + + expect(callSpy).toHaveBeenCalledWith( + fetch, + "https://api.smith.langchain.com/runs/batch", + expect.objectContaining({ body: expect.any(String) }) + ); + }); + + it("should create an example with the given input and generation", async () => { + const client = new Client({ apiKey: "test-api-key" }); + const callSpy = jest + .spyOn((client as any).caller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + const endTime = Math.floor(new Date().getTime() / 1000); + + await client.updateRun(runId, { + outputs: { output: ["Hi"] }, + dotted_order: dottedOrder, + trace_id: runId, + end_time: endTime, + }); + + const runId2 = uuidv4(); + const dottedOrder2 = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId2 + ); + + await client.createRun({ + id: runId2, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world 2" }, + trace_id: runId2, + dotted_order: dottedOrder2, + }); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + const calledRequestParam2: any = callSpy.mock.calls[1][2]; + expect(JSON.parse(calledRequestParam?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: "hello world", + }, + trace_id: runId, + dotted_order: dottedOrder, + }), + ], + patch: [], + }); + + expect(JSON.parse(calledRequestParam2?.body)).toEqual({ + post: [ + expect.objectContaining({ + id: runId2, + run_type: "llm", + inputs: { + text: "hello world 2", + }, + trace_id: runId2, + dotted_order: dottedOrder2, + }), + ], + patch: [ + expect.objectContaining({ + id: runId, + dotted_order: dottedOrder, + trace_id: runId, + end_time: endTime, + outputs: { + output: ["Hi"], + }, + }), + ], + }); + }); + + it("should send traces above the batch size and see even batches", async () => { + const client = new Client({ + apiKey: "test-api-key", + pendingAutoBatchedRunLimit: 10, + }); + const callSpy = jest + .spyOn((client as any).caller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + const projectName = "__test_batch"; + + const runIds = await Promise.all( + [...Array(15)].map(async (_, i) => { + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run " + i, + run_type: "llm", + inputs: { text: "hello world " + i }, + trace_id: runId, + dotted_order: dottedOrder, + }); + return runId; + }) + ); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + // Second batch should still be pending + expect(callSpy.mock.calls[1]).toBeUndefined(); + // First batch should fire as soon as it hits 10 + expect(JSON.parse(calledRequestParam?.body)).toEqual({ + post: runIds.slice(0, 10).map((runId, i) => + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: "hello world " + i, + }, + trace_id: runId, + }) + ), + patch: [], + }); + + // Wait for the aggregation delay + await new Promise((resolve) => setTimeout(resolve, 100)); + + const calledRequestParam2: any = callSpy.mock.calls[1][2]; + + expect(JSON.parse(calledRequestParam2?.body)).toEqual({ + post: runIds.slice(10).map((runId, i) => + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: "hello world " + (i + 10), + }, + trace_id: runId, + }) + ), + patch: [], + }); + }); +}); diff --git a/js/src/tests/client.int.test.ts b/js/src/tests/client.int.test.ts index 4d8b371ca..257df3b08 100644 --- a/js/src/tests/client.int.test.ts +++ b/js/src/tests/client.int.test.ts @@ -77,7 +77,7 @@ async function waitUntilRunFound( // Test Dataset Creation, List, Read, Delete + upload CSV // Test Example Creation, List, Read, Update, Delete test.concurrent("Test LangSmith Client Dataset CRD", async () => { - const client = new Client({}); + const client = new Client({ autoBatchTracing: false }); const csvContent = `col1,col2,col3,col4\nval1,val2,val3,val4`; const blobData = new Blob([Buffer.from(csvContent)]); @@ -146,7 +146,7 @@ test.concurrent("Test LangSmith Client Dataset CRD", async () => { test.concurrent( "Test evaluate run", async () => { - const langchainClient = new Client({}); + const langchainClient = new Client({ autoBatchTracing: false }); const projectName = "__test_evaluate_run" + Date.now(); const datasetName = "__test_evaluate_run_dataset" + Date.now(); @@ -281,7 +281,7 @@ test.concurrent( ); test.concurrent("Test persist update run", async () => { - const langchainClient = new Client({}); + const langchainClient = new Client({ autoBatchTracing: false }); const projectName = "__test_persist_update_run"; await deleteProject(langchainClient, projectName); @@ -306,7 +306,7 @@ test.concurrent("Test persist update run", async () => { }); test.concurrent("test create dataset", async () => { - const langchainClient = new Client({}); + const langchainClient = new Client({ autoBatchTracing: false }); const datasetName = "__test_create_dataset"; const datasets = await toArray(langchainClient.listDatasets({ datasetName })); datasets.map(async (dataset: Dataset) => { @@ -330,7 +330,7 @@ test.concurrent("test create dataset", async () => { }); test.concurrent("Test share and unshare run", async () => { - const langchainClient = new Client({}); + const langchainClient = new Client({ autoBatchTracing: false }); // Create a new run const runId = uuidv4(); @@ -355,7 +355,7 @@ test.concurrent("Test share and unshare run", async () => { test.concurrent( "Test list datasets", async () => { - const langchainClient = new Client({}); + const langchainClient = new Client({ autoBatchTracing: false }); const datasetName1 = "___TEST dataset1"; const datasetName2 = "___TEST dataset2"; await deleteDataset(langchainClient, datasetName1); @@ -420,7 +420,7 @@ test.concurrent( test.concurrent( "Test create feedback with source run", async () => { - const langchainClient = new Client({}); + const langchainClient = new Client({ autoBatchTracing: false }); const projectName = "__test_create_feedback_with_source_run"; await deleteProject(langchainClient, projectName); const runId = uuidv4(); @@ -459,7 +459,11 @@ test.concurrent( test.concurrent( "Test create run with masked inputs/outputs", async () => { - const langchainClient = new Client({ hideInputs: true, hideOutputs: true }); + const langchainClient = new Client({ + hideInputs: true, + hideOutputs: true, + autoBatchTracing: false, + }); const projectName = "__test_create_run_with_masked_inputs_outputs"; await deleteProject(langchainClient, projectName); const runId = uuidv4(); @@ -507,7 +511,7 @@ test.concurrent( test.concurrent( "Test create run with revision id", async () => { - const langchainClient = new Client({}); + const langchainClient = new Client({ autoBatchTracing: false }); // eslint-disable-next-line no-process-env process.env.LANGCHAIN_REVISION_ID = "test_revision_id"; // eslint-disable-next-line no-process-env @@ -564,7 +568,7 @@ test.concurrent( describe("createChatExample", () => { it("should convert LangChainBaseMessage objects to examples", async () => { - const langchainClient = new Client({}); + const langchainClient = new Client({ autoBatchTracing: false }); const datasetName = "__createChatExample-test-dataset"; await deleteDataset(langchainClient, datasetName); @@ -619,7 +623,7 @@ describe("createChatExample", () => { test.concurrent( "Test getRunUrl with run", async () => { - const client = new Client({}); + const client = new Client({ autoBatchTracing: false }); const runId = uuidv4(); const run: Run = { id: runId, @@ -646,7 +650,7 @@ test.concurrent( test.concurrent( "Examples CRUD", async () => { - const client = new Client({}); + const client = new Client({ autoBatchTracing: false }); const datasetName = "__test_examples_crud"; await deleteDataset(client, datasetName); const dataset = await client.createDataset(datasetName);