diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index a119b703d62..923c860d184 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -141,10 +141,7 @@ func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *query // be executing a method. The only exception is the 'Throttle' method where multiple goroutines are // allowed to execute it concurrently. type txThrottler struct { - // config stores the transaction throttler's configuration. - // It is populated in NewTxThrottler and is not modified - // since. - config *txThrottlerConfig + config *tabletenv.TabletConfig // state holds an open transaction throttler state. It is nil // if the TransactionThrottler is closed. @@ -162,30 +159,6 @@ type txThrottler struct { requestsThrottled *stats.CountersWithSingleLabel } -// txThrottlerConfig holds the parameters that need to be -// passed when constructing a TxThrottler object. -type txThrottlerConfig struct { - // enabled is true if the transaction throttler is enabled. All methods - // of a disabled transaction throttler do nothing and Throttle() always - // returns false. - enabled bool - - // if dryRun is true, the txThrottler will run only on monitoring mode, meaning that it will increase counters for - // total and actually throttled requests, but it will not actually return that a transaction should be throttled. - dryRun bool - - throttlerConfig *throttlerdatapb.Configuration - // healthCheckCells stores the cell names in which running vttablets will be monitored for - // replication lag. - healthCheckCells []string - - // tabletTypes stores the tablet types for throttling - tabletTypes map[topodatapb.TabletType]bool - - // rate to refresh topo for cells - topoRefreshInterval time.Duration -} - type txThrottlerState interface { deallocateResources() StatsUpdate(tabletStats *discovery.TabletHealth) @@ -194,7 +167,7 @@ type txThrottlerState interface { // txThrottlerStateImpl holds the state of an open TxThrottler object. type txThrottlerStateImpl struct { - config *txThrottlerConfig + config *tabletenv.TabletConfig txThrottler *txThrottler // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). @@ -208,49 +181,38 @@ type txThrottlerStateImpl struct { healthCheckChan chan *discovery.TabletHealth healthCheckCells []string cellsFromTopo bool + + // tabletTypes stores the tablet types for throttling + tabletTypes map[topodatapb.TabletType]bool } -// NewTxThrottler tries to construct a txThrottler from the -// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if -// any error occurs. -// This function calls tryCreateTxThrottler that does the actual creation work -// and returns an error if one occurred. +// NewTxThrottler tries to construct a txThrottler from the relevant +// fields in the tabletenv.Env and topo.Server objects. func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { - throttlerConfig := &txThrottlerConfig{enabled: false} - - if env.Config().EnableTxThrottler { - // Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells - // is immutable. - healthCheckCells := env.Config().TxThrottlerHealthCheckCells - - tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes)) - for _, tabletType := range *env.Config().TxThrottlerTabletTypes { - tabletTypes[tabletType] = true + config := env.Config() + if config.EnableTxThrottler { + if len(config.TxThrottlerHealthCheckCells) == 0 { + defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, cellsFromTopo: true, topoRefreshInterval: %s, throttlerConfig: %q", + config.TxThrottlerTabletTypes, config.TxThrottlerTopoRefreshInterval, config.TxThrottlerConfig.Get(), + ) + } else { + defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, healthCheckCells: %+v, throttlerConfig: %q", + config.TxThrottlerTabletTypes, config.TxThrottlerHealthCheckCells, config.TxThrottlerConfig.Get(), + ) } - - throttlerConfig = &txThrottlerConfig{ - enabled: true, - healthCheckCells: healthCheckCells, - dryRun: env.Config().TxThrottlerDryRun, - tabletTypes: tabletTypes, - throttlerConfig: env.Config().TxThrottlerConfig.Get(), - topoRefreshInterval: env.Config().TxThrottlerTopoRefreshInterval, - } - - defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig) } return &txThrottler{ - config: throttlerConfig, + config: config, topoServer: topoServer, - throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), - topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"), - healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read", + throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"), + topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "transaction throttler topology watchers", "cell"), + healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read", []string{"cell", "DbType"}), - healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded", + healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded", []string{"cell", "DbType"}), - requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"), - requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"), + requestsTotal: env.Exporter().NewCountersWithSingleLabel(TxThrottlerName+"Requests", "transaction throttler requests", "workload"), + requestsThrottled: env.Exporter().NewCountersWithSingleLabel(TxThrottlerName+"Throttled", "transaction throttler requests throttled", "workload"), } } @@ -261,7 +223,7 @@ func (t *txThrottler) InitDBConfig(target *querypb.Target) { // Open opens the transaction throttler. It must be called prior to 'Throttle'. func (t *txThrottler) Open() (err error) { - if !t.config.enabled { + if !t.config.EnableTxThrottler { return nil } if t.state != nil { @@ -277,7 +239,7 @@ func (t *txThrottler) Open() (err error) { // It should be called after the throttler is no longer needed. // It's ok to call this method on a closed throttler--in which case the method does nothing. func (t *txThrottler) Close() { - if !t.config.enabled { + if !t.config.EnableTxThrottler { return } if t.state == nil { @@ -294,7 +256,7 @@ func (t *txThrottler) Close() { // should back off). Throttle requires that Open() was previously called // successfully. func (t *txThrottler) Throttle(priority int, workload string) (result bool) { - if !t.config.enabled { + if !t.config.EnableTxThrottler { return false } if t.state == nil { @@ -310,11 +272,11 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) { t.requestsThrottled.Add(workload, 1) } - return result && !t.config.dryRun + return result && !t.config.TxThrottlerDryRun } -func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (txThrottlerState, error) { - maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig} +func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfig, target *querypb.Target) (txThrottlerState, error) { + maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.TxThrottlerConfig.Get()} t, err := throttlerFactory( TxThrottlerName, @@ -326,13 +288,20 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta if err != nil { return nil, err } - if err := t.UpdateConfiguration(config.throttlerConfig, true /* copyZeroValues */); err != nil { + if err := t.UpdateConfiguration(config.TxThrottlerConfig.Get(), true /* copyZeroValues */); err != nil { t.Close() return nil, err } + + tabletTypes := make(map[topodatapb.TabletType]bool, len(*config.TxThrottlerTabletTypes)) + for _, tabletType := range *config.TxThrottlerTabletTypes { + tabletTypes[tabletType] = true + } + state := &txThrottlerStateImpl{ config: config, - healthCheckCells: config.healthCheckCells, + healthCheckCells: config.TxThrottlerHealthCheckCells, + tabletTypes: tabletTypes, throttler: t, txThrottler: txThrottler, } @@ -402,7 +371,7 @@ func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topo func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { var cellsUpdateTicks <-chan time.Time if ts.cellsFromTopo { - ticker := time.NewTicker(ts.config.topoRefreshInterval) + ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) cellsUpdateTicks = ticker.C defer ticker.Stop() } @@ -420,7 +389,7 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoS func (ts *txThrottlerStateImpl) throttle() bool { if ts.throttler == nil { - log.Error("throttle called after deallocateResources was called") + log.Error("txThrottler: throttle called after deallocateResources was called") return false } // Serialize calls to ts.throttle.Throttle() @@ -442,7 +411,7 @@ func (ts *txThrottlerStateImpl) deallocateResources() { // StatsUpdate updates the health of a tablet with the given healthcheck. func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) { - if ts.config.tabletTypes == nil { + if len(ts.tabletTypes) == 0 { return } @@ -451,8 +420,8 @@ func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1) // Monitor tablets for replication lag if they have a tablet - // type specified by the --tx_throttler_tablet_types flag. - if ts.config.tabletTypes[tabletType] { + // type specified by the --tx-throttler-tablet-types flag. + if ts.tabletTypes[tabletType] { ts.throttler.RecordReplicationLag(time.Now(), tabletStats) ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 6531580c446..62a4d7a4abb 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -126,6 +126,8 @@ func TestEnabledThrottler(t *testing.T) { }) assert.Nil(t, throttlerImpl.Open()) + throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl) + assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts()) @@ -174,33 +176,6 @@ func TestFetchKnownCells(t *testing.T) { } } -func TestNewTxThrottler(t *testing.T) { - config := tabletenv.NewDefaultConfig() - env := tabletenv.NewEnv(config, t.Name()) - - { - // disabled - config.EnableTxThrottler = false - throttler := NewTxThrottler(env, nil) - throttlerImpl, _ := throttler.(*txThrottler) - assert.NotNil(t, throttlerImpl) - assert.NotNil(t, throttlerImpl.config) - assert.False(t, throttlerImpl.config.enabled) - } - { - // enabled - config.EnableTxThrottler = true - config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"} - config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA} - throttler := NewTxThrottler(env, nil) - throttlerImpl, _ := throttler.(*txThrottler) - assert.NotNil(t, throttlerImpl) - assert.NotNil(t, throttlerImpl.config) - assert.True(t, throttlerImpl.config.enabled) - assert.Equal(t, []string{"cell1", "cell2"}, throttlerImpl.config.healthCheckCells) - } -} - func TestDryRunThrottler(t *testing.T) { config := tabletenv.NewDefaultConfig() env := tabletenv.NewEnv(config, t.Name()) @@ -222,9 +197,9 @@ func TestDryRunThrottler(t *testing.T) { t.Run(theTestCase.Name, func(t *testing.T) { aTxThrottler := &txThrottler{ - config: &txThrottlerConfig{ - enabled: true, - dryRun: theTestCase.throttlerDryRun, + config: &tabletenv.TabletConfig{ + EnableTxThrottler: true, + TxThrottlerDryRun: theTestCase.throttlerDryRun, }, state: &mockTxThrottlerState{shouldThrottle: theTestCase.txThrottlerStateShouldThrottle}, throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),