From 38b0f795ad790a8d4416af6305b9af960966c993 Mon Sep 17 00:00:00 2001
From: Shawn Poulson
Date: Fri, 8 Mar 2024 15:56:17 -0500
Subject: [PATCH] Fix hit on non-owner peer changing status when Hits=0.

A request with Hits=0 should just return whatever status is cached.
---
 functional_test.go | 61 +++++++---------------------
 global.go          |  2 --
 gubernator.go      | 13 +++++-----
 3 files changed, 15 insertions(+), 61 deletions(-)
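Notes:

With Behavior_GLOBAL, only the owning peer runs the rate limit algorithm; non-owner peers answer from a cached copy of the owner's last response. A request with Hits=0 is a read-only status probe, so a non-owner peer must not rewrite the cached status: before this change it recomputed the status on every request, and because 0 > Remaining is never true, a cached OVER_LIMIT came back as UNDER_LIMIT. A minimal probe in test scope looks like the sketch below (the name, key, and limit values are illustrative; the client, request types, and assert helpers are the ones already used in functional_test.go):

    ctx, cancel := context.WithTimeout(context.Background(), 10*clock.Second)
    defer cancel()
    // Hits=0 asks for the current status without consuming any of the limit.
    resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
        Requests: []*guber.RateLimitReq{{
            Name:      "requests_per_minute", // illustrative
            UniqueKey: "account:1234",        // illustrative
            Algorithm: guber.Algorithm_TOKEN_BUCKET,
            Behavior:  guber.Behavior_GLOBAL,
            Duration:  5 * guber.Minute,
            Hits:      0,
            Limit:     2,
        }},
    })
    assert.NoError(t, err)
    // Once the limit is depleted and the owner's update has been broadcast,
    // the probe reports the cached OVER_LIMIT; before this change the status
    // was recomputed from Hits=0 and wrongly reported UNDER_LIMIT.
    assert.Equal(t, guber.Status_OVER_LIMIT, resp.Responses[0].GetStatus())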
diff --git a/functional_test.go b/functional_test.go
index d81a1f19..f1c25285 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -896,7 +896,7 @@ func TestGlobalRateLimits(t *testing.T) {
     var resetTime int64
 
     sendHit := func(status guber.Status, remain int64, i int) string {
-        ctx, cancel := context.WithTimeout(context.Background(), clock.Second*5)
+        ctx, cancel := context.WithTimeout(context.Background(), 10*clock.Second)
         defer cancel()
         resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
             Requests: []*guber.RateLimitReq{
@@ -1032,7 +1032,7 @@ func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
     require.NoError(t, err)
 
     sendHit := func(expectedStatus guber.Status, hits, expectedRemaining int64) {
-        ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
+        ctx, cancel := context.WithTimeout(context.Background(), 10*clock.Second)
         defer cancel()
         resp, err := peers[0].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{
             Requests: []*guber.RateLimitReq{
@@ -1041,7 +1041,7 @@ func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
                     UniqueKey: key,
                     Algorithm: guber.Algorithm_TOKEN_BUCKET,
                     Behavior:  guber.Behavior_GLOBAL,
-                    Duration:  guber.Minute * 5,
+                    Duration:  5 * guber.Minute,
                     Hits:      hits,
                     Limit:     2,
                 },
@@ -1056,65 +1056,22 @@
 
     require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...))
 
-    // Send two hits that should be processed by the owner and the broadcast to peer, depleting the remaining
+    // Send two hits that should be processed by the owner and then broadcast to
+    // the peer, depleting the remaining.
     sendHit(guber.Status_UNDER_LIMIT, 1, 1)
     sendHit(guber.Status_UNDER_LIMIT, 1, 0)
 
     // Wait for the broadcast from the owner to the peer
     require.NoError(t, waitForBroadcast(3*clock.Second, owner, 1))
 
-    // Since the remainder is 0, the peer should set OVER_LIMIT instead of waiting for the owner
-    // to respond with OVER_LIMIT.
+    // Since the remainder is 0, the peer should return OVER_LIMIT on the next hit.
     sendHit(guber.Status_OVER_LIMIT, 1, 0)
 
-    // Wait for the broadcast from the owner to the peer
+    // Wait for the broadcast from the owner to the peer.
     require.NoError(t, waitForBroadcast(3*clock.Second, owner, 2))
 
-    // The status should still be OVER_LIMIT
-    sendHit(guber.Status_UNDER_LIMIT, 0, 0)
-    sendHit(guber.Status_OVER_LIMIT, 1, 0)
-}
-
-func TestGlobalRateLimitsPeerOverLimitLeaky(t *testing.T) {
-    name := t.Name()
-    key := randomKey()
-    peers, err := cluster.ListNonOwningDaemons(name, key)
-    require.NoError(t, err)
-    owner, err := cluster.FindOwningDaemon(name, key)
-    require.NoError(t, err)
-
-    sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64) {
-        ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
-        defer cancel()
-        resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
-            Requests: []*guber.RateLimitReq{
-                {
-                    Name:      name,
-                    UniqueKey: key,
-                    Algorithm: guber.Algorithm_LEAKY_BUCKET,
-                    Behavior:  guber.Behavior_GLOBAL,
-                    Duration:  guber.Minute * 5,
-                    Hits:      hits,
-                    Limit:     2,
-                },
-            },
-        })
-        assert.NoError(t, err)
-        assert.Equal(t, "", resp.Responses[0].GetError())
-        assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
-    }
-
-    require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...))
-
-    // Send two hits that should be processed by the owner and the broadcast to peer, depleting the remaining
-    sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1)
-    sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1)
-
-    // Wait for the broadcast from the owner to the peers
-    require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1))
-
-    // Ask a different peer if the status is over the limit
-    sendHit(peers[1].MustClient(), guber.Status_OVER_LIMIT, 1)
+    // The status should still be OVER_LIMIT.
+    sendHit(guber.Status_OVER_LIMIT, 0, 0)
 }
 
 func TestChangeLimit(t *testing.T) {
diff --git a/global.go b/global.go
index e8c17d64..5709dea8 100644
--- a/global.go
+++ b/global.go
@@ -140,7 +140,6 @@ func (gm *globalManager) runAsyncHits() {
 // sendHits takes the hits collected by runAsyncHits and sends them to their
 // owning peers
 func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
-    // fmt.Printf("sendHits() %s, hits: %d\n", gm.instance.conf.InstanceID, len(hits))
     type pair struct {
         client *PeerClient
         req    GetPeerRateLimitsReq
@@ -233,7 +232,6 @@ func (gm *globalManager) runBroadcasts() {
 
 // broadcastPeers broadcasts global rate limit statuses to all other peers
 func (gm *globalManager) broadcastPeers(updates map[string]*RateLimitReq) {
-    // fmt.Printf("broadcastPeers() %s, updates: %d\n", gm.instance.conf.InstanceID, len(updates))
     defer prometheus.NewTimer(gm.metricBroadcastDuration).ObserveDuration()
     ctx := context.Background()
     var req UpdatePeerGlobalsReq
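The behavior fix itself is in gubernator.go below. On a non-owner peer, getGlobalRateLimit serves requests from the cached RateLimitResp, and it previously recomputed Status unconditionally. Condensed as a hypothetical helper for illustration only (not part of the patch; the real code mutates a clone of the cached response):

    // statusForCached mirrors the non-owner peer's status decision after
    // this patch.
    func statusForCached(hits int64, cached *RateLimitResp) Status {
        if hits == 0 {
            // Read-only probe: return whatever status is cached.
            return cached.Status
        }
        // A real hit may flip the status locally, ahead of the owner's
        // next broadcast.
        if hits > cached.Remaining {
            return Status_OVER_LIMIT
        }
        return Status_UNDER_LIMIT
    }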
diff --git a/gubernator.go b/gubernator.go
index dc4be285..a4a2625a 100644
--- a/gubernator.go
+++ b/gubernator.go
@@ -406,10 +406,12 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
     rl, ok := item.Value.(*RateLimitResp)
     if ok {
         rl2 := proto.Clone(rl).(*RateLimitResp)
-        if req.Hits > rl2.Remaining {
-            rl2.Status = Status_OVER_LIMIT
-        } else {
-            rl2.Status = Status_UNDER_LIMIT
+        if req.Hits != 0 {
+            if req.Hits > rl2.Remaining {
+                rl2.Status = Status_OVER_LIMIT
+            } else {
+                rl2.Status = Status_UNDER_LIMIT
+            }
         }
         return rl2, nil
     }
@@ -433,7 +435,6 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
 // UpdatePeerGlobals updates the local cache with a list of global rate limits. This method should only
 // be called by a peer who is the owner of a global rate limit.
 func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (*UpdatePeerGlobalsResp, error) {
-    // fmt.Printf("UpdatePeerGlobals() %s, req: %s", s.conf.InstanceID, spew.Sdump(r))
     defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.UpdatePeerGlobals")).ObserveDuration()
     for _, g := range r.Globals {
         item := &CacheItem{
@@ -501,7 +502,6 @@ func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits
                 rl = &RateLimitResp{Error: err.Error()}
                 // metricCheckErrorCounter is updated within getLocalRateLimit(), not in GetPeerRateLimits.
             }
-            // fmt.Printf("GetPeerRateLimits() %s, hits: %d, resp: %#v\n", s.conf.InstanceID, req.Hits, rl)
             respChan <- respOut{rin.idx, rl}
             return nil
 
@@ -578,7 +578,6 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_
         return nil, errors.Wrap(err, "during workerPool.GetRateLimit")
     }
 
-    // fmt.Printf("getLocalRateLimit() %s, resp: %#v\n", s.conf.InstanceID, resp)
     metricGetRateLimitCounter.WithLabelValues("local").Inc()
 
     // If global behavior and owning peer, broadcast update to all peers.
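The removed assertion sendHit(guber.Status_UNDER_LIMIT, 0, 0) had encoded the old behavior: a Hits=0 probe reported UNDER_LIMIT even with zero remaining. The updated test expects the cached OVER_LIMIT instead. To exercise just this path (assuming the functional tests can start their embedded cluster locally):

    go test -v -run 'TestGlobalRateLimitsPeerOverLimit$' .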