From 8c3c4c9c39e5ab74f3b5060aceeccdbd9766a9b8 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Fri, 9 Feb 2024 12:40:03 -0800 Subject: [PATCH 1/3] Fix start_time getting overwritten when using batch endpoint (#421) --- python/langsmith/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index fd56a6ec7..56afb5115 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -840,7 +840,7 @@ def upload_csv( @staticmethod def _run_transform( - run: Union[ls_schemas.Run, dict, ls_schemas.RunLikeDict], + run: Union[ls_schemas.Run, dict, ls_schemas.RunLikeDict], update: bool = False ) -> dict: """ Transforms the given run object into a dictionary representation. @@ -863,7 +863,7 @@ def _run_transform( run_create["inputs"] = _hide_inputs(run_create["inputs"]) if "outputs" in run_create: run_create["outputs"] = _hide_outputs(run_create["outputs"]) - if not run_create.get("start_time"): + if not update and not run_create.get("start_time"): run_create["start_time"] = datetime.datetime.utcnow() return run_create @@ -1024,7 +1024,7 @@ def batch_ingest_runs( return # transform and convert to dicts create_dicts = [self._run_transform(run) for run in create or []] - update_dicts = [self._run_transform(run) for run in update or []] + update_dicts = [self._run_transform(run, update=True) for run in update or []] # combine post and patch dicts where possible if update_dicts and create_dicts: create_by_id = {run["id"]: run for run in create_dicts} From 5bed6af4fbbe6bf400df49bcbcedf88f44452ca0 Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Fri, 9 Feb 2024 12:55:44 -0800 Subject: [PATCH 2/3] update --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index dced0ec3e..8bed494e1 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 
+1,6 @@ [tool.poetry] name = "langsmith" -version = "0.0.88" +version = "0.0.89" description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform." authors = ["LangChain "] license = "MIT" From c5742d83fbce61406a20d81108aee263c1d05eff Mon Sep 17 00:00:00 2001 From: Jacob Lee Date: Fri, 9 Feb 2024 16:28:09 -0800 Subject: [PATCH 3/3] Adds batch tracing support check for JS (#411) @hinthornw @nfcampos I think this is better than doing a check on startup? --- js/src/client.ts | 35 +++++++++++++++++ js/src/tests/batch_client.test.ts | 65 +++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+) diff --git a/js/src/client.ts b/js/src/client.ts index b650642c0..9770dc607 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -242,6 +242,8 @@ export class Client { private autoBatchTracing = false; + private batchEndpointSupported?: boolean; + private pendingAutoBatchedRuns: AutoBatchQueueItem[] = []; private pendingAutoBatchedRunLimit = 100; @@ -537,6 +539,21 @@ export class Client { } } + protected async batchEndpointIsSupported() { + const response = await fetch(`${this.apiUrl}/info`, { + method: "GET", + headers: { Accept: "application/json" }, + signal: AbortSignal.timeout(this.timeout_ms), + }); + if (!response.ok) { + // consume the response body to release the connection + // https://undici.nodejs.org/#/?id=garbage-collection + await response.text(); + return false; + } + return true; + } + public async createRun(run: CreateRunParams): Promise { if (!this._filterForSampling([run]).length) { return; @@ -632,6 +649,24 @@ export class Client { preparedCreateParams = await mergeRuntimeEnvIntoRunCreates( preparedCreateParams ); + if (this.batchEndpointSupported === undefined) { + this.batchEndpointSupported = await this.batchEndpointIsSupported(); + } + if (!this.batchEndpointSupported) { + this.autoBatchTracing = false; + for (const preparedCreateParam of body.post) { + await this.createRun(preparedCreateParam as CreateRunParams); 
+ } + for (const preparedUpdateParam of body.patch) { + if (preparedUpdateParam.id !== undefined) { + await this.updateRun( + preparedUpdateParam.id, + preparedUpdateParam as UpdateRunParams + ); + } + } + return; + } const headers = { ...this.headers, "Content-Type": "application/json", diff --git a/js/src/tests/batch_client.test.ts b/js/src/tests/batch_client.test.ts index 36dab304f..9b7a10568 100644 --- a/js/src/tests/batch_client.test.ts +++ b/js/src/tests/batch_client.test.ts @@ -15,6 +15,9 @@ describe("Batch client tracing", () => { ok: true, text: () => "", }); + jest + .spyOn(client as any, "batchEndpointIsSupported") + .mockResolvedValue(true); const projectName = "__test_batch"; const runId = uuidv4(); @@ -68,6 +71,9 @@ describe("Batch client tracing", () => { ok: true, text: () => "", }); + jest + .spyOn(client as any, "batchEndpointIsSupported") + .mockResolvedValue(true); const projectName = "__test_batch"; const runId = uuidv4(); @@ -134,6 +140,9 @@ describe("Batch client tracing", () => { ok: true, text: () => "", }); + jest + .spyOn(client as any, "batchEndpointIsSupported") + .mockResolvedValue(true); const projectName = "__test_batch"; const runId = uuidv4(); @@ -235,6 +244,9 @@ describe("Batch client tracing", () => { ok: true, text: () => "", }); + jest + .spyOn(client as any, "batchEndpointIsSupported") + .mockResolvedValue(true); const projectName = "__test_batch"; const runIds = await Promise.all( @@ -296,4 +308,57 @@ describe("Batch client tracing", () => { patch: [], }); }); + + it("If batching is unsupported, fall back to old endpoint", async () => { + const client = new Client({ + apiKey: "test-api-key", + autoBatchTracing: true, + }); + const callSpy = jest + .spyOn((client as any).caller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + jest + .spyOn(client as any, "batchEndpointIsSupported") + .mockResolvedValue(false); + const projectName = "__test_batch"; + + const runId = uuidv4(); + const dottedOrder = 
convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + await client.createRun({ + id: runId, + project_name: projectName, + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + expect(JSON.parse(calledRequestParam?.body)).toMatchObject({ + id: runId, + session_name: projectName, + extra: expect.anything(), + start_time: expect.any(Number), + name: "test_run", + run_type: "llm", + inputs: { text: "hello world" }, + trace_id: runId, + dotted_order: dottedOrder, + }); + + expect(callSpy).toHaveBeenCalledWith( + fetch, + "https://api.smith.langchain.com/runs", + expect.objectContaining({ body: expect.any(String) }) + ); + }); });