From 986a1b9c63383d5ebf9077a840a6a34cf71fc2e2 Mon Sep 17 00:00:00 2001
From: Rajesh Samala
Date: Fri, 17 Jan 2025 11:17:31 +0530
Subject: [PATCH] fixing race condition related to worker recovery issue

---
 lib/adaptivequemgr.go                |  47 ++++-----
 lib/workerbroker.go                  |   7 +-
 lib/workerclient.go                  |  22 +++--
 lib/workerpool.go                    |   2 +-
 tests/unittest/sqlEvict/main_test.go | 140 +++++++++++++++++++++++++--
 5 files changed, 174 insertions(+), 44 deletions(-)

diff --git a/lib/adaptivequemgr.go b/lib/adaptivequemgr.go
index 8e49e2e0..4fdd2860 100644
--- a/lib/adaptivequemgr.go
+++ b/lib/adaptivequemgr.go
@@ -21,8 +21,8 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
-	"sync/atomic"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/paypal/hera/cal"
@@ -118,7 +118,7 @@ type BindCount struct {
 	Workers map[string]*WorkerClient // lookup by ticket
 }
 
-func bindEvictNameOk(bindName string) (bool) {
+func bindEvictNameOk(bindName string) bool {
 	commaNames := GetConfig().BindEvictionNames
 	if len(commaNames) == 0 { // for tests, allow all names to be subject to bind eviction
 		return true
@@ -126,7 +126,7 @@ func bindEvictNameOk(bindName string) (bool) {
 	}
 	commaNames = strings.ToLower(commaNames)
 	bindName = strings.ToLower(bindName)
-	for _, okSubname := range strings.Split(commaNames,",") {
+	for _, okSubname := range strings.Split(commaNames, ",") {
 		if strings.Contains(bindName, okSubname) {
 			return true
 		}
@@ -134,12 +134,15 @@ func bindEvictNameOk(bindName string) (bool) {
 	return false
 }
 
-/* A bad query with multiple binds will add independent bind throttles to all
-bind name and values */
-func (mgr *adaptiveQueueManager) doBindEviction() (int) {
+/*
+	A bad query with multiple binds will add independent bind throttles to all
+
+bind name and values
+*/
+func (mgr *adaptiveQueueManager) doBindEviction() int {
 	throttleCount := 0
 	GetBindEvict().lock.Lock()
-	for _,keyValues := range GetBindEvict().BindThrottle {
+	for _, keyValues := range GetBindEvict().BindThrottle {
 		throttleCount += len(keyValues)
 	}
 	GetBindEvict().lock.Unlock()
@@ -172,14 +175,14 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 			}
 			continue
 		}
-                contextBinds := parseBinds(request)
-                sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
-                sqlsrcApp := worker.clientApp.Load().(string)
+		contextBinds := parseBinds(request)
+		sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
+		sqlsrcApp := worker.clientApp.Load().(string)
 		if sqlsrcPrefix != "" {
 			contextBinds[SrcPrefixAppKey] = fmt.Sprintf("%s%s", sqlsrcPrefix, sqlsrcApp)
 			if logger.GetLogger().V(logger.Debug) {
-                                msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
-                                logger.GetLogger().Log(logger.Debug, msg)
+				msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
+				logger.GetLogger().Log(logger.Debug, msg)
 			}
 		}
 		for bindName0, bindValue := range contextBinds {
@@ -200,8 +203,8 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 			}
 			concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue)
 			if logger.GetLogger().V(logger.Debug) {
-                                msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
-                                logger.GetLogger().Log(logger.Debug, msg)
+				msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
+				logger.GetLogger().Log(logger.Debug, msg)
 			}
 			entry, ok := bindCounts[concatKey]
 			if !ok {
@@ -210,7 +213,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 				entry = &BindCount{
 					Sqlhash: sqlhash,
 					Name:    bindName,
 					Value:   bindValue,
 					Workers: make(map[string]*WorkerClient),
-                                }
+				}
 				bindCounts[concatKey] = entry
 			}
@@ -227,7 +230,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 		bindName := entry.Name
 		bindValue := entry.Value
 
-		if len(entry.Workers) < int( float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers) ) {
+		if len(entry.Workers) < int(float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers)) {
 			continue
 		}
 		// evict sqlhash, bindvalue
@@ -241,7 +244,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 			if mgr.dispatchedWorkers[worker] != ticket ||
 				worker.Status == wsFnsh ||
-				worker.isUnderRecovery == 1 /* Recover() uses compare & swap */ {
+				atomic.LoadInt32(&worker.isUnderRecovery) == 1 /* Recover() uses compare & swap */ {
 
 				continue
 			}
@@ -274,10 +277,10 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 				throttle.incrAllowEveryX()
 			} else {
 				throttle := BindThrottle{
-					Name: bindName,
-					Value: bindValue,
-					Sqlhash: sqlhash,
-					AllowEveryX: 3*len(entry.Workers) + 1,
+					Name:        bindName,
+					Value:       bindValue,
+					Sqlhash:     sqlhash,
+					AllowEveryX: 3*len(entry.Workers) + 1,
 				}
 				now := time.Now()
 				throttle.RecentAttempt.Store(&now)
@@ -464,7 +467,7 @@ func (mgr *adaptiveQueueManager) getWorkerToRecover() (*WorkerClient, bool) {
 			}
 		}
 	} else {
-		if worker != nil && worker.Status == wsFnsh  {
+		if worker != nil && worker.Status == wsFnsh {
 			if logger.GetLogger().V(logger.Warning) {
 				logger.GetLogger().Log(logger.Warning, "worker.pid state is in FNSH, so skipping", worker.pid)
 			}
diff --git a/lib/workerbroker.go b/lib/workerbroker.go
index 8d44b504..0c1035fc 100644
--- a/lib/workerbroker.go
+++ b/lib/workerbroker.go
@@ -19,12 +19,12 @@ package lib
 
 import (
 	"errors"
+	"github.com/paypal/hera/utility/logger"
 	"os"
 	"os/signal"
 	"sync"
 	"syscall"
-
-	"github.com/paypal/hera/utility/logger"
+	"time"
 )
 
 // HeraWorkerType defines the possible worker type
@@ -291,7 +291,7 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
 				if errors.Is(err, syscall.ECHILD) {
 					break
 				} else {
-					logger.GetLogger().Log(logger.Verbose, "error in wait signal: ", err)
+					logger.GetLogger().Log(logger.Warning, "error in wait signal: ", err)
 				}
 			}
 		}
@@ -327,6 +327,7 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
 							logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
 						}
 						workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
+						time.Sleep(5 * time.Second)
 						pool.RestartWorker(workerclient)
 					}
 				} else {
diff --git a/lib/workerclient.go b/lib/workerclient.go
index 4572d000..fff56c49 100644
--- a/lib/workerclient.go
+++ b/lib/workerclient.go
@@ -148,7 +148,7 @@ type WorkerClient struct {
 	rqId uint32
 
 	//
-	// under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state.
+	// under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state and use atomic.LoadInt32 to read state
 	//
 	isUnderRecovery int32
 
@@ -204,7 +204,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam
 	}
 	// TODO
 	worker.racID = -1
-	worker.isUnderRecovery = 0
+	atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 1, 0)
 	if worker.ctrlCh != nil {
 		close(worker.ctrlCh)
 	}
@@ -214,6 +214,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam
 	// msg. if adaptiveqmgr blocks on a non-buffered channel, there is a deadlock when return worker
 	//
 	worker.ctrlCh = make(chan *workerMsg, 5)
+
 	return worker
 }
 
@@ -590,7 +591,7 @@ func (worker *WorkerClient) initiateRecover(param int, p *WorkerPool, prior Hera
 			param = common.StrandedSkipBreakHiLoad
 		}
 	} else {
-		rv = time.After(time.Millisecond * time.Duration(GetConfig().StrandedWorkerTimeoutMs))
+		rv = time.After(time.Millisecond * 100000)
 	}
 
 	buff := []byte{byte(param), byte((worker.rqId & 0xFF000000) >> 24), byte((worker.rqId & 0x00FF0000) >> 16), byte((worker.rqId & 0x0000FF00) >> 8), byte((worker.rqId & 0x000000FF))}
@@ -643,9 +644,6 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 		if logger.GetLogger().V(logger.Debug) {
 			logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.ID, " process Id: ", worker.pid)
 		}
-		//
-		// defer will not be called.
-		//
 		return
 	}
 	defer func() {
@@ -725,7 +723,6 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 				logger.GetLogger().Log(logger.Info, "stranded conn recovered", worker.Type, worker.pid)
 			}
 			worker.callogStranded("RECOVERED", info)
-			worker.setState(wsFnsh)
 			if logger.GetLogger().V(logger.Debug) {
 				logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d, worker process: %d recovered as part of message from channel set status to FINSH", worker.ID, worker.pid))
 			}
@@ -918,7 +915,7 @@ func (worker *WorkerClient) doRead() {
 				worker.setState(wsWait)
 			}
 			if eor != common.EORMoreIncomingRequests {
-				worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: rqId}
+				worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: uint32(rqId)}
 				payload = nil
 			} else {
 				// buffer data to avoid race condition
@@ -955,8 +952,13 @@ func (worker *WorkerClient) doRead() {
 
 // Write sends a message to the worker
 func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error {
+	if atomic.LoadInt32(&worker.isUnderRecovery) == 1 {
+		if logger.GetLogger().V(logger.Alert) {
+			logger.GetLogger().Log(logger.Alert, "workerclient write error: worker is under recovery.")
+		}
+		return ErrWorkerFail
+	}
 	worker.setState(wsBusy)
-
 	worker.rqId += uint32(nsCount)
 
 	//
@@ -984,7 +986,7 @@ func (worker *WorkerClient) setState(status HeraWorkerStatus) {
 	if currentStatus == status {
 		return
 	}
-	if worker.isUnderRecovery == 1 && (status == wsWait || status == wsBusy) {
+	if atomic.LoadInt32(&worker.isUnderRecovery) == 1 && (status == wsWait || status == wsBusy) {
 		logger.GetLogger().Log(logger.Warning, "worker : ", worker.ID, "processId: ", worker.pid, " seeing invalid state transition from ", currentStatus, " to ", status)
 		if logger.GetLogger().V(logger.Debug) {
 			worker.printCallStack()
diff --git a/lib/workerpool.go b/lib/workerpool.go
index 50aab16f..d64bae50 100644
--- a/lib/workerpool.go
+++ b/lib/workerpool.go
@@ -193,7 +193,7 @@ func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error) {
 	}
 	pool.activeQ.Remove(worker)
 	pool.poolCond.L.Unlock()
-
+	time.Sleep(time.Millisecond * 3000)
 	go pool.spawnWorker(worker.ID)
 	return nil
 }
diff --git a/tests/unittest/sqlEvict/main_test.go b/tests/unittest/sqlEvict/main_test.go
index 5cf84296..5a704796 100644
--- a/tests/unittest/sqlEvict/main_test.go
+++ b/tests/unittest/sqlEvict/main_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
+	"math/rand"
 	"os"
 	"testing"
 	"time"
@@ -33,11 +34,11 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) {
 	appcfg["bind_eviction_threshold_pct"] = "50"
 	appcfg["request_backlog_timeout"] = "1000"
-	appcfg["soft_eviction_probability"] = "100"
+	appcfg["soft_eviction_probability"] = "10"
 
 	opscfg := make(map[string]string)
-	max_conn = 25
-	opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", int(max_conn))
+	max_conn = 50
+	opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", 10)
 	opscfg["opscfg.default.server.log_level"] = "5"
 
 	opscfg["opscfg.default.server.saturation_recover_threshold"] = "10"
@@ -99,6 +100,56 @@ func sleepyQ(conn *sql.Conn, delayRow int) error {
 	return nil
 }
 
+func sleepyDmlQ(conn *sql.Conn, delayRow int) error {
+	inserQuery := "insert into sleep_info (id,seconds) values (:id, sleep_option(:seconds))"
+	updateQuery := "update sleep_info set seconds = sleep_option(:seconds) where id=:id"
+	defer func(conn *sql.Conn) {
+		err := conn.Close()
+		if err != nil {
+			fmt.Printf("Error closing conn %s\n", err.Error())
+		}
+	}(conn)
+	tx, _ := conn.BeginTx(context.Background(), nil)
+	inst1, err := conn.PrepareContext(context.Background(), inserQuery)
+	if err != nil {
+		fmt.Printf("Error preparing sleepyDmlQ %s\n", err.Error())
+		return err
+	}
+	defer func(inst1 *sql.Stmt) {
+		err := inst1.Close()
+		if err != nil {
+			fmt.Printf("Error closing insert statement sleepyDmlQ %s\n", err.Error())
+		}
+	}(inst1)
+	_, err = inst1.ExecContext(context.Background(), sql.Named("id", rand.Int()), sql.Named("seconds", delayRow))
+	if err != nil {
+		fmt.Printf("Error query sleepyDmlQ %s\n", err.Error())
+		return err
+	}
+	updateStmt, err := conn.PrepareContext(context.Background(), updateQuery)
+	if err != nil {
+		fmt.Printf("Error preparing sleepyDmlQ %s\n", err.Error())
+		return err
+	}
+	defer func(updateStmt *sql.Stmt) {
+		err := updateStmt.Close()
+		if err != nil {
+			fmt.Printf("Error closing update statement sleepyDmlQ %s\n", err.Error())
+		}
+	}(updateStmt)
+	_, err = updateStmt.ExecContext(context.Background(), sql.Named("id", rand.Int()), sql.Named("seconds", delayRow))
+	if err != nil {
+		fmt.Printf("Error query sleepyDmlQ %s\n", err.Error())
+		return err
+	}
+	err = tx.Commit()
+	if err != nil {
+		fmt.Printf("Error committing sleepyDmlQ %s\n", err.Error())
+		return err
+	}
+	return nil
+}
+
 func simpleEvict() {
 	db, err := sql.Open("hera", "127.0.0.1:31002")
 	if err != nil {
@@ -157,10 +208,7 @@ func TestSqlEvict(t *testing.T) {
 	simpleEvict()
 	if testutil.RegexCountFile("HERA-100: backlog timeout", "hera.log") == 0 {
 		t.Fatal("backlog timeout was not triggered")
-	} // */
-	/* if (testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-102: backlog eviction", "hera.log") == 0) {
-		t.Fatal("backlog eviction was not triggered")
-	} // */
+	}
 	if testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-104: saturation soft sql eviction", "hera.log") == 0 {
 		t.Fatal("soft eviction was not triggered")
 	}
@@ -168,5 +216,81 @@ func TestSqlEvict(t *testing.T) {
 		t.Fatal("eviction was not triggered")
 	}
 	logger.GetLogger().Log(logger.Debug, "TestSqlEvict stop +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
-	time.Sleep(2 * time.Second)
+	time.Sleep(10 * time.Second)
 } // */
+
+func TestSqlEvictDML(t *testing.T) {
+	logger.GetLogger().Log(logger.Debug, "TestSqlEvictDML begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
+	dmlEvict()
+	if testutil.RegexCountFile("HERA-100: backlog timeout", "hera.log") == 0 {
+		t.Fatal("backlog timeout was not triggered")
+	}
+	if testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-104: saturation soft sql eviction", "hera.log") == 0 {
+		t.Fatal("soft eviction was not triggered")
+	}
+	if testutil.RegexCountFile("coordinator dispatchrequest: stranded conn HERA-101: saturation kill", "hera.log") == 0 {
+		t.Fatal("eviction was not triggered")
+	}
+	logger.GetLogger().Log(logger.Debug, "TestSqlEvictDML stop +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
+	time.Sleep(10 * time.Second)
+}
+
+func dmlEvict() {
+	db, err := sql.Open("hera", "127.0.0.1:31002")
+	if err != nil {
+		fmt.Printf("Error db %s\n", err.Error())
+		return
+	}
+	db.SetConnMaxLifetime(2 * time.Second)
+	db.SetMaxIdleConns(0)
+	db.SetMaxOpenConns(22111)
+	defer func(db *sql.DB) {
+		err := db.Close()
+		if err != nil {
+			fmt.Printf("Error closing db %s\n", err.Error())
+		}
+	}(db)
+
+	conn, err := db.Conn(context.Background())
+	if err != nil {
+		fmt.Printf("Error conn %s\n", err.Error())
+		return
+	}
+	err = sleepyDmlQ(conn, 1600)
+	if err != nil {
+		fmt.Printf("Error Executing first sleepyDmlQ %s\n", err.Error())
+		return
+	}
+
+	for i := 0; i < int(max_conn)+1; i++ {
+		conn, err := db.Conn(context.Background())
+		if err != nil {
+			fmt.Printf("Error #%d conn %s\n", i, err.Error())
+			continue
+		}
+		time.Sleep(time.Millisecond * 100)
+		fmt.Printf("connection count %d\n", i)
+		go func(index int) {
+			err := sleepyDmlQ(conn, 1600)
+			if err != nil {
+				fmt.Printf("Long query Request Id: %d Error executing the sleepyDmlQ %s\n", index, err.Error())
+			}
+		}(i)
+	}
+
+	for i := 0; i < 50; i++ {
+		conn, err := db.Conn(context.Background())
+		if err != nil {
+			fmt.Printf("Error #%d conn %s\n", i, err.Error())
+			continue
+		}
+		time.Sleep(time.Millisecond * 100)
+		fmt.Printf("connection count %d\n", i)
+		go func(index int) {
+			err := sleepyDmlQ(conn, 1600)
+			if err != nil {
+				fmt.Printf("Request id: %d Error executing the sleepyDmlQ %s\n", index, err.Error())
+			}
+		}(i)
+	}
+}
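The access discipline this patch settles on for WorkerClient.isUnderRecovery is the core of the race fix: Recover() claims the flag with atomic.CompareAndSwapInt32, and every reader touched here (Write, setState, doBindEviction) now goes through atomic.LoadInt32 instead of a plain read, so no dispatch-side goroutine can race the recovery goroutine on that field. A minimal, self-contained sketch of that pattern follows; the names (worker, recover, write) are simplified stand-ins for WorkerClient, Recover and Write, not code from this repository.

// Illustrative sketch only, assuming simplified names; the real counterparts
// live in lib/workerclient.go.
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errUnderRecovery = errors.New("worker is under recovery")

type worker struct {
	isUnderRecovery int32 // 0: no; 1: yes — touched only through sync/atomic
}

// recover models Recover(): CompareAndSwap lets exactly one goroutine own the
// recovery; any concurrent caller bails out immediately.
func (w *worker) recover() {
	if !atomic.CompareAndSwapInt32(&w.isUnderRecovery, 0, 1) {
		return // recovery already in progress
	}
	defer atomic.StoreInt32(&w.isUnderRecovery, 0)
	// ... drain the connection and reset worker state ...
}

// write models Write()/setState(): readers use LoadInt32 rather than a plain
// read, so they never race the CompareAndSwap above.
func (w *worker) write() error {
	if atomic.LoadInt32(&w.isUnderRecovery) == 1 {
		return errUnderRecovery
	}
	// ... send the request to the worker process ...
	return nil
}

func main() {
	w := &worker{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); w.recover() }()
		go func() { defer wg.Done(); fmt.Println(w.write()) }()
	}
	wg.Wait()
}

With a plain read of isUnderRecovery in write(), `go run -race` (or `go test -race` against the real code paths) reports a data race against the CompareAndSwap; with LoadInt32 it is clean, which is the behavior the patch relies on.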