Fixed a race in getGlobalRateLimit
thrawn01 committed Jan 13, 2021
1 parent 502dc9b commit 69313d4
Showing 8 changed files with 99 additions and 49 deletions.
9 changes: 6 additions & 3 deletions CHANGELOG
@@ -5,9 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.0.0-rc.5] - 2020-12-21
### Change
* Respect SIGTERM from docker during shutdown.
* Peer info provided to etcd and memberlist pools is now consistent.
## Changes
* Respect SIGTERM from docker during shutdown
* Peer info provided to etcd and memberlist pools is now consistent
* Fixed a race in getGlobalRateLimit
* Fixed issues with EtcdPool
* Changes in preparation of MultiRegion support testing
### Added
* Added GUBER_K8S_WATCH_MECHANISM for k8s deployments.

8 changes: 4 additions & 4 deletions benchmark_test.go
@@ -32,7 +32,7 @@ func BenchmarkServer_GetPeerRateLimitNoBatching(b *testing.B) {
}

client := guber.NewPeerClient(guber.PeerConfig{
Info: cluster.GetRandomPeer(),
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
Behavior: conf.Behaviors,
})

@@ -54,7 +54,7 @@ func BenchmarkServer_GetPeerRateLimitNoBatching(b *testing.B) {
}

func BenchmarkServer_GetRateLimit(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
@@ -80,7 +80,7 @@ func BenchmarkServer_GetRateLimit(b *testing.B) {
}

func BenchmarkServer_Ping(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
@@ -108,7 +108,7 @@ func BenchmarkServer_Ping(b *testing.B) {
}*/

func BenchmarkServer_ThunderingHeard(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
25 changes: 23 additions & 2 deletions cluster/cluster.go
@@ -18,6 +18,7 @@ package cluster

import (
"context"
"fmt"
"math/rand"

"github.com/mailgun/gubernator"
@@ -26,12 +27,30 @@ import (
"github.com/sirupsen/logrus"
)

const (
DataCenterNone = ""
DataCenterOne = "datacenter-1"
DataCenterTwo = "datacenter-2"
)

var daemons []*gubernator.Daemon
var peers []gubernator.PeerInfo

// Returns a random peer from the cluster
func GetRandomPeer() gubernator.PeerInfo {
return peers[rand.Intn(len(peers))]
func GetRandomPeer(dc string) gubernator.PeerInfo {
var local []gubernator.PeerInfo

for _, p := range peers {
if p.DataCenter == dc {
local = append(local, p)
}
}

if len(local) == 0 {
panic(fmt.Sprintf("failed to find random peer for dc '%s'", dc))
}

return local[rand.Intn(len(local))]
}
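
For orientation, a minimal usage sketch (not part of this diff) of the new datacenter-aware helper; the helper function name is hypothetical, and it assumes the guber, cluster, require, and testing imports already used by the test files:

	// Hypothetical helper for illustration only. GetRandomPeer panics if no
	// peer was registered for the requested datacenter via StartWith.
	func dialPeerInDataCenterOne(t *testing.T) guber.V1Client {
		peer := cluster.GetRandomPeer(cluster.DataCenterOne)
		client, err := guber.DialV1Server(peer.GRPCAddress, nil)
		require.NoError(t, err)
		return client
	}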

// Returns a list of all peers in the cluster
@@ -81,6 +100,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
Logger: logrus.WithField("instance", peer.GRPCAddress),
GRPCListenAddress: peer.GRPCAddress,
HTTPListenAddress: peer.HTTPAddress,
DataCenter: peer.DataCenter,
Behaviors: gubernator.BehaviorConfig{
// Suitable for testing but not production
GlobalSyncWait: clock.Millisecond * 50,
@@ -98,6 +118,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
peers = append(peers, gubernator.PeerInfo{
GRPCAddress: d.GRPCListeners[0].Addr().String(),
HTTPAddress: d.HTTPListener.Addr().String(),
DataCenter: peer.DataCenter,
})
daemons = append(daemons, d)
}
91 changes: 57 additions & 34 deletions functional_test.go
@@ -38,12 +38,18 @@ import (
// Setup and shutdown the mock gubernator cluster for the entire test suite
func TestMain(m *testing.M) {
if err := cluster.StartWith([]guber.PeerInfo{
{GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980"},
{GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981"},
{GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982"},
{GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983"},
{GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984"},
{GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985"},
{GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984", DataCenter: cluster.DataCenterNone},
{GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985", DataCenter: cluster.DataCenterNone},

// DataCenterOne
{GRPCAddress: "127.0.0.1:9890", HTTPAddress: "127.0.0.1:9880", DataCenter: cluster.DataCenterOne},
{GRPCAddress: "127.0.0.1:9891", HTTPAddress: "127.0.0.1:9881", DataCenter: cluster.DataCenterOne},
{GRPCAddress: "127.0.0.1:9892", HTTPAddress: "127.0.0.1:9882", DataCenter: cluster.DataCenterOne},
{GRPCAddress: "127.0.0.1:9893", HTTPAddress: "127.0.0.1:9883", DataCenter: cluster.DataCenterOne},
}); err != nil {
fmt.Println(err)
os.Exit(1)
@@ -53,7 +59,7 @@ func TestMain(m *testing.M) {
}

func TestOverTheLimit(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
@@ -102,61 +108,68 @@ func TestOverTheLimit(t *testing.T) {
func TestTokenBucket(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
addr := cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress
client, errs := guber.DialV1Server(addr, nil)
require.Nil(t, errs)

tests := []struct {
name string
Remaining int64
Status guber.Status
Sleep clock.Duration
}{
{
name: "remaining should be one",
Remaining: 1,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Duration(0),
},
{
name: "remaining should be zero and under limit",
Remaining: 0,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Millisecond * 100,
},
{
name: "after waiting 100ms remaining should be 1 and under limit",
Remaining: 1,
Status: guber.Status_UNDER_LIMIT,
Sleep: clock.Duration(0),
},
}

for _, test := range tests {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_token_bucket",
UniqueKey: "account:1234",
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Duration: guber.Millisecond * 5,
Limit: 2,
Hits: 1,
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_token_bucket",
UniqueKey: "account:1234",
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Duration: guber.Millisecond * 5,
Limit: 2,
Hits: 1,
},
},
},
})
require.Nil(t, err)
})
require.Nil(t, err)

rl := resp.Responses[0]
rl := resp.Responses[0]

assert.Empty(t, rl.Error)
assert.Equal(t, test.Status, rl.Status)
assert.Equal(t, test.Remaining, rl.Remaining)
assert.Equal(t, int64(2), rl.Limit)
assert.True(t, rl.ResetTime != 0)
clock.Advance(test.Sleep)
assert.Empty(t, rl.Error)
assert.Equal(t, tt.Status, rl.Status)
assert.Equal(t, tt.Remaining, rl.Remaining)
assert.Equal(t, int64(2), rl.Limit)
assert.True(t, rl.ResetTime != 0)
clock.Advance(tt.Sleep)
})
}
}

func TestTokenBucketGregorian(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
@@ -394,7 +407,7 @@ func TestLeakyBucketGregorian(t *testing.T) {
}

func TestMissingFields(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
@@ -527,7 +540,7 @@ func TestGlobalRateLimits(t *testing.T) {
}

func TestChangeLimit(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
@@ -622,7 +635,7 @@ func TestChangeLimit(t *testing.T) {
}

func TestResetRemaining(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)

tests := []struct {
@@ -765,7 +778,7 @@ func TestHealthCheck(t *testing.T) {
func TestLeakyBucketDivBug(t *testing.T) {
defer clock.Freeze(clock.Now()).Unfreeze()

client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.NoError(t, err)

resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
@@ -804,8 +817,18 @@ func TestLeakyBucketDivBug(t *testing.T) {
assert.Equal(t, int64(2000), resp.Responses[0].Limit)
}

func TestMutliRegion(t *testing.T) {

// TODO: Queue a rate limit with multi region behavior on the DataCenterNone cluster
// TODO: Check the immediate response is correct
// TODO: Wait until the rate limit count shows up on the DataCenterOne and DataCenterTwo clusters

// TODO: Increment the counts on the DataCenterTwo and DataCenterOne clusters
// TODO: Wait until both rate limit counts show up on all datacenters
}
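
Purely as a sketch of where those TODOs point (not part of this commit), one way the first two steps might look; the Behavior_MULTI_REGION value, the limits, and the follow-up polling strategy are assumptions:

	// Illustrative only - the behavior enum value and the eventual polling of
	// DataCenterOne/DataCenterTwo peers are assumptions, not committed code.
	func multiRegionSketch(t *testing.T) {
		client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
		require.NoError(t, err)

		// Queue a hit with multi-region behavior on the DataCenterNone cluster.
		resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
			Requests: []*guber.RateLimitReq{{
				Name:      "test_multi_region",
				UniqueKey: "account:1234",
				Algorithm: guber.Algorithm_TOKEN_BUCKET,
				Behavior:  guber.Behavior_MULTI_REGION, // assumed enum value
				Duration:  guber.Second * 10,
				Limit:     10,
				Hits:      1,
			}},
		})
		require.NoError(t, err)

		// Check the immediate response before any async replication runs.
		assert.Equal(t, int64(9), resp.Responses[0].Remaining)

		// Remaining TODOs would poll peers from the other datacenters until the
		// replicated hit is reflected in their local counts.
	}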

func TestGRPCGateway(t *testing.T) {
resp, err := http.DefaultClient.Get("http://" + cluster.GetRandomPeer().HTTPAddress + "/v1/HealthCheck")
resp, err := http.DefaultClient.Get("http://" + cluster.GetRandomPeer(cluster.DataCenterNone).HTTPAddress + "/v1/HealthCheck")
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
4 changes: 2 additions & 2 deletions global.go
@@ -53,8 +53,8 @@ func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager {
Name: "gubernator_broadcast_durations",
Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
}),
asyncQueue: make(chan *RateLimitReq, 0),
broadcastQueue: make(chan *RateLimitReq, 0),
asyncQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
broadcastQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
instance: instance,
conf: conf,
}
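
For context on the channel change above: with a capacity of zero every enqueue blocks until the manager's goroutine receives the hit, whereas a buffer sized to GlobalBatchLimit lets request goroutines hand hits off immediately until the buffer fills. A simplified, self-contained sketch of the difference (placeholder types, not gubernator's own):

	package main

	import "fmt"

	type rateLimitReq struct{ name string }

	func main() {
		const globalBatchLimit = 1000 // stand-in for conf.GlobalBatchLimit

		// Before: make(chan *rateLimitReq, 0) - every send waits for a receiver.
		// After: a buffer sized to the batch limit absorbs bursts without blocking.
		asyncQueue := make(chan *rateLimitReq, globalBatchLimit)

		asyncQueue <- &rateLimitReq{name: "requests_per_second"} // returns immediately
		fmt.Println("queued hits:", len(asyncQueue))
	}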
7 changes: 5 additions & 2 deletions gubernator.go
@@ -227,8 +227,11 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*GetRateLimitsResp, error) {
// getGlobalRateLimit handles rate limits that are marked as `Behavior = GLOBAL`. Rate limit responses
// are returned from the local cache and the hits are queued to be sent to the owning peer.
func (s *V1Instance) getGlobalRateLimit(req *RateLimitReq) (*RateLimitResp, error) {
// Queue the hit for async update
s.global.QueueHit(req)
// Queue the hit for async update after we have prepared our response.
// NOTE: The defer here avoids a race condition where we queue the req to
// be forwarded to the owning peer in a separate goroutine but simultaneously
// access and possibly copy the req in this method.
defer s.global.QueueHit(req)

s.conf.Cache.Lock()
item, ok := s.conf.Cache.GetItem(req.HashKey())
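
To make the race and the fix concrete, a standalone sketch (placeholder names, not the actual gubernator types): sending the request to the queue up front lets the consumer goroutine read or copy it while the handler is still using it, while deferring the send until the handler returns removes that overlap.

	// Minimal sketch of the defer-based fix; hitReq, hitResp, and queue are placeholders.
	type hitReq struct{ hits int64 }
	type hitResp struct{ remaining int64 }

	func getGlobalRateLimitSketch(queue chan<- *hitReq, req *hitReq) *hitResp {
		// Queue the hit only after this function has finished reading req.
		// Sending it first would let the consumer goroutine access req
		// concurrently with the read below - the race this commit removes.
		defer func() { queue <- req }()

		// Stands in for the cache lookup and response construction in the real method.
		return &hitResp{remaining: 100 - req.hits}
	}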
2 changes: 1 addition & 1 deletion multiregion.go
@@ -18,7 +18,7 @@ func newMultiRegionManager(conf BehaviorConfig, instance *V1Instance) *mutliRegionManager {
conf: conf,
instance: instance,
log: instance.log,
reqQueue: make(chan *RateLimitReq, 0),
reqQueue: make(chan *RateLimitReq, conf.MultiRegionBatchLimit),
}
mm.runAsyncReqs()
return &mm
2 changes: 1 addition & 1 deletion peer_client_test.go
@@ -41,7 +41,7 @@ func TestPeerClientShutdown(t *testing.T) {

t.Run(c.Name, func(t *testing.T) {
client := gubernator.NewPeerClient(gubernator.PeerConfig{
Info: cluster.GetRandomPeer(),
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
Behavior: config,
})

