diff --git a/CHANGELOG b/CHANGELOG index decf4de..e9262e2 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.0.0-rc.1] - 2020-10-22 +### Change +* Fix inconsistent tests failing +* Fix GRPC/HTTP Gateway +* Renamed functions to ensure clarity of version + ## [0.9.2] - 2020-10-23 ### Change * ETCD discovery now sets the IsOwner property when updating the peers list. diff --git a/Makefile b/Makefile index 6bb3226..b559589 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ VERSION=$(shell cat version) LDFLAGS="-X main.Version=$(VERSION)" test: - go test ./... -v -race -count=1 + go test ./... -v -race -p=1 -count=1 docker: docker build --build-arg VERSION=$(VERSION) -t thrawn01/gubernator:$(VERSION) . diff --git a/benchmark_test.go b/benchmark_test.go index ac45c96..4968275 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -51,7 +51,7 @@ func BenchmarkServer_GetPeerRateLimitNoBatching(b *testing.B) { } func BenchmarkServer_GetRateLimit(b *testing.B) { - client, err := guber.DialV1Server(cluster.GetRandomPeer().Address) + client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress) if err != nil { b.Errorf("NewV1Client err: %s", err) } @@ -77,7 +77,7 @@ func BenchmarkServer_GetRateLimit(b *testing.B) { } func BenchmarkServer_Ping(b *testing.B) { - client, err := guber.DialV1Server(cluster.GetRandomPeer().Address) + client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress) if err != nil { b.Errorf("NewV1Client err: %s", err) } @@ -105,7 +105,7 @@ func BenchmarkServer_Ping(b *testing.B) { }*/ func BenchmarkServer_ThunderingHeard(b *testing.B) { - client, err := guber.DialV1Server(cluster.GetRandomPeer().Address) + client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress) if err != nil { b.Errorf("NewV1Client err: %s", err) } diff --git a/client.go b/client.go index fbe6aac..7efdce4 100644 --- a/client.go +++ b/client.go @@ -64,7 +64,10 @@ func FromUnixMilliseconds(ts int64) time.Time { // Given a list of peers, return a random peer func RandomPeer(peers []PeerInfo) PeerInfo { - return peers[rand.Intn(len(peers))] + rand.Shuffle(len(peers), func(i, j int) { + peers[i], peers[j] = peers[j], peers[i] + }) + return peers[0] } // Return a random alpha string of 'n' length diff --git a/cluster/cluster.go b/cluster/cluster.go index 03e855e..f58da95 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -17,159 +17,114 @@ limitations under the License. 
package cluster import ( - "fmt" - "net" + "context" + "math/rand" "time" "github.com/mailgun/gubernator" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "google.golang.org/grpc" ) -type instance struct { - GRPC *grpc.Server - Guber *gubernator.Instance - Address string +type Address struct { + HTTPAddress string + GRPCAddress string } -func (i *instance) Peers() []gubernator.PeerInfo { - var result []gubernator.PeerInfo - for _, peer := range peers { - if peer.Address == i.Address { - peer.IsOwner = true - } - result = append(result, peer) +func (a *Address) PeerInfo() gubernator.PeerInfo { + return gubernator.PeerInfo{ + HTTPAddress: a.GRPCAddress, + GRPCAddress: a.GRPCAddress, } - return result -} - -func (i *instance) Stop() error { - err := i.Guber.Close() - i.GRPC.GracefulStop() - return err } -var instances []*instance +var daemons []*gubernator.Daemon var peers []gubernator.PeerInfo -// Returns default testing configuration -func GetDefaultConfig() gubernator.Config { - return gubernator.Config{ - Behaviors: gubernator.BehaviorConfig{ - GlobalSyncWait: time.Millisecond * 50, // Suitable for testing but not production - GlobalTimeout: time.Second, - MultiRegionSyncWait: time.Millisecond * 50, // Suitable for testing but not production - MultiRegionTimeout: time.Second, - }, - } -} - // Returns a random peer from the cluster func GetRandomPeer() gubernator.PeerInfo { - return gubernator.RandomPeer(peers) + return peers[rand.Intn(len(peers))] } -// Returns a specific peer -func PeerAt(idx int) gubernator.PeerInfo { - return peers[idx] +// Returns a list of all peers in the cluster +func GetPeers() []gubernator.PeerInfo { + return peers } -// Returns a specific instance -func InstanceAt(idx int) *instance { - return instances[idx] +// Returns a list of all deamons in the cluster +func GetDaemons() []*gubernator.Daemon { + return daemons } -// Return the specific instance for a host -func InstanceForHost(host string) *instance { - for i := range instances { - if instances[i].Address == host { - return instances[i] - } - } - return nil +// Returns a specific peer +func PeerAt(idx int) gubernator.PeerInfo { + return peers[idx] } -// Stop an instance without updating peers, used to cause connection errors -func StopInstanceAt(idx int) { - instances[idx].Stop() +// Returns a specific daemon +func DaemonAt(idx int) *gubernator.Daemon { + return daemons[idx] } // Returns the number of instances -func NumOfInstances() int { - return len(instances) +func NumOfDaemons() int { + return len(daemons) } // Start a local cluster of gubernator servers func Start(numInstances int) error { - addresses := make([]string, numInstances, numInstances) - return StartWith(addresses) + peers := make([]gubernator.PeerInfo, numInstances, numInstances) + return StartWith(peers) +} + +func Restart(ctx context.Context) { + for i := 0; i < len(daemons); i++ { + daemons[i].Close() + daemons[i].Start(ctx) + daemons[i].SetPeers(peers) + } } // Start a local cluster with specific addresses -func StartWith(addresses []string) error { - config := GetDefaultConfig() - for _, address := range addresses { - ins, err := StartInstance(address, config) +func StartWith(localPeers []gubernator.PeerInfo) error { + for _, peer := range localPeers { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + d, err := gubernator.SpawnDaemon(ctx, gubernator.DaemonConfig{ + Logger: logrus.WithField("instance", peer.GRPCAddress), + GRPCListenAddress: peer.GRPCAddress, + HTTPListenAddress: peer.HTTPAddress, + 
Behaviors: gubernator.BehaviorConfig{ + // Suitable for testing but not production + GlobalSyncWait: time.Millisecond * 50, + GlobalTimeout: time.Second * 5, + BatchTimeout: time.Second * 5, + MultiRegionTimeout: time.Second * 5, + }, + }) + cancel() if err != nil { - return errors.Wrapf(err, "while starting instance for addr '%s'", address) + return errors.Wrapf(err, "while starting server for addr '%s'", peer.GRPCAddress) } - // Add the peers and instances to the package level variables - peers = append(peers, gubernator.PeerInfo{Address: ins.Address}) - instances = append(instances, ins) + // Add the peers and daemons to the package level variables + peers = append(peers, gubernator.PeerInfo{ + GRPCAddress: d.GRPCListener.Addr().String(), + HTTPAddress: d.HTTPListener.Addr().String(), + }) + daemons = append(daemons, d) } // Tell each instance about the other peers - for _, ins := range instances { - ins.Guber.SetPeers(ins.Peers()) + for _, d := range daemons { + d.SetPeers(peers) } return nil } func Stop() { - for _, ins := range instances { - ins.Stop() - } -} - -// Start a single instance of gubernator with the provided config and listening address. -// If address is empty string a random port on the loopback device will be chosen. -func StartInstance(address string, conf gubernator.Config) (*instance, error) { - conf.GRPCServer = grpc.NewServer() - - guber, err := gubernator.New(conf) - if err != nil { - return nil, errors.Wrap(err, "while creating new gubernator instance") - } - - listener, err := net.Listen("tcp", address) - if err != nil { - return nil, errors.Wrap(err, "while listening on random interface") + for _, d := range daemons { + d.Close() } - - go func() { - logrus.Infof("Listening on %s", listener.Addr().String()) - if err := conf.GRPCServer.Serve(listener); err != nil { - fmt.Printf("while serving: %s\n", err) - } - }() - - // Wait until the instance responds to connect - for i := 0; i < 10; i++ { - conn, err := net.Dial("tcp", address) - if err != nil { - break - } - conn.Close() - time.Sleep(time.Millisecond * 50) - } - - guber.SetPeers([]gubernator.PeerInfo{{Address: listener.Addr().String(), IsOwner: true}}) - - return &instance{ - Address: listener.Addr().String(), - GRPC: conf.GRPCServer, - Guber: guber, - }, nil + peers = nil + daemons = nil } diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 9658cfe..e44a6e9 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -14,204 +14,60 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package cluster +package cluster_test import ( "testing" "github.com/mailgun/gubernator" + "github.com/mailgun/gubernator/cluster" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func Test_instance_Peers(t *testing.T) { - tests := []struct { - name string - instance *instance - peers []gubernator.PeerInfo - want []gubernator.PeerInfo - }{ - { - name: "Happy path", - instance: &instance{Address: "mailgun.com"}, - peers: []gubernator.PeerInfo{{Address: "mailgun.com"}}, - want: []gubernator.PeerInfo{ - {Address: "mailgun.com", IsOwner: true}, - }, - }, - { - name: "Get multy peers", - instance: &instance{Address: "mailgun.com"}, - peers: []gubernator.PeerInfo{{Address: "localhost:11111"}, {Address: "mailgun.com"}}, - want: []gubernator.PeerInfo{ - {Address: "localhost:11111"}, - {Address: "mailgun.com", IsOwner: true}, - }, - }, - { - name: "No Peers", - instance: &instance{Address: "www.mailgun.com:11111"}, - peers: []gubernator.PeerInfo{}, - want: []gubernator.PeerInfo(nil), - }, - { - name: "Peers are nil", - instance: &instance{Address: "www.mailgun.com:11111"}, - peers: nil, - want: []gubernator.PeerInfo(nil), - }, - { - name: "Owner does not exist", - instance: &instance{Address: "mailgun.com"}, - peers: []gubernator.PeerInfo{{Address: "localhost:11111"}}, - want: []gubernator.PeerInfo{ - {Address: "localhost:11111"}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - peers = tt.peers - - got := tt.instance.Peers() - - assert.Equal(t, tt.want, got) - }) - } -} - -func TestGetPeer(t *testing.T) { - tests := []struct { - name string - peers []gubernator.PeerInfo - oneOf map[string]bool - }{ - { - name: "Happy path", - peers: []gubernator.PeerInfo{{Address: "mailgun.com"}}, - }, - { - name: "Get one peer from multiple peers", - peers: []gubernator.PeerInfo{{Address: "mailgun.com"}, {Address: "localhost"}, {Address: "test.com"}}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - peers = tt.peers - got := GetRandomPeer() - - assert.Contains(t, peers, got) - }) - } -} - -func TestPeerAt(t *testing.T) { - peers = []gubernator.PeerInfo{{Address: "mailgun.com"}} - - got := PeerAt(0) - want := gubernator.PeerInfo{Address: "mailgun.com"} - - assert.Equal(t, want, got) -} - -func TestInstanceAt(t *testing.T) { - tests := []struct { - name string - instances []*instance - index int - want *instance - }{ - { - name: "Get first instance", - instances: []*instance{ - {Address: "test.com"}, - {Address: "localhost"}, - }, - index: 0, - want: &instance{Address: "test.com"}, - }, - { - name: "Get second instance", - instances: []*instance{ - {Address: "mailgun.com"}, - {Address: "google.com"}, - }, - index: 1, - want: &instance{Address: "google.com"}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - instances = tt.instances - - got := InstanceAt(tt.index) - - assert.Equal(t, tt.want, got) - }) - } -} - func TestStartMultipleInstances(t *testing.T) { - // to be tests independent we need to reset the global variables - instances = nil - peers = nil - - err := Start(2) - assert.Nil(t, err) + err := cluster.Start(2) + require.NoError(t, err) + defer cluster.Stop() - assert.Equal(t, 2, len(instances)) - assert.Equal(t, 2, len(peers)) + assert.Equal(t, 2, len(cluster.GetPeers())) + assert.Equal(t, 2, len(cluster.GetDaemons())) } -func TestStartZeroInstances(t *testing.T) { - // to be tests independent we need to reset the global variables - instances = nil - peers = nil +func 
TestStartOneInstance(t *testing.T) { + err := cluster.Start(1) + require.NoError(t, err) + defer cluster.Stop() - err := Start(0) - assert.Nil(t, err) - - assert.Equal(t, 0, len(instances)) - assert.Equal(t, 0, len(peers)) + assert.Equal(t, 1, len(cluster.GetPeers())) + assert.Equal(t, 1, len(cluster.GetDaemons())) } -func TestStartMultipleInstancesWithAddresses(t *testing.T) { - // to be tests independent we need to reset the global variables - instances = nil - peers = nil - - addresses := []string{"localhost:11111", "localhost:22222"} - err := StartWith(addresses) - assert.Nil(t, err) - - wantPeers := []gubernator.PeerInfo{{Address: "127.0.0.1:11111"}, {Address: "127.0.0.1:22222"}} - wantInstances := []*instance{ - {Address: "127.0.0.1:11111"}, - {Address: "127.0.0.1:22222"}, +func TestStartMultipleDaemons(t *testing.T) { + peers := []gubernator.PeerInfo{ + {GRPCAddress: "localhost:1111", HTTPAddress: "localhost:1112"}, + {GRPCAddress: "localhost:2222", HTTPAddress: "localhost:2221"}} + err := cluster.StartWith(peers) + require.NoError(t, err) + defer cluster.Stop() + + wantPeers := []gubernator.PeerInfo{ + {GRPCAddress: "127.0.0.1:1111", HTTPAddress: "127.0.0.1:1112"}, + {GRPCAddress: "127.0.0.1:2222", HTTPAddress: "127.0.0.1:2221"}, } - assert.Equal(t, wantPeers, peers) - assert.Equal(t, 2, len(instances)) - assert.Equal(t, wantInstances[0].Address, instances[0].Address) - assert.Equal(t, wantInstances[1].Address, instances[1].Address) + daemons := cluster.GetDaemons() + assert.Equal(t, wantPeers, cluster.GetPeers()) + assert.Equal(t, 2, len(daemons)) + assert.Equal(t, "127.0.0.1:1111", daemons[0].GRPCListener.Addr().String()) + assert.Equal(t, "127.0.0.1:2222", daemons[1].GRPCListener.Addr().String()) + assert.Equal(t, "127.0.0.1:2222", cluster.DaemonAt(1).GRPCListener.Addr().String()) + assert.Equal(t, "127.0.0.1:2222", cluster.PeerAt(1).GRPCAddress) } -func TestStartWithAddressesFail(t *testing.T) { - // to be tests independent we need to reset the global variables - instances = nil - peers = nil - - addresses := []string{"11111"} - err := StartWith(addresses) +func TestStartWithInvalidPeer(t *testing.T) { + err := cluster.StartWith([]gubernator.PeerInfo{{GRPCAddress: "1111"}}) assert.NotNil(t, err) - assert.Nil(t, peers) - assert.Nil(t, instances) -} - -func stringInSlice(a string, list []string) bool { - for _, b := range list { - if b == a { - return true - } - } - return false + assert.Nil(t, cluster.GetPeers()) + assert.Nil(t, cluster.GetDaemons()) } diff --git a/cmd/gubernator-cluster/main.go b/cmd/gubernator-cluster/main.go index 6726eb8..e9394af 100644 --- a/cmd/gubernator-cluster/main.go +++ b/cmd/gubernator-cluster/main.go @@ -21,6 +21,7 @@ import ( "os" "os/signal" + "github.com/mailgun/gubernator" "github.com/mailgun/gubernator/cluster" "github.com/sirupsen/logrus" ) @@ -29,13 +30,13 @@ import ( func main() { logrus.SetLevel(logrus.InfoLevel) // Start a local cluster - err := cluster.StartWith([]string{ - "127.0.0.1:9090", - "127.0.0.1:9091", - "127.0.0.1:9092", - "127.0.0.1:9093", - "127.0.0.1:9094", - "127.0.0.1:9095", + err := cluster.StartWith([]gubernator.PeerInfo{ + {GRPCAddress: "127.0.0.1:9090"}, + {GRPCAddress: "127.0.0.1:9091"}, + {GRPCAddress: "127.0.0.1:9092"}, + {GRPCAddress: "127.0.0.1:9093"}, + {GRPCAddress: "127.0.0.1:9094"}, + {GRPCAddress: "127.0.0.1:9095"}, }) if err != nil { fmt.Println(err) diff --git a/cmd/gubernator/config.go b/cmd/gubernator/config.go index 1967435..a8f777f 100644 --- a/cmd/gubernator/config.go +++ b/cmd/gubernator/config.go @@ 
-58,7 +58,7 @@ type ServerConfig struct { K8PoolConf gubernator.K8sPoolConfig // Memberlist configuration used to find peers - MemberlistPoolConf gubernator.MemberlistPoolConfig + MemberlistPoolConf gubernator.MemberListPoolConfig // The PeerPicker as selected by `GUBER_PEER_PICKER` Picker gubernator.PeerPicker diff --git a/cmd/gubernator/main.go b/cmd/gubernator/main.go index afb06cb..31b763c 100644 --- a/cmd/gubernator/main.go +++ b/cmd/gubernator/main.go @@ -18,130 +18,39 @@ package main import ( "context" - "net" - "net/http" + "flag" "os" "os/signal" - "strconv" - "strings" + "time" - "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/davecgh/go-spew/spew" "github.com/mailgun/gubernator" - "github.com/mailgun/holster/etcdutil" - "github.com/mailgun/holster/v3/syncutil" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/mailgun/holster/v3/setter" + "github.com/pkg/errors" "github.com/sirupsen/logrus" - "google.golang.org/grpc" + "k8s.io/klog" ) var log = logrus.WithField("category", "server") var Version = "dev-build" func main() { - var wg syncutil.WaitGroup - var conf ServerConfig - var err error + var configFile string + + flags := flag.NewFlagSet("gubernator", flag.ContinueOnError) + flags.StringVar(&configFile, "config", "", "yaml config file") + flags.BoolVar(&gubernator.DebugEnabled, "debug", false, "enable debug") + checkErr(flags.Parse(os.Args[1:]), "while parsing flags") // Read our config from the environment or optional environment config file - conf, err = confFromEnv() + conf, err := confFromFile(configFile) checkErr(err, "while getting config") - // The LRU cache we store rate limits in - cache := gubernator.NewLRUCache(conf.CacheSize) - - // cache also implements prometheus.Collector interface - prometheus.MustRegister(cache) - - // Handler to collect duration and API access metrics for GRPC - statsHandler := gubernator.NewGRPCStatsHandler() - - // New GRPC server - grpcSrv := grpc.NewServer( - grpc.StatsHandler(statsHandler), - grpc.MaxRecvMsgSize(1024*1024)) - - // Registers a new gubernator instance with the GRPC server - guber, err := gubernator.New(gubernator.Config{ - LocalPicker: conf.Picker, - GRPCServer: grpcSrv, - Cache: cache, - DataCenter: conf.DataCenter, - }) - checkErr(err, "while creating new gubernator instance") - - // guber instance also implements prometheus.Collector interface - prometheus.MustRegister(guber) - - // Start serving GRPC Requests - wg.Go(func() { - listener, err := net.Listen("tcp", conf.GRPCListenAddress) - checkErr(err, "while starting GRPC listener") - - log.Infof("Gubernator Listening on %s ...", conf.GRPCListenAddress) - checkErr(grpcSrv.Serve(listener), "while starting GRPC server") - }) - - var pool gubernator.PoolInterface - - if conf.K8PoolConf.Enabled { - // Source our list of peers from kubernetes endpoint API - conf.K8PoolConf.OnUpdate = guber.SetPeers - pool, err = gubernator.NewK8sPool(conf.K8PoolConf) - checkErr(err, "while querying kubernetes API") - - } else if conf.MemberlistPoolConf.Enabled { - gubernatorPort, err := strconv.Atoi(strings.Split(conf.GRPCListenAddress, ":")[1]) - checkErr(err, "while converting gubernator port to int") - - // Register peer on memberlist - pool, err = gubernator.NewMemberlistPool(gubernator.MemberlistPoolConfig{ - AdvertiseAddress: conf.MemberlistPoolConf.AdvertiseAddress, - AdvertisePort: conf.MemberlistPoolConf.AdvertisePort, - KnownNodes: conf.MemberlistPoolConf.KnownNodes, - LoggerOutput: 
logrus.WithField("category", "memberlist").Writer(), - DataCenter: conf.DataCenter, - GubernatorPort: gubernatorPort, - OnUpdate: guber.SetPeers, - }) - checkErr(err, "while creating memberlist") - - } else { - // Register ourselves with other peers via ETCD - etcdClient, err := etcdutil.NewClient(&conf.EtcdConf) - checkErr(err, "while connecting to etcd") - - pool, err = gubernator.NewEtcdPool(gubernator.EtcdPoolConfig{ - AdvertiseAddress: conf.EtcdAdvertiseAddress, - OnUpdate: guber.SetPeers, - Client: etcdClient, - BaseKey: conf.EtcdKeyPrefix, - }) - checkErr(err, "while registering with ETCD pool") - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Setup an JSON Gateway API for our GRPC methods - gateway := runtime.NewServeMux() - err = gubernator.RegisterV1HandlerFromEndpoint(ctx, gateway, - conf.EtcdAdvertiseAddress, []grpc.DialOption{grpc.WithInsecure()}) - checkErr(err, "while registering GRPC gateway handler") - - // Serve the JSON Gateway and metrics handlers via standard HTTP/1 - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) - mux.Handle("/", gateway) - httpSrv := &http.Server{Addr: conf.GRPCListenAddress, Handler: mux} - - wg.Go(func() { - listener, err := net.Listen("tcp", conf.HTTPListenAddress) - checkErr(err, "while starting HTTP listener") - - log.Infof("HTTP Gateway Listening on %s ...", conf.HTTPListenAddress) - checkErr(httpSrv.Serve(listener), "while starting HTTP server") - }) + // Start the server + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + srv, err := gubernator.SpawnDaemon(ctx, conf) + checkErr(err, "while starting server") + cancel() // Wait here for signals to clean up our mess c := make(chan os.Signal, 1) @@ -149,16 +58,94 @@ func main() { for sig := range c { if sig == os.Interrupt { log.Info("caught interrupt; user requested premature exit") - pool.Close() - httpSrv.Shutdown(ctx) - grpcSrv.GracefulStop() - wg.Stop() - statsHandler.Close() + srv.Close() os.Exit(0) } } } +func confFromFile(configFile string) (gubernator.DaemonConfig, error) { + var conf gubernator.DaemonConfig + + // in order to prevent logging to /tmp by k8s.io/client-go + // and other kubernetes related dependencies which are using + // klog (https://github.com/kubernetes/klog), we need to + // initialize klog in the way it prints to stderr only. 
+ klog.InitFlags(nil) + flag.Set("logtostderr", "true") + + if gubernator.DebugEnabled || os.Getenv("GUBER_DEBUG") != "" { + logrus.SetLevel(logrus.DebugLevel) + logrus.Debug("Debug enabled") + gubernator.DebugEnabled = true + } + + if configFile != "" { + log.Infof("Loading env config: %s", configFile) + if err := fromEnvFile(configFile); err != nil { + return conf, err + } + } + + // Main config + setter.SetDefault(&conf.GRPCListenAddress, os.Getenv("GUBER_GRPC_ADDRESS"), "0.0.0.0:81") + setter.SetDefault(&conf.HTTPListenAddress, os.Getenv("GUBER_HTTP_ADDRESS"), "0.0.0.0:80") + setter.SetDefault(&conf.CacheSize, getEnvInteger("GUBER_CACHE_SIZE"), 50000) + + // Behaviors + setter.SetDefault(&conf.Behaviors.BatchTimeout, getEnvDuration("GUBER_BATCH_TIMEOUT")) + setter.SetDefault(&conf.Behaviors.BatchLimit, getEnvInteger("GUBER_BATCH_LIMIT")) + setter.SetDefault(&conf.Behaviors.BatchWait, getEnvDuration("GUBER_BATCH_WAIT")) + + setter.SetDefault(&conf.Behaviors.GlobalTimeout, getEnvDuration("GUBER_GLOBAL_TIMEOUT")) + setter.SetDefault(&conf.Behaviors.GlobalBatchLimit, getEnvInteger("GUBER_GLOBAL_BATCH_LIMIT")) + setter.SetDefault(&conf.Behaviors.GlobalSyncWait, getEnvDuration("GUBER_GLOBAL_SYNC_WAIT")) + + // ETCD Config + setter.SetDefault(&conf.EtcdAdvertiseAddress, os.Getenv("GUBER_ETCD_ADVERTISE_ADDRESS"), "127.0.0.1:81") + setter.SetDefault(&conf.EtcdKeyPrefix, os.Getenv("GUBER_ETCD_KEY_PREFIX"), "/gubernator-peers") + setter.SetDefault(&conf.EtcdConf.Endpoints, getEnvSlice("GUBER_ETCD_ENDPOINTS"), []string{"localhost:2379"}) + setter.SetDefault(&conf.EtcdConf.DialTimeout, getEnvDuration("GUBER_ETCD_DIAL_TIMEOUT"), time.Second*5) + setter.SetDefault(&conf.EtcdConf.Username, os.Getenv("GUBER_ETCD_USER")) + setter.SetDefault(&conf.EtcdConf.Password, os.Getenv("GUBER_ETCD_PASSWORD")) + + // Kubernetes Config + setter.SetDefault(&conf.K8PoolConf.Namespace, os.Getenv("GUBER_K8S_NAMESPACE"), "default") + conf.K8PoolConf.PodIP = os.Getenv("GUBER_K8S_POD_IP") + conf.K8PoolConf.PodPort = os.Getenv("GUBER_K8S_POD_PORT") + conf.K8PoolConf.Selector = os.Getenv("GUBER_K8S_ENDPOINTS_SELECTOR") + + if anyHasPrefix("GUBER_K8S_", os.Environ()) { + logrus.Debug("K8s peer pool config found") + conf.K8PoolConf.Enabled = true + if conf.K8PoolConf.Selector == "" { + return conf, errors.New("when using k8s for peer discovery, you MUST provide a " + + "`GUBER_K8S_ENDPOINTS_SELECTOR` to select the gubernator peers from the endpoints listing") + } + } + + if anyHasPrefix("GUBER_ETCD_", os.Environ()) { + logrus.Debug("ETCD peer pool config found") + if conf.K8PoolConf.Enabled { + return conf, errors.New("refusing to register gubernator peers with both etcd and k8s;" + + " remove either `GUBER_ETCD_*` or `GUBER_K8S_*` variables from the environment") + } + } + + // If env contains any TLS configuration + if anyHasPrefix("GUBER_ETCD_TLS_", os.Environ()) { + if err := setupTLS(&conf.EtcdConf); err != nil { + return conf, err + } + } + + if gubernator.DebugEnabled { + spew.Dump(conf) + } + + return conf, nil +} + func checkErr(err error, msg string) { if err != nil { log.WithError(err).Error(msg) diff --git a/config.go b/config.go index babe974..bcdff6a 100644 --- a/config.go +++ b/config.go @@ -21,6 +21,7 @@ import ( "time" "github.com/mailgun/holster/v3/setter" + "github.com/sirupsen/logrus" "google.golang.org/grpc" ) @@ -57,6 +58,9 @@ type Config struct { // deciding who we should immediately connect too for our local picker. Should remain empty if not // using multi data center support. 
DataCenter string + + // (Optional) Logger to be used when + Logger logrus.FieldLogger } type BehaviorConfig struct { diff --git a/daemon.go b/daemon.go new file mode 100644 index 0000000..ce8dd24 --- /dev/null +++ b/daemon.go @@ -0,0 +1,293 @@ +/* +Copyright 2018-2020 Technologies Inc + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gubernator + +import ( + "context" + "net" + "net/http" + "strings" + + etcd "github.com/coreos/etcd/clientv3" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/mailgun/holster/v3/etcdutil" + "github.com/mailgun/holster/v3/setter" + "github.com/mailgun/holster/v3/syncutil" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +var DebugEnabled = false + +type DaemonConfig struct { + GRPCListenAddress string + HTTPListenAddress string + AdvertiseAddress string + EtcdKeyPrefix string + CacheSize int + + // Deprecated, use AdvertiseAddress instead + EtcdAdvertiseAddress string + + // Etcd configuration used to find peers + EtcdConf etcd.Config + + // Configure how behaviours behave + Behaviors BehaviorConfig + + // K8s configuration used to find peers inside a K8s cluster + K8PoolConf K8sPoolConfig + + // A Logger from logrus + Logger logrus.FieldLogger +} + +type Daemon struct { + GRPCListener net.Listener + HTTPListener net.Listener + V1Server *V1Instance + + log logrus.FieldLogger + pool PoolInterface + conf DaemonConfig + httpSrv *http.Server + grpcSrv *grpc.Server + wg syncutil.WaitGroup + statsHandler *GRPCStatsHandler + promRegister *prometheus.Registry + gwCancel context.CancelFunc +} + +// SpawnDaemon starts a new gubernator daemon according to the provided DaemonConfig. 
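For orientation, a minimal usage sketch of the new SpawnDaemon/Daemon API exactly as introduced in this file; the listen addresses and cache size below are arbitrary example values, not taken from the patch:

    package main

    import (
        "context"
        "time"

        "github.com/mailgun/gubernator"
        "github.com/sirupsen/logrus"
    )

    func main() {
        // SpawnDaemon blocks until the GRPC and HTTP listeners accept
        // connections, so bound the wait with a context.
        ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
        defer cancel()

        d, err := gubernator.SpawnDaemon(ctx, gubernator.DaemonConfig{
            Logger:            logrus.WithField("instance", "example"),
            GRPCListenAddress: "127.0.0.1:9990",
            HTTPListenAddress: "127.0.0.1:9980",
            CacheSize:         50000,
        })
        if err != nil {
            logrus.Fatal(err)
        }
        defer d.Close()

        // Single-node setup: the daemon is told it owns itself, mirroring what
        // cluster.StartWith does for each test daemon.
        d.SetPeers([]gubernator.PeerInfo{{
            GRPCAddress: d.GRPCListener.Addr().String(),
            HTTPAddress: d.HTTPListener.Addr().String(),
        }})
    }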
+// This function will block until the daemon responds to connections as specified +// by GRPCListenAddress and HTTPListenAddress +func SpawnDaemon(ctx context.Context, conf DaemonConfig) (*Daemon, error) { + s := Daemon{ + log: conf.Logger, + conf: conf, + } + setter.SetDefault(&s.log, logrus.WithField("category", "gubernator")) + + // Handle deprecated advertise address + s.conf.AdvertiseAddress = s.conf.EtcdAdvertiseAddress + + if err := s.Start(ctx); err != nil { + return nil, err + } + return &s, nil +} + +func (s *Daemon) Start(ctx context.Context) error { + var err error + + // The LRU cache we store rate limits in + cache := NewLRUCache(s.conf.CacheSize) + + // cache also implements prometheus.Collector interface + s.promRegister = prometheus.NewRegistry() + s.promRegister.Register(cache) + + // Handler to collect duration and API access metrics for GRPC + s.statsHandler = NewGRPCStatsHandler() + s.promRegister.Register(s.statsHandler) + + // New GRPC server + s.grpcSrv = grpc.NewServer( + grpc.StatsHandler(s.statsHandler), + grpc.MaxRecvMsgSize(1024*1024)) + + // Registers a new gubernator instance with the GRPC server + s.V1Server, err = NewV1Instance(Config{ + GRPCServer: s.grpcSrv, + Cache: cache, + Logger: s.log, + }) + if err != nil { + return errors.Wrap(err, "while creating new gubernator instance") + } + + // v1Server instance also implements prometheus.Collector interface + s.promRegister.Register(s.V1Server) + + s.GRPCListener, err = net.Listen("tcp", s.conf.GRPCListenAddress) + if err != nil { + return errors.Wrap(err, "while starting GRPC listener") + } + + // Start serving GRPC Requests + s.wg.Go(func() { + s.log.Infof("GRPC Listening on %s ...", s.conf.GRPCListenAddress) + if err := s.grpcSrv.Serve(s.GRPCListener); err != nil { + s.log.WithError(err).Error("while starting GRPC server") + } + }) + + if s.conf.K8PoolConf.Enabled { + // Source our list of peers from kubernetes endpoint API + s.conf.K8PoolConf.OnUpdate = s.V1Server.SetPeers + s.pool, err = NewK8sPool(s.conf.K8PoolConf) + if err != nil { + return errors.Wrap(err, "while querying kubernetes API") + } + } + + if s.conf.EtcdConf.Endpoints != nil { + // Register ourselves with other peers via ETCD + etcdClient, err := etcdutil.NewClient(&s.conf.EtcdConf) + if err != nil { + return errors.Wrap(err, "while connecting to etcd") + } + + s.pool, err = NewEtcdPool(EtcdPoolConfig{ + AdvertiseAddress: s.conf.AdvertiseAddress, + OnUpdate: s.V1Server.SetPeers, + Client: etcdClient, + BaseKey: s.conf.EtcdKeyPrefix, + }) + if err != nil { + return errors.Wrap(err, "while registering with ETCD API") + } + } + + // Setup an JSON Gateway API for our GRPC methods + gateway := runtime.NewServeMux() + var gwCtx context.Context + gwCtx, s.gwCancel = context.WithCancel(context.Background()) + err = RegisterV1HandlerFromEndpoint(gwCtx, gateway, + s.conf.GRPCListenAddress, []grpc.DialOption{grpc.WithInsecure()}) + if err != nil { + return errors.Wrap(err, "while registering GRPC gateway handler") + } + + // Serve the JSON Gateway and metrics handlers via standard HTTP/1 + mux := http.NewServeMux() + + mux.Handle("/metrics", promhttp.InstrumentMetricHandler( + s.promRegister, promhttp.HandlerFor(s.promRegister, promhttp.HandlerOpts{}), + )) + mux.Handle("/", gateway) + s.httpSrv = &http.Server{Addr: s.conf.HTTPListenAddress, Handler: mux} + + s.HTTPListener, err = net.Listen("tcp", s.conf.HTTPListenAddress) + if err != nil { + return errors.Wrap(err, "while starting HTTP listener") + } + + s.wg.Go(func() { + s.log.Infof("HTTP 
Gateway Listening on %s ...", s.conf.HTTPListenAddress) + if err := s.httpSrv.Serve(s.HTTPListener); err != nil { + s.log.WithError(err).Error("while starting HTTP server") + } + }) + + // Validate we can reach the GRPC and HTTP endpoints before returning + if err := WaitForConnect(ctx, []string{s.conf.HTTPListenAddress, s.conf.GRPCListenAddress}); err != nil { + return err + } + + return nil +} + +// Close gracefully closes all server connections and listening sockets +func (s *Daemon) Close() { + if s.httpSrv == nil { + return + } + + if s.pool != nil { + s.pool.Close() + } + + s.log.Infof("HTTP Gateway close for %s ...", s.conf.HTTPListenAddress) + s.httpSrv.Shutdown(context.Background()) + s.log.Infof("GRPC close for %s ...", s.conf.GRPCListenAddress) + s.grpcSrv.GracefulStop() + s.wg.Stop() + s.statsHandler.Close() + s.gwCancel() + s.httpSrv = nil + s.grpcSrv = nil +} + +// SetPeers sets the peers for this daemon +func (s *Daemon) SetPeers(in []PeerInfo) { + peers := make([]PeerInfo, len(in)) + copy(peers, in) + + for i, p := range peers { + if s.conf.GRPCListenAddress == p.GRPCAddress { + peers[i].IsOwner = true + } + } + s.V1Server.SetPeers(peers) +} + +// Config returns the current config for this Daemon +func (s *Daemon) Config() DaemonConfig { + return s.conf +} + +// Peers returns the peers this daemon knows about +func (s *Daemon) Peers() []PeerInfo { + var peers []PeerInfo + for _, client := range s.V1Server.GetPeerList() { + peers = append(peers, client.PeerInfo()) + } + return peers +} + +// WaitForConnect returns nil if the list of addresses is listening for connections; will block until context is cancelled. +func WaitForConnect(ctx context.Context, addresses []string) error { + var d net.Dialer + var errs []error + for { + errs = nil + for _, addr := range addresses { + if addr == "" { + continue + } + + conn, err := d.DialContext(ctx, "tcp", addr) + if err != nil { + errs = append(errs, err) + continue + } + conn.Close() + } + + if len(errs) == 0 { + break + } + + select { + case <-ctx.Done(): + return ctx.Err() + } + } + + if len(errs) != 0 { + var errStrings []string + for _, err := range errs { + errStrings = append(errStrings, err.Error()) + } + return errors.New(strings.Join(errStrings, "\n")) + } + return nil +} diff --git a/etcd.go b/etcd.go index b7d24f9..0e85ecd 100644 --- a/etcd.go +++ b/etcd.go @@ -28,20 +28,19 @@ import ( ) type PeerInfo struct { - // (Optional) The name of the data center this peer is in. Leave blank if not using multi data center support. DataCenter string - - // (Required) The IP address of the peer which will field peer requests - Address string - + // (Optional) The http address:port of the peer + HTTPAddress string + // (Required) The grpc address:port of the peer + GRPCAddress string // (Optional) Is true if PeerInfo is for this instance of gubernator IsOwner bool } // Returns the hash key used to identify this peer in the Picker. 
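A small, self-contained illustration of the reshaped PeerInfo: each peer now carries both of its listen addresses, and the GRPC address is the peer's identity, since HashKey (defined next) returns GRPCAddress, which is what the picker and the IsOwner check in Daemon.SetPeers key off. The addresses are hypothetical:

    package main

    import (
        "fmt"

        "github.com/mailgun/gubernator"
    )

    func main() {
        peer := gubernator.PeerInfo{
            GRPCAddress: "10.0.0.5:81",
            HTTPAddress: "10.0.0.5:80",
        }
        // The hash key, and therefore the peer identity, is the GRPC address.
        fmt.Println(peer.HashKey()) // "10.0.0.5:81"
    }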
func (p PeerInfo) HashKey() string { - return p.Address + return p.GRPCAddress } type UpdateFunc func([]PeerInfo) diff --git a/functional_test.go b/functional_test.go index 04f2842..92cf64e 100644 --- a/functional_test.go +++ b/functional_test.go @@ -19,12 +19,14 @@ package gubernator_test import ( "context" "fmt" + "net/http" "os" "testing" "time" guber "github.com/mailgun/gubernator" "github.com/mailgun/gubernator/cluster" + "github.com/mailgun/holster/v3/testutil" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" @@ -33,13 +35,13 @@ import ( // Setup and shutdown the mock gubernator cluster for the entire test suite func TestMain(m *testing.M) { - if err := cluster.StartWith([]string{ - "127.0.0.1:9990", - "127.0.0.1:9991", - "127.0.0.1:9992", - "127.0.0.1:9993", - "127.0.0.1:9994", - "127.0.0.1:9995", + if err := cluster.StartWith([]guber.PeerInfo{ + {GRPCAddress: "127.0.0.1:9990", HTTPAddress: "127.0.0.1:9980"}, + {GRPCAddress: "127.0.0.1:9991", HTTPAddress: "127.0.0.1:9981"}, + {GRPCAddress: "127.0.0.1:9992", HTTPAddress: "127.0.0.1:9982"}, + {GRPCAddress: "127.0.0.1:9993", HTTPAddress: "127.0.0.1:9983"}, + {GRPCAddress: "127.0.0.1:9994", HTTPAddress: "127.0.0.1:9984"}, + {GRPCAddress: "127.0.0.1:9995", HTTPAddress: "127.0.0.1:9985"}, }); err != nil { fmt.Println(err) os.Exit(1) @@ -49,7 +51,7 @@ func TestMain(m *testing.M) { } func TestOverTheLimit(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().Address) + client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress) require.Nil(t, errs) tests := []struct { @@ -96,7 +98,7 @@ func TestOverTheLimit(t *testing.T) { } func TestTokenBucket(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().Address) + client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress) require.Nil(t, errs) tests := []struct { @@ -148,7 +150,7 @@ func TestTokenBucket(t *testing.T) { } func TestLeakyBucket(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().Address) + client, errs := guber.DialV1Server(cluster.PeerAt(0).GRPCAddress) require.Nil(t, errs) tests := []struct { @@ -209,7 +211,7 @@ func TestLeakyBucket(t *testing.T) { } func TestMissingFields(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().Address) + client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress) require.Nil(t, errs) tests := []struct { @@ -272,15 +274,13 @@ func TestMissingFields(t *testing.T) { } func TestGlobalRateLimits(t *testing.T) { - const clientInstance = 1 - peer := cluster.PeerAt(clientInstance) - client, errs := guber.DialV1Server(peer.Address) - require.Nil(t, errs) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() + peer := cluster.PeerAt(0).GRPCAddress + client, errs := guber.DialV1Server(peer) + require.NoError(t, errs) sendHit := func(status guber.Status, remain int64, i int) string { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ Requests: []*guber.RateLimitReq{ { @@ -294,7 +294,7 @@ func TestGlobalRateLimits(t *testing.T) { }, }, }) - require.Nil(t, err, i) + require.NoError(t, err, i) assert.Equal(t, "", resp.Responses[0].Error, i) assert.Equal(t, status, resp.Responses[0].Status, i) assert.Equal(t, remain, resp.Responses[0].Remaining, i) @@ -316,36 +316,31 @@ func TestGlobalRateLimits(t *testing.T) { // 
Our second should be processed as if we own it since the async forward hasn't occurred yet sendHit(guber.Status_UNDER_LIMIT, 3, 2) - time.Sleep(time.Second) - - // After sleeping this response should be from the updated async call from our owner. Notice the - // remaining is still 3 as the hit is queued for update to the owner - canonicalHost := sendHit(guber.Status_UNDER_LIMIT, 3, 3) - - canonicalInstance := cluster.InstanceForHost(canonicalHost) - - // Inspect our metrics, ensure they collected the counts we expected during this test - instance := cluster.InstanceForHost(peer.Address) - - metricCh := make(chan prometheus.Metric, 5) - instance.Guber.Collect(metricCh) - - buf := dto.Metric{} - m := <-metricCh // Async metric - assert.Nil(t, m.Write(&buf)) - assert.Equal(t, uint64(1), *buf.Histogram.SampleCount) - - metricCh = make(chan prometheus.Metric, 5) - canonicalInstance.Guber.Collect(metricCh) - - m = <-metricCh // Async metric - m = <-metricCh // Broadcast metric - assert.Nil(t, m.Write(&buf)) - assert.Equal(t, uint64(1), *buf.Histogram.SampleCount) + testutil.UntilPass(t, 10, time.Millisecond*200, func(t testutil.TestingT) { + // Inspect our metrics, ensure they collected the counts we expected during this test + d := cluster.DaemonAt(0) + metricCh := make(chan prometheus.Metric, 5) + d.V1Server.Collect(metricCh) + + buf := dto.Metric{} + m := <-metricCh // Async metric + assert.Nil(t, m.Write(&buf)) + assert.Equal(t, uint64(2), *buf.Histogram.SampleCount) + + // V1Instance 3 should be the owner of our global rate limit + d = cluster.DaemonAt(3) + metricCh = make(chan prometheus.Metric, 5) + d.V1Server.Collect(metricCh) + + m = <-metricCh // Async metric + m = <-metricCh // Broadcast metric + assert.Nil(t, m.Write(&buf)) + assert.Equal(t, uint64(2), *buf.Histogram.SampleCount) + }) } func TestChangeLimit(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().Address) + client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress) require.Nil(t, errs) tests := []struct { @@ -440,7 +435,7 @@ func TestChangeLimit(t *testing.T) { } func TestResetRemaining(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().Address) + client, errs := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress) require.Nil(t, errs) tests := []struct { @@ -512,14 +507,14 @@ func TestResetRemaining(t *testing.T) { } func TestHealthCheck(t *testing.T) { - client, errs := guber.DialV1Server(cluster.InstanceAt(0).Address) - require.Nil(t, errs) + client, err := guber.DialV1Server(cluster.DaemonAt(0).GRPCListener.Addr().String()) + require.NoError(t, err) // Check that the cluster is healthy to start with healthResp, err := client.HealthCheck(context.Background(), &guber.HealthCheckReq{}) - require.Nil(t, err) + require.NoError(t, err) - assert.Equal(t, "healthy", healthResp.GetStatus()) + require.Equal(t, "healthy", healthResp.GetStatus()) // Create a global rate limit that will need to be sent to all peers in the cluster _, err = client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ @@ -528,7 +523,7 @@ func TestHealthCheck(t *testing.T) { Name: "test_health_check", UniqueKey: "account:12345", Algorithm: guber.Algorithm_TOKEN_BUCKET, - Behavior: guber.Behavior_GLOBAL, + Behavior: guber.Behavior_BATCHING, Duration: guber.Second * 3, Hits: 1, Limit: 5, @@ -538,13 +533,14 @@ func TestHealthCheck(t *testing.T) { require.Nil(t, err) // Stop the rest of the cluster to ensure errors occur on our instance and - // collect addresses to restart the 
stopped instances after the test completes - var addresses []string - for i := 1; i < cluster.NumOfInstances(); i++ { - addresses = append(addresses, cluster.InstanceAt(i).Address) - cluster.StopInstanceAt(i) + // collect daemons to restart the stopped peers after the test completes + var daemons []*guber.Daemon + for i := 1; i < cluster.NumOfDaemons(); i++ { + d := cluster.DaemonAt(i) + require.NotNil(t, d) + d.Close() + daemons = append(daemons, d) } - time.Sleep(time.Second) // Hit the global rate limit again this time causing a connection error _, err = client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ @@ -562,28 +558,33 @@ func TestHealthCheck(t *testing.T) { }) require.Nil(t, err) - // Check the health again to get back the connection error - healthResp, err = client.HealthCheck(context.Background(), &guber.HealthCheckReq{}) - require.Nil(t, err) + testutil.UntilPass(t, 20, time.Millisecond*300, func(t testutil.TestingT) { + // Check the health again to get back the connection error + healthResp, err = client.HealthCheck(context.Background(), &guber.HealthCheckReq{}) + if assert.Nil(t, err) { + return + } - assert.Equal(t, "unhealthy", healthResp.GetStatus()) - assert.Contains(t, healthResp.GetMessage(), "connect: connection refused") + assert.Equal(t, "unhealthy", healthResp.GetStatus()) + assert.Contains(t, healthResp.GetMessage(), "connect: connection refused") + }) // Restart stopped instances - for i := 1; i < cluster.NumOfInstances(); i++ { - cluster.StartInstance(addresses[i-1], cluster.GetDefaultConfig()) - } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + defer cancel() + cluster.Restart(ctx) + } func TestLeakyBucketDivBug(t *testing.T) { - client, errs := guber.DialV1Server(cluster.GetRandomPeer().Address) - require.Nil(t, errs) + client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress) + require.NoError(t, err) resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ Requests: []*guber.RateLimitReq{ { - Name: "test_leaky_bucket", - UniqueKey: "account:1234", + Name: "test_leaky_bucket_div", + UniqueKey: "account:12345", Algorithm: guber.Algorithm_LEAKY_BUCKET, Duration: guber.Millisecond * 1000, Hits: 1, @@ -591,17 +592,18 @@ func TestLeakyBucketDivBug(t *testing.T) { }, }, }) + require.NoError(t, err) + assert.Equal(t, "", resp.Responses[0].Error) assert.Equal(t, guber.Status_UNDER_LIMIT, resp.Responses[0].Status) assert.Equal(t, int64(1999), resp.Responses[0].Remaining) assert.Equal(t, int64(2000), resp.Responses[0].Limit) - require.Nil(t, err) // Should result in a rate of 0.5 resp, err = client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{ Requests: []*guber.RateLimitReq{ { - Name: "test_leaky_bucket", - UniqueKey: "account:1234", + Name: "test_leaky_bucket_div", + UniqueKey: "account:12345", Algorithm: guber.Algorithm_LEAKY_BUCKET, Duration: guber.Millisecond * 1000, Hits: 100, @@ -609,9 +611,20 @@ func TestLeakyBucketDivBug(t *testing.T) { }, }, }) - require.Nil(t, err) + require.NoError(t, err) assert.Equal(t, int64(1900), resp.Responses[0].Remaining) assert.Equal(t, int64(2000), resp.Responses[0].Limit) } +func TestGRPCGateway(t *testing.T) { + resp, err := http.DefaultClient.Get("http://" + cluster.GetRandomPeer().HTTPAddress + "/v1/HealthCheck") + require.NoError(t, err) + + assert.Equal(t, http.StatusOK, resp.StatusCode) + // TODO: Test /cmd/gubernator + // TODO: Fix GRPC Gateway + // TODO: Update docker image + // TODO: Update docker-compose +} + // TODO: Add a 
test for sending no rate limits RateLimitReqList.RateLimits = nil diff --git a/global.go b/global.go index 11eb99d..3788c06 100644 --- a/global.go +++ b/global.go @@ -32,16 +32,16 @@ type globalManager struct { broadcastQueue chan *RateLimitReq wg syncutil.WaitGroup conf BehaviorConfig - log *logrus.Entry - instance *Instance + log logrus.FieldLogger + instance *V1Instance asyncMetrics prometheus.Histogram broadcastMetrics prometheus.Histogram } -func newGlobalManager(conf BehaviorConfig, instance *Instance) *globalManager { +func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager { gm := globalManager{ - log: log.WithField("category", "global-manager"), + log: instance.log, asyncMetrics: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "async_durations", Help: "The duration of GLOBAL async sends in seconds.", @@ -129,11 +129,11 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { continue } - p, ok := peerRequests[peer.info.Address] + p, ok := peerRequests[peer.info.GRPCAddress] if ok { p.req.Requests = append(p.req.Requests, r) } else { - peerRequests[peer.info.Address] = &pair{ + peerRequests[peer.info.GRPCAddress] = &pair{ client: peer, req: GetPeerRateLimitsReq{Requests: []*RateLimitReq{r}}, } @@ -148,7 +148,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { if err != nil { gm.log.WithError(err). - Errorf("error sending global hits to '%s'", p.client.info.Address) + Errorf("error sending global hits to '%s'", p.client.info.GRPCAddress) continue } } @@ -167,7 +167,7 @@ func (gm *globalManager) runBroadcasts() { // Send the hits if we reached our batch limit if len(updates) == gm.conf.GlobalBatchLimit { - gm.updatePeers(updates) + gm.broadcastPeers(updates) updates = make(map[string]*RateLimitReq) return true } @@ -180,7 +180,7 @@ func (gm *globalManager) runBroadcasts() { case <-interval.C: if len(updates) != 0 { - gm.updatePeers(updates) + gm.broadcastPeers(updates) updates = make(map[string]*RateLimitReq) } case <-done: @@ -190,8 +190,8 @@ func (gm *globalManager) runBroadcasts() { }) } -// updatePeers broadcasts global rate limit statuses to all other peers -func (gm *globalManager) updatePeers(updates map[string]*RateLimitReq) { +// broadcastPeers broadcasts global rate limit statuses to all other peers +func (gm *globalManager) broadcastPeers(updates map[string]*RateLimitReq) { var req UpdatePeerGlobalsReq start := time.Now() @@ -205,7 +205,7 @@ func (gm *globalManager) updatePeers(updates map[string]*RateLimitReq) { status, err := gm.instance.getRateLimit(&rl) if err != nil { - gm.log.WithError(err).Errorf("while sending global updates to peers for: '%s'", rl.HashKey()) + gm.log.WithError(err).Errorf("while broadcasting update to peers for: '%s'", rl.HashKey()) continue } // Build an UpdatePeerGlobalsReq @@ -229,7 +229,7 @@ func (gm *globalManager) updatePeers(updates map[string]*RateLimitReq) { if err != nil { // Skip peers that are not in a ready state if !IsNotReady(err) { - gm.log.WithError(err).Errorf("error sending global updates to '%s'", peer.info.Address) + gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.info.GRPCAddress) } continue } diff --git a/go.mod b/go.mod index 0a73095..554ff3a 100644 --- a/go.mod +++ b/go.mod @@ -7,11 +7,11 @@ require ( github.com/golang/protobuf v1.3.2 github.com/grpc-ecosystem/grpc-gateway v1.11.1 github.com/hashicorp/memberlist v0.2.0 - github.com/mailgun/holster v3.0.0+incompatible - github.com/mailgun/holster/v3 v3.8.1 - github.com/pkg/errors 
v0.8.1 + github.com/mailgun/holster/v3 v3.14.0 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.1.0 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 + github.com/prometheus/common v0.6.0 github.com/segmentio/fasthash v1.0.2 github.com/sirupsen/logrus v1.4.2 github.com/stretchr/testify v1.4.0 diff --git a/go.sum b/go.sum index 9ff515d..61fdd2d 100644 --- a/go.sum +++ b/go.sum @@ -131,6 +131,8 @@ github.com/mailgun/holster v3.0.0+incompatible h1:bpt8ZCwLBrzjqfBZ5mobNb2NjesNeD github.com/mailgun/holster v3.0.0+incompatible/go.mod h1:crzolGx27RP/IBT/BnPQiYBB9igmAFHGRrz0zlMP0b0= github.com/mailgun/holster/v3 v3.8.1 h1:Z9D3F1ShnxGUlofougjSht08OpIiQKtryBjExB+uz9Q= github.com/mailgun/holster/v3 v3.8.1/go.mod h1:rNcFlhMTxFDa1dnQC4sUqI71IpAa9/aPeU6w8IGF3aQ= +github.com/mailgun/holster/v3 v3.14.0 h1:SgDJqxLiHFpOQ5YIn00zZQo79k142DAFvAgVlFKiUvQ= +github.com/mailgun/holster/v3 v3.14.0/go.mod h1:K8liVWqma64dBz3wY3YOf+biRYTZOEESaizryvqJYnI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU= @@ -151,6 +153,8 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= diff --git a/prometheus.go b/grpc_stats.go similarity index 78% rename from prometheus.go rename to grpc_stats.go index ccccda0..ba4f8ae 100644 --- a/prometheus.go +++ b/grpc_stats.go @@ -22,7 +22,6 @@ import ( "github.com/mailgun/holster/v3/syncutil" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc/stats" ) @@ -39,22 +38,21 @@ var statsContextKey = contextKey{} // Implements the Prometheus collector interface. 
Such that when the /metrics handler is // called this collector pulls all the stats from -type Collector struct { +type GRPCStatsHandler struct { reqCh chan *GRPCStats wg syncutil.WaitGroup - // Metrics collectors grpcRequestCount *prometheus.CounterVec grpcRequestDuration *prometheus.HistogramVec } -func NewGRPCStatsHandler() *Collector { - c := &Collector{ - grpcRequestCount: promauto.NewCounterVec(prometheus.CounterOpts{ +func NewGRPCStatsHandler() *GRPCStatsHandler { + c := &GRPCStatsHandler{ + grpcRequestCount: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "grpc_request_counts", Help: "GRPC requests by status."}, []string{"status", "method"}), - grpcRequestDuration: promauto.NewHistogramVec(prometheus.HistogramOpts{ + grpcRequestDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "grpc_request_duration_milliseconds", Help: "GRPC request durations in milliseconds.", }, []string{"method"}), @@ -63,7 +61,7 @@ func NewGRPCStatsHandler() *Collector { return c } -func (c *Collector) run() { +func (c *GRPCStatsHandler) run() { c.reqCh = make(chan *GRPCStats, 10000) c.wg.Until(func(done chan struct{}) bool { @@ -98,11 +96,21 @@ func (c *Collector) run() { }) } -func (c *Collector) Close() { +func (c *GRPCStatsHandler) Describe(ch chan<- *prometheus.Desc) { + c.grpcRequestCount.Describe(ch) + c.grpcRequestDuration.Describe(ch) +} + +func (c *GRPCStatsHandler) Collect(ch chan<- prometheus.Metric) { + c.grpcRequestCount.Collect(ch) + c.grpcRequestDuration.Collect(ch) +} + +func (c *GRPCStatsHandler) Close() { c.wg.Stop() } -func (c *Collector) HandleRPC(ctx context.Context, s stats.RPCStats) { +func (c *GRPCStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) { rs := StatsFromContext(ctx) if rs == nil { return @@ -127,13 +135,13 @@ func (c *Collector) HandleRPC(ctx context.Context, s stats.RPCStats) { } } -func (c *Collector) HandleConn(ctx context.Context, s stats.ConnStats) {} +func (c *GRPCStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {} -func (c *Collector) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { +func (c *GRPCStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx } -func (c *Collector) TagRPC(ctx context.Context, tagInfo *stats.RPCTagInfo) context.Context { +func (c *GRPCStatsHandler) TagRPC(ctx context.Context, tagInfo *stats.RPCTagInfo) context.Context { return ContextWithStats(ctx, &GRPCStats{Method: tagInfo.FullMethodName}) } diff --git a/gubernator.go b/gubernator.go index e566766..bc112c3 100644 --- a/gubernator.go +++ b/gubernator.go @@ -19,6 +19,7 @@ package gubernator import ( "context" "fmt" + "github.com/mailgun/holster/v3/setter" "strings" "sync" @@ -36,30 +37,32 @@ const ( UnHealthy = "unhealthy" ) -var log *logrus.Entry - -type Instance struct { +type V1Instance struct { health HealthCheckResp global *globalManager mutliRegion *mutliRegionManager peerMutex sync.RWMutex + log logrus.FieldLogger conf Config isClosed bool } -func New(conf Config) (*Instance, error) { +// NewV1Instance instantiate a single instance of a gubernator peer and registers this +// instance with the provided GRPCServer. 
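Because the renamed GRPCStatsHandler now implements prometheus.Collector (the Describe/Collect methods above), it can live in a private registry instead of the global one, which is how the new Daemon wires it up. A reduced sketch of that pattern, assuming only the APIs shown in this patch plus the standard prometheus and grpc libraries:

    package main

    import (
        "net/http"

        "github.com/mailgun/gubernator"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promhttp"
        "google.golang.org/grpc"
    )

    func main() {
        // The handler is both a grpc stats.Handler and a prometheus.Collector.
        stats := gubernator.NewGRPCStatsHandler()
        defer stats.Close()

        reg := prometheus.NewRegistry()
        reg.MustRegister(stats)

        // Attach it to the GRPC server for request counts and durations ...
        _ = grpc.NewServer(grpc.StatsHandler(stats))

        // ... and expose only this registry's metrics over HTTP.
        http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
    }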
+func NewV1Instance(conf Config) (*V1Instance, error) { if conf.GRPCServer == nil { return nil, errors.New("GRPCServer instance is required") } - log = logrus.WithField("category", "gubernator") if err := conf.SetDefaults(); err != nil { return nil, err } - s := Instance{ + s := V1Instance{ + log: conf.Logger, conf: conf, } + setter.SetDefault(&s.log, logrus.WithField("category", "gubernator")) s.global = newGlobalManager(conf.Behaviors, &s) s.mutliRegion = newMultiRegionManager(conf.Behaviors, &s) @@ -83,7 +86,7 @@ func New(conf Config) (*Instance, error) { return &s, nil } -func (s *Instance) Close() error { +func (s *V1Instance) Close() error { if s.isClosed { return nil } @@ -106,7 +109,7 @@ func (s *Instance) Close() error { // GetRateLimits is the public interface used by clients to request rate limits from the system. If the // rate limit `Name` and `UniqueKey` is not owned by this instance then we forward the request to the // peer that does. -func (s *Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*GetRateLimitsResp, error) { +func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*GetRateLimitsResp, error) { var resp GetRateLimitsResp if len(r.Requests) > maxBatchSize { @@ -181,7 +184,7 @@ func (s *Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*Get } // Inform the client of the owner key of the key - inOut.Out.Metadata = map[string]string{"owner": peer.info.Address} + inOut.Out.Metadata = map[string]string{"owner": peer.info.GRPCAddress} out <- inOut return nil @@ -200,7 +203,7 @@ func (s *Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*Get } // Inform the client of the owner key of the key - inOut.Out.Metadata = map[string]string{"owner": peer.info.Address} + inOut.Out.Metadata = map[string]string{"owner": peer.info.GRPCAddress} } out <- inOut @@ -222,7 +225,7 @@ func (s *Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*Get // getGlobalRateLimit handles rate limits that are marked as `Behavior = GLOBAL`. Rate limit responses // are returned from the local cache and the hits are queued to be sent to the owning peer. -func (s *Instance) getGlobalRateLimit(req *RateLimitReq) (*RateLimitResp, error) { +func (s *V1Instance) getGlobalRateLimit(req *RateLimitReq) (*RateLimitResp, error) { // Queue the hit for async update s.global.QueueHit(req) @@ -247,7 +250,7 @@ func (s *Instance) getGlobalRateLimit(req *RateLimitReq) (*RateLimitResp, error) // UpdatePeerGlobals updates the local cache with a list of global rate limits. This method should only // be called by a peer who is the owner of a global rate limit. -func (s *Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (*UpdatePeerGlobalsResp, error) { +func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (*UpdatePeerGlobalsResp, error) { s.conf.Cache.Lock() defer s.conf.Cache.Unlock() @@ -263,7 +266,7 @@ func (s *Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsRe } // GetPeerRateLimits is called by other peers to get the rate limits owned by this peer. 
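On the client side of the GetRateLimits path described above, callers now dial a peer's GRPCAddress (the old single Address field is gone). A sketch that mirrors the functional tests; the peer address, rate limit name, and limit values are made up for illustration:

    package main

    import (
        "context"
        "log"
        "time"

        guber "github.com/mailgun/gubernator"
    )

    func main() {
        // Dial any peer; requests for keys this peer does not own are
        // forwarded to the owning peer.
        client, err := guber.DialV1Server("127.0.0.1:9990")
        if err != nil {
            log.Fatal(err)
        }

        ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
        defer cancel()

        resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
            Requests: []*guber.RateLimitReq{{
                Name:      "requests_per_second",
                UniqueKey: "account:12345",
                Algorithm: guber.Algorithm_TOKEN_BUCKET,
                Duration:  guber.Second * 1,
                Hits:      1,
                Limit:     10,
            }},
        })
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("status=%s remaining=%d",
            resp.Responses[0].Status, resp.Responses[0].Remaining)
    }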
diff --git a/hash_test.go b/hash_test.go
index 02d26c6..8970521 100644
--- a/hash_test.go
+++ b/hash_test.go
@@ -23,14 +23,14 @@ func TestConsistantHash(t *testing.T) {
     }
     hash := NewConsistantHash(nil)
     for _, h := range hosts {
-        hash.Add(&PeerClient{info: PeerInfo{Address: h}})
+        hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}})
     }

     for input, addr := range cases {
         t.Run(input, func(t *testing.T) {
             peer, err := hash.Get(input)
             assert.Nil(t, err)
-            assert.Equal(t, addr, peer.info.Address)
+            assert.Equal(t, addr, peer.info.GRPCAddress)
         })
     }

@@ -40,7 +40,7 @@ func TestConsistantHash(t *testing.T) {
         hash := NewConsistantHash(nil)

         for _, h := range hosts {
-            hash.Add(&PeerClient{info: PeerInfo{Address: h}})
+            hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}})
         }

         assert.Equal(t, len(hosts), hash.Size())
@@ -51,13 +51,13 @@ func TestConsistantHash(t *testing.T) {
         hostMap := map[string]*PeerClient{}

         for _, h := range hosts {
-            peer := &PeerClient{info: PeerInfo{Address: h}}
+            peer := &PeerClient{info: PeerInfo{GRPCAddress: h}}
             hash.Add(peer)
             hostMap[h] = peer
         }

         for host, peer := range hostMap {
-            assert.Equal(t, peer, hash.GetByPeerInfo(PeerInfo{Address: host}))
+            assert.Equal(t, peer, hash.GetByPeerInfo(PeerInfo{GRPCAddress: host}))
         }
     })

@@ -85,13 +85,13 @@ func TestConsistantHash(t *testing.T) {
         hostMap := map[string]int{}

         for _, h := range hosts {
-            hash.Add(&PeerClient{info: PeerInfo{Address: h}})
+            hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}})
             hostMap[h] = 0
         }

         for i := range strings {
             peer, _ := hash.Get(strings[i])
-            hostMap[peer.info.Address]++
+            hostMap[peer.info.GRPCAddress]++
         }

         for host, a := range hostMap {
@@ -120,7 +120,7 @@ func BenchmarkConsistantHash(b *testing.B) {
         hash := NewConsistantHash(hashFunc)
         hosts := []string{"a.svc.local", "b.svc.local", "c.svc.local"}
         for _, h := range hosts {
-            hash.Add(&PeerClient{info: PeerInfo{Address: h}})
+            hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}})
         }

         b.ResetTimer()
diff --git a/kubernetes.go b/kubernetes.go
index 842c045..8d68875 100644
--- a/kubernetes.go
+++ b/kubernetes.go
@@ -144,7 +144,7 @@ func (e *K8sPool) updatePeers() {

     for _, s := range endpoint.Subsets {
         for _, addr := range s.Addresses {
-            peer := PeerInfo{Address: fmt.Sprintf("%s:%s", addr.IP, e.conf.PodPort)}
+            peer := PeerInfo{GRPCAddress: fmt.Sprintf("%s:%s", addr.IP, e.conf.PodPort)}

             if addr.IP == e.conf.PodIP {
                 peer.IsOwner = true
diff --git a/memberlist.go b/memberlist.go
index 171c1fe..2c6f358 100644
--- a/memberlist.go
+++ b/memberlist.go
@@ -12,15 +12,18 @@ import (
     ml "github.com/hashicorp/memberlist"
     "github.com/pkg/errors"
+    "github.com/prometheus/common/log"
+    "github.com/sirupsen/logrus"
 )

-type MemberlistPool struct {
-    memberlist *ml.Memberlist
-    conf       MemberlistPoolConfig
-    events     *memberlistEventHandler
+type MemberListPool struct {
+    log        *logrus.Entry
+    memberList *ml.Memberlist
+    conf       MemberListPoolConfig
+    events     *memberListEventHandler
 }

-type MemberlistPoolConfig struct {
+type MemberListPoolConfig struct {
     AdvertiseAddress string
     AdvertisePort    int
     NodeName         string
@@ -33,15 +36,18 @@ type MemberlistPoolConfig struct {
     Enabled bool
 }

-func NewMemberlistPool(conf MemberlistPoolConfig) (*MemberlistPool, error) {
-    memberlistPool := &MemberlistPool{conf: conf}
+func NewMemberListPool(conf MemberListPoolConfig) (*MemberListPool, error) {
+    m := &MemberListPool{
+        log:  logrus.WithField("category", "member-list"),
+        conf: conf,
+    }

-    // Configure memberlist event handler
-    memberlistPool.events = newMemberListEventHandler(conf.OnUpdate)
+    // Configure member list event handler
+    m.events = newMemberListEventHandler(m.log, conf.OnUpdate)

-    // Configure memberlist
+    // Configure member list
     config := ml.DefaultWANConfig()
-    config.Events = memberlistPool.events
+    config.Events = m.events
     config.AdvertiseAddr = conf.AdvertiseAddress
     config.AdvertisePort = conf.AdvertisePort
@@ -57,37 +63,37 @@ func NewMemberlistPool(conf MemberlistPoolConfig) (*MemberlistPool, error) {
         config.Logger = conf.Logger
     }

-    // Create and set memberlist
+    // Create and set member list
     memberlist, err := ml.Create(config)
     if err != nil {
         return nil, err
     }
-    memberlistPool.memberlist = memberlist
+    m.memberList = memberlist

     // Prep metadata
-    gob.Register(memberlistMetadata{})
-    metadata := memberlistMetadata{DataCenter: conf.DataCenter, GubernatorPort: conf.GubernatorPort}
+    gob.Register(memberListMetadata{})
+    metadata := memberListMetadata{DataCenter: conf.DataCenter, GubernatorPort: conf.GubernatorPort}

-    // Join memberlist pool
-    err = memberlistPool.joinPool(conf.KnownNodes, metadata)
+    // Join member list pool
+    err = m.joinPool(conf.KnownNodes, metadata)
     if err != nil {
         return nil, err
     }

-    return memberlistPool, nil
+    return m, nil
 }

-func (m *MemberlistPool) joinPool(knownNodes []string, metadata memberlistMetadata) error {
+func (m *MemberListPool) joinPool(knownNodes []string, metadata memberListMetadata) error {
     // Get local node and set metadata
-    node := m.memberlist.LocalNode()
-    serializedMetadata, err := serializeMemberlistMetadata(metadata)
+    node := m.memberList.LocalNode()
+    serializedMetadata, err := serializeMemberListMetadata(metadata)
     if err != nil {
         return err
     }
     node.Meta = serializedMetadata

-    // Join memberlist
-    _, err = m.memberlist.Join(knownNodes)
+    // Join member list
+    _, err = m.memberList.Join(knownNodes)
     if err != nil {
         return errors.Wrap(err, "while joining memberlist")
     }
@@ -98,44 +104,48 @@ func (m *MemberlistPool) joinPool(knownNodes []string, metadata memberlistMetada
     return nil
 }

-func (m *MemberlistPool) Close() {
-    err := m.memberlist.Leave(time.Second)
+func (m *MemberListPool) Close() {
+    err := m.memberList.Leave(time.Second)
     if err != nil {
-        log.Warn(errors.Wrap(err, "while leaving memberlist"))
+        m.log.Warn(errors.Wrap(err, "while leaving memberlist"))
     }
 }

-type memberlistEventHandler struct {
+type memberListEventHandler struct {
     peers    map[string]PeerInfo
+    log      *logrus.Entry
     OnUpdate UpdateFunc
 }

-func newMemberListEventHandler(onUpdate UpdateFunc) *memberlistEventHandler {
-    eventhandler := memberlistEventHandler{OnUpdate: onUpdate}
-    eventhandler.peers = make(map[string]PeerInfo)
-    return &eventhandler
+func newMemberListEventHandler(log *logrus.Entry, onUpdate UpdateFunc) *memberListEventHandler {
+    handler := memberListEventHandler{
+        OnUpdate: onUpdate,
+        log:      log,
+    }
+    handler.peers = make(map[string]PeerInfo)
+    return &handler
 }

-func (e *memberlistEventHandler) addPeer(node *ml.Node) {
+func (e *memberListEventHandler) addPeer(node *ml.Node) {
     ip := getIP(node.Address())

     // Deserialize metadata
-    metadata, err := deserializeMemberlistMetadata(node.Meta)
+    metadata, err := deserializeMemberListMetadata(node.Meta)
     if err != nil {
-        log.Warn(errors.Wrap(err, "while adding to peers"))
+        e.log.Warn(errors.Wrap(err, "while adding to peers"))
     } else {
         // Construct Gubernator address and create PeerInfo
         gubernatorAddress := makeAddress(ip, metadata.GubernatorPort)
-        e.peers[ip] = PeerInfo{Address: gubernatorAddress, DataCenter: metadata.DataCenter}
+        e.peers[ip] = PeerInfo{GRPCAddress: gubernatorAddress, DataCenter: metadata.DataCenter}
         e.callOnUpdate()
     }
 }

-func (e *memberlistEventHandler) NotifyJoin(node *ml.Node) {
+func (e *memberListEventHandler) NotifyJoin(node *ml.Node) {
     ip := getIP(node.Address())

     // Deserialize metadata
-    metadata, err := deserializeMemberlistMetadata(node.Meta)
+    metadata, err := deserializeMemberListMetadata(node.Meta)
     if err != nil {
         // This is called during memberlist initialization due to the fact that the local node
         // has no metadata yet
@@ -143,12 +153,12 @@ func (e *memberlistEventHandler) NotifyJoin(node *ml.Node) {
     } else {
         // Construct Gubernator address and create PeerInfo
         gubernatorAddress := makeAddress(ip, metadata.GubernatorPort)
-        e.peers[ip] = PeerInfo{Address: gubernatorAddress, DataCenter: metadata.DataCenter}
+        e.peers[ip] = PeerInfo{GRPCAddress: gubernatorAddress, DataCenter: metadata.DataCenter}
         e.callOnUpdate()
     }
 }

-func (e *memberlistEventHandler) NotifyLeave(node *ml.Node) {
+func (e *memberListEventHandler) NotifyLeave(node *ml.Node) {
     ip := getIP(node.Address())

     // Remove PeerInfo
@@ -157,23 +167,23 @@ func (e *memberlistEventHandler) NotifyLeave(node *ml.Node) {
     e.callOnUpdate()
 }

-func (e *memberlistEventHandler) NotifyUpdate(node *ml.Node) {
+func (e *memberListEventHandler) NotifyUpdate(node *ml.Node) {
     ip := getIP(node.Address())

     // Deserialize metadata
-    metadata, err := deserializeMemberlistMetadata(node.Meta)
+    metadata, err := deserializeMemberListMetadata(node.Meta)
     if err != nil {
         log.Warn(errors.Wrap(err, "while updating memberlist"))
     } else {
         // Construct Gubernator address and create PeerInfo
         gubernatorAddress := makeAddress(ip, metadata.GubernatorPort)
-        e.peers[ip] = PeerInfo{Address: gubernatorAddress, DataCenter: metadata.DataCenter}
+        e.peers[ip] = PeerInfo{GRPCAddress: gubernatorAddress, DataCenter: metadata.DataCenter}
         e.callOnUpdate()
     }
 }

-func (e *memberlistEventHandler) callOnUpdate() {
-    var peers = []PeerInfo{}
+func (e *memberListEventHandler) callOnUpdate() {
+    var peers []PeerInfo

     for _, p := range e.peers {
         peers = append(peers, p)
@@ -190,12 +200,12 @@ func makeAddress(ip string, port int) string {
     return fmt.Sprintf("%s:%s", ip, strconv.Itoa(port))
 }

-type memberlistMetadata struct {
+type memberListMetadata struct {
     DataCenter     string
     GubernatorPort int
 }

-func serializeMemberlistMetadata(metadata memberlistMetadata) ([]byte, error) {
+func serializeMemberListMetadata(metadata memberListMetadata) ([]byte, error) {
     buf := bytes.Buffer{}
     encoder := gob.NewEncoder(&buf)

@@ -208,8 +218,8 @@ func serializeMemberlistMetadata(metadata memberlistMetadata) ([]byte, error) {
     return buf.Bytes(), nil
 }

-func deserializeMemberlistMetadata(metadataAsByteSlice []byte) (*memberlistMetadata, error) {
-    metadata := memberlistMetadata{}
+func deserializeMemberListMetadata(metadataAsByteSlice []byte) (*memberListMetadata, error) {
+    metadata := memberListMetadata{}
     buf := bytes.Buffer{}
     buf.Write(metadataAsByteSlice)
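The renamed MemberListPool is the memberlist-based discovery entry point, and its OnUpdate callback is how membership changes (now carrying GRPCAddress) reach the rate limit instance. A hedged sketch of the expected wiring, assuming UpdateFunc receives the full []PeerInfo as the event handler's callOnUpdate suggests; all field values below are placeholders:

package discovery

import (
	guber "github.com/mailgun/gubernator"
)

// startDiscovery joins the memberlist pool and keeps the instance's peer list current.
func startDiscovery(instance *guber.V1Instance) (*guber.MemberListPool, error) {
	return guber.NewMemberListPool(guber.MemberListPoolConfig{
		AdvertiseAddress: "192.168.1.10",           // placeholder
		AdvertisePort:    7946,                     // placeholder memberlist port
		KnownNodes:       []string{"192.168.1.11"}, // placeholder seed node
		DataCenter:       "us-east-1",              // placeholder
		GubernatorPort:   1051,                     // placeholder gRPC port advertised to peers
		// Each join/leave/update rebuilds the peer list and pushes it to the instance.
		OnUpdate: func(peers []guber.PeerInfo) {
			instance.SetPeers(peers)
		},
	})
}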
diff --git a/multiregion.go b/multiregion.go
index 5843ca7..6c49c96 100644
--- a/multiregion.go
+++ b/multiregion.go
@@ -9,14 +9,15 @@ type mutliRegionManager struct {
     reqQueue chan *RateLimitReq
     wg       syncutil.WaitGroup
     conf     BehaviorConfig
-    log      *logrus.Entry
-    instance *Instance
+    log      logrus.FieldLogger
+    instance *V1Instance
 }

-func newMultiRegionManager(conf BehaviorConfig, instance *Instance) *mutliRegionManager {
+func newMultiRegionManager(conf BehaviorConfig, instance *V1Instance) *mutliRegionManager {
     mm := mutliRegionManager{
         conf:     conf,
         instance: instance,
+        log:      instance.log,
         reqQueue: make(chan *RateLimitReq, 0),
     }
     mm.runAsyncReqs()
@@ -48,7 +49,7 @@ func (mm *mutliRegionManager) runAsyncReqs() {
             // Send the hits if we reached our batch limit
             if len(hits) == mm.conf.MultiRegionBatchLimit {
                 for dc, picker := range mm.instance.GetRegionPickers() {
-                    log.Infof("Sending %v hit(s) to %s picker", len(hits), dc)
+                    mm.log.Debugf("Sending %v hit(s) to %s picker", len(hits), dc)
                     mm.sendHits(hits, picker)
                 }
                 hits = make(map[string]*RateLimitReq)
@@ -62,7 +63,7 @@ func (mm *mutliRegionManager) runAsyncReqs() {
         case <-interval.C:
             if len(hits) > 0 {
                 for dc, picker := range mm.instance.GetRegionPickers() {
-                    log.Infof("Sending %v hit(s) to %s picker", len(hits), dc)
+                    mm.log.Debugf("Sending %v hit(s) to %s picker", len(hits), dc)
                     mm.sendHits(hits, picker)
                 }
                 hits = make(map[string]*RateLimitReq)
diff --git a/peer_client.go b/peer_client.go
index daef6ba..a91e27d 100644
--- a/peer_client.go
+++ b/peer_client.go
@@ -108,10 +108,10 @@ func (c *PeerClient) connect() error {
     }

     var err error
-    // c.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", c.info.Address, ""), grpc.WithInsecure())
-    c.conn, err = grpc.Dial(c.info.Address, grpc.WithInsecure())
+    // c.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", c.info.GRPCAddress, ""), grpc.WithInsecure())
+    c.conn, err = grpc.Dial(c.info.GRPCAddress, grpc.WithInsecure())
     if err != nil {
-        return c.setLastErr(&PeerErr{err: errors.Wrapf(err, "failed to dial peer %s", c.info.Address)})
+        return c.setLastErr(&PeerErr{err: errors.Wrapf(err, "failed to dial peer %s", c.info.GRPCAddress)})
     }
     c.client = NewPeersV1Client(c.conn)
     c.status = peerConnected
@@ -122,6 +122,11 @@ func (c *PeerClient) connect() error {
     return nil
 }

+// PeerInfo returns PeerInfo struct that describes this PeerClient
+func (c *PeerClient) PeerInfo() PeerInfo {
+    return c.info
+}
+
 // GetPeerRateLimit forwards a rate limit request to a peer. If the rate limit has `behavior == BATCHING` configured
 // this method will attempt to batch the rate limits
 func (c *PeerClient) GetPeerRateLimit(ctx context.Context, r *RateLimitReq) (*RateLimitResp, error) {
@@ -196,7 +201,7 @@ func (c *PeerClient) setLastErr(err error) error {
     }

     // Prepend client address to error
-    errWithHostname := errors.Wrap(err, fmt.Sprintf("from host %s", c.info.Address))
+    errWithHostname := errors.Wrap(err, fmt.Sprintf("from host %s", c.info.GRPCAddress))
     key := err.Error()

     // Add error to the cache with a TTL of 5 minutes
diff --git a/peer_client_test.go b/peer_client_test.go
index af0e3f1..990f5e9 100644
--- a/peer_client_test.go
+++ b/peer_client_test.go
@@ -40,7 +40,6 @@ func TestPeerClientShutdown(t *testing.T) {
         c := cases[i]

         t.Run(c.Name, func(t *testing.T) {
-            client := gubernator.NewPeerClient(config, cluster.GetRandomPeer())

             wg := sync.WaitGroup{}
diff --git a/replicated_hash.go b/replicated_hash.go
index 9a1b3b7..17421f0 100644
--- a/replicated_hash.go
+++ b/replicated_hash.go
@@ -74,10 +74,10 @@ func (ch *ReplicatedConsistantHash) Peers() []*PeerClient {

 // Adds a peer to the hash
 func (ch *ReplicatedConsistantHash) Add(peer *PeerClient) {
-    ch.peers[peer.info.Address] = peer
+    ch.peers[peer.info.GRPCAddress] = peer

     for i := 0; i < ch.replicas; i++ {
-        hash := ch.hashFunc(strToBytesUnsafe(strconv.Itoa(i) + peer.info.Address))
+        hash := ch.hashFunc(strToBytesUnsafe(strconv.Itoa(i) + peer.info.GRPCAddress))
         ch.peerKeys = append(ch.peerKeys, peerInfo{
             hash: hash,
             peer: peer,
@@ -94,7 +94,7 @@ func (ch *ReplicatedConsistantHash) Size() int {

 // Returns the peer by hostname
 func (ch *ReplicatedConsistantHash) GetByPeerInfo(peer PeerInfo) *PeerClient {
-    return ch.peers[peer.Address]
+    return ch.peers[peer.GRPCAddress]
 }

 // Given a key, return the peer that key is assigned too
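Since PeerClient.info stays unexported, the PeerInfo() accessor added above is the supported way for callers to see which address and data center a peer client is using. A small illustrative helper (the function itself is hypothetical, not part of this diff), combining it with V1Instance.GetPeerList:

package example

import (
	"fmt"

	guber "github.com/mailgun/gubernator"
)

// logPeers prints the gRPC address, data center, and ownership flag of every known peer.
func logPeers(instance *guber.V1Instance) {
	for _, pc := range instance.GetPeerList() {
		info := pc.PeerInfo()
		fmt.Printf("peer grpc=%s datacenter=%s owner=%v\n",
			info.GRPCAddress, info.DataCenter, info.IsOwner)
	}
}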
diff --git a/replicated_hash_test.go b/replicated_hash_test.go
index cb2fd5e..360e072 100644
--- a/replicated_hash_test.go
+++ b/replicated_hash_test.go
@@ -18,7 +18,7 @@ func TestReplicatedConsistantHash(t *testing.T) {
         hash := NewReplicatedConsistantHash(nil, DefaultReplicas)

         for _, h := range hosts {
-            hash.Add(&PeerClient{info: PeerInfo{Address: h}})
+            hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}})
         }

         assert.Equal(t, len(hosts), hash.Size())
@@ -29,13 +29,13 @@ func TestReplicatedConsistantHash(t *testing.T) {
         hostMap := map[string]*PeerClient{}

         for _, h := range hosts {
-            peer := &PeerClient{info: PeerInfo{Address: h}}
+            peer := &PeerClient{info: PeerInfo{GRPCAddress: h}}
             hash.Add(peer)
             hostMap[h] = peer
         }

         for host, peer := range hostMap {
-            assert.Equal(t, peer, hash.GetByPeerInfo(PeerInfo{Address: host}))
+            assert.Equal(t, peer, hash.GetByPeerInfo(PeerInfo{GRPCAddress: host}))
         }
     })

@@ -62,13 +62,13 @@ func TestReplicatedConsistantHash(t *testing.T) {
         hostMap := map[string]int{}

         for _, h := range hosts {
-            hash.Add(&PeerClient{info: PeerInfo{Address: h}})
+            hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}})
             hostMap[h] = 0
         }

         for i := range strings {
             peer, _ := hash.Get(strings[i])
-            hostMap[peer.info.Address]++
+            hostMap[peer.info.GRPCAddress]++
         }

         for host, a := range hostMap {
@@ -96,7 +96,7 @@ func BenchmarkReplicatedConsistantHash(b *testing.B) {
         hash := NewReplicatedConsistantHash(hashFunc, DefaultReplicas)
         hosts := []string{"a.svc.local", "b.svc.local", "c.svc.local"}
         for _, h := range hosts {
-            hash.Add(&PeerClient{info: PeerInfo{Address: h}})
+            hash.Add(&PeerClient{info: PeerInfo{GRPCAddress: h}})
         }

         b.ResetTimer()
diff --git a/store_test.go b/store_test.go
index 7189d75..f788849 100644
--- a/store_test.go
+++ b/store_test.go
@@ -18,32 +18,76 @@ package gubernator_test
 import (
     "context"
+    "fmt"
+    "google.golang.org/grpc"
+    "net"
     "testing"
     "time"

     "github.com/mailgun/gubernator"
-    "github.com/mailgun/gubernator/cluster"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 )

+type v1Server struct {
+    conf     gubernator.Config
+    listener net.Listener
+    srv      *gubernator.V1Instance
+}
+
+func (s *v1Server) Close() {
+    s.conf.GRPCServer.GracefulStop()
+    s.srv.Close()
+}
+
+// Start a single instance of V1Server with the provided config and listening address.
+func newV1Server(t *testing.T, address string, conf gubernator.Config) *v1Server {
+    t.Helper()
+    conf.GRPCServer = grpc.NewServer()
+
+    srv, err := gubernator.NewV1Instance(conf)
+    require.NoError(t, err)
+
+    listener, err := net.Listen("tcp", address)
+    require.NoError(t, err)
+
+    go func() {
+        if err := conf.GRPCServer.Serve(listener); err != nil {
+            fmt.Printf("while serving: %s\n", err)
+        }
+    }()
+
+    srv.SetPeers([]gubernator.PeerInfo{{GRPCAddress: listener.Addr().String(), IsOwner: true}})
+
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+
+    err = gubernator.WaitForConnect(ctx, []string{listener.Addr().String()})
+    require.NoError(t, err)
+    cancel()
+
+    return &v1Server{
+        conf:     conf,
+        listener: listener,
+        srv:      srv,
+    }
+}
+
 func TestLoader(t *testing.T) {
     loader := gubernator.NewMockLoader()

-    ins, err := cluster.StartInstance("", gubernator.Config{
+    srv := newV1Server(t, "", gubernator.Config{
         Behaviors: gubernator.BehaviorConfig{
             GlobalSyncWait: time.Millisecond * 50, // Suitable for testing but not production
             GlobalTimeout:  time.Second,
         },
         Loader: loader,
     })
-    assert.Nil(t, err)

     // loader.Load() should have been called for gubernator startup
     assert.Equal(t, 1, loader.Called["Load()"])
     assert.Equal(t, 0, loader.Called["Save()"])

-    client, err := gubernator.DialV1Server(ins.Address)
+    client, err := gubernator.DialV1Server(srv.listener.Addr().String())
     assert.Nil(t, err)

     resp, err := client.GetRateLimits(context.Background(), &gubernator.GetRateLimitsReq{
@@ -63,8 +107,7 @@ func TestLoader(t *testing.T) {
     require.Equal(t, 1, len(resp.Responses))
     require.Equal(t, "", resp.Responses[0].Error)

-    err = ins.Stop()
-    require.Nil(t, err)
+    srv.Close()

     // Loader.Save() should been called during gubernator shutdown
     assert.Equal(t, 1, loader.Called["Load()"])
@@ -164,20 +207,19 @@ func TestStore(t *testing.T) {
         t.Run(tt.name, func(t *testing.T) {
             store := gubernator.NewMockStore()

-            ins, err := cluster.StartInstance("", gubernator.Config{
+            srv := newV1Server(t, "", gubernator.Config{
                 Behaviors: gubernator.BehaviorConfig{
                     GlobalSyncWait: time.Millisecond * 50, // Suitable for testing but not production
                     GlobalTimeout:  time.Second,
                 },
                 Store: store,
             })
-            assert.Nil(t, err)

             // No calls to store
             assert.Equal(t, 0, store.Called["OnChange()"])
             assert.Equal(t, 0, store.Called["Get()"])

-            client, err := gubernator.DialV1Server(ins.Address)
+            client, err := gubernator.DialV1Server(srv.listener.Addr().String())
             assert.Nil(t, err)

             req := gubernator.RateLimitReq{
diff --git a/version b/version
index 2003b63..8ba2dc9 100644
--- a/version
+++ b/version
@@ -1 +1 @@
-0.9.2
+1.0.0.rc.1
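For completeness, the client-side call pattern the reworked store tests exercise: dial the instance's gRPC listen address with DialV1Server and issue a GetRateLimits request. The request fields and constants below (Name, UniqueKey, Limit, Duration, guber.Second) are illustrative values based on the public API, not taken verbatim from this diff:

package main

import (
	"context"
	"fmt"
	"log"

	guber "github.com/mailgun/gubernator"
)

func main() {
	// In the tests above the address comes from srv.listener.Addr().String().
	client, err := guber.DialV1Server("127.0.0.1:1051") // placeholder address
	if err != nil {
		log.Fatal(err)
	}

	resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
		Requests: []*guber.RateLimitReq{{
			Name:      "requests_per_second", // illustrative limit name
			UniqueKey: "account:1234",        // illustrative key
			Hits:      1,
			Limit:     100,
			Duration:  guber.Second, // one-second window, expressed in milliseconds
		}},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.Responses[0].Status)
}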