From c77c8d07d4f1a8d3669f7e08d81b09f500745a20 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Mon, 21 Dec 2020 13:39:26 -0600 Subject: [PATCH 1/4] Peer info provided to etcd and memberlist pools is now consistent --- CHANGELOG | 4 ++ config.go | 16 ++++---- etcd.go | 30 ++++++--------- memberlist.go | 100 +++++++++++++++++++------------------------------- version | 2 +- 5 files changed, 63 insertions(+), 89 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 2808b439..c9ed2053 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.0.0-rc.5] - 2020-12-21 +### Change +* Peer info provided to etcd and memberlist pools is now consistent. + ## [1.0.0-rc.4] - 2020-12-18 ### Change * Fix leaky bucket algorithm diff --git a/config.go b/config.go index db70099b..63aed505 100644 --- a/config.go +++ b/config.go @@ -134,13 +134,13 @@ func (c *Config) SetDefaults() error { type PeerInfo struct { // (Optional) The name of the data center this peer is in. Leave blank if not using multi data center support. - DataCenter string + DataCenter string `json:"data-center"` // (Optional) The http address:port of the peer - HTTPAddress string + HTTPAddress string `json:"http-address"` // (Required) The grpc address:port of the peer - GRPCAddress string + GRPCAddress string `json:"grpc-address"` // (Optional) Is true if PeerInfo is for this instance of gubernator - IsOwner bool + IsOwner bool `json:"is-owner,omitempty"` } // Returns the hash key used to identify this peer in the Picker. @@ -303,13 +303,13 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig, setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.DialTimeout, getEnvDuration(log, "GUBER_ETCD_DIAL_TIMEOUT"), clock.Second*5) setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Username, os.Getenv("GUBER_ETCD_USER")) setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Password, os.Getenv("GUBER_ETCD_PASSWORD")) - setter.SetDefault(&conf.EtcdPoolConf.AdvertiseAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), conf.AdvertiseAddress) - setter.SetDefault(&conf.EtcdPoolConf.DataCenter, os.Getenv("GUBER_ETCD_DATA_CENTER"), conf.DataCenter) + setter.SetDefault(&conf.EtcdPoolConf.Advertise.GRPCAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), conf.AdvertiseAddress) + setter.SetDefault(&conf.EtcdPoolConf.Advertise.DataCenter, os.Getenv("GUBER_ETCD_DATA_CENTER"), conf.DataCenter) - setter.SetDefault(&conf.MemberListPoolConf.AdvertiseAddress, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_ADDRESS"), conf.AdvertiseAddress) + setter.SetDefault(&conf.MemberListPoolConf.Advertise.GRPCAddress, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_ADDRESS"), conf.AdvertiseAddress) setter.SetDefault(&conf.MemberListPoolConf.MemberListAddress, os.Getenv("GUBER_MEMBERLIST_ADDRESS"), fmt.Sprintf("%s:7946", advAddr)) setter.SetDefault(&conf.MemberListPoolConf.KnownNodes, getEnvSlice("GUBER_MEMBERLIST_KNOWN_NODES"), []string{}) - setter.SetDefault(&conf.MemberListPoolConf.DataCenter, conf.DataCenter) + setter.SetDefault(&conf.MemberListPoolConf.Advertise.DataCenter, conf.DataCenter) // Kubernetes Config setter.SetDefault(&conf.K8PoolConf.Namespace, os.Getenv("GUBER_K8S_NAMESPACE"), "default") diff --git a/etcd.go b/etcd.go index b15632ba..ec873a41 100644 --- a/etcd.go +++ b/etcd.go @@ -51,8 +51,8 @@ type EtcdPool struct { } type 
EtcdPoolConfig struct { - // (Required) The address etcd will advertise to other gubernator instances - AdvertiseAddress string + // (Required) This is the peer information that will be advertised to other gubernator instances + Advertise PeerInfo // (Required) An etcd client currently connected to an etcd cluster Client *etcd.Client @@ -68,17 +68,14 @@ type EtcdPoolConfig struct { // (Optional) An interface through which logging will occur (Usually *logrus.Entry) Logger logrus.FieldLogger - - // (Optional) The datacenter this instance belongs too - DataCenter string } func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) { setter.SetDefault(&conf.KeyPrefix, defaultBaseKey) setter.SetDefault(&conf.Logger, logrus.WithField("category", "gubernator")) - if conf.AdvertiseAddress == "" { - return nil, errors.New("AdvertiseAddress is required") + if conf.Advertise.GRPCAddress == "" { + return nil, errors.New("Advertise.GRPCAddress is required") } if conf.Client == nil { @@ -93,13 +90,13 @@ func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) { conf: conf, ctx: ctx, } - return pool, pool.run(conf.AdvertiseAddress) + return pool, pool.run(conf.Advertise) } -func (e *EtcdPool) run(addr string) error { +func (e *EtcdPool) run(peer PeerInfo) error { // Register our instance with etcd - if err := e.register(addr); err != nil { + if err := e.register(peer); err != nil { return err } @@ -233,14 +230,11 @@ func (e *EtcdPool) watch() error { return nil } -func (e *EtcdPool) register(name string) error { - instanceKey := e.conf.KeyPrefix + name - e.log.Infof("Registering peer '%s' with etcd", name) +func (e *EtcdPool) register(peer PeerInfo) error { + instanceKey := e.conf.KeyPrefix + peer.GRPCAddress + e.log.Infof("Registering peer '%s' with etcd", peer) - b, err := json.Marshal(PeerInfo{ - GRPCAddress: e.conf.AdvertiseAddress, - DataCenter: e.conf.DataCenter, - }) + b, err := json.Marshal(peer) if err != nil { return errors.Wrap(err, "while marshalling PeerInfo") } @@ -341,7 +335,7 @@ func (e *EtcdPool) callOnUpdate() { var peers []PeerInfo for _, p := range e.peers { - if p.GRPCAddress == e.conf.AdvertiseAddress { + if p.GRPCAddress == e.conf.Advertise.GRPCAddress { p.IsOwner = true } peers = append(peers, p) diff --git a/memberlist.go b/memberlist.go index 03b5b649..4e906b62 100644 --- a/memberlist.go +++ b/memberlist.go @@ -43,6 +43,9 @@ type MemberListPool struct { } type MemberListPoolConfig struct { + // (Required) This is the peer information that will be advertised to other members + Advertise PeerInfo + // (Required) This is the address:port the member list protocol listen for other members on MemberListAddress string @@ -60,9 +63,6 @@ type MemberListPoolConfig struct { // (Optional) An interface through which logging will occur (Usually *logrus.Entry) Logger logrus.FieldLogger - - // (Optional) The datacenter this instance belongs too - DataCenter string } func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberListPool, error) { @@ -89,11 +89,6 @@ func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberL host = addrs[0] } - _, advPort, err := splitAddress(conf.AdvertiseAddress) - if err != nil { - return nil, errors.Wrap(err, "AdvertiseAddress=`%s` is invalid;") - } - // Configure member list event handler m.events = newMemberListEventHandler(m.log, conf) @@ -118,14 +113,9 @@ func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberL // Prep metadata gob.Register(memberListMetadata{}) - metadata := memberListMetadata{ - 
DataCenter: conf.DataCenter, - AdvertiseAddress: conf.AdvertiseAddress, - GubernatorPort: advPort, - } // Join member list pool - err = m.joinPool(ctx, conf.KnownNodes, metadata) + err = m.joinPool(ctx, conf) if err != nil { return nil, errors.Wrap(err, "while attempting to join the member-list pool") } @@ -133,18 +123,18 @@ func NewMemberListPool(ctx context.Context, conf MemberListPoolConfig) (*MemberL return m, nil } -func (m *MemberListPool) joinPool(ctx context.Context, knownNodes []string, metadata memberListMetadata) error { +func (m *MemberListPool) joinPool(ctx context.Context, conf MemberListPoolConfig) error { // Get local node and set metadata node := m.memberList.LocalNode() - serializedMetadata, err := serializeMemberListMetadata(metadata) + b, err := json.Marshal(&conf.Advertise) if err != nil { - return err + return errors.Wrap(err, "error marshalling PeerInfo as JSON") } - node.Meta = serializedMetadata + node.Meta = b err = retry.Until(ctx, retry.Interval(clock.Millisecond*300), func(ctx context.Context, i int) error { // Join member list - _, err = m.memberList.Join(knownNodes) + _, err = m.memberList.Join(conf.KnownNodes) if err != nil { return errors.Wrap(err, "while joining member-list") } @@ -185,16 +175,11 @@ func newMemberListEventHandler(log logrus.FieldLogger, conf MemberListPoolConfig func (e *memberListEventHandler) addPeer(node *ml.Node) { ip := getIP(node.Address()) - // Deserialize metadata - metadata, err := deserializeMemberListMetadata(node.Meta) + peer, err := unmarshallPeer(node.Meta, ip) if err != nil { e.log.WithError(err).Warnf("while adding to peers") } else { - // Handle deprecated GubernatorPort - if metadata.AdvertiseAddress == "" { - metadata.AdvertiseAddress = makeAddress(ip, metadata.GubernatorPort) - } - e.peers[ip] = PeerInfo{GRPCAddress: metadata.AdvertiseAddress, DataCenter: metadata.DataCenter} + e.peers[ip] = peer e.callOnUpdate() } } @@ -202,20 +187,16 @@ func (e *memberListEventHandler) addPeer(node *ml.Node) { func (e *memberListEventHandler) NotifyJoin(node *ml.Node) { ip := getIP(node.Address()) - // Deserialize metadata - metadata, err := deserializeMemberListMetadata(node.Meta) + peer, err := unmarshallPeer(node.Meta, ip) if err != nil { - // This is called during memberlist initialization due to the fact that the local node + // This is called during member list initialization due to the fact that the local node // has no metadata yet - e.log.WithError(err).Warn("while deserialize member-list metadata") - } else { - // Handle deprecated GubernatorPort - if metadata.AdvertiseAddress == "" { - metadata.AdvertiseAddress = makeAddress(ip, metadata.GubernatorPort) - } - e.peers[ip] = PeerInfo{GRPCAddress: metadata.AdvertiseAddress, DataCenter: metadata.DataCenter} - e.callOnUpdate() + e.log.WithError(err).Warn("while deserialize member-list peer") + return } + peer.IsOwner = false + e.peers[ip] = peer + e.callOnUpdate() } func (e *memberListEventHandler) NotifyLeave(node *ml.Node) { @@ -230,23 +211,20 @@ func (e *memberListEventHandler) NotifyLeave(node *ml.Node) { func (e *memberListEventHandler) NotifyUpdate(node *ml.Node) { ip := getIP(node.Address()) - // Deserialize metadata - metadata, err := deserializeMemberListMetadata(node.Meta) + peer, err := unmarshallPeer(node.Meta, ip) if err != nil { - e.log.WithError(err).Warn("while updating member-list") - } else { - // Construct Gubernator address and create PeerInfo - gubernatorAddress := makeAddress(ip, metadata.GubernatorPort) - e.peers[ip] = PeerInfo{GRPCAddress: 
gubernatorAddress, DataCenter: metadata.DataCenter} - e.callOnUpdate() + e.log.WithError(err).Warn("while unmarshalling peer info") } + peer.IsOwner = false + e.peers[ip] = peer + e.callOnUpdate() } func (e *memberListEventHandler) callOnUpdate() { var peers []PeerInfo for _, p := range e.peers { - if p.GRPCAddress == e.conf.AdvertiseAddress { + if p.GRPCAddress == e.conf.Advertise.GRPCAddress { p.IsOwner = true } peers = append(peers, p) @@ -263,30 +241,28 @@ func makeAddress(ip string, port int) string { return net.JoinHostPort(ip, strconv.Itoa(port)) } +// Deprecated type memberListMetadata struct { DataCenter string AdvertiseAddress string - // Deprecated - GubernatorPort int -} - -func serializeMemberListMetadata(metadata memberListMetadata) ([]byte, error) { - b, err := json.Marshal(&metadata) - if err != nil { - return nil, errors.Wrap(err, "error marshalling metadata as JSON") - } - return b, nil + GubernatorPort int } -func deserializeMemberListMetadata(b []byte) (*memberListMetadata, error) { - var metadata memberListMetadata - if err := json.Unmarshal(b, &metadata); err != nil { +func unmarshallPeer(b []byte, ip string) (PeerInfo, error) { + var peer PeerInfo + if err := json.Unmarshal(b, &peer); err != nil { + var metadata memberListMetadata decoder := gob.NewDecoder(bytes.NewBuffer(b)) - if err := decoder.Decode(&metadata); err != nil { - return nil, errors.Wrap(err, "error decoding metadata") + if err := decoder.Decode(&peer); err != nil { + return peer, errors.Wrap(err, "error decoding peer") + } + // Handle deprecated GubernatorPort + if metadata.AdvertiseAddress == "" { + metadata.AdvertiseAddress = makeAddress(ip, metadata.GubernatorPort) } + return PeerInfo{GRPCAddress: metadata.AdvertiseAddress, DataCenter: metadata.DataCenter}, nil } - return &metadata, nil + return peer, nil } func newLogWriter(log logrus.FieldLogger) *io.PipeWriter { diff --git a/version b/version index d17f0653..8d3e6e31 100644 --- a/version +++ b/version @@ -1 +1 @@ -1.0.0-rc.4 +1.0.0-rc.5 From 9a92025084c977815040ecf762bd91af3f05f5d4 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Mon, 21 Dec 2020 14:00:18 -0600 Subject: [PATCH 2/4] Respect SIGTERM from docker during shutdown --- CHANGELOG | 3 +++ cmd/gubernator/main.go | 13 ++++++------- etcd.go | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index c9ed2053..26ebf0bf 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -6,7 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [1.0.0-rc.5] - 2020-12-21 ### Change +* Respect SIGTERM from docker during shutdown. * Peer info provided to etcd and memberlist pools is now consistent. +### Added +* Added GUBER_K8S_WATCH_MECHANISM for k8s deployments. 
## [1.0.0-rc.4] - 2020-12-18 ### Change diff --git a/cmd/gubernator/main.go b/cmd/gubernator/main.go index c1832cd2..fede97a5 100644 --- a/cmd/gubernator/main.go +++ b/cmd/gubernator/main.go @@ -22,6 +22,7 @@ import ( "os" "os/signal" "runtime" + "syscall" "github.com/mailgun/gubernator" "github.com/mailgun/holster/v3/clock" @@ -62,13 +63,11 @@ func main() { // Wait here for signals to clean up our mess c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - for sig := range c { - if sig == os.Interrupt { - log.Info("caught interrupt; user requested premature exit") - daemon.Close() - os.Exit(0) - } + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + for range c { + log.Info("caught signal; shutting down") + daemon.Close() + os.Exit(0) } } diff --git a/etcd.go b/etcd.go index ec873a41..c8d30f18 100644 --- a/etcd.go +++ b/etcd.go @@ -232,7 +232,7 @@ func (e *EtcdPool) watch() error { func (e *EtcdPool) register(peer PeerInfo) error { instanceKey := e.conf.KeyPrefix + peer.GRPCAddress - e.log.Infof("Registering peer '%s' with etcd", peer) + e.log.Infof("Registering peer '%#v' with etcd", peer) b, err := json.Marshal(peer) if err != nil { From 502dc9bc94734635967be9030fd5a91811fccdde Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Wed, 23 Dec 2020 15:47:43 -0600 Subject: [PATCH 3/4] fixed EtcdPool --- cache.go | 4 ++-- etcd.go | 25 +++++++------------------ grpc_stats.go | 4 ++-- gubernator.go | 4 ++-- 4 files changed, 13 insertions(+), 24 deletions(-) diff --git a/cache.go b/cache.go index 4840ca58..f09497fb 100644 --- a/cache.go +++ b/cache.go @@ -79,14 +79,14 @@ var _ Cache = &LRUCache{} // New creates a new Cache with a maximum size func NewLRUCache(maxSize int) *LRUCache { - setter.SetDefault(&maxSize, 50000) + setter.SetDefault(&maxSize, 50_000) return &LRUCache{ cache: make(map[interface{}]*list.Element), ll: list.New(), cacheSize: maxSize, sizeMetric: prometheus.NewDesc("gubernator_cache_size", - "Size of the LRU Cache which holds the rate limits.", nil, nil), + "The number of items in LRU Cache which holds the rate limits.", nil, nil), accessMetric: prometheus.NewDesc("gubernator_cache_access_count", "Cache access counts.", []string{"type"}, nil), } diff --git a/etcd.go b/etcd.go index c8d30f18..fe1532e3 100644 --- a/etcd.go +++ b/etcd.go @@ -147,12 +147,15 @@ func (e *EtcdPool) collectPeers(revision *int64) error { return errors.Wrapf(err, "while fetching peer listing from '%s'", e.conf.KeyPrefix) } + peers := make(map[string]PeerInfo) // Collect all the peers for _, v := range resp.Kvs { p := e.unMarshallValue(v.Value) - e.peers[p.GRPCAddress] = p + peers[p.GRPCAddress] = p } + e.peers = peers + *revision = resp.Header.Revision e.callOnUpdate() return nil } @@ -169,6 +172,8 @@ func (e *EtcdPool) unMarshallValue(v []byte) PeerInfo { } func (e *EtcdPool) watch() error { + var rev int64 + // Initialize watcher if err := e.watchPeers(); err != nil { return errors.Wrap(err, "while attempting to start watch") @@ -185,23 +190,7 @@ func (e *EtcdPool) watch() error { e.log.Errorf("watch error: %v", err) goto restart } - - for _, event := range response.Events { - switch event.Type { - case etcd.EventTypePut: - if event.Kv != nil { - e.log.Debugf("new peer [%s]", string(event.Kv.Value)) - p := e.unMarshallValue(event.Kv.Value) - e.peers[p.GRPCAddress] = p - } - case etcd.EventTypeDelete: - if event.PrevKv != nil { - e.log.Debugf("removed peer [%s]", string(event.PrevKv.Value)) - delete(e.peers, string(event.PrevKv.Value)) - } - } - e.callOnUpdate() - } + 
e.collectPeers(&rev) } restart: diff --git a/grpc_stats.go b/grpc_stats.go index ba74038e..44d147ab 100644 --- a/grpc_stats.go +++ b/grpc_stats.go @@ -53,8 +53,8 @@ func NewGRPCStatsHandler() *GRPCStatsHandler { Help: "GRPC requests by status.", }, []string{"status", "method"}), grpcRequestDuration: prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Name: "gubernator_grpc_request_duration_milliseconds", - Help: "GRPC request durations in milliseconds.", + Name: "gubernator_grpc_request_duration", + Help: "GRPC request durations in seconds", Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001}, }, []string{"method"}), } diff --git a/gubernator.go b/gubernator.go index ee398cb3..70868271 100644 --- a/gubernator.go +++ b/gubernator.go @@ -51,7 +51,7 @@ type V1Instance struct { // instance with the provided GRPCServer. func NewV1Instance(conf Config) (*V1Instance, error) { if conf.GRPCServers == nil { - return nil, errors.New("At least one GRPCServer instance is required") + return nil, errors.New("at least one GRPCServer instance is required") } if err := conf.SetDefaults(); err != nil { @@ -427,7 +427,7 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) { for _, p := range shutdownPeers { peers = append(peers, p.Info().GRPCAddress) } - s.log.WithField("peers", peers).Debug("Peers shutdown") + s.log.WithField("peers", peers).Debug("peers shutdown") } } From 69313d48a462dd77cfa427d5718e2d70866f7889 Mon Sep 17 00:00:00 2001 From: Derrick Wippler Date: Sun, 10 Jan 2021 13:23:24 -0600 Subject: [PATCH 4/4] Fixed a race in getGlobalRateLimit --- CHANGELOG | 9 +++-- benchmark_test.go | 8 ++-- cluster/cluster.go | 25 ++++++++++++- functional_test.go | 91 ++++++++++++++++++++++++++++----------------- global.go | 4 +- gubernator.go | 7 +++- multiregion.go | 2 +- peer_client_test.go | 2 +- 8 files changed, 99 insertions(+), 49 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 26ebf0bf..3aa2d005 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -5,9 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [1.0.0-rc.5] - 2020-12-21 -### Change -* Respect SIGTERM from docker during shutdown. -* Peer info provided to etcd and memberlist pools is now consistent. +## Changes +* Respect SIGTERM from docker during shutdown +* Peer info provided to etcd and memberlist pools is now consistent +* Fixed a race in getGlobalRateLimit +* Fixed issues with EtcdPool +* Changes in preparation of MultiRegion support testing ### Added * Added GUBER_K8S_WATCH_MECHANISM for k8s deployments. 
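For illustration, here is a minimal sketch of how the data-center aware test helpers added below (cluster.GetRandomPeer with a data-center argument and the DataCenterOne constant from cluster/cluster.go) are meant to be used from a test; this is the usage pattern the "MultiRegion support testing" bullet above refers to. The test name, rate-limit name and numbers are invented for the example, and it assumes the same imports functional_test.go already uses (guber, cluster, context, testing, require, assert):

func TestDataCenterPeerSketch(t *testing.T) {
	// Pick a random peer that TestMain started in "datacenter-1"
	peer := cluster.GetRandomPeer(cluster.DataCenterOne)

	client, err := guber.DialV1Server(peer.GRPCAddress, nil)
	require.NoError(t, err)

	// A single hit against a fresh token bucket should be under the limit
	resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
		Requests: []*guber.RateLimitReq{{
			Name:      "multi_region_sketch",
			UniqueKey: "account:1234",
			Algorithm: guber.Algorithm_TOKEN_BUCKET,
			Duration:  guber.Millisecond * 100,
			Limit:     10,
			Hits:      1,
		}},
	})
	require.NoError(t, err)
	assert.Equal(t, guber.Status_UNDER_LIMIT, resp.Responses[0].Status)
	assert.Equal(t, int64(9), resp.Responses[0].Remaining)
}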
diff --git a/benchmark_test.go b/benchmark_test.go index 77b54132..9c37d48d 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -32,7 +32,7 @@ func BenchmarkServer_GetPeerRateLimitNoBatching(b *testing.B) { } client := guber.NewPeerClient(guber.PeerConfig{ - Info: cluster.GetRandomPeer(), + Info: cluster.GetRandomPeer(cluster.DataCenterNone), Behavior: conf.Behaviors, }) @@ -54,7 +54,7 @@ func BenchmarkServer_GetPeerRateLimitNoBatching(b *testing.B) { } func BenchmarkServer_GetRateLimit(b *testing.B) { - client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) if err != nil { b.Errorf("NewV1Client err: %s", err) } @@ -80,7 +80,7 @@ func BenchmarkServer_GetRateLimit(b *testing.B) { } func BenchmarkServer_Ping(b *testing.B) { - client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) if err != nil { b.Errorf("NewV1Client err: %s", err) } @@ -108,7 +108,7 @@ func BenchmarkServer_Ping(b *testing.B) { }*/ func BenchmarkServer_ThunderingHeard(b *testing.B) { - client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) if err != nil { b.Errorf("NewV1Client err: %s", err) } diff --git a/cluster/cluster.go b/cluster/cluster.go index dfa39985..9d89037e 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -18,6 +18,7 @@ package cluster import ( "context" + "fmt" "math/rand" "github.com/mailgun/gubernator" @@ -26,12 +27,30 @@ import ( "github.com/sirupsen/logrus" ) +const ( + DataCenterNone = "" + DataCenterOne = "datacenter-1" + DataCenterTwo = "datacenter-2" +) + var daemons []*gubernator.Daemon var peers []gubernator.PeerInfo // Returns a random peer from the cluster -func GetRandomPeer() gubernator.PeerInfo { - return peers[rand.Intn(len(peers))] +func GetRandomPeer(dc string) gubernator.PeerInfo { + var local []gubernator.PeerInfo + + for _, p := range peers { + if p.DataCenter == dc { + local = append(local, p) + } + } + + if len(local) == 0 { + panic(fmt.Sprintf("failed to find random peer for dc '%s'", dc)) + } + + return local[rand.Intn(len(local))] } // Returns a list of all peers in the cluster @@ -81,6 +100,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error { Logger: logrus.WithField("instance", peer.GRPCAddress), GRPCListenAddress: peer.GRPCAddress, HTTPListenAddress: peer.HTTPAddress, + DataCenter: peer.DataCenter, Behaviors: gubernator.BehaviorConfig{ // Suitable for testing but not production GlobalSyncWait: clock.Millisecond * 50, @@ -98,6 +118,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error { peers = append(peers, gubernator.PeerInfo{ GRPCAddress: d.GRPCListeners[0].Addr().String(), HTTPAddress: d.HTTPListener.Addr().String(), + DataCenter: peer.DataCenter, }) daemons = append(daemons, d) } diff --git a/functional_test.go b/functional_test.go index bf18235c..46c1c8f9 100644 --- a/functional_test.go +++ b/functional_test.go @@ -38,12 +38,18 @@ import ( // Setup and shutdown the mock gubernator cluster for the entire test suite func TestMain(m *testing.M) { if err := cluster.StartWith([]guber.PeerInfo{ - {GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980"}, - {GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981"}, - {GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982"}, - 
{GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983"}, - {GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984"}, - {GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985"}, + {GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985", DataCenter: cluster.DataCenterNone}, + + // DataCenterOne + {GRPCAddress: "127.0.0.1:9890", HTTPAddress: "127.0.0.1:9880", DataCenter: cluster.DataCenterOne}, + {GRPCAddress: "127.0.0.1:9891", HTTPAddress: "127.0.0.1:9881", DataCenter: cluster.DataCenterOne}, + {GRPCAddress: "127.0.0.1:9892", HTTPAddress: "127.0.0.1:9882", DataCenter: cluster.DataCenterOne}, + {GRPCAddress: "127.0.0.1:9893", HTTPAddress: "127.0.0.1:9883", DataCenter: cluster.DataCenterOne}, }); err != nil { fmt.Println(err) os.Exit(1) @@ -53,7 +59,7 @@ func TestMain(m *testing.M) { } func TestOverTheLimit(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) require.Nil(t, errs) tests := []struct { @@ -102,61 +108,68 @@ func TestOverTheLimit(t *testing.T) { func TestTokenBucket(t *testing.T) { defer clock.Freeze(clock.Now()).Unfreeze() - client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + addr := cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress + client, errs := guber.DialV1Server(addr, nil) require.Nil(t, errs) tests := []struct { + name string Remaining int64 Status guber.Status Sleep clock.Duration }{ { + name: "remaining should be one", Remaining: 1, Status: guber.Status_UNDER_LIMIT, Sleep: clock.Duration(0), }, { + name: "remaining should be zero and under limit", Remaining: 0, Status: guber.Status_UNDER_LIMIT, Sleep: clock.Millisecond * 100, }, { + name: "after waiting 100ms remaining should be 1 and under limit", Remaining: 1, Status: guber.Status_UNDER_LIMIT, Sleep: clock.Duration(0), }, } - for _, test := range tests { - resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ - Requests: []*guber.RateLimitReq{ - { - Name: "test_token_bucket", - UniqueKey: "account:1234", - Algorithm: guber.Algorithm_TOKEN_BUCKET, - Duration: guber.Millisecond * 5, - Limit: 2, - Hits: 1, + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ + Requests: []*guber.RateLimitReq{ + { + Name: "test_token_bucket", + UniqueKey: "account:1234", + Algorithm: guber.Algorithm_TOKEN_BUCKET, + Duration: guber.Millisecond * 5, + Limit: 2, + Hits: 1, + }, }, - }, - }) - require.Nil(t, err) + }) + require.Nil(t, err) - rl := resp.Responses[0] + rl := resp.Responses[0] - assert.Empty(t, rl.Error) - assert.Equal(t, test.Status, rl.Status) - assert.Equal(t, test.Remaining, rl.Remaining) - assert.Equal(t, int64(2), rl.Limit) - assert.True(t, rl.ResetTime != 0) - clock.Advance(test.Sleep) + assert.Empty(t, rl.Error) + assert.Equal(t, tt.Status, rl.Status) + assert.Equal(t, tt.Remaining, rl.Remaining) + 
assert.Equal(t, int64(2), rl.Limit) + assert.True(t, rl.ResetTime != 0) + clock.Advance(tt.Sleep) + }) } } func TestTokenBucketGregorian(t *testing.T) { defer clock.Freeze(clock.Now()).Unfreeze() - client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) require.Nil(t, errs) tests := []struct { @@ -394,7 +407,7 @@ func TestLeakyBucketGregorian(t *testing.T) { } func TestMissingFields(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) require.Nil(t, errs) tests := []struct { @@ -527,7 +540,7 @@ func TestGlobalRateLimits(t *testing.T) { } func TestChangeLimit(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) require.Nil(t, errs) tests := []struct { @@ -622,7 +635,7 @@ func TestChangeLimit(t *testing.T) { } func TestResetRemaining(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) require.Nil(t, errs) tests := []struct { @@ -765,7 +778,7 @@ func TestHealthCheck(t *testing.T) { func TestLeakyBucketDivBug(t *testing.T) { defer clock.Freeze(clock.Now()).Unfreeze() - client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil) + client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) require.NoError(t, err) resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ @@ -804,8 +817,18 @@ func TestLeakyBucketDivBug(t *testing.T) { assert.Equal(t, int64(2000), resp.Responses[0].Limit) } +func TestMutliRegion(t *testing.T) { + + // TODO: Queue a rate limit with multi region behavior on the DataCenterNone cluster + // TODO: Check the immediate response is correct + // TODO: Wait until the rate limit count shows up on the DataCenterOne and DataCenterTwo cluster + + // TODO: Increment the counts on the DataCenterTwo and DataCenterOne clusters + // TODO: Wait until both rate limit count show up on all datacenters +} + func TestGRPCGateway(t *testing.T) { - resp, err := http.DefaultClient.Get("http://" + cluster.GetRandomPeer().HTTPAddress + "/v1/HealthCheck") + resp, err := http.DefaultClient.Get("http://" + cluster.GetRandomPeer(cluster.DataCenterNone).HTTPAddress + "/v1/HealthCheck") require.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) } diff --git a/global.go b/global.go index 0247dbea..15682e8e 100644 --- a/global.go +++ b/global.go @@ -53,8 +53,8 @@ func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager Name: "gubernator_broadcast_durations", Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001}, }), - asyncQueue: make(chan *RateLimitReq, 0), - broadcastQueue: make(chan *RateLimitReq, 0), + asyncQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit), + broadcastQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit), instance: instance, conf: conf, } diff --git a/gubernator.go b/gubernator.go index 70868271..0531d486 100644 --- a/gubernator.go +++ b/gubernator.go @@ -227,8 +227,11 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G // getGlobalRateLimit handles rate limits that are marked as `Behavior 
= GLOBAL`. Rate limit responses // are returned from the local cache and the hits are queued to be sent to the owning peer. func (s *V1Instance) getGlobalRateLimit(req *RateLimitReq) (*RateLimitResp, error) { - // Queue the hit for async update - s.global.QueueHit(req) + // Queue the hit for async update after we have prepared our response. + // NOTE: The defer here avoids a race condition where we queue the req to + // be forwarded to the owning peer in a separate goroutine but simultaneously + // access and possibly copy the req in this method. + defer s.global.QueueHit(req) s.conf.Cache.Lock() item, ok := s.conf.Cache.GetItem(req.HashKey()) diff --git a/multiregion.go b/multiregion.go index 6c49c96f..e23fbb2c 100644 --- a/multiregion.go +++ b/multiregion.go @@ -18,7 +18,7 @@ func newMultiRegionManager(conf BehaviorConfig, instance *V1Instance) *mutliRegi conf: conf, instance: instance, log: instance.log, - reqQueue: make(chan *RateLimitReq, 0), + reqQueue: make(chan *RateLimitReq, conf.MultiRegionBatchLimit), } mm.runAsyncReqs() return &mm diff --git a/peer_client_test.go b/peer_client_test.go index b4591464..026b36c8 100644 --- a/peer_client_test.go +++ b/peer_client_test.go @@ -41,7 +41,7 @@ func TestPeerClientShutdown(t *testing.T) { t.Run(c.Name, func(t *testing.T) { client := gubernator.NewPeerClient(gubernator.PeerConfig{ - Info: cluster.GetRandomPeer(), + Info: cluster.GetRandomPeer(cluster.DataCenterNone), Behavior: config, })
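The net effect of this series is that both peer discovery pools now accept the same PeerInfo block instead of a bare advertise address plus a separate data-center string. A rough sketch of what a caller passes after this change: the addresses and data-center name are made up, ctx and etcdClient are assumed to exist (the etcd client type is the same *etcd.Client that etcd.go already uses), and fields not shown in these diffs, such as the on-update callback each pool invokes with the collected []PeerInfo, are omitted.

// Sketch only: wire both pools with one shared PeerInfo.
func newPools(ctx context.Context, etcdClient *etcd.Client) (*gubernator.MemberListPool, *gubernator.EtcdPool, error) {
	info := gubernator.PeerInfo{
		GRPCAddress: "10.0.0.5:81",  // required: the GRPC address other peers will use
		HTTPAddress: "10.0.0.5:80",  // optional
		DataCenter:  "datacenter-1", // optional
	}

	// memberlist discovery now advertises the full PeerInfo as JSON node metadata
	memberPool, err := gubernator.NewMemberListPool(ctx, gubernator.MemberListPoolConfig{
		Advertise:         info,
		MemberListAddress: "10.0.0.5:7946",
		KnownNodes:        []string{"10.0.0.6:7946"},
	})
	if err != nil {
		return nil, nil, err
	}

	// etcd discovery registers the same PeerInfo, marshalled as JSON, under KeyPrefix + GRPCAddress
	etcdPool, err := gubernator.NewEtcdPool(gubernator.EtcdPoolConfig{
		Advertise: info,
		Client:    etcdClient,
	})
	if err != nil {
		return nil, nil, err
	}
	return memberPool, etcdPool, nil
}

Note that the deprecated memberListMetadata type and its gob decode path are kept in unmarshallPeer, presumably so metadata published by peers running older versions can still be read.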