Skip to content

Commit

Permalink
Beholder: add more options to config
Browse files Browse the repository at this point in the history
  • Loading branch information
pkcll committed Aug 9, 2024
1 parent 9e31965 commit 559ea06
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 68 deletions.
59 changes: 35 additions & 24 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package beholder
import (
"context"
"errors"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -41,8 +40,6 @@ var _ Client = (*beholderClient)(nil)
type messageEmitter struct {
exporter sdklog.Exporter
messageLogger otellog.Logger
retryCount uint
retryDelay time.Duration
}

type beholderClient struct {
Expand All @@ -53,8 +50,8 @@ type beholderClient struct {
tracer oteltrace.Tracer
// Meter
meter otelmetric.Meter
// MessageEmitter
messageEmitter Emitter
// Message Emitter
emitter Emitter
// Graceful shutdown for tracer, meter, logger providers
closeFunc func() error
}
Expand All @@ -64,16 +61,16 @@ func NewClient(
logger otellog.Logger,
tracer oteltrace.Tracer,
meter otelmetric.Meter,
messageEmitter Emitter,
emitter Emitter,
onClose func() error,
) Client {
return &beholderClient{
config: config,
logger: logger,
tracer: tracer,
meter: meter,
messageEmitter: messageEmitter,
closeFunc: onClose,
config: config,
logger: logger,
tracer: tracer,
meter: meter,
emitter: emitter,
closeFunc: onClose,
}
}

Expand All @@ -90,7 +87,7 @@ type otlploggrpcFactory func(ctx context.Context, options ...otlploggrpc.Option)

func newOtelClient(cfg Config, errorHandler errorHandlerFunc, otlploggrpcNew otlploggrpcFactory) (Client, error) {
ctx := context.Background()
baseResource, err := newOtelResource()
baseResource, err := newOtelResource(cfg)
if err != nil {
return nil, err
}
Expand All @@ -106,7 +103,7 @@ func newOtelClient(cfg Config, errorHandler errorHandlerFunc, otlploggrpcNew otl
// Logger
loggerProcessor := sdklog.NewBatchProcessor(
sharedLogExporter,
sdklog.WithExportTimeout(1*time.Second), // Default is 30s
sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s
)
loggerAttributes := []attribute.KeyValue{
attribute.String("beholder_data_type", "zap_log_message"),
Expand Down Expand Up @@ -144,10 +141,10 @@ func newOtelClient(cfg Config, errorHandler errorHandlerFunc, otlploggrpcNew otl
meter := meterProvider.Meter(cfg.PackageName)
otel.SetMeterProvider(meterProvider)

// MessageEmitter
// Message Emitter
messageLogProcessor := sdklog.NewBatchProcessor(
sharedLogExporter,
sdklog.WithExportTimeout(1*time.Second), // Default is 30s
sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s
)
messageAttributes := []attribute.KeyValue{
attribute.String("beholder_data_type", "custom_message"),
Expand All @@ -164,7 +161,7 @@ func newOtelClient(cfg Config, errorHandler errorHandlerFunc, otlploggrpcNew otl
sdklog.WithProcessor(messageLogProcessor),
)
messageLogger := messageLoggerProvider.Logger(cfg.PackageName)
messageEmitter := newMessageEmitter(sharedLogExporter, messageLogger, cfg)
messageEmitter := newMessageEmitter(sharedLogExporter, messageLogger)

setOtelErrorHandler(errorHandler)

Expand All @@ -182,7 +179,7 @@ func setOtelErrorHandler(h errorHandlerFunc) {
otel.SetErrorHandler(otel.ErrorHandlerFunc(h))
}

func newOtelResource() (resource *sdkresource.Resource, err error) {
func newOtelResource(cfg Config) (resource *sdkresource.Resource, err error) {
extraResources, err := sdkresource.New(
context.Background(),
sdkresource.WithOS(),
Expand All @@ -199,19 +196,28 @@ func newOtelResource() (resource *sdkresource.Resource, err error) {
if err != nil {
return nil, err
}
// Add custom resource attributes
attrs := make([]attribute.KeyValue, 0, len(cfg.ResourceAttributes))
for k, v := range cfg.ResourceAttributes {
attrs = append(attrs, attribute.String(k, v))
}
resource, err = sdkresource.Merge(
sdkresource.NewSchemaless(attrs...),
resource,
)
if err != nil {
return nil, err
}
return
}

func newMessageEmitter(
exporter sdklog.Exporter,
messageLogger otellog.Logger,
config Config,
) Emitter {
return messageEmitter{
exporter: exporter,
messageLogger: messageLogger,
retryCount: config.MessageEmitterRetryCount,
retryDelay: config.MessageEmitterRetryDelay,
}
}

Expand Down Expand Up @@ -246,7 +252,7 @@ func (b *beholderClient) Meter() otelmetric.Meter {
return b.meter
}
func (b *beholderClient) Emitter() Emitter {
return b.messageEmitter
return b.emitter
}

func (b *beholderClient) Close() error {
Expand Down Expand Up @@ -282,8 +288,13 @@ func newTracerProvider(config Config, resource *sdkresource.Resource) (*sdktrace
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter,
trace.WithBatchTimeout(time.Second)), // Default is 5s
trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s
sdktrace.WithResource(resource),
sdktrace.WithSampler(
sdktrace.ParentBased(
sdktrace.TraceIDRatioBased(config.TraceSampleRate),
),
),
)
return tp, nil
}
Expand All @@ -304,7 +315,7 @@ func newMeterProvider(config Config, resource *sdkresource.Resource) (*sdkmetric
sdkmetric.WithReader(
sdkmetric.NewPeriodicReader(
exporter,
sdkmetric.WithInterval(time.Second), // Default is 10s
sdkmetric.WithInterval(config.MetricReaderInterval), // Default is 10s
)),
sdkmetric.WithResource(resource),
)
Expand Down
35 changes: 8 additions & 27 deletions pkg/beholder/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package beholder
import (
"context"
"fmt"
"slices"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
otellog "go.opentelemetry.io/otel/log"
sdklog "go.opentelemetry.io/otel/sdk/log"
Expand Down Expand Up @@ -55,11 +53,8 @@ func TestClient(t *testing.T) {
defaultMessageBody := []byte("body bytes")

tests := []struct {
name string
makeCustomAttributes func() map[string]any
// NOTE: skipping these attributes is necessary due to a limitation in sdklog.Record
// see INFOPLAT-811
skipAttributes []string
name string
makeCustomAttributes func() map[string]any
messageBody []byte
messageCount int
exporterMockErrorCount int
Expand All @@ -69,7 +64,6 @@ func TestClient(t *testing.T) {
{
name: "Test Emit",
makeCustomAttributes: defaultCustomAttributes,
skipAttributes: []string{},
messageBody: defaultMessageBody,
messageCount: 10,
exporterMockErrorCount: 0,
Expand All @@ -81,7 +75,6 @@ func TestClient(t *testing.T) {
}, {
name: "Test EmitMessage",
makeCustomAttributes: defaultCustomAttributes,
skipAttributes: []string{},
messageBody: defaultMessageBody,
messageCount: 10,
exporterMockErrorCount: 0,
Expand All @@ -100,13 +93,13 @@ func TestClient(t *testing.T) {
defer exporterMock.AssertExpectations(t)

otelErrorHandler := func(err error) {
t.Fatalf("OTel error: %v", err)
t.Fatalf("otel error: %v", err)
}
// Override exporter factory which is used by BeholderClient
exporterFactory := func(context.Context, ...otlploggrpc.Option) (sdklog.Exporter, error) {
return exporterMock, nil
}
client, err := newOtelClient(DefaultBeholderConfig(), otelErrorHandler, exporterFactory)
client, err := newOtelClient(DefaultConfig(), otelErrorHandler, exporterFactory)
if err != nil {
t.Fatalf("Error creating beholder client: %v", err)
}
Expand All @@ -131,12 +124,6 @@ func TestClient(t *testing.T) {
actualAttributeKeys := map[string]struct{}{}
record.WalkAttributes(func(kv otellog.KeyValue) bool {
key := kv.Key
if slices.Contains(tc.skipAttributes, key) {
// NOTE: skipping these attributes is necessary due to a limitation in sdklog.Record
// see INFOPLAT-811
t.Logf("Skipping attribute key: %s. See INFOPLAT-811", key)
return true
}
actualAttributeKeys[key] = struct{}{}
expectedValue, ok := customAttributes[key]
if !ok {
Expand All @@ -148,12 +135,6 @@ func TestClient(t *testing.T) {
return true
})
for key := range customAttributes {
if slices.Contains(tc.skipAttributes, key) {
// NOTE: skipping these attributes is necessary due to a limitation in sdklog.Record
// see INFOPLAT-811
t.Logf("Skipping attribute key: %s. See INFOPLAT-811", key)
continue
}
if _, ok := actualAttributeKeys[key]; !ok {
t.Fatalf("Record attribute key not found: %s", key)
}
Expand All @@ -177,7 +158,7 @@ func TestClient(t *testing.T) {
func TestEmitterMessageValidation(t *testing.T) {
getEmitter := func(exporterMock *mocks.OTLPExporter) Emitter {
client, err := newOtelClient(
DefaultBeholderConfig(),
DefaultConfig(),
func(err error) { t.Fatalf("otel error: %v", err) },
// Override exporter factory which is used by BeholderClient
func(context.Context, ...otlploggrpc.Option) (sdklog.Exporter, error) {
Expand Down Expand Up @@ -205,7 +186,7 @@ func TestEmitterMessageValidation(t *testing.T) {
{
name: "Invalid URI",
attrs: Attributes{
"beholder_data_schema": "beholder/pb/example.proto",
"beholder_data_schema": "example-schema",
},
exporterCalledTimes: 0,
expectedError: "'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'uri' tag",
Expand All @@ -214,7 +195,7 @@ func TestEmitterMessageValidation(t *testing.T) {
name: "Valid URI",
exporterCalledTimes: 1,
attrs: Attributes{
"beholder_data_schema": "https://example.com/example.proto",
"beholder_data_schema": "/example-schema/versions/1",
},
expectedError: "",
},
Expand Down Expand Up @@ -248,7 +229,7 @@ func TestEmitterMessageValidation(t *testing.T) {
}

waitUntilSent := func(done <-chan struct{}) {
for i := 0; i < tc.exporterCalledTimes; i++ {
for range tc.exporterCalledTimes {
select {
case <-done:
case <-time.After(10 * time.Second):
Expand Down
44 changes: 36 additions & 8 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,54 @@ package beholder

import (
"time"

otelattr "go.opentelemetry.io/otel/attribute"
)

type Config struct {
Enabled bool
OtelExporterGRPCEndpoint string
PackageName string
MessageEmitterRetryCount uint
MessageEmitterRetryDelay time.Duration
// OTel Resource
ResourceAttributes map[string]string
// EventEmitter
EmitterExportTimeout time.Duration
// OTel Trace
TraceSampleRate float64
TraceBatchTimeout time.Duration
// OTel Metric
MetricReaderInterval time.Duration
// OTel Log
LogExportTimeout time.Duration
}

var defaultOtelAttributes = map[string]string{
"package_name": "beholder",
}

func DefaultBeholderConfig() Config {
func DefaultConfig() Config {
return Config{
Enabled: true,
OtelExporterGRPCEndpoint: "localhost:4317",
PackageName: "beholder",
MessageEmitterRetryCount: 3,
MessageEmitterRetryDelay: 100 * time.Millisecond,
// Resource
ResourceAttributes: defaultOtelAttributes,
// EventEmitter
EmitterExportTimeout: 1 * time.Second,
// Trace
TraceSampleRate: 1,
TraceBatchTimeout: 1 * time.Second,
// Metric
MetricReaderInterval: 1 * time.Second,
// Log
LogExportTimeout: 1 * time.Second,
}
}

func (c Config) Attributes() map[string]interface{} {
return map[string]interface{}{
"otel_exporter_otlp_endpoint": c.OtelExporterGRPCEndpoint,
func (c Config) Attributes() []otelattr.KeyValue {
attrs := make([]otelattr.KeyValue, 0, len(c.ResourceAttributes))
for k, v := range c.ResourceAttributes {
attrs = append(attrs, otelattr.String(k, v))
}
return attrs
}
26 changes: 23 additions & 3 deletions pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,36 @@ package beholder_test

import (
"fmt"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
beholder "github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

const (
packageName = "beholder"
)

func ExampleConfig() {
config := beholder.Config{
Enabled: true,
OtelExporterGRPCEndpoint: "localhost:4317",
PackageName: "beholder",
PackageName: packageName,
// Resource
ResourceAttributes: map[string]string{
"package_name": packageName,
"sender": "beholdeclient",
},
// EventEmitter
EmitterExportTimeout: 1 * time.Second,
// Trace
TraceSampleRate: 1,
TraceBatchTimeout: 1 * time.Second,
// Metric
MetricReaderInterval: 1 * time.Second,
// Log
LogExportTimeout: 1 * time.Second,
}
fmt.Printf("%+v", config)
// Output:
// {OtelExporterGRPCEndpoint:localhost:4317 PackageName:beholder MessageEmitterRetryCount:0 MessageEmitterRetryDelay:0s}
// {Enabled:true OtelExporterGRPCEndpoint:localhost:4317 PackageName:beholder ResourceAttributes:map[package_name:beholder sender:beholdeclient] EmitterExportTimeout:1s TraceSampleRate:1 TraceBatchTimeout:1s MetricReaderInterval:1s LogExportTimeout:1s}
}
Loading

0 comments on commit 559ea06

Please sign in to comment.