From 217d788222f11096fe0386be56463065ae3aee98 Mon Sep 17 00:00:00 2001 From: Brace Sproul Date: Thu, 5 Dec 2024 13:43:47 -0800 Subject: [PATCH] fix(langchain-core): Pick runnable config keys in asynclocalstorage (#7324) --- langchain-core/src/runnables/base.ts | 12 ++++++++---- langchain-core/src/runnables/config.ts | 18 ++++++++++++++++++ langchain-core/src/runnables/index.ts | 1 + langchain-core/src/runnables/iter.ts | 7 ++++--- langchain-core/src/runnables/types.ts | 2 +- langchain-core/src/tools/index.ts | 5 +++-- langchain-core/src/types/stream.ts | 5 +++++ langchain-core/src/utils/stream.ts | 18 +++++++++++------- 8 files changed, 51 insertions(+), 17 deletions(-) create mode 100644 langchain-core/src/types/stream.ts diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index 1cab402513ec..d2ba10b6f6bd 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -6,7 +6,11 @@ import { type TraceableFunction, isTraceableFunction, } from "langsmith/singletons/traceable"; -import type { RunnableInterface, RunnableBatchOptions } from "./types.js"; +import type { + RunnableInterface, + RunnableBatchOptions, + RunnableConfig, +} from "./types.js"; import { CallbackManagerForChainRun } from "../callbacks/manager.js"; import { LogStreamCallbackHandler, @@ -33,11 +37,11 @@ import { import { raceWithSignal } from "../utils/signal.js"; import { DEFAULT_RECURSION_LIMIT, - RunnableConfig, ensureConfig, getCallbackManagerForConfig, mergeConfigs, patchConfig, + pickRunnableConfigKeys, } from "./config.js"; import { AsyncCaller } from "../utils/async_caller.js"; import { Run } from "../tracers/base.js"; @@ -2529,7 +2533,7 @@ export class RunnableLambda< recursionLimit: (config?.recursionLimit ?? DEFAULT_RECURSION_LIMIT) - 1, }); void AsyncLocalStorageProviderSingleton.runWithConfig( - childConfig, + pickRunnableConfigKeys(childConfig), async () => { try { let output = await this.func(input, { @@ -2627,7 +2631,7 @@ export class RunnableLambda< const output = await new Promise( (resolve, reject) => { void AsyncLocalStorageProviderSingleton.runWithConfig( - childConfig, + pickRunnableConfigKeys(childConfig), async () => { try { const res = await this.func(finalChunk as RunInput, { diff --git a/langchain-core/src/runnables/config.ts b/langchain-core/src/runnables/config.ts index 8fa9a244ee3d..aae7164b5721 100644 --- a/langchain-core/src/runnables/config.ts +++ b/langchain-core/src/runnables/config.ts @@ -233,3 +233,21 @@ export function patchConfig( } return newConfig; } + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function pickRunnableConfigKeys>( + config?: CallOptions +): Partial | undefined { + return config + ? { + configurable: config.configurable, + recursionLimit: config.recursionLimit, + callbacks: config.callbacks, + tags: config.tags, + metadata: config.metadata, + maxConcurrency: config.maxConcurrency, + timeout: config.timeout, + signal: config.signal, + } + : undefined; +} diff --git a/langchain-core/src/runnables/index.ts b/langchain-core/src/runnables/index.ts index 2a34d91c8164..7d78b1c5f75a 100644 --- a/langchain-core/src/runnables/index.ts +++ b/langchain-core/src/runnables/index.ts @@ -29,6 +29,7 @@ export { patchConfig, ensureConfig, mergeConfigs, + pickRunnableConfigKeys, } from "./config.js"; export { RunnablePassthrough } from "./passthrough.js"; export { type RouterInput, RouterRunnable } from "./router.js"; diff --git a/langchain-core/src/runnables/iter.ts b/langchain-core/src/runnables/iter.ts index 52b7a61db06a..4d7ead6efa60 100644 --- a/langchain-core/src/runnables/iter.ts +++ b/langchain-core/src/runnables/iter.ts @@ -1,5 +1,6 @@ +import type { RunnableConfig } from "../runnables/types.js"; import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js"; -import { RunnableConfig } from "./config.js"; +import { pickRunnableConfigKeys } from "./config.js"; export function isIterableIterator( thing: unknown @@ -36,7 +37,7 @@ export function* consumeIteratorInContext( ): IterableIterator { while (true) { const { value, done } = AsyncLocalStorageProviderSingleton.runWithConfig( - context, + pickRunnableConfigKeys(context), iter.next.bind(iter), true ); @@ -56,7 +57,7 @@ export async function* consumeAsyncIterableInContext( while (true) { const { value, done } = await AsyncLocalStorageProviderSingleton.runWithConfig( - context, + pickRunnableConfigKeys(context), iterator.next.bind(iter), true ); diff --git a/langchain-core/src/runnables/types.ts b/langchain-core/src/runnables/types.ts index f40d80ee3831..f06e94fa1254 100644 --- a/langchain-core/src/runnables/types.ts +++ b/langchain-core/src/runnables/types.ts @@ -1,7 +1,7 @@ import type { z } from "zod"; -import type { IterableReadableStreamInterface } from "../utils/stream.js"; import type { SerializableInterface } from "../load/serializable.js"; import type { BaseCallbackConfig } from "../callbacks/manager.js"; +import type { IterableReadableStreamInterface } from "../types/stream.js"; export type RunnableBatchOptions = { /** @deprecated Pass in via the standard runnable config object instead */ diff --git a/langchain-core/src/tools/index.ts b/langchain-core/src/tools/index.ts index 348e85103904..8ce02d28c935 100644 --- a/langchain-core/src/tools/index.ts +++ b/langchain-core/src/tools/index.ts @@ -12,6 +12,7 @@ import { import { ensureConfig, patchConfig, + pickRunnableConfigKeys, type RunnableConfig, } from "../runnables/config.js"; import type { RunnableFunc, RunnableInterface } from "../runnables/base.js"; @@ -594,7 +595,7 @@ export function tool< callbacks: runManager?.getChild(), }); void AsyncLocalStorageProviderSingleton.runWithConfig( - childConfig, + pickRunnableConfigKeys(childConfig), async () => { try { // TS doesn't restrict the type here based on the guard above @@ -625,7 +626,7 @@ export function tool< callbacks: runManager?.getChild(), }); void AsyncLocalStorageProviderSingleton.runWithConfig( - childConfig, + pickRunnableConfigKeys(childConfig), async () => { try { // TS doesn't restrict the type here based on the guard above diff --git a/langchain-core/src/types/stream.ts b/langchain-core/src/types/stream.ts new file mode 100644 index 000000000000..ae03b69b78bb --- /dev/null +++ b/langchain-core/src/types/stream.ts @@ -0,0 +1,5 @@ +// Make this a type to override ReadableStream's async iterator type in case +// the popular web-streams-polyfill is imported - the supplied types +// in that case don't quite match. +export type IterableReadableStreamInterface = ReadableStream & + AsyncIterable; diff --git a/langchain-core/src/utils/stream.ts b/langchain-core/src/utils/stream.ts index aa9db0604637..cd3e592806be 100644 --- a/langchain-core/src/utils/stream.ts +++ b/langchain-core/src/utils/stream.ts @@ -1,11 +1,11 @@ +import { pickRunnableConfigKeys } from "../runnables/config.js"; import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js"; +import type { IterableReadableStreamInterface } from "../types/stream.js"; import { raceWithSignal } from "./signal.js"; -// Make this a type to override ReadableStream's async iterator type in case -// the popular web-streams-polyfill is imported - the supplied types -// in that case don't quite match. -export type IterableReadableStreamInterface = ReadableStream & - AsyncIterable; +// Re-exported for backwards compatibility +// Do NOT import this type from this file inside the project. Instead, always import from `types/stream.js` +export type { IterableReadableStreamInterface }; /* * Support async iterator syntax for ReadableStreams in all environments. @@ -215,7 +215,9 @@ export class AsyncGeneratorWithSetup< // to each generator is available. this.setup = new Promise((resolve, reject) => { void AsyncLocalStorageProviderSingleton.runWithConfig( - params.config, + pickRunnableConfigKeys( + params.config as Record | undefined + ), async () => { this.firstResult = params.generator.next(); if (params.startSetup) { @@ -238,7 +240,9 @@ export class AsyncGeneratorWithSetup< } return AsyncLocalStorageProviderSingleton.runWithConfig( - this.config, + pickRunnableConfigKeys( + this.config as Record | undefined + ), this.signal ? async () => { return raceWithSignal(this.generator.next(...args), this.signal);