This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Reduce tracing detail.
Baliedge committed Sep 29, 2023
1 parent f6046d1 commit 9758b98
Showing 3 changed files with 37 additions and 72 deletions.
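
The change follows one pattern across all three files: per-call tracing scopes opened with tracing.StartNamedScopeDebug / tracing.StartNamedScope and closed with tracing.EndScope are removed from the hot path, and the code instead calls trace.SpanFromContext(ctx) only at the points where an event or attribute is actually recorded. A minimal Go sketch of the before/after shape (doWorkBefore, doWorkAfter, and rareCondition are illustrative names, not code from this repository):

package example

import (
    "context"

    "github.com/mailgun/holster/v4/tracing"
    "go.opentelemetry.io/otel/trace"
)

// rareCondition stands in for whatever exceptional case is worth recording.
func rareCondition() bool { return false }

// doWorkBefore: every call opened its own debug-level span, even on the fast path.
func doWorkBefore(ctx context.Context) (err error) {
    ctx = tracing.StartNamedScopeDebug(ctx, "doWork")
    defer func() { tracing.EndScope(ctx, err) }()

    span := trace.SpanFromContext(ctx)
    if rareCondition() {
        span.AddEvent("rare condition") // event lands on the per-call span
    }
    return nil
}

// doWorkAfter: no per-call span. Events attach to whatever span the caller
// already has, and the span is looked up only when there is something to record.
func doWorkAfter(ctx context.Context) error {
    if rareCondition() {
        span := trace.SpanFromContext(ctx)
        span.AddEvent("rare condition")
    }
    return nil
}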
26 changes: 10 additions & 16 deletions algorithms.go
@@ -20,7 +20,6 @@ import (
"context"

"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/tracing"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
@@ -29,9 +28,6 @@ import (

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "tokenBucket")
defer func() { tracing.EndScope(ctx, err) }()
span := trace.SpanFromContext(ctx)

tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()
@@ -52,6 +48,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if ok {
if item.Value == nil {
msgPart := "tokenBucket: Invalid cache item; Value is nil"
span := trace.SpanFromContext(ctx)
span.AddEvent(msgPart, trace.WithAttributes(
attribute.String("hashKey", hashKey),
attribute.String("key", r.UniqueKey),
@@ -61,6 +58,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ok = false
} else if item.Key != hashKey {
msgPart := "tokenBucket: Invalid cache item; key mismatch"
span := trace.SpanFromContext(ctx)
span.AddEvent(msgPart, trace.WithAttributes(
attribute.String("itemKey", item.Key),
attribute.String("hashKey", hashKey),
@@ -95,6 +93,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
t, ok := item.Value.(*TokenBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
span := trace.SpanFromContext(ctx)
span.AddEvent("Client switched algorithms; perhaps due to a migration?")

c.Remove(hashKey)
@@ -125,6 +124,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If the duration config changed, update the new ExpireAt.
if t.Duration != r.Duration {
span := trace.SpanFromContext(ctx)
span.AddEvent("Duration changed")
expire := t.CreatedAt + r.Duration
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
@@ -163,6 +163,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If we are already at the limit.
if rl.Remaining == 0 && r.Hits > 0 {
span := trace.SpanFromContext(ctx)
span.AddEvent("Already over the limit")
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
@@ -172,6 +173,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// If requested hits takes the remainder.
if t.Remaining == r.Hits {
span := trace.SpanFromContext(ctx)
span.AddEvent("At the limit")
t.Remaining = 0
rl.Remaining = 0
@@ -181,13 +183,13 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// If requested is more than available, then return over the limit
// without updating the cache.
if r.Hits > t.Remaining {
span := trace.SpanFromContext(ctx)
span.AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
return rl, nil
}

span.AddEvent("Under the limit")
t.Remaining -= r.Hits
rl.Remaining = t.Remaining
return rl, nil
@@ -199,10 +201,6 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "tokenBucketNewItem")
defer func() { tracing.EndScope(ctx, err) }()
span := trace.SpanFromContext(ctx)

now := MillisecondNow()
expire := now + r.Duration

@@ -237,6 +235,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)

// Client could be requesting that we always return OVER_LIMIT.
if r.Hits > r.Limit {
span := trace.SpanFromContext(ctx)
span.AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
rl.Status = Status_OVER_LIMIT
@@ -255,10 +254,6 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "leakyBucket")
defer func() { tracing.EndScope(ctx, err) }()
span := trace.SpanFromContext(ctx)

leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()

@@ -284,6 +279,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if ok {
if item.Value == nil {
msgPart := "leakyBucket: Invalid cache item; Value is nil"
span := trace.SpanFromContext(ctx)
span.AddEvent(msgPart, trace.WithAttributes(
attribute.String("hashKey", hashKey),
attribute.String("key", r.UniqueKey),
@@ -293,6 +289,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
ok = false
} else if item.Key != hashKey {
msgPart := "leakyBucket: Invalid cache item; key mismatch"
span := trace.SpanFromContext(ctx)
span.AddEvent(msgPart, trace.WithAttributes(
attribute.String("itemKey", item.Key),
attribute.String("hashKey", hashKey),
@@ -425,9 +422,6 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "leakyBucketNewItem")
defer func() { tracing.EndScope(ctx, err) }()

now := MillisecondNow()
duration := r.Duration
rate := float64(duration) / float64(r.Limit)
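
In algorithms.go the functions keep their event reporting on exceptional branches (nil cache values, key mismatches, algorithm changes, over-limit results), but the span is now fetched from the caller's context inside each branch instead of a dedicated per-call span being created for every request. A minimal sketch of that retained branch-level reporting (reportBadCacheItem is a hypothetical helper name):

package example

import (
    "context"

    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

// reportBadCacheItem records a cache-validation problem on whatever span is
// already active in ctx, mirroring how tokenBucket and leakyBucket now add
// events only inside their error branches.
func reportBadCacheItem(ctx context.Context, hashKey, uniqueKey string) {
    span := trace.SpanFromContext(ctx)
    span.AddEvent("Invalid cache item; key mismatch", trace.WithAttributes(
        attribute.String("hashKey", hashKey),
        attribute.String("key", uniqueKey),
    ))
}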
33 changes: 12 additions & 21 deletions gubernator.go
@@ -115,9 +115,7 @@ var metricBatchSendDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
// NewV1Instance instantiate a single instance of a gubernator peer and register this
// instance with the provided GRPCServer.
func NewV1Instance(conf Config) (s *V1Instance, err error) {
ctx := tracing.StartNamedScopeDebug(context.Background(), "gubernator.NewV1Instance")
defer func() { tracing.EndScope(ctx, err) }()

ctx := context.Background()
if conf.GRPCServers == nil {
return nil, errors.New("at least one GRPCServer instance is required")
}
@@ -213,21 +211,21 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G

// For each item in the request body
for i, req := range r.Requests {
_ = tracing.CallNamedScopeDebug(ctx, "Iterate requests", func(ctx context.Context) error {
key := req.Name + "_" + req.UniqueKey
var peer *PeerClient
var err error
key := req.Name + "_" + req.UniqueKey
var peer *PeerClient
var err error

for {
if len(req.UniqueKey) == 0 {
metricCheckErrorCounter.WithLabelValues("Invalid request").Add(1)
resp.Responses[i] = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
return nil
break
}

if len(req.Name) == 0 {
metricCheckErrorCounter.WithLabelValues("Invalid request").Add(1)
resp.Responses[i] = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
return nil
break
}

if ctx.Err() != nil {
@@ -236,7 +234,7 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
resp.Responses[i] = &RateLimitResp{
Error: err.Error(),
}
return nil
break
}

peer, err = s.GetPeer(ctx, key)
@@ -246,7 +244,7 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
resp.Responses[i] = &RateLimitResp{
Error: err.Error(),
}
return nil
break
}

// If our server instance is the owner of this rate limit
@@ -272,7 +270,7 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G

// Inform the client of the owner key of the key
resp.Responses[i].Metadata = map[string]string{"owner": peer.Info().GRPCAddress}
return nil
break
}

// Request must be forwarded to peer that owns the key.
@@ -287,9 +285,8 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
Idx: i,
})
}

return nil
})
break
}
}

// Wait for any async responses if any
@@ -577,12 +574,6 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (*R
span.SetAttributes(
attribute.String("request.key", r.UniqueKey),
attribute.String("request.name", r.Name),
attribute.Int64("request.algorithm", int64(r.Algorithm)),
attribute.Int64("request.behavior", int64(r.Behavior)),
attribute.Int64("request.duration", r.Duration),
attribute.Int64("request.limit", r.Limit),
attribute.Int64("request.hits", r.Hits),
attribute.Int64("request.burst", r.Burst),
)

defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getLocalRateLimit")).ObserveDuration()
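
In gubernator.go, NewV1Instance drops its constructor scope in favor of a plain context.Background(), getLocalRateLimit now records only request.key and request.name as span attributes, and the per-request closure passed to tracing.CallNamedScopeDebug in GetRateLimits is inlined into the loop body, with the closure's early return nil exits becoming break statements out of a one-shot for loop. A sketch of that closure-to-loop conversion, using hypothetical stand-in types rather than the real request and response structs:

package example

import (
    "context"

    "github.com/mailgun/holster/v4/tracing"
)

// Hypothetical stand-ins for the real request/response types.
type rateLimitReq struct{ UniqueKey string }
type rateLimitResp struct{ Error string }

// iterateBefore: each iteration ran inside a closure handed to the tracing
// helper, and early exits were expressed as `return nil`.
func iterateBefore(ctx context.Context, reqs []rateLimitReq) []rateLimitResp {
    resps := make([]rateLimitResp, len(reqs))
    for i, req := range reqs {
        _ = tracing.CallNamedScopeDebug(ctx, "Iterate requests", func(ctx context.Context) error {
            if req.UniqueKey == "" {
                resps[i] = rateLimitResp{Error: "field 'unique_key' cannot be empty"}
                return nil
            }
            // ... resolve the owning peer and apply the rate limit ...
            return nil
        })
    }
    return resps
}

// iterateAfter: the closure is gone; a one-shot `for { ... }` block keeps the
// same early-exit shape by replacing `return nil` with `break`.
func iterateAfter(ctx context.Context, reqs []rateLimitReq) []rateLimitResp {
    resps := make([]rateLimitResp, len(reqs))
    for i, req := range reqs {
        for {
            if req.UniqueKey == "" {
                resps[i] = rateLimitResp{Error: "field 'unique_key' cannot be empty"}
                break
            }
            // ... resolve the owning peer and apply the rate limit ...
            break
        }
    }
    return resps
}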
50 changes: 15 additions & 35 deletions peer_client.go
@@ -96,9 +96,6 @@ func NewPeerClient(conf PeerConfig) *PeerClient {

// Connect establishes a GRPC connection to a peer
func (c *PeerClient) connect(ctx context.Context) (err error) {
ctx = tracing.StartScopeDebug(ctx)
defer func() { tracing.EndScope(ctx, err) }()

// NOTE: To future self, this mutex is used here because we need to know if the peer is disconnecting and
// handle ErrClosing. Since this mutex MUST be here we take this opportunity to also see if we are connected.
// Doing this here encapsulates managing the connected state to the PeerClient struct. Previously a PeerClient
@@ -172,21 +169,19 @@ func (c *PeerClient) Info() PeerInfo {
// GetPeerRateLimit forwards a rate limit request to a peer. If the rate limit has `behavior == BATCHING` configured,
// this method will attempt to batch the rate limits
func (c *PeerClient) GetPeerRateLimit(ctx context.Context, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScope(ctx, "PeerClient.GetPeerRateLimit")
defer func() { tracing.EndScope(ctx, err) }()
span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.String("peer.GRPCAddress", c.conf.Info.GRPCAddress),
attribute.String("peer.HTTPAddress", c.conf.Info.HTTPAddress),
attribute.String("peer.Datacenter", c.conf.Info.DataCenter),
// attribute.String("peer.GRPCAddress", c.conf.Info.GRPCAddress),
// attribute.String("peer.HTTPAddress", c.conf.Info.HTTPAddress),
// attribute.String("peer.Datacenter", c.conf.Info.DataCenter),
attribute.String("request.key", r.UniqueKey),
attribute.String("request.name", r.Name),
attribute.Int64("request.algorithm", int64(r.Algorithm)),
attribute.Int64("request.behavior", int64(r.Behavior)),
attribute.Int64("request.duration", r.Duration),
attribute.Int64("request.limit", r.Limit),
attribute.Int64("request.hits", r.Hits),
attribute.Int64("request.burst", r.Burst),
// attribute.Int64("request.algorithm", int64(r.Algorithm)),
// attribute.Int64("request.behavior", int64(r.Behavior)),
// attribute.Int64("request.duration", r.Duration),
// attribute.Int64("request.limit", r.Limit),
// attribute.Int64("request.hits", r.Hits),
// attribute.Int64("request.burst", r.Burst),
)

// If config asked for no batching
Expand Down Expand Up @@ -222,9 +217,6 @@ func (c *PeerClient) GetPeerRateLimit(ctx context.Context, r *RateLimitReq) (res

// GetPeerRateLimits requests a list of rate limit statuses from a peer
func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimitsReq) (resp *GetPeerRateLimitsResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "PeerClient.GetPeerRateLimits")
defer func() { tracing.EndScope(ctx, err) }()

if err := c.connect(ctx); err != nil {
err = errors.Wrap(err, "Error in connect")
metricCheckErrorCounter.WithLabelValues("Connect error").Add(1)
@@ -259,9 +251,6 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits

// UpdatePeerGlobals sends global rate limit status updates to a peer
func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (resp *UpdatePeerGlobalsResp, err error) {
ctx = tracing.StartNamedScope(ctx, "PeerClient.UpdatePeerGlobals")
defer func() { tracing.EndScope(ctx, err) }()

if err := c.connect(ctx); err != nil {
return nil, c.setLastErr(err)
}
@@ -314,9 +303,6 @@ func (c *PeerClient) GetLastErr() []string {
}

func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq) (resp *RateLimitResp, err error) {
ctx = tracing.StartNamedScopeDebug(ctx, "PeerClient.getPeerRateLimitsBatch")
defer func() { tracing.EndScope(ctx, err) }()

funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.getPeerRateLimitsBatch"))
defer funcTimer.ObserveDuration()

@@ -333,12 +319,9 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
}

// Wait for a response or context cancel
ctx2 := tracing.StartNamedScopeDebug(ctx, "Wait for response")
defer tracing.EndScope(ctx2, nil)

req := request{
resp: make(chan *response, 1),
ctx: ctx2,
ctx: ctx,
request: r,
}

@@ -349,8 +332,8 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
select {
case c.queue <- &req:
// Successfully enqueued request.
case <-ctx2.Done():
return nil, errors.Wrap(ctx2.Err(), "Context error while enqueuing request")
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "Context error while enqueuing request")
}

c.wg.Add(1)
@@ -366,8 +349,8 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
return nil, c.setLastErr(err)
}
return re.rl, nil
case <-ctx2.Done():
return nil, errors.Wrap(ctx2.Err(), "Context error while waiting for response")
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "Context error while waiting for response")
}
}

@@ -432,9 +415,6 @@ func (c *PeerClient) runBatch() {
// sendBatch sends the queue provided and returns the responses to
// waiting go routines
func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
ctx = tracing.StartNamedScopeDebug(ctx, "PeerClient.sendBatch")
defer tracing.EndScope(ctx, nil)

batchSendTimer := prometheus.NewTimer(metricBatchSendDuration.WithLabelValues(c.conf.Info.GRPCAddress))
defer batchSendTimer.ObserveDuration()
funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("PeerClient.sendBatch"))
@@ -444,7 +424,7 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
for _, r := range queue {
// NOTE: This trace has the same name because it's in a separate trace than the one above.
// We link the two traces, so we can relate our rate limit trace back to the above trace.
r.ctx = tracing.StartNamedScopeDebug(r.ctx, "PeerClient.sendBatch",
r.ctx = tracing.StartNamedScope(r.ctx, "PeerClient.sendBatch",
trace.WithLinks(trace.LinkFromContext(ctx)))
// If no metadata is provided
if r.request.Metadata == nil {
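
In peer_client.go the per-method scopes are removed as well, most of the span attributes in GetPeerRateLimit are commented out (only request.key and request.name remain), and the separate "Wait for response" scope in getPeerRateLimitsBatch collapses onto the caller's context. The one span still created per request is in sendBatch, now started at normal rather than debug level and linked back to the batch's own trace. A short sketch of both retained patterns (the helper names are illustrative, not from the repository):

package example

import (
    "context"

    "github.com/mailgun/holster/v4/tracing"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

// annotateRequestSpan mirrors the trimmed attribute set: only the rate-limit
// key and name are recorded on the already-active span.
func annotateRequestSpan(ctx context.Context, uniqueKey, name string) {
    span := trace.SpanFromContext(ctx)
    span.SetAttributes(
        attribute.String("request.key", uniqueKey),
        attribute.String("request.name", name),
    )
}

// linkRequestToBatch shows the pattern kept in sendBatch: each queued request
// gets its own span, linked to the batch span so the two traces can be
// correlated when the rate limit is processed asynchronously.
func linkRequestToBatch(batchCtx, requestCtx context.Context) context.Context {
    return tracing.StartNamedScope(requestCtx, "PeerClient.sendBatch",
        trace.WithLinks(trace.LinkFromContext(batchCtx)))
}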
