
Commit

Merge
hinthornw committed Nov 25, 2024
2 parents b75593f + c3053e6 commit 5d6a94d
Showing 35 changed files with 1,492 additions and 369 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/build_langsmith_pyo3_wheels.yml
@@ -0,0 +1,16 @@
name: Build langsmith_pyo3 wheels

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main
  workflow_dispatch:

jobs:
  hello-world:
    runs-on: ubuntu-20.04
    steps:
      - run: echo 'hello world'
2 changes: 1 addition & 1 deletion .github/workflows/py-bench.yml
@@ -43,7 +43,7 @@ jobs:
run: |
{
echo 'OUTPUT<<EOF'
make -s benchmark
make -s benchmark-fast
echo EOF
} >> "$GITHUB_OUTPUT"
- name: Compare benchmarks
5 changes: 3 additions & 2 deletions js/package.json
@@ -1,6 +1,6 @@
{
"name": "langsmith",
"version": "0.2.5",
"version": "0.2.7",
"description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.",
"packageManager": "[email protected]",
"files": [
@@ -114,7 +114,7 @@
"@faker-js/faker": "^8.4.1",
"@jest/globals": "^29.5.0",
"@langchain/core": "^0.3.14",
"@langchain/langgraph": "^0.2.18",
"@langchain/langgraph": "^0.2.20",
"@langchain/openai": "^0.3.11",
"@opentelemetry/sdk-trace-base": "^1.26.0",
"@opentelemetry/sdk-trace-node": "^1.26.0",
@@ -133,6 +133,7 @@
"eslint-plugin-prettier": "^4.2.1",
"jest": "^29.5.0",
"langchain": "^0.3.3",
"node-fetch": "^2.7.0",
"openai": "^4.67.3",
"prettier": "^2.8.8",
"ts-jest": "^29.1.0",
46 changes: 32 additions & 14 deletions js/src/client.ts
@@ -1150,33 +1150,51 @@ export class Client {

private async _sendMultipartRequest(parts: MultipartPart[], context: string) {
try {
const formData = new FormData();
// Create multipart form data manually using Blobs
const boundary =
"----LangSmithFormBoundary" + Math.random().toString(36).slice(2);
const chunks: Blob[] = [];

for (const part of parts) {
formData.append(part.name, part.payload);
// Add field boundary
chunks.push(new Blob([`--${boundary}\r\n`]));
chunks.push(
new Blob([
`Content-Disposition: form-data; name="${part.name}"\r\n`,
`Content-Type: ${part.payload.type}\r\n\r\n`,
])
);
chunks.push(part.payload);
chunks.push(new Blob(["\r\n"]));
}
// Log the form data
await this.batchIngestCaller.call(

// Add final boundary
chunks.push(new Blob([`--${boundary}--\r\n`]));

// Combine all chunks into a single Blob
const body = new Blob(chunks);

// Convert Blob to ArrayBuffer for compatibility
const arrayBuffer = await body.arrayBuffer();

const res = await this.batchIngestCaller.call(
_getFetchImplementation(),
`${this.apiUrl}/runs/multipart`,
{
method: "POST",
headers: {
...this.headers,
"Content-Type": `multipart/form-data; boundary=${boundary}`,
},
body: formData,
body: arrayBuffer,
signal: AbortSignal.timeout(this.timeout_ms),
...this.fetchOptions,
}
);
} catch (e) {
let errorMessage = "Failed to multipart ingest runs";
// eslint-disable-next-line no-instanceof/no-instanceof
if (e instanceof Error) {
errorMessage += `: ${e.stack || e.message}`;
} else {
errorMessage += `: ${String(e)}`;
}
console.warn(`${errorMessage.trim()}\n\nContext: ${context}`);
await raiseForStatus(res, "ingest multipart runs", true);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (e: any) {
console.warn(`${e.message.trim()}\n\nContext: ${context}`);
}
}

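For context on the change above: the client now assembles the multipart/form-data payload by hand instead of relying on FormData, then posts it as an ArrayBuffer with an explicit boundary header, presumably to stay compatible with fetch implementations (such as node-fetch v2, exercised in the new integration test below) that do not accept web FormData or Blob bodies. A minimal standalone sketch of that wire format follows; the part name, payload, and URL are hypothetical, not taken from the library.

// Minimal sketch of the hand-rolled multipart/form-data encoding used above.
// The part name, payload, and URL are hypothetical; requires a runtime with
// Blob and fetch (browser or Node 18+).
async function sendMultipartSketch(url: string): Promise<Response> {
  const boundary =
    "----LangSmithFormBoundary" + Math.random().toString(36).slice(2);
  const parts = [
    {
      name: "post.example-run-id",
      payload: new Blob([JSON.stringify({ id: "example-run-id" })], {
        type: "application/json",
      }),
    },
  ];

  const chunks: Blob[] = [];
  for (const part of parts) {
    chunks.push(
      new Blob([
        `--${boundary}\r\n`,
        `Content-Disposition: form-data; name="${part.name}"\r\n`,
        `Content-Type: ${part.payload.type}\r\n\r\n`,
      ])
    );
    chunks.push(part.payload, new Blob(["\r\n"]));
  }
  // Closing boundary terminates the multipart body.
  chunks.push(new Blob([`--${boundary}--\r\n`]));

  // Sending an ArrayBuffer (rather than a Blob or FormData) keeps the request
  // body acceptable to stricter fetch implementations.
  const body = await new Blob(chunks).arrayBuffer();
  return fetch(url, {
    method: "POST",
    headers: { "Content-Type": `multipart/form-data; boundary=${boundary}` },
    body,
  });
}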
2 changes: 1 addition & 1 deletion js/src/index.ts
@@ -14,4 +14,4 @@ export { RunTree, type RunTreeConfig } from "./run_trees.js";
export { overrideFetchImplementation } from "./singletons/fetch.js";

// Update using yarn bump-version
export const __version__ = "0.2.5";
export const __version__ = "0.2.7";
32 changes: 32 additions & 0 deletions js/src/tests/batch_client.int.test.ts
@@ -2,6 +2,7 @@ import { v4 as uuidv4 } from "uuid";
import * as fs from "node:fs";
import * as path from "node:path";
import { fileURLToPath } from "node:url";
import nodeFetch from "node-fetch";

import { Client } from "../client.js";
import { RunTree, convertToDottedOrderFormat } from "../run_trees.js";
@@ -11,6 +12,7 @@ import {
waitUntilRunFound,
} from "./utils.js";
import { traceable } from "../traceable.js";
import { overrideFetchImplementation } from "../singletons/fetch.js";

test.concurrent(
"Test persist update run",
@@ -229,6 +231,7 @@ test.concurrent(
outputs: { output: ["Hi"] },
dotted_order: dottedOrder,
trace_id: runId,
end_time: Math.floor(new Date().getTime() / 1000),
});

await Promise.all([
@@ -282,3 +285,32 @@ test.skip("very large runs", async () => {

await langchainClient.deleteProject({ projectName });
}, 180_000);

test("multipart should work with overridden node-fetch", async () => {
overrideFetchImplementation(nodeFetch);

const langchainClient = new Client({
autoBatchTracing: true,
timeout_ms: 120_000,
});

const projectName = "__test_node_fetch" + uuidv4().substring(0, 4);
await deleteProject(langchainClient, projectName);

await traceable(
async () => {
return "testing with node fetch";
},
{
project_name: projectName,
client: langchainClient,
tracingEnabled: true,
}
)();

await langchainClient.awaitPendingTraceBatches();

await Promise.all([waitUntilProjectFound(langchainClient, projectName)]);

await langchainClient.deleteProject({ projectName });
});
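The test above swaps the client's HTTP layer via overrideFetchImplementation, which js/src/index.ts re-exports from the package root. A minimal sketch of the same pattern with a wrapper that logs outgoing requests; the logging wrapper itself is hypothetical and only illustrates the shape of the override.

// Sketch: route all client traffic through node-fetch via a logging wrapper.
// overrideFetchImplementation is re-exported from the package root (see the
// js/src/index.ts diff above); the wrapper here is purely illustrative.
import nodeFetch from "node-fetch";
import { overrideFetchImplementation } from "langsmith";

const loggingFetch = (
  url: Parameters<typeof nodeFetch>[0],
  init?: Parameters<typeof nodeFetch>[1]
) => {
  console.log(`[fetch] ${init?.method ?? "GET"} ${String(url)}`);
  return nodeFetch(url, init);
};

overrideFetchImplementation(loggingFetch);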
46 changes: 35 additions & 11 deletions js/src/tests/batch_client.test.ts
@@ -7,24 +7,36 @@ import { convertToDottedOrderFormat } from "../run_trees.js";
import { _getFetchImplementation } from "../singletons/fetch.js";
import { RunCreate } from "../schemas.js";

const parseMockRequestBody = async (body: string | FormData) => {
const parseMockRequestBody = async (body: string | ArrayBuffer) => {
if (typeof body === "string") {
return JSON.parse(body);
}
// Typing is missing
const entries: any[] = Array.from((body as any).entries());
const rawMultipart = new TextDecoder().decode(body);
// Parse the multipart form data boundary from the raw text
const boundary = rawMultipart.split("\r\n")[0].trim();
// Split the multipart body into individual parts
const parts = rawMultipart.split(boundary).slice(1, -1);

const entries: [string, any][] = parts.map((part) => {
const [headers, ...contentParts] = part.trim().split("\r\n\r\n");
const content = contentParts.join("\r\n\r\n");
// Extract the name from Content-Disposition header
const nameMatch = headers.match(/name="([^"]+)"/);
const name = nameMatch ? nameMatch[1] : "";
return [name, content.trim()];
});
const reconstructedBody: any = {
post: [],
patch: [],
};
for (const [key, value] of entries) {
let [method, id, type] = key.split(".");
const text = await value.text();
let parsedValue;
try {
parsedValue = JSON.parse(text);
parsedValue = JSON.parse(value);
} catch (e) {
parsedValue = text;
parsedValue = value;
}
// if (method === "attachment") {
// for (const item of reconstructedBody.post) {
@@ -131,7 +143,7 @@ describe.each(ENDPOINT_TYPES)(
_getFetchImplementation(),
expectedTraceURL,
expect.objectContaining({
body: expect.any(endpointType === "batch" ? String : FormData),
body: expect.any(endpointType === "batch" ? String : ArrayBuffer),
})
);
});
@@ -245,7 +257,7 @@
_getFetchImplementation(),
expectedTraceURL,
expect.objectContaining({
body: expect.any(endpointType === "batch" ? String : FormData),
body: expect.any(endpointType === "batch" ? String : ArrayBuffer),
})
);
});
@@ -326,7 +338,7 @@
_getFetchImplementation(),
expectedTraceURL,
expect.objectContaining({
body: expect.any(endpointType === "batch" ? String : FormData),
body: expect.any(endpointType === "batch" ? String : ArrayBuffer),
})
);
});
@@ -612,9 +624,21 @@
const calledRequestParam: any = callSpy.mock.calls[0][2];
const calledRequestParam2: any = callSpy.mock.calls[1][2];

const firstBatchBody = await parseMockRequestBody(
calledRequestParam?.body
);
const secondBatchBody = await parseMockRequestBody(
calledRequestParam2?.body
);

const initialBatchBody =
firstBatchBody.post.length === 10 ? firstBatchBody : secondBatchBody;
const followupBatchBody =
firstBatchBody.post.length === 10 ? secondBatchBody : firstBatchBody;

// Queue should drain as soon as size limit is reached,
// sending both batches
expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({
expect(initialBatchBody).toEqual({
post: runIds.slice(0, 10).map((runId, i) =>
expect.objectContaining({
id: runId,
Expand All @@ -628,7 +652,7 @@ describe.each(ENDPOINT_TYPES)(
patch: [],
});

expect(await parseMockRequestBody(calledRequestParam2?.body)).toEqual({
expect(followupBatchBody).toEqual({
post: runIds.slice(10).map((runId, i) =>
expect.objectContaining({
id: runId,
@@ -903,7 +927,7 @@
_getFetchImplementation(),
expectedTraceURL,
expect.objectContaining({
body: expect.any(endpointType === "batch" ? String : FormData),
body: expect.any(endpointType === "batch" ? String : ArrayBuffer),
})
);
});
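As an aside, the updated parseMockRequestBody helper above works because the first line of the raw multipart body is the boundary itself. A condensed standalone sketch of that decoding approach, mirroring the helper's logic:

// Sketch of the decoding approach used by parseMockRequestBody above:
// decode the ArrayBuffer, take the boundary from the first line, split the
// body on it, and read each part's name from its Content-Disposition header.
function parseMultipartSketch(buffer: ArrayBuffer): [string, string][] {
  const raw = new TextDecoder().decode(buffer);
  const boundary = raw.split("\r\n")[0].trim(); // "--<boundary>"
  return raw
    .split(boundary)
    .slice(1, -1) // drop the leading empty chunk and the trailing "--" chunk
    .map((part): [string, string] => {
      const [headers, ...contentParts] = part.trim().split("\r\n\r\n");
      const nameMatch = headers.match(/name="([^"]+)"/);
      return [nameMatch ? nameMatch[1] : "", contentParts.join("\r\n\r\n").trim()];
    });
}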
2 changes: 1 addition & 1 deletion js/src/tests/vercel.int.test.ts
@@ -102,7 +102,7 @@ test("generateText with image", async () => {
expect(storedRun.id).toEqual(runId);
});

test("streamText", async () => {
test.skip("streamText", async () => {
const runId = uuid();
const result = await streamText({
model: openai("gpt-4o-mini"),
7 changes: 6 additions & 1 deletion js/src/utils/env.ts
@@ -132,11 +132,16 @@ export function getLangChainEnvVarsMetadata(): Record<string, string> {
"LANGCHAIN_TRACING_V2",
"LANGCHAIN_PROJECT",
"LANGCHAIN_SESSION",
"LANGSMITH_API_KEY",
"LANGSMITH_ENDPOINT",
"LANGSMITH_TRACING_V2",
"LANGSMITH_PROJECT",
"LANGSMITH_SESSION",
];

for (const [key, value] of Object.entries(allEnvVars)) {
if (
key.startsWith("LANGCHAIN_") &&
(key.startsWith("LANGCHAIN_") || key.startsWith("LANGSMITH_")) &&
typeof value === "string" &&
!excluded.includes(key) &&
!key.toLowerCase().includes("key") &&
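To illustrate the widened prefix check above, here is a simplified sketch of the filtering. The excluded list is abbreviated, the env-var names in the example are hypothetical, and the real function applies further checks beyond what is visible in this hunk.

// Simplified sketch of the metadata filter after this change: LANGSMITH_-prefixed
// variables are now captured alongside LANGCHAIN_-prefixed ones, while the
// excluded names and anything containing "key" are still skipped.
const excluded = new Set([
  "LANGCHAIN_TRACING_V2",
  "LANGCHAIN_PROJECT",
  "LANGCHAIN_SESSION",
  "LANGSMITH_API_KEY",
  "LANGSMITH_ENDPOINT",
  "LANGSMITH_TRACING_V2",
  "LANGSMITH_PROJECT",
  "LANGSMITH_SESSION",
  // ...plus the other excluded names in the file (truncated above)
]);

function captureEnvMetadataSketch(
  env: Record<string, string | undefined>
): Record<string, string> {
  const out: Record<string, string> = {};
  for (const [key, value] of Object.entries(env)) {
    if (
      (key.startsWith("LANGCHAIN_") || key.startsWith("LANGSMITH_")) &&
      typeof value === "string" &&
      !excluded.has(key) &&
      !key.toLowerCase().includes("key")
    ) {
      out[key] = value;
    }
  }
  return out;
}

// Hypothetical example:
// captureEnvMetadataSketch({ LANGSMITH_EXAMPLE_FLAG: "on", LANGSMITH_API_KEY: "..." })
// returns { LANGSMITH_EXAMPLE_FLAG: "on" }; the API key is filtered by both the
// explicit exclusion and the "key" substring check.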