Skip to content

Commit

Permalink
fixed lints, added ENABLE_OTEL_METRICS
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed May 3, 2024
1 parent e525937 commit b1c21a3
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 40 deletions.
55 changes: 31 additions & 24 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,35 +604,42 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
return
}

slotLagGaugeKey := fmt.Sprintf("%s_slotlag_%s", peerName, slotName)
slotLagGauge, ok := a.OtelManager.SlotLagGaugesCache[slotLagGaugeKey]
if !ok {
slotLagGauge, err = shared.NewFloat64SyncGauge(a.OtelManager.SlotLagMeter,
slotLagGaugeKey,
metric.WithUnit("MB"),
metric.WithDescription(fmt.Sprintf("Slot lag for slot %s on %s", slotName, peerName)))
if err != nil {
logger.Error("Failed to create slot lag gauge", slog.Any("error", err))
return
var slotLagGauge *shared.Float64Gauge
var openConnectionsGauge *shared.Int64Gauge
if a.OtelManager != nil {
// seriously
var ok bool
slotLagGaugeKey := fmt.Sprintf("%s_slotlag_%s", peerName, slotName)
slotLagGauge, ok = a.OtelManager.SlotLagGaugesCache[slotLagGaugeKey]
if !ok {
slotLagGauge, err = shared.NewFloat64SyncGauge(a.OtelManager.SlotLagMeter,
slotLagGaugeKey,
metric.WithUnit("MB"),
metric.WithDescription(fmt.Sprintf("Slot lag for slot %s on %s", slotName, peerName)))
if err != nil {
logger.Error("Failed to create slot lag gauge", slog.Any("error", err))
return
}
a.OtelManager.SlotLagGaugesCache[slotLagGaugeKey] = slotLagGauge
}
a.OtelManager.SlotLagGaugesCache[slotLagGaugeKey] = slotLagGauge
}

openConnectionsGaugeKey := fmt.Sprintf("%s_open_connections", peerName)
openConnectionsGauge, ok := a.OtelManager.OpenConnectionsGaugesCache[openConnectionsGaugeKey]
if !ok {
openConnectionsGauge, err = shared.NewInt64SyncGauge(a.OtelManager.SlotLagMeter,
openConnectionsGaugeKey,
metric.WithUnit("connections"),
metric.WithDescription(fmt.Sprintf("Current open connections for PeerDB user on %s", peerName)))
if err != nil {
logger.Error("Failed to create open connections gauge", slog.Any("error", err))
return
openConnectionsGaugeKey := peerName + "_open_connections"
openConnectionsGauge, ok = a.OtelManager.OpenConnectionsGaugesCache[openConnectionsGaugeKey]
if !ok {
openConnectionsGauge, err = shared.NewInt64SyncGauge(a.OtelManager.SlotLagMeter,
openConnectionsGaugeKey,
metric.WithUnit("connections"),
metric.WithDescription("Current open connections for PeerDB user on "+peerName))
if err != nil {
logger.Error("Failed to create open connections gauge", slog.Any("error", err))
return
}
a.OtelManager.OpenConnectionsGaugesCache[openConnectionsGaugeKey] = openConnectionsGauge
}
a.OtelManager.OpenConnectionsGaugesCache[openConnectionsGaugeKey] = openConnectionsGauge
}

err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName, slotLagGauge, openConnectionsGauge)
err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName,
slotLagGauge, openConnectionsGauge)
if err != nil {
logger.Error("Failed to handle slot info", slog.Any("error", err))
}
Expand Down
34 changes: 22 additions & 12 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type WorkerSetupOptions struct {
TemporalMaxConcurrentActivities int
TemporalMaxConcurrentWorkflowTasks int
EnableProfiling bool
EnableOtelMetrics bool
}

type workerSetupResponse struct {
Expand Down Expand Up @@ -149,11 +150,6 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
return nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
}

metricsProvider, err := setupOtelMetricsExporter()
if err != nil {
return nil, err
}

c, err := client.Dial(clientOptions)
if err != nil {
return nil, fmt.Errorf("unable to create Temporal client: %w", err)
Expand All @@ -178,24 +174,38 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
})
peerflow.RegisterFlowWorkerWorkflows(w)

w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(context.Background(), conn),
CdcCache: make(map[string]activities.CdcCacheEntry),
OtelManager: &activities.OtelManager{
var metricsProvider *sdkmetric.MeterProvider
var otelManager *activities.OtelManager
if opts.EnableOtelMetrics {
metricsProvider, err = setupOtelMetricsExporter()
if err != nil {
return nil, err
}
otelManager = &activities.OtelManager{
MetricsProvider: metricsProvider,
SlotLagMeter: metricsProvider.Meter("flow-worker/cdc/slot-lag"),
OpenConnectionsMeter: metricsProvider.Meter("flow-worker/open-connections"),
SlotLagGaugesCache: make(map[string]*shared.Float64Gauge),
OpenConnectionsGaugesCache: make(map[string]*shared.Int64Gauge),
},
}
}
w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(context.Background(), conn),
CdcCache: make(map[string]activities.CdcCacheEntry),
OtelManager: otelManager,
})

return &workerSetupResponse{
Client: c,
Worker: w,
Cleanup: func() {
metricsProvider.Shutdown(context.Background())
if otelManager != nil {
err := otelManager.MetricsProvider.Shutdown(context.Background())
if err != nil {
slog.Error("Failed to shutdown metrics provider", slog.Any("error", err))
}
}
c.Close()
},
}, nil
Expand Down
8 changes: 8 additions & 0 deletions flow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ func main() {
Usage: "Enable profiling for the application",
Sources: cli.EnvVars("ENABLE_PROFILING"),
}
otelMetricsFlag := &cli.BoolFlag{
Name: "enable-otel-metrics",
Value: false, // Default is off
Usage: "Enable OpenTelemetry metrics for the application",
Sources: cli.EnvVars("ENABLE_OTEL_METRICS"),
}

pyroscopeServerFlag := &cli.StringFlag{
Name: "pyroscope-server-address",
Expand Down Expand Up @@ -86,6 +92,7 @@ func main() {
res, err := cmd.WorkerSetup(&cmd.WorkerSetupOptions{
TemporalHostPort: temporalHostPort,
EnableProfiling: clicmd.Bool("enable-profiling"),
EnableOtelMetrics: clicmd.Bool("enable-otel-metrics"),
PyroscopeServer: clicmd.String("pyroscope-server-address"),
TemporalNamespace: clicmd.String("temporal-namespace"),
TemporalCert: clicmd.String("temporal-cert"),
Expand All @@ -102,6 +109,7 @@ func main() {
Flags: []cli.Flag{
temporalHostPortFlag,
profilingFlag,
otelMetricsFlag,
pyroscopeServerFlag,
temporalNamespaceFlag,
&temporalCertFlag,
Expand Down
14 changes: 10 additions & 4 deletions flow/shared/otel_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@ type Int64Gauge struct {

func NewInt64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Int64ObservableGaugeOption) (*Int64Gauge, error) {
syncGauge := &Int64Gauge{}
observableGauge, err := meter.Int64ObservableGauge(gaugeName, append(opts, metric.WithInt64Callback(syncGauge.Callback))...)
observableGauge, err := meter.Int64ObservableGauge(gaugeName, append(opts, metric.WithInt64Callback(syncGauge.callback))...)
if err != nil {
return nil, fmt.Errorf("failed to create Int64SyncGauge: %w", err)
}
syncGauge.observableGauge = observableGauge
return syncGauge, nil
}

func (g *Int64Gauge) Callback(ctx context.Context, o metric.Int64Observer) error {
func (g *Int64Gauge) callback(ctx context.Context, o metric.Int64Observer) error {
o.Observe(g.currentVal.Load())
return nil
}

func (g *Int64Gauge) Set(val int64) {
if g == nil {
return
}
g.currentVal.Store(val)
}

Expand All @@ -44,22 +47,25 @@ type Float64Gauge struct {

func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Float64ObservableGaugeOption) (*Float64Gauge, error) {
syncGauge := &Float64Gauge{}
observableGauge, err := meter.Float64ObservableGauge(gaugeName, append(opts, metric.WithFloat64Callback(syncGauge.Callback))...)
observableGauge, err := meter.Float64ObservableGauge(gaugeName, append(opts, metric.WithFloat64Callback(syncGauge.callback))...)
if err != nil {
return nil, fmt.Errorf("failed to create Int64SyncGauge: %w", err)
}
syncGauge.observableGauge = observableGauge
return syncGauge, nil
}

func (g *Float64Gauge) Callback(ctx context.Context, o metric.Float64Observer) error {
func (g *Float64Gauge) callback(ctx context.Context, o metric.Float64Observer) error {
g.floatMutex.Lock()
defer g.floatMutex.Unlock()
o.Observe(g.currentVal)
return nil
}

func (g *Float64Gauge) Set(val float64) {
if g == nil {
return
}
g.floatMutex.Lock()
defer g.floatMutex.Unlock()
g.currentVal = val
Expand Down

0 comments on commit b1c21a3

Please sign in to comment.