From 6ae10dba3516209ab8a9d96c0035c274d53a6988 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 17 Jan 2025 14:23:20 +1100 Subject: [PATCH] metrics done --- backend/runner/pubsub/consumer.go | 6 +- .../pubsub/observability/async_calls.go | 181 ------------------ .../pubsub/observability/observability.go | 7 - backend/runner/pubsub/observability/pubsub.go | 74 +++---- 4 files changed, 45 insertions(+), 223 deletions(-) delete mode 100644 backend/runner/pubsub/observability/async_calls.go diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go index ede35df4e..e8433024f 100644 --- a/backend/runner/pubsub/consumer.go +++ b/backend/runner/pubsub/consumer.go @@ -289,6 +289,7 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram remainingRetries := c.retryParams.Count backoff := c.retryParams.MinBackoff for { + startTime := time.Now() callCtx, callCancel := context.WithCancel(ctx) callChan := make(chan error) go func() { @@ -297,7 +298,8 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram var err error select { case cmd := <-resetOffset: - observability.AsyncCalls.Completed(ctx, schema.RefKey{Module: c.moduleName, Name: c.verb.Name}) + logger.Debugf("Cancelled call for subscription %s due to offset reset", c.verb.Name) + observability.PubSub.Consumed(ctx, c.subscriber.Topic.ToRefKey(), schema.RefKey{Module: c.moduleName, Name: c.verb.Name}, startTime, errors.New("cancelled due to offset reset")) // Don't wait for call to end before resetting offsets as it may take a while. callCancel() @@ -313,7 +315,7 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram // close call context now that the call is finished callCancel() } - observability.AsyncCalls.Completed(callCtx) + observability.PubSub.Consumed(ctx, c.subscriber.Topic.ToRefKey(), schema.RefKey{Module: c.moduleName, Name: c.verb.Name}, startTime, err) if err == nil { break } diff --git a/backend/runner/pubsub/observability/async_calls.go b/backend/runner/pubsub/observability/async_calls.go deleted file mode 100644 index 7b2982b03..000000000 --- a/backend/runner/pubsub/observability/async_calls.go +++ /dev/null @@ -1,181 +0,0 @@ -package observability - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "github.com/alecthomas/types/optional" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/propagation" - - "github.com/block/ftl/common/schema" - "github.com/block/ftl/internal/observability" -) - -const ( - asyncCallMeterName = "ftl.async_call" - asyncCallOriginAttr = "ftl.async_call.origin" - asyncCallVerbRefAttr = "ftl.async_call.verb.ref" - asyncCallCatchVerbRefAttr = "ftl.async_call.catch_verb.ref" - asyncCallIsCatchingAttr = "ftl.async_call.catching" - asyncCallTimeSinceScheduledAtBucketAttr = "ftl.async_call.time_since_scheduled_at_ms.bucket" - asyncCallRemainingAttemptsAttr = "ftl.async_call.remaining_attempts" - asyncCallExecFailureModeAttr = "ftl.async_call.execution.failure_mode" -) - -type AsyncCallMetrics struct { - created metric.Int64Counter - acquired metric.Int64Counter - executed metric.Int64Counter - completed metric.Int64Counter - msToComplete metric.Int64Histogram - queueDepth metric.Int64Gauge -} - -func initAsyncCallMetrics() (*AsyncCallMetrics, error) { - result := &AsyncCallMetrics{ - created: noop.Int64Counter{}, - acquired: noop.Int64Counter{}, - executed: noop.Int64Counter{}, - completed: noop.Int64Counter{}, - msToComplete: noop.Int64Histogram{}, - queueDepth: noop.Int64Gauge{}, - } - - var err error - meter := otel.Meter(asyncCallMeterName) - - signalName := fmt.Sprintf("%s.created", asyncCallMeterName) - if result.created, err = meter.Int64Counter(signalName, metric.WithUnit("1"), - metric.WithDescription("the number of times that an async call was created")); err != nil { - return nil, wrapErr(signalName, err) - } - - signalName = fmt.Sprintf("%s.acquired", asyncCallMeterName) - if result.acquired, err = meter.Int64Counter(signalName, metric.WithUnit("1"), - metric.WithDescription("the number of times that the controller tries acquiring an async call")); err != nil { - return nil, wrapErr(signalName, err) - } - - signalName = fmt.Sprintf("%s.executed", asyncCallMeterName) - if result.executed, err = meter.Int64Counter(signalName, metric.WithUnit("1"), - metric.WithDescription("the number of times that the controller tries executing an async call")); err != nil { - return nil, wrapErr(signalName, err) - } - - signalName = fmt.Sprintf("%s.completed", asyncCallMeterName) - if result.completed, err = meter.Int64Counter(signalName, metric.WithUnit("1"), - metric.WithDescription("the number of times that the controller tries completing an async call")); err != nil { - return nil, wrapErr(signalName, err) - } - - signalName = fmt.Sprintf("%s.ms_to_complete", asyncCallMeterName) - if result.msToComplete, err = meter.Int64Histogram(signalName, metric.WithUnit("ms"), - metric.WithDescription("duration in ms to complete an async call, from the earliest time it was scheduled to execute")); err != nil { - return nil, wrapErr(signalName, err) - } - - signalName = fmt.Sprintf("%s.queue_depth", asyncCallMeterName) - if result.queueDepth, err = meter.Int64Gauge(signalName, metric.WithUnit("1"), - metric.WithDescription("number of async calls queued up")); err != nil { - return nil, wrapErr(signalName, err) - } - - return result, nil -} - -// func (m *AsyncCallMetrics) Created(ctx context.Context, verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, remainingAttempts int64, maybeErr error) { -// attrs := extractRefAttrs(verb, catchVerb, origin, false) -// attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil)) -// attrs = append(attrs, attribute.Int64(asyncCallRemainingAttemptsAttr, remainingAttempts)) - -// m.created.Add(ctx, 1, metric.WithAttributes(attrs...)) -// } - -// func (m *AsyncCallMetrics) RecordQueueDepth(ctx context.Context, queueDepth int64) { -// m.queueDepth.Record(ctx, queueDepth) -// } - -// func (m *AsyncCallMetrics) Acquired(ctx context.Context, verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, scheduledAt time.Time, isCatching bool, maybeErr error) { -// attrs := extractAsyncCallAttrs(verb, catchVerb, origin, scheduledAt, isCatching) -// attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil)) -// m.acquired.Add(ctx, 1, metric.WithAttributes(attrs...)) -// } - -// // AcquireFailed should be called if an acquisition failed before any call data could be retrieved. -// func (m *AsyncCallMetrics) AcquireFailed(ctx context.Context, err error) { -// m.acquired.Add(ctx, 1, metric.WithAttributes(observability.SuccessOrFailureStatusAttr(false))) -// } - -// func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, scheduledAt time.Time, isCatching bool, maybeFailureMode optional.Option[string]) { -// attrs := extractAsyncCallAttrs(verb, catchVerb, origin, scheduledAt, isCatching) - -// failureMode, ok := maybeFailureMode.Get() -// attrs = append(attrs, observability.SuccessOrFailureStatusAttr(!ok)) -// if ok { -// attrs = append(attrs, attribute.String(asyncCallExecFailureModeAttr, failureMode)) -// } - -// m.executed.Add(ctx, 1, metric.WithAttributes(attrs...)) -// } - -func (m *AsyncCallMetrics) Completed(ctx context.Context, subscription schema.RefKey, origin string, maybeErr error) { - msToComplete := timeSinceMS(scheduledAt) - - attrs := extractRefAttrs(verb, catchVerb, origin, isCatching) - attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil)) - m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...)) - - attrs = append(attrs, attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, asyncLogBucket(msToComplete))) - m.completed.Add(ctx, 1, metric.WithAttributes(attrs...)) - - m.queueDepth.Record(ctx, queueDepth) -} - -func ExtractTraceContextToContext(ctx context.Context, traceContext []byte) (context.Context, error) { - if len(traceContext) == 0 { - return ctx, nil - } - var oc propagation.MapCarrier - err := json.Unmarshal(traceContext, &oc) - if err != nil { - return ctx, fmt.Errorf("failed to unmarshal otel context: %w", err) - } - return otel.GetTextMapPropagator().Extract(ctx, oc), nil -} - -func RetrieveTraceContextFromContext(ctx context.Context) ([]byte, error) { - oc := propagation.MapCarrier(make(map[string]string)) - otel.GetTextMapPropagator().Inject(ctx, oc) - jsonOc, err := json.Marshal(oc) - if err != nil { - return jsonOc, fmt.Errorf("failed to marshal otel context: %w", err) - } - return jsonOc, nil -} - -func extractAsyncCallAttrs(verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, scheduledAt time.Time, isCatching bool) []attribute.KeyValue { - return append(extractRefAttrs(verb, catchVerb, origin, isCatching), attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, asyncLogBucket(timeSinceMS(scheduledAt)))) -} - -func asyncLogBucket(msToComplete int64) string { - return logBucket(4, msToComplete, optional.Some(4), optional.Some(6)) -} - -func extractRefAttrs(verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, isCatching bool) []attribute.KeyValue { - attributes := []attribute.KeyValue{ - attribute.String(observability.ModuleNameAttribute, verb.Module), - attribute.String(asyncCallVerbRefAttr, verb.String()), - attribute.String(asyncCallOriginAttr, origin), - attribute.Bool(asyncCallIsCatchingAttr, isCatching), - } - if catch, ok := catchVerb.Get(); ok { - attributes = append(attributes, attribute.String(asyncCallCatchVerbRefAttr, catch.String())) - } - return attributes -} diff --git a/backend/runner/pubsub/observability/observability.go b/backend/runner/pubsub/observability/observability.go index 22f44aa24..3e1f499b6 100644 --- a/backend/runner/pubsub/observability/observability.go +++ b/backend/runner/pubsub/observability/observability.go @@ -4,8 +4,6 @@ import "fmt" var ( PubSub *PubSubMetrics - // TODO: remove - AsyncCalls *AsyncCallMetrics ) func init() { @@ -14,9 +12,4 @@ func init() { if err != nil { panic(fmt.Errorf("could not initialize pubsub metrics: %w", err)) } - - AsyncCalls, err = initAsyncCallMetrics() - if err != nil { - panic(fmt.Errorf("could not initialize async call metrics: %w", err)) - } } diff --git a/backend/runner/pubsub/observability/pubsub.go b/backend/runner/pubsub/observability/pubsub.go index 94487de1e..9ae2af194 100644 --- a/backend/runner/pubsub/observability/pubsub.go +++ b/backend/runner/pubsub/observability/pubsub.go @@ -3,8 +3,8 @@ package observability import ( "context" "fmt" + "time" - "github.com/alecthomas/types/optional" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -17,30 +17,33 @@ import ( // To learn more about how sinks and subscriptions work together, check out the // https://block.github.io/ftl/docs/reference/pubsub/ const ( - pubsubMeterName = "ftl.pubsub" - pubsubTopicRefAttr = "ftl.pubsub.topic.ref" - pubsubTopicModuleAttr = "ftl.pubsub.topic.module.name" - pubsubCallerVerbRefAttr = "ftl.pubsub.publish.caller.verb.ref" - pubsubSubscriptionRefAttr = "ftl.pubsub.subscription.ref" - pubsubSubscriptionModuleAttr = "ftl.pubsub.subscription.module.name" - pubsubFailedOperationAttr = "ftl.pubsub.propagation.failed_operation" + meterName = "ftl.pubsub" + topicRefAttr = "ftl.pubsub.topic.ref" + topicModuleAttr = "ftl.pubsub.topic.module.name" + callerVerbRefAttr = "ftl.pubsub.publish.caller.verb.ref" + subscriptionRefAttr = "ftl.pubsub.subscription.ref" + subscriptionModuleAttr = "ftl.pubsub.subscription.module.name" + // We do not know publication date anymore + // timeSinceScheduledAtBucketAttr = "ftl.pubsub.time_since_scheduled_at_ms.bucket" ) type PubSubMetrics struct { - published metric.Int64Counter - sinkCalled metric.Int64Counter + published metric.Int64Counter + consumed metric.Int64Counter + msToConsume metric.Int64Histogram } func initPubSubMetrics() (*PubSubMetrics, error) { result := &PubSubMetrics{ - published: noop.Int64Counter{}, - sinkCalled: noop.Int64Counter{}, + published: noop.Int64Counter{}, + consumed: noop.Int64Counter{}, + msToConsume: noop.Int64Histogram{}, } var err error - meter := otel.Meter(pubsubMeterName) + meter := otel.Meter(meterName) - counterName := fmt.Sprintf("%s.published", pubsubMeterName) + counterName := fmt.Sprintf("%s.published", meterName) if result.published, err = meter.Int64Counter( counterName, metric.WithUnit("1"), @@ -48,12 +51,16 @@ func initPubSubMetrics() (*PubSubMetrics, error) { return nil, wrapErr(counterName, err) } - counterName = fmt.Sprintf("%s.sink.called", pubsubMeterName) - if result.sinkCalled, err = meter.Int64Counter( - counterName, - metric.WithUnit("1"), - metric.WithDescription("the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber")); err != nil { - return nil, wrapErr(counterName, err) + signalName := fmt.Sprintf("%s.consumed", meterName) + if result.consumed, err = meter.Int64Counter(signalName, metric.WithUnit("1"), + metric.WithDescription("the number of times that the controller tries completing an async call")); err != nil { + return nil, wrapErr(signalName, err) + } + + signalName = fmt.Sprintf("%s.ms_to_consume", meterName) + if result.msToConsume, err = meter.Int64Histogram(signalName, metric.WithUnit("ms"), + metric.WithDescription("duration in ms to complete an async call, from the earliest time it was scheduled to execute")); err != nil { + return nil, wrapErr(signalName, err) } return result, nil @@ -62,28 +69,29 @@ func initPubSubMetrics() (*PubSubMetrics, error) { func (m *PubSubMetrics) Published(ctx context.Context, module, topic, caller string, maybeErr error) { attrs := []attribute.KeyValue{ attribute.String(observability.ModuleNameAttribute, module), - attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: module, Name: topic}.String()), - attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: module, Name: caller}.String()), + attribute.String(topicRefAttr, schema.RefKey{Module: module, Name: topic}.String()), + attribute.String(callerVerbRefAttr, schema.RefKey{Module: module, Name: caller}.String()), observability.SuccessOrFailureStatusAttr(maybeErr == nil), } m.published.Add(ctx, 1, metric.WithAttributes(attrs...)) } -func (m *PubSubMetrics) SinkCalled(ctx context.Context, topic schema.RefKey, optCaller optional.Option[string], subscription schema.RefKey) { - attrs := []attribute.KeyValue{ - attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: topic.Module, Name: topic.Name}.String()), - attribute.String(pubsubTopicModuleAttr, topic.Module), - attribute.String(pubsubSubscriptionRefAttr, subscription.String()), - attribute.String(pubsubSubscriptionModuleAttr, subscription.Module), - } +func (m *PubSubMetrics) Consumed(ctx context.Context, topic, subscription schema.RefKey, startTime time.Time, maybeErr error) { + // This used to be time since publication time, not consumption start time. + // We should consider changing this back to time since publication time. + msToComplete := time.Since(startTime).Milliseconds() - caller, ok := optCaller.Get() - if ok { - attrs = append(attrs, attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: topic.Module, Name: caller}.String())) + attrs := []attribute.KeyValue{ + attribute.String(topicRefAttr, schema.RefKey{Module: topic.Module, Name: topic.Name}.String()), + attribute.String(topicModuleAttr, topic.Module), + attribute.String(subscriptionRefAttr, subscription.String()), + attribute.String(subscriptionModuleAttr, subscription.Module), + observability.SuccessOrFailureStatusAttr(maybeErr == nil), } - m.sinkCalled.Add(ctx, 1, metric.WithAttributes(attrs...)) + m.msToConsume.Record(ctx, msToComplete, metric.WithAttributes(attrs...)) + m.consumed.Add(ctx, 1, metric.WithAttributes(attrs...)) } func wrapErr(signalName string, err error) error {