diff --git a/langchain-core/package.json b/langchain-core/package.json index 8efc8c4bb3d1..ad9a7de938a9 100644 --- a/langchain-core/package.json +++ b/langchain-core/package.json @@ -45,7 +45,7 @@ "camelcase": "6", "decamelize": "1.2.0", "js-tiktoken": "^1.0.12", - "langsmith": "~0.1.30", + "langsmith": "~0.1.39", "ml-distance": "^4.0.0", "mustache": "^4.2.0", "p-queue": "^6.6.2", diff --git a/langchain-core/src/callbacks/manager.ts b/langchain-core/src/callbacks/manager.ts index ff22fddf149c..6e784c0f90f0 100644 --- a/langchain-core/src/callbacks/manager.ts +++ b/langchain-core/src/callbacks/manager.ts @@ -20,9 +20,10 @@ import { consumeCallback } from "./promises.js"; import { Serialized } from "../load/serializable.js"; import type { DocumentInterface } from "../documents/document.js"; import { isTracingEnabled } from "../utils/callbacks.js"; +import { isBaseTracer } from "../tracers/base.js"; if ( - /* #__PURE__ */ getEnvironmentVariable("LANGCHAIN_TRACING_V2") === "true" && + /* #__PURE__ */ isTracingEnabled() && /* #__PURE__ */ getEnvironmentVariable("LANGCHAIN_CALLBACKS_BACKGROUND") !== "true" ) { @@ -607,31 +608,47 @@ export class CallbackManager const runId_ = idx === 0 && runId ? runId : uuidv4(); await Promise.all( - this.handlers.map((handler) => - consumeCallback(async () => { - if (!handler.ignoreLLM) { - try { - await handler.handleLLMStart?.( - llm, - [prompt], - runId_, - this._parentRunId, - extraParams, - this.tags, - this.metadata, - runName - ); - } catch (err) { - console.error( - `Error in handler ${handler.constructor.name}, handleLLMStart: ${err}` - ); - if (handler.raiseError) { - throw err; - } + this.handlers.map((handler) => { + if (handler.ignoreLLM) { + return; + } + if (isBaseTracer(handler)) { + // Create and add run to the run map. + // We do this synchronously to avoid race conditions + // when callbacks are backgrounded. + handler._createRunForLLMStart( + llm, + [prompt], + runId_, + this._parentRunId, + extraParams, + this.tags, + this.metadata, + runName + ); + } + return consumeCallback(async () => { + try { + await handler.handleLLMStart?.( + llm, + [prompt], + runId_, + this._parentRunId, + extraParams, + this.tags, + this.metadata, + runName + ); + } catch (err) { + console.error( + `Error in handler ${handler.constructor.name}, handleLLMStart: ${err}` + ); + if (handler.raiseError) { + throw err; } } - }, handler.awaitHandlers) - ) + }, handler.awaitHandlers); + }) ); return new CallbackManagerForLLMRun( @@ -664,45 +681,61 @@ export class CallbackManager const runId_ = idx === 0 && runId ? runId : uuidv4(); await Promise.all( - this.handlers.map((handler) => - consumeCallback(async () => { - if (!handler.ignoreLLM) { - try { - if (handler.handleChatModelStart) { - await handler.handleChatModelStart?.( - llm, - [messageGroup], - runId_, - this._parentRunId, - extraParams, - this.tags, - this.metadata, - runName - ); - } else if (handler.handleLLMStart) { - const messageString = getBufferString(messageGroup); - await handler.handleLLMStart?.( - llm, - [messageString], - runId_, - this._parentRunId, - extraParams, - this.tags, - this.metadata, - runName - ); - } - } catch (err) { - console.error( - `Error in handler ${handler.constructor.name}, handleLLMStart: ${err}` + this.handlers.map((handler) => { + if (handler.ignoreLLM) { + return; + } + if (isBaseTracer(handler)) { + // Create and add run to the run map. + // We do this synchronously to avoid race conditions + // when callbacks are backgrounded. 
+ handler._createRunForChatModelStart( + llm, + [messageGroup], + runId_, + this._parentRunId, + extraParams, + this.tags, + this.metadata, + runName + ); + } + return consumeCallback(async () => { + try { + if (handler.handleChatModelStart) { + await handler.handleChatModelStart?.( + llm, + [messageGroup], + runId_, + this._parentRunId, + extraParams, + this.tags, + this.metadata, + runName ); - if (handler.raiseError) { - throw err; - } + } else if (handler.handleLLMStart) { + const messageString = getBufferString(messageGroup); + await handler.handleLLMStart?.( + llm, + [messageString], + runId_, + this._parentRunId, + extraParams, + this.tags, + this.metadata, + runName + ); + } + } catch (err) { + console.error( + `Error in handler ${handler.constructor.name}, handleLLMStart: ${err}` + ); + if (handler.raiseError) { + throw err; } } - }, handler.awaitHandlers) - ) + }, handler.awaitHandlers); + }) ); return new CallbackManagerForLLMRun( @@ -729,31 +762,47 @@ export class CallbackManager runName: string | undefined = undefined ): Promise { await Promise.all( - this.handlers.map((handler) => - consumeCallback(async () => { - if (!handler.ignoreChain) { - try { - await handler.handleChainStart?.( - chain, - inputs, - runId, - this._parentRunId, - this.tags, - this.metadata, - runType, - runName - ); - } catch (err) { - console.error( - `Error in handler ${handler.constructor.name}, handleChainStart: ${err}` - ); - if (handler.raiseError) { - throw err; - } + this.handlers.map((handler) => { + if (handler.ignoreChain) { + return; + } + if (isBaseTracer(handler)) { + // Create and add run to the run map. + // We do this synchronously to avoid race conditions + // when callbacks are backgrounded. + handler._createRunForChainStart( + chain, + inputs, + runId, + this._parentRunId, + this.tags, + this.metadata, + runType, + runName + ); + } + return consumeCallback(async () => { + try { + await handler.handleChainStart?.( + chain, + inputs, + runId, + this._parentRunId, + this.tags, + this.metadata, + runType, + runName + ); + } catch (err) { + console.error( + `Error in handler ${handler.constructor.name}, handleChainStart: ${err}` + ); + if (handler.raiseError) { + throw err; } } - }, handler.awaitHandlers) - ) + }, handler.awaitHandlers); + }) ); return new CallbackManagerForChainRun( runId, @@ -777,30 +826,45 @@ export class CallbackManager runName: string | undefined = undefined ): Promise { await Promise.all( - this.handlers.map((handler) => - consumeCallback(async () => { - if (!handler.ignoreAgent) { - try { - await handler.handleToolStart?.( - tool, - input, - runId, - this._parentRunId, - this.tags, - this.metadata, - runName - ); - } catch (err) { - console.error( - `Error in handler ${handler.constructor.name}, handleToolStart: ${err}` - ); - if (handler.raiseError) { - throw err; - } + this.handlers.map((handler) => { + if (handler.ignoreAgent) { + return; + } + if (isBaseTracer(handler)) { + // Create and add run to the run map. + // We do this synchronously to avoid race conditions + // when callbacks are backgrounded. 
+ handler._createRunForToolStart( + tool, + input, + runId, + this._parentRunId, + this.tags, + this.metadata, + runName + ); + } + return consumeCallback(async () => { + try { + await handler.handleToolStart?.( + tool, + input, + runId, + this._parentRunId, + this.tags, + this.metadata, + runName + ); + } catch (err) { + console.error( + `Error in handler ${handler.constructor.name}, handleToolStart: ${err}` + ); + if (handler.raiseError) { + throw err; } } - }, handler.awaitHandlers) - ) + }, handler.awaitHandlers); + }) ); return new CallbackManagerForToolRun( runId, @@ -824,30 +888,45 @@ export class CallbackManager runName: string | undefined = undefined ): Promise { await Promise.all( - this.handlers.map((handler) => - consumeCallback(async () => { - if (!handler.ignoreRetriever) { - try { - await handler.handleRetrieverStart?.( - retriever, - query, - runId, - this._parentRunId, - this.tags, - this.metadata, - runName - ); - } catch (err) { - console.error( - `Error in handler ${handler.constructor.name}, handleRetrieverStart: ${err}` - ); - if (handler.raiseError) { - throw err; - } + this.handlers.map((handler) => { + if (handler.ignoreRetriever) { + return; + } + if (isBaseTracer(handler)) { + // Create and add run to the run map. + // We do this synchronously to avoid race conditions + // when callbacks are backgrounded. + handler._createRunForRetrieverStart( + retriever, + query, + runId, + this._parentRunId, + this.tags, + this.metadata, + runName + ); + } + return consumeCallback(async () => { + try { + await handler.handleRetrieverStart?.( + retriever, + query, + runId, + this._parentRunId, + this.tags, + this.metadata, + runName + ); + } catch (err) { + console.error( + `Error in handler ${handler.constructor.name}, handleRetrieverStart: ${err}` + ); + if (handler.raiseError) { + throw err; } } - }, handler.awaitHandlers) - ) + }, handler.awaitHandlers); + }) ); return new CallbackManagerForRetrieverRun( runId, @@ -1046,7 +1125,10 @@ export class CallbackManager const verboseEnabled = getEnvironmentVariable("LANGCHAIN_VERBOSE") === "true" || options?.verbose; - const tracingV2Enabled = isTracingEnabled(); + + const tracingV2Enabled = + LangChainTracer.getTraceableRunTree()?.tracingEnabled || + isTracingEnabled(); const tracingEnabled = tracingV2Enabled || @@ -1077,7 +1159,8 @@ export class CallbackManager // handoff between langchain and langsmith/traceable // override the parent run ID callbackManager._parentRunId = - tracerV2.getTraceableRunTree()?.id ?? callbackManager._parentRunId; + LangChainTracer.getTraceableRunTree()?.id ?? 
+ callbackManager._parentRunId; } } } diff --git a/langchain-core/src/callbacks/tests/callbacks.test.ts b/langchain-core/src/callbacks/tests/callbacks.test.ts index 4f24161f9fcc..58644caa40e1 100644 --- a/langchain-core/src/callbacks/tests/callbacks.test.ts +++ b/langchain-core/src/callbacks/tests/callbacks.test.ts @@ -1,3 +1,4 @@ +/* eslint-disable no-promise-executor-return */ import { test, expect } from "@jest/globals"; import * as uuid from "uuid"; import { CallbackManager } from "../manager.js"; @@ -202,6 +203,9 @@ test("CallbackManager", async () => { ]); await retrieverCb.handleRetrieverError(new Error("test")); + // In case background mode is on while running this test + await new Promise((resolve) => setTimeout(resolve, 100)); + for (const handler of [handler1, handler2]) { expect(handler.starts).toBe(5); expect(handler.ends).toBe(5); diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index e66216584edc..2f60d739eae7 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -2298,7 +2298,7 @@ export class RunnableLambda extends Runnable< callbacks: runManager?.getChild(), recursionLimit: (config?.recursionLimit ?? DEFAULT_RECURSION_LIMIT) - 1, }); - void AsyncLocalStorageProviderSingleton.getInstance().run( + void AsyncLocalStorageProviderSingleton.runWithConfig( childConfig, async () => { try { @@ -2395,7 +2395,7 @@ export class RunnableLambda extends Runnable< }); const output = await new Promise( (resolve, reject) => { - void AsyncLocalStorageProviderSingleton.getInstance().run( + void AsyncLocalStorageProviderSingleton.runWithConfig( childConfig, async () => { try { diff --git a/langchain-core/src/runnables/config.ts b/langchain-core/src/runnables/config.ts index 20bf6a94f506..409d556eac8d 100644 --- a/langchain-core/src/runnables/config.ts +++ b/langchain-core/src/runnables/config.ts @@ -1,31 +1,13 @@ -import { - type BaseCallbackConfig, - CallbackManager, - ensureHandler, -} from "../callbacks/manager.js"; +import { CallbackManager, ensureHandler } from "../callbacks/manager.js"; import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js"; +import { RunnableConfig } from "./types.js"; export const DEFAULT_RECURSION_LIMIT = 25; -export interface RunnableConfig extends BaseCallbackConfig { - /** - * Runtime values for attributes previously made configurable on this Runnable, - * or sub-Runnables. - */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - configurable?: Record; - - /** - * Maximum number of times a call can recurse. If not provided, defaults to 25. - */ - recursionLimit?: number; - - /** Maximum number of parallel calls to make. */ - maxConcurrency?: number; -} +export { type RunnableConfig }; export async function getCallbackManagerForConfig(config?: RunnableConfig) { - return CallbackManager.configure( + return CallbackManager._configureSync( config?.callbacks, undefined, config?.tags, @@ -119,15 +101,11 @@ const PRIMITIVES = new Set(["string", "number", "boolean"]); /** * Ensure that a passed config is an object with all required keys present. - * - * Note: To make sure async local storage loading works correctly, this - * should not be called with a default or prepopulated config argument. 
*/ export function ensureConfig( config?: CallOptions ): CallOptions { - const implicitConfig = - AsyncLocalStorageProviderSingleton.getInstance().getStore(); + const implicitConfig = AsyncLocalStorageProviderSingleton.getRunnableConfig(); let empty: RunnableConfig = { tags: [], metadata: {}, @@ -135,10 +113,10 @@ export function ensureConfig( runId: undefined, }; if (implicitConfig) { - // Don't allow runId to be loaded implicitly, as this can cause + // Don't allow runId and runName to be loaded implicitly, as this can cause // child runs to improperly inherit their parents' run ids. // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { runId, ...rest } = implicitConfig; + const { runId, runName, ...rest } = implicitConfig; empty = Object.entries(rest).reduce( // eslint-disable-next-line @typescript-eslint/no-explicit-any (currentConfig: Record, [key, value]) => { diff --git a/langchain-core/src/runnables/iter.ts b/langchain-core/src/runnables/iter.ts index 33df40c5991f..52b7a61db06a 100644 --- a/langchain-core/src/runnables/iter.ts +++ b/langchain-core/src/runnables/iter.ts @@ -34,9 +34,12 @@ export function* consumeIteratorInContext( context: Partial | undefined, iter: IterableIterator ): IterableIterator { - const storage = AsyncLocalStorageProviderSingleton.getInstance(); while (true) { - const { value, done } = storage.run(context, iter.next.bind(iter)); + const { value, done } = AsyncLocalStorageProviderSingleton.runWithConfig( + context, + iter.next.bind(iter), + true + ); if (done) { break; } else { @@ -49,13 +52,14 @@ export async function* consumeAsyncIterableInContext( context: Partial | undefined, iter: AsyncIterable ): AsyncIterableIterator { - const storage = AsyncLocalStorageProviderSingleton.getInstance(); const iterator = iter[Symbol.asyncIterator](); while (true) { - const { value, done } = await storage.run( - context, - iterator.next.bind(iter) - ); + const { value, done } = + await AsyncLocalStorageProviderSingleton.runWithConfig( + context, + iterator.next.bind(iter), + true + ); if (done) { break; } else { diff --git a/langchain-core/src/runnables/types.ts b/langchain-core/src/runnables/types.ts index 0050a955f5b8..569e8aa26c0e 100644 --- a/langchain-core/src/runnables/types.ts +++ b/langchain-core/src/runnables/types.ts @@ -1,7 +1,7 @@ import type { z } from "zod"; -import type { RunnableConfig } from "./config.js"; import type { IterableReadableStreamInterface } from "../utils/stream.js"; import type { SerializableInterface } from "../load/serializable.js"; +import type { BaseCallbackConfig } from "../callbacks/manager.js"; export type RunnableBatchOptions = { /** @deprecated Pass in via the standard runnable config object instead */ @@ -73,3 +73,20 @@ export interface Node { id: string; data: RunnableIOSchema | RunnableInterface; } + +export interface RunnableConfig extends BaseCallbackConfig { + /** + * Runtime values for attributes previously made configurable on this Runnable, + * or sub-Runnables. + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + configurable?: Record; + + /** + * Maximum number of times a call can recurse. If not provided, defaults to 25. + */ + recursionLimit?: number; + + /** Maximum number of parallel calls to make. 
*/ + maxConcurrency?: number; +} diff --git a/langchain-core/src/singletons/index.ts b/langchain-core/src/singletons/index.ts index c01af26a62e8..3b4740591e50 100644 --- a/langchain-core/src/singletons/index.ts +++ b/langchain-core/src/singletons/index.ts @@ -1,4 +1,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ +import { RunTree } from "langsmith"; +import { CallbackManager } from "../callbacks/manager.js"; +import { LangChainTracer } from "../tracers/tracer_langchain.js"; export interface AsyncLocalStorageInterface { getStore: () => any | undefined; @@ -18,17 +21,61 @@ export class MockAsyncLocalStorage implements AsyncLocalStorageInterface { const mockAsyncLocalStorage = new MockAsyncLocalStorage(); +const TRACING_ALS_KEY = Symbol.for("ls:tracing_async_local_storage"); +const LC_CHILD_KEY = Symbol.for("lc:child_config"); + class AsyncLocalStorageProvider { getInstance(): AsyncLocalStorageInterface { - return ( - (globalThis as any).__lc_tracing_async_local_storage ?? - mockAsyncLocalStorage + return (globalThis as any)[TRACING_ALS_KEY] ?? mockAsyncLocalStorage; + } + + getRunnableConfig() { + const storage = this.getInstance(); + // this has the runnable config + // which means that we should also have an instance of a LangChainTracer + // with the run map prepopulated + return storage.getStore()?.extra?.[LC_CHILD_KEY]; + } + + runWithConfig( + config: any, + callback: () => T, + avoidCreatingRootRunTree?: boolean + ): T { + const callbackManager = CallbackManager._configureSync( + config?.callbacks, + undefined, + config?.tags, + undefined, + config?.metadata ); + const storage = this.getInstance(); + const parentRunId = callbackManager?.getParentRunId(); + + const langChainTracer = callbackManager?.handlers?.find( + (handler) => handler?.name === "langchain_tracer" + ) as LangChainTracer | undefined; + + let runTree; + if (langChainTracer && parentRunId) { + runTree = langChainTracer.convertToRunTree(parentRunId); + } else if (!avoidCreatingRootRunTree) { + runTree = new RunTree({ + name: "", + tracingEnabled: false, + }); + } + + if (runTree) { + runTree.extra = { ...runTree.extra, [LC_CHILD_KEY]: config }; + } + + return storage.run(runTree, callback); } initializeGlobalInstance(instance: AsyncLocalStorageInterface) { - if ((globalThis as any).__lc_tracing_async_local_storage === undefined) { - (globalThis as any).__lc_tracing_async_local_storage = instance; + if ((globalThis as any)[TRACING_ALS_KEY] === undefined) { + (globalThis as any)[TRACING_ALS_KEY] = instance; } } } diff --git a/langchain-core/src/singletons/tests/async_local_storage.test.ts b/langchain-core/src/singletons/tests/async_local_storage.test.ts index 180f7c947da8..4cb5a2ea77f9 100644 --- a/langchain-core/src/singletons/tests/async_local_storage.test.ts +++ b/langchain-core/src/singletons/tests/async_local_storage.test.ts @@ -140,7 +140,6 @@ test("Runnable streamEvents method with streaming nested in a RunnableLambda", a AsyncLocalStorageProviderSingleton.initializeGlobalInstance( new AsyncLocalStorage() ); - const asyncLocalStorage = AsyncLocalStorageProviderSingleton.getInstance(); const chat = new FakeListChatModel({ responses: ["Hello"], }); @@ -150,7 +149,7 @@ test("Runnable streamEvents method with streaming nested in a RunnableLambda", a const dummyHandler = new FakeCallbackHandler(); const myFunc = async (input: string) => { const outerCallbackManager = await getCallbackManagerForConfig( - asyncLocalStorage.getStore() + AsyncLocalStorageProviderSingleton.getRunnableConfig() ); 
expect(outerCallbackManager?.getParentRunId()).toEqual(outerRunId); @@ -167,7 +166,7 @@ test("Runnable streamEvents method with streaming nested in a RunnableLambda", a const nestedLambdaWithoutOverriddenCallbacks = RunnableLambda.from( async (_: string, config) => { const innerCallbackManager = await getCallbackManagerForConfig( - asyncLocalStorage.getStore() + AsyncLocalStorageProviderSingleton.getRunnableConfig() ); expect(innerCallbackManager?.getParentRunId()).toEqual(innerRunId2); expect(config?.callbacks?.handlers).toContain(dummyHandler); diff --git a/langchain-core/src/tracers/base.ts b/langchain-core/src/tracers/base.ts index 69cc098cdc55..c9eb56821a6c 100644 --- a/langchain-core/src/tracers/base.ts +++ b/langchain-core/src/tracers/base.ts @@ -60,6 +60,10 @@ function convertToDottedOrderFormat( ); } +export function isBaseTracer(x: BaseCallbackHandler): x is BaseTracer { + return typeof (x as BaseTracer)._addRunToRunMap === "function"; +} + export abstract class BaseTracer extends BaseCallbackHandler { protected runMap: Map = new Map(); @@ -90,7 +94,7 @@ export abstract class BaseTracer extends BaseCallbackHandler { parentRun.child_runs.push(childRun); } - protected async _startTrace(run: Run) { + _addRunToRunMap(run: Run) { const currentDottedOrder = convertToDottedOrderFormat( run.start_time, run.id, @@ -126,7 +130,7 @@ export abstract class BaseTracer extends BaseCallbackHandler { storedRun.dotted_order = currentDottedOrder; } this.runMap.set(storedRun.id, storedRun); - await this.onRunCreate?.(storedRun); + return storedRun; } protected async _endTrace(run: Run): Promise { @@ -154,7 +158,12 @@ export abstract class BaseTracer extends BaseCallbackHandler { return parentRun.child_execution_order + 1; } - async handleLLMStart( + /** + * Create and add a run to the run map for LLM start events. + * This must sometimes be done synchronously to avoid race conditions + * when callbacks are backgrounded, so we expose it as a separate method here. + */ + _createRunForLLMStart( llm: Serialized, prompts: string[], runId: string, @@ -163,7 +172,7 @@ export abstract class BaseTracer extends BaseCallbackHandler { tags?: string[], metadata?: KVMap, name?: string - ): Promise { + ) { const execution_order = this._getExecutionOrder(parentRunId); const start_time = Date.now(); const finalExtraParams = metadata @@ -189,13 +198,42 @@ export abstract class BaseTracer extends BaseCallbackHandler { extra: finalExtraParams ?? {}, tags: tags || [], }; + return this._addRunToRunMap(run); + } - await this._startTrace(run); + async handleLLMStart( + llm: Serialized, + prompts: string[], + runId: string, + parentRunId?: string, + extraParams?: KVMap, + tags?: string[], + metadata?: KVMap, + name?: string + ): Promise { + const run = + this.runMap.get(runId) ?? + this._createRunForLLMStart( + llm, + prompts, + runId, + parentRunId, + extraParams, + tags, + metadata, + name + ); + await this.onRunCreate?.(run); await this.onLLMStart?.(run); return run; } - async handleChatModelStart( + /** + * Create and add a run to the run map for chat model start events. + * This must sometimes be done synchronously to avoid race conditions + * when callbacks are backgrounded, so we expose it as a separate method here. 
+ */ + _createRunForChatModelStart( llm: Serialized, messages: BaseMessage[][], runId: string, @@ -204,7 +242,7 @@ export abstract class BaseTracer extends BaseCallbackHandler { tags?: string[], metadata?: KVMap, name?: string - ): Promise { + ) { const execution_order = this._getExecutionOrder(parentRunId); const start_time = Date.now(); const finalExtraParams = metadata @@ -230,8 +268,32 @@ export abstract class BaseTracer extends BaseCallbackHandler { extra: finalExtraParams ?? {}, tags: tags || [], }; + return this._addRunToRunMap(run); + } - await this._startTrace(run); + async handleChatModelStart( + llm: Serialized, + messages: BaseMessage[][], + runId: string, + parentRunId?: string, + extraParams?: KVMap, + tags?: string[], + metadata?: KVMap, + name?: string + ): Promise { + const run = + this.runMap.get(runId) ?? + this._createRunForChatModelStart( + llm, + messages, + runId, + parentRunId, + extraParams, + tags, + metadata, + name + ); + await this.onRunCreate?.(run); await this.onLLMStart?.(run); return run; } @@ -268,7 +330,12 @@ export abstract class BaseTracer extends BaseCallbackHandler { return run; } - async handleChainStart( + /** + * Create and add a run to the run map for chain start events. + * This must sometimes be done synchronously to avoid race conditions + * when callbacks are backgrounded, so we expose it as a separate method here. + */ + _createRunForChainStart( chain: Serialized, inputs: ChainValues, runId: string, @@ -277,7 +344,7 @@ export abstract class BaseTracer extends BaseCallbackHandler { metadata?: KVMap, runType?: string, name?: string - ): Promise { + ) { const execution_order = this._getExecutionOrder(parentRunId); const start_time = Date.now(); const run: Run = { @@ -300,7 +367,32 @@ export abstract class BaseTracer extends BaseCallbackHandler { extra: metadata ? { metadata } : {}, tags: tags || [], }; - await this._startTrace(run); + return this._addRunToRunMap(run); + } + + async handleChainStart( + chain: Serialized, + inputs: ChainValues, + runId: string, + parentRunId?: string, + tags?: string[], + metadata?: KVMap, + runType?: string, + name?: string + ): Promise { + const run = + this.runMap.get(runId) ?? + this._createRunForChainStart( + chain, + inputs, + runId, + parentRunId, + tags, + metadata, + runType, + name + ); + await this.onRunCreate?.(run); await this.onChainStart?.(run); return run; } @@ -355,7 +447,12 @@ export abstract class BaseTracer extends BaseCallbackHandler { return run; } - async handleToolStart( + /** + * Create and add a run to the run map for tool start events. + * This must sometimes be done synchronously to avoid race conditions + * when callbacks are backgrounded, so we expose it as a separate method here. + */ + _createRunForToolStart( tool: Serialized, input: string, runId: string, @@ -363,7 +460,7 @@ export abstract class BaseTracer extends BaseCallbackHandler { tags?: string[], metadata?: KVMap, name?: string - ): Promise { + ) { const execution_order = this._getExecutionOrder(parentRunId); const start_time = Date.now(); const run: Run = { @@ -386,8 +483,30 @@ export abstract class BaseTracer extends BaseCallbackHandler { extra: metadata ? { metadata } : {}, tags: tags || [], }; + return this._addRunToRunMap(run); + } - await this._startTrace(run); + async handleToolStart( + tool: Serialized, + input: string, + runId: string, + parentRunId?: string, + tags?: string[], + metadata?: KVMap, + name?: string + ): Promise { + const run = + this.runMap.get(runId) ?? 
+ this._createRunForToolStart( + tool, + input, + runId, + parentRunId, + tags, + metadata, + name + ); + await this.onRunCreate?.(run); await this.onToolStart?.(run); return run; } @@ -454,7 +573,12 @@ export abstract class BaseTracer extends BaseCallbackHandler { await this.onAgentEnd?.(run); } - async handleRetrieverStart( + /** + * Create and add a run to the run map for retriever start events. + * This must sometimes be done synchronously to avoid race conditions + * when callbacks are backgrounded, so we expose it as a separate method here. + */ + _createRunForRetrieverStart( retriever: Serialized, query: string, runId: string, @@ -462,7 +586,7 @@ export abstract class BaseTracer extends BaseCallbackHandler { tags?: string[], metadata?: KVMap, name?: string - ): Promise { + ) { const execution_order = this._getExecutionOrder(parentRunId); const start_time = Date.now(); const run: Run = { @@ -485,8 +609,30 @@ export abstract class BaseTracer extends BaseCallbackHandler { extra: metadata ? { metadata } : {}, tags: tags || [], }; + return this._addRunToRunMap(run); + } - await this._startTrace(run); + async handleRetrieverStart( + retriever: Serialized, + query: string, + runId: string, + parentRunId?: string, + tags?: string[], + metadata?: KVMap, + name?: string + ): Promise { + const run = + this.runMap.get(runId) ?? + this._createRunForRetrieverStart( + retriever, + query, + runId, + parentRunId, + tags, + metadata, + name + ); + await this.onRunCreate?.(run); await this.onRetrieverStart?.(run); return run; } diff --git a/langchain-core/src/tracers/tests/langsmith_interop.test.ts b/langchain-core/src/tracers/tests/langsmith_interop.test.ts new file mode 100644 index 000000000000..4fd4f123fd97 --- /dev/null +++ b/langchain-core/src/tracers/tests/langsmith_interop.test.ts @@ -0,0 +1,615 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +/* eslint-disable no-process-env */ +/* eslint-disable no-promise-executor-return */ + +import { jest } from "@jest/globals"; +import { traceable } from "langsmith/traceable"; + +import { RunnableLambda } from "../../runnables/base.js"; +import { BaseMessage, HumanMessage } from "../../messages/index.js"; + +let fetchMock: any; + +const originalTracingEnvValue = process.env.LANGCHAIN_TRACING_V2; + +beforeEach(() => { + fetchMock = jest + .spyOn(global, "fetch") + .mockImplementation(() => + Promise.resolve({ ok: true, text: () => "" } as any) + ); + process.env.LANGCHAIN_TRACING_V2 = "true"; +}); + +afterEach(() => { + jest.restoreAllMocks(); +}); + +afterAll(() => { + process.env.LANGCHAIN_TRACING_V2 = originalTracingEnvValue; +}); + +test.each(["true", "false"])( + "traceables nested within runnables with background callbacks %s", + async (value) => { + process.env.LANGCHAIN_CALLBACKS_BACKGROUND = value; + + const aiGreet = traceable( + async (msg: BaseMessage, name = "world") => { + await new Promise((resolve) => setTimeout(resolve, 300)); + return msg.content + name; + }, + { name: "aiGreet", tracingEnabled: true } + ); + + const root = RunnableLambda.from(async (messages: BaseMessage[]) => { + const lastMsg = messages.at(-1) as HumanMessage; + const greetOne = await aiGreet(lastMsg, "David"); + + return [greetOne]; + }); + + await root.invoke([new HumanMessage({ content: "Hello!" 
})]); + + const relevantCalls = fetchMock.mock.calls.filter((call: any) => { + return call[0].startsWith("https://api.smith.langchain.com/runs"); + }); + + expect(relevantCalls.length).toEqual(4); + const firstCallParams = JSON.parse((relevantCalls[0][1] as any).body); + const secondCallParams = JSON.parse((relevantCalls[1][1] as any).body); + const thirdCallParams = JSON.parse((relevantCalls[2][1] as any).body); + const fourthCallParams = JSON.parse((relevantCalls[3][1] as any).body); + expect(firstCallParams).toMatchObject({ + id: firstCallParams.id, + name: "RunnableLambda", + start_time: expect.any(Number), + serialized: { + lc: 1, + type: "not_implemented", + id: ["langchain_core", "runnables", "RunnableLambda"], + }, + events: [{ name: "start", time: expect.any(String) }], + inputs: { + input: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + ], + }, + execution_order: 1, + child_execution_order: 1, + run_type: "chain", + extra: expect.any(Object), + tags: [], + trace_id: firstCallParams.id, + dotted_order: expect.any(String), + }); + expect(secondCallParams).toMatchObject({ + id: expect.any(String), + name: "aiGreet", + start_time: expect.any(Number), + run_type: "chain", + extra: expect.any(Object), + serialized: {}, + inputs: { + args: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + "David", + ], + }, + child_runs: [], + parent_run_id: firstCallParams.id, + trace_id: firstCallParams.id, + dotted_order: expect.stringContaining(`${firstCallParams.dotted_order}.`), + tags: [], + }); + expect(thirdCallParams).toMatchObject({ + end_time: expect.any(Number), + inputs: { + args: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + "David", + ], + }, + outputs: { outputs: "Hello!David" }, + parent_run_id: firstCallParams.id, + extra: expect.any(Object), + dotted_order: secondCallParams.dotted_order, + trace_id: firstCallParams.id, + tags: [], + }); + expect(fourthCallParams).toMatchObject({ + end_time: expect.any(Number), + outputs: { output: ["Hello!David"] }, + events: [ + { name: "start", time: expect.any(String) }, + { name: "end", time: expect.any(String) }, + ], + inputs: { + input: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + ], + }, + trace_id: firstCallParams.id, + dotted_order: firstCallParams.dotted_order, + }); + } +); + +test.each(["true", "false"])( + "streaming traceables nested within runnables with background callbacks %s", + async (value) => { + process.env.LANGCHAIN_CALLBACKS_BACKGROUND = value; + + const aiGreet = traceable( + async function* (msg: BaseMessage, name = "world") { + const res = msg.content + name; + await new Promise((resolve) => setTimeout(resolve, 300)); + for (const letter of res.split("")) { + yield letter; + } + }, + { name: "aiGreet" } + ); + + const root = RunnableLambda.from(async function* (messages: BaseMessage[]) { + const lastMsg = messages.at(-1) as HumanMessage; + yield* aiGreet(lastMsg, "David"); + }); + + const stream = await root.stream([new HumanMessage({ content: "Hello!" 
})]); + + for await (const _ of stream) { + // Just consume iterator + } + + const relevantCalls = fetchMock.mock.calls.filter((call: any) => { + return call[0].startsWith("https://api.smith.langchain.com/runs"); + }); + + expect(relevantCalls.length).toEqual(4); + const firstCallParams = JSON.parse((relevantCalls[0][1] as any).body); + const secondCallParams = JSON.parse((relevantCalls[1][1] as any).body); + const thirdCallParams = JSON.parse((relevantCalls[2][1] as any).body); + const fourthCallParams = JSON.parse((relevantCalls[3][1] as any).body); + expect(firstCallParams).toMatchObject({ + id: firstCallParams.id, + name: "RunnableLambda", + start_time: expect.any(Number), + serialized: { + lc: 1, + type: "not_implemented", + id: ["langchain_core", "runnables", "RunnableLambda"], + }, + events: [{ name: "start", time: expect.any(String) }], + inputs: { + input: "", + }, + execution_order: 1, + child_execution_order: 1, + run_type: "chain", + extra: expect.any(Object), + tags: [], + trace_id: firstCallParams.id, + dotted_order: expect.any(String), + }); + expect(secondCallParams).toMatchObject({ + id: expect.any(String), + name: "aiGreet", + start_time: expect.any(Number), + run_type: "chain", + extra: expect.any(Object), + serialized: {}, + inputs: { + args: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + "David", + ], + }, + child_runs: [], + parent_run_id: firstCallParams.id, + trace_id: firstCallParams.id, + dotted_order: expect.stringContaining(`${firstCallParams.dotted_order}.`), + tags: [], + }); + expect(thirdCallParams).toMatchObject({ + end_time: expect.any(Number), + inputs: { + args: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + "David", + ], + }, + outputs: { + outputs: ["H", "e", "l", "l", "o", "!", "D", "a", "v", "i", "d"], + }, + parent_run_id: firstCallParams.id, + extra: expect.any(Object), + dotted_order: secondCallParams.dotted_order, + trace_id: firstCallParams.id, + tags: [], + }); + expect(fourthCallParams).toMatchObject({ + end_time: expect.any(Number), + outputs: { output: "Hello!David" }, + events: [ + { name: "start", time: expect.any(String) }, + { name: "end", time: expect.any(String) }, + ], + inputs: { + input: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + ], + }, + trace_id: firstCallParams.id, + dotted_order: firstCallParams.dotted_order, + }); + } +); + +test.each(["true", "false"])( + "runnables nested within traceables with background callbacks %s", + async (value) => { + process.env.LANGCHAIN_CALLBACKS_BACKGROUND = value; + + const nested = RunnableLambda.from(async (messages: BaseMessage[]) => { + const lastMsg = messages.at(-1) as HumanMessage; + await new Promise((resolve) => setTimeout(resolve, 300)); + return [lastMsg.content]; + }); + + const aiGreet = traceable( + async (msg: BaseMessage, name = "world") => { + const contents = await nested.invoke([msg]); + return contents[0] + name; + }, + { name: "aiGreet", tracingEnabled: true } + ); + + await aiGreet(new HumanMessage({ content: "Hello!" 
}), "mitochondria"); + + const relevantCalls = fetchMock.mock.calls.filter((call: any) => { + return call[0].startsWith("https://api.smith.langchain.com/runs"); + }); + + expect(relevantCalls.length).toEqual(4); + const firstCallParams = JSON.parse((relevantCalls[0][1] as any).body); + const secondCallParams = JSON.parse((relevantCalls[1][1] as any).body); + const thirdCallParams = JSON.parse((relevantCalls[2][1] as any).body); + const fourthCallParams = JSON.parse((relevantCalls[3][1] as any).body); + expect(firstCallParams).toMatchObject({ + id: firstCallParams.id, + name: "aiGreet", + start_time: expect.any(Number), + run_type: "chain", + extra: expect.any(Object), + serialized: {}, + inputs: { + args: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + "mitochondria", + ], + }, + child_runs: [], + trace_id: firstCallParams.id, + dotted_order: firstCallParams.dotted_order, + tags: [], + }); + expect(secondCallParams).toMatchObject({ + id: secondCallParams.id, + name: "RunnableLambda", + parent_run_id: firstCallParams.id, + start_time: expect.any(Number), + serialized: { + lc: 1, + type: "not_implemented", + id: ["langchain_core", "runnables", "RunnableLambda"], + }, + events: [{ name: "start", time: expect.any(String) }], + inputs: { + input: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + ], + }, + execution_order: 2, + child_execution_order: 2, + run_type: "chain", + extra: expect.any(Object), + tags: [], + trace_id: firstCallParams.id, + dotted_order: expect.stringContaining(`${firstCallParams.dotted_order}.`), + }); + expect(thirdCallParams).toMatchObject({ + end_time: expect.any(Number), + outputs: { output: ["Hello!"] }, + events: [ + { name: "start", time: expect.any(String) }, + { name: "end", time: expect.any(String) }, + ], + inputs: { + input: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + ], + }, + trace_id: firstCallParams.id, + dotted_order: expect.stringContaining(`${firstCallParams.dotted_order}.`), + parent_run_id: firstCallParams.id, + }); + expect(fourthCallParams).toMatchObject({ + end_time: expect.any(Number), + inputs: { + args: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + "mitochondria", + ], + }, + outputs: { outputs: "Hello!mitochondria" }, + extra: expect.any(Object), + dotted_order: firstCallParams.dotted_order, + trace_id: firstCallParams.id, + tags: [], + }); + } +); + +test.each(["true", "false"])( + "streaming runnables nested within traceables with background callbacks %s", + async (value) => { + process.env.LANGCHAIN_CALLBACKS_BACKGROUND = value; + + const nested = RunnableLambda.from(async function* ( + messages: BaseMessage[] + ) { + const lastMsg = messages.at(-1) as HumanMessage; + await new Promise((resolve) => setTimeout(resolve, 300)); + for (const letter of (lastMsg.content as string).split("")) { + yield letter; + } + }); + + const aiGreet = traceable( + async function* (msg: BaseMessage, name = "world") { + for await (const chunk of await nested.stream([msg])) { + yield chunk; + } + for (const 
letter of name.split("")) { + yield letter; + } + }, + { name: "aiGreet", tracingEnabled: true } + ); + + for await (const _ of aiGreet( + new HumanMessage({ content: "Hello!" }), + "mitochondria" + )) { + // Just consume iterator + } + + const relevantCalls = fetchMock.mock.calls.filter((call: any) => { + return call[0].startsWith("https://api.smith.langchain.com/runs"); + }); + + expect(relevantCalls.length).toEqual(4); + const firstCallParams = JSON.parse((relevantCalls[0][1] as any).body); + const secondCallParams = JSON.parse((relevantCalls[1][1] as any).body); + const thirdCallParams = JSON.parse((relevantCalls[2][1] as any).body); + const fourthCallParams = JSON.parse((relevantCalls[3][1] as any).body); + expect(firstCallParams).toMatchObject({ + id: firstCallParams.id, + name: "aiGreet", + start_time: expect.any(Number), + run_type: "chain", + extra: expect.any(Object), + serialized: {}, + inputs: { + args: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + "mitochondria", + ], + }, + child_runs: [], + trace_id: firstCallParams.id, + dotted_order: firstCallParams.dotted_order, + tags: [], + }); + expect(secondCallParams).toMatchObject({ + id: secondCallParams.id, + name: "RunnableLambda", + parent_run_id: firstCallParams.id, + start_time: expect.any(Number), + serialized: { + lc: 1, + type: "not_implemented", + id: ["langchain_core", "runnables", "RunnableLambda"], + }, + events: [{ name: "start", time: expect.any(String) }], + inputs: { + input: "", + }, + execution_order: 2, + child_execution_order: 2, + run_type: "chain", + extra: expect.any(Object), + tags: [], + trace_id: firstCallParams.id, + dotted_order: expect.stringContaining(`${firstCallParams.dotted_order}.`), + }); + expect(thirdCallParams).toMatchObject({ + end_time: expect.any(Number), + outputs: { output: "Hello!" }, + events: [ + { name: "start", time: expect.any(String) }, + { name: "end", time: expect.any(String) }, + ], + inputs: { + input: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + ], + }, + trace_id: firstCallParams.id, + dotted_order: expect.stringContaining(`${firstCallParams.dotted_order}.`), + parent_run_id: firstCallParams.id, + }); + expect(fourthCallParams).toMatchObject({ + end_time: expect.any(Number), + inputs: { + args: [ + { + lc: 1, + type: "constructor", + id: ["langchain_core", "messages", "HumanMessage"], + kwargs: { + content: "Hello!", + additional_kwargs: {}, + response_metadata: {}, + }, + }, + "mitochondria", + ], + }, + outputs: { + outputs: [ + "H", + "e", + "l", + "l", + "o", + "!", + "m", + "i", + "t", + "o", + "c", + "h", + "o", + "n", + "d", + "r", + "i", + "a", + ], + }, + extra: expect.any(Object), + dotted_order: firstCallParams.dotted_order, + trace_id: firstCallParams.id, + tags: [], + }); + } +); diff --git a/langchain-core/src/tracers/tracer_langchain.ts b/langchain-core/src/tracers/tracer_langchain.ts index a85ddf29a523..3e4fa6a54bb7 100644 --- a/langchain-core/src/tracers/tracer_langchain.ts +++ b/langchain-core/src/tracers/tracer_langchain.ts @@ -61,38 +61,9 @@ export class LangChainTracer this.exampleId = exampleId; this.client = client ?? 
new Client({}); - // if we're inside traceable, we can obtain the traceable tree - // and populate the run map, which is used to correctly - // infer dotted order and execution order - const traceableTree = this.getTraceableRunTree(); + const traceableTree = LangChainTracer.getTraceableRunTree(); if (traceableTree) { - let rootRun: RunTree = traceableTree; - const visited = new Set(); - while (rootRun.parent_run) { - if (visited.has(rootRun.id)) break; - visited.add(rootRun.id); - - if (!rootRun.parent_run) break; - rootRun = rootRun.parent_run as RunTree; - } - visited.clear(); - - const queue = [rootRun]; - while (queue.length > 0) { - const current = queue.shift(); - if (!current || visited.has(current.id)) continue; - visited.add(current.id); - - // @ts-expect-error Types of property 'events' are incompatible. - this.runMap.set(current.id, current); - if (current.child_runs) { - queue.push(...current.child_runs); - } - } - - this.client = traceableTree.client ?? this.client; - this.projectName = traceableTree.project_name ?? this.projectName; - this.exampleId = traceableTree.reference_example_id ?? this.exampleId; + this.updateFromRunTree(traceableTree); } } @@ -140,7 +111,83 @@ export class LangChainTracer return this.runMap.get(id); } - getTraceableRunTree(): RunTree | undefined { + updateFromRunTree(runTree: RunTree) { + let rootRun: RunTree = runTree; + const visited = new Set(); + while (rootRun.parent_run) { + if (visited.has(rootRun.id)) break; + visited.add(rootRun.id); + + if (!rootRun.parent_run) break; + rootRun = rootRun.parent_run as RunTree; + } + visited.clear(); + + const queue = [rootRun]; + while (queue.length > 0) { + const current = queue.shift(); + if (!current || visited.has(current.id)) continue; + visited.add(current.id); + + // @ts-expect-error Types of property 'events' are incompatible. + this.runMap.set(current.id, current); + if (current.child_runs) { + queue.push(...current.child_runs); + } + } + + this.client = runTree.client ?? this.client; + this.projectName = runTree.project_name ?? this.projectName; + this.exampleId = runTree.reference_example_id ?? 
this.exampleId; + } + + convertToRunTree(id: string): RunTree | undefined { + const runTreeMap: Record = {}; + const runTreeList: [id: string, dotted_order: string | undefined][] = []; + for (const [id, run] of this.runMap) { + // by converting the run map to a run tree, we are doing a copy + // thus, any mutation performed on the run tree will not be reflected + // back in the run map + // TODO: Stop using `this.runMap` in favour of LangSmith's `RunTree` + const runTree = new RunTree({ + ...run, + child_runs: [], + parent_run: undefined, + + // inherited properties + client: this.client, + project_name: this.projectName, + reference_example_id: this.exampleId, + tracingEnabled: true, + }); + + runTreeMap[id] = runTree; + runTreeList.push([id, run.dotted_order]); + } + + runTreeList.sort((a, b) => { + if (!a[1] || !b[1]) return 0; + return a[1].localeCompare(b[1]); + }); + + for (const [id] of runTreeList) { + const run = this.runMap.get(id); + const runTree = runTreeMap[id]; + if (!run || !runTree) continue; + + if (run.parent_run_id) { + const parentRunTree = runTreeMap[run.parent_run_id]; + if (parentRunTree) { + parentRunTree.child_runs.push(runTree); + runTree.parent_run = parentRunTree; + } + } + } + + return runTreeMap[id]; + } + + static getTraceableRunTree(): RunTree | undefined { try { return getCurrentRunTree(); } catch { diff --git a/langchain-core/src/utils/stream.ts b/langchain-core/src/utils/stream.ts index dc64d5468d70..234cec3b900f 100644 --- a/langchain-core/src/utils/stream.ts +++ b/langchain-core/src/utils/stream.ts @@ -1,8 +1,8 @@ -// Make this a type to override ReadableStream's async iterator type in case -// the popular web-streams-polyfill is imported - the supplied types import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js"; -// in this case don't quite match. +// Make this a type to override ReadableStream's async iterator type in case +// the popular web-streams-polyfill is imported - the supplied types +// in that case don't quite match. export type IterableReadableStreamInterface = ReadableStream & AsyncIterable; @@ -202,15 +202,18 @@ export class AsyncGeneratorWithSetup< // needs to happen in logical order, ie. in the order in which input to // to each generator is available. 
this.setup = new Promise((resolve, reject) => { - const storage = AsyncLocalStorageProviderSingleton.getInstance(); - void storage.run(params.config, async () => { - this.firstResult = params.generator.next(); - if (params.startSetup) { - this.firstResult.then(params.startSetup).then(resolve, reject); - } else { - this.firstResult.then((_result) => resolve(undefined as S), reject); - } - }); + void AsyncLocalStorageProviderSingleton.runWithConfig( + params.config, + async () => { + this.firstResult = params.generator.next(); + if (params.startSetup) { + this.firstResult.then(params.startSetup).then(resolve, reject); + } else { + this.firstResult.then((_result) => resolve(undefined as S), reject); + } + }, + true + ); }); } @@ -220,10 +223,13 @@ export class AsyncGeneratorWithSetup< return this.firstResult; } - const storage = AsyncLocalStorageProviderSingleton.getInstance(); - return storage.run(this.config, async () => { - return this.generator.next(...args); - }); + return AsyncLocalStorageProviderSingleton.runWithConfig( + this.config, + async () => { + return this.generator.next(...args); + }, + true + ); } async return( @@ -260,7 +266,10 @@ export async function pipeGeneratorWithSetup< startSetup: () => Promise, ...args: A ) { - const gen = new AsyncGeneratorWithSetup({ generator, startSetup }); + const gen = new AsyncGeneratorWithSetup({ + generator, + startSetup, + }); const setup = await gen.setup; return { output: to(gen, setup, ...args), setup }; } diff --git a/yarn.lock b/yarn.lock index 3b1e15f3dddc..220b0fd245ba 100644 --- a/yarn.lock +++ b/yarn.lock @@ -11033,7 +11033,7 @@ __metadata: jest: ^29.5.0 jest-environment-node: ^29.6.4 js-tiktoken: ^1.0.12 - langsmith: ~0.1.30 + langsmith: ~0.1.39 ml-distance: ^4.0.0 ml-matrix: ^6.10.4 mustache: ^4.2.0 @@ -30361,9 +30361,9 @@ __metadata: languageName: unknown linkType: soft -"langsmith@npm:^0.1.30, langsmith@npm:~0.1.30": - version: 0.1.30 - resolution: "langsmith@npm:0.1.30" +"langsmith@npm:^0.1.30, langsmith@npm:~0.1.30, langsmith@npm:~0.1.39": + version: 0.1.39 + resolution: "langsmith@npm:0.1.39" dependencies: "@types/uuid": ^9.0.1 commander: ^10.0.1 @@ -30381,7 +30381,7 @@ __metadata: optional: true openai: optional: true - checksum: 61f4f645b0d95bf0ddec4a275a2fd6859a650569c1ca0d092b318dcabb96fc72d9ae45f35c20d53c2ff6c2d615a2a99f27bb5a974c44ae57c4f359783c26ee99 + checksum: df21332662ec3a2d2d5cf915acede52b96aedf2a286259435d683f230af5926500b129cab1f0275450e0d3de6d9d8476e410ac46f5e994beb43f2e2df8a1965f languageName: node linkType: hard
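For readers skimming the `manager.ts` hunks: every `handle*Start` path now calls a synchronous `_createRunFor*Start` on tracer handlers before handing the remaining work to `consumeCallback`. With `LANGCHAIN_CALLBACKS_BACKGROUND=true`, `consumeCallback` no longer awaits the handler, so a child run could previously start before its parent existed in the tracer's run map, which broke `execution_order` and `dotted_order` inference. A self-contained sketch of the pattern (illustrative only; `RunRecord`, `queueBackground`, and the other names here are not part of the library):

```typescript
// Illustrative sketch, not the library's implementation: why the run record
// must be registered synchronously before handler work is queued when
// backgrounded callbacks defer execution.
type RunRecord = { id: string; parentId?: string; executionOrder: number };

const runMap = new Map<string, RunRecord>();
const backgroundQueue: Promise<void>[] = [];

// Synchronous part: the run becomes visible to child runs immediately.
function createRun(id: string, parentId?: string): RunRecord {
  const parent = parentId ? runMap.get(parentId) : undefined;
  const run: RunRecord = {
    id,
    parentId,
    // A child's order can only be derived if the parent is already
    // in the map at the moment the child starts.
    executionOrder: (parent?.executionOrder ?? 0) + 1,
  };
  runMap.set(id, run);
  return run;
}

// Asynchronous part: slow work (e.g. posting the run) is deferred.
function queueBackground(work: () => Promise<void>) {
  backgroundQueue.push(work());
}

function handleStart(id: string, parentId?: string) {
  const run = createRun(id, parentId); // sync: no race with child starts
  queueBackground(async () => {
    console.log("posted", run.id, "order", run.executionOrder);
  });
}

handleStart("parent");
handleStart("child", "parent"); // parent is registered even though its
                                // background work has not executed yet
void Promise.all(backgroundQueue);
```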
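The new `runWithConfig` / `getRunnableConfig` pair on `AsyncLocalStorageProviderSingleton` is what lets nested code recover the ambient config (and, via the `Symbol.for("lc:child_config")` key, lets a nested `traceable` find the enclosing tracer's run tree) without the config being passed explicitly. Conceptually it is async-local storage scoped to the current call; a minimal Node-only sketch, with the stored value simplified to a plain config object rather than a `RunTree`:

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Conceptual sketch: the config for the current call is kept in
// async-local storage so nested code can recover it implicitly.
type Config = { tags?: string[]; metadata?: Record<string, unknown> };

const als = new AsyncLocalStorage<Config>();

function runWithConfig<T>(config: Config, callback: () => T): T {
  return als.run(config, callback);
}

function getImplicitConfig(): Config | undefined {
  return als.getStore();
}

function nestedStep() {
  // No config argument needed: it is inherited from the enclosing scope.
  console.log("inherited tags:", getImplicitConfig()?.tags);
}

runWithConfig({ tags: ["outer"] }, () => {
  nestedStep(); // logs: inherited tags: [ 'outer' ]
});
```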
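Going the other way, `LangChainTracer.convertToRunTree` copies the tracer's flat `runMap` into detached nodes (so later mutation of the returned tree does not leak back into the map), sorts them by `dotted_order` so each parent's children come out in execution order, and re-links parents to children before returning the node for the requested run id. A stripped-down sketch of that rebuild, with plain objects standing in for LangSmith's `RunTree`:

```typescript
// Simplified rebuild of a parent/child tree from a flat run map,
// mirroring the approach in convertToRunTree (illustrative types only).
interface FlatRun {
  id: string;
  parent_run_id?: string;
  dotted_order?: string;
}

interface TreeNode extends FlatRun {
  child_runs: TreeNode[];
  parent_run?: TreeNode;
}

function toRunTree(
  runMap: Map<string, FlatRun>,
  id: string
): TreeNode | undefined {
  // Copy every run into a detached node so mutations don't leak back.
  const nodes = new Map<string, TreeNode>();
  for (const [runId, run] of runMap) {
    nodes.set(runId, { ...run, child_runs: [] });
  }

  // Sorting by dotted_order keeps each parent's child_runs in execution
  // order, since a child's dotted_order extends its parent's with an
  // additional time-and-id segment.
  const ordered = [...nodes.values()].sort((a, b) =>
    (a.dotted_order ?? "").localeCompare(b.dotted_order ?? "")
  );

  for (const node of ordered) {
    const parent = node.parent_run_id
      ? nodes.get(node.parent_run_id)
      : undefined;
    if (parent) {
      parent.child_runs.push(node);
      node.parent_run = parent;
    }
  }
  return nodes.get(id);
}
```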
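End to end, the contract the new `langsmith_interop.test.ts` suite locks in is roughly the condensed usage sketch below (adapted from the test; assumes an ESM context with top-level await and tracing enabled via `LANGCHAIN_TRACING_V2=true`): a `traceable` nested inside a `RunnableLambda` now lands in the same trace as its parent run, whether or not callbacks are backgrounded.

```typescript
import { traceable } from "langsmith/traceable";
import { RunnableLambda } from "@langchain/core/runnables";
import { HumanMessage, type BaseMessage } from "@langchain/core/messages";

// With this change, the traceable picks up the run created by the
// enclosing RunnableLambda instead of starting a separate trace.
const aiGreet = traceable(
  async (msg: BaseMessage, name = "world") => `${msg.content}${name}`,
  { name: "aiGreet" }
);

const root = RunnableLambda.from(async (messages: BaseMessage[]) => {
  const lastMsg = messages.at(-1) as HumanMessage;
  return [await aiGreet(lastMsg, "David")];
});

// Both runs share one trace_id, and aiGreet's dotted_order nests under
// the RunnableLambda's, which is what the new test file asserts.
await root.invoke([new HumanMessage({ content: "Hello!" })]);
```

The reverse nesting (a `Runnable` invoked or streamed inside a `traceable`) is exercised by the same suite and follows the same rule.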