Skip to content

Commit

Permalink
feat(otel): add opentelemety utility functions
Browse files Browse the repository at this point in the history
This PR extracts opentelemetry utility functions from my private project
and adds them to this project without calling them. It resolves rabbitmq#43

I'd like a broader discussion about whether these should be
automatically called by the library where possible, or if they should
simply be provided to clients to use if they so wish.

I did my best to follow OpenTelemetry semantic conventions as described
here
https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/,
but they are at times ambiguous for rabbitmq-- e.g. is the destination
for a message the Queue or the Consumer Tag the message was delivered to.

Given the channel based approaches of this library, it is impossible for
the library to know the full execution of a consumer. Unless
autoack=false, we cannot actually know when to end the span associated
with a delivery, so at least in the consumer case, it's probably best to
allow the client to manage spans for themselves.

We *can* manage spans on the producer side, and at the very least
extract span identifiers to include on published headers automatically,
and provide utilities for pulling them back out again.

My intention with putting this PR up is to move the conversation
forward. Because the PR *only* provides private methods (if I left
members public please call them out), it can be safely merged while
these questions are worked out.
  • Loading branch information
AndrewWinterman committed Jun 28, 2024
1 parent 6867443 commit d537aee
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 3 deletions.
17 changes: 15 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
module github.com/rabbitmq/amqp091-go

go 1.20
go 1.21

require go.uber.org/goleak v1.3.0
toolchain go1.22.0

require (
github.com/getoutreach/gobox v1.92.1
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/goleak v1.3.0
)

require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
)
23 changes: 22 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/getoutreach/gobox v1.92.1 h1:MBDedZCUN+ef/ljBHAOSyVisqvR5dPlSwso1JdMPbXw=
github.com/getoutreach/gobox v1.92.1/go.mod h1:IPy+RNuOYRMTizH6iTr33myGKcRhjEIIHS2VMqzZL0A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg=
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik=
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak=
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
196 changes: 196 additions & 0 deletions opentelemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package amqp091

import (
"context"
"fmt"

"github.com/getoutreach/gobox/pkg/app"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"
)

// tracer is the tracer used by the package
var tracer = otel.Tracer("amqp091")

// amqpHeaderCarrier is a carrier for AMQP headers.
type amqpHeaderCarrier Table

// Get returns the value associated with the passed key.
func (c amqpHeaderCarrier) Get(key string) string {
v, ok := c[key]
if !ok {
return ""
}
s, ok := v.(string)
if ok {
return s
}
return ""
}

// Set stores the key-value pair.
func (c amqpHeaderCarrier) Set(key, value string) {
c[key] = value
}

// Keys lists the keys stored in this carrier.
func (c amqpHeaderCarrier) Keys() []string {
keys := []string{}
for k, v := range c {
if _, ok := v.(string); !ok {
continue
}
keys = append(keys, k)
}
return keys
}

// ensure amqpHeaderCarrier implements the TextMapCarrier interface
var _ propagation.TextMapCarrier = amqpHeaderCarrier{}

// InjectSpan injects the span context into the AMQP headers.
// It returns the input headers with the span headers added.
func injectSpanFromContext(ctx context.Context, headers Table) Table {
carrier := amqpHeaderCarrier(headers)
if carrier == nil {
carrier = amqpHeaderCarrier{}
}
otel.GetTextMapPropagator().Inject(ctx, carrier)
return Table(carrier)
}

// ExtractSpanContext extracts the span context from the AMQP headers.
func ExtractSpanContext(ctx context.Context, headers Table) context.Context {
carrier := amqpHeaderCarrier(headers)
if carrier == nil {
carrier = amqpHeaderCarrier{}
}
return otel.GetTextMapPropagator().Extract(ctx, carrier)
}

// extractSpanFromReturn creates a span for a returned message
func extractSpanFromReturn(
ctx context.Context,
ret Return,
) (context.Context, trace.Span) {
spctx := ExtractSpanContext(ctx, ret.Headers)
spanName := fmt.Sprintf("%s return", ret.RoutingKey)
return tracer.Start(ctx, spanName,
trace.WithLinks(trace.LinkFromContext(spctx, semconv.MessagingMessageID(ret.MessageId))),
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
semconv.MessagingRabbitmqDestinationRoutingKey(ret.RoutingKey),
semconv.MessagingDestinationPublishName(ret.Exchange),
semconv.MessagingOperationKey.String("return"),
semconv.MessagingMessageID(ret.MessageId),
semconv.MessagingMessageConversationID(ret.CorrelationId),
semconv.MessagingSystemRabbitmq,
semconv.MessagingClientIDKey.String(app.Info().Name),
semconv.ErrorTypeKey.String(ret.ReplyText),
// semconv.NetPeerPort(5672
// semconv.NetPeerIP("localhost")
// semconv.ServerAddress("localhost")
),
trace.WithNewRoot(),
)
}

// settleDelivery creates a span for the acking of a delivery
func settleDelivery(
ctx context.Context,
consumerTag string,
multiple, requeue bool,
) (context.Context, trace.Span) {
return tracer.Start(ctx,
fmt.Sprintf("%s settle", consumerTag),
trace.WithAttributes(
attribute.Bool("multiple", multiple),
attribute.Bool("requeue", requeue)))
}

// extractLinkFromDelivery creates a link for a delivered message
//
// The recommend way to link a consumer to the publisher is with a link, since
// the two operations can be quit far apart in time. If you have a usecase
// where you would like the spans to have a parent child relationship instead, use
// ExtractSpanContext
//
// The consumer span may containe 1 or more messages, which is why we don't
// manufacture the span in its entirety here.
func extractLinkFromDelivery(ctx context.Context, del Delivery) trace.Link {
spctx := ExtractSpanContext(ctx, del.Headers)
return trace.LinkFromContext(spctx, semconv.MessagingMessageID(del.MessageId))
}

// spanForDelivery creates a span for the delivered messages
// returns a new context with the span headers and the span
func spanForDelivery(
ctx context.Context,
consumerTag string,
delivery []Delivery,
options ...trace.SpanStartOption,
) (context.Context, trace.Span) {
spanName := fmt.Sprintf("%s consume", consumerTag)
links := []trace.Link{}
for _, del := range delivery {
links = append(links, extractLinkFromDelivery(ctx, del))
}
return tracer.Start(
ctx,
spanName,
append(
options,
trace.WithLinks(links...),
trace.WithSpanKind(trace.SpanKindConsumer),
)...,
)
}

// Publish creates a span for a publishing message returns a new context with
// the span headers, the mssage that was being published with span headers
// injected, and a function to be called with the result of the publish
func spanForPublication(
ctx context.Context,
publishing Publishing,
exchange, routinKey string,
immediate bool,
) (context.Context, Publishing, func(err error, typ string)) {
spanName := fmt.Sprintf("%s publish", routinKey)
ctx, span := tracer.Start(ctx, spanName,
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
semconv.MessagingRabbitmqDestinationRoutingKey(routinKey),
semconv.MessagingDestinationPublishName(exchange),
semconv.MessagingOperationPublish,
semconv.MessagingMessageID(publishing.MessageId),
semconv.MessagingMessageConversationID(publishing.CorrelationId),
semconv.MessagingSystemRabbitmq,
semconv.MessagingClientIDKey.String(app.Info().Name),
semconv.MessagingMessageBodySize(len(publishing.Body)),
semconv.MessageTypeSent,
attribute.Bool("messaging.immediate", immediate),

// TODO(AWinterman): Add these attributes
// semconv.NetPeerPort(5672) // nolint:gocritic // Why: see to do
// semconv.NetworkPeerAddress() // nolint:gocritic // Why: see to do
// semconv.NetPeerPort() // nolint:gocritic // Why: see to do
),
)
headers := injectSpanFromContext(ctx, publishing.Headers)
publishing.Headers = Table(headers)

return ctx, publishing, func(err error, typ string) {
if err != nil {
span.RecordError(err)
span.SetAttributes(
semconv.ErrorTypeKey.String(typ),
)
span.SetStatus(codes.Error, err.Error())
}
span.End()
}
}

0 comments on commit d537aee

Please sign in to comment.