diff --git a/docs/configuration.md b/docs/configuration.md index 637b9bc9..1fc46168 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -179,6 +179,10 @@ There are two types of configuration parameters: **static parameters** and **dyn + In case of TAF (Transparent Application Failover), it is the timeout in milliseconds to wait for a query to complete before it is canceled to be re-tried on the falback database + default: 200 +#### taf_children_pct ++ If TAF is enabled, this is the percentage of workers connecting to a fallback database. By default, the fallback pool size is same as the primary pool size. ++ default: 100 + #### readonly_children_pct + If R/W split is enabled this is the percentage of workers connecting to a read node. + default: 0 diff --git a/lib/config.go b/lib/config.go index f0283cfa..e1b67efb 100644 --- a/lib/config.go +++ b/lib/config.go @@ -41,6 +41,7 @@ type Config struct { NumStdbyDbs int InitialMaxChildren int ReadonlyPct int + TafChildrenPct int // // backlog // @@ -392,6 +393,7 @@ func InitConfig() error { } gAppConfig.ReadonlyPct = cdb.GetOrDefaultInt("readonly_children_pct", 0) + gAppConfig.TafChildrenPct = cdb.GetOrDefaultInt("taf_children_pct", 100) gAppConfig.InitialMaxChildren = numWorkers if gAppConfig.EnableWhitelistTest { if gAppConfig.NumWhitelistChildren < 2 { @@ -564,6 +566,8 @@ func (cfg *Config) NumWorkersCh() <-chan int { func (cfg *Config) GetBacklogLimit(wtype HeraWorkerType, shard int) int { if wtype == wtypeRO { return gAppConfig.BacklogPct * GetNumRWorkers(shard) / 100 + } else if wtype == wtypeStdBy { + return gAppConfig.BacklogPct * GetNumStdByWorkers(shard) / 100 } return gAppConfig.BacklogPct * GetNumWWorkers(shard) / 100 } @@ -638,6 +642,26 @@ func GetNumRWorkers(shard int) int { return num } +// GetNumStdByWorkers gets the number of workers for the "StdBy" pool +func GetNumStdByWorkers(shard int) int { + num := GetNumWWorkers(shard) + // TafChildrenPct should not be greater than 100. + if gAppConfig.TafChildrenPct > 100 { + return num + } + if gAppConfig.EnableTAF && gAppConfig.TafChildrenPct < 100 { + if gAppConfig.TafChildrenPct < 0 { + num = 1 + } else { + num = num * gAppConfig.TafChildrenPct / 100 + if num == 0 { + num = 1 + } + } + } + return num +} + // GetNumWWorkers gets the number of workers for the "Write" pool func GetNumWWorkers(shard int) int { numWhiteList := GetWhiteListChildCount(shard) diff --git a/lib/workerbroker.go b/lib/workerbroker.go index 62157ac3..0f9df171 100644 --- a/lib/workerbroker.go +++ b/lib/workerbroker.go @@ -140,7 +140,7 @@ func (broker *WorkerBroker) init() error { broker.poolCfgs[s][wtypeStdBy] = new(WorkerPoolCfg) if GetConfig().EnableTAF { - broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = GetNumWWorkers(s) + broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = GetNumStdByWorkers(s) broker.poolCfgs[s][wtypeStdBy].instCnt = 1 } else { broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = 0 @@ -386,6 +386,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, sha func (broker *WorkerBroker) changeMaxWorkers() { wW := GetNumWWorkers(0) rW := GetNumRWorkers(0) + sW := GetNumStdByWorkers(0) for i := 0; i < GetConfig().NumOfShards; i++ { broker.resizePool(wtypeRW, wW, i) @@ -395,7 +396,7 @@ func (broker *WorkerBroker) changeMaxWorkers() { // if TAF enabled, handle stdby as well if GetConfig().EnableTAF { - broker.resizePool(wtypeStdBy, wW, i) + broker.resizePool(wtypeStdBy, sW, i) } if GetConfig().EnableWhitelistTest { diff --git a/tests/unittest/adjustTafChildrenPct/main_test.go b/tests/unittest/adjustTafChildrenPct/main_test.go new file mode 100644 index 00000000..1d3239e8 --- /dev/null +++ b/tests/unittest/adjustTafChildrenPct/main_test.go @@ -0,0 +1,110 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "os" + "testing" + "time" + + _ "github.com/paypal/hera/client/gosqldriver/tcp" + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var mx testutil.Mux +//var tableName string + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + fmt.Println ("setup() begin") + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31002" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["enable_taf"] = "true" + + appcfg["opscfg.default.server.max_connections"] = "10" + + opscfg := make(map[string]string) + opscfg["opscfg.default.server.log_level"] = "5" + + if os.Getenv("WORKER") == "postgres" { + return appcfg, opscfg, testutil.PostgresWorker + } + return appcfg, opscfg, testutil.MySQLWorker +} + +func TestMain(m *testing.M) { + os.Exit(testutil.UtilMain(m, cfg, nil)) +} + +/* +Should have the same size as the primary pool. +11/04/2024 14:54:52: hera.taf 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:53: hera 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:53: hera.taf 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:54: hera 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:54: hera.taf 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:55: hera 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:55: hera.taf 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:56: hera 0 5 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:56: hera.taf 0 5 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:57: hera 0 5 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:57: hera.taf 0 5 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:58: hera 0 5 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:58: hera.taf 0 5 0 0 0 0 0 0 1 0 0 +11/04/2024 14:54:59: hera 0 5 0 0 0 0 0 0 1 0 0 +*/ + +func TestAdjustTafChildrenPct(t *testing.T) { + + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPct begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + + shard := 0 + db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard)) + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer db.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } + defer conn.Close() + + rows, _ := conn.QueryContext(ctx, "SELECT version()") + + if !rows.Next() { + t.Fatalf("Expected 1 row") + } + rows.Close() + + acpt, err := testutil.StatelogGetField(2, "hera.taf") + if err != nil { + t.Fatalf("Error reading state log: %s\n", err.Error()) + } + + if acpt != 10 { + t.Fatalf("Expected TAF pool size: 10, Actual %d\n", acpt) + } + + fmt.Println ("We now change max connections at runtime"); + testutil.ModifyOpscfgParam (t, "hera.txt", "max_connections", "5") + //Wait for opsfcg change to take effect + time.Sleep(45 * time.Second) + + acpt, _ = testutil.StatelogGetField(2, "hera.taf") + + if acpt != 5 { + t.Fatalf("Expected TAF pool size: 5, Actual %d\n", acpt) + } + + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPct done -------------------------------------------------------------") +} diff --git a/tests/unittest/adjustTafChildrenPctModified/main_test.go b/tests/unittest/adjustTafChildrenPctModified/main_test.go new file mode 100644 index 00000000..ee59da8d --- /dev/null +++ b/tests/unittest/adjustTafChildrenPctModified/main_test.go @@ -0,0 +1,111 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "os" + "testing" + "time" + + _ "github.com/paypal/hera/client/gosqldriver/tcp" + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var mx testutil.Mux +//var tableName string + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + fmt.Println ("setup() begin") + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31002" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["enable_taf"] = "true" + appcfg["taf_children_pct"] = "20" + appcfg["opscfg.default.server.max_connections"] = "10" + + opscfg := make(map[string]string) + opscfg["opscfg.default.server.log_level"] = "5" + + if os.Getenv("WORKER") == "postgres" { + return appcfg, opscfg, testutil.PostgresWorker + } + return appcfg, opscfg, testutil.MySQLWorker +} + +func TestMain(m *testing.M) { + os.Exit(testutil.UtilMain(m, cfg, nil)) +} + +/* +TAF pool should be 1/5th of the primary pool +11/04/2024 14:59:03: hera.taf 0 2 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:04: hera 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:04: hera.taf 0 2 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:05: hera 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:05: hera.taf 0 2 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:06: hera 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:06: hera.taf 0 2 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:07: hera 0 10 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:07: hera.taf 0 2 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:08: hera 0 5 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:08: hera.taf 0 1 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:09: hera 0 5 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:09: hera.taf 0 1 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:10: hera 0 5 0 0 0 0 0 0 1 0 0 +11/04/2024 14:59:10: hera.taf 0 1 0 0 0 0 0 0 1 0 0 +*/ + +func TestAdjustTafChildrenPctModified(t *testing.T) { + + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctModified begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + + shard := 0 + db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard)) + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer db.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } + defer conn.Close() + + rows, _ := conn.QueryContext(ctx, "SELECT version()") + + if !rows.Next() { + t.Fatalf("Expected 1 row") + } + rows.Close() + + acpt, err := testutil.StatelogGetField(2, "hera.taf") + if err != nil { + t.Fatalf("Error reading state log: %s\n", err.Error()) + } + + if acpt != 2 { + t.Fatalf("Expected TAF pool size: 2, Actual %d\n", acpt) + } + + fmt.Println ("We now change max connections at runtime"); + testutil.ModifyOpscfgParam (t, "hera.txt", "max_connections", "5") + //Wait for opsfcg change to take effect + time.Sleep(45 * time.Second) + + acpt, _ = testutil.StatelogGetField(2, "hera.taf") + + if acpt != 1 { + t.Fatalf("Expected TAF pool size: 1, Actual %d\n", acpt) + } + + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctModified done -------------------------------------------------------------") +} diff --git a/tests/unittest/adjustTafChildrenPctModifiedWithSharding/main_test.go b/tests/unittest/adjustTafChildrenPctModifiedWithSharding/main_test.go new file mode 100644 index 00000000..b91294c5 --- /dev/null +++ b/tests/unittest/adjustTafChildrenPctModifiedWithSharding/main_test.go @@ -0,0 +1,112 @@ +package main + +import ( + "fmt" + "os" + "testing" + "time" + + _ "github.com/paypal/hera/client/gosqldriver/tcp" + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var mx testutil.Mux +//var tableName string + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + fmt.Println ("setup() begin") + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31002" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["enable_taf"] = "true" + appcfg["taf_children_pct"] = "20" + appcfg["enable_sharding"] = "true" + appcfg["shard_key_name"] = "email_addr" + appcfg["shard_key_value_type_is_string"] = "true" + appcfg["num_shards"] = "2" + appcfg["sharding_cfg_reload_interval"] = "0" + + appcfg["opscfg.default.server.max_connections"] = "20" + + opscfg := make(map[string]string) + opscfg["opscfg.default.server.log_level"] = "5" + + if os.Getenv("WORKER") == "postgres" { + return appcfg, opscfg, testutil.PostgresWorker + } + return appcfg, opscfg, testutil.MySQLWorker +} + +func setupShardMap() { + testutil.RunDML("DROP TABLE IF EXISTS test_str_sk") + testutil.RunDML("create table test_str_sk (email_addr varchar(64), note varchar(64))") + testutil.RunDML("DROP TABLE IF EXISTS hera_shard_map") + testutil.RunDML("create table hera_shard_map ( scuttle_id smallint not null, shard_id smallint not null, status char(1) , read_status char(1), write_status char(1), remarks varchar(500))") + for i := 0; i < 1024; i++ { + testutil.RunDML(fmt.Sprintf("insert into hera_shard_map ( scuttle_id, shard_id, status, read_status, write_status ) values ( %d, 0, 'Y', 'Y', 'Y' )", i) ) + } +} + +func TestMain(m *testing.M) { + os.Exit(testutil.UtilMain(m, cfg, nil)) +} + +/* +11/04/2024 16:42:44: hera.sh1 0 20 0 0 0 0 0 1 0 0 0 +11/04/2024 16:42:44: hera.taf.sh1 0 4 0 0 0 0 0 1 0 0 0 +11/04/2024 16:42:45: hera.sh0 0 19 0 1 0 0 0 1 0 0 0 +11/04/2024 16:42:45: hera.taf.sh0 0 4 0 0 0 0 0 1 0 0 0 +11/04/2024 16:42:45: hera.sh1 0 20 0 0 0 0 0 1 0 0 0 +11/04/2024 16:42:45: hera.taf.sh1 0 4 0 0 0 0 0 1 0 0 0 +11/04/2024 16:42:46: hera.sh0 0 5 0 0 0 0 0 0 0 0 0 +11/04/2024 16:42:46: hera.taf.sh0 0 1 0 0 0 0 0 0 0 0 0 +11/04/2024 16:42:46: hera.sh1 0 5 0 0 0 0 0 0 0 0 0 +11/04/2024 16:42:46: hera.taf.sh1 0 1 0 0 0 0 0 0 0 0 0 +11/04/2024 16:42:47: hera.sh0 0 5 0 0 0 0 0 0 0 0 0 +11/04/2024 16:42:47: hera.taf.sh0 0 1 0 0 0 0 0 0 0 0 0 +11/04/2024 16:42:47: hera.sh1 0 5 0 0 0 0 0 0 0 0 0 +11/04/2024 16:42:47: hera.taf.sh1 0 1 0 0 0 0 0 0 0 0 0 +*/ + +func TestAdjustTafChildrenPctWithSharding(t *testing.T) { + setupShardMap() + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctWithSharding begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + + acpt, err := testutil.StatelogGetField(2, "hera.taf.sh0") + if err != nil { + t.Fatalf("Error reading state log: %s\n", err.Error()) + } + + if acpt != 4 { + t.Fatalf("Expected TAF sh0 pool size: 4, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, "hera.taf.sh1") + + if acpt != 4 { + t.Fatalf("Expected TAF sh1 pool size: 4, Actual %d\n", acpt) + } + + fmt.Println ("We now change max connections at runtime"); + testutil.ModifyOpscfgParam (t, "hera.txt", "max_connections", "5") + //Wait for opsfcg change to take effect + time.Sleep(45 * time.Second) + + acpt, _ = testutil.StatelogGetField(2, "hera.taf.sh0") + + if acpt != 1 { + t.Fatalf("Expected TAF sh0 pool size: 1, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, "hera.taf.sh1") + + if acpt != 1 { + t.Fatalf("Expected TAF sh1 pool size: 1, Actual %d\n", acpt) + } + + + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctWithSharding done -------------------------------------------------------------") +} \ No newline at end of file diff --git a/tests/unittest/testutil/setup.go b/tests/unittest/testutil/setup.go index 795c78d0..6c365e03 100644 --- a/tests/unittest/testutil/setup.go +++ b/tests/unittest/testutil/setup.go @@ -360,9 +360,15 @@ func (m *mux) StartServer() error { os.Setenv("username", "herausertest") os.Setenv("password", "Hera-User-Test-9") os.Setenv("TWO_TASK", "tcp(127.0.0.1:2121)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0", "tcp(127.0.0.1:2121)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0_0", "tcp(127.0.0.1:2121)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0_1", "tcp(127.0.0.1:2121)/heratestdb") } else if xMysql == "auto" { ip := MakeDB("mysql22", "heratestdb", MySQL) os.Setenv("TWO_TASK", "tcp("+ip+":3306)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0", "tcp("+ip+":3306)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0_0", "tcp("+ip+":3306)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0_1", "tcp("+ip+":3306)/heratestdb") os.Setenv("TWO_TASK_1", "tcp("+ip+":3306)/heratestdb") os.Setenv("TWO_TASK_2", "tcp("+ip+":3306)/heratestdb") os.Setenv("MYSQL_IP", ip) diff --git a/tests/unittest/testutil/util.go b/tests/unittest/testutil/util.go index 979ee4cb..51a72b2a 100644 --- a/tests/unittest/testutil/util.go +++ b/tests/unittest/testutil/util.go @@ -7,10 +7,12 @@ import ( "database/sql" "errors" "fmt" + "io/ioutil" "os" "os/exec" "regexp" "strings" + "testing" "time" "github.com/paypal/hera/utility/logger" @@ -20,12 +22,17 @@ var ( INCOMPLETE = errors.New("Incomplete row") ) -func StatelogGetField(pos int) (int, error) { - out, err := exec.Command("/bin/bash", "-c", "/usr/bin/tail -n 1 state.log").Output() +func StatelogGetField(pos int, pattern ...string) (int, error) { + cmd := "/usr/bin/tail -n 1 state.log" + if len(pattern) > 0 { + cmd = fmt.Sprintf("/usr/bin/tac state.log | grep -m 1 -w '%s'", pattern[0]) + fmt.Println("cmd:", cmd) + } + out, err := exec.Command("/bin/bash", "-c", cmd).Output() if err != nil { return -1, err } - if len(out) != 99 { + if len(out) < 99 { return -1, INCOMPLETE } c := 27 + 6*pos @@ -216,3 +223,24 @@ func ClearLogsData() { } defer calLogFile.Close() } + +func ModifyOpscfgParam (t *testing.T, logfile string, opscfg_param string, opscfg_value string) { + //Read file + data, err := ioutil.ReadFile(runFolder + "/" + logfile) + if err != nil { + t.Fatal(err) + } + lines := strings.Split(string(data), "\n") + //Modify the opcfg value + for i, line := range lines { + if strings.Contains(line, opscfg_param) { + lines[i] = "opscfg.default.server." + opscfg_param + "=" + opscfg_value + } + } + output := strings.Join(lines, "\n") + // write to file + err = ioutil.WriteFile(runFolder + "/" + logfile, []byte(output), 0644) + if err != nil { + t.Fatal(err) + } +}