This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Fix hit on non-owner peer changing status when Hits=0.
It should just return whatever status is cached.
Baliedge committed Mar 8, 2024
1 parent 5fd6dad commit 38b0f79
Showing 3 changed files with 15 additions and 61 deletions.
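
The behavioral change lands in getGlobalRateLimit in gubernator.go (third file below): on a non-owner peer, a request with Hits == 0 now returns the cached status as-is instead of recomputing it, which previously could flip a cached OVER_LIMIT back to UNDER_LIMIT. A minimal sketch of that logic, using simplified stand-in types rather than the real protobuf-generated ones (statusFromCache, rateLimitReq, and rateLimitResp are illustrative names here, not gubernator APIs):

```go
package main

import "fmt"

// Simplified stand-ins for the generated Status and RateLimitResp types used
// in the diff; the real gubernator types carry more fields.
type Status int

const (
	StatusUnderLimit Status = iota
	StatusOverLimit
)

type rateLimitReq struct {
	Hits int64
}

type rateLimitResp struct {
	Status    Status
	Remaining int64
}

// statusFromCache mirrors the fixed branch of getGlobalRateLimit: a request
// with Hits == 0 is a read-only peek and returns the cached status untouched;
// only a request that consumes hits recomputes the status against Remaining.
func statusFromCache(req rateLimitReq, cached rateLimitResp) rateLimitResp {
	out := cached // the real code clones the cached response before mutating it
	if req.Hits != 0 {
		if req.Hits > out.Remaining {
			out.Status = StatusOverLimit
		} else {
			out.Status = StatusUnderLimit
		}
	}
	return out
}

func main() {
	cached := rateLimitResp{Status: StatusOverLimit, Remaining: 0}

	// Before this commit a Hits == 0 request was recomputed as UNDER_LIMIT
	// (0 > 0 is false); now the cached OVER_LIMIT status is preserved.
	fmt.Println(statusFromCache(rateLimitReq{Hits: 0}, cached).Status == StatusOverLimit) // true
	fmt.Println(statusFromCache(rateLimitReq{Hits: 1}, cached).Status == StatusOverLimit) // true
}
```

The updated TestGlobalRateLimitsPeerOverLimit below exercises exactly this: after the limit is exhausted, a zero-hit request is expected to keep reporting OVER_LIMIT.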
61 changes: 9 additions & 52 deletions functional_test.go
@@ -896,7 +896,7 @@ func TestGlobalRateLimits(t *testing.T) {
var resetTime int64

sendHit := func(status guber.Status, remain int64, i int) string {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), 10*clock.Second)
defer cancel()
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
@@ -1032,7 +1032,7 @@ func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
require.NoError(t, err)

sendHit := func(expectedStatus guber.Status, hits, expectedRemaining int64) {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), 10*clock.Second)
defer cancel()
resp, err := peers[0].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
@@ -1041,7 +1041,7 @@ func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
UniqueKey: key,
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Behavior: guber.Behavior_GLOBAL,
Duration: guber.Minute * 5,
Duration: 5 * guber.Minute,
Hits: hits,
Limit: 2,
},
@@ -1056,65 +1056,22 @@ func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {

require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...))

// Send two hits that should be processed by the owner and the broadcast to peer, depleting the remaining
// Send two hits that should be processed by the owner and the broadcast to
// peer, depleting the remaining.
sendHit(guber.Status_UNDER_LIMIT, 1, 1)
sendHit(guber.Status_UNDER_LIMIT, 1, 0)

// Wait for the broadcast from the owner to the peer
require.NoError(t, waitForBroadcast(3*clock.Second, owner, 1))

// Since the remainder is 0, the peer should set OVER_LIMIT instead of waiting for the owner
// to respond with OVER_LIMIT.
// Since the remainder is 0, the peer should return OVER_LIMIT on next hit.
sendHit(guber.Status_OVER_LIMIT, 1, 0)

// Wait for the broadcast from the owner to the peer
// Wait for the broadcast from the owner to the peer.
require.NoError(t, waitForBroadcast(3*clock.Second, owner, 2))

// The status should still be OVER_LIMIT
sendHit(guber.Status_UNDER_LIMIT, 0, 0)
sendHit(guber.Status_OVER_LIMIT, 1, 0)
}

func TestGlobalRateLimitsPeerOverLimitLeaky(t *testing.T) {
name := t.Name()
key := randomKey()
peers, err := cluster.ListNonOwningDaemons(name, key)
require.NoError(t, err)
owner, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)

sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64) {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
defer cancel()
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: name,
UniqueKey: key,
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Behavior: guber.Behavior_GLOBAL,
Duration: guber.Minute * 5,
Hits: hits,
Limit: 2,
},
},
})
assert.NoError(t, err)
assert.Equal(t, "", resp.Responses[0].GetError())
assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
}

require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...))

// Send two hits that should be processed by the owner and the broadcast to peer, depleting the remaining
sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1)
sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1)

// Wait for the broadcast from the owner to the peers
require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1))

// Ask a different peer if the status is over the limit
sendHit(peers[1].MustClient(), guber.Status_OVER_LIMIT, 1)
// The status should still be OVER_LIMIT.
sendHit(guber.Status_OVER_LIMIT, 0, 0)
}

func TestChangeLimit(t *testing.T) {
2 changes: 0 additions & 2 deletions global.go
@@ -140,7 +140,6 @@ func (gm *globalManager) runAsyncHits() {
// sendHits takes the hits collected by runAsyncHits and sends them to their
// owning peers
func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
// fmt.Printf("sendHits() %s, hits: %d\n", gm.instance.conf.InstanceID, len(hits))
type pair struct {
client *PeerClient
req GetPeerRateLimitsReq
@@ -233,7 +232,6 @@ func (gm *globalManager) runBroadcasts() {

// broadcastPeers broadcasts global rate limit statuses to all other peers
func (gm *globalManager) broadcastPeers(updates map[string]*RateLimitReq) {
// fmt.Printf("broadcastPeers() %s, updates: %d\n", gm.instance.conf.InstanceID, len(updates))
defer prometheus.NewTimer(gm.metricBroadcastDuration).ObserveDuration()
ctx := context.Background()
var req UpdatePeerGlobalsReq
13 changes: 6 additions & 7 deletions gubernator.go
@@ -406,10 +406,12 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
rl, ok := item.Value.(*RateLimitResp)
if ok {
rl2 := proto.Clone(rl).(*RateLimitResp)
if req.Hits > rl2.Remaining {
rl2.Status = Status_OVER_LIMIT
} else {
rl2.Status = Status_UNDER_LIMIT
if req.Hits != 0 {
if req.Hits > rl2.Remaining {
rl2.Status = Status_OVER_LIMIT
} else {
rl2.Status = Status_UNDER_LIMIT
}
}
return rl2, nil
}
@@ -433,7 +435,6 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
// UpdatePeerGlobals updates the local cache with a list of global rate limits. This method should only
// be called by a peer who is the owner of a global rate limit.
func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (*UpdatePeerGlobalsResp, error) {
// fmt.Printf("UpdatePeerGlobals() %s, req: %s", s.conf.InstanceID, spew.Sdump(r))
defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.UpdatePeerGlobals")).ObserveDuration()
for _, g := range r.Globals {
item := &CacheItem{
@@ -501,7 +502,6 @@ func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits
rl = &RateLimitResp{Error: err.Error()}
// metricCheckErrorCounter is updated within getLocalRateLimit(), not in GetPeerRateLimits.
}
// fmt.Printf("GetPeerRateLimits() %s, hits: %d, resp: %#v\n", s.conf.InstanceID, req.Hits, rl)

respChan <- respOut{rin.idx, rl}
return nil
@@ -578,7 +578,6 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_
return nil, errors.Wrap(err, "during workerPool.GetRateLimit")
}

// fmt.Printf("getLocalRateLimit() %s, resp: %#v\n", s.conf.InstanceID, resp)
metricGetRateLimitCounter.WithLabelValues("local").Inc()

// If global behavior and owning peer, broadcast update to all peers.
