fixing race condition related to worker recovery issue
rasamala83 committed Jan 17, 2025
1 parent 0e4c25f commit 986a1b9
Showing 5 changed files with 174 additions and 44 deletions.
47 changes: 25 additions & 22 deletions lib/adaptivequemgr.go
@@ -21,8 +21,8 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
-	"sync/atomic"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/paypal/hera/cal"
@@ -118,28 +118,31 @@ type BindCount struct {
 	Workers map[string]*WorkerClient // lookup by ticket
 }
 
-func bindEvictNameOk(bindName string) (bool) {
+func bindEvictNameOk(bindName string) bool {
 	commaNames := GetConfig().BindEvictionNames
 	if len(commaNames) == 0 {
 		// for tests, allow all names to be subject to bind eviction
 		return true
 	}
 	commaNames = strings.ToLower(commaNames)
 	bindName = strings.ToLower(bindName)
-	for _, okSubname := range strings.Split(commaNames,",") {
+	for _, okSubname := range strings.Split(commaNames, ",") {
 		if strings.Contains(bindName, okSubname) {
 			return true
 		}
 	}
 	return false
}
 
-/* A bad query with multiple binds will add independent bind throttles to all
-bind name and values */
-func (mgr *adaptiveQueueManager) doBindEviction() (int) {
+/*
+A bad query with multiple binds will add independent bind throttles to all
+bind name and values
+*/
+func (mgr *adaptiveQueueManager) doBindEviction() int {
 	throttleCount := 0
 	GetBindEvict().lock.Lock()
-	for _,keyValues := range GetBindEvict().BindThrottle {
+	for _, keyValues := range GetBindEvict().BindThrottle {
 		throttleCount += len(keyValues)
 	}
 	GetBindEvict().lock.Unlock()
@@ -172,14 +175,14 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 			}
 			continue
 		}
-		contextBinds := parseBinds(request)
-		sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
-		sqlsrcApp := worker.clientApp.Load().(string)
+		contextBinds := parseBinds(request)
+		sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
+		sqlsrcApp := worker.clientApp.Load().(string)
 		if sqlsrcPrefix != "" {
 			contextBinds[SrcPrefixAppKey] = fmt.Sprintf("%s%s", sqlsrcPrefix, sqlsrcApp)
 			if logger.GetLogger().V(logger.Debug) {
-				msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
-				logger.GetLogger().Log(logger.Debug, msg)
+				msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
+				logger.GetLogger().Log(logger.Debug, msg)
 			}
 		}
 		for bindName0, bindValue := range contextBinds {
@@ -200,8 +203,8 @@ }
 			}
 			concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue)
 			if logger.GetLogger().V(logger.Debug) {
-				msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
-				logger.GetLogger().Log(logger.Debug, msg)
+				msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
+				logger.GetLogger().Log(logger.Debug, msg)
 			}
 			entry, ok := bindCounts[concatKey]
 			if !ok {
@@ -210,7 +213,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 					Name:    bindName,
 					Value:   bindValue,
 					Workers: make(map[string]*WorkerClient),
-					}
+				}
 				bindCounts[concatKey] = entry
 			}
 
@@ -227,7 +230,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 		bindName := entry.Name
 		bindValue := entry.Value
 
-		if len(entry.Workers) < int( float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers) ) {
+		if len(entry.Workers) < int(float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers)) {
 			continue
 		}
 		// evict sqlhash, bindvalue
@@ -241,7 +244,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 
 		if mgr.dispatchedWorkers[worker] != ticket ||
 			worker.Status == wsFnsh ||
-			worker.isUnderRecovery == 1 /* Recover() uses compare & swap */ {
+			atomic.LoadInt32(&worker.isUnderRecovery) == 1 /* Recover() uses compare & swap */ {
 
 			continue
 		}
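
This hunk is the core of the race-condition fix on the eviction path: the old code read worker.isUnderRecovery with a plain load while Recover() flips the flag via atomic.CompareAndSwapInt32, which the Go memory model treats as a data race. The sketch below is a minimal, self-contained illustration of the pattern with simplified, hypothetical names; it is not the Hera source:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    type worker struct {
    	isUnderRecovery int32 // 0: no; 1: yes (accessed only via sync/atomic)
    }

    // beginRecovery mirrors what Recover() does: claim the flag atomically
    // so exactly one goroutine performs recovery.
    func (w *worker) beginRecovery() bool {
    	return atomic.CompareAndSwapInt32(&w.isUnderRecovery, 0, 1)
    }

    // underRecovery is the corrected read side: atomic.LoadInt32 pairs with
    // the CompareAndSwap above, where the old plain read raced.
    func (w *worker) underRecovery() bool {
    	return atomic.LoadInt32(&w.isUnderRecovery) == 1
    }

    func main() {
    	w := &worker{}
    	fmt.Println(w.beginRecovery()) // true: we claimed recovery
    	fmt.Println(w.underRecovery()) // true: readers observe it safely
    }

Under the race detector (go run -race), a plain read running concurrently with the CAS can be reported as a data race; the atomic load is not.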
@@ -274,10 +277,10 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 				throttle.incrAllowEveryX()
 			} else {
 				throttle := BindThrottle{
-					Name: bindName,
-					Value: bindValue,
-					Sqlhash: sqlhash,
-					AllowEveryX: 3*len(entry.Workers) + 1,
+					Name:        bindName,
+					Value:       bindValue,
+					Sqlhash:     sqlhash,
+					AllowEveryX: 3*len(entry.Workers) + 1,
 				}
 				now := time.Now()
 				throttle.RecentAttempt.Store(&now)
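
The BindThrottle literal above (only realigned here) also documents the throttle sizing rule: AllowEveryX = 3*len(entry.Workers) + 1. Going by the field's name, that reads as letting roughly one matching request through per AllowEveryX attempts; the actual gating logic lives outside this diff. A quick worked check of the arithmetic:

    package main

    import "fmt"

    func main() {
    	// AllowEveryX = 3*workers + 1, per the struct literal in this hunk.
    	for _, workers := range []int{1, 4, 10} {
    		fmt.Printf("evicting workers=%d -> AllowEveryX=%d\n", workers, 3*workers+1)
    	}
    	// Output:
    	// evicting workers=1 -> AllowEveryX=4
    	// evicting workers=4 -> AllowEveryX=13
    	// evicting workers=10 -> AllowEveryX=31
    }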
@@ -464,7 +467,7 @@ func (mgr *adaptiveQueueManager) getWorkerToRecover() (*WorkerClient, bool) {
 			}
 		}
 	} else {
-		if worker != nil && worker.Status == wsFnsh {
+		if worker != nil && worker.Status == wsFnsh {
 			if logger.GetLogger().V(logger.Warning) {
 				logger.GetLogger().Log(logger.Warning, "worker.pid state is in FNSH, so skipping", worker.pid)
 			}
7 changes: 4 additions & 3 deletions lib/workerbroker.go
@@ -19,12 +19,12 @@ package lib
 
 import (
 	"errors"
-	"github.com/paypal/hera/utility/logger"
 	"os"
 	"os/signal"
 	"sync"
 	"syscall"
-
+	"github.com/paypal/hera/utility/logger"
+	"time"
 )
 
 // HeraWorkerType defines the possible worker type
@@ -291,7 +291,7 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
 				if errors.Is(err, syscall.ECHILD) {
 					break
 				} else {
-					logger.GetLogger().Log(logger.Verbose, "error in wait signal: ", err)
+					logger.GetLogger().Log(logger.Warning, "error in wait signal: ", err)
 				}
 			}
 		}
@@ -327,6 +327,7 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
 						logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
 					}
 					workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
+					time.Sleep(5 * time.Second)
 					pool.RestartWorker(workerclient)
 				}
 			} else {
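
The monitor now pauses five seconds between reaping a terminated worker process and asking its pool to restart it, presumably to let in-flight recovery and state cleanup for the dead pid settle before a replacement spawns; the commit message does not spell out the rationale. A small, self-contained sketch of the restart step with stand-in types (not the Hera broker itself):

    package main

    import (
    	"fmt"
    	"time"
    )

    type workerClient struct{ id, pid int }
    type workerPool struct{}

    func (p *workerPool) RestartWorker(w *workerClient) {
    	fmt.Printf("respawning worker id=%d (old pid=%d)\n", w.id, w.pid)
    }

    func main() {
    	pool := &workerPool{}
    	dead := &workerClient{id: 3, pid: 4242}

    	// After the SIGCHLD handler reaps the child and resets its state,
    	// this commit inserts a grace period before the restart.
    	time.Sleep(5 * time.Second)
    	pool.RestartWorker(dead)
    }

Since the sleep appears to run inside the monitor loop itself, it also delays handling of any other terminated workers queued behind this one, a trade-off worth knowing about.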
22 changes: 12 additions & 10 deletions lib/workerclient.go
@@ -148,7 +148,7 @@ type WorkerClient struct {
 	rqId uint32
 
 	//
-	// under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state.
+	// under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state and use atomic.LoadInt32 to read state
 	//
 	isUnderRecovery int32
 
@@ -204,7 +204,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam
 	}
 	// TODO
 	worker.racID = -1
-	worker.isUnderRecovery = 0
+	atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 1, 0)
 	if worker.ctrlCh != nil {
 		close(worker.ctrlCh)
 	}
@@ -214,6 +214,7 @@
 	//
 	// msg. if adaptiveqmgr blocks on a non-buffered channel, there is a deadlock when return worker
 	//
 	worker.ctrlCh = make(chan *workerMsg, 5)
+
 	return worker
 }

@@ -590,7 +591,7 @@ func (worker *WorkerClient) initiateRecover(param int, p *WorkerPool, prior Hera
 			param = common.StrandedSkipBreakHiLoad
 		}
 	} else {
-		rv = time.After(time.Millisecond * time.Duration(GetConfig().StrandedWorkerTimeoutMs))
+		rv = time.After(time.Millisecond * 100000)
 	}
 	buff := []byte{byte(param), byte((worker.rqId & 0xFF000000) >> 24), byte((worker.rqId & 0x00FF0000) >> 16),
 		byte((worker.rqId & 0x0000FF00) >> 8), byte((worker.rqId & 0x000000FF))}
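
One behavioral note on this hunk: the recovery deadline is now a hardcoded 100,000 ms (about 100 seconds) instead of the configurable GetConfig().StrandedWorkerTimeoutMs; the commit does not say whether that is intentional. The mechanism itself is Go's one-shot timer channel. A minimal, runnable illustration (names are placeholders, not Hera code):

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	ctrlCh := make(chan string) // stand-in for the worker control channel

    	// time.After returns a channel that fires once after the duration;
    	// the recover path selects on it to bound how long it waits.
    	select {
    	case msg := <-ctrlCh:
    		fmt.Println("worker replied:", msg)
    	case <-time.After(100 * time.Millisecond): // the commit uses 100000 ms
    		fmt.Println("timed out waiting on stranded worker")
    	}
    }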
@@ -643,9 +644,6 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 		if logger.GetLogger().V(logger.Debug) {
 			logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.ID, " process Id: ", worker.pid)
 		}
-		//
-		// defer will not be called.
-		//
 		return
 	}
 	defer func() {
@@ -725,7 +723,6 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 		logger.GetLogger().Log(logger.Info, "stranded conn recovered", worker.Type, worker.pid)
 	}
 	worker.callogStranded("RECOVERED", info)
-
 	worker.setState(wsFnsh)
 	if logger.GetLogger().V(logger.Debug) {
 		logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d, worker process: %d recovered as part of message from channel set status to FINSH", worker.ID, worker.pid))
@@ -918,7 +915,7 @@ func (worker *WorkerClient) doRead() {
 				worker.setState(wsWait)
 			}
 			if eor != common.EORMoreIncomingRequests {
-				worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: rqId}
+				worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: uint32(rqId)}
 				payload = nil
 			} else {
 				// buffer data to avoid race condition
@@ -955,8 +952,13 @@ func (worker *WorkerClient) doRead() {
 
 // Write sends a message to the worker
 func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error {
+	if atomic.LoadInt32(&worker.isUnderRecovery) == 1 {
+		if logger.GetLogger().V(logger.Alert) {
+			logger.GetLogger().Log(logger.Alert, "workerclient write error: worker is under recovery.")
+		}
+		return ErrWorkerFail
+	}
 	worker.setState(wsBusy)
-
 	worker.rqId += uint32(nsCount)
 
 	//
@@ -984,7 +986,7 @@ func (worker *WorkerClient) setState(status HeraWorkerStatus) {
 	if currentStatus == status {
 		return
 	}
-	if worker.isUnderRecovery == 1 && (status == wsWait || status == wsBusy) {
+	if atomic.LoadInt32(&worker.isUnderRecovery) == 1 && (status == wsWait || status == wsBusy) {
 		logger.GetLogger().Log(logger.Warning, "worker : ", worker.ID, "processId: ", worker.pid, " seeing invalid state transition from ", currentStatus, " to ", status)
 		if logger.GetLogger().V(logger.Debug) {
 			worker.printCallStack()
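
Taken together, the workerclient.go changes turn isUnderRecovery into a properly synchronized flag: Recover() claims it with compare-and-swap (per the struct comment), Write() and setState() read it with atomic.LoadInt32 and refuse to drive a worker that is mid-recovery, and NewWorker() clears it with another CAS. A self-contained sketch of that lifecycle under those assumptions, with simplified hypothetical names:

    package main

    import (
    	"errors"
    	"fmt"
    	"sync/atomic"
    )

    var errWorkerFail = errors.New("worker under recovery")

    type workerClient struct {
    	isUnderRecovery int32 // 0: no; 1: yes (accessed only atomically)
    }

    // recover claims the flag; only the winning goroutine runs recovery.
    // Clearing the flag via defer is an assumption made for this sketch.
    func (w *workerClient) recover() {
    	if !atomic.CompareAndSwapInt32(&w.isUnderRecovery, 0, 1) {
    		return // another goroutine is already recovering this worker
    	}
    	defer atomic.StoreInt32(&w.isUnderRecovery, 0)
    	// ... drain and terminate the stranded session ...
    }

    // write refuses to touch a worker that is mid-recovery, like the new
    // guard at the top of WorkerClient.Write.
    func (w *workerClient) write(payload []byte) error {
    	if atomic.LoadInt32(&w.isUnderRecovery) == 1 {
    		return errWorkerFail
    	}
    	fmt.Printf("sent %d bytes\n", len(payload))
    	return nil
    }

    func main() {
    	w := &workerClient{}
    	fmt.Println(w.write([]byte("ok"))) // <nil>
    	atomic.CompareAndSwapInt32(&w.isUnderRecovery, 0, 1)
    	fmt.Println(w.write([]byte("no"))) // worker under recovery
    }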
2 changes: 1 addition & 1 deletion lib/workerpool.go
@@ -193,7 +193,7 @@ func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error) {
 	}
 	pool.activeQ.Remove(worker)
 	pool.poolCond.L.Unlock()
-
+	time.Sleep(time.Millisecond * 3000)
 	go pool.spawnWorker(worker.ID)
 	return nil
 }
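
RestartWorker itself now backs off for three seconds after unlinking the worker from the active queue, and only then spawns the replacement asynchronously; stacked on the five-second pause in workerbroker.go above, a crashed worker's slot comes back roughly eight seconds after it is reaped. The split between the blocking backoff and the non-blocking spawn looks like this (a trivial stand-alone demo, not the pool code):

    package main

    import (
    	"fmt"
    	"time"
    )

    func spawnWorker(id int) { fmt.Println("spawned replacement for worker", id) }

    func main() {
    	// The backoff runs on the restarting goroutine; the spawn itself is
    	// handed off with `go`, mirroring RestartWorker after this commit.
    	time.Sleep(3 * time.Second)
    	go spawnWorker(7)
    	time.Sleep(100 * time.Millisecond) // give the goroutine time to print
    }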