Skip to content

Commit

Permalink
move process reaping login go from C
Browse files Browse the repository at this point in the history
  • Loading branch information
rasamala83 committed Dec 19, 2024
1 parent 0ad258d commit 0e4c25f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 65 deletions.
101 changes: 55 additions & 46 deletions lib/workerbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"sync"
"syscall"

"github.com/paypal/hera/utility"
"github.com/paypal/hera/utility/logger"
)

Expand Down Expand Up @@ -275,59 +274,69 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
// we can get all the pids in this call. double the size in case we
// get none-hera defunct processes. +1 in case racing casue mapsize=0.
//
var arraySize = 2*len(broker.pidworkermap) + 1
var defunctPids = make([]int32, arraySize)
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap)
}
if arraySize > 0 {
utility.ReapDefunctPids(defunctPids)
}
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "exited worker", defunctPids)
}
broker.lock.Lock()
for i := 0; i < arraySize; i++ {
//
// last valid entry in stoppedpids is followed by one or more zeros.
//
if defunctPids[i] == 0 {
defunctPids := make([]int32, 0)
for {
var status syscall.WaitStatus

//Reap exited children in non-blocking mode
pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil)
if pid > 0 {
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "received worker exit signal for pid:", pid, " status: ", status)
}
defunctPids = append(defunctPids, int32(pid))
} else if pid == 0 {
break
} else {
if errors.Is(err, syscall.ECHILD) {
break
} else {
logger.GetLogger().Log(logger.Verbose, "error in wait signal: ", err)
}
}
var workerclient = broker.pidworkermap[defunctPids[i]]
if workerclient != nil {
delete(broker.pidworkermap, defunctPids[i])
pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
if err != nil {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
}

if len(defunctPids) > 0 {
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "worker exit signal received from pids :", defunctPids)
}
broker.lock.Lock()
for _, pid := range defunctPids {
var workerclient = broker.pidworkermap[pid]
if workerclient != nil {
delete(broker.pidworkermap, pid)
pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
if err != nil {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
}
} else {
//
// a worker could be terminated while serving a request.
// in these cases, doRead() in workerclient will get an
// EOF and exit. doSession() in coordinator will get the
// worker outCh closed event and exit, at which point
// coordinator itself calls returnworker to set connstate
// from assign to idle.
// no need to publish the following event again.
//
//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
// GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
//}
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
}
workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
pool.RestartWorker(workerclient)
}
} else {
//
// a worker could be terminated while serving a request.
// in these cases, doRead() in workerclient will get an
// EOF and exit. doSession() in coordinator will get the
// worker outCh closed event and exit, at which point
// coordinator itself calls returnworker to set connstate
// from assign to idle.
// no need to publish the following event again.
//
//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
// GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
//}
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Exited worker pid =", pid, " not found")
}
workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
pool.RestartWorker(workerclient)
}
} else {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found")
}
}
broker.lock.Unlock()
}
broker.lock.Unlock()
case syscall.SIGTERM:
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "Got SIGTERM")
Expand Down
24 changes: 5 additions & 19 deletions lib/workerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,6 @@ const (
MaxWorkerState = 7
)

var validStateTransitionMap map[HeraWorkerStatus][]HeraWorkerStatus = map[HeraWorkerStatus][]HeraWorkerStatus{
wsUnset: {wsSchd, wsInit},
wsSchd: {wsInit, wsUnset},
wsInit: {wsSchd, wsAcpt, wsUnset},
wsAcpt: {wsBusy},
wsBusy: {wsWait, wsQuce, wsFnsh},
wsWait: {wsQuce, wsFnsh},
wsFnsh: {wsAcpt, wsSchd},
wsQuce: {wsInit, wsFnsh}, //Forceful termination target state "wsInit", Graceful termination "wsFnsh"
}

const bfChannelSize = 30

// workerMsg is used to communicate with the coordinator, it contains the control message metadata plus the actual payload
Expand Down Expand Up @@ -995,19 +984,16 @@ func (worker *WorkerClient) setState(status HeraWorkerStatus) {
if currentStatus == status {
return
}
//This checks whether state transition is valid or not
if Contains(validStateTransitionMap[currentStatus], status) {
worker.stateLock.Lock()
worker.Status = status
worker.stateLock.Unlock()
GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: status})
} else {
if worker.isUnderRecovery == 1 && (status == wsWait || status == wsBusy) {
logger.GetLogger().Log(logger.Warning, "worker : ", worker.ID, "processId: ", worker.pid, " seeing invalid state transition from ", currentStatus, " to ", status)
if logger.GetLogger().V(logger.Debug) {
worker.printCallStack()
}
return
}

//This checks whether state transition is valid or not
worker.Status = status
GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: status})
}

// Channel returns the worker out channel
Expand Down

0 comments on commit 0e4c25f

Please sign in to comment.