Skip to content

Commit

Permalink
Merge pull request FeatureBaseDB#470 from travisturner/nodestate
Browse files Browse the repository at this point in the history
Implement NodeState as an attribute of *Node
  • Loading branch information
travisturner authored Apr 24, 2017
2 parents 6fe8e2b + 445023a commit 03aa944
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 99 deletions.
5 changes: 5 additions & 0 deletions broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func (s *StaticNodeSet) Open() error {
return nil
}

func (s *StaticNodeSet) Join(nodes []*Node) error {
s.nodes = nodes
return nil
}

// Broadcaster is an interface for broadcasting messages.
type Broadcaster interface {
SendSync(pb proto.Message) error
Expand Down
47 changes: 40 additions & 7 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package pilosa
import (
"encoding/binary"
"hash/fnv"

"github.com/pilosa/pilosa/internal"
)

const (
Expand All @@ -12,15 +14,30 @@ const (
// DefaultReplicaN is the default number of replicas per partition.
DefaultReplicaN = 1

// HealthStatus is the return value of the /health endpoint for a node in the cluster.
HealthStatusUp = "UP"
HealthStatusDown = "DOWN"
// NodeState represents node state returned in /status endpoint for a node in the cluster.
NodeStateUp = "UP"
NodeStateDown = "DOWN"
)

// Node represents a node in the cluster.
type Node struct {
Host string `json:"host"`
InternalHost string `json:"internalHost"`

status *internal.NodeStatus `json:"status"`
}

// SetStatus sets the NodeStatus.
func (n *Node) SetStatus(s *internal.NodeStatus) {
n.status = s
}

// SetState sets the Node.status.state.
func (n *Node) SetState(s string) {
if n.status == nil {
n.status = &internal.NodeStatus{}
}
n.status.State = s
}

// Nodes represents a list of nodes.
Expand Down Expand Up @@ -120,21 +137,37 @@ func (c *Cluster) NodeSetHosts() []string {
return a
}

// Health returns a map of nodes in the cluster with each node's state (UP/DOWN) as the value.
func (c *Cluster) Health() map[string]string {
// NodeStates returns a map of nodes in the cluster with each node's state (UP/DOWN) as the value.
func (c *Cluster) NodeStates() map[string]string {
h := make(map[string]string)
for _, n := range c.Nodes {
h[n.Host] = HealthStatusDown
h[n.Host] = NodeStateDown
}
// we are assuming that NodeSetHosts is a subset of c.Nodes
for _, m := range c.NodeSetHosts() {
if _, ok := h[m]; ok {
h[m] = HealthStatusUp
h[m] = NodeStateUp
}
}
return h
}

// State returns the internal ClusterState representation.
func (c *Cluster) Status() *internal.ClusterStatus {
return &internal.ClusterStatus{
Nodes: encodeClusterStatus(c.Nodes),
}
}

// encodeClusterStatus converts a into its internal representation.
func encodeClusterStatus(a []*Node) []*internal.NodeStatus {
other := make([]*internal.NodeStatus, len(a))
for i := range a {
other[i] = a[i].status
}
return other
}

// NodeByHost returns a node reference by host.
func (c *Cluster) NodeByHost(host string) *Node {
for _, n := range c.Nodes {
Expand Down
12 changes: 6 additions & 6 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestCluster_NodeSetHosts(t *testing.T) {
}

// Ensure cluster can compare its Nodes and Members
func TestCluster_Health(t *testing.T) {
func TestCluster_NodeStates(t *testing.T) {
c := pilosa.Cluster{
Nodes: []*pilosa.Node{
{Host: "serverA:1000"},
Expand All @@ -109,12 +109,12 @@ func TestCluster_Health(t *testing.T) {
}

// Verify a DOWN node is reported, and extraneous nodes are ignored
if a := c.Health(); !reflect.DeepEqual(a, map[string]string{
"serverA:1000": pilosa.HealthStatusUp,
"serverB:1000": pilosa.HealthStatusDown,
"serverC:1000": pilosa.HealthStatusUp,
if a := c.NodeStates(); !reflect.DeepEqual(a, map[string]string{
"serverA:1000": pilosa.NodeStateUp,
"serverB:1000": pilosa.NodeStateDown,
"serverC:1000": pilosa.NodeStateUp,
}) {
t.Fatalf("unexpected health: %s", spew.Sdump(a))
t.Fatalf("unexpected node state: %s", spew.Sdump(a))
}
}

Expand Down
22 changes: 7 additions & 15 deletions gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ import (
"github.com/pilosa/pilosa/internal"
)

// StateHandler specifies two methods which an object must implement to share
// state in the cluster. These are used by the GossipNodeSet to implement the
// LocalState and MergeRemoteState methods of memberlist.Delegate
type StateHandler interface {
LocalState() (proto.Message, error)
HandleRemoteState(proto.Message) error
}

// GossipNodeSet represents a gossip implementation of NodeSet using memberlist
// GossipNodeSet also represents a gossip implementation of pilosa.Broadcaster
// GossipNodeSet also represents an implementation of memberlist.Delegate
Expand All @@ -31,8 +23,8 @@ type GossipNodeSet struct {

broadcasts *memberlist.TransmitLimitedQueue

stateHandler StateHandler
config *GossipConfig
statusHandler pilosa.StatusHandler
config *GossipConfig

// The writer for any logging.
LogOutput io.Writer
Expand Down Expand Up @@ -89,7 +81,7 @@ type GossipConfig struct {
}

// NewGossipNodeSet returns a new instance of GossipNodeSet.
func NewGossipNodeSet(name string, gossipHost string, gossipPort int, gossipSeed string, sh StateHandler) *GossipNodeSet {
func NewGossipNodeSet(name string, gossipHost string, gossipPort int, gossipSeed string, sh pilosa.StatusHandler) *GossipNodeSet {
g := &GossipNodeSet{
LogOutput: os.Stderr,
}
Expand All @@ -106,7 +98,7 @@ func NewGossipNodeSet(name string, gossipHost string, gossipPort int, gossipSeed
g.config.memberlistConfig.AdvertisePort = gossipPort
g.config.memberlistConfig.Delegate = g

g.stateHandler = sh
g.statusHandler = sh

return g
}
Expand Down Expand Up @@ -176,7 +168,7 @@ func (g *GossipNodeSet) GetBroadcasts(overhead, limit int) [][]byte {
}

func (g *GossipNodeSet) LocalState(join bool) []byte {
pb, err := g.stateHandler.LocalState()
pb, err := g.statusHandler.LocalStatus()
if err != nil {
g.logger().Printf("error getting local state, err=%s", err)
return []byte{}
Expand All @@ -193,12 +185,12 @@ func (g *GossipNodeSet) LocalState(join bool) []byte {

func (g *GossipNodeSet) MergeRemoteState(buf []byte, join bool) {
// Unmarshal nodestate data.
var pb internal.NodeState
var pb internal.NodeStatus
if err := proto.Unmarshal(buf, &pb); err != nil {
g.logger().Printf("error unmarshalling nodestate data, err=%s", err)
return
}
err := g.stateHandler.HandleRemoteState(&pb)
err := g.statusHandler.HandleRemoteStatus(&pb)
if err != nil {
g.logger().Printf("merge state error: %s", err)
}
Expand Down
15 changes: 11 additions & 4 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (

// Handler represents an HTTP handler.
type Handler struct {
Holder *Holder
Broadcaster Broadcaster
Holder *Holder
Broadcaster Broadcaster
StatusHandler StatusHandler

// Local hostname & cluster configuration.
Host string
Expand Down Expand Up @@ -85,6 +86,7 @@ func NewRouter(handler *Handler) *mux.Router {
router.HandleFunc("/nodes", handler.handleGetNodes).Methods("GET")
router.HandleFunc("/schema", handler.handleGetSchema).Methods("GET")
router.HandleFunc("/slices/max", handler.handleGetSliceMax).Methods("GET")
router.HandleFunc("/status", handler.handleGetStatus).Methods("GET")
router.HandleFunc("/version", handler.handleGetVersion).Methods("GET")

// TODO: Apply MethodNotAllowed statuses to all endpoints.
Expand Down Expand Up @@ -116,8 +118,13 @@ func (h *Handler) handleGetSchema(w http.ResponseWriter, r *http.Request) {

// handleGetStatus handles GET /status requests.
func (h *Handler) handleGetStatus(w http.ResponseWriter, r *http.Request) {
status, err := h.StatusHandler.ClusterStatus()
if err != nil {
h.logger().Printf("cluster status error: %s", err)
return
}
if err := json.NewEncoder(w).Encode(getStatusResponse{
Health: h.Cluster.Health(),
Status: status,
}); err != nil {
h.logger().Printf("write status response error: %s", err)
}
Expand All @@ -128,7 +135,7 @@ type getSchemaResponse struct {
}

type getStatusResponse struct {
Health map[string]string `json:"health"`
Status proto.Message `json:"status"`
}

// handlePostQuery handles /query requests.
Expand Down
Loading

0 comments on commit 03aa944

Please sign in to comment.