From 299652b7ccb15c4db9d99dcc17f00f6e42b07836 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Fri, 5 Oct 2018 20:58:56 -0500 Subject: [PATCH] election.Start() now blocks until first election is complete --- README.md | 8 ++------ etcdutil/README.md | 33 +++++++++++++++++++-------------- etcdutil/config.go | 2 +- etcdutil/election.go | 24 +++++++++++++++++------- version | 2 +- 5 files changed, 40 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 043af73b..cb8e4077 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,6 @@ # Holster A place to holster mailgun's golang libraries and tools -## Bunker -Bunker is a key/value store library for efficiently storing large chunks of data into a cassandra cluster. - Bunker provides support for encryption, compression and data signing -See the [bunker readme](https://github.com/mailgun/holster/blob/master/bunker/README.md) for details - ## Clock A drop in (almost) replacement for the system `time` package to make scheduled events deterministic in tests. See the [clock readme](https://github.com/mailgun/holster/blob/master/clock/README.md) for details @@ -24,7 +19,8 @@ See the [secret readme](https://github.com/mailgun/holster/blob/master/secret/RE ## Distributed Election A distributed election implementation using etcd to coordinate elections -See the [election readme](https://github.com/mailgun/holster/blob/master/election/README.md) for details +See the [etcd v2 readme](https://github.com/mailgun/holster/blob/master/election/README.md) for details +See the [etcd v3 readme](https://github.com/mailgun/holster/blob/master/etcdutil/README.md) for details ## Errors Errors is a fork of [https://github.com/pkg/errors](https://github.com/pkg/errors) with additional diff --git a/etcdutil/README.md b/etcdutil/README.md index ada1dfe6..e1535688 100644 --- a/etcdutil/README.md +++ b/etcdutil/README.md @@ -22,8 +22,17 @@ func main() { return } + client, err := etcdutil.NewClient(nil) + if err != nil { + fmt.Fprintf(os.Stderr, "while creating etcd client: %s\n", err) + return + } + // Preform an election called 'my-service' with hostname as the candidate name - leader, _ := etcdutil.NewElection("my-service", hostname, nil) + election, _ := etcdutil.NewElection("my-service", hostname, client) + + // Start the election, will block until a leader is elected + election.Start() // Handle graceful shutdown signalChan := make(chan os.Signal, 1) @@ -35,16 +44,16 @@ func main() { select { case <-tick.C: // Are we currently leader? - if leader.IsLeader() { + if election.IsLeader() { err := DoThing() if err != nil { // Have another instance DoThing(), we can't for some reason - leader.Concede() + election.Concede() } } return true case <-signalChan: - leader.Stop() + election.Stop() return false } }) @@ -52,7 +61,7 @@ func main() { } ``` -## NewEtcdConfig() +## NewConfig() Designed to be used in applications that share the same etcd config and wish to reuse the same config throughout the application. @@ -66,7 +75,7 @@ import ( func main() { // These environment variables provided by the environment, - // we set them here to only to illustrate how `NewEtcdConfig()` + // we set them here to only to illustrate how `NewConfig()` // uses the environment to create a new etcd config os.Setenv("ETCD3_USER", "root") os.Setenv("ETCD3_PASSWORD", "rootpw") @@ -81,20 +90,16 @@ func main() { os.Setenv("ETCD3_SKIP_VERIFY", "true") // Create a new etc config from available environment variables - cfg, err := etcdutil.NewEtcdConfig(nil) + cfg, err := etcdutil.NewConfig(nil) if err != nil { fmt.Fprintf(os.Stderr, "while creating etcd config: %s\n", err) return } - - // Use cfg to init scroll - // Use cfg to init eventbus - // Use cfg to init leader election } ``` -## NewSecureClient() -Just like `NewEtcdConfig()` but returns a connected etcd client for use by the +## NewClient() +Just like `NewConfig()` but returns a connected etcd client for use by the rest of the application. ```go @@ -107,7 +112,7 @@ import ( func main() { // Create a new etc client from available environment variables - client, err := etcdutil.NewSecureClient(nil) + client, err := etcdutil.NewClient(nil) if err != nil { fmt.Fprintf(os.Stderr, "while creating etcd client: %s\n", err) return diff --git a/etcdutil/config.go b/etcdutil/config.go index ad9b9894..2b2a18cf 100644 --- a/etcdutil/config.go +++ b/etcdutil/config.go @@ -38,7 +38,7 @@ func NewClient(cfg *etcd.Config) (*etcd.Client, error) { etcdClt, err := etcd.New(*cfg) if err != nil { - return nil, errors.Wrap(err, "failed to create secure etcd client") + return nil, errors.Wrap(err, "failed to create etcd client") } return etcdClt, nil } diff --git a/etcdutil/election.go b/etcdutil/election.go index 32ab6017..d1225003 100644 --- a/etcdutil/election.go +++ b/etcdutil/election.go @@ -4,6 +4,7 @@ import ( "context" "os" "path" + "sync" "sync/atomic" "time" @@ -83,8 +84,9 @@ func NewElection(election, candidate string, client *etcd.Client) (*Election, er return e, nil } -func (e *Election) Start() error { - var err error +func (e *Election) Start() (err error) { + var once sync.Once + var completed sync.WaitGroup e.session, err = concurrency.NewSession(e.client, concurrency.WithTTL(e.TTL)) if err != nil { @@ -94,12 +96,15 @@ func (e *Election) Start() error { // Start a new election e.election = concurrency.NewElection(e.session, e.Election) + completed.Add(1) e.wg.Until(func(done chan struct{}) bool { log.Debugf("attempting to become leader '%s'\n", e.Candidate) // Start a new campaign and attempt to become leader - if err := e.election.Campaign(e.ctx, e.Candidate); err != nil { - errors.Wrap(err, "while starting a new campaign") + if err = e.election.Campaign(e.ctx, e.Candidate); err != nil { + err = errors.Wrap(err, "while starting a new campaign") + completed.Done() + return false } observeChan := e.election.Observe(e.ctx) @@ -107,6 +112,7 @@ func (e *Election) Start() error { select { case node, ok := <-observeChan: if !ok { + once.Do(completed.Done) return false } if string(node.Kvs[0].Value) == e.Candidate { @@ -114,15 +120,19 @@ func (e *Election) Start() error { atomic.StoreInt32(&e.isLeader, 1) } else { // We are not leader - logrus.Debug("NOT Leader") + log.Debug("NOT Leader") atomic.StoreInt32(&e.isLeader, 0) } + once.Do(completed.Done) case <-done: return false } } }) - return nil + + // Wait until the first election has completed + completed.Wait() + return err } func (e *Election) Stop() { @@ -139,7 +149,7 @@ func (e *Election) IsLeader() bool { func (e *Election) Concede() bool { if atomic.LoadInt32(&e.isLeader) == 1 { if err := e.election.Resign(e.ctx); err != nil { - logrus.WithField("err", err). + log.WithField("err", err). Error("while attempting to concede the election") } atomic.StoreInt32(&e.isLeader, 0) diff --git a/version b/version index 3e3c2f1e..c043eea7 100644 --- a/version +++ b/version @@ -1 +1 @@ -2.1.1 +2.2.1