diff --git a/langchain-core/src/callbacks/base.ts b/langchain-core/src/callbacks/base.ts index 8227c92dc661..42ecd9820768 100644 --- a/langchain-core/src/callbacks/base.ts +++ b/langchain-core/src/callbacks/base.ts @@ -394,3 +394,13 @@ export abstract class BaseCallbackHandler return new Handler(); } } + +export const isBaseCallbackHandler = (x: unknown) => { + const callbackHandler = x as BaseCallbackHandler; + return ( + callbackHandler !== undefined && + typeof callbackHandler.copy === "function" && + typeof callbackHandler.name === "string" && + typeof callbackHandler.awaitHandlers === "boolean" + ); +}; diff --git a/langchain-core/src/callbacks/manager.ts b/langchain-core/src/callbacks/manager.ts index 6e517b83aac3..dcabf602b838 100644 --- a/langchain-core/src/callbacks/manager.ts +++ b/langchain-core/src/callbacks/manager.ts @@ -6,6 +6,7 @@ import { BaseCallbackHandler, CallbackHandlerMethods, HandleLLMNewTokenCallbackFields, + isBaseCallbackHandler, NewTokenIndices, } from "./base.js"; import { ConsoleCallbackHandler } from "../tracers/console.js"; @@ -21,6 +22,10 @@ import { Serialized } from "../load/serializable.js"; import type { DocumentInterface } from "../documents/document.js"; import { isTracingEnabled } from "../utils/callbacks.js"; import { isBaseTracer } from "../tracers/base.js"; +import { + getContextVariable, + _getConfigureHooks, +} from "../singletons/async_local_storage/context.js"; type BaseCallbackManagerMethods = { [K in keyof CallbackHandlerMethods]?: ( @@ -1252,6 +1257,31 @@ export class CallbackManager callbackManager.addMetadata(localMetadata ?? {}, false); } } + + for (const { + contextVar, + inheritable = true, + handlerClass, + envVar, + } of _getConfigureHooks()) { + const createIfNotInContext = + envVar && getEnvironmentVariable(envVar) === "true" && handlerClass; + let handler: BaseCallbackHandler | undefined; + const contextVarValue = + contextVar !== undefined ? getContextVariable(contextVar) : undefined; + if (contextVarValue && isBaseCallbackHandler(contextVarValue)) { + handler = contextVarValue; + } else if (createIfNotInContext) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + handler = new (handlerClass as any)({}); + } + if (handler !== undefined) { + if (!callbackManager?.handlers.some((h) => h.name === handler!.name)) { + callbackManager?.addHandler(handler, inheritable); + } + } + } + return callbackManager; } } diff --git a/langchain-core/src/callbacks/tests/manager.test.ts b/langchain-core/src/callbacks/tests/manager.test.ts new file mode 100644 index 000000000000..7e947049006f --- /dev/null +++ b/langchain-core/src/callbacks/tests/manager.test.ts @@ -0,0 +1,73 @@ +/* eslint-disable no-process-env */ +import { expect, test, beforeAll, afterEach } from "@jest/globals"; + +import { setContextVariable, registerConfigureHook } from "../../context.js"; +import { BaseCallbackHandler } from "../base.js"; +import { CallbackManager } from "../manager.js"; + +class TestHandler extends BaseCallbackHandler { + name = "TestHandler"; +} + +const handlerInstance = new TestHandler(); + +beforeAll(() => { + process.env.LANGCHAIN_TRACING_V2 = "false"; + process.env.LANGSMITH_TRACING_V2 = "false"; + process.env.__TEST_VAR = "false"; +}); + +afterEach(() => { + setContextVariable("my_test_handler", undefined); +}); + +test("configure with empty array", async () => { + const manager = CallbackManager.configure([]); + expect(manager?.handlers.length).toBe(0); +}); + +test("configure with one handler", async () => { + const manager = CallbackManager.configure([handlerInstance]); + expect(manager?.handlers[0]).toBe(handlerInstance); +}); + +test("registerConfigureHook with contextVar", async () => { + setContextVariable("my_test_handler", handlerInstance); + registerConfigureHook({ + contextVar: "my_test_handler", + }); + const manager = CallbackManager.configure([]); + expect(manager?.handlers[0]).toBe(handlerInstance); +}); + +test("registerConfigureHook with env", async () => { + process.env.__TEST_VAR = "true"; + registerConfigureHook({ + handlerClass: TestHandler, + envVar: "__TEST_VAR", + }); + const manager = CallbackManager.configure([]); + expect(manager?.handlers[0].name).toBe("TestHandler"); +}); + +test("registerConfigureHook doesn't add with env false", async () => { + process.env.__TEST_VAR = "false"; + registerConfigureHook({ + handlerClass: TestHandler, + envVar: "__TEST_VAR", + }); + const manager = CallbackManager.configure([]); + expect(manager?.handlers.length).toBe(0); +}); + +test("registerConfigureHook avoids multiple", async () => { + process.env.__TEST_VAR = "true"; + registerConfigureHook({ + contextVar: "my_test_handler", + handlerClass: TestHandler, + envVar: "__TEST_VAR", + }); + const manager = CallbackManager.configure([handlerInstance]); + expect(manager?.handlers[0]).toBe(handlerInstance); + expect(manager?.handlers[1]).toBe(undefined); +}); diff --git a/langchain-core/src/context.ts b/langchain-core/src/context.ts index ff12a9eb4fc8..366883fb7620 100644 --- a/langchain-core/src/context.ts +++ b/langchain-core/src/context.ts @@ -1,131 +1,29 @@ /* __LC_ALLOW_ENTRYPOINT_SIDE_EFFECTS__ */ + +/** + * This file exists as a convenient public entrypoint for functionality + * related to context variables. + * + * Because it automatically initializes AsyncLocalStorage, internal + * functionality SHOULD NEVER import from this file outside of tests. + */ + import { AsyncLocalStorage } from "node:async_hooks"; -import { RunTree } from "langsmith"; -import { isRunTree } from "langsmith/run_trees"; +import { AsyncLocalStorageProviderSingleton } from "./singletons/index.js"; import { - _CONTEXT_VARIABLES_KEY, - AsyncLocalStorageProviderSingleton, -} from "./singletons/index.js"; + getContextVariable, + setContextVariable, + type ConfigureHook, + registerConfigureHook, +} from "./singletons/async_local_storage/context.js"; AsyncLocalStorageProviderSingleton.initializeGlobalInstance( new AsyncLocalStorage() ); -/** - * Set a context variable. Context variables are scoped to any - * child runnables called by the current runnable, or globally if set outside - * of any runnable. - * - * @remarks - * This function is only supported in environments that support AsyncLocalStorage, - * including Node.js, Deno, and Cloudflare Workers. - * - * @example - * ```ts - * import { RunnableLambda } from "@langchain/core/runnables"; - * import { - * getContextVariable, - * setContextVariable - * } from "@langchain/core/context"; - * - * const nested = RunnableLambda.from(() => { - * // "bar" because it was set by a parent - * console.log(getContextVariable("foo")); - * - * // Override to "baz", but only for child runnables - * setContextVariable("foo", "baz"); - * - * // Now "baz", but only for child runnables - * return getContextVariable("foo"); - * }); - * - * const runnable = RunnableLambda.from(async () => { - * // Set a context variable named "foo" - * setContextVariable("foo", "bar"); - * - * const res = await nested.invoke({}); - * - * // Still "bar" since child changes do not affect parents - * console.log(getContextVariable("foo")); - * - * return res; - * }); - * - * // undefined, because context variable has not been set yet - * console.log(getContextVariable("foo")); - * - * // Final return value is "baz" - * const result = await runnable.invoke({}); - * ``` - * - * @param name The name of the context variable. - * @param value The value to set. - */ -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export function setContextVariable(name: PropertyKey, value: T): void { - const runTree = AsyncLocalStorageProviderSingleton.getInstance().getStore(); - const contextVars = { ...runTree?.[_CONTEXT_VARIABLES_KEY] }; - contextVars[name] = value; - let newValue = {}; - if (isRunTree(runTree)) { - newValue = new RunTree(runTree); - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (newValue as any)[_CONTEXT_VARIABLES_KEY] = contextVars; - AsyncLocalStorageProviderSingleton.getInstance().enterWith(newValue); -} - -/** - * Get the value of a previously set context variable. Context variables - * are scoped to any child runnables called by the current runnable, - * or globally if set outside of any runnable. - * - * @remarks - * This function is only supported in environments that support AsyncLocalStorage, - * including Node.js, Deno, and Cloudflare Workers. - * - * @example - * ```ts - * import { RunnableLambda } from "@langchain/core/runnables"; - * import { - * getContextVariable, - * setContextVariable - * } from "@langchain/core/context"; - * - * const nested = RunnableLambda.from(() => { - * // "bar" because it was set by a parent - * console.log(getContextVariable("foo")); - * - * // Override to "baz", but only for child runnables - * setContextVariable("foo", "baz"); - * - * // Now "baz", but only for child runnables - * return getContextVariable("foo"); - * }); - * - * const runnable = RunnableLambda.from(async () => { - * // Set a context variable named "foo" - * setContextVariable("foo", "bar"); - * - * const res = await nested.invoke({}); - * - * // Still "bar" since child changes do not affect parents - * console.log(getContextVariable("foo")); - * - * return res; - * }); - * - * // undefined, because context variable has not been set yet - * console.log(getContextVariable("foo")); - * - * // Final return value is "baz" - * const result = await runnable.invoke({}); - * ``` - * - * @param name The name of the context variable. - */ -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export function getContextVariable(name: PropertyKey): T | undefined { - const runTree = AsyncLocalStorageProviderSingleton.getInstance().getStore(); - return runTree?.[_CONTEXT_VARIABLES_KEY]?.[name]; -} +export { + getContextVariable, + setContextVariable, + registerConfigureHook, + type ConfigureHook, +}; diff --git a/langchain-core/src/singletons/async_local_storage/context.ts b/langchain-core/src/singletons/async_local_storage/context.ts new file mode 100644 index 000000000000..af08cd0d41d4 --- /dev/null +++ b/langchain-core/src/singletons/async_local_storage/context.ts @@ -0,0 +1,204 @@ +import { isRunTree, RunTree } from "langsmith/run_trees"; +import { BaseCallbackHandler } from "../../callbacks/base.js"; +import { + _CONTEXT_VARIABLES_KEY, + getGlobalAsyncLocalStorageInstance, +} from "./globals.js"; + +/** + * Set a context variable. Context variables are scoped to any + * child runnables called by the current runnable, or globally if set outside + * of any runnable. + * + * @remarks + * This function is only supported in environments that support AsyncLocalStorage, + * including Node.js, Deno, and Cloudflare Workers. + * + * @example + * ```ts + * import { RunnableLambda } from "@langchain/core/runnables"; + * import { + * getContextVariable, + * setContextVariable + * } from "@langchain/core/context"; + * + * const nested = RunnableLambda.from(() => { + * // "bar" because it was set by a parent + * console.log(getContextVariable("foo")); + * + * // Override to "baz", but only for child runnables + * setContextVariable("foo", "baz"); + * + * // Now "baz", but only for child runnables + * return getContextVariable("foo"); + * }); + * + * const runnable = RunnableLambda.from(async () => { + * // Set a context variable named "foo" + * setContextVariable("foo", "bar"); + * + * const res = await nested.invoke({}); + * + * // Still "bar" since child changes do not affect parents + * console.log(getContextVariable("foo")); + * + * return res; + * }); + * + * // undefined, because context variable has not been set yet + * console.log(getContextVariable("foo")); + * + * // Final return value is "baz" + * const result = await runnable.invoke({}); + * ``` + * + * @param name The name of the context variable. + * @param value The value to set. + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function setContextVariable(name: PropertyKey, value: T): void { + // Avoid using global singleton due to circuluar dependency issues + const asyncLocalStorageInstance = getGlobalAsyncLocalStorageInstance(); + if (asyncLocalStorageInstance === undefined) { + throw new Error( + `Internal error: Global shared async local storage instance has not been initialized.` + ); + } + const runTree = asyncLocalStorageInstance.getStore(); + const contextVars = { ...runTree?.[_CONTEXT_VARIABLES_KEY] }; + contextVars[name] = value; + let newValue = {}; + if (isRunTree(runTree)) { + newValue = new RunTree(runTree); + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (newValue as any)[_CONTEXT_VARIABLES_KEY] = contextVars; + asyncLocalStorageInstance.enterWith(newValue); +} + +/** + * Get the value of a previously set context variable. Context variables + * are scoped to any child runnables called by the current runnable, + * or globally if set outside of any runnable. + * + * @remarks + * This function is only supported in environments that support AsyncLocalStorage, + * including Node.js, Deno, and Cloudflare Workers. + * + * @example + * ```ts + * import { RunnableLambda } from "@langchain/core/runnables"; + * import { + * getContextVariable, + * setContextVariable + * } from "@langchain/core/context"; + * + * const nested = RunnableLambda.from(() => { + * // "bar" because it was set by a parent + * console.log(getContextVariable("foo")); + * + * // Override to "baz", but only for child runnables + * setContextVariable("foo", "baz"); + * + * // Now "baz", but only for child runnables + * return getContextVariable("foo"); + * }); + * + * const runnable = RunnableLambda.from(async () => { + * // Set a context variable named "foo" + * setContextVariable("foo", "bar"); + * + * const res = await nested.invoke({}); + * + * // Still "bar" since child changes do not affect parents + * console.log(getContextVariable("foo")); + * + * return res; + * }); + * + * // undefined, because context variable has not been set yet + * console.log(getContextVariable("foo")); + * + * // Final return value is "baz" + * const result = await runnable.invoke({}); + * ``` + * + * @param name The name of the context variable. + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function getContextVariable(name: PropertyKey): T | undefined { + // Avoid using global singleton due to circuluar dependency issues + const asyncLocalStorageInstance = getGlobalAsyncLocalStorageInstance(); + if (asyncLocalStorageInstance === undefined) { + return undefined; + } + const runTree = asyncLocalStorageInstance.getStore(); + return runTree?.[_CONTEXT_VARIABLES_KEY]?.[name]; +} + +const LC_CONFIGURE_HOOKS_KEY = Symbol("lc:configure_hooks"); + +export const _getConfigureHooks = () => + getContextVariable(LC_CONFIGURE_HOOKS_KEY) || []; + +/** + * Register a callback configure hook to automatically add callback handlers to all runs. + * + * There are two ways to use this: + * + * 1. Using a context variable: + * - Set `contextVar` to specify the variable name + * - Use `setContextVariable()` to store your handler instance + * + * 2. Using an environment variable: + * - Set both `envVar` and `handlerClass` + * - The handler will be instantiated when the env var is set to "true". + * + * @example + * ```typescript + * // Method 1: Using context variable + * import { + * registerConfigureHook, + * setContextVariable + * } from "@langchain/core/context"; + * + * const tracer = new MyCallbackHandler(); + * registerConfigureHook({ + * contextVar: "my_tracer", + * }); + * setContextVariable("my_tracer", tracer); + * + * // ...run code here + * + * // Method 2: Using environment variable + * registerConfigureHook({ + * handlerClass: MyCallbackHandler, + * envVar: "MY_TRACER_ENABLED", + * }); + * process.env.MY_TRACER_ENABLED = "true"; + * + * // ...run code here + * ``` + * + * @param config Configuration object for the hook + * @param config.contextVar Name of the context variable containing the handler instance + * @param config.inheritable Whether child runs should inherit this handler + * @param config.handlerClass Optional callback handler class (required if using envVar) + * @param config.envVar Optional environment variable name to control handler activation + */ +export const registerConfigureHook = (config: ConfigureHook) => { + if (config.envVar && !config.handlerClass) { + throw new Error( + "If envVar is set, handlerClass must also be set to a non-None value." + ); + } + setContextVariable(LC_CONFIGURE_HOOKS_KEY, [..._getConfigureHooks(), config]); +}; + +export type ConfigureHook = { + contextVar?: string; + inheritable?: boolean; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + handlerClass?: new (...args: any[]) => BaseCallbackHandler; + envVar?: string; +}; diff --git a/langchain-core/src/singletons/async_local_storage/globals.ts b/langchain-core/src/singletons/async_local_storage/globals.ts index c3428989b8c4..d7c661aac79e 100644 --- a/langchain-core/src/singletons/async_local_storage/globals.ts +++ b/langchain-core/src/singletons/async_local_storage/globals.ts @@ -9,12 +9,16 @@ export interface AsyncLocalStorageInterface { export const TRACING_ALS_KEY = Symbol.for("ls:tracing_async_local_storage"); +export const _CONTEXT_VARIABLES_KEY = Symbol.for("lc:context_variables"); + export const setGlobalAsyncLocalStorageInstance = ( instance: AsyncLocalStorageInterface ) => { (globalThis as any)[TRACING_ALS_KEY] = instance; }; -export const getGlobalAsyncLocalStorageInstance = () => { +export const getGlobalAsyncLocalStorageInstance = (): + | AsyncLocalStorageInterface + | undefined => { return (globalThis as any)[TRACING_ALS_KEY]; }; diff --git a/langchain-core/src/singletons/async_local_storage/index.ts b/langchain-core/src/singletons/async_local_storage/index.ts index f89c9fbce50a..3dc1341d92e2 100644 --- a/langchain-core/src/singletons/async_local_storage/index.ts +++ b/langchain-core/src/singletons/async_local_storage/index.ts @@ -4,6 +4,7 @@ import { AsyncLocalStorageInterface, getGlobalAsyncLocalStorageInstance, setGlobalAsyncLocalStorageInstance, + _CONTEXT_VARIABLES_KEY, } from "./globals.js"; import { CallbackManager } from "../../callbacks/manager.js"; import { LangChainTracer } from "../../tracers/tracer_langchain.js"; @@ -26,8 +27,6 @@ const mockAsyncLocalStorage = new MockAsyncLocalStorage(); const LC_CHILD_KEY = Symbol.for("lc:child_config"); -export const _CONTEXT_VARIABLES_KEY = Symbol.for("lc:context_variables"); - class AsyncLocalStorageProvider { getInstance(): AsyncLocalStorageInterface { return getGlobalAsyncLocalStorageInstance() ?? mockAsyncLocalStorage; diff --git a/langchain-core/src/singletons/callbacks.ts b/langchain-core/src/singletons/callbacks.ts index 681d770ba96d..5c881bfcbab6 100644 --- a/langchain-core/src/singletons/callbacks.ts +++ b/langchain-core/src/singletons/callbacks.ts @@ -37,20 +37,18 @@ export async function consumeCallback( if (wait === true) { // Clear config since callbacks are not part of the root run // Avoid using global singleton due to circuluar dependency issues - if (getGlobalAsyncLocalStorageInstance() !== undefined) { - await getGlobalAsyncLocalStorageInstance().run(undefined, async () => - promiseFn() - ); + const asyncLocalStorageInstance = getGlobalAsyncLocalStorageInstance(); + if (asyncLocalStorageInstance !== undefined) { + await asyncLocalStorageInstance.run(undefined, async () => promiseFn()); } else { await promiseFn(); } } else { queue = getQueue(); void queue.add(async () => { - if (getGlobalAsyncLocalStorageInstance() !== undefined) { - await getGlobalAsyncLocalStorageInstance().run(undefined, async () => - promiseFn() - ); + const asyncLocalStorageInstance = getGlobalAsyncLocalStorageInstance(); + if (asyncLocalStorageInstance !== undefined) { + await asyncLocalStorageInstance.run(undefined, async () => promiseFn()); } else { await promiseFn(); } diff --git a/langchain-core/src/singletons/index.ts b/langchain-core/src/singletons/index.ts index bee8320abaec..21f17232ccf2 100644 --- a/langchain-core/src/singletons/index.ts +++ b/langchain-core/src/singletons/index.ts @@ -1,14 +1,13 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ import { type AsyncLocalStorageInterface, AsyncLocalStorageProviderSingleton, - _CONTEXT_VARIABLES_KEY, MockAsyncLocalStorage, } from "./async_local_storage/index.js"; +import { _CONTEXT_VARIABLES_KEY } from "./async_local_storage/globals.js"; export { type AsyncLocalStorageInterface, AsyncLocalStorageProviderSingleton, - _CONTEXT_VARIABLES_KEY, MockAsyncLocalStorage, }; +export { _CONTEXT_VARIABLES_KEY };