Skip to content

Commit

Permalink
feat: add new async call latency histogram metric: `ftl.async_call.ms…
Browse files Browse the repository at this point in the history
…_to_complete` (#2221)

Printout showing the exact latency value is the same on the counter attr
and on the hist metric itself:
```
Metric #2
Descriptor:
     -> Name: ftl.async_call.completed
     -> Description: the number of times that the controller tries completing an async call
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.async_call.origin: Str(sub:echo.sub)
     -> ftl.async_call.time_since_scheduled_at_ms: Int(39)
     -> ftl.async_call.verb.ref: Str(echo.echoSinkOne)
     -> ftl.module.name: Str(echo)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-07-31 18:54:35.526397 +0000 UTC
Timestamp: 2024-07-31 18:55:10.527278 +0000 UTC
Value: 1

Metric #3
Descriptor:
     -> Name: ftl.async_call.ms_to_complete
     -> Description: duration in ms to complete an async call, from the earliest time it was scheduled to execute
     -> Unit: ms
     -> DataType: Histogram
     -> AggregationTemporality: Cumulative

HistogramDataPoints #0
Data point attributes:
     -> ftl.async_call.origin: Str(sub:echo.sub)
     -> ftl.async_call.verb.ref: Str(echo.echoSinkOne)
     -> ftl.module.name: Str(echo)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-07-31 18:54:35.526402 +0000 UTC
Timestamp: 2024-07-31 18:55:10.527279 +0000 UTC
Count: 1
Sum: 39.000000
Min: 39.000000
Max: 39.000000
```

Issue: #2194
  • Loading branch information
deniseli authored Jul 31, 2024
1 parent b9bca2f commit 816c133
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 34 deletions.
80 changes: 46 additions & 34 deletions backend/controller/observability/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,58 @@ const (
)

type AsyncCallMetrics struct {
meter metric.Meter
acquired metric.Int64Counter
executed metric.Int64Counter
completed metric.Int64Counter
acquired metric.Int64Counter
executed metric.Int64Counter
completed metric.Int64Counter
msToComplete metric.Int64Histogram
}

func initAsyncCallMetrics() (*AsyncCallMetrics, error) {
result := &AsyncCallMetrics{}
var errs error
var err error
result := &AsyncCallMetrics{
acquired: noop.Int64Counter{},
executed: noop.Int64Counter{},
completed: noop.Int64Counter{},
msToComplete: noop.Int64Histogram{},
}

result.meter = otel.Meter(asyncCallMeterName)
var err error
meter := otel.Meter(asyncCallMeterName)

counterName := fmt.Sprintf("%s.acquired", asyncCallMeterName)
if result.acquired, err = result.meter.Int64Counter(
counterName,
metric.WithUnit("1"),
signalName := fmt.Sprintf("%s.acquired", asyncCallMeterName)
if result.acquired, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries acquiring an async call")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.acquired = noop.Int64Counter{}
return nil, wrapErr(signalName, err)
}

counterName = fmt.Sprintf("%s.executed", asyncCallMeterName)
if result.executed, err = result.meter.Int64Counter(
counterName,
metric.WithUnit("1"),
signalName = fmt.Sprintf("%s.executed", asyncCallMeterName)
if result.executed, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries executing an async call")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.executed = noop.Int64Counter{}
return nil, wrapErr(signalName, err)
}

counterName = fmt.Sprintf("%s.completed", asyncCallMeterName)
if result.completed, err = result.meter.Int64Counter(
counterName,
metric.WithUnit("1"),
signalName = fmt.Sprintf("%s.completed", asyncCallMeterName)
if result.completed, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries completing an async call")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.completed = noop.Int64Counter{}
return nil, wrapErr(signalName, err)
}

return result, errs
signalName = fmt.Sprintf("%s.ms_to_complete", asyncCallMeterName)
if result.msToComplete, err = meter.Int64Histogram(signalName, metric.WithUnit("ms"),
metric.WithDescription("duration in ms to complete an async call, from the earliest time it was scheduled to execute")); err != nil {
return nil, wrapErr(signalName, err)
}

return result, nil
}

func wrapErr(signalName string, err error) error {
return fmt.Errorf("failed to create %q signal: %w", signalName, err)
}

func (m *AsyncCallMetrics) Acquired(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
m.acquired.Add(ctx, 1, metric.WithAttributes(extractAsyncCallAndMaybeErrAttrs(verb, origin, scheduledAt, maybeErr)...))
attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
m.acquired.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeFailureMode optional.Option[string]) {
Expand All @@ -84,19 +91,24 @@ func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, ori
}

func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
m.completed.Add(ctx, 1, metric.WithAttributes(extractAsyncCallAndMaybeErrAttrs(verb, origin, scheduledAt, maybeErr)...))
}
msToComplete := timeSinceMS(scheduledAt)

func extractAsyncCallAndMaybeErrAttrs(verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) []attribute.KeyValue {
attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)
return append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
attrs := extractRefAttrs(verb, origin)
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...))

attrs = append(attrs, attribute.Int64(asyncCallTimeSinceScheduledAtAttr, msToComplete))
m.completed.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func extractAsyncCallAttrs(verb schema.RefKey, origin string, scheduledAt time.Time) []attribute.KeyValue {
return append(extractRefAttrs(verb, origin), attribute.Int64(asyncCallTimeSinceScheduledAtAttr, timeSinceMS(scheduledAt)))
}

func extractRefAttrs(verb schema.RefKey, origin string) []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String(observability.ModuleNameAttribute, verb.Module),
attribute.String(asyncCallVerbRefAttr, verb.String()),
attribute.String(asyncCallOriginAttr, origin),
attribute.Int64(asyncCallTimeSinceScheduledAtAttr, time.Since(scheduledAt).Milliseconds()),
}
}
5 changes: 5 additions & 0 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package observability
import (
"errors"
"fmt"
"time"
)

var (
Expand All @@ -26,3 +27,7 @@ func init() {
panic(fmt.Errorf("could not initialize controller metrics: %w", errs))
}
}

func timeSinceMS(start time.Time) int64 {
return time.Since(start).Milliseconds()
}

0 comments on commit 816c133

Please sign in to comment.