Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update task health status based on health_status_changed_event #25

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"fmt"
"github.com/CiscoCloud/marathon-consul/apps"
"github.com/CiscoCloud/marathon-consul/health"
"github.com/CiscoCloud/marathon-consul/tasks"
"strings"
)
Expand Down Expand Up @@ -161,3 +162,30 @@ func (consul *Consul) DeleteTask(task *tasks.Task) error {
_, err := consul.kv.Delete(WithPrefix(consul.AppsPrefix, task.Key()))
return err
}

// UpdateHealth takes a health update message, retrieves its associated task
// from Consul, and updates the health information
func (consul *Consul) UpdateHealth(health *health.Health) error {
key := WithPrefix(consul.AppsPrefix, health.TaskKey())
remote, _, err := consul.kv.Get(key)

if err != nil {
return err
}

if remote == nil {
return fmt.Errorf("Task key %s doesn't exist, can't update health", health.TaskKey())
}

task, err := tasks.ParseTask(remote.Value)

if err != nil {
return err
}

task.HealthCheckResults = []tasks.TaskHealthCheckResult{tasks.TaskHealthCheckResult{Alive: health.Alive}}

err = consul.UpdateTask(task)

return err
}
18 changes: 18 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,21 @@ func (event AppTerminatedEvent) Apps() []*apps.App {
func (event AppTerminatedEvent) GetType() string {
return event.Type
}

type HealthStatusChangeEvent struct {
Type string `json:"eventType"`
AppID string `json:"appId"`
TaskID string `json:"taskId"`
Timestamp string `json:"timestamp"`
Alive bool `json:"alive"`
}

func (event HealthStatusChangeEvent) Apps() []*apps.App {
return []*apps.App{
&apps.App{ID: event.AppID},
}
}

func (event HealthStatusChangeEvent) GetType() string {
return event.Type
}
10 changes: 10 additions & 0 deletions events/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ func parseAppTerminatedEvent(jsonBlob []byte) (Event, error) {
return event, err
}

func parseHealthStatusChangeEvent(jsonBlob []byte) (Event, error) {
event := HealthStatusChangeEvent{}
err := json.Unmarshal(jsonBlob, &event)

return event, err
}

// ParseEvent combines the functions in this module to return an event without
// the user having to worry about the *type* of the event.
func ParseEvent(jsonBlob []byte) (event Event, err error) {
Expand All @@ -58,6 +65,9 @@ func ParseEvent(jsonBlob []byte) (event Event, err error) {
return parseDeploymentInfoEvent(jsonBlob)
case "app_terminated_event":
return parseAppTerminatedEvent(jsonBlob)
case "health_status_changed_event":
return parseHealthStatusChangeEvent(jsonBlob)

default:
return nil, errors.New("Unknown event type: " + eventType)
}
Expand Down
10 changes: 10 additions & 0 deletions events/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,13 @@ func TestParseEvent_AppTerminatedEvent(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, event, parsed.(AppTerminatedEvent))
}

func TestParseEvent_HealthStatusChangeEvent(t *testing.T) {
event := HealthStatusChangeEvent{Type: "health_status_changed_event"}
jsonBlob, err := json.Marshal(event)
assert.Nil(t, err)

parsed, err := ParseEvent(jsonBlob)
assert.Nil(t, err)
assert.Equal(t, event, parsed.(HealthStatusChangeEvent))
}
28 changes: 28 additions & 0 deletions health/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package health

import (
"encoding/json"
"fmt"
"github.com/CiscoCloud/marathon-consul/utils"
)

type Health struct {
AppID string `json:"appId"`
TaskID string `json:"taskId"`
Timestamp string `json:"timestamp"`
Alive bool `json:"alive"`
}

func ParseHealth(event []byte) (*Health, error) {
health := &Health{}
err := json.Unmarshal(event, health)
return health, err
}

func (health *Health) TaskKey() string {
return fmt.Sprintf(
"%s/tasks/%s",
utils.CleanID(health.AppID),
health.TaskID,
)
}
38 changes: 38 additions & 0 deletions health/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package health

import (
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"testing"
)

var testHealth = &Health{
AppID: "/my-app",
TaskID: "my-app_0-1396592784349",
Timestamp: "2014-03-01T23:29:30.158Z",
Alive: true,
}

func TestParseHealth(t *testing.T) {
t.Parallel()

jsonified, err := json.Marshal(testHealth)
assert.Nil(t, err)

health, err := ParseHealth(jsonified)
assert.Nil(t, err)

assert.Equal(t, testHealth.AppID, health.AppID)
assert.Equal(t, testHealth.TaskID, health.TaskID)
assert.Equal(t, testHealth.Timestamp, health.Timestamp)
assert.Equal(t, testHealth.Alive, health.Alive)
}

func TestTaskKey(t *testing.T) {
t.Parallel()

tk := testHealth.TaskKey()

assert.Equal(t, fmt.Sprintf("%s/tasks/%s", "my-app", testHealth.TaskID), tk)
}
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ Reconnect:
case "status_update_event":
eventLogger.Info("handling event")
err = fh.HandleStatusEvent(body)
case "health_status_changed_event":
err = fh.HandleHealthStatusEvent(body)
default:
eventLogger.Info("not handling event")
}
Expand Down
47 changes: 39 additions & 8 deletions tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@ import (
)

type Task struct {
Timestamp string `json:"timestamp"`
SlaveID string `json:"slaveId"`
ID string `json:"id"`
TaskStatus string `json:"taskStatus"`
AppID string `json:"appId"`
Host string `json:"host"`
Ports []int `json:"ports"`
Version string `json:"version"`
Timestamp string `json:"timestamp"`
SlaveID string `json:"slaveId"`
ID string `json:"id"`
TaskStatus string `json:"taskStatus"`
AppID string `json:"appId"`
Host string `json:"host"`
Ports []int `json:"ports"`
Version string `json:"version"`
HealthCheckResults []TaskHealthCheckResult `json:"healthCheckResults"`
}

type TaskHealthCheckResult struct {
Alive bool `json:"alive"`
}

func ParseTask(event []byte) (*Task, error) {
Expand All @@ -40,3 +45,29 @@ func (task *Task) KV() *api.KVPair {
Value: serialized,
}
}

// Include a derived 'healthy' field in the json output to summarize the
// health check results, making it easier to act on in a template
func (task *Task) MarshalJSON() ([]byte, error) {
type Alias Task
return json.Marshal(&struct {
ReportsHealth bool `json:"reportsHealth"`
Healthy bool `json:"healthy"`
*Alias
}{
ReportsHealth: true,
Healthy: task.IsHealthy(),
Alias: (*Alias)(task),
})
}

// return true if any health check says the task is alive
func (task *Task) IsHealthy() bool {
for _, r := range task.HealthCheckResults {
if r.Alive {
return true
}
}

return false
}
21 changes: 13 additions & 8 deletions tasks/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
)

var testTask = &Task{
Timestamp: "2014-03-01T23:29:30.158Z",
SlaveID: "20140909-054127-177048842-5050-1494-0",
ID: "my-app_0-1396592784349",
TaskStatus: "TASK_RUNNING",
AppID: "/my-app",
Host: "slave-1234.acme.org",
Ports: []int{31372},
Version: "2014-04-04T06:26:23.051Z",
Timestamp: "2014-03-01T23:29:30.158Z",
SlaveID: "20140909-054127-177048842-5050-1494-0",
ID: "my-app_0-1396592784349",
TaskStatus: "TASK_RUNNING",
AppID: "/my-app",
Host: "slave-1234.acme.org",
Ports: []int{31372},
Version: "2014-04-04T06:26:23.051Z",
HealthCheckResults: []TaskHealthCheckResult{TaskHealthCheckResult{Alive: true}},
}

func TestParseTask(t *testing.T) {
Expand All @@ -35,6 +36,10 @@ func TestParseTask(t *testing.T) {
assert.Equal(t, testTask.Host, service.Host)
assert.Equal(t, testTask.Ports, service.Ports)
assert.Equal(t, testTask.Version, service.Version)
assert.Equal(t, testTask.HealthCheckResults, service.HealthCheckResults)

// check that derived field 'healthy' is serialized
assert.Contains(t, string(jsonified), "\"healthy\":true")
}

func TestKV(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions web.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/CiscoCloud/marathon-consul/consul"
"github.com/CiscoCloud/marathon-consul/events"
"github.com/CiscoCloud/marathon-consul/health"
"github.com/CiscoCloud/marathon-consul/tasks"
log "github.com/Sirupsen/logrus"
)
Expand Down Expand Up @@ -46,6 +47,9 @@ func (fh *ForwardHandler) Handle(w http.ResponseWriter, r *http.Request) {
case "status_update_event":
log.WithField("eventType", "status_update_event").Info("handling event")
err = fh.HandleStatusEvent(body)
case "health_status_changed_event":
log.WithField("eventType", "health_status_changed_event").Info("handling event")
err = fh.HandleHealthStatusEvent(body)
default:
log.WithField("eventType", eventType).Info("not handling event")
w.WriteHeader(200)
Expand Down Expand Up @@ -90,6 +94,18 @@ func (fh *ForwardHandler) HandleTerminationEvent(body []byte) error {
return fh.consul.DeleteApp(event.Apps()[0])
}

func (fh *ForwardHandler) HandleHealthStatusEvent(body []byte) error {
health, err := health.ParseHealth(body)

if err != nil {
return err
}

err = fh.consul.UpdateHealth(health)

return err
}

func (fh *ForwardHandler) HandleStatusEvent(body []byte) error {
// for every other use of Tasks, Marathon uses the "id" field for the task ID.
// Here, it uses "taskId", with most of the other fields being equal. We'll
Expand Down
46 changes: 46 additions & 0 deletions web_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/CiscoCloud/marathon-consul/apps"
"github.com/CiscoCloud/marathon-consul/consul"
"github.com/CiscoCloud/marathon-consul/events"
"github.com/CiscoCloud/marathon-consul/health"
"github.com/CiscoCloud/marathon-consul/mocks"
"github.com/CiscoCloud/marathon-consul/tasks"
"github.com/stretchr/testify/assert"
Expand All @@ -30,6 +31,13 @@ var (

testTaskKV = testTask.KV()
testAppKV = testApp.KV()

testHealth = &health.Health{
AppID: testTask.AppID,
TaskID: testTask.ID,
Timestamp: testTask.Timestamp,
Alive: true,
}
)

func TestHealthHandler(t *testing.T) {
Expand Down Expand Up @@ -151,3 +159,41 @@ func TestForwardHandlerHandleStatusEvent(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "unknown task status")
}

func TestForwardHandlerHandleHealthStatusEvent(t *testing.T) {
t.Parallel()

kv := mocks.NewKVer()
consul := consul.NewConsul(kv, "")
handler := ForwardHandler{consul}

testEvent := events.HealthStatusChangeEvent{"health_status_changed_event",
testHealth.AppID, testHealth.TaskID, testHealth.Timestamp, testHealth.Alive}

body, err := json.Marshal(testEvent)
assert.Nil(t, err)

// first check that we get an error if expected task isn't in the KV
err = handler.HandleHealthStatusEvent(body)
assert.NotNil(t, err)

// populate task that gets updated by health
err = consul.UpdateTask(testTask)
err = handler.HandleHealthStatusEvent(body)
assert.Nil(t, err)

// check task is updated with alive=true
result, _, err := kv.Get(testHealth.TaskKey())
assert.Nil(t, err)
assert.Contains(t, string(result.Value), "\"alive\":true")

// test again with alive=false
testEvent.Alive = false
body, err = json.Marshal(testEvent)
err = handler.HandleHealthStatusEvent(body)
assert.Nil(t, err)

result, _, err = kv.Get(testHealth.TaskKey())
assert.Nil(t, err)
assert.Contains(t, string(result.Value), "\"alive\":false")
}