From e2f360164b772772387d0841cccb50159b478068 Mon Sep 17 00:00:00 2001 From: Rajesh S <105205300+rasamala83@users.noreply.github.com> Date: Thu, 16 May 2024 10:17:34 +0530 Subject: [PATCH 1/5] adding changes to disable automatic bind throttle (#392) * adding changes to disable automatic bind throttle * updating values bind throttle decrese per sec and removed unused code * updating bind eviction test * fixing review comment * fixing review comment * adding test for if rate limit table not exist or empty * move tests to different package to avoid running them in parallel * updating sleep time in tests * added changes for increase throttling recovery speed * changes for updating text check condition in test code * reverting changes for bind throttle * reverted partial changes for local copy of bindEvict object it is going taken care separate change request * changes for simplifying test code for qbb --------- Co-authored-by: Rajesh S --- lib/bindevict.go | 16 +- lib/config.go | 26 +- lib/querybindblocker.go | 66 ++--- tests/unittest/bindThrottle/main_test.go | 242 ++++++++++++++++++ tests/unittest/querybindblocker/main_test.go | 188 +++++++++----- .../main_test.go | 68 +++++ 6 files changed, 491 insertions(+), 115 deletions(-) create mode 100644 tests/unittest/bindThrottle/main_test.go create mode 100644 tests/unittest/querybindblocker_ratelimit_table_empty/main_test.go diff --git a/lib/bindevict.go b/lib/bindevict.go index a31585b6..67d5bb22 100644 --- a/lib/bindevict.go +++ b/lib/bindevict.go @@ -42,21 +42,21 @@ type BindEvict struct { // evicted binds get throttled to have overall steady state during bad bind queries // nested map uses sqlhash "bindName|bindValue" BindThrottle map[uint32]map[string]*BindThrottle - lock sync.Mutex + lock sync.Mutex } func GetBindEvict() *BindEvict { cfg := gBindEvict.Load() if cfg == nil { - out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)} + out := BindEvict{BindThrottle: 
make(map[uint32]map[string]*BindThrottle)} gBindEvict.Store(&out) return &out } return cfg.(*BindEvict) } func (this *BindEvict) Copy() *BindEvict { - out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)} - for k,v := range this.BindThrottle { + out := BindEvict{BindThrottle: make(map[uint32]map[string]*BindThrottle)} + for k, v := range this.BindThrottle { out.BindThrottle[k] = v } return &out @@ -77,7 +77,7 @@ func NormalizeBindName(bindName0 string) string { func (entry *BindThrottle) decrAllowEveryX(y int) { if y >= 2 && logger.GetLogger().V(logger.Warning) { - info := fmt.Sprintf("hash:%d bindName:%s val:%s allowEveryX:%d-%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX, y) + info := fmt.Sprintf("hash:%d bindName:%s val:%s allowEveryX:%d-%d", entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX, y) logger.GetLogger().Log(logger.Warning, "bind throttle decr", info) } entry.AllowEveryX -= y @@ -96,7 +96,7 @@ func (entry *BindThrottle) decrAllowEveryX(y int) { // copy everything except bindKV (skipping it is deleting it) bindKV := fmt.Sprintf("%s|%s", entry.Name, entry.Value) updateCopy := make(map[string]*BindThrottle) - for k,v := range GetBindEvict().BindThrottle[entry.Sqlhash] { + for k, v := range GetBindEvict().BindThrottle[entry.Sqlhash] { if k == bindKV { continue } @@ -107,7 +107,7 @@ func (entry *BindThrottle) decrAllowEveryX(y int) { } func (entry *BindThrottle) incrAllowEveryX() { if logger.GetLogger().V(logger.Warning) { - info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX) + info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d", entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX) logger.GetLogger().Log(logger.Warning, "bind throttle incr", info) } entry.AllowEveryX = 3*entry.AllowEveryX + 1 @@ -149,7 +149,7 @@ func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavy entry.RecentAttempt.Store(&now) 
entry.AllowEveryXCount++ if entry.AllowEveryXCount < entry.AllowEveryX { - return true/*block*/, entry + return true /*block*/, entry } entry.AllowEveryXCount = 0 diff --git a/lib/config.go b/lib/config.go index f0283cfa..4b8bdc66 100644 --- a/lib/config.go +++ b/lib/config.go @@ -80,11 +80,11 @@ type Config struct { // time_skew_threshold_error(15) TimeSkewThresholdErrorSec int // max_stranded_time_interval(2000) - StrandedWorkerTimeoutMs int + StrandedWorkerTimeoutMs int HighLoadStrandedWorkerTimeoutMs int - HighLoadSkipInitiateRecoverPct int - HighLoadPct int - InitLimitPct int + HighLoadSkipInitiateRecoverPct int + HighLoadPct int + InitLimitPct int // the worker scheduler policy LifoScheduler bool @@ -110,7 +110,7 @@ type Config struct { HostnamePrefix map[string]string ShardingCrossKeysErr bool - CfgFromTns bool + CfgFromTns bool CfgFromTnsOverrideNumShards int // -1 no-override CfgFromTnsOverrideTaf int // -1 no-override, 0 override-false, 1 override-true CfgFromTnsOverrideRWSplit int // -1 no-override, readChildPct @@ -156,8 +156,8 @@ type Config struct { // when numWorkers changes, it will write to this channel, for worker manager to update numWorkersCh chan int - EnableConnLimitCheck bool - EnableQueryBindBlocker bool + EnableConnLimitCheck bool + EnableQueryBindBlocker bool QueryBindBlockerMinSqlPrefix int // taf testing @@ -169,7 +169,7 @@ type Config struct { EnableDanglingWorkerRecovery bool GoStatsInterval int - RandomStartMs int + RandomStartMs int // The max number of database connections to be established per second MaxDbConnectsPerSec int @@ -274,10 +274,9 @@ func InitConfig() error { gAppConfig.StrandedWorkerTimeoutMs = cdb.GetOrDefaultInt("max_stranded_time_interval", 2000) gAppConfig.HighLoadStrandedWorkerTimeoutMs = cdb.GetOrDefaultInt("high_load_max_stranded_time_interval", 600111) gAppConfig.HighLoadSkipInitiateRecoverPct = cdb.GetOrDefaultInt("high_load_skip_initiate_recover_pct", 80) - gAppConfig.HighLoadPct = 
cdb.GetOrDefaultInt("high_load_pct", 130) // >100 disabled + gAppConfig.HighLoadPct = cdb.GetOrDefaultInt("high_load_pct", 130) // >100 disabled gAppConfig.InitLimitPct = cdb.GetOrDefaultInt("init_limit_pct", 125) // >100 disabled - gAppConfig.StateLogInterval = cdb.GetOrDefaultInt("state_log_interval", 1) if gAppConfig.StateLogInterval <= 0 { gAppConfig.StateLogInterval = 1 @@ -300,7 +299,7 @@ func InitConfig() error { gAppConfig.ChildExecutable = "postgresworker" } } else { - // db type is not supported + // db type is not supported return errors.New("database type must be either Oracle or MySQL") } @@ -425,9 +424,8 @@ func InitConfig() error { fmt.Sscanf(cdb.GetOrDefaultString("bind_eviction_decr_per_sec", "10.0"), "%f", &gAppConfig.BindEvictionDecrPerSec) - gAppConfig.SkipEvictRegex= cdb.GetOrDefaultString("skip_eviction_host_prefix","") - gAppConfig.EvictRegex= cdb.GetOrDefaultString("eviction_host_prefix", "") - + gAppConfig.SkipEvictRegex = cdb.GetOrDefaultString("skip_eviction_host_prefix", "") + gAppConfig.EvictRegex = cdb.GetOrDefaultString("eviction_host_prefix", "") gAppConfig.BouncerEnabled = cdb.GetOrDefaultBool("bouncer_enabled", true) gAppConfig.BouncerStartupDelay = cdb.GetOrDefaultInt("bouncer_startup_delay", 10) diff --git a/lib/querybindblocker.go b/lib/querybindblocker.go index 001e9749..8ad2ad0f 100644 --- a/lib/querybindblocker.go +++ b/lib/querybindblocker.go @@ -31,14 +31,13 @@ import ( "github.com/paypal/hera/utility/logger" ) - type QueryBindBlockerEntry struct { - Herasqlhash uint32 - Herasqltext string // prefix since some sql is too long - Bindvarname string // prefix for in clause + Herasqlhash uint32 + Herasqltext string // prefix since some sql is too long + Bindvarname string // prefix for in clause Bindvarvalue string // when set to "BLOCKALLVALUES" should block all sqltext queries - Blockperc int - Heramodule string + Blockperc int + Heramodule string } type QueryBindBlockerCfg struct { @@ -48,7 +47,10 @@ type QueryBindBlockerCfg 
struct { // check by sqltext prefix (delay to end) } -func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (bool,string) { +var lastLoggingTime time.Time +var defaultQBBTableMissingErrorLoggingInterval = 2 * time.Hour + +func (cfg *QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (bool, string) { sqlhash := uint32(utility.GetSQLHash(sqltext)) if logger.GetLogger().V(logger.Verbose) { logger.GetLogger().Log(logger.Verbose, fmt.Sprintf("query bind blocker sqlhash and text %d %s", sqlhash, sqltext)) @@ -70,7 +72,7 @@ func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) ( byBindValue, ok := byBindName[bindPairs[i]] if !ok { // strip numeric suffix to try to match - withoutNumSuffix := regexp.MustCompile("[_0-9]*$").ReplaceAllString(bindPairs[i],"") + withoutNumSuffix := regexp.MustCompile("[_0-9]*$").ReplaceAllString(bindPairs[i], "") byBindValue, ok = byBindName[withoutNumSuffix] if !ok { continue @@ -118,28 +120,27 @@ func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) ( var g_module string var gQueryBindBlockerCfg atomic.Value -func GetQueryBindBlockerCfg() (*QueryBindBlockerCfg) { - cfg := gQueryBindBlockerCfg.Load() - if cfg == nil { - return nil - } - return cfg.(*QueryBindBlockerCfg) +func GetQueryBindBlockerCfg() *QueryBindBlockerCfg { + cfg := gQueryBindBlockerCfg.Load() + if cfg == nil { + return nil + } + return cfg.(*QueryBindBlockerCfg) } - func InitQueryBindBlocker(modName string) { g_module = modName - db, err := sql.Open("heraloop", fmt.Sprintf("0:0:0")) - if err != nil { + db, err := sql.Open("heraloop", fmt.Sprintf("0:0:0")) + if err != nil { logger.GetLogger().Log(logger.Alert, "Loading query bind blocker - conn err ", err) - return - } - db.SetMaxIdleConns(0) - + return + } + db.SetMaxIdleConns(0) go func() { - time.Sleep(4*time.Second) + time.Sleep(4 * time.Second) logger.GetLogger().Log(logger.Info, "Loading query bind blocker - initial") + 
loadBlockQueryBind(db) c := time.Tick(11 * time.Second) for now := range c { @@ -152,11 +153,12 @@ func InitQueryBindBlocker(modName string) { func loadBlockQueryBind(db *sql.DB) { ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) defer cancel() - conn, err := db.Conn(ctx); + conn, err := db.Conn(ctx) if err != nil { logger.GetLogger().Log(logger.Alert, "Error (conn) loading query bind blocker:", err) return } + defer conn.Close() q := fmt.Sprintf("SELECT /*queryBindBlocker*/ %ssqlhash, %ssqltext, bindvarname, bindvarvalue, blockperc, %smodule FROM %s_rate_limiter where %smodule='%s'", GetConfig().StateLogPrefix, GetConfig().StateLogPrefix, GetConfig().StateLogPrefix, GetConfig().ManagementTablePrefix, GetConfig().StateLogPrefix, g_module) logger.GetLogger().Log(logger.Info, "Loading query bind blocker meta-sql "+q) @@ -167,12 +169,18 @@ func loadBlockQueryBind(db *sql.DB) { } rows, err := stmt.QueryContext(ctx) if err != nil { - logger.GetLogger().Log(logger.Alert, "Error (query) loading query bind blocker:", err) - return + if lastLoggingTime.IsZero() || time.Since(lastLoggingTime) > defaultQBBTableMissingErrorLoggingInterval { + //In case table missing log alert event for every 2 hour + logger.GetLogger().Log(logger.Alert, "Error (query) loading query bind blocker:", err) + lastLoggingTime = time.Now() + return + } else { + return + } } defer rows.Close() - cfgLoad := QueryBindBlockerCfg{BySqlHash:make(map[uint32]map[string]map[string][]QueryBindBlockerEntry)} + cfgLoad := QueryBindBlockerCfg{BySqlHash: make(map[uint32]map[string]map[string][]QueryBindBlockerEntry)} rowCount := 0 for rows.Next() { @@ -182,9 +190,9 @@ func loadBlockQueryBind(db *sql.DB) { logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker:", err) continue } - + if len(entry.Herasqltext) < GetConfig().QueryBindBlockerMinSqlPrefix { - logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker - sqltext must be ", 
GetConfig().QueryBindBlockerMinSqlPrefix," bytes or more - sqlhash:", entry.Herasqlhash) + logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker - sqltext must be ", GetConfig().QueryBindBlockerMinSqlPrefix, " bytes or more - sqlhash:", entry.Herasqlhash) continue } rowCount++ @@ -200,7 +208,7 @@ func loadBlockQueryBind(db *sql.DB) { } bindVal, ok := bindName[entry.Bindvarvalue] if !ok { - bindVal = make([]QueryBindBlockerEntry,0) + bindVal = make([]QueryBindBlockerEntry, 0) bindName[entry.Bindvarvalue] = bindVal } bindName[entry.Bindvarvalue] = append(bindName[entry.Bindvarvalue], entry) diff --git a/tests/unittest/bindThrottle/main_test.go b/tests/unittest/bindThrottle/main_test.go new file mode 100644 index 00000000..7cdede41 --- /dev/null +++ b/tests/unittest/bindThrottle/main_test.go @@ -0,0 +1,242 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "os" + "testing" + "time" + + //"github.com/paypal/hera/client/gosqldriver" + _ "github.com/paypal/hera/client/gosqldriver/tcp" /*to register the driver*/ + + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var mx testutil.Mux +var tableName string +var max_conn float64 + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31002" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["sharding_cfg_reload_interval"] = "0" + appcfg["rac_sql_interval"] = "0" + appcfg["child.executable"] = "mysqlworker" + appcfg["bind_eviction_names"] = "p" + appcfg["bind_eviction_threshold_pct"] = "50" + + appcfg["request_backlog_timeout"] = "1000" + appcfg["soft_eviction_probability"] = "100" + + opscfg := make(map[string]string) + max_conn = 25 + opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", int(max_conn)) + opscfg["opscfg.default.server.log_level"] 
= "5" + + opscfg["opscfg.default.server.saturation_recover_threshold"] = "10" + //opscfg["opscfg.default.server.saturation_recover_throttle_rate"]= "100" + opscfg["opscfg.hera.server.saturation_recover_throttle_rate"] = "100" + // saturation_recover_throttle_rate + + return appcfg, opscfg, testutil.MySQLWorker +} + +func before() error { + fmt.Printf("before run mysql") + testutil.RunMysql("create table sleep_info (id bigint, seconds float);") + testutil.RunMysql("insert into sleep_info (id,seconds) values(10, 0.01);") + testutil.RunMysql("insert into sleep_info (id,seconds) values(100, 0.1);") + testutil.RunMysql("insert into sleep_info (id,seconds) values(1600, 2.6);") + testutil.RunMysql("insert into sleep_info (id,seconds) values(21001111, 0.1);") + testutil.RunMysql("insert into sleep_info (id,seconds) values(22001111, 0.1);") + testutil.RunMysql("insert into sleep_info (id,seconds) values(29001111, 3.9);") + out, err := testutil.RunMysql(`DELIMITER $$ +CREATE FUNCTION sleep_option (id bigint) +RETURNS float +DETERMINISTIC +BEGIN + declare dur float; + declare rv bigint; + select max(seconds) into dur from sleep_info where sleep_info.id=id; + select sleep(dur) into rv; + RETURN dur; +END$$ +DELIMITER ;`) + if err != nil { + fmt.Printf("err after run mysql " + err.Error()) + return nil + } + fmt.Printf("after run mysql " + out) // */ + return nil +} + +func TestMain(m *testing.M) { + logger.GetLogger().Log(logger.Debug, "begin 20230918kkang TestMain") + fmt.Printf("TestMain 20230918kkang\n") + os.Exit(testutil.UtilMain(m, cfg, before)) +} + +func sleepyQ(conn *sql.Conn, delayRow int) error { + stmt, err := conn.PrepareContext(context.Background(), fmt.Sprintf("select * from sleep_info where ( seconds > sleep_option(?) 
or seconds > 0.0 ) and id=%d", delayRow)) + if err != nil { + fmt.Printf("Error preparing sleepyQ %s\n", err.Error()) + return err + } + defer stmt.Close() + rows, err := stmt.Query(delayRow) + if err != nil { + fmt.Printf("Error query sleepyQ %s\n", err.Error()) + return err + } + defer rows.Close() + return nil +} + +var normCliErr error + +func NormCliErr() error { + if normCliErr == nil { + normCliErr = fmt.Errorf("normal client got error") + } + return normCliErr +} + +func partialBadLoad(fracBad float64) error { + db, err := sql.Open("hera", "127.0.0.1:31002") + if err != nil { + fmt.Printf("Error db %s\n", err.Error()) + return err + } + db.SetConnMaxLifetime(111 * time.Second) + db.SetMaxIdleConns(0) + db.SetMaxOpenConns(22111) + defer db.Close() + + // client threads of slow queries + var stop2 int + var stop3 int + var badCliErr string + var cliErr string + numBad := int(max_conn * fracBad) + numNorm := int(max_conn*2.1) + 1 - numBad + fmt.Printf("spawning clients bad%d norm%d\n", numBad, numNorm) + mkClients(numBad, &stop2, 29001111, "badClient", &badCliErr, db) + mkClients(numNorm, &stop3, 100, "normClient", &cliErr, db) // bind value is short, so bindevict won't trigger + time.Sleep(3000 * time.Millisecond) + + // start normal clients after initial backlog timeouts + var stop int + var normCliErrStr string + mkClients(1, &stop, 21001111, "n client", &normCliErrStr, db) + time.Sleep(1000 * time.Millisecond) + + // if we throttle down or stop, it restores + stop2 = 1 // stop bad clients + stop3 = 1 + time.Sleep(3 * time.Second) //Make sure that clear throttle + conn, err := db.Conn(context.Background()) + if err != nil { + fmt.Printf("Error conn %s\n", err.Error()) + return err + } + defer conn.Close() + err = sleepyQ(conn, 29001111) + if err != nil { + msg := fmt.Sprintf("test failed, throttle down didn't restore") + fmt.Printf("%s", msg) + return fmt.Errorf("%s", msg) + } + + stop = 1 + // tolerate soft eviction on normal client when we did not use 
bind eviction + if len(normCliErrStr) != 0 { + return NormCliErr() + } // */ + return nil +} + +func mkClients(num int, stop *int, bindV int, grpName string, outErr *string, db *sql.DB) { + for i := 0; i < num; i++ { + go func(clientId int) { + count := 0 + var conn *sql.Conn + var err error + var curErr string + for *stop == 0 { + nowStr := time.Now().Format("15:04:05.000000 ") + if conn == nil { + conn, err = db.Conn(context.Background()) + fmt.Printf("%s connected %d\n", grpName, clientId) + if err != nil { + fmt.Printf("%s %s Error %d conn %s\n", nowStr, grpName, clientId, err.Error()) + time.Sleep(7 * time.Millisecond) + continue + } + } + + fmt.Printf("%s %s %d loop%d %s\n", nowStr, grpName, clientId, count, time.Now().Format("20060102j150405.000000")) + err := sleepyQ(conn, bindV) + if err != nil { + if err.Error() == curErr { + fmt.Printf("%s %s %d same err twice\n", nowStr, grpName, clientId) + conn.Close() + conn = nil + } else { + curErr = err.Error() + *outErr = curErr + fmt.Printf("%s %s %d err %s\n", nowStr, grpName, clientId, curErr) + } + } + count++ + time.Sleep(10 * time.Millisecond) + } + fmt.Printf("%s %s %d END loop%d\n", time.Now().Format("15:04:05.000000 "), grpName, clientId, count) + }(i) + } +} + +func TestBindThrottle(t *testing.T) { + // we would like to clear hera.log, but even if we try, lots of messages still go there + logger.GetLogger().Log(logger.Debug, "BindThrottle +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + err := partialBadLoad(0.10) + if err != nil && err != NormCliErr() { + t.Fatalf("main step function returned err %s", err.Error()) + } + if testutil.RegexCountFile("BIND_THROTTLE", "cal.log") > 0 { + t.Fatalf("BIND_THROTTLE should not trigger") + } + if testutil.RegexCountFile("BIND_EVICT", "cal.log") > 0 { + t.Fatalf("BIND_EVICT should not trigger") + } + if testutil.RegexCountFile("HERA-10", "hera.log") == 0 { + t.Fatal("backlog timeout or saturation was not triggered") + } // */ + + 
logger.GetLogger().Log(logger.Debug, "BindThrottle midpt +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + err = partialBadLoad(0.8) + if err != nil { + // t.Fatalf("main step function returned err %s", err.Error()) // can be triggered since test only has one sql + } + if testutil.RegexCountFile("BIND_THROTTLE", "cal.log") < 0 { + t.Fatalf("BIND_THROTTLE should trigger") + } + if testutil.RegexCountFile("BIND_EVICT", "cal.log") == 0 { + t.Fatalf("BIND_EVICT should trigger") + } + + if testutil.RegexCountFile(".*BIND_EVICT\t1354401077\t1.*", "cal.log") < 1 { + t.Fatalf("BIND_EVICT should trigger for SQL HASH 1354401077") + } + + if testutil.RegexCountFile(".*BIND_THROTTLE\t1354401077\t1.*", "cal.log") < 1 { + t.Fatalf("BIND_THROTTLE should trigger for SQL HASH 1354401077") + } + logger.GetLogger().Log(logger.Debug, "BindThrottle done +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") +} // */ diff --git a/tests/unittest/querybindblocker/main_test.go b/tests/unittest/querybindblocker/main_test.go index a4defd1b..595a7eab 100644 --- a/tests/unittest/querybindblocker/main_test.go +++ b/tests/unittest/querybindblocker/main_test.go @@ -15,7 +15,7 @@ import ( var mx testutil.Mux func cfg() (map[string]string, map[string]string, testutil.WorkerType) { - fmt.Println ("setup() begin") + fmt.Println("setup() begin") appcfg := make(map[string]string) // best to chose an "unique" port in case golang runs tests in paralel appcfg["bind_port"] = "31002" @@ -29,7 +29,7 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { opscfg["opscfg.default.server.log_level"] = "5" if os.Getenv("WORKER") == "postgres" { return appcfg, opscfg, testutil.PostgresWorker - } + } return appcfg, opscfg, testutil.MySQLWorker } @@ -59,31 +59,45 @@ func TestQueryBindBlocker(t *testing.T) { ctx := context.Background() // cleanup and insert one row in the table - conn, err := db.Conn(ctx); + conn, err := db.Conn(ctx) if err != nil { t.Fatalf("Error 
getting connection %s\n", err.Error()) } if true { - tx0,err := conn.BeginTx(ctx, nil) - if err != nil { t.Fatalf("tx0 %s", err.Error()) } - stmtD,err := tx0.PrepareContext(ctx, "delete from hera_rate_limiter") - if err != nil { t.Fatalf("stmtD %s", err.Error()) } + tx0, err := conn.BeginTx(ctx, nil) + if err != nil { + t.Fatalf("tx0 %s", err.Error()) + } + stmtD, err := tx0.PrepareContext(ctx, "delete from hera_rate_limiter") + if err != nil { + t.Fatalf("stmtD %s", err.Error()) + } _, err = stmtD.Exec() - if err != nil { t.Fatalf("stmtD exec %s", err.Error()) } + if err != nil { + t.Fatalf("stmtD exec %s", err.Error()) + } err = tx0.Commit() - if err != nil { t.Fatalf("commit0 %s", err.Error()) } + if err != nil { + t.Fatalf("commit0 %s", err.Error()) + } - tx,err := conn.BeginTx(ctx, nil) - if err != nil { t.Fatalf("tx %s", err.Error()) } - stmt,err := tx.PrepareContext(ctx, "/*setup qbb t*/delete from qbb_test") - if err != nil { t.Fatalf("prep %s", err.Error()) } + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + t.Fatalf("tx %s", err.Error()) + } + stmt, err := tx.PrepareContext(ctx, "/*setup qbb t*/delete from qbb_test") + if err != nil { + t.Fatalf("prep %s", err.Error()) + } _, err = stmt.Exec() if err != nil { t.Fatalf("Error preparing test (delete table) %s\n", err.Error()) } - stmt,err = tx.PrepareContext(ctx, "/*setup qbb t*/insert into qbb_test(id, note) VALUES(?, ?)") - if err != nil { t.Fatalf("prep ins %s", err.Error()) } + stmt, err = tx.PrepareContext(ctx, "/*setup qbb t*/insert into qbb_test(id, note) VALUES(?, ?)") + if err != nil { + t.Fatalf("prep ins %s", err.Error()) + } _, err = stmt.Exec(11, "eleven") if err != nil { t.Fatalf("Error preparing test (create row in table) %s\n", err.Error()) @@ -95,13 +109,15 @@ func TestQueryBindBlocker(t *testing.T) { } if true { - tx,err := conn.BeginTx(ctx, nil) - if err != nil { t.Fatalf("tx findQ %s", err.Error()) } - stmt,err := tx.PrepareContext(ctx, "/*qbb_test.find*/select id, note from 
qbb_test where id=? for update") + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + t.Fatalf("tx findQ %s", err.Error()) + } + stmt, err := tx.PrepareContext(ctx, "/*qbb_test.find*/select id, note from qbb_test where id=? for update") if err != nil { t.Fatalf("Error prep sel %s\n", err.Error()) } - rows,err := stmt.Query(11) + rows, err := stmt.Query(11) if err != nil { t.Fatalf("Error query sel %s\n", err.Error()) } @@ -109,20 +125,30 @@ func TestQueryBindBlocker(t *testing.T) { t.Fatalf("Expected 1 row") } err = tx.Rollback() - if err != nil { t.Fatalf("rollback error %s", err.Error()) } + if err != nil { + t.Fatalf("rollback error %s", err.Error()) + } } // above baseline checks fmt.Printf("DONE DONE baseline check\n") if true { - tx0,err := conn.BeginTx(ctx, nil) - if err != nil { t.Fatalf("tx0 %s", err.Error()) } - stmtD,err := tx0.PrepareContext(ctx, "delete from hera_rate_limiter") - if err != nil { t.Fatalf("prep stmtD %s", err.Error()) } - _,err = stmtD.Exec() - if err != nil { t.Fatalf("stmtD %s", err.Error()) } - stmt,err := tx0.PrepareContext(ctx, "insert into hera_rate_limiter (herasqlhash, herasqltext, bindvarname, bindvarvalue, blockperc, heramodule, end_time, remarks) values ( ?, ?, ?, ?, ?, ?, ?, ?)") - if err != nil { t.Fatalf("ins prep %s", err.Error()) } + tx0, err := conn.BeginTx(ctx, nil) + if err != nil { + t.Fatalf("tx0 %s", err.Error()) + } + stmtD, err := tx0.PrepareContext(ctx, "delete from hera_rate_limiter") + if err != nil { + t.Fatalf("prep stmtD %s", err.Error()) + } + _, err = stmtD.Exec() + if err != nil { + t.Fatalf("stmtD %s", err.Error()) + } + stmt, err := tx0.PrepareContext(ctx, "insert into hera_rate_limiter (herasqlhash, herasqltext, bindvarname, bindvarvalue, blockperc, heramodule, end_time, remarks) values ( ?, ?, ?, ?, ?, ?, ?, ?)") + if err != nil { + t.Fatalf("ins prep %s", err.Error()) + } _, err = stmt.Exec(51938198, "/*qbb_test.find*/selec", "p1", @@ -131,74 +157,106 @@ func TestQueryBindBlocker(t *testing.T) { 
"hera-test", 2000111222, "block100") - if err != nil { t.Fatalf("ins exec %s", err.Error()) } + if err != nil { + t.Fatalf("ins exec %s", err.Error()) + } err = tx0.Commit() - if err != nil { t.Fatalf("commit tx0 %s", err.Error()) } + if err != nil { + t.Fatalf("commit tx0 %s", err.Error()) + } fmt.Printf("wait wait: loading basic block\n") time.Sleep(12 * time.Second) - tx,err := conn.BeginTx(ctx, nil) - if err != nil { t.Fatalf("tx %s", err.Error()) } - stmt,err = tx.PrepareContext(ctx, "/*qbb_test.find*/select id, note from qbb_test where id=? for update") + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + t.Fatalf("tx %s", err.Error()) + } + stmt, err = tx.PrepareContext(ctx, "/*qbb_test.find*/select id, note from qbb_test where id=? for update") if err != nil { t.Fatalf("Error prep sel %s\n", err.Error()) } - _,err = stmt.Query(11) + _, err = stmt.Query(11) if err == nil { t.Fatalf("Error query should have been blocked") } tx.Rollback() // can have error because connection could be closed - conn, err = db.Conn(ctx); - if err != nil { t.Fatalf("conn %s", err.Error()) } + conn, err = db.Conn(ctx) + if err != nil { + t.Fatalf("conn %s", err.Error()) + } } if true { - tx0,err := conn.BeginTx(ctx, nil) - if err != nil { t.Fatalf("tx0 %s", err.Error()) } - stmtD,err := tx0.PrepareContext(ctx, "delete from hera_rate_limiter") - if err != nil { t.Fatalf("prep err %s", err.Error()) } + tx0, err := conn.BeginTx(ctx, nil) + if err != nil { + t.Fatalf("tx0 %s", err.Error()) + } + stmtD, err := tx0.PrepareContext(ctx, "delete from hera_rate_limiter") + if err != nil { + t.Fatalf("prep err %s", err.Error()) + } _, err = stmtD.Exec() - if err != nil { t.Fatalf("stmtD %s", err.Error()) } - stmt,err := tx0.PrepareContext(ctx, "insert into hera_rate_limiter (herasqlhash, herasqltext, bindvarname, bindvarvalue, blockperc, heramodule, end_time, remarks) values ( ?, ?, ?, ?, ?, ?, 2000111222, ?)") - if err != nil { t.Fatalf("prep ins %s", err.Error()) } + if err != nil { + 
t.Fatalf("stmtD %s", err.Error()) + } + stmt, err := tx0.PrepareContext(ctx, "insert into hera_rate_limiter (herasqlhash, herasqltext, bindvarname, bindvarvalue, blockperc, heramodule, end_time, remarks) values ( ?, ?, ?, ?, ?, ?, 2000111222, ?)") + if err != nil { + t.Fatalf("prep ins %s", err.Error()) + } _, err = stmt.Exec(51938197, "/*qbb_test.find*/select id, note from qbb_test where id=:p1 for upd", "p1", "11", 100, "hera-test", "WrongHash") - if err != nil { t.Fatalf("exec1 %s", err.Error()) } + if err != nil { + t.Fatalf("exec1 %s", err.Error()) + } _, err = stmt.Exec(51938198, "/*bb_test.find*/select id, note from qbb_test where id=:p1 for upd", "p1", "11", 100, "hera-test", "WrongSqlText") - if err != nil { t.Fatalf("exec2 %s", err.Error()) } + if err != nil { + t.Fatalf("exec2 %s", err.Error()) + } _, err = stmt.Exec(51938198, "/*bb_test.find*/select id, note from qbb_test where id=:p1 for upd", "notId", "11", 100, "hera-test", "WrongBindName") - if err != nil { t.Fatalf("exec3 %s", err.Error()) } + if err != nil { + t.Fatalf("exec3 %s", err.Error()) + } _, err = stmt.Exec(51938198, "/*bb_test.find*/select id, note from qbb_test where id=:p1 for upd", "p1", "333", 100, "hera-test", "WrongBindVal") - if err != nil { t.Fatalf("exec4 %s", err.Error()) } + if err != nil { + t.Fatalf("exec4 %s", err.Error()) + } _, err = stmt.Exec(51938198, "/*bb_test.find*/select id, note from qbb_test where id=:p1 for upd", "p1", "11", 100, "nothera-test", "WrongBindModule") - if err != nil { t.Fatalf("exec5 %s", err.Error()) } + if err != nil { + t.Fatalf("exec5 %s", err.Error()) + } err = tx0.Commit() - if err != nil { t.Fatalf("tx0 commit %s", err.Error()) } + if err != nil { + t.Fatalf("tx0 commit %s", err.Error()) + } fmt.Printf("wait wait: loading close to block, but ultimately not\n") time.Sleep(12 * time.Second) - tx,err := conn.BeginTx(ctx, nil) - if err != nil { t.Fatalf("begin tx %s", err.Error()) } - stmt,err = tx.PrepareContext(ctx, "/*qbb_test.find*/select id, 
note from qbb_test where id=? for update") + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + t.Fatalf("begin tx %s", err.Error()) + } + stmt, err = tx.PrepareContext(ctx, "/*qbb_test.find*/select id, note from qbb_test where id=? for update") if err != nil { t.Fatalf("Error prep sel %s\n", err.Error()) } - _,err = stmt.Query(11) + _, err = stmt.Query(11) if err != nil { - t.Fatalf("Error query might have been erroneously blocked %s",err.Error()) + t.Fatalf("Error query might have been erroneously blocked %s", err.Error()) } err = tx.Rollback() - if err != nil { t.Fatalf("rollback %s", err.Error()) } + if err != nil { + t.Fatalf("rollback %s", err.Error()) + } } if true { - tx0,_ := conn.BeginTx(ctx, nil) - stmtD,_ := tx0.PrepareContext(ctx, "delete from hera_rate_limiter") + tx0, _ := conn.BeginTx(ctx, nil) + stmtD, _ := tx0.PrepareContext(ctx, "delete from hera_rate_limiter") stmtD.Exec() - stmt,_ := tx0.PrepareContext(ctx, "insert into hera_rate_limiter (herasqlhash, herasqltext, bindvarname, bindvarvalue, blockperc, heramodule, end_time, remarks) values ( ?, ?, ?, ?, ?, ?, ?, ?)") + stmt, _ := tx0.PrepareContext(ctx, "insert into hera_rate_limiter (herasqlhash, herasqltext, bindvarname, bindvarvalue, blockperc, heramodule, end_time, remarks) values ( ?, ?, ?, ?, ?, ?, ?, ?)") stmt.Exec(51938198, "/*qbb_test.find*/selec", "p1", @@ -213,16 +271,18 @@ func TestQueryBindBlocker(t *testing.T) { time.Sleep(12 * time.Second) countBlock := 0 - for i:=0; i<100; i++ { - conn, err = db.Conn(ctx); - if err != nil { t.Fatalf("conn %s", err.Error()) } + for i := 0; i < 100; i++ { + conn, err = db.Conn(ctx) + if err != nil { + t.Fatalf("conn %s", err.Error()) + } - tx,_ := conn.BeginTx(ctx, nil) - stmt,err := tx.PrepareContext(ctx, "/*qbb_test.find*/select id, note from qbb_test where id=? for update") + tx, _ := conn.BeginTx(ctx, nil) + stmt, err := tx.PrepareContext(ctx, "/*qbb_test.find*/select id, note from qbb_test where id=? 
for update") if err != nil { t.Fatalf("Error prep sel %s\n", err.Error()) } - _,err = stmt.Query(11) + _, err = stmt.Query(11) if err != nil { countBlock++ } diff --git a/tests/unittest/querybindblocker_ratelimit_table_empty/main_test.go b/tests/unittest/querybindblocker_ratelimit_table_empty/main_test.go new file mode 100644 index 00000000..c07118f2 --- /dev/null +++ b/tests/unittest/querybindblocker_ratelimit_table_empty/main_test.go @@ -0,0 +1,68 @@ +package main + +import ( + "database/sql" + "fmt" + "os" + "testing" + "time" + + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var mx testutil.Mux + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + fmt.Println("setup() begin") + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31002" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["rac_sql_interval"] = "0" + appcfg["enable_query_bind_blocker"] = "true" + + opscfg := make(map[string]string) + opscfg["opscfg.default.server.max_connections"] = "3" + opscfg["opscfg.default.server.log_level"] = "5" + if os.Getenv("WORKER") == "postgres" { + return appcfg, opscfg, testutil.PostgresWorker + } + return appcfg, opscfg, testutil.MySQLWorker +} + +func teardown() { + mx.StopServer() +} + +func TestMain(m *testing.M) { + os.Exit(testutil.UtilMain(m, cfg, nil)) +} + +func TestQueryBindBlockerTableNotExistOrEmpty(t *testing.T) { + testutil.RunDML("DROP TABLE IF EXISTS hera_rate_limiter") + + logger.GetLogger().Log(logger.Debug, "TestQueryBindBlockerTableNotExistOrEmpty begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + + shard := 0 + db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard)) + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer db.Close() + + time.Sleep(6 * time.Second) + if testutil.RegexCountFile("loading 
query bind blocker: SQL error: Error 1146", "hera.log") == 0 { + t.Fatalf("expected to see table 'hera_rate_limiter' doesn't exist error") + } + + testutil.RunDML("create table hera_rate_limiter (herasqlhash numeric not null, herasqltext varchar(4000) not null, bindvarname varchar(200) not null, bindvarvalue varchar(200) not null, blockperc numeric not null, heramodule varchar(100) not null, end_time numeric not null, remarks varchar(200) not null)") + time.Sleep(15 * time.Second) + if testutil.RegexCountFile("Loaded 0 sqlhashes, 0 entries, query bind blocker entries", "hera.log") == 0 { + t.Fatalf("expected to 0 entries from hera_rate_limiter table") + } + logger.GetLogger().Log(logger.Debug, "TestQueryBindBlockerTableNotExistOrEmpty ends +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") +} From 26a4881cecf7fd40a60112ad0074ec52c7c5f46f Mon Sep 17 00:00:00 2001 From: satyakamala03 <128077872+satyakamala03@users.noreply.github.com> Date: Mon, 24 Jun 2024 10:19:14 +0530 Subject: [PATCH 2/5] Occ config logging (#394) * occ configurations logging * cal event success * adding cal data * added TODOs * Remove log_occ_confogs.go * Remove testing files * source of configs - files * whitelist format change * code clean up * code review changes-1 * CR fixes * CR fixes * Delete tests/unittest/config_logging/main_test.go * clean up * Merge branch 'occ-config-logging' of /Users/simmidisetty/Documents/GitHub/OpenSourceHera/src/github.com/paypal/hera with conflicts. 
* test for config logging * removing test changes * tests for all cases * test * making minor changes for logging feature specific data * changes for incorporate review comments --------- Co-authored-by: simmidisetty Co-authored-by: Rajesh S --- lib/config.go | 157 ++++++++++++++++++++- lib/main.go | 13 ++ tests/unittest/config_logging/main_test.go | 141 ++++++++++++++++++ 3 files changed, 308 insertions(+), 3 deletions(-) create mode 100644 tests/unittest/config_logging/main_test.go diff --git a/lib/config.go b/lib/config.go index 4b8bdc66..3e8afe59 100644 --- a/lib/config.go +++ b/lib/config.go @@ -20,13 +20,13 @@ package lib import ( "errors" "fmt" + "github.com/paypal/hera/cal" + "github.com/paypal/hera/config" + "github.com/paypal/hera/utility/logger" "os" "path/filepath" "strings" "sync/atomic" - - "github.com/paypal/hera/config" - "github.com/paypal/hera/utility/logger" ) //The Config contains all the static configuration @@ -73,6 +73,9 @@ type Config struct { // config_reload_time_ms(30 * 1000) // ConfigReloadTimeMs int + // + // + ConfigLoggingReloadTimeHours int // custom_auth_timeout(1000) CustomAuthTimeoutMs int // time_skew_threshold_warn(2) @@ -268,6 +271,7 @@ func InitConfig() error { } gAppConfig.ConfigReloadTimeMs = cdb.GetOrDefaultInt("config_reload_time_ms", 30*1000) + gAppConfig.ConfigLoggingReloadTimeHours = cdb.GetOrDefaultInt("config_logging_reload_time_hours", 24) gAppConfig.CustomAuthTimeoutMs = cdb.GetOrDefaultInt("custom_auth_timeout", 1000) gAppConfig.TimeSkewThresholdWarnSec = cdb.GetOrDefaultInt("time_skew_threshold_warn", 2) gAppConfig.TimeSkewThresholdErrorSec = cdb.GetOrDefaultInt("time_skew_threshold_error", 15) @@ -464,6 +468,153 @@ func InitConfig() error { return nil } +func LogOccConfigs() { + whiteListConfigs := map[string]map[string]interface{}{ + "BACKLOG": { + "backlog_pct": gAppConfig.BacklogPct, + "request_backlog_timeout": gAppConfig.BacklogTimeoutMsec, + "short_backlog_timeout": gAppConfig.ShortBacklogTimeoutMsec, + }, 
+ "BOUNCER": { + "bouncer_enabled": gAppConfig.BouncerEnabled, + "bouncer_startup_delay": gAppConfig.BouncerStartupDelay, + "bouncer_poll_interval_ms": gAppConfig.BouncerPollInterval, + }, + "PROFILE": { + "enable_profile": gAppConfig.EnableProfile, + "profile_http_port": gAppConfig.ProfileHTTPPort, + "profile_telnet_port": gAppConfig.ProfileTelnetPort, + }, + "SHARDING": { + "enable_sharding": gAppConfig.EnableSharding, + "use_shardmap": gAppConfig.UseShardMap, + "num_shards": gAppConfig.NumOfShards, + "shard_key_name": gAppConfig.ShardKeyName, + "max_scuttle": gAppConfig.MaxScuttleBuckets, + "scuttle_col_name": gAppConfig.ScuttleColName, + "shard_key_value_type_is_string": gAppConfig.ShardKeyValueTypeIsString, + "enable_whitelist_test": gAppConfig.EnableWhitelistTest, + "whitelist_children": gAppConfig.NumWhitelistChildren, + "sharding_postfix": gAppConfig.ShardingPostfix, + "sharding_cfg_reload_interval": gAppConfig.ShardingCfgReloadInterval, + "hostname_prefix": gAppConfig.HostnamePrefix, + "sharding_cross_keys_err": gAppConfig.ShardingCrossKeysErr, + //"enable_sql_rewrite", // not found anywhere? 
+ "sharding_algo": gAppConfig.ShardingAlgoHash, + "cfg_from_tns_override_num_shards": gAppConfig.CfgFromTnsOverrideNumShards, + }, + "TAF": { + "enable_taf": gAppConfig.EnableTAF, + "cfg_from_tns_override_taf": gAppConfig.CfgFromTnsOverrideTaf, + "testing_enable_dml_taf": gAppConfig.TestingEnableDMLTaf, + "taf_timeout_ms": gAppConfig.TAFTimeoutMs, + "taf_bin_duration": gAppConfig.TAFBinDuration, + "taf_allow_slow_every_x": gAppConfig.TAFAllowSlowEveryX, + "taf_normally_slow_count": gAppConfig.TAFNormallySlowCount, + }, + "BIND-EVICTION": { + "child.executable": gAppConfig.ChildExecutable, + //"enable_bind_hash_logging" FOUND FOR SOME OCCs ONLY IN occ.def + "bind_eviction_threshold_pct": gAppConfig.BindEvictionThresholdPct, + "bind_eviction_decr_per_sec": gAppConfig.BindEvictionDecrPerSec, + "bind_eviction_target_conn_pct": gAppConfig.BindEvictionTargetConnPct, + "bind_eviction_max_throttle": gAppConfig.BindEvictionMaxThrottle, + "bind_eviction_names": gAppConfig.BindEvictionNames, + "skip_eviction_host_prefix": gAppConfig.SkipEvictRegex, + "eviction_host_prefix": gAppConfig.EvictRegex, + "query_bind_blocker_min_sql_prefix": gAppConfig.QueryBindBlockerMinSqlPrefix, + "enable_connlimit_check": gAppConfig.EnableConnLimitCheck, + }, + "MANUAL-RATE-LIMITER": { + "enable_query_bind_blocker": gAppConfig.EnableQueryBindBlocker, + }, + "SATURATION-RECOVERY": { + "saturation_recover_threshold": GetSatRecoverThresholdMs(), + "saturation_recover_throttle_rate": GetSatRecoverThrottleRate(), + }, + "SOFT-EVICTION": { + "soft_eviction_effective_time": gAppConfig.SoftEvictionEffectiveTimeMs, + "soft_eviction_probability": gAppConfig.SoftEvictionProbability, + }, + "WORKER-CONFIGURATIONS": { + "lifespan_check_interval": gAppConfig.lifeSpanCheckInterval, + "lifo_scheduler_enabled": gAppConfig.LifoScheduler, + //"num_workers_per_proxy", // only present in occ.def for some occs + //"max_clients_per_worker", // only present in occ.def for some occs + "max_stranded_time_interval": 
gAppConfig.StrandedWorkerTimeoutMs, + "high_load_max_stranded_time_interval": gAppConfig.HighLoadStrandedWorkerTimeoutMs, + "high_load_skip_initiate_recover_pct": gAppConfig.HighLoadSkipInitiateRecoverPct, + "enable_danglingworker_recovery": gAppConfig.EnableDanglingWorkerRecovery, + "max_db_connects_per_sec": gAppConfig.MaxDbConnectsPerSec, + "max_lifespan_per_child": GetMaxLifespanPerChild(), + "max_requests_per_child": GetMaxRequestsPerChild(), + "max_desire_healthy_worker_pct": gAppConfig.MaxDesiredHealthyWorkerPct, + }, + "R-W-SPLIT": { + "readonly_children_pct": gAppConfig.ReadonlyPct, + "cfg_from_tns_override_rw_split": gAppConfig.CfgFromTnsOverrideRWSplit, + }, + "RAC": { + "management_table_prefix": gAppConfig.ManagementTablePrefix, + "rac_sql_interval": gAppConfig.RacMaintReloadInterval, + "rac_restart_window": gAppConfig.RacRestartWindow, + }, + "NO-CATEGORY": { + "database_type": gAppConfig.DatabaseType, // Oracle = 0; MySQL=1; POSTGRES=2 + "cfg_from_tns": gAppConfig.CfgFromTns, + "log_level": gOpsConfig.logLevel, + "high_load_pct": gAppConfig.HighLoadPct, + "init_limit_pct": gAppConfig.InitLimitPct, + "num_standby_dbs": gAppConfig.NumStdbyDbs, + }, + } + + for feature, configs := range whiteListConfigs { + switch feature { + case "BACKLOG": + if gAppConfig.BacklogPct == 0 { + continue + } + case "BOUNCER": + if !gAppConfig.BouncerEnabled { + continue + } + case "PROFILE": + if !gAppConfig.EnableProfile { + continue + } + case "SHARDING": + if !gAppConfig.EnableSharding { + continue + } + case "TAF": + if !gAppConfig.EnableTAF { + continue + } + case "R-W-SPLIT": + if gAppConfig.ReadonlyPct == 0 { + continue + } + case "SOFT-EVICTION", "BIND-EVICTION": + if GetSatRecoverThrottleRate() <= 0 { + continue + } + case "MANUAL-RATE-LIMITER": + if !gAppConfig.EnableQueryBindBlocker { + continue + } + } + + evt := cal.NewCalEvent("OCC_CONFIG", fmt.Sprintf(feature), cal.TransOK, "") + for cfg, val := range configs { + s := fmt.Sprintf("%v", val) + 
evt.AddDataStr(cfg, s) + } + evt.Completed() + } + +} + // CheckOpsConfigChange checks if the ops config file needs to be reloaded and reloads it if necessary. // it is called every several seconds from a dedicated go-routine. func CheckOpsConfigChange() { diff --git a/lib/main.go b/lib/main.go index 7a6227fd..30b70f94 100644 --- a/lib/main.go +++ b/lib/main.go @@ -116,6 +116,19 @@ func Run() { } }() + //This logs the configured parameter with the feature name in the CAL log periodically based on ConfigLoggingReloadTimeHours. + LogOccConfigs() + configLoggingTicker := time.NewTicker(time.Duration(GetConfig().ConfigLoggingReloadTimeHours) * time.Hour) + defer configLoggingTicker.Stop() + go func() { + for { + select { + case <-configLoggingTicker.C: + LogOccConfigs() + } + } + }() + CheckEnableProfiling() GoStats() diff --git a/tests/unittest/config_logging/main_test.go b/tests/unittest/config_logging/main_test.go new file mode 100644 index 00000000..f9125b47 --- /dev/null +++ b/tests/unittest/config_logging/main_test.go @@ -0,0 +1,141 @@ +package main + +import ( + "context" + "database/sql" + + "fmt" + "os" + "strings" + "testing" + "time" + + //"github.com/paypal/hera/client/gosqldriver" + _ "github.com/paypal/hera/client/gosqldriver/tcp" + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var mx testutil.Mux +var tableName string + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31003" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["enable_sharding"] = "true" + appcfg["num_shards"] = "3" + appcfg["bouncer_enabled"] = "true" + appcfg["sharding_algo"] = "mod" + appcfg["shard_key_name"] = "id" + appcfg["config_logging_reload_time_hours"] = "0.0002" + pfx := os.Getenv("MGMT_TABLE_PREFIX") + if pfx != "" { + 
appcfg["management_table_prefix"] = pfx + } + appcfg["sharding_cfg_reload_interval"] = "3600" + appcfg["rac_sql_interval"] = "0" + //appcfg["readonly_children_pct"] = "40" + + appcfg["soft_eviction_effective_time"] = "10000" + appcfg["bind_eviction_threshold_pct"] = "60" + + opscfg := make(map[string]string) + opscfg["opscfg.default.server.max_connections"] = "3" + opscfg["opscfg.default.server.log_level"] = "5" + opscfg["opscfg.default.server.saturation_recover_throttle_rate"] = "30" + + return appcfg, opscfg, testutil.MySQLWorker +} + +func setupShardMap() { + twoTask := os.Getenv("TWO_TASK") + if !strings.HasPrefix(twoTask, "tcp") { + // not mysql + return + } + shard := 0 + db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard)) + if err != nil { + testutil.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer db.Close() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + conn, err := db.Conn(ctx) + if err != nil { + testutil.Fatalf("Error getting connection %s\n", err.Error()) + } + defer conn.Close() + + testutil.RunDML("create table hera_shard_map ( scuttle_id smallint not null, shard_id tinyint not null, status char(1) , read_status char(1), write_status char(1), remarks varchar(500))") + + for i := 0; i < 1024; i++ { + shard := 0 + if i <= 8 { + shard = i % 3 + } + testutil.RunDML(fmt.Sprintf("insert into hera_shard_map ( scuttle_id, shard_id, status, read_status, write_status ) values ( %d, %d, 'Y', 'Y', 'Y' )", i, shard)) + } +} + +func before() error { + tableName = os.Getenv("TABLE_NAME") + if tableName == "" { + tableName = "jdbc_hera_test" + } + if strings.HasPrefix(os.Getenv("TWO_TASK"), "tcp") { + // mysql + testutil.RunDML("create table jdbc_hera_test ( ID BIGINT, INT_VAL BIGINT, STR_VAL VARCHAR(500))") + } + return nil +} + +func TestMain(m *testing.M) { + os.Exit(testutil.UtilMain(m, cfg, before)) +} + +func TestConfigLogging(t *testing.T) { + 
logger.GetLogger().Log(logger.Debug, "TestConfigLogging setup") + setupShardMap() + logger.GetLogger().Log(logger.Debug, "TestConfigLogging begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + + hostname, _ := os.Hostname() + db, err := sql.Open("hera", hostname+":31003") + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer db.Close() + time.Sleep(10 * time.Second) + if testutil.RegexCountFile("OCC_CONFIG\tSHARDING", "cal.log") < 1 { + t.Fatalf("SHARDING configuration details are missing.") + } + + if testutil.RegexCountFile("OCC_CONFIG\tBACKLOG", "cal.log") < 1 { + t.Fatalf("BACKLOG config details are missing.") + } + + if testutil.RegexCountFile("OCC_CONFIG\tTAF", "cal.log") > 0 { + t.Fatalf("TAF is not enabled so we should not see TAF config logging.") + } + + if testutil.RegexCountFile("OCC_CONFIG\tR-W-SPLIT", "cal.log") > 0 { + t.Fatalf("R-W-SPLIT is not configured, it should not log R-W-SPLIT config details.") + } + + if testutil.RegexCountFile("OCC_CONFIG\tSOFT-EVICTION", "cal.log") < 1 { + t.Fatalf("Saturation recovery enabled, so it should log SOFT-EVICTION configurations") + } + + if testutil.RegexCountFile("OCC_CONFIG\tBIND-EVICTION", "cal.log") < 1 { + t.Fatalf("Saturation recovery enabled, so it should log BIND-EVICTION configurations") + } + logger.GetLogger().Log(logger.Debug, "TestShardingMod done -------------------------------------------------------------") +} From cc38b95aa5e031241bfb96dfa5adcbcbb533bcc0 Mon Sep 17 00:00:00 2001 From: Venkataraman Sridhar <89169991+venkatsridhar95@users.noreply.github.com> Date: Tue, 20 Aug 2024 21:27:19 -0700 Subject: [PATCH 3/5] add ORA-04025 to TAF retry list (#398) --- lib/coordinatortaf.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/coordinatortaf.go b/lib/coordinatortaf.go index e2ae8243..e7d4fa07 100644 --- a/lib/coordinatortaf.go +++ b/lib/coordinatortaf.go @@ -68,7 +68,7 @@ func (p *tafResponsePreproc) 
Write(bf []byte) (int, error) { } ora, sz := atoi(ns.Payload) switch ora { - case 3113, 3114, 3135, 12514, 3128, 3127, 3123, 3111, 3106, 1012, 28, 31, 51, 25400, 25401, 25402, 25403, 25404, 25405, 25407, 25408, 25409, 25425, 24343, 1041, 600, 700, 7445: + case 3113, 3114, 3135, 12514, 3128, 3127, 3123, 3111, 3106, 1012, 28, 31, 51, 25400, 25401, 25402, 25403, 25404, 25405, 25407, 25408, 25409, 25425, 24343, 1041, 600, 700, 7445, 4025: //for testing 962=: case 942: p.ok = false p.ora = string(ns.Payload[:sz]) From 4af36064565dbda8a25f1d1a1a14c934739ee139 Mon Sep 17 00:00:00 2001 From: Rajesh S <105205300+rasamala83@users.noreply.github.com> Date: Thu, 26 Sep 2024 10:06:41 +0530 Subject: [PATCH 4/5] changes for adding properties to callog to track rac timestamp value (#391) * changes for adding properties to callog to track rac timestamp value * added pid for worker process id in cal log * incorporate review comments for moving event logic to same loop * fixing rac maint concurrency issues * changes for racmaint in occ * fixing test bind less error * fixing test bind throttle error --------- Co-authored-by: Rajesh S --- lib/racmaint.go | 14 ++- lib/workerpool.go | 50 ++++---- tests/unittest/bindLess/main_test.go | 2 +- tests/unittest/bindThrottle/main_test.go | 2 +- tests/unittest/rac_maint_async/main_test.go | 122 ++++++++++++++++++++ 5 files changed, 157 insertions(+), 33 deletions(-) create mode 100644 tests/unittest/rac_maint_async/main_test.go diff --git a/lib/racmaint.go b/lib/racmaint.go index 27c3bbea..0221db44 100644 --- a/lib/racmaint.go +++ b/lib/racmaint.go @@ -61,12 +61,14 @@ func InitRacMaint(cmdLineModuleName string) { interval := GetConfig().RacMaintReloadInterval if interval > 0 { for i := 0; i < GetConfig().NumOfShards; i++ { - go racMaintMain(i, interval, cmdLineModuleName) + shardIndex := i //Address the behavior called variable capture. 
+ go racMaintMain(shardIndex, interval, cmdLineModuleName) } } } // racMaintMain wakes up every n seconds (configured in "rac_sql_interval") and reads the table +// // [ManagementTablePrefix]_maint table to see if maintenance is requested func racMaintMain(shard int, interval int, cmdLineModuleName string) { if logger.GetLogger().V(logger.Debug) { @@ -109,8 +111,8 @@ func racMaintMain(shard int, interval int, cmdLineModuleName string) { } /* - racMaint is the main function for RAC maintenance processing, being called regularly. - When maintenance is planned, it calls workerpool.RacMaint to start the actuall processing +racMaint is the main function for RAC maintenance processing, being called regularly. +When maintenance is planned, it calls workerpool.RacMaint to start the actuall processing */ func racMaint(ctx context.Context, shard int, db *sql.DB, racSQL string, cmdLineModuleName string, prev map[racCfgKey]racCfg) { // @@ -220,12 +222,12 @@ func racMaint(ctx context.Context, shard int, db *sql.DB, racSQL string, cmdLine workerpool, err = GetWorkerBrokerInstance().GetWorkerPool(wtypeRW, 0, shard) } if err == nil { - go workerpool.RacMaint(racReq) + workerpool.RacMaint(racReq) } if GetConfig().ReadonlyPct > 0 { - workerpool, err := GetWorkerBrokerInstance().GetWorkerPool(wtypeRO, 0, shard) + workerpool, err = GetWorkerBrokerInstance().GetWorkerPool(wtypeRO, 0, shard) if err == nil { - go workerpool.RacMaint(racReq) + workerpool.RacMaint(racReq) } } prev[cfgKey] = row diff --git a/lib/workerpool.go b/lib/workerpool.go index d5450ca2..50aab16f 100644 --- a/lib/workerpool.go +++ b/lib/workerpool.go @@ -119,7 +119,7 @@ func (pool *WorkerPool) spawnWorker(wid int) error { worker.setState(wsSchd) millis := rand.Intn(GetConfig().RandomStartMs) if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, wid, "randomized start ms",millis) + logger.GetLogger().Log(logger.Alert, wid, "randomized start ms", millis) } time.Sleep(time.Millisecond * 
time.Duration(millis)) @@ -131,7 +131,7 @@ func (pool *WorkerPool) spawnWorker(wid int) error { } millis := rand.Intn(3000) if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start",wid) + logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start", wid) } time.Sleep(time.Millisecond * time.Duration(millis)) } @@ -233,8 +233,10 @@ func (pool *WorkerPool) WorkerReady(worker *WorkerClient) (err error) { // GetWorker gets the active worker if available. backlog with timeout if not. // // @param sqlhash to check for soft eviction against a blacklist of slow queries. -// if getworker needs to exam the incoming sql, there does not seem to be another elegant -// way to do this except to pass in the sqlhash as a parameter. +// +// if getworker needs to exam the incoming sql, there does not seem to be another elegant +// way to do this except to pass in the sqlhash as a parameter. +// // @param timeoutMs[0] timeout in milliseconds. default to adaptive queue timeout. 
func (pool *WorkerPool) GetWorker(sqlhash int32, timeoutMs ...int) (worker *WorkerClient, t string, err error) { if logger.GetLogger().V(logger.Debug) { @@ -559,10 +561,10 @@ func (pool *WorkerPool) ReturnWorker(worker *WorkerClient, ticket string) (err e } if skipRecycle { if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:=", pool.desiredSize) + logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=", pool.moduleName, "shard_id=", pool.ShardID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:=", pool.desiredSize) } calMsg := fmt.Sprintf("Recycle(worker_pid)=%d, module_name=%s,shard_id=%d", worker.pid, worker.moduleName, worker.shardID) - evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","ReturnWorker", cal.TransOK, calMsg) + evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER", "ReturnWorker", cal.TransOK, calMsg) evt.Completed() } @@ -697,8 +699,6 @@ func (pool *WorkerPool) RacMaint(racReq racAct) { } now := time.Now().Unix() window := GetConfig().RacRestartWindow - dbUname := "" - cnt := 0 pool.poolCond.L.Lock() for i := 0; i < pool.currentSize; i++ { if (pool.workers[i] != nil) && (racReq.instID == 0 || pool.workers[i].racID == racReq.instID) && (pool.workers[i].startTime < int64(racReq.tm)) { @@ -716,23 +716,23 @@ func (pool *WorkerPool) RacMaint(racReq racAct) { } if logger.GetLogger().V(logger.Verbose) { - logger.GetLogger().Log(logger.Verbose, "Rac maint activating, worker", i, pool.workers[i].pid, "exittime=", pool.workers[i].exitTime, now, window, pool.currentSize) - } - cnt++ - if len(dbUname) == 0 { - dbUname = pool.workers[i].dbUname + logger.GetLogger().Log(logger.Verbose, "Rac maint activating, worker", i, pool.workers[i].pid, "exittime=", pool.workers[i].exitTime, now, window, pool.currentSize, "rac.req timestamp=", racReq.tm) 
} + //Trigger individual event for worker + evt := cal.NewCalEvent("RAC_ID", fmt.Sprintf("%d", racReq.instID), cal.TransOK, "") + evt.AddDataStr("poolModName", pool.moduleName) + evt.AddDataInt("workerId", int64(i)) + evt.AddDataInt("pid", int64(pool.workers[i].pid)) + evt.AddDataInt("shardId", int64(pool.ShardID)) + evt.AddDataInt("tm", int64(racReq.tm)) + evt.AddDataInt("exitTime", pool.workers[i].exitTime) + evt.AddDataStr("exitInSec", fmt.Sprintf("%dsec", pool.workers[i].exitTime-now)) + evt.Completed() + evt = cal.NewCalEvent("DB_UNAME", pool.workers[i].dbUname, cal.TransOK, "") + evt.Completed() } } pool.poolCond.L.Unlock() - // TODO: C++ worker logs one event for each worker, in the worker, so - // we keep the same. Think about changing it - for i := 0; i < cnt; i++ { - evt := cal.NewCalEvent("RAC_ID", fmt.Sprintf("%d", racReq.instID), cal.TransOK, "") - evt.Completed() - evt = cal.NewCalEvent("DB_UNAME", dbUname, cal.TransOK, "") - evt.Completed() - } } // checkWorkerLifespan is called periodically to check if any worker lifetime has expired and terminates it @@ -768,12 +768,12 @@ func (pool *WorkerPool) checkWorkerLifespan() { pool.poolCond.L.Lock() for i := 0; i < pool.currentSize; i++ { if (pool.workers[i] != nil) && (pool.workers[i].exitTime != 0) && (pool.workers[i].exitTime <= now) { - if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize*GetConfig().MaxDesiredHealthyWorkerPct/100)) { // Should it be a config value + if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize * GetConfig().MaxDesiredHealthyWorkerPct / 100)) { // Should it be a config value if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize) + logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=", pool.moduleName, "shard_id=", pool.ShardID, 
"HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:", pool.desiredSize) } calMsg := fmt.Sprintf("module_name=%s,shard_id=%d", pool.moduleName, pool.ShardID) - evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","checkWorkerLifespan", cal.TransOK, calMsg) + evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER", "checkWorkerLifespan", cal.TransOK, calMsg) evt.Completed() break } @@ -814,7 +814,7 @@ func (pool *WorkerPool) checkWorkerLifespan() { pool.poolCond.L.Unlock() for _, w := range workers { if logger.GetLogger().V(logger.Info) { - logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID ,"HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize) + logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:", pool.desiredSize) } w.Terminate() } diff --git a/tests/unittest/bindLess/main_test.go b/tests/unittest/bindLess/main_test.go index b59b1f07..72c92ec4 100644 --- a/tests/unittest/bindLess/main_test.go +++ b/tests/unittest/bindLess/main_test.go @@ -211,7 +211,7 @@ func TestBindLess(t *testing.T) { logger.GetLogger().Log(logger.Debug, "TestBindLess +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") testutil.BackupAndClear("cal", "BindLess start") testutil.BackupAndClear("hera", "BindLess start") - err := partialBadLoad(0.10) + err := partialBadLoad(0.07) if err != nil && err != NormCliErr() { t.Fatalf("main step function returned err %s", err.Error()) } diff --git a/tests/unittest/bindThrottle/main_test.go b/tests/unittest/bindThrottle/main_test.go index 7cdede41..7e3c87da 100644 --- a/tests/unittest/bindThrottle/main_test.go +++ b/tests/unittest/bindThrottle/main_test.go @@ -205,7 +205,7 @@ func mkClients(num int, stop *int, bindV int, grpName 
string, outErr *string, db func TestBindThrottle(t *testing.T) { // we would like to clear hera.log, but even if we try, lots of messages still go there logger.GetLogger().Log(logger.Debug, "BindThrottle +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") - err := partialBadLoad(0.10) + err := partialBadLoad(0.07) if err != nil && err != NormCliErr() { t.Fatalf("main step function returned err %s", err.Error()) } diff --git a/tests/unittest/rac_maint_async/main_test.go b/tests/unittest/rac_maint_async/main_test.go new file mode 100644 index 00000000..e88e7d48 --- /dev/null +++ b/tests/unittest/rac_maint_async/main_test.go @@ -0,0 +1,122 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + "os" + "testing" + "time" + + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var mx testutil.Mux +var tableName string + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31002" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["sharding_cfg_reload_interval"] = "0" + appcfg["rac_sql_interval"] = "1" + + opscfg := make(map[string]string) + opscfg["opscfg.default.server.max_connections"] = "10" + opscfg["opscfg.default.server.log_level"] = "5" + + //return appcfg, opscfg, testutil.OracleWorker + return appcfg, opscfg, testutil.MySQLWorker +} + +func before() error { + os.Setenv("PARALLEL", "1") + pfx := os.Getenv("MGMT_TABLE_PREFIX") + if pfx == "" { + pfx = "hera" + } + tableName = pfx + "_maint" + return nil +} + +func TestMain(m *testing.M) { + os.Exit(testutil.UtilMain(m, cfg, before)) +} + +func TestRacMaintWithRandomStatusChangeInAsync(t *testing.T) { + logger.GetLogger().Log(logger.Debug, "TestRacMaintWithRandomStatusChangeInAsync begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + 
shard := 0 + db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard)) + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + statusArray := []string{"U", "R", "F"} + time.Sleep(5 * time.Second) + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + for { + status1 := rand.Intn(len(statusArray)) + status2 := rand.Intn(len(statusArray)) + var err error + var conn *sql.Conn + // cleanup and insert one row in the table + conn, err = db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } + tx, _ := conn.BeginTx(ctx, nil) + stmt, _ := tx.PrepareContext(ctx, "/*cmd*/delete from "+tableName) + _, err = stmt.Exec() + if err != nil { + t.Fatalf("Error preparing test (delete table) %s\n", err.Error()) + } + stmt, _ = tx.PrepareContext(ctx, "/*cmd*/insert into "+tableName+" (inst_id, status, status_time, module, machine) values (?,?,?,?,?)") + hostname, _ := os.Hostname() + // how to do inst_id + _, err = stmt.Exec(0 /*max instid*/, statusArray[status1], time.Now().Unix()+2, "hera-test", hostname) + _, err = stmt.Exec(0, statusArray[status2], time.Now().Unix()+2, "hera-test_taf", hostname) + if err != nil { + t.Fatalf("Error preparing test (create row in table) %s\n", err.Error()) + } + err = tx.Commit() + if err != nil { + t.Fatalf("Error commit %s\n", err.Error()) + } + conn.Close() + time.Sleep(1000 * time.Millisecond) + } + }() + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer db.Close() + + time.Sleep(45000 * time.Millisecond) + + if 0 == testutil.RegexCountFile("Rac maint activating, worker", "hera.log") { + t.Fatalf("requires rac maint activation for main module status") + } + + if 0 == testutil.RegexCountFile("module:HERA-TEST_TAF", "cal.log") { + t.Fatalf("Status 'U' should log the RACMAINT_INFO_CHANGE event") + } + if 0 != testutil.RegexCountFile("invalid_status", "cal.log") { + t.Fatalf("ram maint status 
'U' should not skip with invalid-status event") + } + + if testutil.RegexCountFile("RAC_ID", "cal.log") < 20 { + t.Fatalf("ram maint should trigger for all workers once.") + } + + logger.GetLogger().Log(logger.Debug, "TestRacMaintWithRandomStatusChangeInAsync done -------------------------------------------------------------") +} From 850f18c4306d2e8343a7c7ae940e2819608db75f Mon Sep 17 00:00:00 2001 From: ishi-0987 <42084179+ishi-0987@users.noreply.github.com> Date: Thu, 26 Sep 2024 16:02:51 +0530 Subject: [PATCH 5/5] added more configurations to cal event (#400) --- lib/config.go | 101 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 18 deletions(-) diff --git a/lib/config.go b/lib/config.go index 3e8afe59..c1677ba4 100644 --- a/lib/config.go +++ b/lib/config.go @@ -29,7 +29,12 @@ import ( "sync/atomic" ) -//The Config contains all the static configuration +const ( + mux_config_cal_name = "OCC_CONFIG" + oracle_worker_config_cal_name = "OCC_ORACLE_WORKER_CONFIG" +) + +// The Config contains all the static configuration type Config struct { CertChainFile string KeyFile string // leave blank for no SSL @@ -179,6 +184,14 @@ type Config struct { // Max desired percentage of healthy workers for the worker pool MaxDesiredHealthyWorkerPct int + + // Oracle Worker Configs + EnableCache bool + EnableHeartBeat bool + EnableQueryReplaceNL bool + EnableBindHashLogging bool + EnableSessionVariables bool + UseNonBlocking bool } // The OpsConfig contains the configuration that can be modified during run time @@ -230,7 +243,6 @@ func InitConfig() error { } else { currentDir = currentDir + "/" } - filename := currentDir + "hera.txt" cdb, err := config.NewTxtConfig(filename) @@ -308,6 +320,7 @@ func InitConfig() error { } gAppConfig.EnableSharding = cdb.GetOrDefaultBool("enable_sharding", false) + gAppConfig.UseShardMap = cdb.GetOrDefaultBool("use_shardmap", true) gAppConfig.NumOfShards = cdb.GetOrDefaultInt("num_shards", 1) if 
gAppConfig.EnableSharding == false || gAppConfig.UseShardMap == false { @@ -366,6 +379,14 @@ func InitConfig() error { // TODO: gAppConfig.NumStdbyDbs = 1 + // Fetch Oracle worker configurations.. The defaults must be same between oracle worker and here for accurate logging. + gAppConfig.EnableCache = cdb.GetOrDefaultBool("enable_cache", false) + gAppConfig.EnableHeartBeat = cdb.GetOrDefaultBool("enable_heart_beat", false) + gAppConfig.EnableQueryReplaceNL = cdb.GetOrDefaultBool("enable_query_replace_nl", true) + gAppConfig.EnableBindHashLogging = cdb.GetOrDefaultBool("enable_bind_hash_logging", false) + gAppConfig.EnableSessionVariables = cdb.GetOrDefaultBool("enable_session_variables", false) + gAppConfig.UseNonBlocking = cdb.GetOrDefaultBool("use_non_blocking", false) + var numWorkers int numWorkers = 6 //err = config.InitOpsConfigWithName("../opscfg/hera.txt") @@ -500,17 +521,15 @@ func LogOccConfigs() { "hostname_prefix": gAppConfig.HostnamePrefix, "sharding_cross_keys_err": gAppConfig.ShardingCrossKeysErr, //"enable_sql_rewrite", // not found anywhere? 
- "sharding_algo": gAppConfig.ShardingAlgoHash, - "cfg_from_tns_override_num_shards": gAppConfig.CfgFromTnsOverrideNumShards, + "sharding_algo": gAppConfig.ShardingAlgoHash, }, "TAF": { - "enable_taf": gAppConfig.EnableTAF, - "cfg_from_tns_override_taf": gAppConfig.CfgFromTnsOverrideTaf, - "testing_enable_dml_taf": gAppConfig.TestingEnableDMLTaf, - "taf_timeout_ms": gAppConfig.TAFTimeoutMs, - "taf_bin_duration": gAppConfig.TAFBinDuration, - "taf_allow_slow_every_x": gAppConfig.TAFAllowSlowEveryX, - "taf_normally_slow_count": gAppConfig.TAFNormallySlowCount, + "enable_taf": gAppConfig.EnableTAF, + "testing_enable_dml_taf": gAppConfig.TestingEnableDMLTaf, + "taf_timeout_ms": gAppConfig.TAFTimeoutMs, + "taf_bin_duration": gAppConfig.TAFBinDuration, + "taf_allow_slow_every_x": gAppConfig.TAFAllowSlowEveryX, + "taf_normally_slow_count": gAppConfig.TAFNormallySlowCount, }, "BIND-EVICTION": { "child.executable": gAppConfig.ChildExecutable, @@ -551,24 +570,42 @@ func LogOccConfigs() { "max_desire_healthy_worker_pct": gAppConfig.MaxDesiredHealthyWorkerPct, }, "R-W-SPLIT": { - "readonly_children_pct": gAppConfig.ReadonlyPct, - "cfg_from_tns_override_rw_split": gAppConfig.CfgFromTnsOverrideRWSplit, + "readonly_children_pct": gAppConfig.ReadonlyPct, }, "RAC": { "management_table_prefix": gAppConfig.ManagementTablePrefix, "rac_sql_interval": gAppConfig.RacMaintReloadInterval, "rac_restart_window": gAppConfig.RacRestartWindow, }, - "NO-CATEGORY": { + "GENERAL-CONFIGURATIONS": { "database_type": gAppConfig.DatabaseType, // Oracle = 0; MySQL=1; POSTGRES=2 - "cfg_from_tns": gAppConfig.CfgFromTns, "log_level": gOpsConfig.logLevel, "high_load_pct": gAppConfig.HighLoadPct, "init_limit_pct": gAppConfig.InitLimitPct, "num_standby_dbs": gAppConfig.NumStdbyDbs, }, + "ENABLE_CFG_FROM_TNS": { + "cfg_from_tns": gAppConfig.CfgFromTns, + "cfg_from_tns_override_num_shards": gAppConfig.CfgFromTnsOverrideNumShards, + "cfg_from_tns_override_taf": gAppConfig.CfgFromTnsOverrideTaf, + 
"cfg_from_tns_override_rw_split": gAppConfig.CfgFromTnsOverrideRWSplit, + }, + "STATEMENT-CACHE": { + "enable_cache": gAppConfig.EnableCache, + "enable_heart_beat": gAppConfig.EnableHeartBeat, + "enable_query_replace_nl": gAppConfig.EnableQueryReplaceNL, + }, + "SESSION-VARIABLES": { + "enable_session_variables": gAppConfig.EnableSessionVariables, + }, + "BIND-HASH-LOGGING": { + "enable_bind_hash_logging": gAppConfig.EnableBindHashLogging, + }, + "KEEP-ALIVE": { + "use_non_blocking": gAppConfig.UseNonBlocking, + }, } - for feature, configs := range whiteListConfigs { + calName := mux_config_cal_name /* reset for every feature: map iteration order is unspecified, so a worker-only CAL name set below must not carry over to later mux features */ switch feature { case "BACKLOG": @@ -595,17 +632,45 @@ func LogOccConfigs() { if gAppConfig.ReadonlyPct == 0 { continue } - case "SOFT-EVICTION", "BIND-EVICTION": + case "SATURATION-RECOVERY", "BIND-EVICTION": if GetSatRecoverThrottleRate() <= 0 { continue } + case "SOFT-EVICTION": + if GetSatRecoverThrottleRate() <= 0 && gAppConfig.SoftEvictionProbability <= 0 { + continue + } case "MANUAL-RATE-LIMITER": if !gAppConfig.EnableQueryBindBlocker { continue } + case "ENABLE_CFG_FROM_TNS": + if !gAppConfig.CfgFromTns { + continue + } + case "STATEMENT-CACHE": + if !gAppConfig.EnableCache { + continue + } + calName = oracle_worker_config_cal_name + case "SESSION-VARIABLES": + if !gAppConfig.EnableSessionVariables { + continue + } + calName = oracle_worker_config_cal_name + case "BIND-HASH-LOGGING": + if !gAppConfig.EnableBindHashLogging { + continue + } + calName = oracle_worker_config_cal_name + case "KEEP-ALIVE": + if !gAppConfig.UseNonBlocking { + continue + } + calName = oracle_worker_config_cal_name } - evt := cal.NewCalEvent("OCC_CONFIG", fmt.Sprintf(feature), cal.TransOK, "") + evt := cal.NewCalEvent(calName, feature, cal.TransOK, "") for cfg, val := range configs { s := fmt.Sprintf("%v", val) evt.AddDataStr(cfg, s)