Skip to content

Commit

Permalink
fixed review comments pt.2
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed May 3, 2024
1 parent 5cbb8e4 commit 6b34cfd
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 26 deletions.
41 changes: 15 additions & 26 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,34 +607,23 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
var slotLagGauge *shared.Float64Gauge
var openConnectionsGauge *shared.Int64Gauge
if a.OtelManager != nil {
// seriously
var ok bool
slotLagGaugeKey := fmt.Sprintf("%s_slotlag_%s", peerName, slotName)
slotLagGauge, ok = a.OtelManager.SlotLagGaugesCache[slotLagGaugeKey]
if !ok {
slotLagGauge, err = shared.NewFloat64SyncGauge(a.OtelManager.SlotLagMeter,
slotLagGaugeKey,
metric.WithUnit("MB"),
metric.WithDescription(fmt.Sprintf("Slot lag for slot %s on %s", slotName, peerName)))
if err != nil {
logger.Error("Failed to create slot lag gauge", slog.Any("error", err))
return
}
a.OtelManager.SlotLagGaugesCache[slotLagGaugeKey] = slotLagGauge
slotLagGauge, err = shared.GetOrInitFloat64Gauge(a.OtelManager.SlotLagMeter,
a.OtelManager.SlotLagGaugesCache,
fmt.Sprintf("%s_slotlag_%s", peerName, slotName),
metric.WithUnit("MB"),
metric.WithDescription(fmt.Sprintf("Slot lag for slot %s on %s", slotName, peerName)))
if err != nil {
logger.Error("Failed to get slot lag gauge", slog.Any("error", err))
return
}

openConnectionsGaugeKey := peerName + "_open_connections"
openConnectionsGauge, ok = a.OtelManager.OpenConnectionsGaugesCache[openConnectionsGaugeKey]
if !ok {
openConnectionsGauge, err = shared.NewInt64SyncGauge(a.OtelManager.SlotLagMeter,
openConnectionsGaugeKey,
metric.WithUnit("connections"),
metric.WithDescription("Current open connections for PeerDB user on "+peerName))
if err != nil {
logger.Error("Failed to create open connections gauge", slog.Any("error", err))
return
}
a.OtelManager.OpenConnectionsGaugesCache[openConnectionsGaugeKey] = openConnectionsGauge
openConnectionsGauge, err = shared.GetOrInitInt64Gauge(a.OtelManager.OpenConnectionsMeter,
a.OtelManager.OpenConnectionsGaugesCache,
peerName+"_open_connections", metric.WithUnit("connections"),
metric.WithDescription("Current open connections for PeerDB user on "+peerName))
if err != nil {
logger.Error("Failed to get open connections gauge", slog.Any("error", err))
return
}
}

Expand Down
30 changes: 30 additions & 0 deletions flow/shared/otel_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,33 @@ func (g *Float64Gauge) Set(val float64) {
}
g.currentValAsU64.Store(math.Float64bits(val))
}

func GetOrInitInt64Gauge(meter metric.Meter, cache map[string]*Int64Gauge,
name string, opts ...metric.Int64ObservableGaugeOption,
) (*Int64Gauge, error) {
gauge, ok := cache[name]
if !ok {
var err error
gauge, err = NewInt64SyncGauge(meter, name, opts...)
if err != nil {
return nil, err
}
cache[name] = gauge
}
return gauge, nil
}

func GetOrInitFloat64Gauge(meter metric.Meter, cache map[string]*Float64Gauge,
name string, opts ...metric.Float64ObservableGaugeOption,
) (*Float64Gauge, error) {
gauge, ok := cache[name]
if !ok {
var err error
gauge, err = NewFloat64SyncGauge(meter, name, opts...)
if err != nil {
return nil, err
}
cache[name] = gauge
}
return gauge, nil
}

0 comments on commit 6b34cfd

Please sign in to comment.