Skip to content

Commit

Permalink
Fix remote runnable tracing for stream log and stream events
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoblee93 committed Mar 30, 2024
1 parent 8828682 commit 3db4721
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 74 deletions.
174 changes: 102 additions & 72 deletions langchain-core/src/runnables/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import { Document } from "../documents/index.js";
import { CallbackManagerForChainRun } from "../callbacks/manager.js";
import { ChatPromptValue, StringPromptValue } from "../prompt_values.js";
import {
LogStreamCallbackHandler,
RunLogPatch,
type LogStreamCallbackHandlerInput,
type StreamEvent,
RunLog,
} from "../tracers/log_stream.js";
import {
AIMessage,
Expand Down Expand Up @@ -472,20 +472,16 @@ export class RemoteRunnable<
): AsyncGenerator<RunLogPatch> {
const [config, kwargs] =
this._separateRunnableConfigFromCallOptions(options);
const stream = new LogStreamCallbackHandler({
...streamOptions,
autoClose: false,
});
const { callbacks } = config;
if (callbacks === undefined) {
config.callbacks = [stream];
} else if (Array.isArray(callbacks)) {
config.callbacks = callbacks.concat([stream]);
} else {
const copiedCallbacks = callbacks.copy();
copiedCallbacks.inheritableHandlers.push(stream);
config.callbacks = copiedCallbacks;
}
const callbackManager_ = await getCallbackManagerForConfig(options);
const runManager = await callbackManager_?.handleChainStart(
this.toJSON(),
_coerceToDict(input, "input"),
undefined,
undefined,
undefined,
undefined,
options?.runName
);
// The type is in camelCase but the API only accepts snake_case.
const camelCaseStreamOptions = {
include_names: streamOptions?.includeNames,
Expand All @@ -495,32 +491,46 @@ export class RemoteRunnable<
exclude_types: streamOptions?.excludeTypes,
exclude_tags: streamOptions?.excludeTags,
};
const response = await this.post<{
input: RunInput;
config?: RunnableConfig;
kwargs?: Omit<Partial<CallOptions>, keyof RunnableConfig>;
diff: false;
}>("/stream_log", {
input,
config: removeCallbacks(config),
kwargs,
...camelCaseStreamOptions,
diff: false,
});
const { body, ok } = response;
if (!ok) {
throw new Error(`${response.status} Error: ${await response.text()}`);
}
if (!body) {
throw new Error(
"Could not begin remote stream log. Please check the given URL and try again."
);
}
const runnableStream = convertEventStreamToIterableReadableDataStream(body);
for await (const log of runnableStream) {
const chunk = revive(JSON.parse(log));
yield new RunLogPatch({ ops: chunk.ops });
let runLog;
try {
const response = await this.post<{
input: RunInput;
config?: RunnableConfig;
kwargs?: Omit<Partial<CallOptions>, keyof RunnableConfig>;
diff: false;
}>("/stream_log", {
input,
config: removeCallbacks(config),
kwargs,
...camelCaseStreamOptions,
diff: false,
});
const { body, ok } = response;
if (!ok) {
throw new Error(`${response.status} Error: ${await response.text()}`);
}
if (!body) {
throw new Error(
"Could not begin remote stream log. Please check the given URL and try again."
);
}
const runnableStream =
convertEventStreamToIterableReadableDataStream(body);
for await (const log of runnableStream) {
const chunk = revive(JSON.parse(log));
const logPatch = new RunLogPatch({ ops: chunk.ops });
yield logPatch;
if (runLog === undefined) {
runLog = RunLog.fromRunLogPatch(logPatch);
} else {
runLog = runLog.concat(logPatch);
}
}
} catch (err) {
await runManager?.handleChainError(err);
throw err;
}
await runManager?.handleChainEnd(runLog?.state.final_output);
}

async *streamEvents(
Expand All @@ -535,6 +545,16 @@ export class RemoteRunnable<
}
const [config, kwargs] =
this._separateRunnableConfigFromCallOptions(options);
const callbackManager_ = await getCallbackManagerForConfig(options);
const runManager = await callbackManager_?.handleChainStart(
this.toJSON(),
_coerceToDict(input, "input"),
undefined,
undefined,
undefined,
undefined,
options?.runName
);
// The type is in camelCase but the API only accepts snake_case.
const camelCaseStreamOptions = {
include_names: streamOptions?.includeNames,
Expand All @@ -544,38 +564,48 @@ export class RemoteRunnable<
exclude_types: streamOptions?.excludeTypes,
exclude_tags: streamOptions?.excludeTags,
};
const response = await this.post<{
input: RunInput;
config?: RunnableConfig;
kwargs?: Omit<Partial<CallOptions>, keyof RunnableConfig>;
diff: false;
}>("/stream_events", {
input,
config: removeCallbacks(config),
kwargs,
...camelCaseStreamOptions,
diff: false,
});
const { body, ok } = response;
if (!ok) {
throw new Error(`${response.status} Error: ${await response.text()}`);
}
if (!body) {
throw new Error(
"Could not begin remote stream events. Please check the given URL and try again."
);
}
const runnableStream = convertEventStreamToIterableReadableDataStream(body);
for await (const log of runnableStream) {
const chunk = revive(JSON.parse(log));
yield {
event: chunk.event,
name: chunk.name,
run_id: chunk.run_id,
tags: chunk.tags,
metadata: chunk.metadata,
data: chunk.data,
};
const events = [];
try {
const response = await this.post<{
input: RunInput;
config?: RunnableConfig;
kwargs?: Omit<Partial<CallOptions>, keyof RunnableConfig>;
diff: false;
}>("/stream_events", {
input,
config: removeCallbacks(config),
kwargs,
...camelCaseStreamOptions,
diff: false,
});
const { body, ok } = response;
if (!ok) {
throw new Error(`${response.status} Error: ${await response.text()}`);
}
if (!body) {
throw new Error(
"Could not begin remote stream events. Please check the given URL and try again."
);
}
const runnableStream =
convertEventStreamToIterableReadableDataStream(body);
for await (const log of runnableStream) {
const chunk = revive(JSON.parse(log));
const event = {
event: chunk.event,
name: chunk.name,
run_id: chunk.run_id,
tags: chunk.tags,
metadata: chunk.metadata,
data: chunk.data,
};
yield event;
events.push(event);
}
} catch (err) {
await runManager?.handleChainError(err);
throw err;
}
await runManager?.handleChainEnd(events);
}
}
6 changes: 4 additions & 2 deletions langchain-core/src/runnables/tests/runnable_remote.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,10 @@ describe("RemoteRunnable", () => {

test("Streaming in a chain with model output", async () => {
const remote = new RemoteRunnable({ url: `${BASE_URL}/b` });
const prompt = PromptTemplate.fromTemplate('');
const chunks = await prompt.pipe(remote).stream({ text: "What are the 5 best apples?" });
const prompt = PromptTemplate.fromTemplate("");
const chunks = await prompt
.pipe(remote)
.stream({ text: "What are the 5 best apples?" });
let chunkCount = 0;
let accumulator: AIMessageChunk | null = null;
for await (const chunk of chunks) {
Expand Down

0 comments on commit 3db4721

Please sign in to comment.