Skip to content

Commit

Permalink
Merge pull request #48 from mailgun/thrawn/develop
Browse files Browse the repository at this point in the history
PIP-607: Election.Concede() now resubmits the candidate for election
  • Loading branch information
thrawn01 authored Aug 28, 2019
2 parents 57ffcf4 + b169ee9 commit 06d3391
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 62 deletions.
90 changes: 50 additions & 40 deletions etcdutil/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/mailgun/holster"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

var log *logrus.Entry

type LeaderElector interface {
IsLeader() bool
Concede() (bool, error)
Expand Down Expand Up @@ -98,6 +101,8 @@ func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig)
return nil, errors.New("ElectionConfig.Election can not be empty")
}

log = logrus.WithField("category", "election")

// Default to short 5 second leadership TTL
holster.SetDefault(&conf.TTL, int64(5))
conf.Election = path.Join("/elections", conf.Election)
Expand All @@ -115,21 +120,14 @@ func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig)
conf: conf,
}

// Create a new Session
var err error
if e.session, err = NewSession(e.client, SessionConfig{
Observer: e.onSessionChange,
TTL: e.conf.TTL,
}); err != nil {
return nil, err
}
e.ctx, e.cancel = context.WithCancel(context.Background())

// If an observer was provided
if conf.EventObserver != nil {
e.observers["conf"] = conf.EventObserver
}

var err error
ready := make(chan struct{})
// Register ourselves as an observer for the initial election, then remove before returning
e.observers["init"] = func(event Event) {
Expand All @@ -141,6 +139,14 @@ func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig)
close(ready)
}

// Create a new Session
if e.session, err = NewSession(e.client, SessionConfig{
Observer: e.onSessionChange,
TTL: e.conf.TTL,
}); err != nil {
return nil, err
}

// Wait for results of leader election
select {
case <-ready:
Expand All @@ -151,7 +157,7 @@ func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig)
}

func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {
//logrus.Debugf("Lease ID: %v running: %t err: %v", leaseID, e.isRunning, err)
//log.Debugf("SessionChange: Lease ID: %v running: %t err: %v", leaseID, e.isRunning, err)

// If we lost our lease, concede the campaign and stop
if leaseID == NoLease {
Expand All @@ -161,14 +167,14 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {
}
e.wg.Stop()
e.isRunning = false
atomic.StoreInt32(&e.isLeader, 0)
if err != nil {
e.onErr(err, "lease error")
}
return
}

if e.isRunning {
//logrus.Debugf("already running '%v", leaseID)
return
}

Expand All @@ -178,20 +184,18 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {
var err error
var rev int64

//logrus.Debug("registering")
rev, err = e.registerCampaign(leaseID)
if err != nil {
e.onErr(err, "during campaign registration")
select {
case <-time.After(e.backOff.Next()):
return true
case <-done:
e.isRunning = false
return false
}
}
e.backOff.Reset()

//logrus.Debugf("watching rev %v", rev)
if err := e.watchCampaign(rev); err != nil {
e.onErr(err, "during campaign watch")
select {
Expand All @@ -208,13 +212,14 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) {
e.onErr(err, "")
}
cancel()
return true
}
e.backOff.Reset()
return false
})
}

func (e *Election) withDrawCampaign(ctx context.Context) error {
//logrus.Debugf("withDrawCampaign(%s)", e.key)
defer func() {
atomic.StoreInt32(&e.isLeader, 0)
}()
Expand Down Expand Up @@ -260,6 +265,9 @@ func (e *Election) getLeader(ctx context.Context) (*mvccpb.KeyValue, error) {
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return nil, nil
}
return resp.Kvs[0], nil
}

Expand All @@ -274,15 +282,15 @@ func (e *Election) watchCampaign(rev int64) error {
if err != nil {
return errors.Wrap(err, "while querying for current leader")
}

//logrus.Debugf("Current Leader %v", string(leaderKV.Key))
if leaderKV == nil {
return errors.Wrap(err, "found no leader when watch began")
}

watcher := etcd.NewWatcher(e.client)

// We do this because watcher does not reliably return when errors occur on connect
// or when cancelled (See https://github.com/etcd-io/etcd/pull/10020)
go func() {
//logrus.Debugf("watching prefix: %s", e.conf.Election)
watchChan = watcher.Watch(etcd.WithRequireLeader(e.ctx), e.conf.Election,
etcd.WithRev(int64(rev+1)), etcd.WithPrefix())
close(ready)
Expand All @@ -298,7 +306,6 @@ func (e *Election) watchCampaign(rev int64) error {
e.onLeaderChange(leaderKV)

e.wg.Until(func(done chan struct{}) bool {
//logrus.Debug("Watching...")
select {
case resp := <-watchChan:
if resp.Canceled {
Expand All @@ -310,23 +317,23 @@ func (e *Election) watchCampaign(rev int64) error {
return false
}

// Look for changes in leadership
// Watch for changes in leadership
for _, event := range resp.Events {
if event.Type == etcd.EventTypeDelete || event.Type == etcd.EventTypePut {
// Skip events that are about us
if string(event.Kv.Key) == e.key {
continue
}

// If the key is for our current leader
if bytes.Compare(event.Kv.Key, leaderKV.Key) == 0 {
//logrus.Debug("Leader Changed")
// Check our leadership status
resp, err := e.getLeader(e.ctx)
if err != nil {
e.onFatalErr(err, "while querying for new leader")
return false
}

// If we have no leader
if resp == nil {
e.onFatalErr(err, "After etcd event no leader was found, restarting election")
return false
}
// Notify if leadership has changed
if bytes.Compare(resp.Key, leaderKV.Key) != 0 {
leaderKV = resp
Expand All @@ -336,7 +343,6 @@ func (e *Election) watchCampaign(rev int64) error {
}
}
case <-done:
//logrus.Debug("done")
watcher.Close()
// If withdraw takes longer than our TTL then lease is expired
// and we are no longer leader anyway.
Expand All @@ -346,7 +352,7 @@ func (e *Election) watchCampaign(rev int64) error {
if err := e.withDrawCampaign(ctx); err != nil {
e.onErr(err, "")
}
e.onLeaderChange(nil)
e.onLeaderChange(&mvccpb.KeyValue{})
cancel()
return false
}
Expand All @@ -356,7 +362,6 @@ func (e *Election) watchCampaign(rev int64) error {
}

func (e *Election) onLeaderChange(kv *mvccpb.KeyValue) {
//logrus.Debug("onLeaderChange()")
event := Event{}

if kv != nil {
Expand Down Expand Up @@ -393,14 +398,16 @@ func (e *Election) onErr(err error, msg string) {
// onFatalErr reports errors to the observer and resets the election and session
func (e *Election) onFatalErr(err error, msg string) {
e.onErr(err, msg)
// Cancel any campaigns and reset the session
e.session.Reset(e.ctx)
// We call this in a go routine to avoid blocking on `Stop()` calls
go e.session.Reset()
}

// Close cancels the election and concedes the election if we are leader.
//
// It closes the underlying session, blocks until e.wg reports that its work
// has finished, and only then calls onLeaderChange(nil). The order matters:
// the final event is emitted after the wait, not before it.
func (e *Election) Close() {
// NOTE(review): presumably closing the session triggers onSessionChange with
// NoLease, which stops the running campaign — confirm against Session.
e.session.Close()
// Block until whatever e.wg is tracking has returned.
e.wg.Wait()
// Emit the `Done:true` event
e.onLeaderChange(nil)
}

// IsLeader returns true if we are leader
Expand All @@ -413,19 +420,22 @@ func (e *Election) IsLeader() bool {
// and cancel the campaign call Close() instead.
func (e *Election) Concede() (bool, error) {
isLeader := atomic.LoadInt32(&e.isLeader)
// If resign takes longer than our TTL then lease is expired and we are no longer leader anyway.
ctx, cancel := context.WithTimeout(e.ctx, time.Duration(e.conf.TTL)*time.Second)
if isLeader == 0 {
return false, nil
}
oldCampaignKey := e.key
e.session.Reset()

defer func() {
cancel()
// Even if the delete fails we should consider ourselves no longer leader
atomic.StoreInt32(&e.isLeader, 0)
}()
// Ensure there are no lingering candidates
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.conf.TTL)*time.Second)
cancel()

if _, err := e.client.Delete(ctx, e.key); err != nil {
return isLeader == 1, err
_, err := e.client.Delete(ctx, oldCampaignKey)
if err != nil {
return true, errors.Wrapf(err, "while cleaning up campaign '%s'", oldCampaignKey)
}
return isLeader == 1, nil

return true, nil
}

type AlwaysLeaderMock struct{}
Expand Down
4 changes: 4 additions & 0 deletions etcdutil/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func TestTwoCampaigns(t *testing.T) {
assert.Equal(t, false, e.IsDone)

c2.Close()
e = <-c2Chan
assert.Equal(t, false, e.IsLeader)
assert.Equal(t, false, e.IsDone)

e = <-c2Chan
assert.Equal(t, false, e.IsLeader)
assert.Equal(t, true, e.IsDone)
Expand Down
Loading

0 comments on commit 06d3391

Please sign in to comment.