From b169ee949619e177c6fed8c965d4304c14ef955a Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Fri, 23 Aug 2019 11:12:14 -0500 Subject: [PATCH] Election.Concede() now resubmits the candidate for election --- etcdutil/election.go | 90 ++++++++++++++++++++++----------------- etcdutil/election_test.go | 4 ++ etcdutil/session.go | 47 +++++++++++--------- version | 2 +- 4 files changed, 81 insertions(+), 62 deletions(-) diff --git a/etcdutil/election.go b/etcdutil/election.go index 088f321b..91556753 100644 --- a/etcdutil/election.go +++ b/etcdutil/election.go @@ -13,8 +13,11 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" "github.com/mailgun/holster" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) +var log *logrus.Entry + type LeaderElector interface { IsLeader() bool Concede() (bool, error) @@ -98,6 +101,8 @@ func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig) return nil, errors.New("ElectionConfig.Election can not be empty") } + log = logrus.WithField("category", "election") + // Default to short 5 second leadership TTL holster.SetDefault(&conf.TTL, int64(5)) conf.Election = path.Join("/elections", conf.Election) @@ -115,14 +120,6 @@ func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig) conf: conf, } - // Create a new Session - var err error - if e.session, err = NewSession(e.client, SessionConfig{ - Observer: e.onSessionChange, - TTL: e.conf.TTL, - }); err != nil { - return nil, err - } e.ctx, e.cancel = context.WithCancel(context.Background()) // If an observer was provided @@ -130,6 +127,7 @@ func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig) e.observers["conf"] = conf.EventObserver } + var err error ready := make(chan struct{}) // Register ourselves as an observer for the initial election, then remove before returning e.observers["init"] = func(event Event) { @@ -141,6 +139,14 @@ func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig) 
close(ready) } + // Create a new Session + if e.session, err = NewSession(e.client, SessionConfig{ + Observer: e.onSessionChange, + TTL: e.conf.TTL, + }); err != nil { + return nil, err + } + // Wait for results of leader election select { case <-ready: @@ -151,7 +157,7 @@ func NewElection(ctx context.Context, client *etcd.Client, conf ElectionConfig) } func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) { - //logrus.Debugf("Lease ID: %v running: %t err: %v", leaseID, e.isRunning, err) + //log.Debugf("SessionChange: Lease ID: %v running: %t err: %v", leaseID, e.isRunning, err) // If we lost our lease, concede the campaign and stop if leaseID == NoLease { @@ -161,6 +167,7 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) { } e.wg.Stop() e.isRunning = false + atomic.StoreInt32(&e.isLeader, 0) if err != nil { e.onErr(err, "lease error") } @@ -168,7 +175,6 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) { } if e.isRunning { - //logrus.Debugf("already running '%v", leaseID) return } @@ -178,7 +184,6 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) { var err error var rev int64 - //logrus.Debug("registering") rev, err = e.registerCampaign(leaseID) if err != nil { e.onErr(err, "during campaign registration") @@ -186,12 +191,11 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) { case <-time.After(e.backOff.Next()): return true case <-done: + e.isRunning = false return false } } - e.backOff.Reset() - //logrus.Debugf("watching rev %v", rev) if err := e.watchCampaign(rev); err != nil { e.onErr(err, "during campaign watch") select { @@ -208,13 +212,14 @@ func (e *Election) onSessionChange(leaseID etcd.LeaseID, err error) { e.onErr(err, "") } cancel() + return true } + e.backOff.Reset() return false }) } func (e *Election) withDrawCampaign(ctx context.Context) error { - //logrus.Debugf("withDrawCampaign(%s)", e.key) defer func() { atomic.StoreInt32(&e.isLeader, 0) }() @@ 
-260,6 +265,9 @@ func (e *Election) getLeader(ctx context.Context) (*mvccpb.KeyValue, error) { if err != nil { return nil, err } + if len(resp.Kvs) == 0 { + return nil, nil + } return resp.Kvs[0], nil } @@ -274,15 +282,15 @@ func (e *Election) watchCampaign(rev int64) error { if err != nil { return errors.Wrap(err, "while querying for current leader") } - - //logrus.Debugf("Current Leader %v", string(leaderKV.Key)) + if leaderKV == nil { + return errors.Wrap(err, "found no leader when watch began") + } watcher := etcd.NewWatcher(e.client) // We do this because watcher does not reliably return when errors occur on connect // or when cancelled (See https://github.com/etcd-io/etcd/pull/10020) go func() { - //logrus.Debugf("watching prefix: %s", e.conf.Election) watchChan = watcher.Watch(etcd.WithRequireLeader(e.ctx), e.conf.Election, etcd.WithRev(int64(rev+1)), etcd.WithPrefix()) close(ready) @@ -298,7 +306,6 @@ func (e *Election) watchCampaign(rev int64) error { e.onLeaderChange(leaderKV) e.wg.Until(func(done chan struct{}) bool { - //logrus.Debug("Watching...") select { case resp := <-watchChan: if resp.Canceled { @@ -310,23 +317,23 @@ func (e *Election) watchCampaign(rev int64) error { return false } - // Look for changes in leadership + // Watch for changes in leadership for _, event := range resp.Events { if event.Type == etcd.EventTypeDelete || event.Type == etcd.EventTypePut { - // Skip events that are about us - if string(event.Kv.Key) == e.key { - continue - } - // If the key is for our current leader if bytes.Compare(event.Kv.Key, leaderKV.Key) == 0 { - //logrus.Debug("Leader Changed") // Check our leadership status resp, err := e.getLeader(e.ctx) if err != nil { e.onFatalErr(err, "while querying for new leader") return false } + + // If we have no leader + if resp == nil { + e.onFatalErr(err, "After etcd event no leader was found, restarting election") + return false + } // Notify if leadership has changed if bytes.Compare(resp.Key, leaderKV.Key) != 0 { 
leaderKV = resp @@ -336,7 +343,6 @@ func (e *Election) watchCampaign(rev int64) error { } } case <-done: - //logrus.Debug("done") watcher.Close() // If withdraw takes longer than our TTL then lease is expired // and we are no longer leader anyway. @@ -346,7 +352,7 @@ func (e *Election) watchCampaign(rev int64) error { if err := e.withDrawCampaign(ctx); err != nil { e.onErr(err, "") } - e.onLeaderChange(nil) + e.onLeaderChange(&mvccpb.KeyValue{}) cancel() return false } @@ -356,7 +362,6 @@ func (e *Election) watchCampaign(rev int64) error { } func (e *Election) onLeaderChange(kv *mvccpb.KeyValue) { - //logrus.Debug("onLeaderChange()") event := Event{} if kv != nil { @@ -393,14 +398,16 @@ func (e *Election) onErr(err error, msg string) { // onFatalErr reports errors to the observer and resets the election and session func (e *Election) onFatalErr(err error, msg string) { e.onErr(err, msg) - // Cancel any campaigns and reset the session - e.session.Reset(e.ctx) + // We call this in a go routine to avoid blocking on `Stop()` calls + go e.session.Reset() } // Close cancels the election and concedes the election if we are leader func (e *Election) Close() { e.session.Close() e.wg.Wait() + // Emit the `Done:true` event + e.onLeaderChange(nil) } // IsLeader returns true if we are leader @@ -413,19 +420,22 @@ func (e *Election) IsLeader() bool { // and cancel the campaign call Close() instead. func (e *Election) Concede() (bool, error) { isLeader := atomic.LoadInt32(&e.isLeader) - // If resign takes longer than our TTL then lease is expired and we are no longer leader anyway. 
- ctx, cancel := context.WithTimeout(e.ctx, time.Duration(e.conf.TTL)*time.Second) + if isLeader == 0 { + return false, nil + } + oldCampaignKey := e.key + e.session.Reset() - defer func() { - cancel() - // Even if the delete fails we should consider ourselves no longer leader - atomic.StoreInt32(&e.isLeader, 0) - }() + // Ensure there are no lingering candidates + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.conf.TTL)*time.Second) + cancel() - if _, err := e.client.Delete(ctx, e.key); err != nil { - return isLeader == 1, err + _, err := e.client.Delete(ctx, oldCampaignKey) + if err != nil { + return true, errors.Wrapf(err, "while cleaning up campaign '%s'", oldCampaignKey) } - return isLeader == 1, nil + + return true, nil } type AlwaysLeaderMock struct{} diff --git a/etcdutil/election_test.go b/etcdutil/election_test.go index 2e234666..e4359451 100644 --- a/etcdutil/election_test.go +++ b/etcdutil/election_test.go @@ -76,6 +76,10 @@ func TestTwoCampaigns(t *testing.T) { assert.Equal(t, false, e.IsDone) c2.Close() + e = <-c2Chan + assert.Equal(t, false, e.IsLeader) + assert.Equal(t, false, e.IsDone) + e = <-c2Chan assert.Equal(t, false, e.IsLeader) assert.Equal(t, true, e.IsDone) diff --git a/etcdutil/session.go b/etcdutil/session.go index f31fdecf..c1895319 100644 --- a/etcdutil/session.go +++ b/etcdutil/session.go @@ -2,7 +2,7 @@ package etcdutil import ( "context" - "sync" + "sync/atomic" "time" etcd "github.com/coreos/etcd/clientv3" @@ -24,8 +24,8 @@ type Session struct { conf SessionConfig client *etcd.Client timeout time.Duration - once *sync.Once lastKeepAlive time.Time + isRunning int32 } type SessionConfig struct { @@ -52,7 +52,6 @@ func NewSession(c *etcd.Client, conf SessionConfig) (*Session, error) { s := Session{ timeout: time.Second * time.Duration(conf.TTL), backOff: holster.NewBackOff(time.Millisecond*500, time.Duration(conf.TTL)*time.Second, 2), - once: &sync.Once{}, conf: conf, client: c, } @@ -65,6 +64,7 @@ func (s
*Session) run() { s.ctx, s.cancel = context.WithCancel(context.Background()) ticker := time.NewTicker(s.timeout) s.lastKeepAlive = time.Now() + atomic.StoreInt32(&s.isRunning, 1) s.wg.Until(func(done chan struct{}) bool { // If we have lost our keep alive, attempt to regain it @@ -75,9 +75,11 @@ func (s *Session) run() { case <-time.After(s.backOff.Next()): return true case <-s.ctx.Done(): + atomic.StoreInt32(&s.isRunning, 0) return false } - return true + // TODO: Fix this in the library. Unreachable code + // return true } } s.backOff.Reset() @@ -85,19 +87,20 @@ func (s *Session) run() { select { case _, ok := <-s.keepAlive: if !ok { - //logrus.Warn("heartbeat lost") + //log.Warn("heartbeat lost") s.keepAlive = nil } else { - //logrus.Debug("heartbeat received") + //log.Debug("heartbeat received") s.lastKeepAlive = time.Now() } case <-ticker.C: // Ensure we are getting heartbeats regularly if time.Now().Sub(s.lastKeepAlive) > s.timeout { - //logrus.Warn("too long between heartbeats") + //log.Warn("too long between heartbeats") s.keepAlive = nil } case <-done: + s.keepAlive = nil if s.lease != nil { ctx, cancel := context.WithTimeout(context.Background(), s.timeout) if _, err := s.client.Revoke(ctx, s.lease.ID); err != nil { @@ -105,6 +108,7 @@ func (s *Session) run() { } cancel() } + atomic.StoreInt32(&s.isRunning, 0) return false } @@ -115,9 +119,11 @@ func (s *Session) run() { }) } -func (s *Session) Reset(ctx context.Context) { +func (s *Session) Reset() { + if atomic.LoadInt32(&s.isRunning) != 1 { + return + } s.Close() - s.once = &sync.Once{} s.run() } @@ -125,27 +131,26 @@ func (s *Session) Reset(ctx context.Context) { // then SessionConfig.Observer is called with -1 (NoLease), only returns // once the session has closed successfully. 
func (s *Session) Close() { - s.once.Do(func() { - if s.cancel != nil { - s.cancel() - } - s.wg.Stop() - s.conf.Observer(NoLease, nil) - }) + if atomic.LoadInt32(&s.isRunning) != 1 { + return + } + + s.cancel() + s.wg.Stop() + s.conf.Observer(NoLease, nil) } func (s *Session) gainLease(ctx context.Context) error { - //logrus.Debug("attempting to grant new lease") - lease, err := s.client.Grant(ctx, s.conf.TTL) + var err error + s.lease, err = s.client.Grant(ctx, s.conf.TTL) if err != nil { return errors.Wrapf(err, "during grant lease") } - s.keepAlive, err = s.client.KeepAlive(s.ctx, lease.ID) + s.keepAlive, err = s.client.KeepAlive(s.ctx, s.lease.ID) if err != nil { return err } - //logrus.Debugf("new lease %d", lease.ID) - s.conf.Observer(lease.ID, nil) + s.conf.Observer(s.lease.ID, nil) return nil } diff --git a/version b/version index b5021469..fd2a0186 100644 --- a/version +++ b/version @@ -1 +1 @@ -3.0.2 +3.1.0