Skip to content

Commit

Permalink
Merge pull request #77 from mailgun/thrawn/develop
Browse files Browse the repository at this point in the history
PIP-1036: leader election
  • Loading branch information
thrawn01 authored Apr 15, 2021
2 parents b3925e2 + a150f59 commit 3a5a9b6
Show file tree
Hide file tree
Showing 31 changed files with 3,962 additions and 9 deletions.
Binary file removed .DS_Store
Binary file not shown.
38 changes: 38 additions & 0 deletions cancel/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package cancel

import (
"context"
"time"
)

type Context interface {
context.Context
Cancel()
}

type cancelCtx struct {
ctx context.Context
cancel context.CancelFunc
}

// Creates a context that wraps the given context and returns an obj that can be cancelled.
// This allows an object which desires to cancel a long running operation to store a single
// cancel.Context in it's struct variables instead of having to store both the context.Context
// and context.CancelFunc.
func New(ctx context.Context) Context {
if ctx == nil {
ctx = context.Background()
}

ctx, cancel := context.WithCancel(ctx)
return &cancelCtx{
cancel: cancel,
ctx: ctx,
}
}

func (c *cancelCtx) Cancel() { c.cancel() }
func (c *cancelCtx) Deadline() (deadline time.Time, ok bool) { return c.ctx.Deadline() }
func (c *cancelCtx) Done() <-chan struct{} { return c.ctx.Done() }
func (c *cancelCtx) Err() error { return c.ctx.Err() }
func (c *cancelCtx) Value(key interface{}) interface{} { return c.ctx.Value(key) }
147 changes: 147 additions & 0 deletions cmd/election/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/google/uuid"
"github.com/mailgun/holster/v3/discovery"
"github.com/mailgun/holster/v3/election"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func sendRPC(ctx context.Context, peer string, req election.RPCRequest, resp *election.RPCResponse) error {
// Marshall the RPC request to json
b, err := json.Marshal(req)
if err != nil {
return errors.Wrap(err, "while encoding request")
}

// Create a new http request with context
hr, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/rpc", peer), bytes.NewBuffer(b))
if err != nil {
return errors.Wrap(err, "while creating request")
}
hr.WithContext(ctx)

// Send the request
hp, err := http.DefaultClient.Do(hr)
if err != nil {
return errors.Wrap(err, "while sending http request")
}

// Decode the response from JSON
dec := json.NewDecoder(hp.Body)
if err := dec.Decode(&resp); err != nil {
return errors.Wrap(err, "while decoding response")
}
return nil
}

func newHandler(node election.Node) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
dec := json.NewDecoder(r.Body)
var req election.RPCRequest
if err := dec.Decode(&req); err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
}

// Example of how a peer might exclude RPC
// commands it doesn't want made.
if req.RPC == election.SetPeersRPC {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("RPC request '%s' not allowed", req.RPC)))
return
}

var resp election.RPCResponse
node.ReceiveRPC(req, &resp)

enc := json.NewEncoder(w)
if err := enc.Encode(resp); err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
}
}
}

func main() {
if len(os.Args) != 4 {
logrus.Fatal("usage: <election-address:8080> <memberlist-address:8180> <known-address:8180>")
}

electionAddr, memberListAddr, knownAddr := os.Args[1], os.Args[2], os.Args[3]
//logrus.SetLevel(logrus.DebugLevel)

node, err := election.NewNode(election.Config{
// A unique identifier used to identify us in a list of peers
UniqueID: electionAddr,
// Called whenever the library detects a change in leadership
OnUpdate: func(leader string) {
logrus.Printf("Current Leader: %s\n", leader)
},
// Called when the library wants to contact other peers
SendRPC: sendRPC,
})
if err != nil {
logrus.Fatal(err)
}

// Create a member list catalog
ml, err := discovery.NewMemberList(context.Background(), discovery.MemberListConfig{
BindAddress: memberListAddr,
Peer: discovery.Peer{
ID: uuid.New().String(),
Metadata: []byte(electionAddr),
},
KnownPeers: []string{knownAddr},
OnUpdate: func(peers []discovery.Peer) {
var result []string
for _, p := range peers {
result = append(result, string(p.Metadata))
}
logrus.Infof("Update Peers: %s", result)
node.SetPeers(context.Background(), result)
},
})
if err != nil {
logrus.Fatal(err)
}

mux := http.NewServeMux()
mux.HandleFunc("/rpc", newHandler(node))
go func() {
logrus.Fatal(http.ListenAndServe(electionAddr, mux))
}()

// Wait until the http server is up and can receive RPC requests
if err := election.WaitForConnect(electionAddr, 10, time.Millisecond*100); err != nil {
logrus.Fatal(err)
}

// Now that our http handler is listening for requests we
// can safely start the election.
node.Start(context.Background())

// Wait here for signals to clean up our mess
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
for range c {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
if err := ml.Close(ctx); err != nil {
logrus.WithError(err).Error("during member list catalog close")
}
cancel()
node.Stop(context.Background())
os.Exit(0)
}
}
4 changes: 2 additions & 2 deletions collections/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (c *LRUCache) Peek(key interface{}) (value interface{}, ok bool) {
// Processes each item in the cache in a thread safe way, such that the cache can be in use
// while processing items in the cache. Processing the cache with `Each()` does not update
// the expiration or last used.
func (c LRUCache) Each(concurrent int, callBack func(key interface{}, value interface{}) error) []error {
func (c *LRUCache) Each(concurrent int, callBack func(key interface{}, value interface{}) error) []error {
fanOut := syncutil.NewFanOut(concurrent)
keys := c.Keys()

Expand Down Expand Up @@ -239,4 +239,4 @@ func (c *LRUCache) Map(mapping func(item *CacheItem) bool) {
c.removeElement(v)
}
}
}
}
2 changes: 1 addition & 1 deletion collections/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestUpdate(t *testing.T) {
assert.Equal(t, 1, toInt(mh.Peek().Value))
}

func Example_Priority_Queue_Usage() {
func ExampleNewPriorityQueue() {
queue := collections.NewPriorityQueue()

queue.Push(&collections.PQItem{
Expand Down
183 changes: 183 additions & 0 deletions consul/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package consul

import (
"bytes"
"io"
"log"
"os"

"github.com/hashicorp/go-hclog"
"github.com/sirupsen/logrus"
)

// HCLogAdapter implements the hclog interface, and wraps it
// around a Logrus entry
type HCLogAdapter struct {
log logrus.FieldLogger
name string
args []interface{} // key/value pairs if this logger was created via With()
}

func NewHCLogAdapter(log logrus.FieldLogger, name string) *HCLogAdapter {
return &HCLogAdapter{
log: log,
name: name,
}
}

// HCLog has one more level than we do. As such, we will never
// set trace level.
func (*HCLogAdapter) Trace(_ string, _ ...interface{}) {
}

func (a *HCLogAdapter) Debug(msg string, args ...interface{}) {
a.CreateEntry(args).Debug(msg)
}

func (a *HCLogAdapter) Info(msg string, args ...interface{}) {
a.CreateEntry(args).Info(msg)
}

func (a *HCLogAdapter) Warn(msg string, args ...interface{}) {
a.CreateEntry(args).Warn(msg)
}

func (a *HCLogAdapter) Error(msg string, args ...interface{}) {
a.CreateEntry(args).Error(msg)
}

func (a *HCLogAdapter) Log(level hclog.Level, msg string, args ...interface{}) {
switch level {
case hclog.Trace:
a.Trace(msg, args...)
case hclog.Debug:
a.Debug(msg, args...)
case hclog.Info:
a.Info(msg, args...)
case hclog.Warn:
a.Warn(msg, args...)
case hclog.Error:
a.Error(msg, args...)
}
}

func (a *HCLogAdapter) IsTrace() bool {
return false
}

func (a *HCLogAdapter) IsDebug() bool {
return a.shouldEmit(logrus.DebugLevel)
}

func (a *HCLogAdapter) IsInfo() bool {
return a.shouldEmit(logrus.InfoLevel)
}

func (a *HCLogAdapter) IsWarn() bool {
return a.shouldEmit(logrus.WarnLevel)
}

func (a *HCLogAdapter) IsError() bool {
return a.shouldEmit(logrus.ErrorLevel)
}

func (a *HCLogAdapter) SetLevel(hclog.Level) {
// interface definition says it is ok for this to be a noop if
// implementations don't need/want to support dynamic level changing, which
// we don't currently.
}

func (a *HCLogAdapter) With(args ...interface{}) hclog.Logger {
e := a.CreateEntry(args)
return &HCLogAdapter{
log: e,
args: concatFields(a.args, args),
}
}

// concatFields combines two sets of key/value pairs.
// It allocates a new slice to avoid using append() and
// accidentally overriding the original slice a, e.g.
// when logger.With() is called multiple times to create
// sub-scoped loggers.
func concatFields(a, b []interface{}) []interface{} {
c := make([]interface{}, len(a)+len(b))
copy(c, a)
copy(c[len(a):], b)
return c
}

// ImpliedArgs returns With key/value pairs
func (a *HCLogAdapter) ImpliedArgs() []interface{} {
return a.args
}

func (a *HCLogAdapter) Name() string {
return a.name
}

func (a *HCLogAdapter) Named(name string) hclog.Logger {
var newName bytes.Buffer
if a.name != "" {
newName.WriteString(a.name)
newName.WriteString(".")
}
newName.WriteString(name)

return a.ResetNamed(newName.String())
}

func (a *HCLogAdapter) ResetNamed(name string) hclog.Logger {
fields := []interface{}{"subsystem_name", name}
e := a.CreateEntry(fields)
return &HCLogAdapter{log: e, name: name}
}

// StandardLogger is meant to return a stdlib Logger type which wraps around
// hclog. It does this by providing an io.Writer and instantiating a new
// Logger. It then tries to interpret the log level by parsing the message.
//
// Since we are not using `hclog` in a generic way, and I cannot find any
// calls to this method from go-plugin, we will poorly support this method.
// Rather than pull in all of hclog writer parsing logic, pass it a Logrus
// writer, and hardcode the level to INFO.
//
// Apologies to those who find themselves here.
func (a *HCLogAdapter) StandardLogger(opts *hclog.StandardLoggerOptions) *log.Logger {
entry := a.log.WithFields(logrus.Fields{})
return log.New(entry.WriterLevel(logrus.InfoLevel), "", 0)
}

func (a *HCLogAdapter) StandardWriter(opts *hclog.StandardLoggerOptions) io.Writer {
var w io.Writer
logger, ok := a.log.(*logrus.Logger)
if ok {
w = logger.Out
}
if w == nil {
w = os.Stderr
}
return w
}

func (a *HCLogAdapter) shouldEmit(level logrus.Level) bool {
return a.log.WithFields(logrus.Fields{}).Level >= level
}

func (a *HCLogAdapter) CreateEntry(args []interface{}) *logrus.Entry {
if len(args)%2 != 0 {
args = append(args, "<unknown>")
}

fields := make(logrus.Fields)
for i := 0; i < len(args); i += 2 {
k, ok := args[i].(string)
if !ok {
continue
}
v := args[i+1]
fields[k] = v
}

return a.log.WithFields(fields)
}
Loading

0 comments on commit 3a5a9b6

Please sign in to comment.