diff --git a/functional_test.go b/functional_test.go index 43254eea..fa7a681c 100644 --- a/functional_test.go +++ b/functional_test.go @@ -1022,31 +1022,41 @@ func TestGlobalRateLimits(t *testing.T) { sendHit(peers[4].MustClient(), guber.Status_OVER_LIMIT, 1, 0) } +// Ensure global broadcast updates all peers when GetRateLimits is called on +// either owner or non-owner peer. func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) { - owner := cluster.PeerAt(2).GRPCAddress - peer := cluster.PeerAt(0).GRPCAddress - assert.NotEqual(t, owner, peer) - key := fmt.Sprintf("key:%04x", rand.Intn(1<<16)) + ctx := context.Background() + const name = "test_global" + key := fmt.Sprintf("key:%016x", rand.Int()) + + // Determine owner and non-owner peers. + ownerPeerInfo, err := cluster.FindOwningPeer(name, key) + require.NoError(t, err) + owner := ownerPeerInfo.GRPCAddress + nonOwner := cluster.PeerAt(0).GRPCAddress + if nonOwner == owner { + nonOwner = cluster.PeerAt(1).GRPCAddress + } + require.NotEqual(t, owner, nonOwner) + // Connect to owner and non-owner peers in round robin. dialOpts := []grpc.DialOption{ grpc.WithResolvers(newStaticBuilder()), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), } - - address := fmt.Sprintf("static:///%s,%s", owner, peer) - conn, err := grpc.DialContext(context.Background(), address, dialOpts...) + address := fmt.Sprintf("static:///%s,%s", owner, nonOwner) + conn, err := grpc.DialContext(ctx, address, dialOpts...) require.NoError(t, err) - client := guber.NewV1Client(conn) - sendHit := func(status guber.Status, assertion func(resp *guber.RateLimitResp), i int) string { - ctx, cancel := context.WithTimeout(context.Background(), clock.Hour*5) + sendHit := func(status guber.Status, i int) { + ctx, cancel := context.WithTimeout(ctx, 10*clock.Second) defer cancel() resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ Requests: []*guber.RateLimitReq{ { - Name: "test_global", + Name: name, UniqueKey: key, Algorithm: guber.Algorithm_LEAKY_BUCKET, Behavior: guber.Behavior_GLOBAL, @@ -1057,25 +1067,22 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) { }, }) require.NoError(t, err, i) - gotResp := resp.Responses[0] - assert.Equal(t, "", gotResp.GetError(), i) - assert.Equal(t, status, gotResp.GetStatus(), i) - - if assertion != nil { - assertion(gotResp) - } - - return gotResp.GetMetadata()["owner"] + item := resp.Responses[0] + assert.Equal(t, "", item.GetError(), fmt.Sprintf("mismatch error, iteration %d", i)) + assert.Equal(t, status, item.GetStatus(), fmt.Sprintf("mismatch status, iteration %d", i)) } - // Send two hits that should be processed by the owner and the peer and deplete the limit - sendHit(guber.Status_UNDER_LIMIT, nil, 1) - sendHit(guber.Status_UNDER_LIMIT, nil, 2) - // sleep to ensure the async forward has occurred and state should be shared - time.Sleep(time.Second * 5) + // Send two hits that should be processed by the owner and non-owner and + // deplete the limit consistently + sendHit(guber.Status_UNDER_LIMIT, 1) + sendHit(guber.Status_UNDER_LIMIT, 2) + + // Sleep to ensure the global broadcast occurs (every 100ms) + time.Sleep(150 * time.Millisecond) + // All successive hits should return OVER_LIMIT. for i := 0; i < 10; i++ { - sendHit(guber.Status_OVER_LIMIT, nil, i+2) + sendHit(guber.Status_OVER_LIMIT, i+2) } }