Skip to content

Commit

Permalink
expose a config to adjust the StdBy pool size (if required) (#387)
Browse files Browse the repository at this point in the history
* expose a config to adjust the StdBy pool (if required)

* added tests

* test fixes
  • Loading branch information
venkatsridhar95 authored Nov 6, 2024
1 parent 850f18c commit 1124f4b
Show file tree
Hide file tree
Showing 8 changed files with 401 additions and 5 deletions.
4 changes: 4 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ There are two types of configuration parameters: **static parameters** and **dyn
+ In case of TAF (Transparent Application Failover), it is the timeout in milliseconds to wait for a query to complete before it is canceled and retried on the fallback database
+ default: 200

#### taf_children_pct
+ If TAF is enabled, this is the percentage of workers connecting to a fallback database. By default, the fallback pool size is the same as the primary pool size.
+ default: 100

#### readonly_children_pct
+ If R/W split is enabled this is the percentage of workers connecting to a read node.
+ default: 0
Expand Down
24 changes: 24 additions & 0 deletions lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
NumStdbyDbs int
InitialMaxChildren int
ReadonlyPct int
TafChildrenPct int
//
// backlog
//
Expand Down Expand Up @@ -416,6 +417,7 @@ func InitConfig() error {
}

gAppConfig.ReadonlyPct = cdb.GetOrDefaultInt("readonly_children_pct", 0)
gAppConfig.TafChildrenPct = cdb.GetOrDefaultInt("taf_children_pct", 100)
gAppConfig.InitialMaxChildren = numWorkers
if gAppConfig.EnableWhitelistTest {
if gAppConfig.NumWhitelistChildren < 2 {
Expand Down Expand Up @@ -778,6 +780,8 @@ func (cfg *Config) NumWorkersCh() <-chan int {
func (cfg *Config) GetBacklogLimit(wtype HeraWorkerType, shard int) int {
	// The limit is BacklogPct percent of the size of the pool selected by
	// wtype: the read pool for wtypeRO, the standby pool for wtypeStdBy and
	// the write pool for every other worker type.
	switch wtype {
	case wtypeRO:
		return gAppConfig.BacklogPct * GetNumRWorkers(shard) / 100
	case wtypeStdBy:
		return gAppConfig.BacklogPct * GetNumStdByWorkers(shard) / 100
	default:
		return gAppConfig.BacklogPct * GetNumWWorkers(shard) / 100
	}
}
Expand Down Expand Up @@ -852,6 +856,26 @@ func GetNumRWorkers(shard int) int {
return num
}

// GetNumStdByWorkers returns the size of the "StdBy" pool for the given shard.
//
// The size starts from the "Write" pool size (GetNumWWorkers) and is scaled by
// TafChildrenPct only when TAF is enabled and the percentage is below 100.
// A scaled pool never drops to zero workers: a negative percentage, or a
// percentage that rounds the pool down to zero, yields a pool of one worker.
func GetNumStdByWorkers(shard int) int {
	writeWorkers := GetNumWWorkers(shard)
	pct := gAppConfig.TafChildrenPct

	// Without TAF, or with a percentage of 100 or more (values above 100 are
	// not honoured), the standby pool mirrors the write pool.
	if !gAppConfig.EnableTAF || pct >= 100 {
		return writeWorkers
	}
	// A negative percentage is treated as "keep a single worker".
	if pct < 0 {
		return 1
	}
	// Integer division may truncate to zero; keep at least one worker then.
	if scaled := writeWorkers * pct / 100; scaled != 0 {
		return scaled
	}
	return 1
}

// GetNumWWorkers gets the number of workers for the "Write" pool
func GetNumWWorkers(shard int) int {
numWhiteList := GetWhiteListChildCount(shard)
Expand Down
5 changes: 3 additions & 2 deletions lib/workerbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (broker *WorkerBroker) init() error {

broker.poolCfgs[s][wtypeStdBy] = new(WorkerPoolCfg)
if GetConfig().EnableTAF {
broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = GetNumWWorkers(s)
broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = GetNumStdByWorkers(s)
broker.poolCfgs[s][wtypeStdBy].instCnt = 1
} else {
broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = 0
Expand Down Expand Up @@ -386,6 +386,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, sha
func (broker *WorkerBroker) changeMaxWorkers() {
wW := GetNumWWorkers(0)
rW := GetNumRWorkers(0)
sW := GetNumStdByWorkers(0)

for i := 0; i < GetConfig().NumOfShards; i++ {
broker.resizePool(wtypeRW, wW, i)
Expand All @@ -395,7 +396,7 @@ func (broker *WorkerBroker) changeMaxWorkers() {

// if TAF enabled, handle stdby as well
if GetConfig().EnableTAF {
broker.resizePool(wtypeStdBy, wW, i)
broker.resizePool(wtypeStdBy, sW, i)
}

if GetConfig().EnableWhitelistTest {
Expand Down
110 changes: 110 additions & 0 deletions tests/unittest/adjustTafChildrenPct/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package main

import (
"context"
"database/sql"
"fmt"
"os"
"testing"
"time"

_ "github.com/paypal/hera/client/gosqldriver/tcp"
"github.com/paypal/hera/tests/unittest/testutil"
"github.com/paypal/hera/utility/logger"
)

// mx is a package-level testutil.Mux value; it is not referenced anywhere
// else in this file.
var mx testutil.Mux
//var tableName string

// cfg returns the application config, the ops config and the worker type
// passed to testutil.UtilMain for this test. TAF is enabled and
// taf_children_pct is left unset; max_connections starts at 10. The worker
// type is Postgres when the WORKER environment variable is "postgres",
// MySQL otherwise.
func cfg() (map[string]string, map[string]string, testutil.WorkerType) {
	fmt.Println("setup() begin")

	// best to choose a unique port in case golang runs tests in parallel
	appcfg := map[string]string{
		"bind_port":  "31002",
		"log_level":  "5",
		"log_file":   "hera.log",
		"enable_taf": "true",

		"opscfg.default.server.max_connections": "10",
	}

	opscfg := map[string]string{
		"opscfg.default.server.log_level": "5",
	}

	if os.Getenv("WORKER") != "postgres" {
		return appcfg, opscfg, testutil.MySQLWorker
	}
	return appcfg, opscfg, testutil.PostgresWorker
}

// TestMain hands the test run to testutil.UtilMain, using cfg for the
// configuration, and exits the process with the code it returns.
func TestMain(m *testing.M) {
	exitCode := testutil.UtilMain(m, cfg, nil)
	os.Exit(exitCode)
}

/*
The TAF pool (hera.taf) should have the same size as the primary pool (hera).
11/04/2024 14:54:52: hera.taf 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:53: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:53: hera.taf 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:54: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:54: hera.taf 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:55: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:55: hera.taf 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:56: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:56: hera.taf 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:57: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:57: hera.taf 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:58: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:58: hera.taf 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:59: hera 0 5 0 0 0 0 0 0 1 0 0
*/

// TestAdjustTafChildrenPct checks the TAF pool size reported in the state log
// when taf_children_pct is not configured. It runs one query through the
// server, expects field 2 of the "hera.taf" state-log entry to be 10 (the
// configured max_connections), then lowers max_connections to 5 at runtime
// and expects the same field to become 5.
func TestAdjustTafChildrenPct(t *testing.T) {
	logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPct begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")

	shard := 0
	db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard))
	if err != nil {
		// t.Fatal stops the test, so no explicit return is needed.
		t.Fatal("Error starting Mux:", err)
	}
	db.SetMaxIdleConns(0)
	defer db.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	conn, err := db.Conn(ctx)
	if err != nil {
		t.Fatalf("Error getting connection %s\n", err.Error())
	}
	defer conn.Close()

	// Check the query error: rows is nil when the query fails, and calling
	// rows.Next() on it would panic instead of reporting a test failure.
	rows, err := conn.QueryContext(ctx, "SELECT version()")
	if err != nil {
		t.Fatalf("Error running query: %s\n", err.Error())
	}
	hasRow := rows.Next()
	rows.Close()
	if !hasRow {
		t.Fatalf("Expected 1 row")
	}

	acpt, err := testutil.StatelogGetField(2, "hera.taf")
	if err != nil {
		t.Fatalf("Error reading state log: %s\n", err.Error())
	}
	if acpt != 10 {
		t.Fatalf("Expected TAF pool size: 10, Actual %d\n", acpt)
	}

	fmt.Println("We now change max connections at runtime")
	testutil.ModifyOpscfgParam(t, "hera.txt", "max_connections", "5")
	// Wait for the opscfg change to take effect
	time.Sleep(45 * time.Second)

	// Report a state-log read failure as such rather than as a size mismatch.
	acpt, err = testutil.StatelogGetField(2, "hera.taf")
	if err != nil {
		t.Fatalf("Error reading state log: %s\n", err.Error())
	}
	if acpt != 5 {
		t.Fatalf("Expected TAF pool size: 5, Actual %d\n", acpt)
	}

	logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPct done -------------------------------------------------------------")
}
111 changes: 111 additions & 0 deletions tests/unittest/adjustTafChildrenPctModified/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package main

import (
"context"
"database/sql"
"fmt"
"os"
"testing"
"time"

_ "github.com/paypal/hera/client/gosqldriver/tcp"
"github.com/paypal/hera/tests/unittest/testutil"
"github.com/paypal/hera/utility/logger"
)

// mx is a package-level testutil.Mux value; it is not referenced anywhere
// else in this file.
var mx testutil.Mux
//var tableName string

// cfg returns the application config, the ops config and the worker type
// passed to testutil.UtilMain for this test. TAF is enabled with
// taf_children_pct set to 20 and max_connections starting at 10. The worker
// type is Postgres when the WORKER environment variable is "postgres",
// MySQL otherwise.
func cfg() (map[string]string, map[string]string, testutil.WorkerType) {
	fmt.Println("setup() begin")

	// best to choose a unique port in case golang runs tests in parallel
	appcfg := map[string]string{
		"bind_port":  "31002",
		"log_level":  "5",
		"log_file":   "hera.log",
		"enable_taf": "true",

		"taf_children_pct": "20",

		"opscfg.default.server.max_connections": "10",
	}

	opscfg := map[string]string{
		"opscfg.default.server.log_level": "5",
	}

	if os.Getenv("WORKER") != "postgres" {
		return appcfg, opscfg, testutil.MySQLWorker
	}
	return appcfg, opscfg, testutil.PostgresWorker
}

// TestMain hands the test run to testutil.UtilMain, using cfg for the
// configuration, and exits the process with the code it returns.
func TestMain(m *testing.M) {
	exitCode := testutil.UtilMain(m, cfg, nil)
	os.Exit(exitCode)
}

/*
TAF pool should be 1/5th of the primary pool
11/04/2024 14:59:03: hera.taf 0 2 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:04: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:04: hera.taf 0 2 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:05: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:05: hera.taf 0 2 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:06: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:06: hera.taf 0 2 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:07: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:07: hera.taf 0 2 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:08: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:08: hera.taf 0 1 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:09: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:09: hera.taf 0 1 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:10: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:10: hera.taf 0 1 0 0 0 0 0 0 1 0 0
*/

// TestAdjustTafChildrenPctModified checks the TAF pool size reported in the
// state log when taf_children_pct is 20. It runs one query through the
// server, expects field 2 of the "hera.taf" state-log entry to be 2 (20% of
// max_connections = 10), then lowers max_connections to 5 at runtime and
// expects the same field to become 1.
func TestAdjustTafChildrenPctModified(t *testing.T) {
	logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctModified begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")

	shard := 0
	db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard))
	if err != nil {
		// t.Fatal stops the test, so no explicit return is needed.
		t.Fatal("Error starting Mux:", err)
	}
	db.SetMaxIdleConns(0)
	defer db.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	conn, err := db.Conn(ctx)
	if err != nil {
		t.Fatalf("Error getting connection %s\n", err.Error())
	}
	defer conn.Close()

	// Check the query error: rows is nil when the query fails, and calling
	// rows.Next() on it would panic instead of reporting a test failure.
	rows, err := conn.QueryContext(ctx, "SELECT version()")
	if err != nil {
		t.Fatalf("Error running query: %s\n", err.Error())
	}
	hasRow := rows.Next()
	rows.Close()
	if !hasRow {
		t.Fatalf("Expected 1 row")
	}

	acpt, err := testutil.StatelogGetField(2, "hera.taf")
	if err != nil {
		t.Fatalf("Error reading state log: %s\n", err.Error())
	}
	if acpt != 2 {
		t.Fatalf("Expected TAF pool size: 2, Actual %d\n", acpt)
	}

	fmt.Println("We now change max connections at runtime")
	testutil.ModifyOpscfgParam(t, "hera.txt", "max_connections", "5")
	// Wait for the opscfg change to take effect
	time.Sleep(45 * time.Second)

	// Report a state-log read failure as such rather than as a size mismatch.
	acpt, err = testutil.StatelogGetField(2, "hera.taf")
	if err != nil {
		t.Fatalf("Error reading state log: %s\n", err.Error())
	}
	if acpt != 1 {
		t.Fatalf("Expected TAF pool size: 1, Actual %d\n", acpt)
	}

	logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctModified done -------------------------------------------------------------")
}
Loading

0 comments on commit 1124f4b

Please sign in to comment.