Skip to content

Commit

Permalink
[INFOPLAT-1592] Address high CPU utilization when telemetry is enabled (
Browse files Browse the repository at this point in the history
#967)

* [loop/EnvConfig] parse sets TelemetryEmitterBatchProcessor, TelemetryEmitterExportTimeout

* [beholder/client] BatchProcessor ExportTimeout option is non-zero value

* [loop/EnvConfig] Use maps.Equal in tests

---------

Co-authored-by: Patrick <[email protected]>
  • Loading branch information
pkcll and patrickhuie19 authored Dec 10, 2024
1 parent eb2f2bc commit a9c706f
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 33 deletions.
20 changes: 15 additions & 5 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
oteltrace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -112,9 +111,13 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
// Logger
var loggerProcessor sdklog.Processor
if cfg.LogBatchProcessor {
batchProcessorOpts := []sdklog.BatchProcessorOption{}
if cfg.LogExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s
}
loggerProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s
batchProcessorOpts...,
)
} else {
loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
Expand Down Expand Up @@ -152,9 +155,13 @@ func newGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
// Message Emitter
var messageLogProcessor sdklog.Processor
if cfg.EmitterBatchProcessor {
batchProcessorOpts := []sdklog.BatchProcessorOption{}
if cfg.EmitterExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s
}
messageLogProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s
batchProcessorOpts...,
)
} else {
messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
Expand Down Expand Up @@ -314,9 +321,12 @@ func newTracerProvider(config Config, resource *sdkresource.Resource, creds cred
if err != nil {
return nil, err
}

batcherOpts := []sdktrace.BatchSpanProcessorOption{}
if config.TraceBatchTimeout > 0 {
batcherOpts = append(batcherOpts, sdktrace.WithBatchTimeout(config.TraceBatchTimeout)) // Default is 5s
}
opts := []sdktrace.TracerProviderOption{
sdktrace.WithBatcher(exporter, trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s
sdktrace.WithBatcher(exporter, batcherOpts...),
sdktrace.WithResource(resource),
sdktrace.WithSampler(
sdktrace.ParentBased(
Expand Down
20 changes: 15 additions & 5 deletions pkg/beholder/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

Expand Down Expand Up @@ -77,9 +76,13 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
// Logger
var loggerProcessor sdklog.Processor
if cfg.LogBatchProcessor {
batchProcessorOpts := []sdklog.BatchProcessorOption{}
if cfg.LogExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.LogExportTimeout)) // Default is 30s
}
loggerProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s
batchProcessorOpts...,
)
} else {
loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
Expand Down Expand Up @@ -117,9 +120,13 @@ func newHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
// Message Emitter
var messageLogProcessor sdklog.Processor
if cfg.EmitterBatchProcessor {
batchProcessorOpts := []sdklog.BatchProcessorOption{}
if cfg.EmitterExportTimeout > 0 {
batchProcessorOpts = append(batchProcessorOpts, sdklog.WithExportTimeout(cfg.EmitterExportTimeout)) // Default is 30s
}
messageLogProcessor = sdklog.NewBatchProcessor(
sharedLogExporter,
sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s
batchProcessorOpts..., // Default is 30s
)
} else {
messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
Expand Down Expand Up @@ -181,9 +188,12 @@ func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsCon
if err != nil {
return nil, err
}

batcherOpts := []sdktrace.BatchSpanProcessorOption{}
if config.TraceBatchTimeout > 0 {
batcherOpts = append(batcherOpts, sdktrace.WithBatchTimeout(config.TraceBatchTimeout)) // Default is 5s
}
opts := []sdktrace.TracerProviderOption{
sdktrace.WithBatcher(exporter, trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s
sdktrace.WithBatcher(exporter, batcherOpts...),
sdktrace.WithResource(resource),
sdktrace.WithSampler(
sdktrace.ParentBased(
Expand Down
8 changes: 8 additions & 0 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ func (e *EnvConfig) parse() error {
e.TelemetryTraceSampleRatio = getFloat64OrZero(envTelemetryTraceSampleRatio)
e.TelemetryAuthHeaders = getMap(envTelemetryAuthHeader)
e.TelemetryAuthPubKeyHex = os.Getenv(envTelemetryAuthPubKeyHex)
e.TelemetryEmitterBatchProcessor, err = getBool(envTelemetryEmitterBatchProcessor)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterBatchProcessor, err)
}
e.TelemetryEmitterExportTimeout, err = time.ParseDuration(os.Getenv(envTelemetryEmitterExportTimeout))
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterExportTimeout, err)
}
}
return nil
}
Expand Down
108 changes: 85 additions & 23 deletions pkg/loop/config_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package loop

import (
"maps"
"net/url"
"os"
"strconv"
Expand All @@ -18,34 +19,65 @@ import (

func TestEnvConfig_parse(t *testing.T) {
cases := []struct {
name string
envVars map[string]string
expectError bool
expectedDatabaseURL string
expectedPrometheusPort int
expectedTracingEnabled bool
expectedTracingCollectorTarget string
expectedTracingSamplingRatio float64
expectedTracingTLSCertPath string
name string
envVars map[string]string
expectError bool
expectedDatabaseURL string
expectedPrometheusPort int
expectedTracingEnabled bool
expectedTracingCollectorTarget string
expectedTracingSamplingRatio float64
expectedTracingTLSCertPath string
expectedTelemetryEnabled bool
expectedTelemetryEndpoint string
expectedTelemetryInsecureConn bool
expectedTelemetryCACertFile string
expectedTelemetryAttributes OtelAttributes
expectedTelemetryTraceSampleRatio float64
expectedTelemetryAuthHeaders map[string]string
expectedTelemetryAuthPubKeyHex string
expectedTelemetryEmitterBatchProcessor bool
expectedTelemetryEmitterExportTimeout time.Duration
}{
{
name: "All variables set correctly",
envVars: map[string]string{
envDatabaseURL: "postgres://user:password@localhost:5432/db",
envPromPort: "8080",
envTracingEnabled: "true",
envTracingCollectorTarget: "some:target",
envTracingSamplingRatio: "1.0",
envTracingTLSCertPath: "internal/test/fixtures/client.pem",
envTracingAttribute + "XYZ": "value",
envDatabaseURL: "postgres://user:password@localhost:5432/db",
envPromPort: "8080",
envTracingEnabled: "true",
envTracingCollectorTarget: "some:target",
envTracingSamplingRatio: "1.0",
envTracingTLSCertPath: "internal/test/fixtures/client.pem",
envTracingAttribute + "XYZ": "value",
envTelemetryEnabled: "true",
envTelemetryEndpoint: "example.com/beholder",
envTelemetryInsecureConn: "true",
envTelemetryCACertFile: "foo/bar",
envTelemetryAttribute + "foo": "bar",
envTelemetryAttribute + "baz": "42",
envTelemetryTraceSampleRatio: "0.42",
envTelemetryAuthHeader + "header-key": "header-value",
envTelemetryAuthPubKeyHex: "pub-key-hex",
envTelemetryEmitterBatchProcessor: "true",
envTelemetryEmitterExportTimeout: "1s",
},
expectError: false,
expectedDatabaseURL: "postgres://user:password@localhost:5432/db",
expectedPrometheusPort: 8080,
expectedTracingEnabled: true,
expectedTracingCollectorTarget: "some:target",
expectedTracingSamplingRatio: 1.0,
expectedTracingTLSCertPath: "internal/test/fixtures/client.pem",
expectError: false,
expectedDatabaseURL: "postgres://user:password@localhost:5432/db",
expectedPrometheusPort: 8080,
expectedTracingEnabled: true,
expectedTracingCollectorTarget: "some:target",
expectedTracingSamplingRatio: 1.0,
expectedTracingTLSCertPath: "internal/test/fixtures/client.pem",
expectedTelemetryEnabled: true,
expectedTelemetryEndpoint: "example.com/beholder",
expectedTelemetryInsecureConn: true,
expectedTelemetryCACertFile: "foo/bar",
expectedTelemetryAttributes: OtelAttributes{"foo": "bar", "baz": "42"},
expectedTelemetryTraceSampleRatio: 0.42,
expectedTelemetryAuthHeaders: map[string]string{"header-key": "header-value"},
expectedTelemetryAuthPubKeyHex: "pub-key-hex",
expectedTelemetryEmitterBatchProcessor: true,
expectedTelemetryEmitterExportTimeout: 1 * time.Second,
},
{
name: "CL_DATABASE_URL parse error",
Expand Down Expand Up @@ -106,6 +138,36 @@ func TestEnvConfig_parse(t *testing.T) {
if config.TracingTLSCertPath != tc.expectedTracingTLSCertPath {
t.Errorf("Expected tracingTLSCertPath %s, got %s", tc.expectedTracingTLSCertPath, config.TracingTLSCertPath)
}
if config.TelemetryEnabled != tc.expectedTelemetryEnabled {
t.Errorf("Expected telemetryEnabled %v, got %v", tc.expectedTelemetryEnabled, config.TelemetryEnabled)
}
if config.TelemetryEndpoint != tc.expectedTelemetryEndpoint {
t.Errorf("Expected telemetryEndpoint %s, got %s", tc.expectedTelemetryEndpoint, config.TelemetryEndpoint)
}
if config.TelemetryInsecureConnection != tc.expectedTelemetryInsecureConn {
t.Errorf("Expected telemetryInsecureConn %v, got %v", tc.expectedTelemetryInsecureConn, config.TelemetryInsecureConnection)
}
if config.TelemetryCACertFile != tc.expectedTelemetryCACertFile {
t.Errorf("Expected telemetryCACertFile %s, got %s", tc.expectedTelemetryCACertFile, config.TelemetryCACertFile)
}
if !maps.Equal(config.TelemetryAttributes, tc.expectedTelemetryAttributes) {
t.Errorf("Expected telemetryAttributes %v, got %v", tc.expectedTelemetryAttributes, config.TelemetryAttributes)
}
if config.TelemetryTraceSampleRatio != tc.expectedTelemetryTraceSampleRatio {
t.Errorf("Expected telemetryTraceSampleRatio %f, got %f", tc.expectedTelemetryTraceSampleRatio, config.TelemetryTraceSampleRatio)
}
if !maps.Equal(config.TelemetryAuthHeaders, tc.expectedTelemetryAuthHeaders) {
t.Errorf("Expected telemetryAuthHeaders %v, got %v", tc.expectedTelemetryAuthHeaders, config.TelemetryAuthHeaders)
}
if config.TelemetryAuthPubKeyHex != tc.expectedTelemetryAuthPubKeyHex {
t.Errorf("Expected telemetryAuthPubKeyHex %s, got %s", tc.expectedTelemetryAuthPubKeyHex, config.TelemetryAuthPubKeyHex)
}
if config.TelemetryEmitterBatchProcessor != tc.expectedTelemetryEmitterBatchProcessor {
t.Errorf("Expected telemetryEmitterBatchProcessor %v, got %v", tc.expectedTelemetryEmitterBatchProcessor, config.TelemetryEmitterBatchProcessor)
}
if config.TelemetryEmitterExportTimeout != tc.expectedTelemetryEmitterExportTimeout {
t.Errorf("Expected telemetryEmitterExportTimeout %v, got %v", tc.expectedTelemetryEmitterExportTimeout, config.TelemetryEmitterExportTimeout)
}
}
}
})
Expand Down

0 comments on commit a9c706f

Please sign in to comment.