diff --git a/cmd/election/main.go b/cmd/election/main.go new file mode 100644 index 00000000..c3468981 --- /dev/null +++ b/cmd/election/main.go @@ -0,0 +1,54 @@ +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/mailgun/holster/etcdutil" + "github.com/sirupsen/logrus" +) + +func main() { + logrus.SetLevel(logrus.DebugLevel) + + if len(os.Args) < 2 { + fmt.Println("a candidate name is required") + os.Exit(1) + } + + e, err := etcdutil.NewElection("cli-election", os.Args[1], nil) + if err != nil { + fmt.Printf("while creating a new election: %s\n", err) + os.Exit(1) + } + + e.Start() + if err != nil { + fmt.Printf("during election start: %s\n", err) + os.Exit(1) + } + + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT) + go func() { + for { + select { + case sig := <-c: + switch sig { + case syscall.SIGINT: + fmt.Printf("[%s] Concede and exit\n", os.Args[1]) + e.Stop() + os.Exit(1) + } + } + } + }() + + for { + fmt.Printf("[%s] Leader: %t\n", os.Args[1], e.IsLeader()) + time.Sleep(time.Second) + } +} diff --git a/etcdutil/README.md b/etcdutil/README.md new file mode 100644 index 00000000..6ede3c36 --- /dev/null +++ b/etcdutil/README.md @@ -0,0 +1,46 @@ +## ETCD Leader Election +Use etcd for leader election if you have several instances of a service running in production +and you only want one of the service instances to preform a task. + +`LeaderElection` starts a goroutine which performs an election and maintains a leader + while services join and leave the election. Calling `Stop()` will `Concede()` leadership if + we currently have it. + +```go + +import ( + "github.com/mailgun/holster" + "github.com/mailgun/holster/election" +) + +var wg holster.WaitGroup + +// Start the goroutine and preform the election +leader, _ := election.NewElection("my-service", "", nil) + +// Handle graceful shutdown +signalChan := make(chan os.Signal, 1) +signal.Notify(signalChan, os.Interrupt, os.Kill) + +// Do periodic thing +tick := time.NewTicker(time.Second * 2) +wg.Loop(func() bool { + select { + case <-tick.C: + // Are we currently leader? + if leader.IsLeader() { + err := DoThing() + if err != nil { + // Have another instance DoThing(), we can't for some reason + leader.Concede() + } + } + return true + case <-signalChan: + leader.Stop() + return false + } +}) +wg.Wait() +``` + diff --git a/etcdutil/config.go b/etcdutil/config.go new file mode 100644 index 00000000..a02a94c7 --- /dev/null +++ b/etcdutil/config.go @@ -0,0 +1,109 @@ +package etcdutil + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + "os" + + etcd "github.com/coreos/etcd/clientv3" + "github.com/mailgun/holster" + "github.com/pkg/errors" + "google.golang.org/grpc/grpclog" +) + +const ( + pathToCA = "/etc/mailgun/ssl/localhost/ca.pem" + pathToKey = "/etc/mailgun/ssl/localhost/etcd-key.pem" + pathToCert = "/etc/mailgun/ssl/localhost/etcd-cert.pem" + localSecureEndpoint = "https://127.0.0.1:23790" + localInsecureEndpoint = "http://127.0.0.1:23790" +) + +func NewSecureClient(cfg *etcd.Config) (*etcd.Client, error) { + var err error + if cfg, err = NewEtcdConfig(cfg); err != nil { + } + + etcdClt, err := etcd.New(*cfg) + if err != nil { + return nil, errors.Wrap(err, "failed to create secure etcd client") + } + return etcdClt, nil +} + +// Create a new etcd.Config using environment variables. If an existing +// config is passed, will fill in missing configuration using environment +// variables or defaults if they exists on the local system. + +// If no environment variables are set, will return a config set to +// connect without TLS via http://localhost:23790 +func NewEtcdConfig(cfg *etcd.Config) (*etcd.Config, error) { + var envEndpoint, tlsCertFile, tlsKeyFile, tlsCaFile string + + // Create a config if none exists and get user/pass + holster.SetDefault(&cfg, &etcd.Config{}) + holster.SetDefault(&cfg.Username, os.Getenv("ETCD3_USER")) + holster.SetDefault(&cfg.Password, os.Getenv("ETCD3_PASSWORD")) + + // Don't set default file locations for these if they don't exist on disk + // as dev or testing environments might not have certificates + holster.SetDefault(&tlsCertFile, os.Getenv("ETCD3_TLS_CERT"), ifExists(pathToCert)) + holster.SetDefault(&tlsKeyFile, os.Getenv("ETCD3_TLS_KEY"), ifExists(pathToKey)) + holster.SetDefault(&tlsCaFile, os.Getenv("ETCD3_CA"), ifExists(pathToCA)) + + if os.Getenv("ETCD3_DEBUG") != "" { + etcd.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4)) + } + + // If the CA file was provided + if tlsCaFile != "" { + holster.SetDefault(&cfg.TLS, &tls.Config{}) + + var certPool *x509.CertPool = nil + if pemBytes, err := ioutil.ReadFile(tlsCaFile); err == nil { + certPool = x509.NewCertPool() + certPool.AppendCertsFromPEM(pemBytes) + } else { + return nil, errors.Errorf("while loading cert CA file '%s': %s", tlsCaFile, err) + } + holster.SetDefault(&cfg.TLS.RootCAs, certPool) + cfg.TLS.InsecureSkipVerify = false + } + + // If the cert and key files are provided attempt to load them + if tlsCertFile != "" && tlsKeyFile != "" { + holster.SetDefault(&cfg.TLS, &tls.Config{}) + tlsCert, err := tls.LoadX509KeyPair(tlsCertFile, tlsKeyFile) + if err != nil { + return nil, errors.Errorf("while loading cert '%s' and key file '%s': %s", + tlsCertFile, tlsKeyFile, err) + } + holster.SetDefault(&cfg.TLS.Certificates, []tls.Certificate{tlsCert}) + } + + holster.SetDefault(&envEndpoint, os.Getenv("ETCD3_ENDPOINT"), secureOrInsecure(cfg.TLS)) + holster.SetDefault(&cfg.Endpoints, []string{envEndpoint}) + + // Override here if user REALLY wants this + if cfg.TLS != nil && os.Getenv("ETCD3_SKIP_VERIFY") != "" { + cfg.TLS.InsecureSkipVerify = true + } + + return cfg, nil +} + +// If the file exists, return the path provided +func ifExists(file string) string { + if _, err := os.Stat(file); err == nil { + return file + } + return "" +} + +func secureOrInsecure(tlsConfig *tls.Config) string { + if tlsConfig == nil { + return localInsecureEndpoint + } + return localSecureEndpoint +} diff --git a/etcdutil/election.go b/etcdutil/election.go new file mode 100644 index 00000000..5fd24b50 --- /dev/null +++ b/etcdutil/election.go @@ -0,0 +1,168 @@ +package etcdutil + +import ( + "context" + "os" + "path" + "sync/atomic" + "time" + + etcd "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/mailgun/holster" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var log *logrus.Entry + +type LeaderElector interface { + IsLeader() bool + Concede() bool + Start() error + Stop() +} + +type Election struct { + // The name of the election (IE: scout, blackbird, etc...) + Election string + // The name of this instance (IE: worker-n01, worker-n02, etc...) + Candidate string + // Seconds to wait before giving up the election if leader disconnected + TTL int + + etcdConfig *etcd.Config + session *concurrency.Session + election *concurrency.Election + client *etcd.Client + cancel context.CancelFunc + wg holster.WaitGroup + ctx context.Context + isLeader int32 +} + +// Use leader election if you have several instances of a service running in production +// and you only want one of the service instances to preform a periodic task. +// +// election, _ := etcdv3.NewElection("election-name", "", nil) +// +// // Start the leader election and attempt to become leader +// election.Start() +// +// // Returns true if we are leader (thread safe) +// if election.IsLeader() { +// // Do periodic thing +// } +func NewElection(election, candidate string, etcdConfig *etcd.Config) (*Election, error) { + log = logrus.WithField("category", "election") + ctx, cancelFunc := context.WithCancel(context.Background()) + e := &Election{ + Candidate: candidate, + Election: election, + TTL: 5, + etcdConfig: etcdConfig, + cancel: cancelFunc, + ctx: ctx, + } + + if host, err := os.Hostname(); err == nil { + holster.SetDefault(&e.Candidate, host) + } + + // Set a prefix key for elections + e.Election = path.Join("/elections", e.Election) + + var err error + e.etcdConfig, err = NewEtcdConfig(etcdConfig) + if err != nil { + return nil, err + } + + e.client, err = etcd.New(*e.etcdConfig) + if err != nil { + return nil, err + } + + // Test the connection + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + _, err = e.client.Get(ctx, e.Election) + if err != nil { + return nil, errors.Wrap(err, "while connecting to etcd") + } + + return e, nil +} + +func (e *Election) Start() error { + var err error + + e.session, err = concurrency.NewSession(e.client, concurrency.WithTTL(e.TTL)) + if err != nil { + return errors.Wrap(err, "while creating new session") + } + + // Start a new election + e.election = concurrency.NewElection(e.session, e.Election) + + e.wg.Until(func(done chan struct{}) bool { + log.Debugf("attempting to become leader '%s'\n", e.Candidate) + + // Start a new campaign and attempt to become leader + if err := e.election.Campaign(e.ctx, e.Candidate); err != nil { + errors.Wrap(err, "while starting a new campaign") + } + + observeChan := e.election.Observe(e.ctx) + for { + select { + case node, ok := <-observeChan: + if !ok { + return false + } + if string(node.Kvs[0].Value) == e.Candidate { + log.Debug("IS Leader") + atomic.StoreInt32(&e.isLeader, 1) + } else { + // We are not leader + logrus.Debug("NOT Leader") + atomic.StoreInt32(&e.isLeader, 0) + } + case <-done: + return false + } + } + }) + return nil +} + +func (e *Election) Stop() { + e.Concede() + e.cancel() + e.wg.Wait() +} + +func (e *Election) IsLeader() bool { + return atomic.LoadInt32(&e.isLeader) == 1 +} + +// Release leadership and return true if we own it, else do nothing and return false +func (e *Election) Concede() bool { + if atomic.LoadInt32(&e.isLeader) == 1 { + if err := e.election.Resign(e.ctx); err != nil { + logrus.WithField("err", err). + Error("while attempting to concede the election") + } + atomic.StoreInt32(&e.isLeader, 0) + return true + } + return false +} + +type LeaderElectionMock struct{} + +func (s *LeaderElectionMock) IsLeader() bool { return true } +func (s *LeaderElectionMock) Concede() bool { return true } +func (s *LeaderElectionMock) Start() {} +func (s *LeaderElectionMock) Stop() {} diff --git a/version b/version index 943f9cbc..10c08801 100644 --- a/version +++ b/version @@ -1 +1 @@ -1.7.1 +1.7.4