From 997204c6c5e4662cdbb0480ed4daafe9ccb6cdde Mon Sep 17 00:00:00 2001 From: Tat Dat Duong Date: Tue, 22 Oct 2024 16:19:46 +0200 Subject: [PATCH] Fix ordering of recorded runs, buffer runs --- js/src/tests/utils/iterator.ts | 7 + js/src/tests/vercel_exporter.int.test.ts | 135 ++--- js/src/wrappers/vercel/exporter.ts | 596 ++++++++++++++--------- 3 files changed, 421 insertions(+), 317 deletions(-) create mode 100644 js/src/tests/utils/iterator.ts diff --git a/js/src/tests/utils/iterator.ts b/js/src/tests/utils/iterator.ts new file mode 100644 index 000000000..4734369ba --- /dev/null +++ b/js/src/tests/utils/iterator.ts @@ -0,0 +1,7 @@ +export async function gatherIterator( + i: AsyncIterable | Promise> +): Promise> { + const out: T[] = []; + for await (const item of await i) out.push(item); + return out; +} diff --git a/js/src/tests/vercel_exporter.int.test.ts b/js/src/tests/vercel_exporter.int.test.ts index 18e5e3650..7662e5434 100644 --- a/js/src/tests/vercel_exporter.int.test.ts +++ b/js/src/tests/vercel_exporter.int.test.ts @@ -12,6 +12,7 @@ import { embedMany, } from "ai"; import { tool } from "ai"; +import { gatherIterator } from "./utils/iterator.js"; const telemetrySettings = { isEnabled: true, @@ -22,19 +23,11 @@ const telemetrySettings = { }, }; -test.concurrent("generateText", async () => { - const traceExporter = new LangSmithAISDKExporter(); - const sdk = new NodeSDK({ traceExporter }); - sdk.start(); - - function getOrders(userId: string) { - return `User ${userId} has the following orders: 1`; - } - - function getTrackingInformation(orderId: string) { - return `Here is the tracking information for ${orderId}`; - } +const traceExporter = new LangSmithAISDKExporter(); +const sdk = new NodeSDK({ traceExporter }); +sdk.start(); +test("generateText", async () => { await generateText({ model: openai("gpt-4o-mini"), messages: [ @@ -47,34 +40,48 @@ test.concurrent("generateText", async () => { listOrders: tool({ description: "list all orders", parameters: z.object({ userId: z.string() }), - execute: async ({ userId }) => getOrders(userId), + execute: async ({ userId }) => + `User ${userId} has the following orders: 1`, }), viewTrackingInformation: tool({ description: "view tracking information for a specific order", parameters: z.object({ orderId: z.string() }), - execute: async ({ orderId }) => getTrackingInformation(orderId), + execute: async ({ orderId }) => + `Here is the tracking information for ${orderId}`, }), }, experimental_telemetry: telemetrySettings, maxSteps: 10, }); - await sdk.shutdown(); + await traceExporter.forceFlush?.(); }); -test.concurrent("streamText", async () => { - const traceExporter = new LangSmithAISDKExporter(); - const sdk = new NodeSDK({ traceExporter }); - sdk.start(); - - function getOrders(userId: string) { - return `User ${userId} has the following orders: 1`; - } +test("generateText with image", async () => { + await generateText({ + model: openai("gpt-4o-mini"), + messages: [ + { + role: "user", + content: [ + { + type: "text", + text: "What's in this picture?", + }, + { + type: "image", + image: new URL("https://picsum.photos/200/300"), + }, + ], + }, + ], + experimental_telemetry: telemetrySettings, + }); - function getTrackingInformation(orderId: string) { - return `Here is the tracking information for ${orderId}`; - } + await traceExporter.forceFlush?.(); +}); +test("streamText", async () => { const result = await streamText({ model: openai("gpt-4o-mini"), messages: [ @@ -87,30 +94,25 @@ test.concurrent("streamText", async () => { listOrders: tool({ description: "list all orders", parameters: z.object({ userId: z.string() }), - execute: async ({ userId }) => getOrders(userId), + execute: async ({ userId }) => + `User ${userId} has the following orders: 1`, }), viewTrackingInformation: tool({ description: "view tracking information for a specific order", parameters: z.object({ orderId: z.string() }), - execute: async ({ orderId }) => getTrackingInformation(orderId), + execute: async ({ orderId }) => + `Here is the tracking information for ${orderId}`, }), }, experimental_telemetry: { isEnabled: true }, maxSteps: 10, }); - for await (const _stream of result.fullStream) { - // consume - } - - await sdk.shutdown(); + await gatherIterator(result.fullStream); + await traceExporter.forceFlush?.(); }); -test.concurrent("generateObject", async () => { - const traceExporter = new LangSmithAISDKExporter(); - const sdk = new NodeSDK({ traceExporter }); - sdk.start(); - +test("generateObject", async () => { await generateObject({ model: openai("gpt-4o-mini", { structuredOutputs: true }), schema: z.object({ @@ -123,14 +125,10 @@ test.concurrent("generateObject", async () => { experimental_telemetry: telemetrySettings, }); - await sdk.shutdown(); + await traceExporter.forceFlush?.(); }); -test.concurrent("streamObject", async () => { - const traceExporter = new LangSmithAISDKExporter(); - const sdk = new NodeSDK({ traceExporter }); - sdk.start(); - +test("streamObject", async () => { const result = await streamObject({ model: openai("gpt-4o-mini", { structuredOutputs: true }), schema: z.object({ @@ -143,32 +141,21 @@ test.concurrent("streamObject", async () => { experimental_telemetry: telemetrySettings, }); - for await (const _partialObject of result.partialObjectStream) { - // pass - } - - await sdk.shutdown(); + await gatherIterator(result.partialObjectStream); + await traceExporter.forceFlush?.(); }); -test.concurrent("embed", async () => { - const traceExporter = new LangSmithAISDKExporter(); - const sdk = new NodeSDK({ traceExporter }); - sdk.start(); - +test("embed", async () => { await embed({ model: openai.embedding("text-embedding-3-small"), value: "prague castle at sunset", experimental_telemetry: telemetrySettings, }); - await sdk.shutdown(); + await traceExporter.forceFlush?.(); }); -test.concurrent("embedMany", async () => { - const traceExporter = new LangSmithAISDKExporter(); - const sdk = new NodeSDK({ traceExporter }); - sdk.start(); - +test("embedMany", async () => { await embedMany({ model: openai.embedding("text-embedding-3-small"), values: [ @@ -179,33 +166,9 @@ test.concurrent("embedMany", async () => { experimental_telemetry: telemetrySettings, }); - await sdk.shutdown(); + await traceExporter.forceFlush?.(); }); -test.concurrent("generateText with image", async () => { - const traceExporter = new LangSmithAISDKExporter(); - const sdk = new NodeSDK({ traceExporter }); - sdk.start(); - - await generateText({ - model: openai("gpt-4o-mini"), - messages: [ - { - role: "user", - content: [ - { - type: "text", - text: "What's in this picture?", - }, - { - type: "image", - image: new URL("https://picsum.photos/200/300"), - }, - ], - }, - ], - experimental_telemetry: telemetrySettings, - }); - +afterAll(async () => { await sdk.shutdown(); }); diff --git a/js/src/wrappers/vercel/exporter.ts b/js/src/wrappers/vercel/exporter.ts index 4f2b6131b..156222273 100644 --- a/js/src/wrappers/vercel/exporter.ts +++ b/js/src/wrappers/vercel/exporter.ts @@ -2,9 +2,9 @@ import type { ReadableSpan, SpanExporter } from "@opentelemetry/sdk-trace-base"; import type { ExportResult } from "@opentelemetry/core"; import type { CoreAssistantMessage, CoreMessage, ToolCallPart } from "ai"; import type { AISDKSpan } from "./exporter.types.js"; -import { Client, RunTree, RunTreeConfig } from "../../index.js"; -import { KVMap } from "../../schemas.js"; -import { AsyncLocalStorageProviderSingleton } from "../../singletons/traceable.js"; +import { Client } from "../../index.js"; +import { KVMap, RunCreate } from "../../schemas.js"; +import { v5 as uuid5 } from "uuid"; function assertNever(x: never): never { throw new Error("Unreachable state: " + x); @@ -174,163 +174,189 @@ const tryJson = ( } }; -const sortByHrTime = (a: ReadableSpan, b: ReadableSpan) => { +function stripNonAlphanumeric(input: string) { + return input.replace(/[-:.]/g, ""); +} + +function convertToDottedOrderFormat( + [seconds, nanoseconds]: [seconds: number, nanoseconds: number], + runId: string, + executionOrder: number +) { + // Date only has millisecond precision, so we use the microseconds to break + // possible ties, avoiding incorrect run order + const ms = Number(String(nanoseconds).slice(0, 3)); + const ns = String(Number(String(nanoseconds).slice(3, 6)) + executionOrder) + .padStart(3, "0") + .slice(0, 3); + return ( - Math.sign(a.startTime[0] - b.startTime[0]) || - Math.sign(a.startTime[1] - b.startTime[1]) + stripNonAlphanumeric( + `${new Date(seconds * 1000 + ms).toISOString().slice(0, -1)}${ns}Z` + ) + runId ); -}; +} + +function convertToTimestamp([seconds, nanoseconds]: [ + seconds: number, + nanoseconds: number +]) { + const ms = String(nanoseconds).slice(0, 3); + return Number(String(seconds) + ms); +} + +const RUN_ID_NS = "5c718b20-9078-11ef-9a3d-325096b39f47"; + +interface RunTask { + id: string; + parentId: string | undefined; + startTime: [seconds: number, nanoseconds: number]; + run: RunCreate; + sent: boolean; + executionOrder: number; +} export class LangSmithAISDKExporter implements SpanExporter { - private client: Client | undefined; + private client: Client; + private traceByMap: Record< + string, + { + childMap: Record; + nodeMap: Record; + relativeExecutionOrder: Record; + } + > = {}; constructor(args?: { client?: Client }) { - this.client = args?.client; + this.client = args?.client ?? new Client(); } - export( - spans: ReadableSpan[], - resultCallback: (result: ExportResult) => void - ): void { - const runTreeMap: Record = {}; - const sortedSpans = [...spans].sort(sortByHrTime) as AISDKSpan[]; - - for (const span of sortedSpans) { - const spanId = span.spanContext().spanId; - const parentSpanId = span.parentSpanId; - let parentRunTree = parentSpanId ? runTreeMap[parentSpanId] : null; - - if (parentRunTree == null) { - try { - parentRunTree = - AsyncLocalStorageProviderSingleton.getInstance().getStore() ?? null; - } catch { - // pass - } + protected getRunCreate(span: AISDKSpan): RunCreate { + const runId = uuid5(span.spanContext().spanId, RUN_ID_NS); + const parentRunId = span.parentSpanId + ? uuid5(span.parentSpanId, RUN_ID_NS) + : undefined; + + const asRunCreate = (rawConfig: RunCreate) => { + const aiMetadata = Object.keys(span.attributes) + .filter((key) => key.startsWith("ai.telemetry.metadata.")) + .reduce((acc, key) => { + acc[key.slice("ai.telemetry.metadata.".length)] = + span.attributes[key as keyof typeof span.attributes]; + + return acc; + }, {} as Record); + + if ( + ("ai.telemetry.functionId" in span.attributes && + span.attributes["ai.telemetry.functionId"]) || + ("resource.name" in span.attributes && span.attributes["resource.name"]) + ) { + aiMetadata["functionId"] = + span.attributes["ai.telemetry.functionId"] || + span.attributes["resource.name"]; } - const toRunTree = (rawConfig: RunTreeConfig) => { - const aiMetadata = Object.keys(span.attributes) - .filter((key) => key.startsWith("ai.telemetry.metadata.")) - .reduce((acc, key) => { - acc[key.slice("ai.telemetry.metadata.".length)] = - span.attributes[key as keyof typeof span.attributes]; - - return acc; - }, {} as Record); - - if ( - ("ai.telemetry.functionId" in span.attributes && - span.attributes["ai.telemetry.functionId"]) || - ("resource.name" in span.attributes && - span.attributes["resource.name"]) - ) { - aiMetadata["functionId"] = - span.attributes["ai.telemetry.functionId"] || - span.attributes["resource.name"]; - } - - const config: RunTreeConfig = { - ...rawConfig, + const config: RunCreate = { + ...rawConfig, + id: runId, + parent_run_id: parentRunId, + extra: { + ...rawConfig.extra, metadata: { - ...rawConfig.metadata, + ...rawConfig.extra?.metadata, ...aiMetadata, "ai.operationId": span.attributes["ai.operationId"], }, - start_time: +( - String(span.startTime[0]) + String(span.startTime[1]).slice(0, 3) - ), - end_time: +( - String(span.endTime[0]) + String(span.endTime[1]).slice(0, 3) - ), - client: this.client, - }; - const runTree = - parentRunTree?.createChild(config) ?? new RunTree(config); - this.client ??= runTree.client; - - return runTree; + }, + start_time: convertToTimestamp(span.startTime), + end_time: convertToTimestamp(span.endTime), }; - switch (span.name) { - case "ai.generateText.doGenerate": - case "ai.generateText": - case "ai.streamText.doStream": - case "ai.streamText": { - const inputs = ((): KVMap | undefined => { - if ("ai.prompt.messages" in span.attributes) { - return { - messages: tryJson( - span.attributes["ai.prompt.messages"] - ).flatMap((i: CoreMessage) => convertCoreToSmith(i)), - }; - } - - if ("ai.prompt" in span.attributes) { - const input = tryJson(span.attributes["ai.prompt"]); - - if ( - typeof input === "object" && - input != null && - "messages" in input && - Array.isArray(input.messages) - ) { - return { - messages: input.messages.flatMap((i: CoreMessage) => - convertCoreToSmith(i) - ), - }; - } + return config; + }; + + switch (span.name) { + case "ai.generateText.doGenerate": + case "ai.generateText": + case "ai.streamText.doStream": + case "ai.streamText": { + const inputs = ((): KVMap => { + if ("ai.prompt.messages" in span.attributes) { + return { + messages: tryJson(span.attributes["ai.prompt.messages"]).flatMap( + (i: CoreMessage) => convertCoreToSmith(i) + ), + }; + } - return { input }; - } + if ("ai.prompt" in span.attributes) { + const input = tryJson(span.attributes["ai.prompt"]); - return undefined; - })(); - - const outputs = ((): KVMap | undefined => { - let result: KVMap | undefined = undefined; - if (span.attributes["ai.response.toolCalls"]) { - result = { - llm_output: convertCoreToSmith({ - role: "assistant", - content: tryJson(span.attributes["ai.response.toolCalls"]), - } satisfies CoreAssistantMessage), - }; - } else if (span.attributes["ai.response.text"]) { - result = { - llm_output: convertCoreToSmith({ - role: "assistant", - content: span.attributes["ai.response.text"], - }), + if ( + typeof input === "object" && + input != null && + "messages" in input && + Array.isArray(input.messages) + ) { + return { + messages: input.messages.flatMap((i: CoreMessage) => + convertCoreToSmith(i) + ), }; } - if (span.attributes["ai.usage.completionTokens"]) { - result ??= {}; - result.llm_output ??= {}; - result.llm_output.token_usage ??= {}; - result.llm_output.token_usage["completion_tokens"] = - span.attributes["ai.usage.completionTokens"]; - } + return { input }; + } - if (span.attributes["ai.usage.promptTokens"]) { - result ??= {}; - result.llm_output ??= {}; - result.llm_output.token_usage ??= {}; - result.llm_output.token_usage["prompt_tokens"] = - span.attributes["ai.usage.promptTokens"]; - } + return {}; + })(); + + const outputs = ((): KVMap | undefined => { + let result: KVMap | undefined = undefined; + if (span.attributes["ai.response.toolCalls"]) { + result = { + llm_output: convertCoreToSmith({ + role: "assistant", + content: tryJson(span.attributes["ai.response.toolCalls"]), + } satisfies CoreAssistantMessage), + }; + } else if (span.attributes["ai.response.text"]) { + result = { + llm_output: convertCoreToSmith({ + role: "assistant", + content: span.attributes["ai.response.text"], + }), + }; + } + + if (span.attributes["ai.usage.completionTokens"]) { + result ??= {}; + result.llm_output ??= {}; + result.llm_output.token_usage ??= {}; + result.llm_output.token_usage["completion_tokens"] = + span.attributes["ai.usage.completionTokens"]; + } - return result; - })(); + if (span.attributes["ai.usage.promptTokens"]) { + result ??= {}; + result.llm_output ??= {}; + result.llm_output.token_usage ??= {}; + result.llm_output.token_usage["prompt_tokens"] = + span.attributes["ai.usage.promptTokens"]; + } - // TODO: add first_token_time - runTreeMap[spanId] = toRunTree({ - run_type: "llm", - name: span.attributes["ai.model.provider"], - inputs, - outputs, + return result; + })(); + + // TODO: add first_token_time + return asRunCreate({ + run_type: "llm", + name: span.attributes["ai.model.provider"], + inputs, + outputs, + extra: { + batch_size: 1, metadata: { ls_provider: span.attributes["ai.model.provider"] .split(".") @@ -340,88 +366,89 @@ export class LangSmithAISDKExporter implements SpanExporter { .at(1), ls_model_name: span.attributes["ai.model.id"], }, - extra: { batch_size: 1 }, - }); - break; - } - - case "ai.toolCall": { - const args = tryJson(span.attributes["ai.toolCall.args"]); - let inputs: KVMap | undefined = { args }; + }, + }); + break; + } - if (typeof args === "object" && args != null) { - inputs = args; - } + case "ai.toolCall": { + const args = tryJson(span.attributes["ai.toolCall.args"]); + let inputs: KVMap = { args }; - const output = tryJson(span.attributes["ai.toolCall.result"]); - let outputs: KVMap | undefined = { output }; + if (typeof args === "object" && args != null) { + inputs = args; + } - if (typeof output === "object" && output != null) { - outputs = output; - } + const output = tryJson(span.attributes["ai.toolCall.result"]); + let outputs: KVMap = { output }; - runTreeMap[spanId] = toRunTree({ - run_type: "tool", - name: span.attributes["ai.toolCall.name"], - inputs, - outputs, - }); - break; + if (typeof output === "object" && output != null) { + outputs = output; } - case "ai.streamObject": - case "ai.streamObject.doStream": - case "ai.generateObject": - case "ai.generateObject.doGenerate": { - const inputs = ((): KVMap | undefined => { - if ("ai.prompt.messages" in span.attributes) { - return { - messages: tryJson( - span.attributes["ai.prompt.messages"] - ).flatMap((i: CoreMessage) => convertCoreToSmith(i)), - }; - } + return asRunCreate({ + run_type: "tool", + name: span.attributes["ai.toolCall.name"], + inputs, + outputs, + }); + } - if ("ai.prompt" in span.attributes) { - return { input: tryJson(span.attributes["ai.prompt"]) }; - } + case "ai.streamObject": + case "ai.streamObject.doStream": + case "ai.generateObject": + case "ai.generateObject.doGenerate": { + const inputs = ((): KVMap => { + if ("ai.prompt.messages" in span.attributes) { + return { + messages: tryJson(span.attributes["ai.prompt.messages"]).flatMap( + (i: CoreMessage) => convertCoreToSmith(i) + ), + }; + } - return undefined; - })(); + if ("ai.prompt" in span.attributes) { + return { input: tryJson(span.attributes["ai.prompt"]) }; + } - const outputs = ((): KVMap | undefined => { - let result: KVMap | undefined = undefined; + return {}; + })(); - if (span.attributes["ai.response.object"]) { - result = { - output: tryJson(span.attributes["ai.response.object"]), - }; - } + const outputs = ((): KVMap | undefined => { + let result: KVMap | undefined = undefined; - if (span.attributes["ai.usage.completionTokens"]) { - result ??= {}; - result.llm_output ??= {}; - result.llm_output.token_usage ??= {}; - result.llm_output.token_usage["completion_tokens"] = - span.attributes["ai.usage.completionTokens"]; - } + if (span.attributes["ai.response.object"]) { + result = { + output: tryJson(span.attributes["ai.response.object"]), + }; + } - if (span.attributes["ai.usage.promptTokens"]) { - result ??= {}; - result.llm_output ??= {}; - result.llm_output.token_usage ??= {}; - result.llm_output.token_usage["prompt_tokens"] = - +span.attributes["ai.usage.promptTokens"]; - } + if (span.attributes["ai.usage.completionTokens"]) { + result ??= {}; + result.llm_output ??= {}; + result.llm_output.token_usage ??= {}; + result.llm_output.token_usage["completion_tokens"] = + span.attributes["ai.usage.completionTokens"]; + } - return result; - })(); + if (span.attributes["ai.usage.promptTokens"]) { + result ??= {}; + result.llm_output ??= {}; + result.llm_output.token_usage ??= {}; + result.llm_output.token_usage["prompt_tokens"] = + +span.attributes["ai.usage.promptTokens"]; + } + + return result; + })(); - runTreeMap[spanId] = toRunTree({ - run_type: "llm", - name: span.attributes["ai.model.provider"], - inputs, - outputs, + return asRunCreate({ + run_type: "llm", + name: span.attributes["ai.model.provider"], + inputs, + outputs, + extra: { + batch_size: 1, metadata: { ls_provider: span.attributes["ai.model.provider"] .split(".") @@ -431,41 +458,137 @@ export class LangSmithAISDKExporter implements SpanExporter { .at(1), ls_model_name: span.attributes["ai.model.id"], }, - extra: { batch_size: 1 }, - }); - break; - } + }, + }); + } - case "ai.embed": { - runTreeMap[spanId] = toRunTree({ - run_type: "chain", - name: span.attributes["ai.model.provider"], - inputs: { value: tryJson(span.attributes["ai.value"]) }, - outputs: { embedding: tryJson(span.attributes["ai.embedding"]) }, - }); - break; - } - case "ai.embed.doEmbed": - case "ai.embedMany": - case "ai.embedMany.doEmbed": { - runTreeMap[spanId] = toRunTree({ - run_type: "chain", - name: span.attributes["ai.model.provider"], - inputs: { values: span.attributes["ai.values"].map(tryJson) }, - outputs: { - embeddings: span.attributes["ai.embeddings"].map(tryJson), + case "ai.embed": { + return asRunCreate({ + run_type: "chain", + name: span.attributes["ai.model.provider"], + inputs: { value: tryJson(span.attributes["ai.value"]) }, + outputs: { embedding: tryJson(span.attributes["ai.embedding"]) }, + }); + } + case "ai.embed.doEmbed": + case "ai.embedMany": + case "ai.embedMany.doEmbed": { + return asRunCreate({ + run_type: "chain", + name: span.attributes["ai.model.provider"], + inputs: { values: span.attributes["ai.values"].map(tryJson) }, + outputs: { + embeddings: span.attributes["ai.embeddings"].map(tryJson), + }, + }); + } + + default: + assertNever(span); + } + } + + export( + spans: ReadableSpan[], + resultCallback: (result: ExportResult) => void + ): void { + for (const span of spans) { + const { traceId, spanId } = span.spanContext(); + const parentId = span.parentSpanId ?? undefined; + this.traceByMap[traceId] ??= { + childMap: {}, + nodeMap: {}, + relativeExecutionOrder: {}, + }; + + const runId = uuid5(spanId, RUN_ID_NS); + const parentRunId = parentId ? uuid5(parentId, RUN_ID_NS) : undefined; + + const traceMap = this.traceByMap[traceId]; + + traceMap.relativeExecutionOrder[parentRunId ?? "$"] ??= -1; + traceMap.relativeExecutionOrder[parentRunId ?? "$"] += 1; + + traceMap.nodeMap[runId] ??= { + id: runId, + parentId: parentRunId, + startTime: span.startTime, + run: this.getRunCreate(span as AISDKSpan), + sent: false, + executionOrder: traceMap.relativeExecutionOrder[parentRunId ?? "$"], + }; + + traceMap.childMap[parentRunId ?? "$"] ??= []; + traceMap.childMap[parentRunId ?? "$"].push(traceMap.nodeMap[runId]); + } + + // collect all subgraphs + const sampled: [ + { + dotted_order: string; + id: string; + trace_id: string; + parent_run_id: string | undefined; + }, + RunCreate + ][] = []; + + for (const traceId of Object.keys(this.traceByMap)) { + type QueueItem = { item: RunTask; dottedOrder: string; traceId: string }; + + const queue: QueueItem[] = + this.traceByMap[traceId].childMap["$"]?.map((item) => ({ + item, + dottedOrder: convertToDottedOrderFormat( + item.startTime, + item.id, + item.executionOrder + ), + traceId: item.id, + })) ?? []; + + const seen = new Set(); + while (queue.length) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const task = queue.shift()!; + if (seen.has(task.item.id)) continue; + + if (!task.item.sent) { + sampled.push([ + { + id: task.item.id, + parent_run_id: task.item.parentId, + dotted_order: task.dottedOrder, + trace_id: task.traceId, }, - }); - break; + task.item.run, + ]); + task.item.sent = true; } - default: - assertNever(span); + const children = this.traceByMap[traceId].childMap[task.item.id] ?? []; + queue.push( + ...children.map((child) => ({ + item: child, + dottedOrder: [ + task.dottedOrder, + convertToDottedOrderFormat( + child.startTime, + child.id, + child.executionOrder + ), + ].join("."), + traceId: task.traceId, + })) + ); } } Promise.all( - Object.values(runTreeMap).map((runTree) => runTree.postRun()) + sampled.map(([required, value]) => { + const payload = { ...value, ...required }; + return this.client.createRun(payload); + }) ).then( () => resultCallback({ code: 0 }), (error) => resultCallback({ code: 1, error }) @@ -473,6 +596,17 @@ export class LangSmithAISDKExporter implements SpanExporter { } async shutdown(): Promise { + // find nodes which are incomplete + const incompleteNodes = Object.values(this.traceByMap).flatMap((trace) => + Object.values(trace.nodeMap).filter((i) => !i.sent) + ); + + if (incompleteNodes.length > 0) { + console.warn( + "Some incomplete nodes were found before shutdown and not sent to LangSmith." + ); + } + await this.client?.awaitPendingTraceBatches(); } async forceFlush?(): Promise {