From 6df266a9e5d091308ae32bdfe4c3c11006b59e88 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Thu, 20 Jun 2024 18:54:32 +0200 Subject: [PATCH] fix: streamed function/tool calls (#262) --- integration-test/langfuse-openai.spec.ts | 82 ++++++++++++++++++++++++ langfuse/src/openai/parseOpenAI.ts | 53 +++++++++++++-- langfuse/src/openai/traceMethod.ts | 15 +++-- 3 files changed, 140 insertions(+), 10 deletions(-) diff --git a/integration-test/langfuse-openai.spec.ts b/integration-test/langfuse-openai.spec.ts index c22901ee..99d9e034 100644 --- a/integration-test/langfuse-openai.spec.ts +++ b/integration-test/langfuse-openai.spec.ts @@ -177,6 +177,7 @@ describe("Langfuse-OpenAI-Integation", () => { model: "gpt-3.5-turbo-instruct", stream: true, user: "langfuse-user@gmail.com", + temperature: 0, max_tokens: 300, }); let content = ""; @@ -364,6 +365,87 @@ describe("Langfuse-OpenAI-Integation", () => { expect(generation.statusMessage).toBeNull(); }, 10000); + it("Streamed Tools and Toolchoice Calling on openai", async () => { + const name = `Tools-and-Toolchoice-Streaming-${randomUUID()}`; + const client = observeOpenAI(openai, { generationName: name }); + const stream = await client.chat.completions.create({ + stream: true, + model: "gpt-3.5-turbo", + messages: [{ role: "user", content: "What's the weather like in Boston today?" }], + tool_choice: "auto", + tools: [ + { + type: "function", + function: { + name: "get_current_weather", + description: "Get the current weather in a given location", + parameters: { + type: "object", + properties: { + location: { + type: "string", + description: "The city and state, e.g. San Francisco, CA", + }, + unit: { type: "string", enum: ["celsius", "fahrenheit"] }, + }, + required: ["location"], + }, + }, + }, + ], + user: "langfuse-user@gmail.com", + max_tokens: 300, + }); + + for await (const _ of stream) { + } + + await client.flushAsync(); + + const response = await getGeneration(name); + expect(response.status).toBe(200); + const generation = response.data.data[0]; + expect(generation.name).toBe(name); + expect(generation.modelParameters).toBeDefined(); + expect(generation.modelParameters).toMatchObject({ user: "langfuse-user@gmail.com", max_tokens: 300 }); + expect(generation.usage).toBeDefined(); + expect(generation.model).toBe("gpt-3.5-turbo"); + expect(generation.totalTokens).toBeDefined(); + expect(generation.promptTokens).toBeDefined(); + expect(generation.completionTokens).toBeDefined(); + expect(generation.input).toBeDefined(); + expect(generation.input.messages).toMatchObject([ + { role: "user", content: "What's the weather like in Boston today?" }, + ]); + expect(generation.input.tools).toMatchObject([ + { + type: "function", + function: { + name: "get_current_weather", + description: "Get the current weather in a given location", + parameters: { + type: "object", + properties: { + location: { + type: "string", + description: "The city and state, e.g. San Francisco, CA", + }, + unit: { type: "string", enum: ["celsius", "fahrenheit"] }, + }, + required: ["location"], + }, + }, + }, + ]); + expect(generation.input.tool_choice).toBe("auto"); + + expect(generation.output).toBeDefined(); + expect(generation.calculatedInputCost).toBeDefined(); + expect(generation.calculatedOutputCost).toBeDefined(); + expect(generation.calculatedTotalCost).toBeDefined(); + expect(generation.statusMessage).toBeNull(); + }, 10000); + it("Using a common OpenAI client for multiple requests", async () => { const name = `Common-client-initialisation-${randomUUID()}`; const client = observeOpenAI(openai, { generationName: name }); diff --git a/langfuse/src/openai/parseOpenAI.ts b/langfuse/src/openai/parseOpenAI.ts index 80dda610..b43590eb 100644 --- a/langfuse/src/openai/parseOpenAI.ts +++ b/langfuse/src/openai/parseOpenAI.ts @@ -74,20 +74,31 @@ export const parseUsage = (res: unknown): Usage | undefined => { } }; -export const parseChunk = (rawChunk: unknown): string => { +export const parseChunk = ( + rawChunk: unknown +): + | { isToolCall: false; data: string } + | { isToolCall: true; data: OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall } => { + let isToolCall = false; const _chunk = rawChunk as OpenAI.ChatCompletionChunk | OpenAI.Completions.Completion; + const chunkData = _chunk?.choices[0]; try { - if ("delta" in _chunk?.choices[0]) { - return _chunk.choices[0].delta?.content || ""; + if ("delta" in chunkData && "tool_calls" in chunkData.delta && Array.isArray(chunkData.delta.tool_calls)) { + isToolCall = true; + + return { isToolCall, data: chunkData.delta.tool_calls[0] }; + } + if ("delta" in chunkData) { + return { isToolCall, data: chunkData.delta?.content || "" }; } - if ("text" in _chunk?.choices[0]) { - return _chunk?.choices[0].text || ""; + if ("text" in chunkData) { + return { isToolCall, data: chunkData.text || "" }; } } catch (e) {} - return ""; + return { isToolCall: false, data: "" }; }; // Type guard to check if an unknown object is a UsageResponse @@ -101,3 +112,33 @@ function hasCompletionUsage(obj: any): obj is { usage: OpenAI.CompletionUsage } typeof obj.usage.total_tokens === "number" ); } + +export const getToolCallOutput = ( + toolCallChunks: OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall[] +): { + tool_calls: { + function: { + name: string; + arguments: string; + }; + }[]; +} => { + let name = ""; + let toolArguments = ""; + + for (const toolCall of toolCallChunks) { + name = toolCall.function?.name || name; + toolArguments += toolCall.function?.arguments || ""; + } + + return { + tool_calls: [ + { + function: { + name, + arguments: toolArguments, + }, + }, + ], + }; +}; diff --git a/langfuse/src/openai/traceMethod.ts b/langfuse/src/openai/traceMethod.ts index dc16636f..a8da4cb9 100644 --- a/langfuse/src/openai/traceMethod.ts +++ b/langfuse/src/openai/traceMethod.ts @@ -1,7 +1,8 @@ +import type OpenAI from "openai"; import type { LangfuseParent } from "./types"; import { LangfuseSingleton } from "./LangfuseSingleton"; -import { parseChunk, parseCompletionOutput, parseInputArgs, parseUsage } from "./parseOpenAI"; +import { getToolCallOutput, parseChunk, parseCompletionOutput, parseInputArgs, parseUsage } from "./parseOpenAI"; import { isAsyncIterable } from "./utils"; import type { LangfuseConfig } from "./types"; @@ -58,19 +59,25 @@ const wrapMethod = async ( if (isAsyncIterable(res)) { async function* tracedOutputGenerator(): AsyncGenerator { const response = res; - const processedChunks: string[] = []; + const textChunks: string[] = []; + const toolCallChunks: OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall[] = []; let completionStartTime: Date | null = null; for await (const rawChunk of response as AsyncIterable) { completionStartTime = completionStartTime ?? new Date(); const processedChunk = parseChunk(rawChunk); - processedChunks.push(processedChunk); + + if (!processedChunk.isToolCall) { + textChunks.push(processedChunk.data); + } else { + toolCallChunks.push(processedChunk.data); + } yield rawChunk; } - const output = processedChunks.join(""); + const output = toolCallChunks.length > 0 ? getToolCallOutput(toolCallChunks) : textChunks.join(""); langfuseParent.generation({ ...observationData,