From a9c706f99e83ac0ec0e3508930138e4e06d5b160 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Tue, 10 Dec 2024 11:26:53 -0800 Subject: [PATCH] [INFOPLAT-1592] Address high CPU utilization when telemetry is enabled (#967) * [loop/EnvConfig] parse sets TelemetryEmitterBatchProcessor, TelemetryEmitterExportTimeout * [beholder/client] BatchProcessor ExportTimeout option is non-zero value * [loop/EnvConfig] Use maps.Equal in tests --------- Co-authored-by: Patrick --- pkg/beholder/client.go | 20 +++++-- pkg/beholder/httpclient.go | 20 +++++-- pkg/loop/config.go | 8 +++ pkg/loop/config_test.go | 108 +++++++++++++++++++++++++++++-------- 4 files changed, 123 insertions(+), 33 deletions(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index f707d3346..c6bac815a 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -14,7 +14,6 @@ import ( sdklog "go.opentelemetry.io/otel/sdk/log" sdkmetric "go.opentelemetry.io/otel/sdk/metric" sdkresource "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace" oteltrace "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/credentials" @@ -112,9 +111,13 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // Logger var loggerProcessor sdklog.Processor if cfg.LogBatchProcessor { + batchProcessorOpts := []sdklog.BatchProcessorOption{} + if cfg.LogExportTimeout > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s + } loggerProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s + batchProcessorOpts..., ) } else { loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) @@ -152,9 +155,13 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // Message Emitter var messageLogProcessor sdklog.Processor if cfg.EmitterBatchProcessor { + batchProcessorOpts := []sdklog.BatchProcessorOption{} + if cfg.EmitterExportTimeout > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s + } messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + batchProcessorOpts..., ) } else { messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) @@ -314,9 +321,12 @@ func newTracerProvider(config Config, resource *sdkresource.Resource, creds cred if err != nil { return nil, err } - + batcherOpts := []sdktrace.BatchSpanProcessorOption{} + if config.TraceBatchTimeout > 0 { + batcherOpts = append(batcherOpts, sdktrace.WithBatchTimeout(config.TraceBatchTimeout)) // Default is 5s + } opts := []sdktrace.TracerProviderOption{ - sdktrace.WithBatcher(exporter, trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s + sdktrace.WithBatcher(exporter, batcherOpts...), sdktrace.WithResource(resource), sdktrace.WithSampler( sdktrace.ParentBased( diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index ca2ebd014..0df44e647 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -14,7 +14,6 @@ import ( sdklog "go.opentelemetry.io/otel/sdk/log" sdkmetric "go.opentelemetry.io/otel/sdk/metric" sdkresource "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) @@ -77,9 +76,13 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro // Logger var loggerProcessor sdklog.Processor if cfg.LogBatchProcessor { + batchProcessorOpts := []sdklog.BatchProcessorOption{} + if cfg.LogExportTimeout > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s + } loggerProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s + batchProcessorOpts..., ) } else { loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) @@ -117,9 +120,13 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro // Message Emitter var messageLogProcessor sdklog.Processor if cfg.EmitterBatchProcessor { + batchProcessorOpts := []sdklog.BatchProcessorOption{} + if cfg.EmitterExportTimeout > 0 { + batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s + } messageLogProcessor = sdklog.NewBatchProcessor( sharedLogExporter, - sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + batchProcessorOpts..., // Default is 30s ) } else { messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) @@ -181,9 +188,12 @@ func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsCon if err != nil { return nil, err } - + batcherOpts := []sdktrace.BatchSpanProcessorOption{} + if config.TraceBatchTimeout > 0 { + batcherOpts = append(batcherOpts, sdktrace.WithBatchTimeout(config.TraceBatchTimeout)) // Default is 5s + } opts := []sdktrace.TracerProviderOption{ - sdktrace.WithBatcher(exporter, trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s + sdktrace.WithBatcher(exporter, batcherOpts...), sdktrace.WithResource(resource), sdktrace.WithSampler( sdktrace.ParentBased( diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 6e18e60ae..e63f72f2f 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -142,6 +142,14 @@ func (e *EnvConfig) parse() error { e.TelemetryTraceSampleRatio = getFloat64OrZero(envTelemetryTraceSampleRatio) e.TelemetryAuthHeaders = getMap(envTelemetryAuthHeader) e.TelemetryAuthPubKeyHex = os.Getenv(envTelemetryAuthPubKeyHex) + e.TelemetryEmitterBatchProcessor, err = getBool(envTelemetryEmitterBatchProcessor) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterBatchProcessor, err) + } + e.TelemetryEmitterExportTimeout, err = time.ParseDuration(os.Getenv(envTelemetryEmitterExportTimeout)) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportTimeout, err) + } } return nil } diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index b2eb18745..78d177aa4 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -1,6 +1,7 @@ package loop import ( + "maps" "net/url" "os" "strconv" @@ -18,34 +19,65 @@ import ( func TestEnvConfig_parse(t *testing.T) { cases := []struct { - name string - envVars map[string]string - expectError bool - expectedDatabaseURL string - expectedPrometheusPort int - expectedTracingEnabled bool - expectedTracingCollectorTarget string - expectedTracingSamplingRatio float64 - expectedTracingTLSCertPath string + name string + envVars map[string]string + expectError bool + expectedDatabaseURL string + expectedPrometheusPort int + expectedTracingEnabled bool + expectedTracingCollectorTarget string + expectedTracingSamplingRatio float64 + expectedTracingTLSCertPath string + expectedTelemetryEnabled bool + expectedTelemetryEndpoint string + expectedTelemetryInsecureConn bool + expectedTelemetryCACertFile string + expectedTelemetryAttributes OtelAttributes + expectedTelemetryTraceSampleRatio float64 + expectedTelemetryAuthHeaders map[string]string + expectedTelemetryAuthPubKeyHex string + expectedTelemetryEmitterBatchProcessor bool + expectedTelemetryEmitterExportTimeout time.Duration }{ { name: "All variables set correctly", envVars: map[string]string{ - envDatabaseURL: "postgres://user:password@localhost:5432/db", - envPromPort: "8080", - envTracingEnabled: "true", - envTracingCollectorTarget: "some:target", - envTracingSamplingRatio: "1.0", - envTracingTLSCertPath: "internal/test/fixtures/client.pem", - envTracingAttribute + "XYZ": "value", + envDatabaseURL: "postgres://user:password@localhost:5432/db", + envPromPort: "8080", + envTracingEnabled: "true", + envTracingCollectorTarget: "some:target", + envTracingSamplingRatio: "1.0", + envTracingTLSCertPath: "internal/test/fixtures/client.pem", + envTracingAttribute + "XYZ": "value", + envTelemetryEnabled: "true", + envTelemetryEndpoint: "example.com/beholder", + envTelemetryInsecureConn: "true", + envTelemetryCACertFile: "foo/bar", + envTelemetryAttribute + "foo": "bar", + envTelemetryAttribute + "baz": "42", + envTelemetryTraceSampleRatio: "0.42", + envTelemetryAuthHeader + "header-key": "header-value", + envTelemetryAuthPubKeyHex: "pub-key-hex", + envTelemetryEmitterBatchProcessor: "true", + envTelemetryEmitterExportTimeout: "1s", }, - expectError: false, - expectedDatabaseURL: "postgres://user:password@localhost:5432/db", - expectedPrometheusPort: 8080, - expectedTracingEnabled: true, - expectedTracingCollectorTarget: "some:target", - expectedTracingSamplingRatio: 1.0, - expectedTracingTLSCertPath: "internal/test/fixtures/client.pem", + expectError: false, + expectedDatabaseURL: "postgres://user:password@localhost:5432/db", + expectedPrometheusPort: 8080, + expectedTracingEnabled: true, + expectedTracingCollectorTarget: "some:target", + expectedTracingSamplingRatio: 1.0, + expectedTracingTLSCertPath: "internal/test/fixtures/client.pem", + expectedTelemetryEnabled: true, + expectedTelemetryEndpoint: "example.com/beholder", + expectedTelemetryInsecureConn: true, + expectedTelemetryCACertFile: "foo/bar", + expectedTelemetryAttributes: OtelAttributes{"foo": "bar", "baz": "42"}, + expectedTelemetryTraceSampleRatio: 0.42, + expectedTelemetryAuthHeaders: map[string]string{"header-key": "header-value"}, + expectedTelemetryAuthPubKeyHex: "pub-key-hex", + expectedTelemetryEmitterBatchProcessor: true, + expectedTelemetryEmitterExportTimeout: 1 * time.Second, }, { name: "CL_DATABASE_URL parse error", @@ -106,6 +138,36 @@ func TestEnvConfig_parse(t *testing.T) { if config.TracingTLSCertPath != tc.expectedTracingTLSCertPath { t.Errorf("Expected tracingTLSCertPath %s, got %s", tc.expectedTracingTLSCertPath, config.TracingTLSCertPath) } + if config.TelemetryEnabled != tc.expectedTelemetryEnabled { + t.Errorf("Expected telemetryEnabled %v, got %v", tc.expectedTelemetryEnabled, config.TelemetryEnabled) + } + if config.TelemetryEndpoint != tc.expectedTelemetryEndpoint { + t.Errorf("Expected telemetryEndpoint %s, got %s", tc.expectedTelemetryEndpoint, config.TelemetryEndpoint) + } + if config.TelemetryInsecureConnection != tc.expectedTelemetryInsecureConn { + t.Errorf("Expected telemetryInsecureConn %v, got %v", tc.expectedTelemetryInsecureConn, config.TelemetryInsecureConnection) + } + if config.TelemetryCACertFile != tc.expectedTelemetryCACertFile { + t.Errorf("Expected telemetryCACertFile %s, got %s", tc.expectedTelemetryCACertFile, config.TelemetryCACertFile) + } + if !maps.Equal(config.TelemetryAttributes, tc.expectedTelemetryAttributes) { + t.Errorf("Expected telemetryAttributes %v, got %v", tc.expectedTelemetryAttributes, config.TelemetryAttributes) + } + if config.TelemetryTraceSampleRatio != tc.expectedTelemetryTraceSampleRatio { + t.Errorf("Expected telemetryTraceSampleRatio %f, got %f", tc.expectedTelemetryTraceSampleRatio, config.TelemetryTraceSampleRatio) + } + if !maps.Equal(config.TelemetryAuthHeaders, tc.expectedTelemetryAuthHeaders) { + t.Errorf("Expected telemetryAuthHeaders %v, got %v", tc.expectedTelemetryAuthHeaders, config.TelemetryAuthHeaders) + } + if config.TelemetryAuthPubKeyHex != tc.expectedTelemetryAuthPubKeyHex { + t.Errorf("Expected telemetryAuthPubKeyHex %s, got %s", tc.expectedTelemetryAuthPubKeyHex, config.TelemetryAuthPubKeyHex) + } + if config.TelemetryEmitterBatchProcessor != tc.expectedTelemetryEmitterBatchProcessor { + t.Errorf("Expected telemetryEmitterBatchProcessor %v, got %v", tc.expectedTelemetryEmitterBatchProcessor, config.TelemetryEmitterBatchProcessor) + } + if config.TelemetryEmitterExportTimeout != tc.expectedTelemetryEmitterExportTimeout { + t.Errorf("Expected telemetryEmitterExportTimeout %v, got %v", tc.expectedTelemetryEmitterExportTimeout, config.TelemetryEmitterExportTimeout) + } } } })