Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes for adding properties to the CAL log to track the RAC timestamp value #391

Merged
8 commits merged on Sep 26, 2024
33 changes: 22 additions & 11 deletions lib/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (pool *WorkerPool) spawnWorker(wid int) error {
worker.setState(wsSchd)
millis := rand.Intn(GetConfig().RandomStartMs)
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, wid, "randomized start ms",millis)
logger.GetLogger().Log(logger.Alert, wid, "randomized start ms", millis)
}
time.Sleep(time.Millisecond * time.Duration(millis))

Expand All @@ -131,7 +131,7 @@ func (pool *WorkerPool) spawnWorker(wid int) error {
}
millis := rand.Intn(3000)
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start",wid)
logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start", wid)
}
time.Sleep(time.Millisecond * time.Duration(millis))
}
Expand Down Expand Up @@ -559,10 +559,10 @@ func (pool *WorkerPool) ReturnWorker(worker *WorkerClient, ticket string) (err e
}
if skipRecycle {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:=", pool.desiredSize)
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=", pool.moduleName, "shard_id=", pool.ShardID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:=", pool.desiredSize)
}
calMsg := fmt.Sprintf("Recycle(worker_pid)=%d, module_name=%s,shard_id=%d", worker.pid, worker.moduleName, worker.shardID)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","ReturnWorker", cal.TransOK, calMsg)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER", "ReturnWorker", cal.TransOK, calMsg)
evt.Completed()
}

Expand Down Expand Up @@ -698,7 +698,7 @@ func (pool *WorkerPool) RacMaint(racReq racAct) {
now := time.Now().Unix()
window := GetConfig().RacRestartWindow
dbUname := ""
cnt := 0
var racMaintWorkers [][]interface{}
pool.poolCond.L.Lock()
for i := 0; i < pool.currentSize; i++ {
if (pool.workers[i] != nil) && (racReq.instID == 0 || pool.workers[i].racID == racReq.instID) && (pool.workers[i].startTime < int64(racReq.tm)) {
Expand All @@ -718,7 +718,7 @@ func (pool *WorkerPool) RacMaint(racReq racAct) {
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "Rac maint activating, worker", i, pool.workers[i].pid, "exittime=", pool.workers[i].exitTime, now, window, pool.currentSize)
}
cnt++
racMaintWorkers = append(racMaintWorkers, []interface{}{pool.moduleName, pool.ShardID, i, pool.workers[i].exitTime, pool.workers[i].exitTime - now})
rasamala83 marked this conversation as resolved.
Show resolved Hide resolved
if len(dbUname) == 0 {
dbUname = pool.workers[i].dbUname
}
Expand All @@ -727,8 +727,19 @@ func (pool *WorkerPool) RacMaint(racReq racAct) {
pool.poolCond.L.Unlock()
// TODO: C++ worker logs one event for each worker, in the worker, so
// we keep the same. Think about changing it
for i := 0; i < cnt; i++ {
for i := 0; i < len(racMaintWorkers); i++ {
poolModName, _ := racMaintWorkers[i][0].(string)
shardId, _ := racMaintWorkers[i][1].(int64)
workerId, _ := racMaintWorkers[i][2].(int64)
rasamala83 marked this conversation as resolved.
Show resolved Hide resolved
workerExitTime, _ := racMaintWorkers[i][3].(int64)
exitInSec := racMaintWorkers[i][4].(int64)
evt := cal.NewCalEvent("RAC_ID", fmt.Sprintf("%d", racReq.instID), cal.TransOK, "")
evt.AddDataStr("poolModName", poolModName)
evt.AddDataInt("workerId", workerId)
evt.AddDataInt("shardId", shardId)
evt.AddDataInt("tm", int64(racReq.tm))
evt.AddDataInt("exitTime", workerExitTime)
evt.AddDataStr("exitInSec", fmt.Sprintf("%dsec", exitInSec))
evt.Completed()
evt = cal.NewCalEvent("DB_UNAME", dbUname, cal.TransOK, "")
evt.Completed()
Expand Down Expand Up @@ -768,12 +779,12 @@ func (pool *WorkerPool) checkWorkerLifespan() {
pool.poolCond.L.Lock()
for i := 0; i < pool.currentSize; i++ {
if (pool.workers[i] != nil) && (pool.workers[i].exitTime != 0) && (pool.workers[i].exitTime <= now) {
if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize*GetConfig().MaxDesiredHealthyWorkerPct/100)) { // Should it be a config value
if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize * GetConfig().MaxDesiredHealthyWorkerPct / 100)) { // Should it be a config value
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize)
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=", pool.moduleName, "shard_id=", pool.ShardID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:", pool.desiredSize)
}
calMsg := fmt.Sprintf("module_name=%s,shard_id=%d", pool.moduleName, pool.ShardID)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","checkWorkerLifespan", cal.TransOK, calMsg)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER", "checkWorkerLifespan", cal.TransOK, calMsg)
evt.Completed()
break
}
Expand Down Expand Up @@ -814,7 +825,7 @@ func (pool *WorkerPool) checkWorkerLifespan() {
pool.poolCond.L.Unlock()
for _, w := range workers {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID ,"HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize)
logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:", pool.desiredSize)
}
w.Terminate()
}
Expand Down
Loading