From 39e5adeab15995be01447ac6a39871cb5cca475f Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sun, 18 Jun 2023 06:54:33 +0200 Subject: [PATCH] txthrottler: verify config at vttablet startup, consolidate funcs (#13115) * txthrottler: verify config at vttablet startup, consolidate funcs Signed-off-by: Tim Vaillancourt * Use explicit dest in prototext.Unmarshal Signed-off-by: Tim Vaillancourt * Use for loop for TestVerifyTxThrottlerConfig Signed-off-by: Tim Vaillancourt * Cleanup test Signed-off-by: Tim Vaillancourt * Fix go vet complaint Signed-off-by: Tim Vaillancourt * Add back synonym flag Signed-off-by: Tim Vaillancourt * Update go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Signed-off-by: Tim Vaillancourt * Address staticcheck linter error Signed-off-by: Tim Vaillancourt --------- Signed-off-by: Tim Vaillancourt Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/flags/endtoend/vttablet.txt | 4 +- go/flagutil/flagutil.go | 11 ++ .../vttablet/tabletserver/tabletenv/config.go | 61 ++++++-- .../tabletserver/tabletenv/config_test.go | 148 +++++++++++++++--- .../tabletserver/txthrottler/tx_throttler.go | 73 +++------ .../txthrottler/tx_throttler_test.go | 74 +++++---- 6 files changed, 242 insertions(+), 129 deletions(-) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index d26e491be12..3fae6486d94 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -342,11 +342,11 @@ Usage of vttablet: --twopc_abandon_age float time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved. --twopc_coordinator_address string address of the (VTGate) process(es) that will be used to notify of abandoned transactions. --twopc_enable if the flag is on, 2pc is enabled. Other 2pc flags must be supplied. - --tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n") + --tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9") --tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100) --tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells --tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica) - --tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n") + --tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9") --tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler. --unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s) --use_super_read_only Set super_read_only flag when performing planned failover. diff --git a/go/flagutil/flagutil.go b/go/flagutil/flagutil.go index d010ea0bc4f..c9c8973ace7 100644 --- a/go/flagutil/flagutil.go +++ b/go/flagutil/flagutil.go @@ -193,6 +193,17 @@ func DualFormatBoolVar(fs *pflag.FlagSet, p *bool, name string, value bool, usag } } +// DualFormatVar creates a flag which supports both dashes and underscores +func DualFormatVar(fs *pflag.FlagSet, val pflag.Value, name string, usage string) { + dashes := strings.Replace(name, "_", "-", -1) + underscores := strings.Replace(name, "-", "_", -1) + + fs.Var(val, underscores, usage) + if dashes != underscores { + fs.Var(val, dashes, fmt.Sprintf("Synonym to -%s", underscores)) + } +} + // DurationOrIntVar implements pflag.Value for flags that have historically been // of type IntVar (and then converted to seconds or some other unit) but are // now transitioning to a proper DurationVar type. diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 0bd5c01cf18..ee42fd513bc 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -30,14 +30,16 @@ import ( "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + + querypb "vitess.io/vitess/go/vt/proto/query" + throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // These constants represent values for various config parameters. @@ -88,6 +90,24 @@ var ( txLogHandler = "/debug/txlog" ) +type TxThrottlerConfigFlag struct { + *throttlerdatapb.Configuration +} + +func NewTxThrottlerConfigFlag() *TxThrottlerConfigFlag { + return &TxThrottlerConfigFlag{&throttlerdatapb.Configuration{}} +} + +func (t *TxThrottlerConfigFlag) Get() *throttlerdatapb.Configuration { + return t.Configuration +} + +func (t *TxThrottlerConfigFlag) Set(arg string) error { + return prototext.Unmarshal([]byte(arg), t.Configuration) +} + +func (t *TxThrottlerConfigFlag) Type() string { return "string" } + // RegisterTabletEnvFlags is a public API to register tabletenv flags for use by test cases that expect // some flags to be set with default values func RegisterTabletEnvFlags(fs *pflag.FlagSet) { @@ -143,7 +163,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { SecondsVar(fs, ¤tConfig.TwoPCAbandonAge, "twopc_abandon_age", defaultConfig.TwoPCAbandonAge, "time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.") // Tx throttler config flagutil.DualFormatBoolVar(fs, ¤tConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.") - flagutil.DualFormatStringVar(fs, ¤tConfig.TxThrottlerConfig, "tx_throttler_config", defaultConfig.TxThrottlerConfig, "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.") + flagutil.DualFormatVar(fs, currentConfig.TxThrottlerConfig, "tx_throttler_config", "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.") flagutil.DualFormatStringListVar(fs, ¤tConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.") fs.IntVar(¤tConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information") fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly.") @@ -316,7 +336,7 @@ type TabletConfig struct { TwoPCAbandonAge Seconds `json:"-"` EnableTxThrottler bool `json:"-"` - TxThrottlerConfig string `json:"-"` + TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"` TxThrottlerHealthCheckCells []string `json:"-"` TxThrottlerDefaultPriority int `json:"-"` TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"` @@ -470,9 +490,6 @@ func (c *TabletConfig) Verify() error { if v := c.HotRowProtection.MaxConcurrency; v <= 0 { return fmt.Errorf("-hot_row_protection_concurrent_transactions must be > 0 (specified value: %v)", v) } - if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 { - return fmt.Errorf("--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v) - } return nil } @@ -508,6 +525,22 @@ func (c *TabletConfig) verifyTransactionLimitConfig() error { // verifyTxThrottlerConfig checks the TxThrottler related config for sanity. func (c *TabletConfig) verifyTxThrottlerConfig() error { + if !c.EnableTxThrottler { + return nil + } + + err := throttler.MaxReplicationLagModuleConfig{Configuration: c.TxThrottlerConfig.Get()}.Verify() + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err) + } + + if len(c.TxThrottlerHealthCheckCells) == 0 { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "empty healthCheckCells given: %+v", c.TxThrottlerHealthCheckCells) + } + if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 { + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v) + } + if c.TxThrottlerTabletTypes == nil || len(*c.TxThrottlerTabletTypes) == 0 { return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "--tx-throttler-tablet-types must be defined when transaction throttler is enabled") } @@ -519,6 +552,7 @@ func (c *TabletConfig) verifyTxThrottlerConfig() error { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported tablet type %q", tabletType) } } + return nil } @@ -606,17 +640,16 @@ var defaultConfig = TabletConfig{ EnablePerWorkloadTableMetrics: false, } -// defaultTxThrottlerConfig formats the default throttlerdata.Configuration -// object in text format. It uses the object returned by -// throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of its -// fields. It panics on error. -func defaultTxThrottlerConfig() string { +// defaultTxThrottlerConfig returns the default TxThrottlerConfigFlag object based on +// a throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of +// its fields. It panics on error. +func defaultTxThrottlerConfig() *TxThrottlerConfigFlag { // Take throttler.DefaultMaxReplicationLagModuleConfig and override some fields. config := throttler.DefaultMaxReplicationLagModuleConfig().Configuration // TODO(erez): Make DefaultMaxReplicationLagModuleConfig() return a MaxReplicationLagSec of 10 // and remove this line. config.MaxReplicationLagSec = 10 - return prototext.Format(config) + return &TxThrottlerConfigFlag{config} } func defaultTransactionLimitConfig() TransactionLimitConfig { diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index 9a4cdaa95b5..79c9091d077 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -26,11 +26,13 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/dbconfigs" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/yaml2" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) func TestConfigParse(t *testing.T) { @@ -325,31 +327,133 @@ func TestFlags(t *testing.T) { assert.Equal(t, want, currentConfig) } -func TestVerifyTxThrottlerConfig(t *testing.T) { +func TestTxThrottlerConfigFlag(t *testing.T) { + f := NewTxThrottlerConfigFlag() + defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration + { - // default config (replica) - assert.Nil(t, currentConfig.verifyTxThrottlerConfig()) + assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String())) + assert.Equal(t, defaultMaxReplicationLagModuleConfig.String(), f.String()) + assert.Equal(t, "string", f.Type()) } { - // replica + rdonly (allowed) - currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{ - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_RDONLY, - } - assert.Nil(t, currentConfig.verifyTxThrottlerConfig()) + defaultMaxReplicationLagModuleConfig.TargetReplicationLagSec = 5 + assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String())) + assert.NotNil(t, f.Get()) + assert.Equal(t, int64(5), f.Get().TargetReplicationLagSec) } { - // no tablet types - currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{} - err := currentConfig.verifyTxThrottlerConfig() - assert.NotNil(t, err) - assert.Equal(t, vtrpcpb.Code_FAILED_PRECONDITION, vterrors.Code(err)) + assert.NotNil(t, f.Set("should not parse")) } - { - // disallowed tablet type - currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_DRAINED} - err := currentConfig.verifyTxThrottlerConfig() - assert.NotNil(t, err) - assert.Equal(t, vtrpcpb.Code_INVALID_ARGUMENT, vterrors.Code(err)) +} + +func TestVerifyTxThrottlerConfig(t *testing.T) { + defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration + invalidMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration + invalidMaxReplicationLagModuleConfig.TargetReplicationLagSec = -1 + + type testConfig struct { + Name string + ExpectedErrorCode vtrpcpb.Code + // + EnableTxThrottler bool + TxThrottlerConfig *TxThrottlerConfigFlag + TxThrottlerHealthCheckCells []string + TxThrottlerTabletTypes *topoproto.TabletTypeListFlag + TxThrottlerDefaultPriority int + } + + tests := []testConfig{ + { + // default (disabled) + Name: "default", + EnableTxThrottler: false, + }, + { + // enabled with invalid throttler config + Name: "enabled invalid config", + ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT, + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{invalidMaxReplicationLagModuleConfig}, + }, + { + // enabled without cells defined + Name: "enabled without cells", + ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION, + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + }, + { + // enabled with good config (default/replica tablet type) + Name: "enabled", + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + TxThrottlerHealthCheckCells: []string{"cell1"}, + }, + { + // enabled + replica and rdonly tablet types + Name: "enabled plus rdonly", + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + TxThrottlerHealthCheckCells: []string{"cell1"}, + TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{ + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + }, + }, + { + // enabled without tablet types + Name: "enabled without tablet types", + ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION, + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + TxThrottlerHealthCheckCells: []string{"cell1"}, + TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{}, + }, + { + // enabled + disallowed tablet type + Name: "enabled disallowed tablet type", + ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT, + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + TxThrottlerHealthCheckCells: []string{"cell1"}, + TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_DRAINED}, + }, + { + // enabled + disallowed priority + Name: "enabled disallowed priority", + ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT, + EnableTxThrottler: true, + TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, + TxThrottlerDefaultPriority: 12345, + TxThrottlerHealthCheckCells: []string{"cell1"}, + }, + } + + for _, test := range tests { + test := test + t.Run(test.Name, func(t *testing.T) { + t.Parallel() + + config := defaultConfig + config.EnableTxThrottler = test.EnableTxThrottler + if test.TxThrottlerConfig == nil { + test.TxThrottlerConfig = NewTxThrottlerConfigFlag() + } + config.TxThrottlerConfig = test.TxThrottlerConfig + config.TxThrottlerHealthCheckCells = test.TxThrottlerHealthCheckCells + config.TxThrottlerDefaultPriority = test.TxThrottlerDefaultPriority + if test.TxThrottlerTabletTypes != nil { + config.TxThrottlerTabletTypes = test.TxThrottlerTabletTypes + } + + err := config.verifyTxThrottlerConfig() + if test.ExpectedErrorCode == vtrpcpb.Code_OK { + assert.Nil(t, err) + } else { + assert.NotNil(t, err) + assert.Equal(t, test.ExpectedErrorCode, vterrors.Code(err)) + } + }) } } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index bc5235593ac..30e2ec19c56 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -18,13 +18,11 @@ package txthrottler import ( "context" - "fmt" "math/rand" "strings" "sync" "time" - "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/stats" @@ -186,64 +184,35 @@ type txThrottlerState struct { // This function calls tryCreateTxThrottler that does the actual creation work // and returns an error if one occurred. func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { - txThrottler, err := tryCreateTxThrottler(env, topoServer) - if err != nil { - log.Errorf("Error creating transaction throttler. Transaction throttling will"+ - " be disabled. Error: %v", err) - // newTxThrottler with disabled config never returns an error - txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false}) - } else { - log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config) - } - return txThrottler -} - -// InitDBConfig initializes the target parameters for the throttler. -func (t *txThrottler) InitDBConfig(target *querypb.Target) { - t.target = proto.Clone(target).(*querypb.Target) -} - -func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrottler, error) { - if !env.Config().EnableTxThrottler { - return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false}) - } + throttlerConfig := &txThrottlerConfig{enabled: false} + + if env.Config().EnableTxThrottler { + // Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells + // is immutable. + healthCheckCells := env.Config().TxThrottlerHealthCheckCells + + throttlerConfig = &txThrottlerConfig{ + enabled: true, + tabletTypes: env.Config().TxThrottlerTabletTypes, + throttlerConfig: env.Config().TxThrottlerConfig.Get(), + healthCheckCells: healthCheckCells, + } - var throttlerConfig throttlerdatapb.Configuration - if err := prototext.Unmarshal([]byte(env.Config().TxThrottlerConfig), &throttlerConfig); err != nil { - return nil, err + defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig) } - // Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells - // is immutable. - healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells)) - copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells) - - return newTxThrottler(env, topoServer, &txThrottlerConfig{ - enabled: true, - tabletTypes: env.Config().TxThrottlerTabletTypes, - throttlerConfig: &throttlerConfig, - healthCheckCells: healthCheckCells, - }) -} - -func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*txThrottler, error) { - if config.enabled { - // Verify config. - err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify() - if err != nil { - return nil, err - } - if len(config.healthCheckCells) == 0 { - return nil, fmt.Errorf("empty healthCheckCells given. %+v", config) - } - } return &txThrottler{ - config: config, + config: throttlerConfig, topoServer: topoServer, throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"), requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"), - }, nil + } +} + +// InitDBConfig initializes the target parameters for the throttler. +func (t *txThrottler) InitDBConfig(target *querypb.Target) { + t.target = proto.Clone(target).(*querypb.Target) } // Open opens the transaction throttler. It must be called prior to 'Throttle'. diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 97138e3928c..ffb88bf21f6 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -36,7 +36,6 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" - throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -116,38 +115,39 @@ func TestEnabledThrottler(t *testing.T) { config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA} env := tabletenv.NewEnv(config, t.Name()) - throttler, err := tryCreateTxThrottler(env, ts) - assert.Nil(t, err) + throttler := NewTxThrottler(env, ts) + throttlerImpl, _ := throttler.(*txThrottler) + assert.NotNil(t, throttlerImpl) throttler.InitDBConfig(&querypb.Target{ Keyspace: "keyspace", Shard: "shard", }) - assert.Nil(t, throttler.Open()) - assert.Equal(t, int64(1), throttler.throttlerRunning.Get()) + assert.Nil(t, throttlerImpl.Open()) + assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) - assert.False(t, throttler.Throttle(100)) - assert.Equal(t, int64(1), throttler.requestsTotal.Get()) - assert.Zero(t, throttler.requestsThrottled.Get()) + assert.False(t, throttlerImpl.Throttle(100)) + assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get()) + assert.Zero(t, throttlerImpl.requestsThrottled.Get()) - throttler.state.StatsUpdate(tabletStats) // This calls replication lag thing + throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_RDONLY, }, } - // This call should not be forwarded to the go/vt/throttler.Throttler object. - throttler.state.StatsUpdate(rdonlyTabletStats) + // This call should not be forwarded to the go/vt/throttlerImpl.Throttler object. + throttlerImpl.state.StatsUpdate(rdonlyTabletStats) // The second throttle call should reject. - assert.True(t, throttler.Throttle(100)) - assert.Equal(t, int64(2), throttler.requestsTotal.Get()) - assert.Equal(t, int64(1), throttler.requestsThrottled.Get()) + assert.True(t, throttlerImpl.Throttle(100)) + assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get()) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) // This call should not throttle due to priority. Check that's the case and counters agree. - assert.False(t, throttler.Throttle(0)) - assert.Equal(t, int64(3), throttler.requestsTotal.Get()) - assert.Equal(t, int64(1), throttler.requestsThrottled.Get()) - throttler.Close() - assert.Zero(t, throttler.throttlerRunning.Get()) + assert.False(t, throttlerImpl.Throttle(0)) + assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Get()) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) + throttlerImpl.Close() + assert.Zero(t, throttlerImpl.throttlerRunning.Get()) } func TestNewTxThrottler(t *testing.T) { @@ -155,28 +155,24 @@ func TestNewTxThrottler(t *testing.T) { env := tabletenv.NewEnv(config, t.Name()) { - // disabled config - throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{enabled: false}) - assert.Nil(t, err) - assert.NotNil(t, throttler) - } - { - // enabled with invalid throttler config - throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{ - enabled: true, - throttlerConfig: &throttlerdatapb.Configuration{}, - }) - assert.NotNil(t, err) - assert.Nil(t, throttler) + // disabled + config.EnableTxThrottler = false + throttler := NewTxThrottler(env, nil) + throttlerImpl, _ := throttler.(*txThrottler) + assert.NotNil(t, throttlerImpl) + assert.NotNil(t, throttlerImpl.config) + assert.False(t, throttlerImpl.config.enabled) } { // enabled - throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{ - enabled: true, - healthCheckCells: []string{"cell1"}, - throttlerConfig: throttler.DefaultMaxReplicationLagModuleConfig().Configuration, - }) - assert.Nil(t, err) - assert.NotNil(t, throttler) + config.EnableTxThrottler = true + config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"} + config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA} + throttler := NewTxThrottler(env, nil) + throttlerImpl, _ := throttler.(*txThrottler) + assert.NotNil(t, throttlerImpl) + assert.NotNil(t, throttlerImpl.config) + assert.True(t, throttlerImpl.config.enabled) + assert.Equal(t, []string{"cell1", "cell2"}, throttlerImpl.config.healthCheckCells) } }