Skip to content

Commit

Permalink
metrics done
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jan 17, 2025
1 parent d202977 commit 6ae10db
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 223 deletions.
6 changes: 4 additions & 2 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
remainingRetries := c.retryParams.Count
backoff := c.retryParams.MinBackoff
for {
startTime := time.Now()
callCtx, callCancel := context.WithCancel(ctx)
callChan := make(chan error)
go func() {
Expand All @@ -297,7 +298,8 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
var err error
select {
case cmd := <-resetOffset:
observability.AsyncCalls.Completed(ctx, schema.RefKey{Module: c.moduleName, Name: c.verb.Name})
logger.Debugf("Cancelled call for subscription %s due to offset reset", c.verb.Name)
observability.PubSub.Consumed(ctx, c.subscriber.Topic.ToRefKey(), schema.RefKey{Module: c.moduleName, Name: c.verb.Name}, startTime, errors.New("cancelled due to offset reset"))

// Don't wait for call to end before resetting offsets as it may take a while.
callCancel()
Expand All @@ -313,7 +315,7 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
// close call context now that the call is finished
callCancel()
}
observability.AsyncCalls.Completed(callCtx)
observability.PubSub.Consumed(ctx, c.subscriber.Topic.ToRefKey(), schema.RefKey{Module: c.moduleName, Name: c.verb.Name}, startTime, err)
if err == nil {
break
}
Expand Down
181 changes: 0 additions & 181 deletions backend/runner/pubsub/observability/async_calls.go

This file was deleted.

7 changes: 0 additions & 7 deletions backend/runner/pubsub/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import "fmt"

var (
PubSub *PubSubMetrics
// TODO: remove
AsyncCalls *AsyncCallMetrics
)

func init() {
Expand All @@ -14,9 +12,4 @@ func init() {
if err != nil {
panic(fmt.Errorf("could not initialize pubsub metrics: %w", err))
}

AsyncCalls, err = initAsyncCallMetrics()
if err != nil {
panic(fmt.Errorf("could not initialize async call metrics: %w", err))
}
}
74 changes: 41 additions & 33 deletions backend/runner/pubsub/observability/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package observability
import (
"context"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand All @@ -17,43 +17,50 @@ import (
// To learn more about how sinks and subscriptions work together, see the pubsub
// reference: https://block.github.io/ftl/docs/reference/pubsub/
const (
pubsubMeterName = "ftl.pubsub"
pubsubTopicRefAttr = "ftl.pubsub.topic.ref"
pubsubTopicModuleAttr = "ftl.pubsub.topic.module.name"
pubsubCallerVerbRefAttr = "ftl.pubsub.publish.caller.verb.ref"
pubsubSubscriptionRefAttr = "ftl.pubsub.subscription.ref"
pubsubSubscriptionModuleAttr = "ftl.pubsub.subscription.module.name"
pubsubFailedOperationAttr = "ftl.pubsub.propagation.failed_operation"
meterName = "ftl.pubsub"
topicRefAttr = "ftl.pubsub.topic.ref"
topicModuleAttr = "ftl.pubsub.topic.module.name"
callerVerbRefAttr = "ftl.pubsub.publish.caller.verb.ref"
subscriptionRefAttr = "ftl.pubsub.subscription.ref"
subscriptionModuleAttr = "ftl.pubsub.subscription.module.name"
// The publication time is no longer known, so the attribute below is disabled.
// timeSinceScheduledAtBucketAttr = "ftl.pubsub.time_since_scheduled_at_ms.bucket"
)

type PubSubMetrics struct {
published metric.Int64Counter
sinkCalled metric.Int64Counter
published metric.Int64Counter
consumed metric.Int64Counter
msToConsume metric.Int64Histogram
}

func initPubSubMetrics() (*PubSubMetrics, error) {
result := &PubSubMetrics{
published: noop.Int64Counter{},
sinkCalled: noop.Int64Counter{},
published: noop.Int64Counter{},
consumed: noop.Int64Counter{},
msToConsume: noop.Int64Histogram{},
}

var err error
meter := otel.Meter(pubsubMeterName)
meter := otel.Meter(meterName)

counterName := fmt.Sprintf("%s.published", pubsubMeterName)
counterName := fmt.Sprintf("%s.published", meterName)
if result.published, err = meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that an event is published to a topic")); err != nil {
return nil, wrapErr(counterName, err)
}

counterName = fmt.Sprintf("%s.sink.called", pubsubMeterName)
if result.sinkCalled, err = meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber")); err != nil {
return nil, wrapErr(counterName, err)
signalName := fmt.Sprintf("%s.consumed", meterName)
if result.consumed, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries completing an async call")); err != nil {
return nil, wrapErr(signalName, err)
}

signalName = fmt.Sprintf("%s.ms_to_consume", meterName)
if result.msToConsume, err = meter.Int64Histogram(signalName, metric.WithUnit("ms"),
metric.WithDescription("duration in ms to complete an async call, from the earliest time it was scheduled to execute")); err != nil {
return nil, wrapErr(signalName, err)
}

return result, nil
Expand All @@ -62,28 +69,29 @@ func initPubSubMetrics() (*PubSubMetrics, error) {
func (m *PubSubMetrics) Published(ctx context.Context, module, topic, caller string, maybeErr error) {
attrs := []attribute.KeyValue{
attribute.String(observability.ModuleNameAttribute, module),
attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: module, Name: topic}.String()),
attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: module, Name: caller}.String()),
attribute.String(topicRefAttr, schema.RefKey{Module: module, Name: topic}.String()),
attribute.String(callerVerbRefAttr, schema.RefKey{Module: module, Name: caller}.String()),
observability.SuccessOrFailureStatusAttr(maybeErr == nil),
}

m.published.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *PubSubMetrics) SinkCalled(ctx context.Context, topic schema.RefKey, optCaller optional.Option[string], subscription schema.RefKey) {
attrs := []attribute.KeyValue{
attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: topic.Module, Name: topic.Name}.String()),
attribute.String(pubsubTopicModuleAttr, topic.Module),
attribute.String(pubsubSubscriptionRefAttr, subscription.String()),
attribute.String(pubsubSubscriptionModuleAttr, subscription.Module),
}
func (m *PubSubMetrics) Consumed(ctx context.Context, topic, subscription schema.RefKey, startTime time.Time, maybeErr error) {
// This used to measure the time since the event was published, not the time since
// consumption started. We should consider switching back to time since publication.
msToComplete := time.Since(startTime).Milliseconds()

caller, ok := optCaller.Get()
if ok {
attrs = append(attrs, attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: topic.Module, Name: caller}.String()))
attrs := []attribute.KeyValue{
attribute.String(topicRefAttr, schema.RefKey{Module: topic.Module, Name: topic.Name}.String()),
attribute.String(topicModuleAttr, topic.Module),
attribute.String(subscriptionRefAttr, subscription.String()),
attribute.String(subscriptionModuleAttr, subscription.Module),
observability.SuccessOrFailureStatusAttr(maybeErr == nil),
}

m.sinkCalled.Add(ctx, 1, metric.WithAttributes(attrs...))
m.msToConsume.Record(ctx, msToComplete, metric.WithAttributes(attrs...))
m.consumed.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func wrapErr(signalName string, err error) error {
Expand Down

0 comments on commit 6ae10db

Please sign in to comment.