From 9758b985cb0e1b1cef147ed638bd95ee894e8dd3 Mon Sep 17 00:00:00 2001
From: Shawn Poulson
Date: Fri, 29 Sep 2023 11:40:16 -0400
Subject: [PATCH] Reduce tracing detail.

---
 algorithms.go  | 26 ++++++++++----------------
 gubernator.go  | 33 ++++++++++++---------------------
 peer_client.go | 50 +++++++++++++++-----------------------------------
 3 files changed, 37 insertions(+), 72 deletions(-)

diff --git a/algorithms.go b/algorithms.go
index 8a5442d1..42826c70 100644
--- a/algorithms.go
+++ b/algorithms.go
@@ -20,7 +20,6 @@ import (
 	"context"
 
 	"github.com/mailgun/holster/v4/clock"
-	"github.com/mailgun/holster/v4/tracing"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 	"go.opentelemetry.io/otel/attribute"
@@ -29,9 +28,6 @@
 
 // Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
 func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
-	ctx = tracing.StartNamedScopeDebug(ctx, "tokenBucket")
-	defer func() { tracing.EndScope(ctx, err) }()
-	span := trace.SpanFromContext(ctx)
 
 	tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
 	defer tokenBucketTimer.ObserveDuration()
@@ -52,6 +48,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
 	if ok {
 		if item.Value == nil {
 			msgPart := "tokenBucket: Invalid cache item; Value is nil"
+			span := trace.SpanFromContext(ctx)
 			span.AddEvent(msgPart, trace.WithAttributes(
 				attribute.String("hashKey", hashKey),
 				attribute.String("key", r.UniqueKey),
@@ -61,6 +58,7 @@
 			ok = false
 		} else if item.Key != hashKey {
 			msgPart := "tokenBucket: Invalid cache item; key mismatch"
+			span := trace.SpanFromContext(ctx)
 			span.AddEvent(msgPart, trace.WithAttributes(
 				attribute.String("itemKey", item.Key),
 				attribute.String("hashKey", hashKey),
@@ -95,6 +93,7 @@
 		t, ok := item.Value.(*TokenBucketItem)
 		if !ok {
 			// Client switched algorithms; perhaps due to a migration?
+			span := trace.SpanFromContext(ctx)
 			span.AddEvent("Client switched algorithms; perhaps due to a migration?")
 
 			c.Remove(hashKey)
@@ -125,6 +124,7 @@
 
 		// If the duration config changed, update the new ExpireAt.
 		if t.Duration != r.Duration {
+			span := trace.SpanFromContext(ctx)
 			span.AddEvent("Duration changed")
 			expire := t.CreatedAt + r.Duration
 			if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
@@ -163,6 +163,7 @@
 
 		// If we are already at the limit.
 		if rl.Remaining == 0 && r.Hits > 0 {
+			span := trace.SpanFromContext(ctx)
 			span.AddEvent("Already over the limit")
 			metricOverLimitCounter.Add(1)
 			rl.Status = Status_OVER_LIMIT
@@ -172,6 +173,7 @@
 
 		// If requested hits takes the remainder.
 		if t.Remaining == r.Hits {
+			span := trace.SpanFromContext(ctx)
 			span.AddEvent("At the limit")
 			t.Remaining = 0
 			rl.Remaining = 0
@@ -181,13 +183,13 @@
 		// If requested is more than available, then return over the limit
 		// without updating the cache.
 		if r.Hits > t.Remaining {
+			span := trace.SpanFromContext(ctx)
 			span.AddEvent("Over the limit")
 			metricOverLimitCounter.Add(1)
 			rl.Status = Status_OVER_LIMIT
 			return rl, nil
 		}
 
-		span.AddEvent("Under the limit")
 		t.Remaining -= r.Hits
 		rl.Remaining = t.Remaining
 		return rl, nil
@@ -199,10 +201,6 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
 
 // Called by tokenBucket() when adding a new item in the store.
 func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
-	ctx = tracing.StartNamedScopeDebug(ctx, "tokenBucketNewItem")
-	defer func() { tracing.EndScope(ctx, err) }()
-	span := trace.SpanFromContext(ctx)
-
 	now := MillisecondNow()
 	expire := now + r.Duration
 
@@ -237,6 +235,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
 
 	// Client could be requesting that we always return OVER_LIMIT.
 	if r.Hits > r.Limit {
+		span := trace.SpanFromContext(ctx)
 		span.AddEvent("Over the limit")
 		metricOverLimitCounter.Add(1)
 		rl.Status = Status_OVER_LIMIT
@@ -255,10 +254,6 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
 
 // Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
 func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
-	ctx = tracing.StartNamedScopeDebug(ctx, "leakyBucket")
-	defer func() { tracing.EndScope(ctx, err) }()
-	span := trace.SpanFromContext(ctx)
-
 	leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
 	defer leakyBucketTimer.ObserveDuration()
 
@@ -284,6 +279,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
 	if ok {
 		if item.Value == nil {
 			msgPart := "leakyBucket: Invalid cache item; Value is nil"
+			span := trace.SpanFromContext(ctx)
 			span.AddEvent(msgPart, trace.WithAttributes(
 				attribute.String("hashKey", hashKey),
 				attribute.String("key", r.UniqueKey),
@@ -293,6 +289,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
 			ok = false
 		} else if item.Key != hashKey {
 			msgPart := "leakyBucket: Invalid cache item; key mismatch"
+			span := trace.SpanFromContext(ctx)
 			span.AddEvent(msgPart, trace.WithAttributes(
 				attribute.String("itemKey", item.Key),
 				attribute.String("hashKey", hashKey),
@@ -425,9 +422,6 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
 
 // Called by leakyBucket() when adding a new item in the store.
 func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
-	ctx = tracing.StartNamedScopeDebug(ctx, "leakyBucketNewItem")
-	defer func() { tracing.EndScope(ctx, err) }()
-
 	now := MillisecondNow()
 	duration := r.Duration
 	rate := float64(duration) / float64(r.Limit)
diff --git a/gubernator.go b/gubernator.go
index 934f1fd2..eb58ef12 100644
--- a/gubernator.go
+++ b/gubernator.go
@@ -115,9 +115,7 @@ var metricBatchSendDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
 // NewV1Instance instantiate a single instance of a gubernator peer and register this
 // instance with the provided GRPCServer.
 func NewV1Instance(conf Config) (s *V1Instance, err error) {
-	ctx := tracing.StartNamedScopeDebug(context.Background(), "gubernator.NewV1Instance")
-	defer func() { tracing.EndScope(ctx, err) }()
-
+	ctx := context.Background()
 	if conf.GRPCServers == nil {
 		return nil, errors.New("at least one GRPCServer instance is required")
 	}
@@ -213,21 +211,21 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
 
 	// For each item in the request body
 	for i, req := range r.Requests {
-		_ = tracing.CallNamedScopeDebug(ctx, "Iterate requests", func(ctx context.Context) error {
-			key := req.Name + "_" + req.UniqueKey
-			var peer *PeerClient
-			var err error
-
+		key := req.Name + "_" + req.UniqueKey
+		var peer *PeerClient
+		var err error
+
+		for {
 			if len(req.UniqueKey) == 0 {
 				metricCheckErrorCounter.WithLabelValues("Invalid request").Add(1)
 				resp.Responses[i] = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
-				return nil
+				break
 			}
 
 			if len(req.Name) == 0 {
 				metricCheckErrorCounter.WithLabelValues("Invalid request").Add(1)
 				resp.Responses[i] = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
-				return nil
+				break
 			}
 
 			if ctx.Err() != nil {
@@ -236,7 +234,7 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
 				resp.Responses[i] = &RateLimitResp{
 					Error: err.Error(),
 				}
-				return nil
+				break
 			}
 
 			peer, err = s.GetPeer(ctx, key)
@@ -246,7 +244,7 @@
 				resp.Responses[i] = &RateLimitResp{
 					Error: err.Error(),
 				}
-				return nil
+				break
 			}
 
 			// If our server instance is the owner of this rate limit
@@ -272,7 +270,7 @@
 
 				// Inform the client of the owner key of the key
 				resp.Responses[i].Metadata = map[string]string{"owner": peer.Info().GRPCAddress}
-				return nil
+				break
 			}
 
 			// Request must be forwarded to peer that owns the key.
@@ -287,9 +285,8 @@
 					Idx: i,
 				})
 			}
-
-			return nil
-		})
+			break
+		}
 	}
 
 	// Wait for any async responses if any
@@ -577,12 +574,6 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (*R
 	span.SetAttributes(
 		attribute.String("request.key", r.UniqueKey),
 		attribute.String("request.name", r.Name),
-		attribute.Int64("request.algorithm", int64(r.Algorithm)),
-		attribute.Int64("request.behavior", int64(r.Behavior)),
-		attribute.Int64("request.duration", r.Duration),
-		attribute.Int64("request.limit", r.Limit),
-		attribute.Int64("request.hits", r.Hits),
-		attribute.Int64("request.burst", r.Burst),
 	)
 
 	defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getLocalRateLimit")).ObserveDuration()
diff --git a/peer_client.go b/peer_client.go
index c7116c30..99d15787 100644
--- a/peer_client.go
+++ b/peer_client.go
@@ -96,9 +96,6 @@ func NewPeerClient(conf PeerConfig) *PeerClient {
 
 // Connect establishes a GRPC connection to a peer
 func (c *PeerClient) connect(ctx context.Context) (err error) {
-	ctx = tracing.StartScopeDebug(ctx)
-	defer func() { tracing.EndScope(ctx, err) }()
-
 	// NOTE: To future self, this mutex is used here because we need to know if the peer is disconnecting and
 	// handle ErrClosing. Since this mutex MUST be here we take this opportunity to also see if we are connected.
 	// Doing this here encapsulates managing the connected state to the PeerClient struct. Previously a PeerClient
@@ -172,21 +169,19 @@
 // GetPeerRateLimit forwards a rate limit request to a peer. If the rate limit has `behavior == BATCHING` configured,
 // this method will attempt to batch the rate limits
 func (c *PeerClient) GetPeerRateLimit(ctx context.Context, r *RateLimitReq) (resp *RateLimitResp, err error) {
-	ctx = tracing.StartNamedScope(ctx, "PeerClient.GetPeerRateLimit")
-	defer func() { tracing.EndScope(ctx, err) }()
 	span := trace.SpanFromContext(ctx)
 	span.SetAttributes(
-		attribute.String("peer.GRPCAddress", c.conf.Info.GRPCAddress),
-		attribute.String("peer.HTTPAddress", c.conf.Info.HTTPAddress),
-		attribute.String("peer.Datacenter", c.conf.Info.DataCenter),
+		// attribute.String("peer.GRPCAddress", c.conf.Info.GRPCAddress),
+		// attribute.String("peer.HTTPAddress", c.conf.Info.HTTPAddress),
+		// attribute.String("peer.Datacenter", c.conf.Info.DataCenter),
 		attribute.String("request.key", r.UniqueKey),
 		attribute.String("request.name", r.Name),
-		attribute.Int64("request.algorithm", int64(r.Algorithm)),
-		attribute.Int64("request.behavior", int64(r.Behavior)),
-		attribute.Int64("request.duration", r.Duration),
-		attribute.Int64("request.limit", r.Limit),
-		attribute.Int64("request.hits", r.Hits),
-		attribute.Int64("request.burst", r.Burst),
+		// attribute.Int64("request.algorithm", int64(r.Algorithm)),
+		// attribute.Int64("request.behavior", int64(r.Behavior)),
+		// attribute.Int64("request.duration", r.Duration),
+		// attribute.Int64("request.limit", r.Limit),
+		// attribute.Int64("request.hits", r.Hits),
+		// attribute.Int64("request.burst", r.Burst),
 	)
 
 	// If config asked for no batching
@@ -222,9 +217,6 @@
 
 // GetPeerRateLimits requests a list of rate limit statuses from a peer
 func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimitsReq) (resp *GetPeerRateLimitsResp, err error) {
-	ctx = tracing.StartNamedScopeDebug(ctx, "PeerClient.GetPeerRateLimits")
-	defer func() { tracing.EndScope(ctx, err) }()
-
 	if err := c.connect(ctx); err != nil {
 		err = errors.Wrap(err, "Error in connect")
 		metricCheckErrorCounter.WithLabelValues("Connect error").Add(1)
@@ -259,9 +251,6 @@
 
 // UpdatePeerGlobals sends global rate limit status updates to a peer
 func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (resp *UpdatePeerGlobalsResp, err error) {
-	ctx = tracing.StartNamedScope(ctx, "PeerClient.UpdatePeerGlobals")
-	defer func() { tracing.EndScope(ctx, err) }()
-
 	if err := c.connect(ctx); err != nil {
 		return nil, c.setLastErr(err)
 	}
@@ -314,9 +303,6 @@ func (c *PeerClient) GetLastErr() []string {
 }
 
 func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq) (resp *RateLimitResp, err error) {
-	ctx = tracing.StartNamedScopeDebug(ctx, "PeerClient.getPeerRateLimitsBatch")
-	defer func() { tracing.EndScope(ctx, err) }()
-
 	funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.getPeerRateLimitsBatch"))
 	defer funcTimer.ObserveDuration()
 
@@ -333,12 +319,9 @@
 	}
 
 	// Wait for a response or context cancel
-	ctx2 := tracing.StartNamedScopeDebug(ctx, "Wait for response")
-	defer tracing.EndScope(ctx2, nil)
-
 	req := request{
 		resp: make(chan *response, 1),
-		ctx: ctx2,
+		ctx: ctx,
 		request: r,
 	}
 
@@ -349,8 +332,8 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
 	select {
 	case c.queue <- &req:
 		// Successfully enqueued request.
-	case <-ctx2.Done():
-		return nil, errors.Wrap(ctx2.Err(), "Context error while enqueuing request")
+	case <-ctx.Done():
+		return nil, errors.Wrap(ctx.Err(), "Context error while enqueuing request")
 	}
 
 	c.wg.Add(1)
@@ -366,8 +349,8 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
 			return nil, c.setLastErr(err)
 		}
 		return re.rl, nil
-	case <-ctx2.Done():
-		return nil, errors.Wrap(ctx2.Err(), "Context error while waiting for response")
+	case <-ctx.Done():
+		return nil, errors.Wrap(ctx.Err(), "Context error while waiting for response")
 	}
 }
 
@@ -432,9 +415,6 @@ func (c *PeerClient) runBatch() {
 // sendBatch sends the queue provided and returns the responses to
 // waiting go routines
 func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
-	ctx = tracing.StartNamedScopeDebug(ctx, "PeerClient.sendBatch")
-	defer tracing.EndScope(ctx, nil)
-
 	batchSendTimer := prometheus.NewTimer(metricBatchSendDuration.WithLabelValues(c.conf.Info.GRPCAddress))
 	defer batchSendTimer.ObserveDuration()
 	funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.sendBatch"))
@@ -444,7 +424,7 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
 	for _, r := range queue {
 		// NOTE: This trace has the same name because it's in a separate trace than the one above.
 		// We link the two traces, so we can relate our rate limit trace back to the above trace.
-		r.ctx = tracing.StartNamedScopeDebug(r.ctx, "PeerClient.sendBatch",
+		r.ctx = tracing.StartNamedScope(r.ctx, "PeerClient.sendBatch",
 			trace.WithLinks(trace.LinkFromContext(ctx)))
 		// If no metadata is provided
 		if r.request.Metadata == nil {
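
Note on the pattern kept by the final hunk: sendBatch runs in its own trace, so the per-request "PeerClient.sendBatch" span is started on the request's context and linked back to the batch goroutine's context, which lets the two traces be correlated. A rough equivalent in plain OpenTelemetry calls is sketched below; it is illustrative only, and the package, tracer name, and function are hypothetical, not part of this patch or of holster's tracing package.

package tracingsketch

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// startLinkedSpan starts a "PeerClient.sendBatch" span on the request's own
// trace (reqCtx) and attaches a link to whatever span is active in the batch
// goroutine's context (batchCtx), mirroring the r.ctx / ctx usage in
// sendBatch above.
func startLinkedSpan(reqCtx, batchCtx context.Context) (context.Context, trace.Span) {
	tracer := otel.Tracer("tracingsketch") // assumed tracer name
	return tracer.Start(reqCtx, "PeerClient.sendBatch",
		trace.WithLinks(trace.LinkFromContext(batchCtx)))
}

A span link is used here instead of a parent/child relationship because the two spans belong to different traces; the link records the association without forcing the batch work into the request's trace.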