Skip to content

Commit

Permalink
Merge pull request #35 from mailgun/thrawn/develop
Browse files Browse the repository at this point in the history
election.Start() now blocks until first election is complete
  • Loading branch information
thrawn01 authored Oct 10, 2018
2 parents 64e6402 + 299652b commit 85b3be5
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 29 deletions.
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
# Holster
A place to holster mailgun's golang libraries and tools

## Bunker
Bunker is a key/value store library for efficiently storing large chunks of data into a cassandra cluster.
Bunker provides support for encryption, compression and data signing
See the [bunker readme](https://github.com/mailgun/holster/blob/master/bunker/README.md) for details

## Clock
A drop in (almost) replacement for the system `time` package to make scheduled
events deterministic in tests. See the [clock readme](https://github.com/mailgun/holster/blob/master/clock/README.md) for details
Expand All @@ -24,7 +19,8 @@ See the [secret readme](https://github.com/mailgun/holster/blob/master/secret/RE

## Distributed Election
A distributed election implementation using etcd to coordinate elections
See the [election readme](https://github.com/mailgun/holster/blob/master/election/README.md) for details
See the [etcd v2 readme](https://github.com/mailgun/holster/blob/master/election/README.md) for details
See the [etcd v3 readme](https://github.com/mailgun/holster/blob/master/etcdutil/README.md) for details

## Errors
Errors is a fork of [https://github.com/pkg/errors](https://github.com/pkg/errors) with additional
Expand Down
33 changes: 19 additions & 14 deletions etcdutil/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,17 @@ func main() {
return
}

client, err := etcdutil.NewClient(nil)
if err != nil {
fmt.Fprintf(os.Stderr, "while creating etcd client: %s\n", err)
return
}

// Preform an election called 'my-service' with hostname as the candidate name
leader, _ := etcdutil.NewElection("my-service", hostname, nil)
election, _ := etcdutil.NewElection("my-service", hostname, client)

// Start the election, will block until a leader is elected
election.Start()

// Handle graceful shutdown
signalChan := make(chan os.Signal, 1)
Expand All @@ -35,24 +44,24 @@ func main() {
select {
case <-tick.C:
// Are we currently leader?
if leader.IsLeader() {
if election.IsLeader() {
err := DoThing()
if err != nil {
// Have another instance DoThing(), we can't for some reason
leader.Concede()
election.Concede()
}
}
return true
case <-signalChan:
leader.Stop()
election.Stop()
return false
}
})
wg.Wait()
}
```

## NewEtcdConfig()
## NewConfig()
Designed to be used in applications that share the same etcd config
and wish to reuse the same config throughout the application.

Expand All @@ -66,7 +75,7 @@ import (

func main() {
// These environment variables provided by the environment,
// we set them here to only to illustrate how `NewEtcdConfig()`
// we set them here to only to illustrate how `NewConfig()`
// uses the environment to create a new etcd config
os.Setenv("ETCD3_USER", "root")
os.Setenv("ETCD3_PASSWORD", "rootpw")
Expand All @@ -81,20 +90,16 @@ func main() {
os.Setenv("ETCD3_SKIP_VERIFY", "true")

// Create a new etc config from available environment variables
cfg, err := etcdutil.NewEtcdConfig(nil)
cfg, err := etcdutil.NewConfig(nil)
if err != nil {
fmt.Fprintf(os.Stderr, "while creating etcd config: %s\n", err)
return
}

// Use cfg to init scroll
// Use cfg to init eventbus
// Use cfg to init leader election
}
```

## NewSecureClient()
Just like `NewEtcdConfig()` but returns a connected etcd client for use by the
## NewClient()
Just like `NewConfig()` but returns a connected etcd client for use by the
rest of the application.

```go
Expand All @@ -107,7 +112,7 @@ import (

func main() {
// Create a new etc client from available environment variables
client, err := etcdutil.NewSecureClient(nil)
client, err := etcdutil.NewClient(nil)
if err != nil {
fmt.Fprintf(os.Stderr, "while creating etcd client: %s\n", err)
return
Expand Down
2 changes: 1 addition & 1 deletion etcdutil/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewClient(cfg *etcd.Config) (*etcd.Client, error) {

etcdClt, err := etcd.New(*cfg)
if err != nil {
return nil, errors.Wrap(err, "failed to create secure etcd client")
return nil, errors.Wrap(err, "failed to create etcd client")
}
return etcdClt, nil
}
Expand Down
24 changes: 17 additions & 7 deletions etcdutil/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"path"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -83,8 +84,9 @@ func NewElection(election, candidate string, client *etcd.Client) (*Election, er
return e, nil
}

func (e *Election) Start() error {
var err error
func (e *Election) Start() (err error) {
var once sync.Once
var completed sync.WaitGroup

e.session, err = concurrency.NewSession(e.client, concurrency.WithTTL(e.TTL))
if err != nil {
Expand All @@ -94,35 +96,43 @@ func (e *Election) Start() error {
// Start a new election
e.election = concurrency.NewElection(e.session, e.Election)

completed.Add(1)
e.wg.Until(func(done chan struct{}) bool {
log.Debugf("attempting to become leader '%s'\n", e.Candidate)

// Start a new campaign and attempt to become leader
if err := e.election.Campaign(e.ctx, e.Candidate); err != nil {
errors.Wrap(err, "while starting a new campaign")
if err = e.election.Campaign(e.ctx, e.Candidate); err != nil {
err = errors.Wrap(err, "while starting a new campaign")
completed.Done()
return false
}

observeChan := e.election.Observe(e.ctx)
for {
select {
case node, ok := <-observeChan:
if !ok {
once.Do(completed.Done)
return false
}
if string(node.Kvs[0].Value) == e.Candidate {
log.Debug("IS Leader")
atomic.StoreInt32(&e.isLeader, 1)
} else {
// We are not leader
logrus.Debug("NOT Leader")
log.Debug("NOT Leader")
atomic.StoreInt32(&e.isLeader, 0)
}
once.Do(completed.Done)
case <-done:
return false
}
}
})
return nil

// Wait until the first election has completed
completed.Wait()
return err
}

func (e *Election) Stop() {
Expand All @@ -139,7 +149,7 @@ func (e *Election) IsLeader() bool {
func (e *Election) Concede() bool {
if atomic.LoadInt32(&e.isLeader) == 1 {
if err := e.election.Resign(e.ctx); err != nil {
logrus.WithField("err", err).
log.WithField("err", err).
Error("while attempting to concede the election")
}
atomic.StoreInt32(&e.isLeader, 0)
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.1
2.2.1

0 comments on commit 85b3be5

Please sign in to comment.