Skip to content

Commit

Permalink
GRPC Gateway tests
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Oct 23, 2020
1 parent 9cd9b22 commit fb7ad6b
Show file tree
Hide file tree
Showing 28 changed files with 823 additions and 634 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.0.0-rc.1] - 2020-10-22
### Change
* Fix inconsistent tests failing
* Fix GRPC/HTTP Gateway
* Renamed functions to ensure clarity of version

## [0.9.2] - 2020-10-23
### Change
* ETCD discovery now sets the IsOwner property when updating the peers list.
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ VERSION=$(shell cat version)
LDFLAGS="-X main.Version=$(VERSION)"

test:
go test ./... -v -race -count=1
go test ./... -v -race -p=1 -count=1

docker:
docker build --build-arg VERSION=$(VERSION) -t thrawn01/gubernator:$(VERSION) .
Expand Down
6 changes: 3 additions & 3 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func BenchmarkServer_GetPeerRateLimitNoBatching(b *testing.B) {
}

func BenchmarkServer_GetRateLimit(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer().Address)
client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
Expand All @@ -77,7 +77,7 @@ func BenchmarkServer_GetRateLimit(b *testing.B) {
}

func BenchmarkServer_Ping(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer().Address)
client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func BenchmarkServer_Ping(b *testing.B) {
}*/

func BenchmarkServer_ThunderingHeard(b *testing.B) {
client, err := guber.DialV1Server(cluster.GetRandomPeer().Address)
client, err := guber.DialV1Server(cluster.GetRandomPeer().GRPCAddress)
if err != nil {
b.Errorf("NewV1Client err: %s", err)
}
Expand Down
5 changes: 4 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ func FromUnixMilliseconds(ts int64) time.Time {

// Given a list of peers, return a random peer
func RandomPeer(peers []PeerInfo) PeerInfo {
return peers[rand.Intn(len(peers))]
rand.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})
return peers[0]
}

// Return a random alpha string of 'n' length
Expand Down
173 changes: 64 additions & 109 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,159 +17,114 @@ limitations under the License.
package cluster

import (
"fmt"
"net"
"context"
"math/rand"
"time"

"github.com/mailgun/gubernator"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)

type instance struct {
GRPC *grpc.Server
Guber *gubernator.Instance
Address string
type Address struct {
HTTPAddress string
GRPCAddress string
}

func (i *instance) Peers() []gubernator.PeerInfo {
var result []gubernator.PeerInfo
for _, peer := range peers {
if peer.Address == i.Address {
peer.IsOwner = true
}
result = append(result, peer)
func (a *Address) PeerInfo() gubernator.PeerInfo {
return gubernator.PeerInfo{
HTTPAddress: a.GRPCAddress,
GRPCAddress: a.GRPCAddress,
}
return result
}

func (i *instance) Stop() error {
err := i.Guber.Close()
i.GRPC.GracefulStop()
return err
}

var instances []*instance
var daemons []*gubernator.Daemon
var peers []gubernator.PeerInfo

// Returns default testing configuration
func GetDefaultConfig() gubernator.Config {
return gubernator.Config{
Behaviors: gubernator.BehaviorConfig{
GlobalSyncWait: time.Millisecond * 50, // Suitable for testing but not production
GlobalTimeout: time.Second,
MultiRegionSyncWait: time.Millisecond * 50, // Suitable for testing but not production
MultiRegionTimeout: time.Second,
},
}
}

// Returns a random peer from the cluster
func GetRandomPeer() gubernator.PeerInfo {
return gubernator.RandomPeer(peers)
return peers[rand.Intn(len(peers))]
}

// Returns a specific peer
func PeerAt(idx int) gubernator.PeerInfo {
return peers[idx]
// Returns a list of all peers in the cluster
func GetPeers() []gubernator.PeerInfo {
return peers
}

// Returns a specific instance
func InstanceAt(idx int) *instance {
return instances[idx]
// Returns a list of all deamons in the cluster
func GetDaemons() []*gubernator.Daemon {
return daemons
}

// Return the specific instance for a host
func InstanceForHost(host string) *instance {
for i := range instances {
if instances[i].Address == host {
return instances[i]
}
}
return nil
// Returns a specific peer
func PeerAt(idx int) gubernator.PeerInfo {
return peers[idx]
}

// Stop an instance without updating peers, used to cause connection errors
func StopInstanceAt(idx int) {
instances[idx].Stop()
// Returns a specific daemon
func DaemonAt(idx int) *gubernator.Daemon {
return daemons[idx]
}

// Returns the number of instances
func NumOfInstances() int {
return len(instances)
func NumOfDaemons() int {
return len(daemons)
}

// Start a local cluster of gubernator servers
func Start(numInstances int) error {
addresses := make([]string, numInstances, numInstances)
return StartWith(addresses)
peers := make([]gubernator.PeerInfo, numInstances, numInstances)
return StartWith(peers)
}

func Restart(ctx context.Context) {
for i := 0; i < len(daemons); i++ {
daemons[i].Close()
daemons[i].Start(ctx)
daemons[i].SetPeers(peers)
}
}

// Start a local cluster with specific addresses
func StartWith(addresses []string) error {
config := GetDefaultConfig()
for _, address := range addresses {
ins, err := StartInstance(address, config)
func StartWith(localPeers []gubernator.PeerInfo) error {
for _, peer := range localPeers {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
d, err := gubernator.SpawnDaemon(ctx, gubernator.DaemonConfig{
Logger: logrus.WithField("instance", peer.GRPCAddress),
GRPCListenAddress: peer.GRPCAddress,
HTTPListenAddress: peer.HTTPAddress,
Behaviors: gubernator.BehaviorConfig{
// Suitable for testing but not production
GlobalSyncWait: time.Millisecond * 50,
GlobalTimeout: time.Second * 5,
BatchTimeout: time.Second * 5,
MultiRegionTimeout: time.Second * 5,
},
})
cancel()
if err != nil {
return errors.Wrapf(err, "while starting instance for addr '%s'", address)
return errors.Wrapf(err, "while starting server for addr '%s'", peer.GRPCAddress)
}

// Add the peers and instances to the package level variables
peers = append(peers, gubernator.PeerInfo{Address: ins.Address})
instances = append(instances, ins)
// Add the peers and daemons to the package level variables
peers = append(peers, gubernator.PeerInfo{
GRPCAddress: d.GRPCListener.Addr().String(),
HTTPAddress: d.HTTPListener.Addr().String(),
})
daemons = append(daemons, d)
}

// Tell each instance about the other peers
for _, ins := range instances {
ins.Guber.SetPeers(ins.Peers())
for _, d := range daemons {
d.SetPeers(peers)
}
return nil
}

func Stop() {
for _, ins := range instances {
ins.Stop()
}
}

// Start a single instance of gubernator with the provided config and listening address.
// If address is empty string a random port on the loopback device will be chosen.
func StartInstance(address string, conf gubernator.Config) (*instance, error) {
conf.GRPCServer = grpc.NewServer()

guber, err := gubernator.New(conf)
if err != nil {
return nil, errors.Wrap(err, "while creating new gubernator instance")
}

listener, err := net.Listen("tcp", address)
if err != nil {
return nil, errors.Wrap(err, "while listening on random interface")
for _, d := range daemons {
d.Close()
}

go func() {
logrus.Infof("Listening on %s", listener.Addr().String())
if err := conf.GRPCServer.Serve(listener); err != nil {
fmt.Printf("while serving: %s\n", err)
}
}()

// Wait until the instance responds to connect
for i := 0; i < 10; i++ {
conn, err := net.Dial("tcp", address)
if err != nil {
break
}
conn.Close()
time.Sleep(time.Millisecond * 50)
}

guber.SetPeers([]gubernator.PeerInfo{{Address: listener.Addr().String(), IsOwner: true}})

return &instance{
Address: listener.Addr().String(),
GRPC: conf.GRPCServer,
Guber: guber,
}, nil
peers = nil
daemons = nil
}
Loading

0 comments on commit fb7ad6b

Please sign in to comment.