From 0e4c25f7ccb5f5ec84a5a568c49324d82f5680c4 Mon Sep 17 00:00:00 2001 From: Rajesh Samala Date: Thu, 19 Dec 2024 13:59:44 +0530 Subject: [PATCH] move process reaping logic to Go from C --- lib/workerbroker.go | 101 ++++++++++++++++++++++++-------------------- lib/workerclient.go | 24 +++-------- 2 files changed, 60 insertions(+), 65 deletions(-) diff --git a/lib/workerbroker.go b/lib/workerbroker.go index 95cfe0a..8d44b50 100644 --- a/lib/workerbroker.go +++ b/lib/workerbroker.go @@ -24,7 +24,6 @@ import ( "sync" "syscall" - "github.com/paypal/hera/utility" "github.com/paypal/hera/utility/logger" ) @@ -275,59 +274,69 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) { // we can get all the pids in this call. double the size in case we // get none-hera defunct processes. +1 in case racing casue mapsize=0. // - var arraySize = 2*len(broker.pidworkermap) + 1 - var defunctPids = make([]int32, arraySize) - if logger.GetLogger().V(logger.Verbose) { - logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap) - } - if arraySize > 0 { - utility.ReapDefunctPids(defunctPids) - } - if logger.GetLogger().V(logger.Info) { - logger.GetLogger().Log(logger.Info, "exited worker", defunctPids) - } - broker.lock.Lock() - for i := 0; i < arraySize; i++ { - // - // last valid entry in stoppedpids is followed by one or more zeros. 
- // - if defunctPids[i] == 0 { + defunctPids := make([]int32, 0) + for { + var status syscall.WaitStatus + + //Reap exited children in non-blocking mode + pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil) + if pid > 0 { + if logger.GetLogger().V(logger.Verbose) { + logger.GetLogger().Log(logger.Verbose, "received worker exit signal for pid:", pid, " status: ", status) + } + defunctPids = append(defunctPids, int32(pid)) + } else if pid == 0 { break + } else { + if errors.Is(err, syscall.ECHILD) { + break + } else { + logger.GetLogger().Log(logger.Verbose, "error in wait signal: ", err) + } } - var workerclient = broker.pidworkermap[defunctPids[i]] - if workerclient != nil { - delete(broker.pidworkermap, defunctPids[i]) - pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID) - if err != nil { - if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err) + } + + if len(defunctPids) > 0 { + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, "worker exit signal received from pids :", defunctPids) + } + broker.lock.Lock() + for _, pid := range defunctPids { + var workerclient = broker.pidworkermap[pid] + if workerclient != nil { + delete(broker.pidworkermap, pid) + pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID) + if err != nil { + if logger.GetLogger().V(logger.Alert) { + logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err) + } + } else { + // + // a worker could be terminated while serving a request. + // in these cases, doRead() in workerclient will get an + // EOF and exit. doSession() in coordinator will get the + // worker outCh closed event and exit, at which point + // coordinator itself calls returnworker to set connstate + // from assign to idle. + // no need to publish the following event again. 
+ // + //if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) { + // GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle}) + //} + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.") + } + workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long + pool.RestartWorker(workerclient) } } else { - // - // a worker could be terminated while serving a request. - // in these cases, doRead() in workerclient will get an - // EOF and exit. doSession() in coordinator will get the - // worker outCh closed event and exit, at which point - // coordinator itself calls returnworker to set connstate - // from assign to idle. - // no need to publish the following event again. - // - //if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) { - // GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle}) - //} - if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. 
transits from state ", workerclient.Status, " to terminated.") + if logger.GetLogger().V(logger.Alert) { + logger.GetLogger().Log(logger.Alert, "Exited worker pid =", pid, " not found") } - workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long - pool.RestartWorker(workerclient) - } - } else { - if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found") } } + broker.lock.Unlock() } - broker.lock.Unlock() case syscall.SIGTERM: if logger.GetLogger().V(logger.Debug) { logger.GetLogger().Log(logger.Debug, "Got SIGTERM") diff --git a/lib/workerclient.go b/lib/workerclient.go index e1788bb..4572d00 100644 --- a/lib/workerclient.go +++ b/lib/workerclient.go @@ -54,17 +54,6 @@ const ( MaxWorkerState = 7 ) -var validStateTransitionMap map[HeraWorkerStatus][]HeraWorkerStatus = map[HeraWorkerStatus][]HeraWorkerStatus{ - wsUnset: {wsSchd, wsInit}, - wsSchd: {wsInit, wsUnset}, - wsInit: {wsSchd, wsAcpt, wsUnset}, - wsAcpt: {wsBusy}, - wsBusy: {wsWait, wsQuce, wsFnsh}, - wsWait: {wsQuce, wsFnsh}, - wsFnsh: {wsAcpt, wsSchd}, - wsQuce: {wsInit, wsFnsh}, //Forceful termination target state "wsInit", Graceful termination "wsFnsh" -} - const bfChannelSize = 30 // workerMsg is used to communicate with the coordinator, it contains the control message metadata plus the actual payload @@ -995,19 +984,16 @@ func (worker *WorkerClient) setState(status HeraWorkerStatus) { if currentStatus == status { return } - //This checks whether state transition is valid or not - if Contains(validStateTransitionMap[currentStatus], status) { - worker.stateLock.Lock() - worker.Status = status - worker.stateLock.Unlock() - GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: status}) - } else { + if worker.isUnderRecovery == 1 && (status == wsWait || status == 
wsBusy) { logger.GetLogger().Log(logger.Warning, "worker : ", worker.ID, "processId: ", worker.pid, " seeing invalid state transition from ", currentStatus, " to ", status) if logger.GetLogger().V(logger.Debug) { worker.printCallStack() } + return } - + //This checks whether state transition is valid or not + worker.Status = status + GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: status}) } // Channel returns the worker out channel