diff --git a/functional_test.go b/functional_test.go
index 516c3f8d..dfa12ee7 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -1159,6 +1159,205 @@ func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
 	sendHit(guber.Status_OVER_LIMIT, 0, 0)
 }
 
+func TestGlobalRequestMoreThanAvailable(t *testing.T) {
+	name := t.Name()
+	key := randomKey()
+	owner, err := cluster.FindOwningDaemon(name, key)
+	require.NoError(t, err)
+	peers, err := cluster.ListNonOwningDaemons(name, key)
+	require.NoError(t, err)
+
+	sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64, remaining int64) {
+		ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
+		defer cancel()
+		resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
+			Requests: []*guber.RateLimitReq{
+				{
+					Name:      name,
+					UniqueKey: key,
+					Algorithm: guber.Algorithm_LEAKY_BUCKET,
+					Behavior:  guber.Behavior_GLOBAL,
+					Duration:  guber.Minute * 1_000,
+					Hits:      hits,
+					Limit:     100,
+				},
+			},
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, "", resp.Responses[0].GetError())
+		assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
+	}
+
+	require.NoError(t, waitForIdle(1*time.Minute, cluster.GetDaemons()...))
+	prev := getMetricValue(t, owner, "gubernator_broadcast_duration_count")
+
+	// Ensure gRPC has connections to each peer before we start, as we want
+	// the actual test requests to happen quite fast.
+	for _, p := range peers {
+		sendHit(p.MustClient(), guber.Status_UNDER_LIMIT, 0, 100)
+	}
+
+	// Send a request for 50 hits from each non-owning peer in the cluster. These requests
+	// will be queued and sent to the owner as accumulated hits. As a result of the async nature
+	// of `Behavior_GLOBAL`, rate limit requests spread across peers like this will be allowed to
+	// over-consume their resource within the rate limit window until the owner is updated and
+	// a broadcast to all peers is received.
+	//
+	// The maximum number of resources that can be over-consumed can be calculated by multiplying
+	// the remainder by the number of peers in the cluster. For example: if you have a remainder of 100
+	// and a cluster of 10 instances, then the maximum over-consumed resource is 1,000. If you need
+	// a more accurate remaining calculation, and wish to avoid over-consuming a resource, then do
+	// not use `Behavior_GLOBAL`.
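+	//
+	// Concretely, in this test each non-owning peer starts with a cached
+	// remaining of 100, so every peer locally accepts the 50 hits sent below.
+	// With N non-owning peers, up to 50*N hits can be accepted against a
+	// limit of 100 before the owner aggregates the hits and broadcasts the
+	// updated state.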
+	for _, p := range peers {
+		sendHit(p.MustClient(), guber.Status_UNDER_LIMIT, 50, 50)
+	}
+
+	// Wait for the broadcast from the owner to the peers
+	require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+1))
+
+	// We should be over the limit
+	sendHit(peers[0].MustClient(), guber.Status_OVER_LIMIT, 1, 0)
+}
+
+func TestGlobalNegativeHits(t *testing.T) {
+	name := t.Name()
+	key := randomKey()
+	owner, err := cluster.FindOwningDaemon(name, key)
+	require.NoError(t, err)
+	peers, err := cluster.ListNonOwningDaemons(name, key)
+	require.NoError(t, err)
+
+	sendHit := func(client guber.V1Client, status guber.Status, hits int64, remaining int64) {
+		ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
+		defer cancel()
+		resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
+			Requests: []*guber.RateLimitReq{
+				{
+					Name:      name,
+					UniqueKey: key,
+					Algorithm: guber.Algorithm_TOKEN_BUCKET,
+					Behavior:  guber.Behavior_GLOBAL,
+					Duration:  guber.Minute * 100,
+					Hits:      hits,
+					Limit:     2,
+				},
+			},
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, "", resp.Responses[0].GetError())
+		assert.Equal(t, status, resp.Responses[0].GetStatus())
+		assert.Equal(t, remaining, resp.Responses[0].Remaining)
+	}
+
+	require.NoError(t, waitForIdle(1*time.Minute, cluster.GetDaemons()...))
+
+	prev := getMetricValue(t, owner, "gubernator_broadcast_duration_count")
+
+	// Send a negative hit on a rate limit with no hits
+	sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, -1, 3)
+
+	// Wait for the negative remaining to propagate
+	require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+1))
+
+	// Send another negative hit to a different peer
+	sendHit(peers[1].MustClient(), guber.Status_UNDER_LIMIT, -1, 4)
+
+	require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+2))
+
+	// Remaining should now be 4; consume all of it
+	sendHit(peers[2].MustClient(), guber.Status_UNDER_LIMIT, 4, 0)
+
+	require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+3))
+
+	sendHit(peers[3].MustClient(), guber.Status_UNDER_LIMIT, 0, 0)
+}
+
+func TestGlobalResetRemaining(t *testing.T) {
+	name := t.Name()
+	key := randomKey()
+	owner, err := cluster.FindOwningDaemon(name, key)
+	require.NoError(t, err)
+	peers, err := cluster.ListNonOwningDaemons(name, key)
+	require.NoError(t, err)
+
+	sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64, remaining int64) {
+		ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
+		defer cancel()
+		resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
+			Requests: []*guber.RateLimitReq{
+				{
+					Name:      name,
+					UniqueKey: key,
+					Algorithm: guber.Algorithm_LEAKY_BUCKET,
+					Behavior:  guber.Behavior_GLOBAL,
+					Duration:  guber.Minute * 1_000,
+					Hits:      hits,
+					Limit:     100,
+				},
+			},
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, "", resp.Responses[0].GetError())
+		assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
+		assert.Equal(t, remaining, resp.Responses[0].Remaining)
+	}
+
+	require.NoError(t, waitForIdle(1*time.Minute, cluster.GetDaemons()...))
+
+	prev := getMetricValue(t, owner, "gubernator_broadcast_duration_count")
+
+	for _, p := range peers {
+		sendHit(p.MustClient(), guber.Status_UNDER_LIMIT, 50, 50)
+	}
+
+	// Wait for the broadcast from the owner to the peers
+	require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+1))
+
+	// We should be over the limit and remaining should be zero
+	sendHit(peers[0].MustClient(), guber.Status_OVER_LIMIT, 1, 0)
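+
+	// `Behavior_RESET_REMAINING` (combined with `Behavior_GLOBAL` below)
+	// resets the remaining count back to the configured limit, as if no
+	// hits had been taken.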
+
+	// Now reset the remaining
+	ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
+	defer cancel()
+	resp, err := peers[0].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{
+		Requests: []*guber.RateLimitReq{
+			{
+				Name:      name,
+				UniqueKey: key,
+				Algorithm: guber.Algorithm_LEAKY_BUCKET,
+				Behavior:  guber.Behavior_GLOBAL | guber.Behavior_RESET_REMAINING,
+				Duration:  guber.Minute * 1_000,
+				Hits:      0,
+				Limit:     100,
+			},
+		},
+	})
+	require.NoError(t, err)
+	assert.NotEqual(t, int64(100), resp.Responses[0].Remaining)
+
+	// Wait for the reset to propagate.
+	require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+2))
+
+	// Check a different peer to ensure remaining has been reset
+	resp, err = peers[1].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{
+		Requests: []*guber.RateLimitReq{
+			{
+				Name:      name,
+				UniqueKey: key,
+				Algorithm: guber.Algorithm_LEAKY_BUCKET,
+				Behavior:  guber.Behavior_GLOBAL,
+				Duration:  guber.Minute * 1_000,
+				Hits:      0,
+				Limit:     100,
+			},
+		},
+	})
+	require.NoError(t, err)
+	assert.Equal(t, int64(100), resp.Responses[0].Remaining)
+}
+
 func TestChangeLimit(t *testing.T) {
 	client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
 	require.Nil(t, errs)
@@ -1931,7 +2130,7 @@ func TestGlobalBehavior(t *testing.T) {
 	for _, peer := range cluster.GetDaemons() {
 		expected := gprlCounters[peer.InstanceID]
 		if peer.InstanceID == owner.InstanceID {
-			expected += len(expectUpdate)
+			expected += float64(len(expectUpdate))
 		}
 		assert.Equal(t, expected, gprlCounters2[peer.InstanceID], "gprlCounter %s", peer.InstanceID)
 	}
@@ -2033,7 +2232,7 @@ func getMetric(in io.Reader, name string) (*model.Sample, error) {
 // waitForBroadcast waits until the broadcast count for the daemon changes to
 // at least the expected value and the broadcast queue is empty.
 // Returns an error if timeout waiting for conditions to be met.
-func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect int) error {
+func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect float64) error {
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 
@@ -2048,7 +2247,7 @@ func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect float64) error
 
 		// It's possible a broadcast occurred twice if waiting for multiple
 		// peers to forward updates to non-owners.
-		if int(gbdc.Value) >= expect && ggql.Value == 0 {
+		if float64(gbdc.Value) >= expect && ggql.Value == 0 {
 			return nil
 		}
 
@@ -2063,7 +2262,7 @@
 // waitForUpdate waits until the global hits update count for the daemon
 // changes to at least the expected value and the global update queue is empty.
 // Returns an error if timeout waiting for conditions to be met.
-func waitForUpdate(timeout clock.Duration, d *guber.Daemon, expect int) error {
+func waitForUpdate(timeout clock.Duration, d *guber.Daemon, expect float64) error {
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 
@@ -2078,7 +2277,7 @@ func waitForUpdate(timeout clock.Duration, d *guber.Daemon, expect float64) error {
 
 		// It's possible a hit occurred twice if waiting for multiple peers to
 		// forward updates to the owner.
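+		// Metric samples from the Prometheus client are float64
+		// (model.SampleValue), so compare in float64 to match the
+		// metric's native type.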
-		if int(gsdc.Value) >= expect && gsql.Value == 0 {
+		if float64(gsdc.Value) >= expect && gsql.Value == 0 {
 			return nil
 		}
 
@@ -2138,10 +2337,10 @@ func getMetricValue(t *testing.T, d *guber.Daemon, name string) float64 {
 }
 
 // Get metric counter values on each peer.
-func getPeerCounters(t *testing.T, peers []*guber.Daemon, name string) map[string]int {
-	counters := make(map[string]int)
+func getPeerCounters(t *testing.T, peers []*guber.Daemon, name string) map[string]float64 {
+	counters := make(map[string]float64)
 	for _, peer := range peers {
-		counters[peer.InstanceID] = int(getMetricValue(t, peer, name))
+		counters[peer.InstanceID] = getMetricValue(t, peer, name)
 	}
 	return counters
 }