Skip to content

Commit

Permalink
fix: streamed function/tool calls (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
hassiebp authored Jun 20, 2024
1 parent a2923f2 commit 6df266a
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 10 deletions.
82 changes: 82 additions & 0 deletions integration-test/langfuse-openai.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ describe("Langfuse-OpenAI-Integation", () => {
model: "gpt-3.5-turbo-instruct",
stream: true,
user: "[email protected]",
temperature: 0,
max_tokens: 300,
});
let content = "";
Expand Down Expand Up @@ -364,6 +365,87 @@ describe("Langfuse-OpenAI-Integation", () => {
expect(generation.statusMessage).toBeNull();
}, 10000);

it("Streamed Tools and Toolchoice Calling on openai", async () => {
const name = `Tools-and-Toolchoice-Streaming-${randomUUID()}`;
const client = observeOpenAI(openai, { generationName: name });
const stream = await client.chat.completions.create({
stream: true,
model: "gpt-3.5-turbo",
messages: [{ role: "user", content: "What's the weather like in Boston today?" }],
tool_choice: "auto",
tools: [
{
type: "function",
function: {
name: "get_current_weather",
description: "Get the current weather in a given location",
parameters: {
type: "object",
properties: {
location: {
type: "string",
description: "The city and state, e.g. San Francisco, CA",
},
unit: { type: "string", enum: ["celsius", "fahrenheit"] },
},
required: ["location"],
},
},
},
],
user: "[email protected]",
max_tokens: 300,
});

for await (const _ of stream) {
}

await client.flushAsync();

const response = await getGeneration(name);
expect(response.status).toBe(200);
const generation = response.data.data[0];
expect(generation.name).toBe(name);
expect(generation.modelParameters).toBeDefined();
expect(generation.modelParameters).toMatchObject({ user: "[email protected]", max_tokens: 300 });
expect(generation.usage).toBeDefined();
expect(generation.model).toBe("gpt-3.5-turbo");
expect(generation.totalTokens).toBeDefined();
expect(generation.promptTokens).toBeDefined();
expect(generation.completionTokens).toBeDefined();
expect(generation.input).toBeDefined();
expect(generation.input.messages).toMatchObject([
{ role: "user", content: "What's the weather like in Boston today?" },
]);
expect(generation.input.tools).toMatchObject([
{
type: "function",
function: {
name: "get_current_weather",
description: "Get the current weather in a given location",
parameters: {
type: "object",
properties: {
location: {
type: "string",
description: "The city and state, e.g. San Francisco, CA",
},
unit: { type: "string", enum: ["celsius", "fahrenheit"] },
},
required: ["location"],
},
},
},
]);
expect(generation.input.tool_choice).toBe("auto");

expect(generation.output).toBeDefined();
expect(generation.calculatedInputCost).toBeDefined();
expect(generation.calculatedOutputCost).toBeDefined();
expect(generation.calculatedTotalCost).toBeDefined();
expect(generation.statusMessage).toBeNull();
}, 10000);

it("Using a common OpenAI client for multiple requests", async () => {
const name = `Common-client-initialisation-${randomUUID()}`;
const client = observeOpenAI(openai, { generationName: name });
Expand Down
53 changes: 47 additions & 6 deletions langfuse/src/openai/parseOpenAI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,31 @@ export const parseUsage = (res: unknown): Usage | undefined => {
}
};

export const parseChunk = (rawChunk: unknown): string => {
export const parseChunk = (
rawChunk: unknown
):
| { isToolCall: false; data: string }
| { isToolCall: true; data: OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall } => {
let isToolCall = false;
const _chunk = rawChunk as OpenAI.ChatCompletionChunk | OpenAI.Completions.Completion;
const chunkData = _chunk?.choices[0];

try {
if ("delta" in _chunk?.choices[0]) {
return _chunk.choices[0].delta?.content || "";
if ("delta" in chunkData && "tool_calls" in chunkData.delta && Array.isArray(chunkData.delta.tool_calls)) {
isToolCall = true;

return { isToolCall, data: chunkData.delta.tool_calls[0] };
}
if ("delta" in chunkData) {
return { isToolCall, data: chunkData.delta?.content || "" };
}

if ("text" in _chunk?.choices[0]) {
return _chunk?.choices[0].text || "";
if ("text" in chunkData) {
return { isToolCall, data: chunkData.text || "" };
}
} catch (e) {}

return "";
return { isToolCall: false, data: "" };
};

// Type guard to check if an unknown object is a UsageResponse
Expand All @@ -101,3 +112,33 @@ function hasCompletionUsage(obj: any): obj is { usage: OpenAI.CompletionUsage }
typeof obj.usage.total_tokens === "number"
);
}

/**
 * Reassembles streamed tool-call fragments into a single tool-call output.
 *
 * During streaming, the function name typically arrives on the first fragment
 * while the JSON `arguments` string is delivered piecewise across fragments;
 * the name keeps the last non-empty value seen and the argument pieces are
 * concatenated in arrival order.
 */
export const getToolCallOutput = (
  toolCallChunks: OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall[]
): {
  tool_calls: {
    function: {
      name: string;
      arguments: string;
    };
  }[];
} => {
  const merged = toolCallChunks.reduce(
    (acc, chunk) => ({
      // `||` (not `??`) on purpose: an empty-string name must not overwrite a real one.
      name: chunk.function?.name || acc.name,
      arguments: acc.arguments + (chunk.function?.arguments || ""),
    }),
    { name: "", arguments: "" }
  );

  return {
    tool_calls: [
      {
        function: merged,
      },
    ],
  };
};
15 changes: 11 additions & 4 deletions langfuse/src/openai/traceMethod.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type OpenAI from "openai";
import type { LangfuseParent } from "./types";

import { LangfuseSingleton } from "./LangfuseSingleton";
import { parseChunk, parseCompletionOutput, parseInputArgs, parseUsage } from "./parseOpenAI";
import { getToolCallOutput, parseChunk, parseCompletionOutput, parseInputArgs, parseUsage } from "./parseOpenAI";
import { isAsyncIterable } from "./utils";
import type { LangfuseConfig } from "./types";

Expand Down Expand Up @@ -58,19 +59,25 @@ const wrapMethod = async <T extends GenericMethod>(
if (isAsyncIterable(res)) {
async function* tracedOutputGenerator(): AsyncGenerator<unknown, void, unknown> {
const response = res;
const processedChunks: string[] = [];
const textChunks: string[] = [];
const toolCallChunks: OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall[] = [];
let completionStartTime: Date | null = null;

for await (const rawChunk of response as AsyncIterable<unknown>) {
completionStartTime = completionStartTime ?? new Date();

const processedChunk = parseChunk(rawChunk);
processedChunks.push(processedChunk);

if (!processedChunk.isToolCall) {
textChunks.push(processedChunk.data);
} else {
toolCallChunks.push(processedChunk.data);
}

yield rawChunk;
}

const output = processedChunks.join("");
const output = toolCallChunks.length > 0 ? getToolCallOutput(toolCallChunks) : textChunks.join("");

langfuseParent.generation({
...observationData,
Expand Down

0 comments on commit 6df266a

Please sign in to comment.