Skip to content

Commit

Permalink
Implement autobatching for the JS client
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoblee93 committed Jan 26, 2024
1 parent 4aa390d commit 0b6a9e9
Show file tree
Hide file tree
Showing 3 changed files with 310 additions and 42 deletions.
3 changes: 2 additions & 1 deletion js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"name": "langsmith",
"version": "0.0.63",
"description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.",
"packageManager": "[email protected]",
"files": [
"dist/",
"client.cjs",
Expand Down Expand Up @@ -121,4 +122,4 @@
},
"./package.json": "./package.json"
}
}
}
298 changes: 273 additions & 25 deletions js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ interface ClientConfig {
callerOptions?: AsyncCallerParams;
timeout_ms?: number;
webUrl?: string;
autoBatchTracing?: boolean;
}

interface ListRunsParams {
Expand Down Expand Up @@ -106,6 +107,10 @@ interface CreateRunParams {
revision_id?: string;
}

interface UpdateRunParams extends RunUpdate {
id?: string;
}

interface projectOptions {
projectName?: string;
projectId?: string;
Expand All @@ -120,6 +125,11 @@ export type CreateExampleOptions = {
exampleId?: string;
};

type AutoBatchQueueItem = {
action: "create" | "update";
item: RunCreate | RunUpdate;
};

// utility functions
const isLocalhost = (url: string): boolean => {
const strippedUrl = url.replace("http://", "").replace("https://", "");
Expand Down Expand Up @@ -178,6 +188,59 @@ function assertUuid(str: string): void {
}
}

function prepareRunCreateOrUpdateInputs(run: RunCreate): RunCreate;
function prepareRunCreateOrUpdateInputs(run: RunUpdate): RunUpdate;
function prepareRunCreateOrUpdateInputs(run: RunCreate | RunUpdate) {
let runParams = { ...run };
if (runParams.inputs !== undefined) {
runParams.inputs = hideInputs(runParams.inputs);
}
if (runParams.outputs !== undefined) {
runParams.outputs = hideOutputs(runParams.outputs);
}
return runParams;
}

async function mergeRuntimeEnvIntoRunCreates(runs: RunCreate[]) {
const runtimeEnv = await getRuntimeEnvironment();
const envVars = getLangChainEnvVarsMetadata();
return runs.map((run) => {
const extra = run.extra ?? {};
const metadata = extra.metadata;
run.extra = {
...extra,
runtime: {
...runtimeEnv,
...extra?.runtime,
},
metadata: {
...envVars,
...(envVars.revision_id || run.revision_id
? { revision_id: run.revision_id ?? envVars.revision_id }
: {}),
...metadata,
},
};
return run;
});
}

function getTracingSamplingRate() {
const samplingRateStr = getEnvironmentVariable(
"LANGCHAIN_TRACING_SAMPLING_RATE"
);
if (samplingRateStr === undefined) {
return undefined;
}
const samplingRate = parseFloat(samplingRateStr);
if (samplingRate < 0 || samplingRate > 1) {
throw new Error(
`LANGCHAIN_TRACING_SAMPLING_RATE must be between 0 and 1 if set. Got: ${samplingRate}`
);
}
return samplingRate;
}

export class Client {
private apiKey?: string;

Expand All @@ -187,19 +250,37 @@ export class Client {

private caller: AsyncCaller;

private tracingSampleRate?: number;

private sampledPostUuids = new Set();

private timeout_ms: number;

private _tenantId: string | null = null;

private autoBatchTracing = true;

private pendingAutoBatchedRuns: AutoBatchQueueItem[] = [];

private pendingAutoBatchedRunLimit = 100;

private autoBatchTimeout: ReturnType<typeof setTimeout> | undefined;

private autoBatchInitialDelayMs = 250;

private autoBatchAggregationDelayMs = 50;

constructor(config: ClientConfig = {}) {
const defaultConfig = Client.getDefaultClientConfig();

this.tracingSampleRate = getTracingSamplingRate();
this.apiUrl = trimQuotes(config.apiUrl ?? defaultConfig.apiUrl) ?? "";
this.apiKey = trimQuotes(config.apiKey ?? defaultConfig.apiKey);
this.webUrl = trimQuotes(config.webUrl ?? defaultConfig.webUrl);
this.validateApiKeyIfHosted();
this.timeout_ms = config.timeout_ms ?? 12_000;
this.caller = new AsyncCaller(config.callerOptions ?? {});
this.autoBatchTracing = config.autoBatchTracing ?? this.autoBatchTracing;
}

public static getDefaultClientConfig(): {
Expand Down Expand Up @@ -328,7 +409,11 @@ export class Client {
while (true) {
const response = await this.caller.call(fetch, `${this.apiUrl}${path}`, {
method: requestMethod,
headers: { ...this.headers, "Content-Type": "application/json" },
headers: {
...this.headers,
"Content-Type": "application/json",
Accept: "application/json",
},
signal: AbortSignal.timeout(this.timeout_ms),
body: JSON.stringify(bodyParams),
});
Expand All @@ -351,46 +436,196 @@ export class Client {
}
}

private _filterForSampling(
runs: CreateRunParams[] | UpdateRunParams[],
patch = false
) {
if (this.tracingSampleRate === undefined) {
return runs;
}

if (patch) {
let sampled = [];
for (let run of runs) {
if (this.sampledPostUuids.has(run.id)) {
sampled.push(run);
this.sampledPostUuids.delete(run.id);
}
}
return sampled;
} else {
let sampled = [];
for (let run of runs) {
if (Math.random() < this.tracingSampleRate) {
sampled.push(run);
this.sampledPostUuids.add(run.id);
}
}
return sampled;
}
}

private async triggerAutoBatchSend(runs?: AutoBatchQueueItem[]) {
let batch = runs;
if (batch === undefined) {
batch = this.pendingAutoBatchedRuns.slice(
0,
this.pendingAutoBatchedRunLimit
);
this.pendingAutoBatchedRuns = this.pendingAutoBatchedRuns.slice(
this.pendingAutoBatchedRunLimit
);
}
await this.batchIngestRuns({
runCreates: batch
.filter((item) => item.action === "create")
.map((item) => item.item) as RunCreate[],
runUpdates: batch
.filter((item) => item.action === "update")
.map((item) => item.item) as RunUpdate[],
});
}

private appendRunCreateToAutoBatchQueue(item: AutoBatchQueueItem) {
clearTimeout(this.autoBatchTimeout);
this.autoBatchTimeout = undefined;
this.pendingAutoBatchedRuns.push(item);
while (
this.pendingAutoBatchedRuns.length >= this.pendingAutoBatchedRunLimit
) {
const batch = this.pendingAutoBatchedRuns.slice(
0,
this.pendingAutoBatchedRunLimit
);
this.pendingAutoBatchedRuns = this.pendingAutoBatchedRuns.slice(
this.pendingAutoBatchedRunLimit
);
void this.triggerAutoBatchSend(batch);
}
if (this.pendingAutoBatchedRuns.length > 0) {
if (!this.autoBatchTimeout) {
this.autoBatchTimeout = setTimeout(
this.triggerAutoBatchSend,

Check failure on line 508 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Promise returned in function argument where a void return was expected
this.autoBatchInitialDelayMs
);
} else {
this.autoBatchTimeout = setTimeout(
this.triggerAutoBatchSend,

Check failure on line 513 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Promise returned in function argument where a void return was expected
this.autoBatchAggregationDelayMs
);
}
}
}

public async createRun(run: CreateRunParams): Promise<void> {
if (!this._filterForSampling([run]).length) {
return;
}
const headers = { ...this.headers, "Content-Type": "application/json" };
const extra = run.extra ?? {};
const metadata = extra.metadata;
const runtimeEnv = await getRuntimeEnvironment();
const envVars = getLangChainEnvVarsMetadata();
const session_name = run.project_name;

delete run.project_name;
const runCreate: RunCreate = {
const runCreate: RunCreate = prepareRunCreateOrUpdateInputs({
session_name,
...run,
extra: {
...run.extra,
runtime: {
...runtimeEnv,
...extra.runtime,
},
metadata: {
...envVars,
...(envVars.revision_id || run.revision_id
? { revision_id: run.revision_id ?? envVars.revision_id }
: {}),
...metadata,
},
},
};
runCreate.inputs = hideInputs(runCreate.inputs);
if (runCreate.outputs) {
runCreate.outputs = hideOutputs(runCreate.outputs);
execution_order: run.execution_order ?? 1,
});
if (
this.autoBatchTracing &&
runCreate.trace_id !== undefined &&
runCreate.dotted_order !== undefined
) {
this.appendRunCreateToAutoBatchQueue({
action: "create",
item: runCreate,
});
return;
}
const mergedRunCreateParams = await mergeRuntimeEnvIntoRunCreates([
runCreate,
]);

const response = await this.caller.call(fetch, `${this.apiUrl}/runs`, {
method: "POST",
headers,
body: JSON.stringify(runCreate),
body: JSON.stringify(mergedRunCreateParams[0]),
signal: AbortSignal.timeout(this.timeout_ms),
});
await raiseForStatus(response, "create run");
}

/**
* Batch ingest/upsert multiple runs in the Langsmith system.
* @param runs
*/
public async batchIngestRuns({
runCreates,
runUpdates,
}: {
runCreates?: RunCreate[];
runUpdates?: RunUpdate[];
}) {
if (runCreates === undefined && runUpdates === undefined) {
return;
}
let preparedCreateParams =
(runCreates?.map(prepareRunCreateOrUpdateInputs) as RunCreate[]) ?? [];
let preparedUpdateParams =
(runUpdates?.map(prepareRunCreateOrUpdateInputs) as RunUpdate[]) ?? [];

if (preparedCreateParams.length > 0 && preparedUpdateParams.length > 0) {
const createById = preparedCreateParams.reduce(
(params: Record<string, RunCreate>, run) => {
if (!run.id) {
return params;
}
params[run.id] = run;
return params;
},
{}
);
const standaloneUpdates = [];
for (const updateParam of preparedUpdateParams) {
if (updateParam.id !== undefined && createById[updateParam.id]) {
createById[updateParam.id] = {
...createById[updateParam.id],
...updateParam,
};
} else {
standaloneUpdates.push(updateParam);
}
}
preparedCreateParams = Object.values(createById);
preparedUpdateParams = standaloneUpdates;
}
const body = {
post: this._filterForSampling(preparedCreateParams),
patch: this._filterForSampling(preparedUpdateParams, true),
};
if (!body.post.length && !body.patch.length) {
return;
}
preparedCreateParams = await mergeRuntimeEnvIntoRunCreates(
preparedCreateParams
);
const headers = {
...this.headers,
"Content-Type": "application/json",
Accept: "application/json",
};
const response = await this.caller.call(
fetch,
`${this.apiUrl}/runs/batch`,
{
method: "POST",
headers,
body: JSON.stringify(body),
signal: AbortSignal.timeout(this.timeout_ms),
}
);
await raiseForStatus(response, "batch create run");
}

public async updateRun(runId: string, run: RunUpdate): Promise<void> {
assertUuid(runId);
if (run.inputs) {
Expand All @@ -400,6 +635,19 @@ export class Client {
if (run.outputs) {
run.outputs = hideOutputs(run.outputs);
}
// TODO: Untangle types
const data: UpdateRunParams = { ...run, id: runId };
if (this._filterForSampling([data], true).length) {
return;
}
if (
this.autoBatchTracing &&
data.trace_id !== undefined &&
data.dotted_order !== undefined
) {
this.appendRunCreateToAutoBatchQueue({ action: "update", item: data });
return;
}
const headers = { ...this.headers, "Content-Type": "application/json" };
const response = await this.caller.call(
fetch,
Expand Down
Loading

0 comments on commit 0b6a9e9

Please sign in to comment.