diff --git a/js/plugins/google-cloud/src/telemetry/action.ts b/js/plugins/google-cloud/src/telemetry/action.ts index e3630bb1d..241a8f738 100644 --- a/js/plugins/google-cloud/src/telemetry/action.ts +++ b/js/plugins/google-cloud/src/telemetry/action.ts @@ -30,6 +30,8 @@ import { createCommonLogAttributes, extractErrorName, extractOuterFeatureNameFromPath, + truncate, + truncatePath, } from '../utils.js'; class ActionTelemetry implements Telemetry { @@ -79,8 +81,8 @@ class ActionTelemetry implements Telemetry { } if (subtype === 'tool' && logInputAndOutput) { - const input = attributes['genkit:input'] as string; - const output = attributes['genkit:output'] as string; + const input = truncate(attributes['genkit:input'] as string); + const output = truncate(attributes['genkit:output'] as string); const sessionId = attributes['genkit:sessionId'] as string; const threadName = attributes['genkit:threadName'] as string; @@ -159,7 +161,7 @@ class ActionTelemetry implements Telemetry { sessionId?: string, threadName?: string ) { - const path = toDisplayPath(qualifiedPath); + const path = truncatePath(toDisplayPath(qualifiedPath)); const sharedMetadata = { ...createCommonLogAttributes(span, projectId), path, diff --git a/js/plugins/google-cloud/src/telemetry/engagement.ts b/js/plugins/google-cloud/src/telemetry/engagement.ts index 77daef21d..cdf8d91cb 100644 --- a/js/plugins/google-cloud/src/telemetry/engagement.ts +++ b/js/plugins/google-cloud/src/telemetry/engagement.ts @@ -24,7 +24,7 @@ import { Telemetry, internalMetricNamespaceWrap, } from '../metrics.js'; -import { createCommonLogAttributes } from '../utils.js'; +import { createCommonLogAttributes, truncate } from '../utils.js'; class EngagementTelemetry implements Telemetry { /** @@ -81,7 +81,9 @@ class EngagementTelemetry implements Telemetry { feedbackValue: attributes['genkit:metadata:feedbackValue'], }; if (attributes['genkit:metadata:textFeedback']) { - metadata['textFeedback'] = 
attributes['genkit:metadata:textFeedback']; + metadata['textFeedback'] = truncate( + attributes['genkit:metadata:textFeedback'] as string + ); } logger.logStructured(`UserFeedback[${name}]`, metadata); } diff --git a/js/plugins/google-cloud/src/telemetry/feature.ts b/js/plugins/google-cloud/src/telemetry/feature.ts index 932bd40ae..2c3e3c980 100644 --- a/js/plugins/google-cloud/src/telemetry/feature.ts +++ b/js/plugins/google-cloud/src/telemetry/feature.ts @@ -26,7 +26,12 @@ import { Telemetry, internalMetricNamespaceWrap, } from '../metrics.js'; -import { createCommonLogAttributes, extractErrorName } from '../utils.js'; +import { + createCommonLogAttributes, + extractErrorName, + truncate, + truncatePath, +} from '../utils.js'; class FeaturesTelemetry implements Telemetry { /** @@ -77,8 +82,8 @@ class FeaturesTelemetry implements Telemetry { } if (logInputAndOutput) { - const input = attributes['genkit:input'] as string; - const output = attributes['genkit:output'] as string; + const input = truncate(attributes['genkit:input'] as string); + const output = truncate(attributes['genkit:output'] as string); const sessionId = attributes['genkit:sessionId'] as string; const threadName = attributes['genkit:threadName'] as string; @@ -146,7 +151,7 @@ class FeaturesTelemetry implements Telemetry { sessionId?: string, threadName?: string ) { - const path = toDisplayPath(qualifiedPath); + const path = truncatePath(toDisplayPath(qualifiedPath)); const sharedMetadata = { ...createCommonLogAttributes(span, projectId), path, diff --git a/js/plugins/google-cloud/src/telemetry/generate.ts b/js/plugins/google-cloud/src/telemetry/generate.ts index 484a322f6..ee81594de 100644 --- a/js/plugins/google-cloud/src/telemetry/generate.ts +++ b/js/plugins/google-cloud/src/telemetry/generate.ts @@ -39,6 +39,8 @@ import { createCommonLogAttributes, extractErrorName, extractOuterFeatureNameFromPath, + truncate, + truncatePath, } from '../utils.js'; type SharedDimensions = { @@ -59,9 +61,6 @@ 
class GenerateTelemetry implements Telemetry { */ private _N = internalMetricNamespaceWrap.bind(null, 'ai'); - /** The maximum length (in characters) of a logged prompt message. */ - private MAX_LOG_CONTENT_CHARS = 128_000; - private actionCounter = new MetricCounter(this._N('generate/requests'), { description: 'Counts calls to genkit generate actions.', valueType: ValueType.INT, @@ -136,7 +135,7 @@ class GenerateTelemetry implements Telemetry { projectId?: string ): void { const attributes = span.attributes; - const modelName = attributes['genkit:name'] as string; + const modelName = truncate(attributes['genkit:name'] as string, 1024); const path = (attributes['genkit:path'] as string) || ''; const input = 'genkit:input' in attributes @@ -152,8 +151,10 @@ class GenerateTelemetry implements Telemetry { : undefined; const errName = extractErrorName(span.events); - let featureName = (attributes['genkit:metadata:flow:name'] || - extractOuterFeatureNameFromPath(path)) as string; + let featureName = truncate( + (attributes['genkit:metadata:flow:name'] || + extractOuterFeatureNameFromPath(path)) as string + ); if (!featureName || featureName === '') { featureName = 'generate'; } @@ -162,7 +163,7 @@ class GenerateTelemetry implements Telemetry { const threadName = attributes['genkit:threadName'] as string; if (input) { - this.recordGenerateActionMetrics(modelName, featureName, path, input, { + this.recordGenerateActionMetrics(modelName, featureName, path, { response: output, errName, }); @@ -209,7 +210,6 @@ class GenerateTelemetry implements Telemetry { modelName: string, featureName: string, path: string, - input: GenerateRequestData, opts: { response?: GenerateResponseData; errName?: string; @@ -235,7 +235,7 @@ class GenerateTelemetry implements Telemetry { sessionId?: string, threadName?: string ) { - const path = toDisplayPath(qualifiedPath); + const path = truncatePath(toDisplayPath(qualifiedPath)); const sharedMetadata = { ...createCommonLogAttributes(span, 
projectId), model, @@ -251,7 +251,7 @@ class GenerateTelemetry implements Telemetry { topK: input.config?.topK, topP: input.config?.topP, maxOutputTokens: input.config?.maxOutputTokens, - stopSequences: input.config?.stopSequences, + stopSequences: input.config?.stopSequences?.map((s: string) => truncate(s, 1024)), source: 'ts', sourceVersion: GENKIT_VERSION, }); @@ -267,7 +267,7 @@ class GenerateTelemetry implements Telemetry { sessionId?: string, threadName?: string ) { - const path = toDisplayPath(qualifiedPath); + const path = truncatePath(toDisplayPath(qualifiedPath)); const sharedMetadata = { ...createCommonLogAttributes(span, projectId), model, @@ -305,7 +305,7 @@ class GenerateTelemetry implements Telemetry { sessionId?: string, threadName?: string ) { - const path = toDisplayPath(qualifiedPath); + const path = truncatePath(toDisplayPath(qualifiedPath)); const sharedMetadata = { ...createCommonLogAttributes(span, projectId), model, @@ -321,7 +321,7 @@ class GenerateTelemetry implements Telemetry { message.content.forEach((part, partIdx) => { const partCounts = this.toPartCounts(partIdx, parts, 0, 1); const initial = output.finishMessage - ? { finishMessage: this.toPartLogText(output.finishMessage) } + ? 
{ finishMessage: truncate(output.finishMessage) } : {}; logger.logStructured(`Output[${path}, ${model}] ${partCounts}`, { ...initial, @@ -364,10 +364,10 @@ class GenerateTelemetry implements Telemetry { private toPartLogContent(part: Part): string { if (part.text) { - return this.toPartLogText(part.text); + return truncate(part.text); } if (part.data) { - return this.toPartLogText(JSON.stringify(part.data)); + return truncate(JSON.stringify(part.data)); } if (part.media) { return this.toPartLogMedia(part); @@ -379,15 +379,11 @@ class GenerateTelemetry implements Telemetry { return this.toPartLogToolResponse(part); } if (part.custom) { - return this.toPartLogText(JSON.stringify(part.custom)); + return truncate(JSON.stringify(part.custom)); } return ''; } - private toPartLogText(text: string): string { - return text.substring(0, this.MAX_LOG_CONTENT_CHARS); - } - private toPartLogMedia(part: MediaPart): string { if (part.media.url.startsWith('data:')) { const splitIdx = part.media.url.indexOf('base64,'); @@ -400,7 +396,7 @@ class GenerateTelemetry implements Telemetry { .digest('hex'); return `${prefix}`; } - return this.toPartLogText(part.media.url); + return truncate(part.media.url); } private toPartLogToolRequest(part: ToolRequestPart): string { @@ -408,7 +404,7 @@ class GenerateTelemetry implements Telemetry { typeof part.toolRequest.input === 'string' ? part.toolRequest.input : JSON.stringify(part.toolRequest.input); - return this.toPartLogText( + return truncate( `Tool request: ${part.toolRequest.name}, ref: ${part.toolRequest.ref}, input: ${inputText}` ); } @@ -418,7 +414,7 @@ class GenerateTelemetry implements Telemetry { typeof part.toolResponse.output === 'string' ? 
part.toolResponse.output : JSON.stringify(part.toolResponse.output); - return this.toPartLogText( + return truncate( `Tool response: ${part.toolResponse.name}, ref: ${part.toolResponse.ref}, output: ${outputText}` ); } diff --git a/js/plugins/google-cloud/src/telemetry/path.ts b/js/plugins/google-cloud/src/telemetry/path.ts index 79e9fa5b1..495cc2105 100644 --- a/js/plugins/google-cloud/src/telemetry/path.ts +++ b/js/plugins/google-cloud/src/telemetry/path.ts @@ -31,6 +31,7 @@ import { extractErrorMessage, extractErrorName, extractErrorStack, + truncatePath, } from '../utils.js'; class PathsTelemetry implements Telemetry { @@ -123,7 +124,7 @@ class PathsTelemetry implements Telemetry { sessionId?: string, threadName?: string ) { - const displayPath = toDisplayPath(path); + const displayPath = truncatePath(toDisplayPath(path)); logger.logStructuredError(`Error[${displayPath}, ${errorName}]`, { ...createCommonLogAttributes(span, projectId), path: displayPath, @@ -207,7 +208,7 @@ class PathsTelemetry implements Telemetry { flowName: featureName, sessionId, threadName, - paths: flowPaths.map((p) => toDisplayPath(p.path)), + paths: flowPaths.map((p) => truncatePath(toDisplayPath(p.path))), }); flowPaths.forEach((p) => this.writePathMetric(featureName, p)); diff --git a/js/plugins/google-cloud/src/utils.ts b/js/plugins/google-cloud/src/utils.ts index bd3f63983..3235ac898 100644 --- a/js/plugins/google-cloud/src/utils.ts +++ b/js/plugins/google-cloud/src/utils.ts @@ -18,6 +18,17 @@ import { TraceFlags } from '@opentelemetry/api'; import { ReadableSpan, TimedEvent } from '@opentelemetry/sdk-trace-base'; import { resolveCurrentPrincipal } from './auth.js'; +/** + * The maximum length (in characters) of a logged input or output. + * This limit exists to align the logs with GCP logging size limits. + * */ +const MAX_LOG_CONTENT_CHARS = 128_000; + +/** + * The maximum length (in characters) of a flow path. 
+ */ +const MAX_PATH_CHARS = 4096; + export function extractOuterFlowNameFromPath(path: string) { if (!path || path === '') { return ''; @@ -27,6 +38,17 @@ export function extractOuterFlowNameFromPath(path: string) { return flowName ? flowName[1] : ''; } +export function truncate( + text: string, + limit: number = MAX_LOG_CONTENT_CHARS +): string { + return text ? text.substring(0, limit) : text; +} + +export function truncatePath(path: string) { + return truncate(path, MAX_PATH_CHARS); +} + /** * Extract first feature name from a path * e.g. for /{myFlow,t:flow}/{myStep,t:flowStep}/{googleai/gemini-pro,t:action,s:model} @@ -47,7 +69,7 @@ export function extractErrorName(events: TimedEvent[]): string | undefined { .map((event) => { const attributes = event.attributes; return attributes - ? (attributes['exception.type'] as string) + ? truncate(attributes['exception.type'] as string, 1024) : ''; }) .at(0); @@ -59,7 +81,7 @@ export function extractErrorMessage(events: TimedEvent[]): string | undefined { .map((event) => { const attributes = event.attributes; return attributes - ? (attributes['exception.message'] as string) + ? truncate(attributes['exception.message'] as string, 4096) : ''; }) .at(0); @@ -71,7 +93,7 @@ export function extractErrorStack(events: TimedEvent[]): string | undefined { .map((event) => { const attributes = event.attributes; return attributes - ? (attributes['exception.stacktrace'] as string) + ? 
truncate(attributes['exception.stacktrace'] as string, 32_768) : ''; }) .at(0); diff --git a/js/plugins/google-cloud/tests/logs_test.ts b/js/plugins/google-cloud/tests/logs_test.ts index e4540c5c1..909740712 100644 --- a/js/plugins/google-cloud/tests/logs_test.ts +++ b/js/plugins/google-cloud/tests/logs_test.ts @@ -19,6 +19,7 @@ import { beforeAll, beforeEach, describe, + expect, it, jest, } from '@jest/globals'; @@ -31,6 +32,7 @@ import { __addTransportStreamForTesting, __forceFlushSpansForTesting, __getSpanExporterForTesting, + __useJsonFormatForTesting, enableGoogleCloudTelemetry, } from '../src/index.js'; @@ -56,6 +58,207 @@ jest.mock('../src/auth.js', () => { }; }); +describe('GoogleCloudLogs with truncation', () => { + let logLines = ''; + const logStream = new Writable(); + logStream._write = (chunk, encoding, next) => { + logLines = logLines += chunk.toString(); + next(); + }; + + let ai: Genkit; + + beforeAll(async () => { + process.env.GCLOUD_PROJECT = 'test'; + process.env.GENKIT_ENV = 'dev'; + __useJsonFormatForTesting(); + __addTransportStreamForTesting(logStream); + await enableGoogleCloudTelemetry({ + projectId: 'test', + forceDevExport: false, + metricExportIntervalMillis: 100, + metricExportTimeoutMillis: 100, + }); + ai = genkit({ + // Force GCP Plugin to use in-memory metrics exporter + plugins: [], + }); + await waitForLogsInit(ai, logLines); + }); + beforeEach(async () => { + logLines = ''; + __getSpanExporterForTesting().reset(); + }); + afterAll(async () => { + await ai.stopServers(); + }); + + it('truncates large output logs', async () => { + const testModel = createModel(ai, 'testModel', async () => { + return { + message: { + role: 'user', + content: [ + { + text: 'r'.repeat(130_000), + }, + ], + }, + finishReason: 'stop', + usage: { + inputTokens: 10, + outputTokens: 14, + inputCharacters: 8, + outputCharacters: 16, + inputImages: 1, + outputImages: 3, + }, + }; + }); + const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => 
{ + return await ai.run('sub1', async () => { + return await ai.run('sub2', async () => { + return await ai.generate({ + model: testModel, + prompt: `${input} prompt`, + config: { + temperature: 1.0, + topK: 3, + topP: 5, + maxOutputTokens: 7, + }, + }); + }); + }); + }); + + await testFlow('test'); + await getExportedSpans(); + + const logMessages = await getLogs(1, 100, logLines); + const logObjects = logMessages.map((l) => JSON.parse(l as string)); + const logObjectMessages = logObjects.map( + (structuredLog) => structuredLog.message + ); + + expect(logObjectMessages).toContain('Output[testFlow, testFlow]'); + + logObjects.map((structuredLog) => { + if (structuredLog.message === 'Output[testFlow, testFlow]') { + expect(structuredLog.content.length).toBe(128_000); + } + }); + }); + + it('truncates large input logs', async () => { + const testModel = createModel(ai, 'testModel', async () => { + return { + message: { + role: 'user', + content: [ + { + text: 'response', + }, + ], + }, + finishReason: 'stop', + usage: { + inputTokens: 10, + outputTokens: 14, + inputCharacters: 8, + outputCharacters: 16, + inputImages: 1, + outputImages: 3, + }, + }; + }); + const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => { + return await ai.run('sub1', async () => { + return await ai.run('sub2', async () => { + return await ai.generate({ + model: testModel, + prompt: `${input} prompt`, + config: { + temperature: 1.0, + topK: 3, + topP: 5, + maxOutputTokens: 7, + }, + }); + }); + }); + }); + + await testFlow('t'.repeat(130_000)); + await getExportedSpans(); + + const logMessages = await getLogs(1, 100, logLines); + const logObjects = logMessages.map((l) => JSON.parse(l as string)); + const logObjectMessages = logObjects.map( + (structuredLog) => structuredLog.message + ); + + expect(logObjectMessages).toContain('Input[testFlow, testFlow]'); + + logObjects.map((structuredLog) => { + if (structuredLog.message === 'Input[testFlow, testFlow]') { + 
+ expect(structuredLog.content.length).toBe(128_000); + } + }); + }); + + it('truncates large model names', async () => { + const testModel = createModel(ai, 'm'.repeat(2046), async () => { + return { + message: { + role: 'user', + content: [ + { + text: 'response', + }, + ], + }, + finishReason: 'stop', + usage: { + inputTokens: 10, + outputTokens: 14, + inputCharacters: 8, + outputCharacters: 16, + inputImages: 1, + outputImages: 3, + }, + }; + }); + const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => { + return await ai.run('sub1', async () => { + return await ai.run('sub2', async () => { + return await ai.generate({ + model: testModel, + prompt: `${input} prompt`, + config: { + temperature: 1.0, + topK: 3, + topP: 5, + maxOutputTokens: 7, + }, + }); + }); + }); + }); + + await testFlow('test'); + await getExportedSpans(); + + const logMessages = await getLogs(1, 100, logLines); + const logObjects = logMessages.map((l) => JSON.parse(l as string)); + const logObjectModels = logObjects.map( + (structuredLog) => structuredLog.model + ); + + expect(logObjectModels).toContain('m'.repeat(1024)); + }); +}); + describe('GoogleCloudLogs', () => { let logLines = ''; const logStream = new Writable();