
Merge pull request #83 from mailgun/thrawn/develop
Misc improvements found during deployment testing
thrawn01 authored Jan 13, 2021
2 parents 9b918af + 69313d4 commit 34a19c0
Showing 15 changed files with 181 additions and 166 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG
@@ -4,6 +4,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.0.0-rc.5] - 2020-12-21
### Changes
* Respect SIGTERM from docker during shutdown
* Peer info provided to etcd and memberlist pools is now consistent
* Fixed a race in getGlobalRateLimit
* Fixed issues with EtcdPool
* Changes in preparation for MultiRegion support testing
### Added
* Added GUBER_K8S_WATCH_MECHANISM for k8s deployments.

## [1.0.0-rc.4] - 2020-12-18
### Change
* Fix leaky bucket algorithm
8 changes: 4 additions & 4 deletions benchmark_test.go
@@ -32,7 +32,7 @@ func BenchmarkServer_GetPeerRateLimitNoBatching(b *testing.B) {
}

client := guber.NewPeerClient(guber.PeerConfig{
Info: cluster.GetRandomPeer(),
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
Behavior: conf.Behaviors,
})

@@ -54,7 +54,7 @@ func BenchmarkServer_GetPeerRateLimitNoBatching(b *testing.B) {
}

func BenchmarkServer_GetRateLimit(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
@@ -80,7 +80,7 @@ func BenchmarkServer_GetRateLimit(b *testing.B) {
}

func BenchmarkServer_Ping(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
@@ -108,7 +108,7 @@ func BenchmarkServer_Ping(b *testing.B) {
}*/

func BenchmarkServer_ThunderingHeard(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress, nil)
client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
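The benchmark changes are purely mechanical: every call site now tells the test cluster which data center to draw a random peer from. A minimal sketch of the updated call pattern — the package name and the surrounding benchmark are illustrative; only `GetRandomPeer`, `DataCenterNone`, and `DialV1Server` come from the diff:

```go
package gubernator_test

import (
	"testing"

	guber "github.com/mailgun/gubernator"
	"github.com/mailgun/gubernator/cluster"
)

// Pick a random peer from the test cluster, filtered by data center, then
// dial it. DataCenterNone matches peers started without a data center,
// which is the default for the local test cluster.
func BenchmarkExample(b *testing.B) {
	peer := cluster.GetRandomPeer(cluster.DataCenterNone)

	client, err := guber.DialV1Server(peer.GRPCAddress, nil)
	if err != nil {
		b.Fatalf("DialV1Server err: %s", err)
	}
	_ = client
}
```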
4 changes: 2 additions & 2 deletions cache.go
@@ -79,14 +79,14 @@ var _ Cache = &LRUCache{}

// NewLRUCache creates a new Cache with a maximum size
func NewLRUCache(maxSize int) *LRUCache {
setter.SetDefault(&maxSize, 50000)
setter.SetDefault(&maxSize, 50_000)

return &LRUCache{
cache: make(map[interface{}]*list.Element),
ll: list.New(),
cacheSize: maxSize,
sizeMetric: prometheus.NewDesc("gubernator_cache_size",
"Size of the LRU Cache which holds the rate limits.", nil, nil),
"The number of items in LRU Cache which holds the rate limits.", nil, nil),
accessMetric: prometheus.NewDesc("gubernator_cache_access_count",
"Cache access counts.", []string{"type"}, nil),
}
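For reference, the 50,000-entry default only applies when the caller passes a zero size. A hedged sketch of relying on that default; nothing here beyond `NewLRUCache` comes from the diff:

```go
package main

import (
	"fmt"

	guber "github.com/mailgun/gubernator"
)

// Passing 0 lets setter.SetDefault apply the 50,000-entry default shown in
// the diff; any positive value overrides it.
func main() {
	cache := guber.NewLRUCache(0)
	fmt.Printf("created %T with the default max size\n", cache)
}
```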
25 changes: 23 additions & 2 deletions cluster/cluster.go
@@ -18,6 +18,7 @@ package cluster

import (
"context"
"fmt"
"math/rand"

"github.com/mailgun/gubernator"
@@ -26,12 +27,30 @@ import (
"github.com/sirupsen/logrus"
)

const (
DataCenterNone = ""
DataCenterOne = "datacenter-1"
DataCenterTwo = "datacenter-2"
)

var daemons []*gubernator.Daemon
var peers []gubernator.PeerInfo

// Returns a random peer from the cluster
func GetRandomPeer() gubernator.PeerInfo {
return peers[rand.Intn(len(peers))]
func GetRandomPeer(dc string) gubernator.PeerInfo {
var local []gubernator.PeerInfo

for _, p := range peers {
if p.DataCenter == dc {
local = append(local, p)
}
}

if len(local) == 0 {
panic(fmt.Sprintf("failed to find random peer for dc '%s'", dc))
}

return local[rand.Intn(len(local))]
}

// Returns a list of all peers in the cluster
@@ -81,6 +100,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
Logger: logrus.WithField("instance", peer.GRPCAddress),
GRPCListenAddress: peer.GRPCAddress,
HTTPListenAddress: peer.HTTPAddress,
DataCenter: peer.DataCenter,
Behaviors: gubernator.BehaviorConfig{
// Suitable for testing but not production
GlobalSyncWait: clock.Millisecond * 50,
@@ -98,6 +118,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error {
peers = append(peers, gubernator.PeerInfo{
GRPCAddress: d.GRPCListeners[0].Addr().String(),
HTTPAddress: d.HTTPListener.Addr().String(),
DataCenter: peer.DataCenter,
})
daemons = append(daemons, d)
}
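The new constants plus the DataCenter plumbing in StartWith make it possible to spin up a test cluster that spans two simulated data centers. A sketch under that assumption — the addresses are made up, and `cluster.Stop` is assumed to be the package's existing shutdown helper:

```go
package main

import (
	"log"

	guber "github.com/mailgun/gubernator"
	"github.com/mailgun/gubernator/cluster"
)

func main() {
	// Start three local daemons, two in datacenter-1 and one in datacenter-2.
	err := cluster.StartWith([]guber.PeerInfo{
		{GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980", DataCenter: cluster.DataCenterOne},
		{GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981", DataCenter: cluster.DataCenterOne},
		{GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982", DataCenter: cluster.DataCenterTwo},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cluster.Stop()

	// GetRandomPeer now panics if no peer exists in the requested data center.
	p := cluster.GetRandomPeer(cluster.DataCenterTwo)
	log.Printf("picked %s from %s", p.GRPCAddress, p.DataCenter)
}
```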
13 changes: 6 additions & 7 deletions cmd/gubernator/main.go
@@ -22,6 +22,7 @@ import (
"os"
"os/signal"
"runtime"
"syscall"

"github.com/mailgun/gubernator"
"github.com/mailgun/holster/v3/clock"
@@ -62,13 +63,11 @@ func main() {

// Wait here for signals to clean up our mess
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
for sig := range c {
if sig == os.Interrupt {
log.Info("caught interrupt; user requested premature exit")
daemon.Close()
os.Exit(0)
}
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
for range c {
log.Info("caught signal; shutting down")
daemon.Close()
os.Exit(0)
}
}

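This change matters because `docker stop` delivers SIGTERM and only falls back to SIGKILL after the grace period; the old loop only acted on os.Interrupt, so containers were killed without a clean shutdown. The shutdown pattern from the diff, shown in isolation with the daemon shutdown reduced to a comment:

```go
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	c := make(chan os.Signal, 1)
	// SIGINT from a local Ctrl-C, SIGTERM from docker/kubernetes.
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)

	<-c // block until either signal arrives
	log.Print("caught signal; shutting down")
	// daemon.Close() would run here before exiting
	os.Exit(0)
}
```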
16 changes: 8 additions & 8 deletions config.go
@@ -134,13 +134,13 @@ func (c *Config) SetDefaults() error {

type PeerInfo struct {
// (Optional) The name of the data center this peer is in. Leave blank if not using multi data center support.
DataCenter string
DataCenter string `json:"data-center"`
// (Optional) The http address:port of the peer
HTTPAddress string
HTTPAddress string `json:"http-address"`
// (Required) The grpc address:port of the peer
GRPCAddress string
GRPCAddress string `json:"grpc-address"`
// (Optional) Is true if PeerInfo is for this instance of gubernator
IsOwner bool
IsOwner bool `json:"is-owner,omitempty"`
}

// Returns the hash key used to identify this peer in the Picker.
@@ -303,13 +303,13 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig,
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.DialTimeout, getEnvDuration(log, "GUBER_ETCD_DIAL_TIMEOUT"), clock.Second*5)
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Username, os.Getenv("GUBER_ETCD_USER"))
setter.SetDefault(&conf.EtcdPoolConf.EtcdConfig.Password, os.Getenv("GUBER_ETCD_PASSWORD"))
setter.SetDefault(&conf.EtcdPoolConf.AdvertiseAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), conf.AdvertiseAddress)
setter.SetDefault(&conf.EtcdPoolConf.DataCenter, os.Getenv("GUBER_ETCD_DATA_CENTER"), conf.DataCenter)
setter.SetDefault(&conf.EtcdPoolConf.Advertise.GRPCAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), conf.AdvertiseAddress)
setter.SetDefault(&conf.EtcdPoolConf.Advertise.DataCenter, os.Getenv("GUBER_ETCD_DATA_CENTER"), conf.DataCenter)

setter.SetDefault(&conf.MemberListPoolConf.AdvertiseAddress, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_ADDRESS"), conf.AdvertiseAddress)
setter.SetDefault(&conf.MemberListPoolConf.Advertise.GRPCAddress, os.Getenv("GUBER_MEMBERLIST_ADVERTISE_ADDRESS"), conf.AdvertiseAddress)
setter.SetDefault(&conf.MemberListPoolConf.MemberListAddress, os.Getenv("GUBER_MEMBERLIST_ADDRESS"), fmt.Sprintf("%s:7946", advAddr))
setter.SetDefault(&conf.MemberListPoolConf.KnownNodes, getEnvSlice("GUBER_MEMBERLIST_KNOWN_NODES"), []string{})
setter.SetDefault(&conf.MemberListPoolConf.DataCenter, conf.DataCenter)
setter.SetDefault(&conf.MemberListPoolConf.Advertise.DataCenter, conf.DataCenter)

// Kubernetes Config
setter.SetDefault(&conf.K8PoolConf.Namespace, os.Getenv("GUBER_K8S_NAMESPACE"), "default")
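The new struct tags control how a peer serializes itself — for example when it registers in etcd (see `register()` in etcd.go below) — which is what the CHANGELOG means by consistent peer info. A sketch of the resulting JSON; the addresses are made up:

```go
package main

import (
	"encoding/json"
	"fmt"

	guber "github.com/mailgun/gubernator"
)

func main() {
	b, _ := json.Marshal(guber.PeerInfo{
		GRPCAddress: "10.0.0.5:81",
		HTTPAddress: "10.0.0.5:80",
		DataCenter:  "datacenter-1",
	})
	// IsOwner is tagged `omitempty`, so it is dropped when false.
	fmt.Println(string(b))
	// {"data-center":"datacenter-1","http-address":"10.0.0.5:80","grpc-address":"10.0.0.5:81"}
}
```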
55 changes: 19 additions & 36 deletions etcd.go
@@ -51,8 +51,8 @@ type EtcdPool struct {
}

type EtcdPoolConfig struct {
// (Required) The address etcd will advertise to other gubernator instances
AdvertiseAddress string
// (Required) This is the peer information that will be advertised to other gubernator instances
Advertise PeerInfo

// (Required) An etcd client currently connected to an etcd cluster
Client *etcd.Client
@@ -68,17 +68,14 @@ type EtcdPoolConfig struct {

// (Optional) An interface through which logging will occur (Usually *logrus.Entry)
Logger logrus.FieldLogger

// (Optional) The datacenter this instance belongs too
DataCenter string
}

func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) {
setter.SetDefault(&conf.KeyPrefix, defaultBaseKey)
setter.SetDefault(&conf.Logger, logrus.WithField("category", "gubernator"))

if conf.AdvertiseAddress == "" {
return nil, errors.New("AdvertiseAddress is required")
if conf.Advertise.GRPCAddress == "" {
return nil, errors.New("Advertise.GRPCAddress is required")
}

if conf.Client == nil {
@@ -93,13 +90,13 @@ func NewEtcdPool(conf EtcdPoolConfig) (*EtcdPool, error) {
conf: conf,
ctx: ctx,
}
return pool, pool.run(conf.AdvertiseAddress)
return pool, pool.run(conf.Advertise)
}

func (e *EtcdPool) run(addr string) error {
func (e *EtcdPool) run(peer PeerInfo) error {

// Register our instance with etcd
if err := e.register(addr); err != nil {
if err := e.register(peer); err != nil {
return err
}

@@ -150,12 +147,15 @@ func (e *EtcdPool) collectPeers(revision *int64) error {
return errors.Wrapf(err, "while fetching peer listing from '%s'", e.conf.KeyPrefix)
}

peers := make(map[string]PeerInfo)
// Collect all the peers
for _, v := range resp.Kvs {
p := e.unMarshallValue(v.Value)
e.peers[p.GRPCAddress] = p
peers[p.GRPCAddress] = p
}

e.peers = peers
*revision = resp.Header.Revision
e.callOnUpdate()
return nil
}
@@ -172,6 +172,8 @@ func (e *EtcdPool) unMarshallValue(v []byte) PeerInfo {
}

func (e *EtcdPool) watch() error {
var rev int64

// Initialize watcher
if err := e.watchPeers(); err != nil {
return errors.Wrap(err, "while attempting to start watch")
@@ -188,23 +190,7 @@
e.log.Errorf("watch error: %v", err)
goto restart
}

for _, event := range response.Events {
switch event.Type {
case etcd.EventTypePut:
if event.Kv != nil {
e.log.Debugf("new peer [%s]", string(event.Kv.Value))
p := e.unMarshallValue(event.Kv.Value)
e.peers[p.GRPCAddress] = p
}
case etcd.EventTypeDelete:
if event.PrevKv != nil {
e.log.Debugf("removed peer [%s]", string(event.PrevKv.Value))
delete(e.peers, string(event.PrevKv.Value))
}
}
e.callOnUpdate()
}
e.collectPeers(&rev)
}

restart:
@@ -233,14 +219,11 @@ func (e *EtcdPool) watch() error {
return nil
}

func (e *EtcdPool) register(name string) error {
instanceKey := e.conf.KeyPrefix + name
e.log.Infof("Registering peer '%s' with etcd", name)
func (e *EtcdPool) register(peer PeerInfo) error {
instanceKey := e.conf.KeyPrefix + peer.GRPCAddress
e.log.Infof("Registering peer '%#v' with etcd", peer)

b, err := json.Marshal(PeerInfo{
GRPCAddress: e.conf.AdvertiseAddress,
DataCenter: e.conf.DataCenter,
})
b, err := json.Marshal(peer)
if err != nil {
return errors.Wrap(err, "while marshalling PeerInfo")
}
@@ -341,7 +324,7 @@ func (e *EtcdPool) callOnUpdate() {
var peers []PeerInfo

for _, p := range e.peers {
if p.GRPCAddress == e.conf.AdvertiseAddress {
if p.GRPCAddress == e.conf.Advertise.GRPCAddress {
p.IsOwner = true
}
peers = append(peers, p)
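With `Advertise` replacing the old `AdvertiseAddress` string, callers hand the pool a full PeerInfo, so the data center travels with the registration. A hedged sketch of constructing the pool directly — the etcd client import path, endpoint, OnUpdate body, and Close call are assumptions based on the pool's public API; normally SetupDaemonConfig wires this up from the environment:

```go
package main

import (
	"log"

	guber "github.com/mailgun/gubernator"
	etcd "go.etcd.io/etcd/clientv3"
)

func main() {
	client, err := etcd.New(etcd.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}

	pool, err := guber.NewEtcdPool(guber.EtcdPoolConfig{
		Client: client,
		// The full PeerInfo is now advertised, not just a gRPC address.
		Advertise: guber.PeerInfo{
			GRPCAddress: "10.0.0.5:81",
			HTTPAddress: "10.0.0.5:80",
			DataCenter:  "datacenter-1",
		},
		OnUpdate: func(peers []guber.PeerInfo) {
			log.Printf("etcd peer list updated: %d peers", len(peers))
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()
}
```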
