Skip to content

Commit

Permalink
Merge pull request #26 from thrawn01/etcdv3
Browse files Browse the repository at this point in the history
Add functions to setup etcd TLS clients
  • Loading branch information
thrawn01 committed Jul 31, 2018
2 parents 3df49c4 + 6ebd41b commit 28b1958
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 1 deletion.
54 changes: 54 additions & 0 deletions cmd/election/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import (
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/mailgun/holster/etcdutil"
"github.com/sirupsen/logrus"
)

func main() {
logrus.SetLevel(logrus.DebugLevel)

if len(os.Args) < 2 {
fmt.Println("a candidate name is required")
os.Exit(1)
}

e, err := etcdutil.NewElection("cli-election", os.Args[1], nil)
if err != nil {
fmt.Printf("while creating a new election: %s\n", err)
os.Exit(1)
}

e.Start()
if err != nil {
fmt.Printf("during election start: %s\n", err)
os.Exit(1)
}

c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT)
go func() {
for {
select {
case sig := <-c:
switch sig {
case syscall.SIGINT:
fmt.Printf("[%s] Concede and exit\n", os.Args[1])
e.Stop()
os.Exit(1)
}
}
}
}()

for {
fmt.Printf("[%s] Leader: %t\n", os.Args[1], e.IsLeader())
time.Sleep(time.Second)
}
}
46 changes: 46 additions & 0 deletions etcdutil/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
## ETCD Leader Election
Use etcd for leader election if you have several instances of a service running in production
and you only want one of the service instances to preform a task.

`LeaderElection` starts a goroutine which performs an election and maintains a leader
while services join and leave the election. Calling `Stop()` will `Concede()` leadership if
we currently have it.

```go

import (
"github.com/mailgun/holster"
"github.com/mailgun/holster/election"
)

var wg holster.WaitGroup

// Start the goroutine and preform the election
leader, _ := election.NewElection("my-service", "", nil)

// Handle graceful shutdown
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)

// Do periodic thing
tick := time.NewTicker(time.Second * 2)
wg.Loop(func() bool {
select {
case <-tick.C:
// Are we currently leader?
if leader.IsLeader() {
err := DoThing()
if err != nil {
// Have another instance DoThing(), we can't for some reason
leader.Concede()
}
}
return true
case <-signalChan:
leader.Stop()
return false
}
})
wg.Wait()
```

109 changes: 109 additions & 0 deletions etcdutil/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package etcdutil

import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"os"

etcd "github.com/coreos/etcd/clientv3"
"github.com/mailgun/holster"
"github.com/pkg/errors"
"google.golang.org/grpc/grpclog"
)

const (
pathToCA = "/etc/mailgun/ssl/localhost/ca.pem"
pathToKey = "/etc/mailgun/ssl/localhost/etcd-key.pem"
pathToCert = "/etc/mailgun/ssl/localhost/etcd-cert.pem"
localSecureEndpoint = "https://127.0.0.1:23790"
localInsecureEndpoint = "http://127.0.0.1:23790"
)

func NewSecureClient(cfg *etcd.Config) (*etcd.Client, error) {
var err error
if cfg, err = NewEtcdConfig(cfg); err != nil {
}

etcdClt, err := etcd.New(*cfg)
if err != nil {
return nil, errors.Wrap(err, "failed to create secure etcd client")
}
return etcdClt, nil
}

// Create a new etcd.Config using environment variables. If an existing
// config is passed, will fill in missing configuration using environment
// variables or defaults if they exists on the local system.

// If no environment variables are set, will return a config set to
// connect without TLS via http://localhost:23790
func NewEtcdConfig(cfg *etcd.Config) (*etcd.Config, error) {
var envEndpoint, tlsCertFile, tlsKeyFile, tlsCaFile string

// Create a config if none exists and get user/pass
holster.SetDefault(&cfg, &etcd.Config{})
holster.SetDefault(&cfg.Username, os.Getenv("ETCD3_USER"))
holster.SetDefault(&cfg.Password, os.Getenv("ETCD3_PASSWORD"))

// Don't set default file locations for these if they don't exist on disk
// as dev or testing environments might not have certificates
holster.SetDefault(&tlsCertFile, os.Getenv("ETCD3_TLS_CERT"), ifExists(pathToCert))
holster.SetDefault(&tlsKeyFile, os.Getenv("ETCD3_TLS_KEY"), ifExists(pathToKey))
holster.SetDefault(&tlsCaFile, os.Getenv("ETCD3_CA"), ifExists(pathToCA))

if os.Getenv("ETCD3_DEBUG") != "" {
etcd.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
}

// If the CA file was provided
if tlsCaFile != "" {
holster.SetDefault(&cfg.TLS, &tls.Config{})

var certPool *x509.CertPool = nil
if pemBytes, err := ioutil.ReadFile(tlsCaFile); err == nil {
certPool = x509.NewCertPool()
certPool.AppendCertsFromPEM(pemBytes)
} else {
return nil, errors.Errorf("while loading cert CA file '%s': %s", tlsCaFile, err)
}
holster.SetDefault(&cfg.TLS.RootCAs, certPool)
cfg.TLS.InsecureSkipVerify = false
}

// If the cert and key files are provided attempt to load them
if tlsCertFile != "" && tlsKeyFile != "" {
holster.SetDefault(&cfg.TLS, &tls.Config{})
tlsCert, err := tls.LoadX509KeyPair(tlsCertFile, tlsKeyFile)
if err != nil {
return nil, errors.Errorf("while loading cert '%s' and key file '%s': %s",
tlsCertFile, tlsKeyFile, err)
}
holster.SetDefault(&cfg.TLS.Certificates, []tls.Certificate{tlsCert})
}

holster.SetDefault(&envEndpoint, os.Getenv("ETCD3_ENDPOINT"), secureOrInsecure(cfg.TLS))
holster.SetDefault(&cfg.Endpoints, []string{envEndpoint})

// Override here if user REALLY wants this
if cfg.TLS != nil && os.Getenv("ETCD3_SKIP_VERIFY") != "" {
cfg.TLS.InsecureSkipVerify = true
}

return cfg, nil
}

// If the file exists, return the path provided
func ifExists(file string) string {
if _, err := os.Stat(file); err == nil {
return file
}
return ""
}

func secureOrInsecure(tlsConfig *tls.Config) string {
if tlsConfig == nil {
return localInsecureEndpoint
}
return localSecureEndpoint
}
168 changes: 168 additions & 0 deletions etcdutil/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package etcdutil

import (
"context"
"os"
"path"
"sync/atomic"
"time"

etcd "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/mailgun/holster"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

var log *logrus.Entry

type LeaderElector interface {
IsLeader() bool
Concede() bool
Start() error
Stop()
}

type Election struct {
// The name of the election (IE: scout, blackbird, etc...)
Election string
// The name of this instance (IE: worker-n01, worker-n02, etc...)
Candidate string
// Seconds to wait before giving up the election if leader disconnected
TTL int

etcdConfig *etcd.Config
session *concurrency.Session
election *concurrency.Election
client *etcd.Client
cancel context.CancelFunc
wg holster.WaitGroup
ctx context.Context
isLeader int32
}

// Use leader election if you have several instances of a service running in production
// and you only want one of the service instances to preform a periodic task.
//
// election, _ := etcdv3.NewElection("election-name", "", nil)
//
// // Start the leader election and attempt to become leader
// election.Start()
//
// // Returns true if we are leader (thread safe)
// if election.IsLeader() {
// // Do periodic thing
// }
func NewElection(election, candidate string, etcdConfig *etcd.Config) (*Election, error) {
log = logrus.WithField("category", "election")
ctx, cancelFunc := context.WithCancel(context.Background())
e := &Election{
Candidate: candidate,
Election: election,
TTL: 5,
etcdConfig: etcdConfig,
cancel: cancelFunc,
ctx: ctx,
}

if host, err := os.Hostname(); err == nil {
holster.SetDefault(&e.Candidate, host)
}

// Set a prefix key for elections
e.Election = path.Join("/elections", e.Election)

var err error
e.etcdConfig, err = NewEtcdConfig(etcdConfig)
if err != nil {
return nil, err
}

e.client, err = etcd.New(*e.etcdConfig)
if err != nil {
return nil, err
}

// Test the connection
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

_, err = e.client.Get(ctx, e.Election)
if err != nil {
return nil, errors.Wrap(err, "while connecting to etcd")
}

return e, nil
}

func (e *Election) Start() error {
var err error

e.session, err = concurrency.NewSession(e.client, concurrency.WithTTL(e.TTL))
if err != nil {
return errors.Wrap(err, "while creating new session")
}

// Start a new election
e.election = concurrency.NewElection(e.session, e.Election)

e.wg.Until(func(done chan struct{}) bool {
log.Debugf("attempting to become leader '%s'\n", e.Candidate)

// Start a new campaign and attempt to become leader
if err := e.election.Campaign(e.ctx, e.Candidate); err != nil {
errors.Wrap(err, "while starting a new campaign")
}

observeChan := e.election.Observe(e.ctx)
for {
select {
case node, ok := <-observeChan:
if !ok {
return false
}
if string(node.Kvs[0].Value) == e.Candidate {
log.Debug("IS Leader")
atomic.StoreInt32(&e.isLeader, 1)
} else {
// We are not leader
logrus.Debug("NOT Leader")
atomic.StoreInt32(&e.isLeader, 0)
}
case <-done:
return false
}
}
})
return nil
}

func (e *Election) Stop() {
e.Concede()
e.cancel()
e.wg.Wait()
}

func (e *Election) IsLeader() bool {
return atomic.LoadInt32(&e.isLeader) == 1
}

// Release leadership and return true if we own it, else do nothing and return false
func (e *Election) Concede() bool {
if atomic.LoadInt32(&e.isLeader) == 1 {
if err := e.election.Resign(e.ctx); err != nil {
logrus.WithField("err", err).
Error("while attempting to concede the election")
}
atomic.StoreInt32(&e.isLeader, 0)
return true
}
return false
}

type LeaderElectionMock struct{}

func (s *LeaderElectionMock) IsLeader() bool { return true }
func (s *LeaderElectionMock) Concede() bool { return true }
func (s *LeaderElectionMock) Start() {}
func (s *LeaderElectionMock) Stop() {}
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.7.1
1.7.4

0 comments on commit 28b1958

Please sign in to comment.