From f4cda6345bef1188b7bf2f071e8c82486fd86887 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 10 Jan 2025 01:37:30 +0530 Subject: [PATCH 1/5] refactor: revert to using observables modified to sync gauges --- flow/otel_metrics/observables.go | 199 ++++++++++++++++++++++++++++++ flow/otel_metrics/otel_manager.go | 4 +- 2 files changed, 201 insertions(+), 2 deletions(-) create mode 100644 flow/otel_metrics/observables.go diff --git a/flow/otel_metrics/observables.go b/flow/otel_metrics/observables.go new file mode 100644 index 0000000000..36fc3c55fc --- /dev/null +++ b/flow/otel_metrics/observables.go @@ -0,0 +1,199 @@ +package otel_metrics + +import ( + "context" + "fmt" + "log/slog" + "sync" + "sync/atomic" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/embedded" +) + +type ObservationMapValue[V comparable] struct { + Value V +} + +// SyncGauge is a generic synchronous gauge that can be used to observe any type of value +// Inspired from https://github.com/open-telemetry/opentelemetry-go/issues/3984#issuecomment-1743231837 +type SyncGauge[V comparable, O metric.Observable] struct { + observableGauge O + observations sync.Map + name string +} + +func (a *SyncGauge[V, O]) Callback(ctx context.Context, observeFunc func(value V, options ...metric.ObserveOption)) error { + a.observations.Range(func(key, value interface{}) bool { + attrs := key.(attribute.Set) + val := value.(*ObservationMapValue[V]) + observeFunc(val.Value, metric.WithAttributeSet(attrs)) + // If the pointer is still same we can safely delete, else it means that the value was overwritten in parallel + a.observations.CompareAndDelete(attrs, val) + return true + }) + return nil +} + +func (a *SyncGauge[V, O]) Record(input V, attrs attribute.Set) { + val := ObservationMapValue[V]{Value: input} + a.observations.Store(attrs, &val) +} + +type Int64SyncGauge struct { + embedded.Int64Gauge + syncGauge *SyncGauge[int64, metric.Int64ObservableGauge] +} + +func (a *Int64SyncGauge) Record(ctx context.Context, value int64, options ...metric.RecordOption) { + if a == nil { + return + } + c := metric.NewRecordConfig(options) + a.syncGauge.Record(value, c.Attributes()) +} + +func NewInt64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Int64ObservableGaugeOption) (*Int64SyncGauge, error) { + syncGauge := &SyncGauge[int64, metric.Int64ObservableGauge]{ + name: gaugeName, + } + observableGauge, err := meter.Int64ObservableGauge(gaugeName, + append(opts, metric.WithInt64Callback(func(ctx context.Context, observer metric.Int64Observer) error { + return syncGauge.Callback(ctx, func(value int64, options ...metric.ObserveOption) { + observer.Observe(value, options...) + }) + }))...) + if err != nil { + return nil, fmt.Errorf("failed to create Int64SyncGauge: %w", err) + } + syncGauge.observableGauge = observableGauge + return &Int64SyncGauge{syncGauge: syncGauge}, nil +} + +type Float64SyncGauge struct { + embedded.Float64Gauge + syncGauge *SyncGauge[float64, metric.Float64Observable] +} + +func (a *Float64SyncGauge) Record(ctx context.Context, value float64, options ...metric.RecordOption) { + if a == nil { + return + } + c := metric.NewRecordConfig(options) + a.syncGauge.Record(value, c.Attributes()) +} + +func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Float64ObservableGaugeOption) (*Float64SyncGauge, error) { + syncGauge := &SyncGauge[float64, metric.Float64Observable]{ + name: gaugeName, + } + observableGauge, err := meter.Float64ObservableGauge(gaugeName, + append(opts, metric.WithFloat64Callback(func(ctx context.Context, observer metric.Float64Observer) error { + return syncGauge.Callback(ctx, func(value float64, options ...metric.ObserveOption) { + observer.Observe(value, options...) + }) + }))...) + if err != nil { + return nil, fmt.Errorf("failed to create Float64SyncGauge: %w", err) + } + syncGauge.observableGauge = observableGauge + return &Float64SyncGauge{syncGauge: syncGauge}, nil +} + +func atomicAdd[V any](a *V, b V) error { + switch v := any(b).(type) { + case int64: + atomic.AddInt64(any(*a).(*int64), v) + default: + return fmt.Errorf("unsupported type %T for atomicAdd", v) + } + return nil +} + +type SyncCounter[V comparable, O metric.Observable] struct { + observableCounter O + observations sync.Map + name string +} + +func (a *SyncCounter[V, O]) Callback(ctx context.Context, observeFunc func(value V, options ...metric.ObserveOption)) error { + a.observations.Range(func(key, value interface{}) bool { + attrs := key.(attribute.Set) + val := value.(*ObservationMapValue[V]) + observeFunc(val.Value, metric.WithAttributeSet(attrs)) + // TODO what to do for counters?? + // If the pointer is still same we can safely delete, else it means that the value was overwritten in parallel + a.observations.CompareAndDelete(attrs, val) + return true + }) + return nil +} + +func (a *SyncCounter[V, O]) Add(input V, attrs attribute.Set) { + val := ObservationMapValue[V]{Value: input} + + actual, loaded := a.observations.LoadOrStore(attrs, &val) + if loaded { + // If the value was already present, we need to add the new value to the existing value + existing := actual.(*ObservationMapValue[V]) + if err := atomicAdd(&existing.Value, val.Value); err != nil { + slog.Error("Failed to add value to existing value", slog.Any("error", err)) + } + } +} + +type Int64SyncCounter struct { + embedded.Int64Counter + syncCounter *SyncCounter[int64, metric.Int64ObservableCounter] +} + +func (i *Int64SyncCounter) Add(ctx context.Context, incr int64, options ...metric.AddOption) { + if i == nil { + return + } + c := metric.NewAddConfig(options) + i.syncCounter.Add(incr, c.Attributes()) +} + +func NewInt64SyncCounter(meter metric.Meter, counterName string, opts ...metric.Int64ObservableCounterOption) (*Int64SyncCounter, error) { + syncCounter := &SyncCounter[int64, metric.Int64ObservableCounter]{ + name: counterName, + } + observableCounter, err := meter.Int64ObservableCounter(counterName, + append(opts, metric.WithInt64Callback(func(ctx context.Context, observer metric.Int64Observer) error { + return syncCounter.Callback(ctx, func(value int64, options ...metric.ObserveOption) { + observer.Observe(value, options...) + }) + }))...) + if err != nil { + return nil, fmt.Errorf("failed to create Int64SyncCounter: %w", err) + } + syncCounter.observableCounter = observableCounter + return &Int64SyncCounter{syncCounter: syncCounter}, nil +} + +func Int64Gauge(meter metric.Meter, name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { + gaugeConfig := metric.NewInt64GaugeConfig(opts...) + return NewInt64SyncGauge(meter, name, + metric.WithDescription(gaugeConfig.Description()), + metric.WithUnit(gaugeConfig.Unit()), + ) +} + +func Float64Gauge(meter metric.Meter, name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { + gaugeConfig := metric.NewFloat64GaugeConfig(opts...) + return NewFloat64SyncGauge(meter, name, + metric.WithDescription(gaugeConfig.Description()), + metric.WithUnit(gaugeConfig.Unit()), + ) +} + +// TODO this implementation is still pending +func Int64Counter(meter metric.Meter, name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { + counterConfig := metric.NewInt64CounterConfig(opts...) + return NewInt64SyncCounter(meter, name, + metric.WithDescription(counterConfig.Description()), + metric.WithUnit(counterConfig.Unit()), + ) +} diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index 5849e6d390..33f0a5d5ab 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -89,11 +89,11 @@ func getOrInitMetric[M any, O any]( } func (om *OtelManager) GetOrInitInt64Gauge(name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { - return getOrInitMetric(metric.Meter.Int64Gauge, om.Meter, om.Int64GaugesCache, name, opts...) + return getOrInitMetric(Int64Gauge, om.Meter, om.Int64GaugesCache, name, opts...) // Once fixed, replace first argument with metric.Meter.Int64Gauge } func (om *OtelManager) GetOrInitFloat64Gauge(name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { - return getOrInitMetric(metric.Meter.Float64Gauge, om.Meter, om.Float64GaugesCache, name, opts...) + return getOrInitMetric(Float64Gauge, om.Meter, om.Float64GaugesCache, name, opts...) // Once fixed, replace first argument with metric.Meter.Float64Gauge } func (om *OtelManager) GetOrInitInt64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { From a10c173ece234041aa173247e676e6db61164af6 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Mon, 13 Jan 2025 22:58:28 +0530 Subject: [PATCH 2/5] chore: allow golang-ci lint to write annotations on PRs --- .github/workflows/golang-lint.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/golang-lint.yml b/.github/workflows/golang-lint.yml index 9b4e558a75..fd4f5eef57 100644 --- a/.github/workflows/golang-lint.yml +++ b/.github/workflows/golang-lint.yml @@ -7,6 +7,7 @@ on: permissions: contents: read + checks: write jobs: golangci: From e9ed6775b59f837b0a9fccb6ceb229a342a5522a Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Mon, 13 Jan 2025 23:07:28 +0530 Subject: [PATCH 3/5] fix: add out-format --- .github/workflows/golang-lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/golang-lint.yml b/.github/workflows/golang-lint.yml index fd4f5eef57..fe8ddd309e 100644 --- a/.github/workflows/golang-lint.yml +++ b/.github/workflows/golang-lint.yml @@ -32,4 +32,4 @@ jobs: with: version: v1.62 working-directory: ./flow - args: --timeout=10m + args: --timeout=10m --out-format=github-actions From 0a752c7c90689ecf4254a7904ccfa952b7918c7c Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Mon, 13 Jan 2025 23:34:10 +0530 Subject: [PATCH 4/5] chore: fix lint --- flow/otel_metrics/otel_manager.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index 33f0a5d5ab..2e32d82982 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -89,11 +89,13 @@ func getOrInitMetric[M any, O any]( } func (om *OtelManager) GetOrInitInt64Gauge(name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { - return getOrInitMetric(Int64Gauge, om.Meter, om.Int64GaugesCache, name, opts...) // Once fixed, replace first argument with metric.Meter.Int64Gauge + // Once fixed, replace first argument below with metric.Meter.Int64Gauge + return getOrInitMetric(Int64Gauge, om.Meter, om.Int64GaugesCache, name, opts...) } func (om *OtelManager) GetOrInitFloat64Gauge(name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { - return getOrInitMetric(Float64Gauge, om.Meter, om.Float64GaugesCache, name, opts...) // Once fixed, replace first argument with metric.Meter.Float64Gauge + // Once fixed, replace first argument below with metric.Meter.Float64Gauge + return getOrInitMetric(Float64Gauge, om.Meter, om.Float64GaugesCache, name, opts...) } func (om *OtelManager) GetOrInitInt64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { From d0f00d28b62e777fc13a08271e162df7e0541612 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Tue, 14 Jan 2025 01:08:54 +0530 Subject: [PATCH 5/5] chore: remove sync counter and take it up later --- flow/otel_metrics/observables.go | 83 -------------------------------- 1 file changed, 83 deletions(-) diff --git a/flow/otel_metrics/observables.go b/flow/otel_metrics/observables.go index 36fc3c55fc..6d9f970172 100644 --- a/flow/otel_metrics/observables.go +++ b/flow/otel_metrics/observables.go @@ -3,9 +3,7 @@ package otel_metrics import ( "context" "fmt" - "log/slog" "sync" - "sync/atomic" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -101,78 +99,6 @@ func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Fl return &Float64SyncGauge{syncGauge: syncGauge}, nil } -func atomicAdd[V any](a *V, b V) error { - switch v := any(b).(type) { - case int64: - atomic.AddInt64(any(*a).(*int64), v) - default: - return fmt.Errorf("unsupported type %T for atomicAdd", v) - } - return nil -} - -type SyncCounter[V comparable, O metric.Observable] struct { - observableCounter O - observations sync.Map - name string -} - -func (a *SyncCounter[V, O]) Callback(ctx context.Context, observeFunc func(value V, options ...metric.ObserveOption)) error { - a.observations.Range(func(key, value interface{}) bool { - attrs := key.(attribute.Set) - val := value.(*ObservationMapValue[V]) - observeFunc(val.Value, metric.WithAttributeSet(attrs)) - // TODO what to do for counters?? - // If the pointer is still same we can safely delete, else it means that the value was overwritten in parallel - a.observations.CompareAndDelete(attrs, val) - return true - }) - return nil -} - -func (a *SyncCounter[V, O]) Add(input V, attrs attribute.Set) { - val := ObservationMapValue[V]{Value: input} - - actual, loaded := a.observations.LoadOrStore(attrs, &val) - if loaded { - // If the value was already present, we need to add the new value to the existing value - existing := actual.(*ObservationMapValue[V]) - if err := atomicAdd(&existing.Value, val.Value); err != nil { - slog.Error("Failed to add value to existing value", slog.Any("error", err)) - } - } -} - -type Int64SyncCounter struct { - embedded.Int64Counter - syncCounter *SyncCounter[int64, metric.Int64ObservableCounter] -} - -func (i *Int64SyncCounter) Add(ctx context.Context, incr int64, options ...metric.AddOption) { - if i == nil { - return - } - c := metric.NewAddConfig(options) - i.syncCounter.Add(incr, c.Attributes()) -} - -func NewInt64SyncCounter(meter metric.Meter, counterName string, opts ...metric.Int64ObservableCounterOption) (*Int64SyncCounter, error) { - syncCounter := &SyncCounter[int64, metric.Int64ObservableCounter]{ - name: counterName, - } - observableCounter, err := meter.Int64ObservableCounter(counterName, - append(opts, metric.WithInt64Callback(func(ctx context.Context, observer metric.Int64Observer) error { - return syncCounter.Callback(ctx, func(value int64, options ...metric.ObserveOption) { - observer.Observe(value, options...) - }) - }))...) - if err != nil { - return nil, fmt.Errorf("failed to create Int64SyncCounter: %w", err) - } - syncCounter.observableCounter = observableCounter - return &Int64SyncCounter{syncCounter: syncCounter}, nil -} - func Int64Gauge(meter metric.Meter, name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { gaugeConfig := metric.NewInt64GaugeConfig(opts...) return NewInt64SyncGauge(meter, name, @@ -188,12 +114,3 @@ func Float64Gauge(meter metric.Meter, name string, opts ...metric.Float64GaugeOp metric.WithUnit(gaugeConfig.Unit()), ) } - -// TODO this implementation is still pending -func Int64Counter(meter metric.Meter, name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { - counterConfig := metric.NewInt64CounterConfig(opts...) - return NewInt64SyncCounter(meter, name, - metric.WithDescription(counterConfig.Description()), - metric.WithUnit(counterConfig.Unit()), - ) -}