diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 30e2ec19c56..e25e4c0da89 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -31,11 +31,11 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // These vars store the functions used to create the topo server, healthcheck, @@ -63,6 +63,10 @@ func resetTxThrottlerFactories() { } } +func init() { + resetTxThrottlerFactories() +} + // TxThrottler defines the interface for the transaction throttler. type TxThrottler interface { InitDBConfig(target *querypb.Target) @@ -71,10 +75,6 @@ type TxThrottler interface { Throttle(priority int) (result bool) } -func init() { - resetTxThrottlerFactories() -} - // ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler // It is only used here to allow mocking out a throttler object. type ThrottlerInterface interface { @@ -142,9 +142,12 @@ type txThrottler struct { topoServer *topo.Server // stats - throttlerRunning *stats.Gauge - requestsTotal *stats.Counter - requestsThrottled *stats.Counter + throttlerRunning *stats.Gauge + topoWatchers *stats.GaugesWithSingleLabel + healthChecksReadTotal *stats.CountersWithMultiLabels + healthChecksRecordedTotal *stats.CountersWithMultiLabels + requestsTotal *stats.Counter + requestsThrottled *stats.Counter } // txThrottlerConfig holds the parameters that need to be @@ -161,12 +164,13 @@ type txThrottlerConfig struct { healthCheckCells []string // tabletTypes stores the tablet types for throttling - tabletTypes *topoproto.TabletTypeListFlag + tabletTypes map[topodatapb.TabletType]bool } // txThrottlerState holds the state of an open TxThrottler object. type txThrottlerState struct { - config *txThrottlerConfig + config *txThrottlerConfig + txThrottler *txThrottler // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. @@ -175,7 +179,7 @@ type txThrottlerState struct { stopHealthCheck context.CancelFunc healthCheck discovery.HealthCheck - topologyWatchers []TopologyWatcherInterface + topologyWatchers map[string]TopologyWatcherInterface } // NewTxThrottler tries to construct a txThrottler from the @@ -191,9 +195,14 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { // is immutable. healthCheckCells := env.Config().TxThrottlerHealthCheckCells + tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes)) + for _, tabletType := range *env.Config().TxThrottlerTabletTypes { + tabletTypes[tabletType] = true + } + throttlerConfig = &txThrottlerConfig{ enabled: true, - tabletTypes: env.Config().TxThrottlerTabletTypes, + tabletTypes: tabletTypes, throttlerConfig: env.Config().TxThrottlerConfig.Get(), healthCheckCells: healthCheckCells, } @@ -202,9 +211,14 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { } return &txThrottler{ - config: throttlerConfig, - topoServer: topoServer, - throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), + config: throttlerConfig, + topoServer: topoServer, + throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), + topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"), + healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read", + []string{"cell", "DbType"}), + healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded", + []string{"cell", "DbType"}), requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"), requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"), } @@ -225,7 +239,7 @@ func (t *txThrottler) Open() (err error) { } log.Info("txThrottler: opening") t.throttlerRunning.Set(1) - t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target) + t.state, err = newTxThrottlerState(t, t.config, t.target) return err } @@ -269,7 +283,7 @@ func (t *txThrottler) Throttle(priority int) (result bool) { return result } -func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) { +func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) { maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig} t, err := throttlerFactory( @@ -286,27 +300,28 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar t.Close() return nil, err } - result := &txThrottlerState{ - config: config, - throttler: t, + state := &txThrottlerState{ + config: config, + throttler: t, + txThrottler: txThrottler, } - createTxThrottlerHealthCheck(topoServer, config, result, target.Cell) + createTxThrottlerHealthCheck(txThrottler.topoServer, config, state, target.Cell) - result.topologyWatchers = make( - []TopologyWatcherInterface, 0, len(config.healthCheckCells)) + state.topologyWatchers = make( + map[string]TopologyWatcherInterface, len(config.healthCheckCells)) for _, cell := range config.healthCheckCells { - result.topologyWatchers = append( - result.topologyWatchers, - topologyWatcherFactory( - topoServer, - result.healthCheck, - cell, - target.Keyspace, - target.Shard, - discovery.DefaultTopologyWatcherRefreshInterval, - discovery.DefaultTopoReadConcurrency)) + state.topologyWatchers[cell] = topologyWatcherFactory( + txThrottler.topoServer, + state.healthCheck, + cell, + target.Keyspace, + target.Shard, + discovery.DefaultTopologyWatcherRefreshInterval, + discovery.DefaultTopoReadConcurrency, + ) + txThrottler.topoWatchers.Add(cell, 1) } - return result, nil + return state, nil } func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) { @@ -341,8 +356,9 @@ func (ts *txThrottlerState) deallocateResources() { // We don't really need to nil out the fields here // as deallocateResources is not expected to be called // more than once, but it doesn't hurt to do so. - for _, watcher := range ts.topologyWatchers { + for cell, watcher := range ts.topologyWatchers { watcher.Stop() + ts.txThrottler.topoWatchers.Reset(cell) } ts.topologyWatchers = nil @@ -361,12 +377,14 @@ func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { return } + tabletType := tabletStats.Target.TabletType + metricLabels := []string{tabletStats.Target.Cell, tabletType.String()} + ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1) + // Monitor tablets for replication lag if they have a tablet // type specified by the --tx_throttler_tablet_types flag. - for _, expectedTabletType := range *ts.config.tabletTypes { - if tabletStats.Target.TabletType == expectedTabletType { - ts.throttler.RecordReplicationLag(time.Now(), tabletStats) - return - } + if ts.config.tabletTypes[tabletType] { + ts.throttler.RecordReplicationLag(time.Now(), tabletStats) + ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1) } } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index ffb88bf21f6..9c9c725e1fd 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -92,6 +92,7 @@ func TestEnabledThrottler(t *testing.T) { call1.Return(0 * time.Second) tabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ + Cell: "cell1", TabletType: topodatapb.TabletType_REPLICA, }, } @@ -119,24 +120,33 @@ func TestEnabledThrottler(t *testing.T) { throttlerImpl, _ := throttler.(*txThrottler) assert.NotNil(t, throttlerImpl) throttler.InitDBConfig(&querypb.Target{ + Cell: "cell1", Keyspace: "keyspace", Shard: "shard", }) + assert.Nil(t, throttlerImpl.Open()) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) + assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts()) assert.False(t, throttlerImpl.Throttle(100)) assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get()) assert.Zero(t, throttlerImpl.requestsThrottled.Get()) throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ + Cell: "cell2", TabletType: topodatapb.TabletType_RDONLY, }, } // This call should not be forwarded to the go/vt/throttlerImpl.Throttler object. throttlerImpl.state.StatsUpdate(rdonlyTabletStats) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) + // The second throttle call should reject. assert.True(t, throttlerImpl.Throttle(100)) assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get()) @@ -148,6 +158,7 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) throttlerImpl.Close() assert.Zero(t, throttlerImpl.throttlerRunning.Get()) + assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts()) } func TestNewTxThrottler(t *testing.T) {