Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat[langsmith]: transparent handoff between @traceable HOF and LangChain #5339

Merged
merged 14 commits into from
May 30, 2024
Binary file not shown.
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
"ioredis": "^5.3.2",
"js-yaml": "^4.1.0",
"langchain": "workspace:*",
"langsmith": "^0.1.1",
"langsmith": "^0.1.30",
"ml-distance": "^4.0.0",
"mongodb": "^6.3.0",
"pg": "^8.11.0",
Expand Down
2 changes: 1 addition & 1 deletion langchain-core/langchain.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ function abs(relativePath) {
}

export const config = {
internals: [/node\:/, /js-tiktoken/],
internals: [/node\:/, /js-tiktoken/, /langsmith/],
entrypoints: {
agents: "agents",
caches: "caches",
Expand Down
2 changes: 1 addition & 1 deletion langchain-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"camelcase": "6",
dqbd marked this conversation as resolved.
Show resolved Hide resolved
"decamelize": "1.2.0",
"js-tiktoken": "^1.0.12",
"langsmith": "~0.1.7",
"langsmith": "~0.1.30",
"ml-distance": "^4.0.0",
"mustache": "^4.2.0",
"p-queue": "^6.6.2",
Expand Down
10 changes: 8 additions & 2 deletions langchain-core/src/callbacks/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ export class CallbackManager

name = "callback_manager";

public readonly _parentRunId?: string;
public _parentRunId?: string;

constructor(
parentRunId?: string,
Expand Down Expand Up @@ -1010,7 +1010,13 @@ export class CallbackManager
)
) {
if (tracingV2Enabled) {
callbackManager.addHandler(await getTracingV2CallbackHandler(), true);
const tracerV2 = await getTracingV2CallbackHandler();
callbackManager.addHandler(tracerV2, true);

// handoff between langchain and langsmith/traceable
// override the parent run ID
callbackManager._parentRunId =
tracerV2.getTraceableRunTree()?.id ?? callbackManager._parentRunId;
}
}
}
Expand Down
124 changes: 121 additions & 3 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { z } from "zod";
import pRetry from "p-retry";
import { v4 as uuidv4 } from "uuid";

import {
type TraceableFunction,
isTraceableFunction,
} from "langsmith/singletons/traceable";
import type { RunnableInterface, RunnableBatchOptions } from "./types.js";
import {
CallbackManager,
Expand Down Expand Up @@ -48,6 +52,7 @@ import {
consumeAsyncIterableInContext,
consumeIteratorInContext,
isAsyncIterable,
isIterableIterator,
isIterator,
} from "./iter.js";

Expand Down Expand Up @@ -2062,6 +2067,91 @@ export class RunnableMap<
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AnyTraceableFunction = TraceableFunction<(...any: any[]) => any>;

/**
* A runnable that wraps a traced LangSmith function.
*/
export class RunnableTraceable<RunInput, RunOutput> extends Runnable<
RunInput,
RunOutput
> {
lc_serializable = false;

lc_namespace = ["langchain_core", "runnables"];

protected func: AnyTraceableFunction;

constructor(fields: { func: AnyTraceableFunction }) {
super(fields);

if (!isTraceableFunction(fields.func)) {
throw new Error(
"RunnableTraceable requires a function that is wrapped in traceable higher-order function"
);
}

this.func = fields.func;
}

async invoke(input: RunInput, options?: Partial<RunnableConfig>) {
const [config] = this._getOptionsList(options ?? {}, 1);
const callbacks = await getCallbackManagerForConfig(config);

return (await this.func(
patchConfig(config, { callbacks }),
input
)) as RunOutput;
}

async *_streamIterator(
input: RunInput,
options?: Partial<RunnableConfig>
): AsyncGenerator<RunOutput> {
const result = await this.invoke(input, options);

if (isAsyncIterable(result)) {
for await (const item of result) {
yield item as RunOutput;
}
return;
}

if (isIterator(result)) {
while (true) {
const state: IteratorResult<unknown> = result.next();
if (state.done) break;
yield state.value as RunOutput;
}
return;
}

yield result;
}

static from(func: AnyTraceableFunction) {
return new RunnableTraceable({ func });
}
}

function assertNonTraceableFunction<RunInput, RunOutput>(
func:
| RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
| TraceableFunction<
RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
>
): asserts func is RunnableFunc<
RunInput,
RunOutput | Runnable<RunInput, RunOutput>
> {
if (isTraceableFunction(func)) {
throw new Error(
"RunnableLambda requires a function that is not wrapped in traceable higher-order function. This shouldn't happen."
);
}
}

/**
* A runnable that runs a callable.
*/
Expand All @@ -2081,14 +2171,42 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
>;

constructor(fields: {
func: RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>;
func:
| RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
| TraceableFunction<
RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
>;
}) {
if (isTraceableFunction(fields.func)) {
// eslint-disable-next-line no-constructor-return
return RunnableTraceable.from(fields.func) as unknown as RunnableLambda<
dqbd marked this conversation as resolved.
Show resolved Hide resolved
RunInput,
RunOutput
>;
}

super(fields);

assertNonTraceableFunction(fields.func);
this.func = fields.func;
}

static from<RunInput, RunOutput>(
func: RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
): RunnableLambda<RunInput, RunOutput>;

static from<RunInput, RunOutput>(
func: TraceableFunction<
RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
>
): RunnableLambda<RunInput, RunOutput>;

static from<RunInput, RunOutput>(
func:
| RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
| TraceableFunction<
RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
>
): RunnableLambda<RunInput, RunOutput> {
return new RunnableLambda({
func,
Expand Down Expand Up @@ -2141,7 +2259,7 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
}
}
output = finalOutput as typeof output;
} else if (isIterator(output)) {
} else if (isIterableIterator(output)) {
let finalOutput: RunOutput | undefined;
for (const chunk of consumeIteratorInContext(
childConfig,
Expand Down Expand Up @@ -2233,7 +2351,7 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
for await (const chunk of consumeAsyncIterableInContext(config, output)) {
yield chunk as RunOutput;
}
} else if (isIterator(output)) {
} else if (isIterableIterator(output)) {
for (const chunk of consumeIteratorInContext(config, output)) {
yield chunk as RunOutput;
}
Expand Down
10 changes: 9 additions & 1 deletion langchain-core/src/runnables/iter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";
import { RunnableConfig } from "./config.js";

export function isIterator(thing: unknown): thing is IterableIterator<unknown> {
export function isIterableIterator(
thing: unknown
): thing is IterableIterator<unknown> {
return (
typeof thing === "object" &&
thing !== null &&
Expand All @@ -11,6 +13,12 @@ export function isIterator(thing: unknown): thing is IterableIterator<unknown> {
);
}

export const isIterator = (x: unknown): x is Iterator<unknown> =>
x != null &&
typeof x === "object" &&
"next" in x &&
typeof x.next === "function";

export function isAsyncIterable(
thing: unknown
): thing is AsyncIterable<unknown> {
Expand Down
45 changes: 45 additions & 0 deletions langchain-core/src/tracers/tracer_langchain.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { Client } from "langsmith";
import { RunTree } from "langsmith/run_trees";
import { getCurrentRunTree } from "langsmith/singletons/traceable";

import {
BaseRun,
RunCreate,
Expand Down Expand Up @@ -57,6 +60,40 @@ export class LangChainTracer
getEnvironmentVariable("LANGCHAIN_SESSION");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there! I noticed that the recent code changes in tracer_langchain.ts explicitly access an environment variable using the getEnvironmentVariable function. I've flagged this for your review to ensure it aligns with our environment variable usage guidelines. Keep up the great work!

dqbd marked this conversation as resolved.
Show resolved Hide resolved
this.exampleId = exampleId;
this.client = client ?? new Client({});

// if we're inside traceable, we can obtain the traceable tree
// and populate the run map, which is used to correctly
// infer dotted order and execution order
const traceableTree = this.getTraceableRunTree();
if (traceableTree) {
let rootRun: RunTree = traceableTree;
const visited = new Set<string>();
while (rootRun.parent_run) {
if (visited.has(rootRun.id)) break;
visited.add(rootRun.id);

if (!rootRun.parent_run) break;
rootRun = rootRun.parent_run as RunTree;
}
visited.clear();

const queue = [rootRun];
while (queue.length > 0) {
const current = queue.shift();
if (!current || visited.has(current.id)) continue;
visited.add(current.id);

// @ts-expect-error Types of property 'events' are incompatible.
this.runMap.set(current.id, current);
if (current.child_runs) {
queue.push(...current.child_runs);
}
}

this.client = traceableTree.client ?? this.client;
this.projectName = traceableTree.project_name ?? this.projectName;
this.exampleId = traceableTree.reference_example_id ?? this.exampleId;
}
}

private async _convertToCreate(
Expand Down Expand Up @@ -102,4 +139,12 @@ export class LangChainTracer
getRun(id: string): Run | undefined {
return this.runMap.get(id);
}

getTraceableRunTree(): RunTree | undefined {
try {
return getCurrentRunTree();
} catch {
return undefined;
}
}
}
2 changes: 1 addition & 1 deletion langchain/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@
"js-yaml": "^4.1.0",
dqbd marked this conversation as resolved.
Show resolved Hide resolved
"jsonpointer": "^5.0.1",
"langchainhub": "~0.0.8",
"langsmith": "~0.1.7",
"langsmith": "~0.1.30",
"ml-distance": "^4.0.0",
"openapi-types": "^12.1.3",
"p-retry": "4",
Expand Down
4 changes: 2 additions & 2 deletions langchain/src/smith/runner_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
} from "langsmith";
import { EvaluationResult, RunEvaluator } from "langsmith/evaluation";
import { DataType } from "langsmith/schemas";
import type { TraceableFunction } from "langsmith/traceable";
import type { TraceableFunction } from "langsmith/singletons/traceable";
dqbd marked this conversation as resolved.
Show resolved Hide resolved
import { LLMStringEvaluator } from "../evaluation/base.js";
import { loadEvaluator } from "../evaluation/loader.js";
import { EvaluatorType } from "../evaluation/types.js";
Expand Down Expand Up @@ -165,7 +165,7 @@ class CallbackManagerRunTree extends RunTree {
this.callbackManager = callbackManager;
}

async createChild(config: RunTreeConfig): Promise<CallbackManagerRunTree> {
createChild(config: RunTreeConfig): CallbackManagerRunTree {
jacoblee93 marked this conversation as resolved.
Show resolved Hide resolved
const child = new CallbackManagerRunTree(
{
...config,
Expand Down
2 changes: 1 addition & 1 deletion libs/langchain-community/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"flat": "^5.0.2",
dqbd marked this conversation as resolved.
Show resolved Hide resolved
"js-yaml": "^4.1.0",
"langchain": "0.2.3",
"langsmith": "~0.1.1",
"langsmith": "~0.1.30",
"uuid": "^9.0.0",
"zod": "^3.22.3",
"zod-to-json-schema": "^3.22.5"
Expand Down
Loading
Loading