diff --git a/consul/consul.go b/consul/consul.go index 02e381f..24df758 100644 --- a/consul/consul.go +++ b/consul/consul.go @@ -5,6 +5,7 @@ import ( "bytes" "fmt" "github.com/CiscoCloud/marathon-consul/apps" + "github.com/CiscoCloud/marathon-consul/health" "github.com/CiscoCloud/marathon-consul/tasks" "strings" ) @@ -161,3 +162,30 @@ func (consul *Consul) DeleteTask(task *tasks.Task) error { _, err := consul.kv.Delete(WithPrefix(consul.AppsPrefix, task.Key())) return err } + +// UpdateHealth takes a health update message, retrieves its associated task +// from Consul, and updates the health information +func (consul *Consul) UpdateHealth(health *health.Health) error { + key := WithPrefix(consul.AppsPrefix, health.TaskKey()) + remote, _, err := consul.kv.Get(key) + + if err != nil { + return err + } + + if remote == nil { + return fmt.Errorf("Task key %s doesn't exist, can't update health", health.TaskKey()) + } + + task, err := tasks.ParseTask(remote.Value) + + if err != nil { + return err + } + + task.HealthCheckResults = []tasks.TaskHealthCheckResult{tasks.TaskHealthCheckResult{Alive: health.Alive}} + + err = consul.UpdateTask(task) + + return err +} diff --git a/events/events.go b/events/events.go index a7cb106..c29410a 100644 --- a/events/events.go +++ b/events/events.go @@ -62,3 +62,21 @@ func (event AppTerminatedEvent) Apps() []*apps.App { func (event AppTerminatedEvent) GetType() string { return event.Type } + +type HealthStatusChangeEvent struct { + Type string `json:"eventType"` + AppID string `json:"appId"` + TaskID string `json:"taskId"` + Timestamp string `json:"timestamp"` + Alive bool `json:"alive"` +} + +func (event HealthStatusChangeEvent) Apps() []*apps.App { + return []*apps.App{ + &apps.App{ID: event.AppID}, + } +} + +func (event HealthStatusChangeEvent) GetType() string { + return event.Type +} diff --git a/events/parse.go b/events/parse.go index a738eda..bf7f2d4 100644 --- a/events/parse.go +++ b/events/parse.go @@ -43,6 +43,13 @@ func parseAppTerminatedEvent(jsonBlob []byte) (Event, error) { return event, err } +func parseHealthStatusChangeEvent(jsonBlob []byte) (Event, error) { + event := HealthStatusChangeEvent{} + err := json.Unmarshal(jsonBlob, &event) + + return event, err +} + // ParseEvent combines the functions in this module to return an event without // the user having to worry about the *type* of the event. func ParseEvent(jsonBlob []byte) (event Event, err error) { @@ -58,6 +65,9 @@ func ParseEvent(jsonBlob []byte) (event Event, err error) { return parseDeploymentInfoEvent(jsonBlob) case "app_terminated_event": return parseAppTerminatedEvent(jsonBlob) + case "health_status_changed_event": + return parseHealthStatusChangeEvent(jsonBlob) + default: return nil, errors.New("Unknown event type: " + eventType) } diff --git a/events/parse_test.go b/events/parse_test.go index 0dd4311..81dfe7c 100644 --- a/events/parse_test.go +++ b/events/parse_test.go @@ -53,3 +53,13 @@ func TestParseEvent_AppTerminatedEvent(t *testing.T) { assert.Nil(t, err) assert.Equal(t, event, parsed.(AppTerminatedEvent)) } + +func TestParseEvent_HealthStatusChangeEvent(t *testing.T) { + event := HealthStatusChangeEvent{Type: "health_status_changed_event"} + jsonBlob, err := json.Marshal(event) + assert.Nil(t, err) + + parsed, err := ParseEvent(jsonBlob) + assert.Nil(t, err) + assert.Equal(t, event, parsed.(HealthStatusChangeEvent)) +} diff --git a/health/health.go b/health/health.go new file mode 100644 index 0000000..9a0b556 --- /dev/null +++ b/health/health.go @@ -0,0 +1,28 @@ +package health + +import ( + "encoding/json" + "fmt" + "github.com/CiscoCloud/marathon-consul/utils" +) + +type Health struct { + AppID string `json:"appId"` + TaskID string `json:"taskId"` + Timestamp string `json:"timestamp"` + Alive bool `json:"alive"` +} + +func ParseHealth(event []byte) (*Health, error) { + health := &Health{} + err := json.Unmarshal(event, health) + return health, err +} + +func (health *Health) TaskKey() string { + return fmt.Sprintf( + "%s/tasks/%s", + utils.CleanID(health.AppID), + health.TaskID, + ) +} diff --git a/health/health_test.go b/health/health_test.go new file mode 100644 index 0000000..885e634 --- /dev/null +++ b/health/health_test.go @@ -0,0 +1,38 @@ +package health + +import ( + "encoding/json" + "fmt" + "github.com/stretchr/testify/assert" + "testing" +) + +var testHealth = &Health{ + AppID: "/my-app", + TaskID: "my-app_0-1396592784349", + Timestamp: "2014-03-01T23:29:30.158Z", + Alive: true, +} + +func TestParseHealth(t *testing.T) { + t.Parallel() + + jsonified, err := json.Marshal(testHealth) + assert.Nil(t, err) + + health, err := ParseHealth(jsonified) + assert.Nil(t, err) + + assert.Equal(t, testHealth.AppID, health.AppID) + assert.Equal(t, testHealth.TaskID, health.TaskID) + assert.Equal(t, testHealth.Timestamp, health.Timestamp) + assert.Equal(t, testHealth.Alive, health.Alive) +} + +func TestTaskKey(t *testing.T) { + t.Parallel() + + tk := testHealth.TaskKey() + + assert.Equal(t, fmt.Sprintf("%s/tasks/%s", "my-app", testHealth.TaskID), tk) +} diff --git a/main.go b/main.go index 1e5ba67..4e86706 100644 --- a/main.go +++ b/main.go @@ -113,6 +113,8 @@ Reconnect: case "status_update_event": eventLogger.Info("handling event") err = fh.HandleStatusEvent(body) + case "health_status_changed_event": + err = fh.HandleHealthStatusEvent(body) default: eventLogger.Info("not handling event") } diff --git a/tasks/task.go b/tasks/task.go index 1a17d70..1db4254 100644 --- a/tasks/task.go +++ b/tasks/task.go @@ -8,14 +8,19 @@ import ( ) type Task struct { - Timestamp string `json:"timestamp"` - SlaveID string `json:"slaveId"` - ID string `json:"id"` - TaskStatus string `json:"taskStatus"` - AppID string `json:"appId"` - Host string `json:"host"` - Ports []int `json:"ports"` - Version string `json:"version"` + Timestamp string `json:"timestamp"` + SlaveID string `json:"slaveId"` + ID string `json:"id"` + TaskStatus string `json:"taskStatus"` + AppID string `json:"appId"` + Host string `json:"host"` + Ports []int `json:"ports"` + Version string `json:"version"` + HealthCheckResults []TaskHealthCheckResult `json:"healthCheckResults"` +} + +type TaskHealthCheckResult struct { + Alive bool `json:"alive"` } func ParseTask(event []byte) (*Task, error) { @@ -40,3 +45,29 @@ func (task *Task) KV() *api.KVPair { Value: serialized, } } + +// Include a derived 'healthy' field in the json output to summarize the +// health check results, making it easier to act on in a template +func (task *Task) MarshalJSON() ([]byte, error) { + type Alias Task + return json.Marshal(&struct { + ReportsHealth bool `json:"reportsHealth"` + Healthy bool `json:"healthy"` + *Alias + }{ + ReportsHealth: true, + Healthy: task.IsHealthy(), + Alias: (*Alias)(task), + }) +} + +// return true if any health check says the task is alive +func (task *Task) IsHealthy() bool { + for _, r := range task.HealthCheckResults { + if r.Alive { + return true + } + } + + return false +} diff --git a/tasks/task_test.go b/tasks/task_test.go index fc8a22a..1f6a3bd 100644 --- a/tasks/task_test.go +++ b/tasks/task_test.go @@ -8,14 +8,15 @@ import ( ) var testTask = &Task{ - Timestamp: "2014-03-01T23:29:30.158Z", - SlaveID: "20140909-054127-177048842-5050-1494-0", - ID: "my-app_0-1396592784349", - TaskStatus: "TASK_RUNNING", - AppID: "/my-app", - Host: "slave-1234.acme.org", - Ports: []int{31372}, - Version: "2014-04-04T06:26:23.051Z", + Timestamp: "2014-03-01T23:29:30.158Z", + SlaveID: "20140909-054127-177048842-5050-1494-0", + ID: "my-app_0-1396592784349", + TaskStatus: "TASK_RUNNING", + AppID: "/my-app", + Host: "slave-1234.acme.org", + Ports: []int{31372}, + Version: "2014-04-04T06:26:23.051Z", + HealthCheckResults: []TaskHealthCheckResult{TaskHealthCheckResult{Alive: true}}, } func TestParseTask(t *testing.T) { @@ -35,6 +36,10 @@ func TestParseTask(t *testing.T) { assert.Equal(t, testTask.Host, service.Host) assert.Equal(t, testTask.Ports, service.Ports) assert.Equal(t, testTask.Version, service.Version) + assert.Equal(t, testTask.HealthCheckResults, service.HealthCheckResults) + + // check that derived field 'healthy' is serialized + assert.Contains(t, string(jsonified), "\"healthy\":true") } func TestKV(t *testing.T) { diff --git a/web.go b/web.go index 7a9aace..7c39f65 100644 --- a/web.go +++ b/web.go @@ -9,6 +9,7 @@ import ( "github.com/CiscoCloud/marathon-consul/consul" "github.com/CiscoCloud/marathon-consul/events" + "github.com/CiscoCloud/marathon-consul/health" "github.com/CiscoCloud/marathon-consul/tasks" log "github.com/Sirupsen/logrus" ) @@ -46,6 +47,9 @@ func (fh *ForwardHandler) Handle(w http.ResponseWriter, r *http.Request) { case "status_update_event": log.WithField("eventType", "status_update_event").Info("handling event") err = fh.HandleStatusEvent(body) + case "health_status_changed_event": + log.WithField("eventType", "health_status_changed_event").Info("handling event") + err = fh.HandleHealthStatusEvent(body) default: log.WithField("eventType", eventType).Info("not handling event") w.WriteHeader(200) @@ -90,6 +94,18 @@ func (fh *ForwardHandler) HandleTerminationEvent(body []byte) error { return fh.consul.DeleteApp(event.Apps()[0]) } +func (fh *ForwardHandler) HandleHealthStatusEvent(body []byte) error { + health, err := health.ParseHealth(body) + + if err != nil { + return err + } + + err = fh.consul.UpdateHealth(health) + + return err +} + func (fh *ForwardHandler) HandleStatusEvent(body []byte) error { // for every other use of Tasks, Marathon uses the "id" field for the task ID. // Here, it uses "taskId", with most of the other fields being equal. We'll diff --git a/web_test.go b/web_test.go index 6e86dd8..7dfef23 100644 --- a/web_test.go +++ b/web_test.go @@ -10,6 +10,7 @@ import ( "github.com/CiscoCloud/marathon-consul/apps" "github.com/CiscoCloud/marathon-consul/consul" "github.com/CiscoCloud/marathon-consul/events" + "github.com/CiscoCloud/marathon-consul/health" "github.com/CiscoCloud/marathon-consul/mocks" "github.com/CiscoCloud/marathon-consul/tasks" "github.com/stretchr/testify/assert" @@ -30,6 +31,13 @@ var ( testTaskKV = testTask.KV() testAppKV = testApp.KV() + + testHealth = &health.Health{ + AppID: testTask.AppID, + TaskID: testTask.ID, + Timestamp: testTask.Timestamp, + Alive: true, + } ) func TestHealthHandler(t *testing.T) { @@ -151,3 +159,41 @@ func TestForwardHandlerHandleStatusEvent(t *testing.T) { assert.NotNil(t, err) assert.Equal(t, err.Error(), "unknown task status") } + +func TestForwardHandlerHandleHealthStatusEvent(t *testing.T) { + t.Parallel() + + kv := mocks.NewKVer() + consul := consul.NewConsul(kv, "") + handler := ForwardHandler{consul} + + testEvent := events.HealthStatusChangeEvent{"health_status_changed_event", + testHealth.AppID, testHealth.TaskID, testHealth.Timestamp, testHealth.Alive} + + body, err := json.Marshal(testEvent) + assert.Nil(t, err) + + // first check that we get an error if expected task isn't in the KV + err = handler.HandleHealthStatusEvent(body) + assert.NotNil(t, err) + + // populate task that gets updated by health + err = consul.UpdateTask(testTask) + err = handler.HandleHealthStatusEvent(body) + assert.Nil(t, err) + + // check task is updated with alive=true + result, _, err := kv.Get(testHealth.TaskKey()) + assert.Nil(t, err) + assert.Contains(t, string(result.Value), "\"alive\":true") + + // test again with alive=false + testEvent.Alive = false + body, err = json.Marshal(testEvent) + err = handler.HandleHealthStatusEvent(body) + assert.Nil(t, err) + + result, _, err = kv.Get(testHealth.TaskKey()) + assert.Nil(t, err) + assert.Contains(t, string(result.Value), "\"alive\":false") +}