Skip to content

Commit

Permalink
fix: otel rpc metrics also as span attributes (#2202)
Browse files Browse the repository at this point in the history
also adds an array of sent and received message sizes (in addition to
the unary fields) for the streaming interceptors
  • Loading branch information
safeer authored Jul 30, 2024
1 parent 606d028 commit 0777796
Showing 1 changed file with 114 additions and 80 deletions.
194 changes: 114 additions & 80 deletions internal/rpc/otel_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,23 @@ import (
)

const (
otelFtlRequestKeyAttr = attribute.Key("ftl.requestKey")
otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref")
otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module")
otelMessageEvent = "message"
otelMessageIDAttr = attribute.Key("message.id")
otelMessageSizeAttr = attribute.Key("message.uncompressed_size")
otelMessageTypeAttr = attribute.Key("message.type")
otelMessageTypeSent = "SENT"
otelMessageTypeReceived = "RECEIVED"
otelFtlRequestKeyAttr = attribute.Key("ftl.request_key")
otelFtlVerbChainAttr = attribute.Key("ftl.verb_chain")
otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref")
otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module")
otelMessageEventName = "message"
otelMessageEventIDAttr = attribute.Key("message.id")
otelMessageEventSizeAttr = attribute.Key("message.uncompressed_size")
otelMessageEventTypeAttr = attribute.Key("message.type")
otelMessageEventTypeSent = "SENT"
otelMessageEventTypeReceived = "RECEIVED"
otelMessageSentSizesAttr = attribute.Key("rpc.message.sent.sizes_bytes")
otelMessageReceivedSizesAttr = attribute.Key("rpc.message.received.sizes_bytes")
otelRPCDurationMetricName = "rpc.duration_ms"
otelRPCRequestSizeMetricName = "rpc.request.size_bytes"
otelRPCRequestsPerRPCMetricName = "rpc.request.count_per_rpc"
otelRPCResponseSizeMetricName = "rpc.response.size_bytes"
otelRPCResponsesPerRPCMetricName = "rpc.response.count_per_rpc"
)

func OtelInterceptor() connect.Interceptor {
Expand Down Expand Up @@ -72,6 +80,13 @@ func getAttributes(ctx context.Context, rpcSystemKey string) []attribute.KeyValu
attributes = append(attributes, otelFtlVerbRefAttr.String(verb.String()))
attributes = append(attributes, otelFtlVerbModuleAttr.String(verb.Module))
}
if verbs, ok := VerbsFromContext(ctx); ok {
verbStrings := make([]string, len(verbs))
for i, v := range verbs {
verbStrings[i] = v.String()
}
attributes = append(attributes, otelFtlVerbChainAttr.StringSlice(verbStrings))
}
return attributes
}

Expand All @@ -83,12 +98,16 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
name := strings.TrimLeft(request.Spec().Procedure, "/")

spanKind := trace.SpanKindClient
requestSpan := otelMessageTypeAttr.String(otelMessageTypeSent)
responseSpan := otelMessageTypeAttr.String(otelMessageTypeReceived)
requestSpan := otelMessageEventTypeAttr.String(otelMessageEventTypeSent)
responseSpan := otelMessageEventTypeAttr.String(otelMessageEventTypeReceived)
requestSizesAttr := otelMessageSentSizesAttr
responseSizesAttr := otelMessageReceivedSizesAttr
if !isClient {
spanKind = trace.SpanKindServer
requestSpan = otelMessageTypeAttr.String(otelMessageTypeReceived)
responseSpan = otelMessageTypeAttr.String(otelMessageTypeSent)
requestSpan = otelMessageEventTypeAttr.String(otelMessageEventTypeReceived)
responseSpan = otelMessageEventTypeAttr.String(otelMessageEventTypeSent)
requestSizesAttr = otelMessageReceivedSizesAttr
responseSizesAttr = otelMessageSentSizesAttr
}

attributes := getAttributes(ctx, request.Peer().Protocol)
Expand All @@ -107,30 +126,39 @@ func (i *otelInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
}
}

span.AddEvent(otelMessageEvent,
span.AddEvent(otelMessageEventName,
trace.WithAttributes(
requestSpan,
otelMessageIDAttr.Int(1),
otelMessageSizeAttr.Int(requestSize),
otelMessageEventIDAttr.Int(1),
otelMessageEventSizeAttr.Int(requestSize),
),
)

response, err := next(ctx, request)
attributes = append(attributes, statusCodeAttribute(request.Peer().Protocol, err))
var responseSize int
responseSize := 0
if err == nil {
if msg, ok := response.Any().(proto.Message); ok {
responseSize = proto.Size(msg)
}
}
span.AddEvent(otelMessageEvent,
span.AddEvent(otelMessageEventName,
trace.WithAttributes(
responseSpan,
otelMessageIDAttr.Int(1),
otelMessageSizeAttr.Int(responseSize),
otelMessageEventIDAttr.Int(1),
otelMessageEventSizeAttr.Int(responseSize),
),
)
span.SetAttributes(attributes...)
duration := time.Since(requestStartTime).Milliseconds()
span.SetAttributes(append(attributes,
attribute.Int64(otelRPCRequestsPerRPCMetricName, 1),
attribute.Int64(otelRPCRequestSizeMetricName, int64(requestSize)),
attribute.Int64(otelRPCResponsesPerRPCMetricName, 1),
attribute.Int64(otelRPCResponseSizeMetricName, int64(responseSize)),
attribute.Int64(otelRPCDurationMetricName, duration),
requestSizesAttr.Int64Slice([]int64{int64(requestSize)}),
responseSizesAttr.Int64Slice([]int64{int64(responseSize)}),
)...)
instruments := getInstruments(isClient)
instruments.duration.Record(
ctx,
Expand Down Expand Up @@ -160,11 +188,13 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc)

instruments := getInstruments(spec.IsClient)
state := &streamingState{
spec: spec,
protocol: conn.Peer().Protocol,
attributes: attributes,
receiveSize: instruments.responseSize,
sendSize: instruments.requestSize,
spec: spec,
protocol: conn.Peer().Protocol,
attributes: attributes,
receiveSizeMetric: instruments.responseSize,
sendSizeMetric: instruments.requestSize,
receiveSizes: []int64{},
sendSizes: []int64{},
}

return &streamingClientInterceptor{ // nolint:spancheck
Expand All @@ -179,22 +209,21 @@ func (i *otelInterceptor) WrapStreamingClient(next connect.StreamingClientFunc)
state.attributes = append(
state.attributes,
statusCodeAttribute(conn.Peer().Protocol, state.error))
span.SetAttributes(state.attributes...)
duration := time.Since(requestStartTime).Milliseconds()
span.SetAttributes(append(state.attributes,
attribute.Int64(otelRPCRequestsPerRPCMetricName, state.sentCounter),
attribute.Int64(otelRPCResponsesPerRPCMetricName, state.receivedCounter),
attribute.Int64(otelRPCDurationMetricName, duration),
otelMessageSentSizesAttr.Int64Slice(state.sendSizes),
otelMessageReceivedSizesAttr.Int64Slice(state.receiveSizes),
)...)
if state.error != nil {
span.SetStatus(codes.Error, state.error.Error())
}
span.End()
instruments.requestsPerRPC.Record(
ctx,
state.sentCounter,
metric.WithAttributes(state.attributes...))
instruments.responsesPerRPC.Record(
ctx,
state.receivedCounter,
metric.WithAttributes(state.attributes...))
instruments.duration.Record(ctx,
time.Since(requestStartTime).Milliseconds(),
metric.WithAttributes(state.attributes...))
instruments.requestsPerRPC.Record(ctx, state.sentCounter, metric.WithAttributes(state.attributes...))
instruments.responsesPerRPC.Record(ctx, state.receivedCounter, metric.WithAttributes(state.attributes...))
instruments.duration.Record(ctx, duration, metric.WithAttributes(state.attributes...))
},
}
}
Expand All @@ -217,11 +246,13 @@ func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc

instruments := getInstruments(conn.Spec().IsClient)
state := &streamingState{
spec: conn.Spec(),
protocol: conn.Peer().Protocol,
attributes: attributes,
receiveSize: instruments.responseSize,
sendSize: instruments.requestSize,
spec: conn.Spec(),
protocol: conn.Peer().Protocol,
attributes: attributes,
receiveSizeMetric: instruments.requestSize,
sendSizeMetric: instruments.responseSize,
receiveSizes: []int64{},
sendSizes: []int64{},
}
streamingHandler := &streamingHandlerInterceptor{
StreamingHandlerConn: conn,
Expand All @@ -239,18 +270,17 @@ func (i *otelInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
span.SetAttributes(state.attributes...)
instruments.requestsPerRPC.Record(
ctx,
state.receivedCounter,
metric.WithAttributes(state.attributes...))
instruments.responsesPerRPC.Record(
ctx,
state.sentCounter,
metric.WithAttributes(state.attributes...))
instruments.duration.Record(ctx,
time.Since(requestStartTime).Milliseconds(),
metric.WithAttributes(state.attributes...))
duration := time.Since(requestStartTime).Milliseconds()
span.SetAttributes(append(state.attributes,
attribute.Int64(otelRPCRequestsPerRPCMetricName, state.receivedCounter),
attribute.Int64(otelRPCResponsesPerRPCMetricName, state.sentCounter),
attribute.Int64(otelRPCDurationMetricName, duration),
otelMessageSentSizesAttr.Int64Slice(state.sendSizes),
otelMessageReceivedSizesAttr.Int64Slice(state.receiveSizes),
)...)
instruments.requestsPerRPC.Record(ctx, state.receivedCounter, metric.WithAttributes(state.attributes...))
instruments.responsesPerRPC.Record(ctx, state.sentCounter, metric.WithAttributes(state.attributes...))
instruments.duration.Record(ctx, duration, metric.WithAttributes(state.attributes...))
return err
}
}
Expand All @@ -273,23 +303,23 @@ type instrumentation struct {
}

func createInstruments(meter metric.Meter) instrumentation {
duration, err := meter.Int64Histogram("duration", metric.WithUnit("ms"))
duration, err := meter.Int64Histogram(otelRPCDurationMetricName, metric.WithUnit("ms"), metric.WithDescription("Duration of the RPC call"))
if err != nil {
panic(fmt.Errorf("failed to create duration metric: %w", err))
}
requestSize, err := meter.Int64Histogram("request.size", metric.WithUnit("By"))
requestSize, err := meter.Int64Histogram(otelRPCRequestSizeMetricName, metric.WithUnit("By"), metric.WithDescription("Size of the request payload"))
if err != nil {
panic(fmt.Errorf("failed to create request size metric: %w", err))
}
responseSize, err := meter.Int64Histogram("response.size", metric.WithUnit("By"))
responseSize, err := meter.Int64Histogram(otelRPCResponseSizeMetricName, metric.WithUnit("By"), metric.WithDescription("Size of the response payload"))
if err != nil {
panic(fmt.Errorf("failed to create response size metric: %w", err))
}
requestsPerRPC, err := meter.Int64Histogram("requests_per_rpc", metric.WithUnit("1"))
requestsPerRPC, err := meter.Int64Histogram(otelRPCRequestsPerRPCMetricName, metric.WithUnit("1"), metric.WithDescription("Number of requests made in the RPC call"))
if err != nil {
panic(fmt.Errorf("failed to create requests per rpc metric: %w", err))
}
responsesPerRPC, err := meter.Int64Histogram("responses_per_rpc", metric.WithUnit("1"))
responsesPerRPC, err := meter.Int64Histogram(otelRPCResponsesPerRPCMetricName, metric.WithUnit("1"), metric.WithDescription("Number of responses received in the RPC call"))
if err != nil {
panic(fmt.Errorf("failed to create responses per rpc metric: %w", err))
}
Expand All @@ -313,15 +343,17 @@ func statusCodeAttribute(protocol string, err error) attribute.KeyValue {

// streamingState stores the ongoing metrics for streaming interceptors.
type streamingState struct {
mu sync.Mutex
spec connect.Spec
protocol string
attributes []attribute.KeyValue
error error
sentCounter int64
receivedCounter int64
receiveSize metric.Int64Histogram
sendSize metric.Int64Histogram
mu sync.Mutex
spec connect.Spec
protocol string
attributes []attribute.KeyValue
error error
sentCounter int64
receivedCounter int64
receiveSizeMetric metric.Int64Histogram
sendSizeMetric metric.Int64Histogram
receiveSizes []int64
sendSizes []int64
}

// streamingSenderReceiver encapsulates either a StreamingClientConn or a StreamingHandlerConn.
Expand All @@ -337,23 +369,24 @@ func (s *streamingState) receive(ctx context.Context, msg any, conn streamingSen
if errors.Is(err, io.EOF) {
return err // nolint:wrapcheck
}
s.receivedCounter++
if err != nil {
s.error = err
s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err))
}
s.receivedCounter++
attrs := append(s.attributes, []attribute.KeyValue{ // nolint:gocritic
otelMessageTypeAttr.String(otelMessageTypeReceived),
otelMessageIDAttr.Int64(s.receivedCounter),
otelMessageEventTypeAttr.String(otelMessageEventTypeReceived),
otelMessageEventIDAttr.Int64(s.receivedCounter),
}...)
if protomsg, ok := msg.(proto.Message); ok {
size := proto.Size(protomsg)
attrs = append(attrs, otelMessageSizeAttr.Int(size))
s.receiveSize.Record(ctx, int64(size), metric.WithAttributes(attrs...))
attrs = append(attrs, otelMessageEventSizeAttr.Int(size))
s.receiveSizes = append(s.receiveSizes, int64(size))
s.receiveSizeMetric.Record(ctx, int64(size), metric.WithAttributes(attrs...))
}

span := trace.SpanFromContext(ctx)
span.AddEvent(otelMessageEvent, trace.WithAttributes(attrs...))
span.AddEvent(otelMessageEventName, trace.WithAttributes(attrs...))
return err // nolint:wrapcheck
}

Expand All @@ -370,17 +403,18 @@ func (s *streamingState) send(ctx context.Context, msg any, conn streamingSender
s.attributes = append(s.attributes, statusCodeAttribute(s.protocol, err))
}
attrs := append(s.attributes, []attribute.KeyValue{ // nolint:gocritic
otelMessageTypeAttr.String(otelMessageTypeSent),
otelMessageIDAttr.Int64(s.sentCounter),
otelMessageEventTypeAttr.String(otelMessageEventTypeSent),
otelMessageEventIDAttr.Int64(s.sentCounter),
}...)
if protomsg, ok := msg.(proto.Message); ok {
size := proto.Size(protomsg)
attrs = append(attrs, otelMessageSizeAttr.Int(size))
s.sendSize.Record(ctx, int64(size), metric.WithAttributes(attrs...))
attrs = append(attrs, otelMessageEventSizeAttr.Int(size))
s.sendSizes = append(s.sendSizes, int64(size))
s.sendSizeMetric.Record(ctx, int64(size), metric.WithAttributes(attrs...))
}

span := trace.SpanFromContext(ctx)
span.AddEvent(otelMessageEvent, trace.WithAttributes(attrs...))
span.AddEvent(otelMessageEventName, trace.WithAttributes(attrs...))
return err // nolint:wrapcheck
}

Expand Down

0 comments on commit 0777796

Please sign in to comment.