From b1c21a36e07165fb71d798daf43a2eec9c869404 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 3 May 2024 18:54:20 +0530 Subject: [PATCH] fixed lints, added ENABLE_OTEL_METRICS --- flow/activities/flowable.go | 55 +++++++++++++++++++++---------------- flow/cmd/worker.go | 34 +++++++++++++++-------- flow/main.go | 8 ++++++ flow/shared/otel_metrics.go | 14 +++++++--- 4 files changed, 71 insertions(+), 40 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 821c25c672..0f58515a69 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -604,35 +604,42 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { return } - slotLagGaugeKey := fmt.Sprintf("%s_slotlag_%s", peerName, slotName) - slotLagGauge, ok := a.OtelManager.SlotLagGaugesCache[slotLagGaugeKey] - if !ok { - slotLagGauge, err = shared.NewFloat64SyncGauge(a.OtelManager.SlotLagMeter, - slotLagGaugeKey, - metric.WithUnit("MB"), - metric.WithDescription(fmt.Sprintf("Slot lag for slot %s on %s", slotName, peerName))) - if err != nil { - logger.Error("Failed to create slot lag gauge", slog.Any("error", err)) - return + var slotLagGauge *shared.Float64Gauge + var openConnectionsGauge *shared.Int64Gauge + if a.OtelManager != nil { + // seriously + var ok bool + slotLagGaugeKey := fmt.Sprintf("%s_slotlag_%s", peerName, slotName) + slotLagGauge, ok = a.OtelManager.SlotLagGaugesCache[slotLagGaugeKey] + if !ok { + slotLagGauge, err = shared.NewFloat64SyncGauge(a.OtelManager.SlotLagMeter, + slotLagGaugeKey, + metric.WithUnit("MB"), + metric.WithDescription(fmt.Sprintf("Slot lag for slot %s on %s", slotName, peerName))) + if err != nil { + logger.Error("Failed to create slot lag gauge", slog.Any("error", err)) + return + } + a.OtelManager.SlotLagGaugesCache[slotLagGaugeKey] = slotLagGauge } - a.OtelManager.SlotLagGaugesCache[slotLagGaugeKey] = slotLagGauge - } - openConnectionsGaugeKey := fmt.Sprintf("%s_open_connections", peerName) - openConnectionsGauge, ok := a.OtelManager.OpenConnectionsGaugesCache[openConnectionsGaugeKey] - if !ok { - openConnectionsGauge, err = shared.NewInt64SyncGauge(a.OtelManager.SlotLagMeter, - openConnectionsGaugeKey, - metric.WithUnit("connections"), - metric.WithDescription(fmt.Sprintf("Current open connections for PeerDB user on %s", peerName))) - if err != nil { - logger.Error("Failed to create open connections gauge", slog.Any("error", err)) - return + openConnectionsGaugeKey := peerName + "_open_connections" + openConnectionsGauge, ok = a.OtelManager.OpenConnectionsGaugesCache[openConnectionsGaugeKey] + if !ok { + openConnectionsGauge, err = shared.NewInt64SyncGauge(a.OtelManager.SlotLagMeter, + openConnectionsGaugeKey, + metric.WithUnit("connections"), + metric.WithDescription("Current open connections for PeerDB user on "+peerName)) + if err != nil { + logger.Error("Failed to create open connections gauge", slog.Any("error", err)) + return + } + a.OtelManager.OpenConnectionsGaugesCache[openConnectionsGaugeKey] = openConnectionsGauge } - a.OtelManager.OpenConnectionsGaugesCache[openConnectionsGaugeKey] = openConnectionsGauge } - err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName, slotLagGauge, openConnectionsGauge) + err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName, + slotLagGauge, openConnectionsGauge) if err != nil { logger.Error("Failed to handle slot info", slog.Any("error", err)) } diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 2364e3f16a..af166f4374 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -35,6 +35,7 @@ type WorkerSetupOptions struct { TemporalMaxConcurrentActivities int TemporalMaxConcurrentWorkflowTasks int EnableProfiling bool + EnableOtelMetrics bool } type workerSetupResponse struct { @@ -149,11 +150,6 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { return nil, fmt.Errorf("unable to create catalog connection pool: %w", err) } - metricsProvider, err := setupOtelMetricsExporter() - if err != nil { - return nil, err - } - c, err := client.Dial(clientOptions) if err != nil { return nil, fmt.Errorf("unable to create Temporal client: %w", err) @@ -178,24 +174,38 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { }) peerflow.RegisterFlowWorkerWorkflows(w) - w.RegisterActivity(&activities.FlowableActivity{ - CatalogPool: conn, - Alerter: alerting.NewAlerter(context.Background(), conn), - CdcCache: make(map[string]activities.CdcCacheEntry), - OtelManager: &activities.OtelManager{ + var metricsProvider *sdkmetric.MeterProvider + var otelManager *activities.OtelManager + if opts.EnableOtelMetrics { + metricsProvider, err = setupOtelMetricsExporter() + if err != nil { + return nil, err + } + otelManager = &activities.OtelManager{ MetricsProvider: metricsProvider, SlotLagMeter: metricsProvider.Meter("flow-worker/cdc/slot-lag"), OpenConnectionsMeter: metricsProvider.Meter("flow-worker/open-connections"), SlotLagGaugesCache: make(map[string]*shared.Float64Gauge), OpenConnectionsGaugesCache: make(map[string]*shared.Int64Gauge), - }, + } + } + w.RegisterActivity(&activities.FlowableActivity{ + CatalogPool: conn, + Alerter: alerting.NewAlerter(context.Background(), conn), + CdcCache: make(map[string]activities.CdcCacheEntry), + OtelManager: otelManager, }) return &workerSetupResponse{ Client: c, Worker: w, Cleanup: func() { - metricsProvider.Shutdown(context.Background()) + if otelManager != nil { + err := otelManager.MetricsProvider.Shutdown(context.Background()) + if err != nil { + slog.Error("Failed to shutdown metrics provider", slog.Any("error", err)) + } + } c.Close() }, }, nil diff --git a/flow/main.go b/flow/main.go index 78593f47e6..3b1696161b 100644 --- a/flow/main.go +++ b/flow/main.go @@ -47,6 +47,12 @@ func main() { Usage: "Enable profiling for the application", Sources: cli.EnvVars("ENABLE_PROFILING"), } + otelMetricsFlag := &cli.BoolFlag{ + Name: "enable-otel-metrics", + Value: false, // Default is off + Usage: "Enable OpenTelemetry metrics for the application", + Sources: cli.EnvVars("ENABLE_OTEL_METRICS"), + } pyroscopeServerFlag := &cli.StringFlag{ Name: "pyroscope-server-address", @@ -86,6 +92,7 @@ func main() { res, err := cmd.WorkerSetup(&cmd.WorkerSetupOptions{ TemporalHostPort: temporalHostPort, EnableProfiling: clicmd.Bool("enable-profiling"), + EnableOtelMetrics: clicmd.Bool("enable-otel-metrics"), PyroscopeServer: clicmd.String("pyroscope-server-address"), TemporalNamespace: clicmd.String("temporal-namespace"), TemporalCert: clicmd.String("temporal-cert"), @@ -102,6 +109,7 @@ func main() { Flags: []cli.Flag{ temporalHostPortFlag, profilingFlag, + otelMetricsFlag, pyroscopeServerFlag, temporalNamespaceFlag, &temporalCertFlag, diff --git a/flow/shared/otel_metrics.go b/flow/shared/otel_metrics.go index 7566d99818..89017296f3 100644 --- a/flow/shared/otel_metrics.go +++ b/flow/shared/otel_metrics.go @@ -19,7 +19,7 @@ type Int64Gauge struct { func NewInt64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Int64ObservableGaugeOption) (*Int64Gauge, error) { syncGauge := &Int64Gauge{} - observableGauge, err := meter.Int64ObservableGauge(gaugeName, append(opts, metric.WithInt64Callback(syncGauge.Callback))...) + observableGauge, err := meter.Int64ObservableGauge(gaugeName, append(opts, metric.WithInt64Callback(syncGauge.callback))...) if err != nil { return nil, fmt.Errorf("failed to create Int64SyncGauge: %w", err) } @@ -27,12 +27,15 @@ func NewInt64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Int6 return syncGauge, nil } -func (g *Int64Gauge) Callback(ctx context.Context, o metric.Int64Observer) error { +func (g *Int64Gauge) callback(ctx context.Context, o metric.Int64Observer) error { o.Observe(g.currentVal.Load()) return nil } func (g *Int64Gauge) Set(val int64) { + if g == nil { + return + } g.currentVal.Store(val) } @@ -44,7 +47,7 @@ type Float64Gauge struct { func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Float64ObservableGaugeOption) (*Float64Gauge, error) { syncGauge := &Float64Gauge{} - observableGauge, err := meter.Float64ObservableGauge(gaugeName, append(opts, metric.WithFloat64Callback(syncGauge.Callback))...) + observableGauge, err := meter.Float64ObservableGauge(gaugeName, append(opts, metric.WithFloat64Callback(syncGauge.callback))...) if err != nil { return nil, fmt.Errorf("failed to create Int64SyncGauge: %w", err) } @@ -52,7 +55,7 @@ func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Fl return syncGauge, nil } -func (g *Float64Gauge) Callback(ctx context.Context, o metric.Float64Observer) error { +func (g *Float64Gauge) callback(ctx context.Context, o metric.Float64Observer) error { g.floatMutex.Lock() defer g.floatMutex.Unlock() o.Observe(g.currentVal) @@ -60,6 +63,9 @@ func (g *Float64Gauge) Callback(ctx context.Context, o metric.Float64Observer) e } func (g *Float64Gauge) Set(val float64) { + if g == nil { + return + } g.floatMutex.Lock() defer g.floatMutex.Unlock() g.currentVal = val