Skip to content

Commit

Permalink
fix(langchain-core): Pick runnable config keys in asynclocalstorage
Browse files Browse the repository at this point in the history
  • Loading branch information
bracesproul committed Dec 5, 2024
1 parent 983acd2 commit 0560c29
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
5 changes: 3 additions & 2 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
atee,
pipeGeneratorWithSetup,
AsyncGeneratorWithSetup,
pickRunnableConfigKeys,
} from "../utils/stream.js";
import { raceWithSignal } from "../utils/signal.js";
import {
Expand Down Expand Up @@ -2529,7 +2530,7 @@ export class RunnableLambda<
recursionLimit: (config?.recursionLimit ?? DEFAULT_RECURSION_LIMIT) - 1,
});
void AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
pickRunnableConfigKeys(childConfig),
async () => {
try {
let output = await this.func(input, {
Expand Down Expand Up @@ -2627,7 +2628,7 @@ export class RunnableLambda<
const output = await new Promise<RunOutput | Runnable>(
(resolve, reject) => {
void AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
pickRunnableConfigKeys(childConfig),
async () => {
try {
const res = await this.func(finalChunk as RunInput, {
Expand Down
5 changes: 3 additions & 2 deletions langchain-core/src/runnables/iter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";
import { pickRunnableConfigKeys } from "../utils/stream.js";
import { RunnableConfig } from "./config.js";

export function isIterableIterator(
Expand Down Expand Up @@ -36,7 +37,7 @@ export function* consumeIteratorInContext<T>(
): IterableIterator<T> {
while (true) {
const { value, done } = AsyncLocalStorageProviderSingleton.runWithConfig(
context,
pickRunnableConfigKeys(context),
iter.next.bind(iter),
true
);
Expand All @@ -56,7 +57,7 @@ export async function* consumeAsyncIterableInContext<T>(
while (true) {
const { value, done } =
await AsyncLocalStorageProviderSingleton.runWithConfig(
context,
pickRunnableConfigKeys(context),
iterator.next.bind(iter),
true
);
Expand Down
5 changes: 3 additions & 2 deletions langchain-core/src/tools/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { MessageContent } from "../messages/base.js";
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";
import { _isToolCall, ToolInputParsingException } from "./utils.js";
import { isZodSchema } from "../utils/types/is_zod_schema.js";
import { pickRunnableConfigKeys } from "../utils/stream.js";

export { ToolInputParsingException };

Expand Down Expand Up @@ -594,7 +595,7 @@ export function tool<
callbacks: runManager?.getChild(),
});
void AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
pickRunnableConfigKeys(childConfig),
async () => {
try {
// TS doesn't restrict the type here based on the guard above
Expand Down Expand Up @@ -625,7 +626,7 @@ export function tool<
callbacks: runManager?.getChild(),
});
void AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
pickRunnableConfigKeys(childConfig),
async () => {
try {
// TS doesn't restrict the type here based on the guard above
Expand Down
23 changes: 21 additions & 2 deletions langchain-core/src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { RunnableConfig } from "../../runnables.js";
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";
import { raceWithSignal } from "./signal.js";

Expand Down Expand Up @@ -180,6 +181,24 @@ export function concat<
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function pickRunnableConfigKeys<CallOptions extends Record<string, any>>(
config?: CallOptions
): RunnableConfig | undefined {
return config
? {
configurable: config.configurable,
recursionLimit: config.recursionLimit,
callbacks: config.callbacks,
tags: config.tags,
metadata: config.metadata,
maxConcurrency: config.maxConcurrency,
timeout: config.timeout,
signal: config.signal,
}
: undefined;
}

export class AsyncGeneratorWithSetup<
S = unknown,
T = unknown,
Expand Down Expand Up @@ -215,7 +234,7 @@ export class AsyncGeneratorWithSetup<
// to each generator is available.
this.setup = new Promise((resolve, reject) => {
void AsyncLocalStorageProviderSingleton.runWithConfig(
params.config,
pickRunnableConfigKeys(params.config as RunnableConfig),
async () => {
this.firstResult = params.generator.next();
if (params.startSetup) {
Expand All @@ -238,7 +257,7 @@ export class AsyncGeneratorWithSetup<
}

return AsyncLocalStorageProviderSingleton.runWithConfig(
this.config,
pickRunnableConfigKeys(this.config as RunnableConfig),
this.signal
? async () => {
return raceWithSignal(this.generator.next(...args), this.signal);
Expand Down

0 comments on commit 0560c29

Please sign in to comment.