Skip to content

Commit

Permalink
fix(core): Fix runnable with fallbacks stream events bug (#7411)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoblee93 authored Dec 20, 2024
1 parent 61b6cbb commit 1a4e0ca
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 21 deletions.
49 changes: 28 additions & 21 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2840,7 +2840,7 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<
options?: Partial<RunnableConfig>
): Promise<RunOutput> {
const config = ensureConfig(options);
const callbackManager_ = await getCallbackManagerForConfig(options);
const callbackManager_ = await getCallbackManagerForConfig(config);
const { runId, ...otherConfigFields } = config;
const runManager = await callbackManager_?.handleChainStart(
this.toJSON(),
Expand All @@ -2851,35 +2851,41 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<
undefined,
otherConfigFields?.runName
);
let firstError;
for (const runnable of this.runnables()) {
config?.signal?.throwIfAborted();
try {
const output = await runnable.invoke(
input,
patchConfig(otherConfigFields, { callbacks: runManager?.getChild() })
);
await runManager?.handleChainEnd(_coerceToDict(output, "output"));
return output;
} catch (e) {
const childConfig = patchConfig(otherConfigFields, {
callbacks: runManager?.getChild(),
});
const res = await AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
async () => {
let firstError;
for (const runnable of this.runnables()) {
config?.signal?.throwIfAborted();
try {
const output = await runnable.invoke(input, childConfig);
await runManager?.handleChainEnd(_coerceToDict(output, "output"));
return output;
} catch (e) {
if (firstError === undefined) {
firstError = e;
}
}
}
if (firstError === undefined) {
firstError = e;
throw new Error("No error stored at end of fallback.");
}
await runManager?.handleChainError(firstError);
throw firstError;
}
}
if (firstError === undefined) {
throw new Error("No error stored at end of fallback.");
}
await runManager?.handleChainError(firstError);
throw firstError;
);
return res;
}

async *_streamIterator(
input: RunInput,
options?: Partial<RunnableConfig> | undefined
): AsyncGenerator<RunOutput> {
const config = ensureConfig(options);
const callbackManager_ = await getCallbackManagerForConfig(options);
const callbackManager_ = await getCallbackManagerForConfig(config);
const { runId, ...otherConfigFields } = config;
const runManager = await callbackManager_?.handleChainStart(
this.toJSON(),
Expand All @@ -2898,7 +2904,8 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<
callbacks: runManager?.getChild(),
});
try {
stream = await runnable.stream(input, childConfig);
const originalStream = await runnable.stream(input, childConfig);
stream = consumeAsyncIterableInContext(childConfig, originalStream);
break;
} catch (e) {
if (firstError === undefined) {
Expand Down
37 changes: 37 additions & 0 deletions langchain-core/src/runnables/tests/runnable_with_fallbacks.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
/* eslint-disable no-promise-executor-return */
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable no-process-env */
import { test, expect } from "@jest/globals";
import { AsyncLocalStorage } from "node:async_hooks";
import { FakeLLM, FakeStreamingLLM } from "../../utils/testing/index.js";
import { RunnableLambda } from "../base.js";
import { AsyncLocalStorageProviderSingleton } from "../../singletons/index.js";

test("RunnableWithFallbacks", async () => {
const llm = new FakeLLM({
Expand Down Expand Up @@ -55,3 +59,36 @@ test("RunnableWithFallbacks stream", async () => {
expect(chunks.length).toBeGreaterThan(1);
expect(chunks.join("")).toEqual("What up");
});

test("RunnableWithFallbacks stream events with local storage and callbacks added via env vars", async () => {
process.env.LANGCHAIN_VERBOSE = "true";
AsyncLocalStorageProviderSingleton.initializeGlobalInstance(
new AsyncLocalStorage()
);
const llm = new FakeStreamingLLM({
thrownErrorString: "Bad error!",
});
const llmWithFallbacks = llm.withFallbacks({
fallbacks: [new FakeStreamingLLM({})],
});
const runnable = RunnableLambda.from(async (input: any) => {
const res = await llmWithFallbacks.invoke(input);
const stream = await llmWithFallbacks.stream(input);
for await (const _ of stream) {
void _;
}
return res;
});
const stream = await runnable.streamEvents("hi", {
version: "v2",
});
const chunks = [];
for await (const chunk of stream) {
if (chunk.event === "on_llm_stream") {
chunks.push(chunk);
}
}
expect(chunks.length).toBeGreaterThan(1);
console.log(JSON.stringify(chunks, null, 2));
expect(chunks.map((chunk) => chunk.data.chunk.text).join("")).toEqual("hihi");
});

0 comments on commit 1a4e0ca

Please sign in to comment.