Skip to content

Commit

Permalink
Go: add LHTaskWorkerHealth (#736)
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin authored Mar 22, 2024
1 parent 3a3a6c2 commit 9e7e2e2
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 56 deletions.
78 changes: 22 additions & 56 deletions sdk-go/taskworker/task_worker_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,76 +137,42 @@ func (c *serverConnection) close() {
// Below is equivalent to LHServerConnectionManager.java
/////////////////////////////////////////////////////////////

// connectionManagerLivenessController holds the failure-tracking state that
// decides whether the connection manager's heartbeat loop keeps running.
type connectionManagerLivenessController struct {
// timeoutInMilliseconds is the window, measured from failureOccurredAt, during
// which keepManagerRunning still returns true.
timeoutInMilliseconds int64
// failureOccurredAt is nil while no failure is pending; notifyCallFailure sets
// it and notifyCallSuccess resets it to nil.
failureOccurredAt *time.Time
// clusterHealthy is the value returned by isClusterHealthy.
clusterHealthy bool
}

type serverConnectionManager struct {
tw *LHTaskWorker
connections []*serverConnection
taskChannel chan *taskExecutionInfo
wg *sync.WaitGroup
running bool
livenessController *connectionManagerLivenessController
tw *LHTaskWorker
connections []*serverConnection
taskChannel chan *taskExecutionInfo
wg *sync.WaitGroup
running bool
clusterHealthy bool
workerHealthy bool
}

func newServerConnectionManager(tw *LHTaskWorker) *serverConnectionManager {
channel := make(chan *taskExecutionInfo, 1)
var wg sync.WaitGroup
livenessController := &connectionManagerLivenessController{
timeoutInMilliseconds: 60000,
failureOccurredAt: nil,
clusterHealthy: true,
}
return &serverConnectionManager{
tw: tw,
connections: make([]*serverConnection, 0),
taskChannel: channel,
wg: &wg,
running: false,
livenessController: livenessController,
}
}

// notifyCallFailure records the time of the first failure since the last
// success. Later failures leave the already-recorded timestamp untouched.
func (controller *connectionManagerLivenessController) notifyCallFailure() {
	if controller.failureOccurredAt != nil {
		// A failure is already pending; keep its original timestamp.
		return
	}
	now := time.Now()
	controller.failureOccurredAt = &now
}

// notifyCallSuccess clears any pending failure by resetting failureOccurredAt
// to nil.
func (controller *connectionManagerLivenessController) notifyCallSuccess() {
controller.failureOccurredAt = nil
}

// wasFailureNotified reports whether a call failure has been recorded and not
// yet cleared, i.e. whether failureOccurredAt is currently set.
func (controller *connectionManagerLivenessController) wasFailureNotified() bool {
	// A non-nil timestamp means notifyCallFailure ran since the last success.
	// The earlier `== nil` comparison answered the opposite question.
	return controller.failureOccurredAt != nil
}

func (controller *connectionManagerLivenessController) keepManagerRunning() bool {
if controller.failureOccurredAt == nil {
return true
tw: tw,
connections: make([]*serverConnection, 0),
taskChannel: channel,
wg: &wg,
running: false,
clusterHealthy: true,
workerHealthy: true,
}

timeoutMillis := controller.timeoutInMilliseconds

upperLimit := controller.failureOccurredAt.Add(time.Duration(timeoutMillis) * time.Millisecond)

return time.Now().Before(upperLimit)
}

func (controller *connectionManagerLivenessController) isClusterHealthy() bool {
return controller.clusterHealthy
// notifyCallFailure marks the worker as unhealthy. The heartbeat loop in start
// calls it when RegisterTaskWorker returns an error.
func (m *serverConnectionManager) notifyCallFailure() {
	m.workerHealthy = false
}

func (controller *connectionManagerLivenessController) clusterHealth(response *model.RegisterTaskWorkerResponse) {
// notifyCallSuccess updates the health flags from a successful
// RegisterTaskWorker reply. The cluster is treated as healthy when the reply
// carries no IsClusterHealthy value; the worker is always marked healthy.
func (m *serverConnectionManager) notifyCallSuccess(response *model.RegisterTaskWorkerResponse) {
	clusterOK := true
	if flag := response.IsClusterHealthy; flag != nil {
		clusterOK = *flag
	}
	m.clusterHealthy = clusterOK
	m.workerHealthy = true
}

func (m *serverConnectionManager) start() {
Expand All @@ -227,7 +193,7 @@ func (m *serverConnectionManager) start() {
}

// This is the rebalance/heartbeat thread
for m.livenessController.keepManagerRunning() && m.running {
for m.running {
reply, err := (*m.tw.grpcStub).RegisterTaskWorker(
context.Background(),
&model.RegisterTaskWorkerRequest{
Expand All @@ -237,11 +203,11 @@ func (m *serverConnectionManager) start() {
},
)
if err != nil {
m.livenessController.notifyCallFailure()
m.notifyCallFailure()
time.Sleep(time.Duration(time.Second * 8))
continue
}
m.livenessController.notifyCallSuccess()
m.notifyCallSuccess(reply)
for _, host := range reply.YourHosts {
if !m.isAlreadyRunning(host) {
newConn, err := newServerConnection(m, host)
Expand Down
49 changes: 49 additions & 0 deletions sdk-go/taskworker/task_worker_public.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,38 @@
package taskworker

import (
"fmt"

"github.com/littlehorse-enterprises/littlehorse/sdk-go/common"
"github.com/littlehorse-enterprises/littlehorse/sdk-go/common/model"
)

// TaskWorkerHealthReason classifies the state carried in LHTaskWorkerHealth.
type TaskWorkerHealthReason int32

// Reasons reported by a task worker, numbered sequentially from zero.
const (
	// Healthy means neither the cluster nor the worker is flagged unhealthy.
	Healthy TaskWorkerHealthReason = iota
	// Unhealthy means the worker itself is flagged unhealthy.
	Unhealthy
	// ServerRebalancing means the cluster is flagged unhealthy.
	ServerRebalancing
)

// String returns the name of a known reason, or the decimal value of e when it
// does not match any declared constant.
func (e TaskWorkerHealthReason) String() string {
	names := [...]string{
		Healthy:           "Healthy",
		Unhealthy:         "Unhealthy",
		ServerRebalancing: "ServerRebalancing",
	}
	if e >= 0 && int(e) < len(names) {
		return names[e]
	}
	return fmt.Sprintf("%d", int(e))
}

// LHTaskWorkerHealth is the result returned by LHTaskWorker.Health.
type LHTaskWorkerHealth struct {
// Healthy is set to true by Health only when Reason is Healthy.
Healthy bool
// Reason says why the worker is, or is not, considered healthy.
Reason TaskWorkerHealthReason
}

type LHTaskWorker struct {
config *common.LHConfig
grpcStub *model.LittleHorseClient
Expand Down Expand Up @@ -51,3 +79,24 @@ func (tw *LHTaskWorker) Start() error {
// Close delegates to the worker's unexported close method and returns whatever
// error it reports.
func (tw *LHTaskWorker) Close() error {
return tw.close()
}

// Health reports the worker's current health as tracked by its connection
// manager. Cluster state is checked first: an unhealthy cluster yields
// ServerRebalancing even if the worker flag is also unhealthy; otherwise an
// unhealthy worker yields Unhealthy; otherwise the result is Healthy.
func (tw *LHTaskWorker) Health() LHTaskWorkerHealth {
	reason := Healthy
	switch {
	case !tw.manager.clusterHealthy:
		reason = ServerRebalancing
	case !tw.manager.workerHealthy:
		reason = Unhealthy
	}

	return LHTaskWorkerHealth{
		Healthy: reason == Healthy,
		Reason:  reason,
	}
}

0 comments on commit 9e7e2e2

Please sign in to comment.