Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose a config to adjust the StdBy pool size (if required) #387

Merged
merged 3 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ There are two types of configuration parameters: **static parameters** and **dyn
+ In case of TAF (Transparent Application Failover), this is the timeout in milliseconds to wait for a query to complete before it is canceled and retried on the fallback database
+ default: 200

#### taf_children_pct
+ If TAF is enabled, this is the percentage of workers connecting to a fallback database. By default, the fallback pool size is the same as the primary pool size.
+ default: 100

#### readonly_children_pct
+ If R/W split is enabled this is the percentage of workers connecting to a read node.
+ default: 0
Expand Down
24 changes: 24 additions & 0 deletions lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Config struct {
NumStdbyDbs int
InitialMaxChildren int
ReadonlyPct int
TafChildrenPct int
//
// backlog
//
Expand Down Expand Up @@ -392,6 +393,7 @@ func InitConfig() error {
}

gAppConfig.ReadonlyPct = cdb.GetOrDefaultInt("readonly_children_pct", 0)
gAppConfig.TafChildrenPct = cdb.GetOrDefaultInt("taf_children_pct", 100)
gAppConfig.InitialMaxChildren = numWorkers
if gAppConfig.EnableWhitelistTest {
if gAppConfig.NumWhitelistChildren < 2 {
Expand Down Expand Up @@ -564,6 +566,8 @@ func (cfg *Config) NumWorkersCh() <-chan int {
func (cfg *Config) GetBacklogLimit(wtype HeraWorkerType, shard int) int {
	// Pick the size of the pool that matches the worker type; any type other
	// than read-only or standby falls back to the write pool size.
	var poolSize int
	switch wtype {
	case wtypeRO:
		poolSize = GetNumRWorkers(shard)
	case wtypeStdBy:
		poolSize = GetNumStdByWorkers(shard)
	default:
		poolSize = GetNumWWorkers(shard)
	}
	// The limit is BacklogPct percent of that pool size (integer arithmetic).
	return gAppConfig.BacklogPct * poolSize / 100
}
Expand Down Expand Up @@ -638,6 +642,26 @@ func GetNumRWorkers(shard int) int {
return num
}

// GetNumStdByWorkers returns the number of workers for the "StdBy" pool of the
// given shard. The value starts from the "Write" pool size and, when TAF is
// enabled and TafChildrenPct is below 100, is scaled down to that percentage.
// The scaled result is never zero: a negative percentage, or a percentage that
// rounds the pool down to zero, yields a pool of one worker.
func GetNumStdByWorkers(shard int) int {
	primary := GetNumWWorkers(shard)
	pct := gAppConfig.TafChildrenPct

	// A percentage of 100 or more (values above 100 are not supported) or TAF
	// being disabled leaves the pool at the write pool size.
	if pct >= 100 || !gAppConfig.EnableTAF {
		return primary
	}
	// A negative percentage is treated as the minimum pool of one worker.
	if pct < 0 {
		return 1
	}
	scaled := primary * pct / 100
	if scaled == 0 {
		// Integer division rounded down to nothing; keep at least one worker.
		return 1
	}
	return scaled
}

// GetNumWWorkers gets the number of workers for the "Write" pool
func GetNumWWorkers(shard int) int {
numWhiteList := GetWhiteListChildCount(shard)
Expand Down
5 changes: 3 additions & 2 deletions lib/workerbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (broker *WorkerBroker) init() error {

broker.poolCfgs[s][wtypeStdBy] = new(WorkerPoolCfg)
if GetConfig().EnableTAF {
broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = GetNumWWorkers(s)
broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = GetNumStdByWorkers(s)
broker.poolCfgs[s][wtypeStdBy].instCnt = 1
} else {
broker.poolCfgs[s][wtypeStdBy].maxWorkerCnt = 0
Expand Down Expand Up @@ -386,6 +386,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, sha
func (broker *WorkerBroker) changeMaxWorkers() {
wW := GetNumWWorkers(0)
rW := GetNumRWorkers(0)
sW := GetNumStdByWorkers(0)

for i := 0; i < GetConfig().NumOfShards; i++ {
broker.resizePool(wtypeRW, wW, i)
Expand All @@ -395,7 +396,7 @@ func (broker *WorkerBroker) changeMaxWorkers() {

// if TAF enabled, handle stdby as well
if GetConfig().EnableTAF {
broker.resizePool(wtypeStdBy, wW, i)
broker.resizePool(wtypeStdBy, sW, i)
}

if GetConfig().EnableWhitelistTest {
Expand Down
110 changes: 110 additions & 0 deletions tests/unittest/adjustTafChildrenPct/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package main

import (
"context"
"database/sql"
"fmt"
"os"
"testing"
"time"

_ "github.com/paypal/hera/client/gosqldriver/tcp"
"github.com/paypal/hera/tests/unittest/testutil"
"github.com/paypal/hera/utility/logger"
)

// mx is a package-level testutil.Mux value; nothing in the visible part of this
// file references it.
// NOTE(review): presumably kept for parity with other unit tests — confirm before removing.
var mx testutil.Mux
//var tableName string

// cfg builds the application and ops configuration maps for this test and
// selects the worker type: Postgres when the WORKER environment variable is
// "postgres", MySQL otherwise. TAF is enabled and taf_children_pct is not set.
func cfg() (map[string]string, map[string]string, testutil.WorkerType) {
	fmt.Println("setup() begin")

	appSettings := map[string]string{
		// best to choose a "unique" port in case golang runs tests in parallel
		"bind_port":  "31002",
		"log_level":  "5",
		"log_file":   "hera.log",
		"enable_taf": "true",

		"opscfg.default.server.max_connections": "10",
	}

	opsSettings := map[string]string{
		"opscfg.default.server.log_level": "5",
	}

	worker := testutil.MySQLWorker
	if os.Getenv("WORKER") == "postgres" {
		worker = testutil.PostgresWorker
	}
	return appSettings, opsSettings, worker
}

// TestMain hands control to testutil.UtilMain with this package's cfg function
// and exits the process with the code it returns.
func TestMain(m *testing.M) {
	exitCode := testutil.UtilMain(m, cfg, nil)
	os.Exit(exitCode)
}

/*
Should have the same size as the primary pool.
11/04/2024 14:54:52: hera.taf 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:53: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:53: hera.taf 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:54: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:54: hera.taf 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:55: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:55: hera.taf 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:56: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:56: hera.taf 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:57: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:57: hera.taf 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:58: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:58: hera.taf 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:54:59: hera 0 5 0 0 0 0 0 0 1 0 0
*/

// TestAdjustTafChildrenPct checks the TAF pool size reported in the state log
// ("hera.taf") when TAF is enabled and taf_children_pct is not configured: the
// test expects 10 while max_connections is 10, and 5 after max_connections is
// changed to 5 at runtime.
func TestAdjustTafChildrenPct(t *testing.T) {

	logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPct begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")

	shard := 0
	db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard))
	if err != nil {
		// t.Fatal stops the test, so no explicit return is needed.
		t.Fatal("Error starting Mux:", err)
	}
	db.SetMaxIdleConns(0)
	defer db.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	conn, err := db.Conn(ctx)
	if err != nil {
		t.Fatalf("Error getting connection %s\n", err.Error())
	}
	defer conn.Close()

	// Check the query error: on failure rows is nil and rows.Next() would panic.
	rows, err := conn.QueryContext(ctx, "SELECT version()")
	if err != nil {
		t.Fatalf("Error running query: %s\n", err.Error())
	}

	if !rows.Next() {
		t.Fatalf("Expected 1 row")
	}
	rows.Close()

	acpt, err := testutil.StatelogGetField(2, "hera.taf")
	if err != nil {
		t.Fatalf("Error reading state log: %s\n", err.Error())
	}

	if acpt != 10 {
		t.Fatalf("Expected TAF pool size: 10, Actual %d\n", acpt)
	}

	fmt.Println("We now change max connections at runtime")
	testutil.ModifyOpscfgParam(t, "hera.txt", "max_connections", "5")
	// Wait for the opscfg change to take effect
	time.Sleep(45 * time.Second)

	// Report a state-log read failure as such instead of as a size mismatch.
	acpt, err = testutil.StatelogGetField(2, "hera.taf")
	if err != nil {
		t.Fatalf("Error reading state log: %s\n", err.Error())
	}

	if acpt != 5 {
		t.Fatalf("Expected TAF pool size: 5, Actual %d\n", acpt)
	}

	logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPct done -------------------------------------------------------------")
}
111 changes: 111 additions & 0 deletions tests/unittest/adjustTafChildrenPctModified/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package main

import (
"context"
"database/sql"
"fmt"
"os"
"testing"
"time"

_ "github.com/paypal/hera/client/gosqldriver/tcp"
"github.com/paypal/hera/tests/unittest/testutil"
"github.com/paypal/hera/utility/logger"
)

// mx is a package-level testutil.Mux value; nothing in the visible part of this
// file references it.
// NOTE(review): presumably kept for parity with other unit tests — confirm before removing.
var mx testutil.Mux
//var tableName string

// cfg builds the application and ops configuration maps for this test and
// selects the worker type: Postgres when the WORKER environment variable is
// "postgres", MySQL otherwise. TAF is enabled with taf_children_pct set to 20.
func cfg() (map[string]string, map[string]string, testutil.WorkerType) {
	fmt.Println("setup() begin")

	appSettings := map[string]string{
		// best to choose a "unique" port in case golang runs tests in parallel
		"bind_port":        "31002",
		"log_level":        "5",
		"log_file":         "hera.log",
		"enable_taf":       "true",
		"taf_children_pct": "20",

		"opscfg.default.server.max_connections": "10",
	}

	opsSettings := map[string]string{
		"opscfg.default.server.log_level": "5",
	}

	worker := testutil.MySQLWorker
	if os.Getenv("WORKER") == "postgres" {
		worker = testutil.PostgresWorker
	}
	return appSettings, opsSettings, worker
}

// TestMain hands control to testutil.UtilMain with this package's cfg function
// and exits the process with the code it returns.
func TestMain(m *testing.M) {
	exitCode := testutil.UtilMain(m, cfg, nil)
	os.Exit(exitCode)
}

/*
TAF pool should be 1/5th of the primary pool
11/04/2024 14:59:03: hera.taf 0 2 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:04: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:04: hera.taf 0 2 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:05: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:05: hera.taf 0 2 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:06: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:06: hera.taf 0 2 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:07: hera 0 10 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:07: hera.taf 0 2 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:08: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:08: hera.taf 0 1 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:09: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:09: hera.taf 0 1 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:10: hera 0 5 0 0 0 0 0 0 1 0 0
11/04/2024 14:59:10: hera.taf 0 1 0 0 0 0 0 0 1 0 0
*/

// TestAdjustTafChildrenPctModified checks the TAF pool size reported in the
// state log ("hera.taf") when TAF is enabled with taf_children_pct set to 20:
// the test expects 2 while max_connections is 10, and 1 after max_connections
// is changed to 5 at runtime.
func TestAdjustTafChildrenPctModified(t *testing.T) {

	logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctModified begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")

	shard := 0
	db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard))
	if err != nil {
		// t.Fatal stops the test, so no explicit return is needed.
		t.Fatal("Error starting Mux:", err)
	}
	db.SetMaxIdleConns(0)
	defer db.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	conn, err := db.Conn(ctx)
	if err != nil {
		t.Fatalf("Error getting connection %s\n", err.Error())
	}
	defer conn.Close()

	// Check the query error: on failure rows is nil and rows.Next() would panic.
	rows, err := conn.QueryContext(ctx, "SELECT version()")
	if err != nil {
		t.Fatalf("Error running query: %s\n", err.Error())
	}

	if !rows.Next() {
		t.Fatalf("Expected 1 row")
	}
	rows.Close()

	acpt, err := testutil.StatelogGetField(2, "hera.taf")
	if err != nil {
		t.Fatalf("Error reading state log: %s\n", err.Error())
	}

	if acpt != 2 {
		t.Fatalf("Expected TAF pool size: 2, Actual %d\n", acpt)
	}

	fmt.Println("We now change max connections at runtime")
	testutil.ModifyOpscfgParam(t, "hera.txt", "max_connections", "5")
	// Wait for the opscfg change to take effect
	time.Sleep(45 * time.Second)

	// Report a state-log read failure as such instead of as a size mismatch.
	acpt, err = testutil.StatelogGetField(2, "hera.taf")
	if err != nil {
		t.Fatalf("Error reading state log: %s\n", err.Error())
	}

	if acpt != 1 {
		t.Fatalf("Expected TAF pool size: 1, Actual %d\n", acpt)
	}

	logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctModified done -------------------------------------------------------------")
}
Loading
Loading