Fix global mode
elbuo8 authored and miparnisari committed Feb 8, 2024
1 parent f95acfd commit c8c40c6
Showing 4 changed files with 40 additions and 40 deletions.
9 changes: 5 additions & 4 deletions algorithms.go
@@ -383,16 +383,17 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
 	// If requested hits takes the remainder
 	if int64(b.Remaining) == r.Hits {
-		b.Remaining -= float64(r.Hits)
-		rl.Remaining = 0
+		b.Remaining = 0
+		rl.Remaining = int64(b.Remaining)
 		rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
 		return rl, nil
 	}

-	// If requested is more than available, then return over the limit
-	// without updating the bucket.
+	// If requested is more than available, drain bucket in order to converge as everything is returning OVER_LIMIT.
 	if r.Hits > int64(b.Remaining) {
 		metricOverLimitCounter.Add(1)
+		b.Remaining = 0
+		rl.Remaining = int64(b.Remaining)
 		rl.Status = Status_OVER_LIMIT
 		return rl, nil
 	}
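
Note on the algorithms.go change above: previously, a request that exceeded the remaining capacity returned OVER_LIMIT without touching the bucket, so a replica could keep reporting a stale nonzero remainder. Zeroing the bucket in both branches makes every replica converge on the same drained state. A minimal, self-contained sketch of the branch logic, using illustrative names (bucket, apply) rather than the library's types:

package main

import "fmt"

// bucket stands in for the Remaining field of a leaky-bucket cache item.
type bucket struct{ remaining float64 }

// apply mirrors the two branches touched by this commit.
func apply(b *bucket, hits int64) string {
	switch {
	case int64(b.remaining) == hits:
		// Hits consume the exact remainder: zero the bucket directly.
		b.remaining = 0
		return "UNDER_LIMIT"
	case hits > int64(b.remaining):
		// Over the limit: drain the bucket so subsequent checks against
		// this state also report OVER_LIMIT instead of drifting back.
		b.remaining = 0
		return "OVER_LIMIT"
	default:
		b.remaining -= float64(hits)
		return "UNDER_LIMIT"
	}
}

func main() {
	b := &bucket{remaining: 2}
	fmt.Println(apply(b, 2)) // UNDER_LIMIT; remainder exactly consumed
	fmt.Println(apply(b, 1)) // OVER_LIMIT; bucket stays drained
}
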
9 changes: 1 addition & 8 deletions functional_test.go
@@ -940,17 +940,10 @@ func TestGlobalRateLimitsPeerOverLimitLeaky(t *testing.T) {
 		assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
 	}

 	// Send two hits that should be processed by the owner and the broadcast to peer, depleting the remaining
 	sendHit(guber.Status_UNDER_LIMIT, 1)
 	sendHit(guber.Status_UNDER_LIMIT, 1)
-	// Wait for the broadcast from the owner to the peer
-	time.Sleep(time.Second * 3)
-	// Since the peer must wait for the owner to say it's over the limit, this will return under the limit.
-	sendHit(guber.Status_UNDER_LIMIT, 1)
-	// Wait for the broadcast from the owner to the peer
-	time.Sleep(time.Second * 3)
-	// The status should now be OVER_LIMIT
-	sendHit(guber.Status_OVER_LIMIT, 0)
+	sendHit(guber.Status_OVER_LIMIT, 1)
 }

 func getMetricRequest(t testutil.TestingT, url string, name string) *model.Sample {
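
The simplified test now expects the third hit to return OVER_LIMIT immediately, with no sleep for an owner broadcast. For context, sendHit is a closure defined earlier in TestGlobalRateLimitsPeerOverLimitLeaky, outside this hunk; the following is a hypothetical reconstruction of its shape, inferred from the assertion visible above. The name, key, duration, and limit values are assumptions, and client and t come from the surrounding test:

// Hypothetical reconstruction, not the repository's code.
sendHit := func(expectedStatus guber.Status, hits int64) {
	resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
		Requests: []*guber.RateLimitReq{{
			Name:      "test_global_leaky", // assumed
			UniqueKey: "account:12345",     // assumed
			Algorithm: guber.Algorithm_LEAKY_BUCKET,
			Behavior:  guber.Behavior_GLOBAL,
			Duration:  guber.Minute * 5, // assumed
			Hits:      hits,
			Limit:     2, // assumed: two hits deplete the remainder
		}},
	})
	require.NoError(t, err)
	assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
}
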
2 changes: 2 additions & 0 deletions global.go
@@ -160,6 +160,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
 	fan.Run(func(in interface{}) error {
 		p := in.(*pair)
 		ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
+		gm.log.Infof("calling owner of key: %s with hits: %d", p.req.Requests[0].UniqueKey, p.req.Requests[0].Hits)
 		_, err := p.client.GetPeerRateLimits(ctx, &p.req)
 		cancel()

@@ -233,6 +234,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
 	fan.Run(func(in interface{}) error {
 		peer := in.(*PeerClient)
 		ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout)
+		gm.log.Infof("calling peer of key: %s with hits: %d", req.Globals[0].Key, req.Globals[0].Status.Remaining)
 		_, err := peer.UpdatePeerGlobals(ctx, &req)
 		cancel()
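
These two Infof lines trace both directions of global mode: a non-owner logs before forwarding accumulated hits to the owner (GetPeerRateLimits), and the owner logs before broadcasting updated state to each peer (UpdatePeerGlobals). Note that the second format string labels the value "hits" but actually passes req.Globals[0].Status.Remaining. Assuming gm.log is a logrus logger with the default text formatter, the output would look roughly like this (key and values are illustrative):

INFO[0001] calling owner of key: account:12345 with hits: 2
INFO[0002] calling peer of key: account:12345 with hits: 0
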
60 changes: 32 additions & 28 deletions gubernator.go
@@ -396,35 +396,23 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
 		tracing.EndScope(ctx, err)
 	}()

-	item, ok, err := s.workerPool.GetCacheItem(ctx, req.HashKey())
-	if err != nil {
-		countError(err, "Error in workerPool.GetCacheItem")
-		return nil, errors.Wrap(err, "during in workerPool.GetCacheItem")
-	}
-	if ok {
-		// Global rate limits are always stored as RateLimitResp regardless of algorithm
-		rl, ok := item.Value.(*RateLimitResp)
-		if ok {
-			// In the case we are not the owner, global behavior dictates that we respond with
-			// what ever the owner has broadcast to use as the response. However, in the case
-			// of TOKEN_BUCKET it makes little sense to wait for the owner to respond with OVER_LIMIT
-			// if we already know the remainder is 0. So we check for a remainder of 0 here and set
-			// OVER_LIMIT only if there are actual hits and this is not a RESET_REMAINING request and
-			// it's a TOKEN_BUCKET.
-			//
-			// We cannot preform this for LEAKY_BUCKET as we don't know how much time or what other requests
-			// might have influenced the leak rate at the owning peer.
-			// (Maybe we should preform the leak calculation here?????)
-			if rl.Remaining == 0 && req.Hits > 0 && !HasBehavior(req.Behavior, Behavior_RESET_REMAINING) &&
-				req.Algorithm == Algorithm_TOKEN_BUCKET {
-				rl.Status = Status_OVER_LIMIT
-			}
-			return rl, nil
-		}
-		// We get here if the owning node hasn't asynchronously forwarded it's updates to us yet and
-		// our cache still holds the rate limit we created on the first hit.
-	}
+	/*
+		item, ok, err := s.workerPool.GetCacheItem(ctx, req.HashKey())
+		if err != nil {
+			countError(err, "Error in workerPool.GetCacheItem")
+			return nil, errors.Wrap(err, "during in workerPool.GetCacheItem")
+		}
+		if ok {
+			// Global rate limits are always stored as RateLimitResp regardless of algorithm
+			rl, ok := item.Value.(*RateLimitResp)
+			if ok {
+				return rl, nil
+			}
+			// We get here if the owning node hasn't asynchronously forwarded its updates to us yet and
+			// our cache still holds the rate limit we created on the first hit.
+		}
+	*/
 	cpy := proto.Clone(req).(*RateLimitReq)
 	cpy.Behavior = Behavior_NO_BATCHING

@@ -441,13 +429,29 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
 // UpdatePeerGlobals updates the local cache with a list of global rate limits. This method should only
 // be called by a peer who is the owner of a global rate limit.
 func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (*UpdatePeerGlobalsResp, error) {
+	now := MillisecondNow()
 	for _, g := range r.Globals {
 		item := &CacheItem{
-			ExpireAt:  g.Status.ResetTime,
+			ExpireAt:  g.Status.ResetTime + 100000,
 			Algorithm: g.Algorithm,
-			Value:     g.Status,
 			Key:       g.Key,
 		}
+		switch g.Algorithm {
+		case Algorithm_LEAKY_BUCKET:
+			item.Value = &LeakyBucketItem{
+				Remaining: float64(g.Status.Remaining),
+				Limit:     g.Status.Limit,
+				Burst:     g.Status.Limit,
+				UpdatedAt: now,
+			}
+		case Algorithm_TOKEN_BUCKET:
+			item.Value = &TokenBucketItem{
+				Status:    g.Status.Status,
+				Limit:     g.Status.Limit,
+				Remaining: g.Status.Remaining,
+				CreatedAt: now,
+			}
+		}
 		err := s.workerPool.AddCacheItem(ctx, g.Key, item)
 		if err != nil {
 			return nil, errors.Wrap(err, "Error in workerPool.AddCacheItem")
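
Why the UpdatePeerGlobals change matters: the local algorithms type-assert the cached value (the hunk above notes that broadcasts were "always stored as RateLimitResp regardless of algorithm", while leakyBucket expects a *LeakyBucketItem). Materializing each broadcast into the algorithm-specific item, with a fresh timestamp and an ExpireAt padded past ResetTime, lets a non-owner peer keep running the algorithm locally between broadcasts. A hypothetical sketch of the old mismatch, under that assumption:

// Hypothetical sketch of the failure mode addressed above.
var cached interface{} = &RateLimitResp{Remaining: 2} // what a broadcast used to store

if _, ok := cached.(*LeakyBucketItem); !ok {
	// Before this commit, a non-owner peer's next local hit landed here:
	// the cached type didn't match what the algorithm expected, so the
	// owner's broadcast state couldn't be applied directly.
}
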
