Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Handoff #461

Merged
merged 8 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@
"devDependencies": {
"@babel/preset-env": "^7.22.4",
"@jest/globals": "^29.5.0",
"@langchain/core": "^0.1.28",
"@langchain/core": "^0.1.32",
"@langchain/langgraph": "^0.0.8",
"@tsconfig/recommended": "^1.0.2",
"@types/jest": "^29.5.1",
"@typescript-eslint/eslint-plugin": "^5.59.8",
Expand Down
123 changes: 117 additions & 6 deletions js/src/run_trees.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ export interface RunTreeConfig {
id?: string;
project_name?: string;
parent_run?: RunTree;
parent_run_id?: string;
child_runs?: RunTree[];
start_time?: number;
end_time?: number;
extra?: KVMap;
tags?: string[];
error?: string;
serialized?: object;
inputs?: KVMap;
Expand All @@ -41,16 +43,52 @@ export interface RunTreeConfig {
client?: Client;
}

export interface RunnableConfigLike {
/**
* Tags for this call and any sub-calls (eg. a Chain calling an LLM).
* You can use these to filter calls.
*/
tags?: string[];

/**
* Metadata for this call and any sub-calls (eg. a Chain calling an LLM).
* Keys should be strings, values should be JSON-serializable.
*/
metadata?: Record<string, unknown>;

/**
* Callbacks for this call and any sub-calls (eg. a Chain calling an LLM).
* Tags are passed to all callbacks, metadata is passed to handle*Start callbacks.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
callbacks?: any;
}

interface CallbackManagerLike {
handlers: TracerLike[];
getParentRunId?: () => string | undefined;
}

interface TracerLike {
name: string;
}
interface LangChainTracerLike extends TracerLike {
name: "langchain_tracer";
projectName: string;
getRun?: (id: string) => RunTree | undefined;
}

export class RunTree implements BaseRun {
id: string;
name: RunTreeConfig["name"];
run_type: string;
project_name: string;
parent_run?: RunTree;
parent_run?: BaseRun;
child_runs: RunTree[];
start_time: number;
end_time?: number;
extra: KVMap;
tags?: string[];
error?: string;
serialized: object;
inputs: KVMap;
Expand All @@ -62,11 +100,12 @@ export class RunTree implements BaseRun {
dotted_order: string;

constructor(config: RunTreeConfig) {
const defaultConfig = RunTree.getDefaultConfig(config.client);
Object.assign(this, { ...defaultConfig, ...config });
const defaultConfig = RunTree.getDefaultConfig();
const client = config.client ?? new Client();
Object.assign(this, { ...defaultConfig, ...config, client });
if (!this.trace_id) {
if (this.parent_run) {
this.trace_id = this.parent_run.trace_id;
this.trace_id = this.parent_run.trace_id ?? this.id;
} else {
this.trace_id = this.id;
}
Expand All @@ -84,7 +123,49 @@ export class RunTree implements BaseRun {
}
}
}
private static getDefaultConfig(client?: Client): object {

static fromRunnableConfig(
config: RunnableConfigLike,
props: {
name: string;
tags?: string[];
metadata?: KVMap;
}
): RunTree {
// We only handle the callback manager case for now
const callbackManager = config?.callbacks as
| CallbackManagerLike
| undefined;
let parentRun: RunTree | undefined;
let projectName: string | undefined;
if (callbackManager) {
const parentRunId = callbackManager?.getParentRunId?.() ?? "";
const langChainTracer = callbackManager?.handlers?.find(
(handler: TracerLike) => handler?.name == "langchain_tracer"
) as LangChainTracerLike | undefined;
parentRun = langChainTracer?.getRun?.(parentRunId);
projectName = langChainTracer?.projectName;
}
const deduppedTags = [
...new Set((parentRun?.tags ?? []).concat(config?.tags ?? [])),
];
const dedupedMetadata = {
...parentRun?.extra?.metadata,
...config?.metadata,
};
const rt = new RunTree({
name: props?.name ?? "<lambda>",
parent_run: parentRun,
tags: deduppedTags,
extra: {
metadata: dedupedMetadata,
},
project_name: projectName,
});
return rt;
}

private static getDefaultConfig(): object {
return {
id: uuid.v4(),
run_type: "chain",
Expand All @@ -101,7 +182,6 @@ export class RunTree implements BaseRun {
serialized: {},
inputs: {},
extra: {},
client: client ?? new Client({}),
};
}

Expand Down Expand Up @@ -171,6 +251,7 @@ export class RunTree implements BaseRun {
parent_run_id: parent_run_id,
trace_id: run.trace_id,
dotted_order: run.dotted_order,
tags: run.tags,
};
return persistedRun;
}
Expand Down Expand Up @@ -200,6 +281,7 @@ export class RunTree implements BaseRun {
events: this.events,
dotted_order: this.dotted_order,
trace_id: this.trace_id,
tags: this.tags,
};

await this.client.updateRun(this.id, runUpdate);
Expand All @@ -213,3 +295,32 @@ export function isRunTree(x?: unknown): x is RunTree {
typeof (x as RunTree).postRun === "function"
);
}

function containsLangChainTracerLike(x?: unknown): x is LangChainTracerLike[] {
return (
Array.isArray(x) &&
x.some((callback: unknown) => {
return (
typeof (callback as LangChainTracerLike).name === "string" &&
(callback as LangChainTracerLike).name === "langchain_tracer"
);
})
);
}

export function isRunnableConfigLike(x?: unknown): x is RunnableConfigLike {
// Check that it's an object with a callbacks arg
// that has either a CallbackManagerLike object with a langchain tracer within it
// or an array with a LangChainTracerLike object within it

return (
x !== undefined &&
typeof (x as RunnableConfigLike).callbacks === "object" &&
// Callback manager with a langchain tracer
(containsLangChainTracerLike(
(x as RunnableConfigLike).callbacks?.handlers
) ||
// Or it's an array with a LangChainTracerLike object within it
containsLangChainTracerLike((x as RunnableConfigLike).callbacks))
);
}
1 change: 1 addition & 0 deletions js/src/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ export interface RunUpdate {
id?: string;
end_time?: number;
extra?: KVMap;
tags?: string[];
error?: string;
inputs?: KVMap;
outputs?: KVMap;
Expand Down
41 changes: 1 addition & 40 deletions js/src/tests/client.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,7 @@ import { FunctionMessage, HumanMessage } from "@langchain/core/messages";

import { Client } from "../client.js";
import { v4 as uuidv4 } from "uuid";

async function toArray<T>(iterable: AsyncIterable<T>): Promise<T[]> {
const result: T[] = [];
for await (const item of iterable) {
result.push(item);
}
return result;
}

async function deleteProject(langchainClient: Client, projectName: string) {
try {
await langchainClient.readProject({ projectName });
await langchainClient.deleteProject({ projectName });
} catch (e) {
// Pass
}
}
async function deleteDataset(langchainClient: Client, datasetName: string) {
try {
const existingDataset = await langchainClient.readDataset({ datasetName });
await langchainClient.deleteDataset({ datasetId: existingDataset.id });
} catch (e) {
// Pass
}
}
async function waitUntil(
condition: () => Promise<boolean>,
timeout: number,
interval: number
): Promise<void> {
const start = Date.now();
while (Date.now() - start < timeout) {
if (await condition()) {
return;
}
await new Promise((resolve) => setTimeout(resolve, interval));
}
const elapsed = Date.now() - start;
throw new Error(`Timeout after ${elapsed / 1000}s`);
}
import { deleteDataset, deleteProject, toArray, waitUntil } from "./utils.js";

type CheckOutputsType = boolean | ((run: Run) => boolean);
async function waitUntilRunFound(
Expand Down
92 changes: 92 additions & 0 deletions js/src/tests/lcls_handoff.int.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { BaseMessage, HumanMessage } from "@langchain/core/messages";
import { RunnableConfig, RunnableLambda } from "@langchain/core/runnables";
import { LangChainTracer } from "@langchain/core/tracers/tracer_langchain";
import { MessageGraph } from "@langchain/langgraph";
import { v4 as uuidv4 } from "uuid";
import { Client } from "../client.js";
import { Run } from "../schemas.js";
import { traceable } from "../traceable.js";
import { toArray, waitUntil } from "./utils.js";

test.concurrent(
"Test handoff between run tree and LangChain code.",
async () => {
const projectName = `__test_handoff ${uuidv4()}`;

// Define a new graph
const workflow = new MessageGraph();

const addValueTraceable = traceable(
(msg: BaseMessage) => {
return new HumanMessage({ content: msg.content + " world" });
},
{
name: "add_negligible_value",
}
);

const myFunc = async (messages: BaseMessage[], config?: RunnableConfig) => {
const runnableConfig = config ?? { callbacks: [] };
const newMsg = await addValueTraceable(
runnableConfig,
messages[0] as HumanMessage
);
return [newMsg];
};

// Define the two nodes we will cycle between
workflow.addNode(
"agent",
new RunnableLambda({
func: async () => new HumanMessage({ content: "Hello!" }),
})
);
workflow.addNode("action", new RunnableLambda({ func: myFunc }));

// Set the entrypoint as `agent`
// This means that this node is the first one called
workflow.setEntryPoint("agent");
workflow.addEdge("agent", "action");
workflow.setFinishPoint("action");
const app = workflow.compile();
console.log(projectName);
const tracer = new LangChainTracer({ projectName });
const client = new Client({
callerOptions: { maxRetries: 3 },
});
try {
const result = await app.invoke(
[new HumanMessage({ content: "Hello!" })],
{
callbacks: [tracer],
}
);
expect(result[result.length - 1].content).toEqual("Hello! world");

// First wait until at least one trace is found in the project
const getNestedFunction = (): Promise<Run[]> =>
toArray(
client.listRuns({
projectName,
filter: "eq(name, 'add_negligible_value')",
})
);
await waitUntil(
async () => {
const traces = await getNestedFunction();
return traces.length > 0;
},
30_000,
10
);

const traces = await getNestedFunction();
expect(traces.length).toEqual(1);
const trace = traces[0];
expect(trace.name).toEqual("add_negligible_value");
expect(trace.parent_run_id).not.toBeNull();
} finally {
await client.deleteProject({ projectName });
}
}
);
43 changes: 1 addition & 42 deletions js/src/tests/run_trees.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,7 @@ import {
RunTreeConfig,
convertToDottedOrderFormat,
} from "../run_trees.js";

async function toArray<T>(iterable: AsyncIterable<T>): Promise<T[]> {
const result: T[] = [];
for await (const item of iterable) {
result.push(item);
}
return result;
}

async function waitUntil(
condition: () => Promise<boolean>,
timeout: number,
interval: number
): Promise<void> {
const start = Date.now();
while (Date.now() - start < timeout) {
if (await condition()) {
return;
}
await new Promise((resolve) => setTimeout(resolve, interval));
}
throw new Error("Timeout");
}

async function pollRunsUntilCount(
client: Client,
projectName: string,
count: number
): Promise<void> {
await waitUntil(
async () => {
try {
const runs = await toArray(client.listRuns({ projectName }));
return runs.length === count;
} catch (e) {
return false;
}
},
120_000, // Wait up to 120 seconds
5000 // every 5 second
);
}
import { toArray, waitUntil, pollRunsUntilCount } from "./utils.js";

test.concurrent(
"Test post and patch run",
Expand Down
Loading
Loading