This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Add back tests that were erroneously removed.
Baliedge committed Mar 11, 2024
1 parent 24bee89 commit ea42442
Showing 1 changed file with 207 additions and 8 deletions.
215 changes: 207 additions & 8 deletions functional_test.go
@@ -1159,6 +1159,205 @@ func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
sendHit(guber.Status_OVER_LIMIT, 0, 0)
}

func TestGlobalRequestMoreThanAvailable(t *testing.T) {
name := t.Name()
key := randomKey()
owner, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)
peers, err := cluster.ListNonOwningDaemons(name, key)
require.NoError(t, err)

sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64, remaining int64) {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
defer cancel()
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: name,
UniqueKey: key,
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Behavior: guber.Behavior_GLOBAL,
Duration: guber.Minute * 1_000,
Hits: hits,
Limit: 100,
},
},
})
assert.NoError(t, err)
assert.Equal(t, "", resp.Responses[0].GetError())
assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
}

require.NoError(t, waitForIdle(1*time.Minute, cluster.GetDaemons()...))
prev := getMetricValue(t, owner, "gubernator_broadcast_duration_count")

// Ensure gRPC has connections to each peer before we start, as we want
// the actual test requests to happen quite fast.
for _, p := range peers {
sendHit(p.MustClient(), guber.Status_UNDER_LIMIT, 0, 100)
}

// Send a request for 50 hits from each non-owning peer in the cluster. These requests
// will be queued and sent to the owner as accumulated hits. Because `Behavior_GLOBAL`
// rate limit requests are handled asynchronously, requests spread across peers like this
// are allowed to over-consume their resource within the rate limit window, until the owner
// is updated and a broadcast is received by all peers.
//
// The maximum amount that can be over-consumed is the remainder multiplied by the number
// of peers in the cluster. For example, with a remainder of 100 and a cluster of 10
// instances, the maximum over-consumption is 1,000. If you need a more accurate remaining
// calculation and wish to avoid over-consuming a resource, do not use `Behavior_GLOBAL`.
for _, p := range peers {
sendHit(p.MustClient(), guber.Status_UNDER_LIMIT, 50, 50)
}

// Wait for the broadcast from the owner to the peers
require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+1))

// We should be over the limit
sendHit(peers[0].MustClient(), guber.Status_OVER_LIMIT, 1, 0)
}
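
As a rough illustration of the bound described in the comment above (not part of this change): the worst case is simply the remaining count multiplied by the number of peers that can serve hits before the owner's broadcast arrives. A minimal sketch, with hypothetical values matching the comment's example:

// Worst-case over-consumption for Behavior_GLOBAL: every peer may hand out up
// to `remaining` hits on its own before the owner's next broadcast reaches it.
func maxOverConsumption(remaining, peerCount int64) int64 {
	return remaining * peerCount
}

// maxOverConsumption(100, 10) == 1_000, matching the example in the comment.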

func TestGlobalNegativeHits(t *testing.T) {
name := t.Name()
key := randomKey()
owner, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)
peers, err := cluster.ListNonOwningDaemons(name, key)
require.NoError(t, err)

sendHit := func(client guber.V1Client, status guber.Status, hits int64, remaining int64) {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
defer cancel()
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: name,
UniqueKey: key,
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Behavior: guber.Behavior_GLOBAL,
Duration: guber.Minute * 100,
Hits: hits,
Limit: 2,
},
},
})
assert.NoError(t, err)
assert.Equal(t, "", resp.Responses[0].GetError())
assert.Equal(t, status, resp.Responses[0].GetStatus())
assert.Equal(t, remaining, resp.Responses[0].Remaining)
}

require.NoError(t, waitForIdle(1*time.Minute, cluster.GetDaemons()...))

prev := getMetricValue(t, owner, "gubernator_broadcast_duration_count")
require.NoError(t, err)

// Send a negative hit on a rate limit with no hits
sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, -1, 3)

// Wait for the negative remaining to propagate
require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+1))

// Send another negative hit to a different peer
sendHit(peers[1].MustClient(), guber.Status_UNDER_LIMIT, -1, 4)

require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+2))

// Should have 4 in the remainder
sendHit(peers[2].MustClient(), guber.Status_UNDER_LIMIT, 4, 0)

require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+3))

sendHit(peers[3].MustClient(), guber.Status_UNDER_LIMIT, 0, 0)
}
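
The test above relies on negative hits adding back to the remaining count (which can rise above the configured Limit). A minimal sketch of how a caller might use this to return unused hits, reusing the same request fields as the test; the refund helper itself is illustrative and not part of this commit:

// refundHits credits previously consumed hits back by sending a negative hit count.
func refundHits(ctx context.Context, client guber.V1Client, name, key string, unused int64) error {
	_, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
		Requests: []*guber.RateLimitReq{{
			Name:      name,
			UniqueKey: key,
			Algorithm: guber.Algorithm_TOKEN_BUCKET,
			Behavior:  guber.Behavior_GLOBAL,
			Duration:  guber.Minute * 100,
			Hits:      -unused, // negative hits raise Remaining instead of consuming it
			Limit:     2,
		}},
	})
	return err
}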

func TestGlobalResetRemaining(t *testing.T) {
name := t.Name()
key := randomKey()
owner, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)
peers, err := cluster.ListNonOwningDaemons(name, key)
require.NoError(t, err)

sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64, remaining int64) {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
defer cancel()
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: name,
UniqueKey: key,
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Behavior: guber.Behavior_GLOBAL,
Duration: guber.Minute * 1_000,
Hits: hits,
Limit: 100,
},
},
})
assert.NoError(t, err)
assert.Equal(t, "", resp.Responses[0].GetError())
assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
assert.Equal(t, remaining, resp.Responses[0].Remaining)
}

require.NoError(t, waitForIdle(1*time.Minute, cluster.GetDaemons()...))

prev := getMetricValue(t, owner, "gubernator_broadcast_duration_count")
require.NoError(t, err)

for _, p := range peers {
sendHit(p.MustClient(), guber.Status_UNDER_LIMIT, 50, 50)
}

// Wait for the broadcast from the owner to the peers
require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+1))

// We should be over the limit and remaining should be zero
sendHit(peers[0].MustClient(), guber.Status_OVER_LIMIT, 1, 0)

// Now reset the remaining
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
defer cancel()
resp, err := peers[0].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: name,
UniqueKey: key,
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Behavior: guber.Behavior_GLOBAL | guber.Behavior_RESET_REMAINING,
Duration: guber.Minute * 1_000,
Hits: 0,
Limit: 100,
},
},
})
require.NoError(t, err)
assert.NotEqual(t, 100, resp.Responses[0].Remaining)

// Wait for the reset to propagate.
require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+2))

// Check a different peer to ensure remaining has been reset
resp, err = peers[1].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: name,
UniqueKey: key,
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Behavior: guber.Behavior_GLOBAL,
Duration: guber.Minute * 1_000,
Hits: 0,
Limit: 100,
},
},
})
require.NoError(t, err)
assert.NotEqual(t, 100, resp.Responses[0].Remaining)
}
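
As the reset step above shows, behaviors are bit flags and can be combined on a single request with a bitwise OR (guber.Behavior_GLOBAL | guber.Behavior_RESET_REMAINING). A minimal sketch of building such a request, mirroring the fields used in the test; the helper name is hypothetical:

// resetRequest builds a request that resets the remaining count of a global
// rate limit without consuming any hits.
func resetRequest(name, key string) *guber.RateLimitReq {
	return &guber.RateLimitReq{
		Name:      name,
		UniqueKey: key,
		Algorithm: guber.Algorithm_LEAKY_BUCKET,
		Behavior:  guber.Behavior_GLOBAL | guber.Behavior_RESET_REMAINING, // flags OR together
		Duration:  guber.Minute * 1_000,
		Hits:      0,
		Limit:     100,
	}
}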

func TestChangeLimit(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)
@@ -1931,7 +2130,7 @@ func TestGlobalBehavior(t *testing.T) {
for _, peer := range cluster.GetDaemons() {
expected := gprlCounters[peer.InstanceID]
if peer.InstanceID == owner.InstanceID {
- expected += len(expectUpdate)
+ expected += float64(len(expectUpdate))
}
assert.Equal(t, expected, gprlCounters2[peer.InstanceID], "gprlCounter %s", peer.InstanceID)
}
@@ -2033,7 +2232,7 @@ func getMetric(in io.Reader, name string) (*model.Sample, error) {
// waitForBroadcast waits until the broadcast count for the daemon changes to
// at least the expected value and the broadcast queue is empty.
// Returns an error if timeout waiting for conditions to be met.
- func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect int) error {
+ func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect float64) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

@@ -2048,7 +2247,7 @@ func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect int) error

// It's possible a broadcast occurred twice if waiting for multiple
// peers to forward updates to non-owners.
- if int(gbdc.Value) >= expect && ggql.Value == 0 {
+ if float64(gbdc.Value) >= expect && ggql.Value == 0 {
return nil
}

@@ -2063,7 +2262,7 @@ func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect int) error
// waitForUpdate waits until the global hits update count for the daemon
// changes to at least the expected value and the global update queue is empty.
// Returns an error if timeout waiting for conditions to be met.
- func waitForUpdate(timeout clock.Duration, d *guber.Daemon, expect int) error {
+ func waitForUpdate(timeout clock.Duration, d *guber.Daemon, expect float64) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

@@ -2078,7 +2277,7 @@ func waitForUpdate(timeout clock.Duration, d *guber.Daemon, expect int) error {

// It's possible a hit occurred twice if waiting for multiple peers to
// forward updates to the owner.
- if int(gsdc.Value) >= expect && gsql.Value == 0 {
+ if float64(gsdc.Value) >= expect && gsql.Value == 0 {
return nil
}

@@ -2138,10 +2337,10 @@ func getMetricValue(t *testing.T, d *guber.Daemon, name string) float64 {
}

// Get metric counter values on each peer.
- func getPeerCounters(t *testing.T, peers []*guber.Daemon, name string) map[string]int {
- counters := make(map[string]int)
+ func getPeerCounters(t *testing.T, peers []*guber.Daemon, name string) map[string]float64 {
+ counters := make(map[string]float64)
for _, peer := range peers {
- counters[peer.InstanceID] = int(getMetricValue(t, peer, name))
+ counters[peer.InstanceID] = getMetricValue(t, peer, name)
}
return counters
}
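
These helpers now take and return float64 because Prometheus sample values are float64. The tests in this commit use them in a snapshot-then-wait pattern; a condensed sketch of that pattern, taken from the tests above:

// Snapshot the owner's broadcast counter, generate hits on the peers, then
// wait until the owner has broadcast at least once more.
prev := getMetricValue(t, owner, "gubernator_broadcast_duration_count")
for _, p := range peers {
	sendHit(p.MustClient(), guber.Status_UNDER_LIMIT, 50, 50)
}
require.NoError(t, waitForBroadcast(clock.Second*10, owner, prev+1))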
