Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Tidy code.
Browse files Browse the repository at this point in the history
  • Loading branch information
Baliedge committed Feb 22, 2024
1 parent 13c3547 commit 9691805
Showing 1 changed file with 33 additions and 26 deletions.
59 changes: 33 additions & 26 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,31 +1022,41 @@ func TestGlobalRateLimits(t *testing.T) {
sendHit(peers[4].MustClient(), guber.Status_OVER_LIMIT, 1, 0)
}

// Ensure global broadcast updates all peers when GetRateLimits is called on
// either owner or non-owner peer.
func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) {
owner := cluster.PeerAt(2).GRPCAddress
peer := cluster.PeerAt(0).GRPCAddress
assert.NotEqual(t, owner, peer)
key := fmt.Sprintf("key:%04x", rand.Intn(1<<16))
ctx := context.Background()
const name = "test_global"
key := fmt.Sprintf("key:%016x", rand.Int())

// Determine owner and non-owner peers.
ownerPeerInfo, err := cluster.FindOwningPeer(name, key)
require.NoError(t, err)
owner := ownerPeerInfo.GRPCAddress
nonOwner := cluster.PeerAt(0).GRPCAddress
if nonOwner == owner {
nonOwner = cluster.PeerAt(1).GRPCAddress
}
require.NotEqual(t, owner, nonOwner)

// Connect to owner and non-owner peers in round robin.
dialOpts := []grpc.DialOption{
grpc.WithResolvers(newStaticBuilder()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}

address := fmt.Sprintf("static:///%s,%s", owner, peer)
conn, err := grpc.DialContext(context.Background(), address, dialOpts...)
address := fmt.Sprintf("static:///%s,%s", owner, nonOwner)
conn, err := grpc.DialContext(ctx, address, dialOpts...)
require.NoError(t, err)

client := guber.NewV1Client(conn)

sendHit := func(status guber.Status, assertion func(resp *guber.RateLimitResp), i int) string {
ctx, cancel := context.WithTimeout(context.Background(), clock.Hour*5)
sendHit := func(status guber.Status, i int) {
ctx, cancel := context.WithTimeout(ctx, 10*clock.Second)
defer cancel()
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_global",
Name: name,
UniqueKey: key,
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Behavior: guber.Behavior_GLOBAL,
Expand All @@ -1057,25 +1067,22 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) {
},
})
require.NoError(t, err, i)
gotResp := resp.Responses[0]
assert.Equal(t, "", gotResp.GetError(), i)
assert.Equal(t, status, gotResp.GetStatus(), i)

if assertion != nil {
assertion(gotResp)
}

return gotResp.GetMetadata()["owner"]
item := resp.Responses[0]
assert.Equal(t, "", item.GetError(), fmt.Sprintf("mismatch error, iteration %d", i))
assert.Equal(t, status, item.GetStatus(), fmt.Sprintf("mismatch status, iteration %d", i))
}

// Send two hits that should be processed by the owner and the peer and deplete the limit
sendHit(guber.Status_UNDER_LIMIT, nil, 1)
sendHit(guber.Status_UNDER_LIMIT, nil, 2)
// sleep to ensure the async forward has occurred and state should be shared
time.Sleep(time.Second * 5)
// Send two hits that should be processed by the owner and non-owner and
// deplete the limit consistently
sendHit(guber.Status_UNDER_LIMIT, 1)
sendHit(guber.Status_UNDER_LIMIT, 2)

// Sleep to ensure the global broadcast occurs (every 100ms)
time.Sleep(150 * time.Millisecond)

// All successive hits should return OVER_LIMIT.
for i := 0; i < 10; i++ {
sendHit(guber.Status_OVER_LIMIT, nil, i+2)
sendHit(guber.Status_OVER_LIMIT, i+2)
}
}

Expand Down

0 comments on commit 9691805

Please sign in to comment.