diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 2fc4b289828..8e730475c43 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -205,7 +205,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt } qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT - if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) { + if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) { return nil, errTxThrottled } @@ -220,7 +220,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt } func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) { - if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) { + if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) { return nil, errTxThrottled } conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting) diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 4131a97b039..934d933ecac 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1587,6 +1587,6 @@ func (m mockTxThrottler) Open() (err error) { func (m mockTxThrottler) Close() { } -func (m mockTxThrottler) Throttle(priority int) (result bool) { +func (m mockTxThrottler) Throttle(priority int, workload string) (result bool) { return m.throttle } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index efb5fd5add4..cc3160fe1a9 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -488,7 +488,7 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { startTime := time.Now() - if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) { + if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options), options.GetWorkloadName()) { return errTxThrottled } var connSetting *pools.Setting diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 961430ad560..0bc8c34a69b 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -73,7 +73,7 @@ type TxThrottler interface { InitDBConfig(target *querypb.Target) Open() (err error) Close() - Throttle(priority int) (result bool) + Throttle(priority int, workload string) (result bool) } // ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler @@ -158,8 +158,8 @@ type txThrottler struct { topoWatchers *stats.GaugesWithSingleLabel healthChecksReadTotal *stats.CountersWithMultiLabels healthChecksRecordedTotal *stats.CountersWithMultiLabels - requestsTotal *stats.Counter - requestsThrottled *stats.Counter + requestsTotal *stats.CountersWithSingleLabel + requestsThrottled *stats.CountersWithSingleLabel } // txThrottlerConfig holds the parameters that need to be @@ -238,8 +238,8 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { []string{"cell", "DbType"}), healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded", []string{"cell", "DbType"}), - requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"), - requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"), + requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"), + requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"), } } @@ -282,7 +282,7 @@ func (t *txThrottler) Close() { // It returns true if the transaction should not proceed (the caller // should back off). Throttle requires that Open() was previously called // successfully. -func (t *txThrottler) Throttle(priority int) (result bool) { +func (t *txThrottler) Throttle(priority int, workload string) (result bool) { if !t.config.enabled { return false } @@ -294,9 +294,9 @@ func (t *txThrottler) Throttle(priority int) (result bool) { // are less likely to be throttled. result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority - t.requestsTotal.Add(1) + t.requestsTotal.Add(workload, 1) if result { - t.requestsThrottled.Add(1) + t.requestsThrottled.Add(workload, 1) } return result diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 5d7089cc6ed..6320a641d4a 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -50,7 +50,7 @@ func TestDisabledThrottler(t *testing.T) { Shard: "shard", }) assert.Nil(t, throttler.Open()) - assert.False(t, throttler.Throttle(0)) + assert.False(t, throttler.Throttle(0, "some_workload")) throttlerImpl, _ := throttler.(*txThrottler) assert.Zero(t, throttlerImpl.throttlerRunning.Get()) throttler.Close() @@ -129,9 +129,9 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts()) - assert.False(t, throttlerImpl.Throttle(100)) - assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get()) - assert.Zero(t, throttlerImpl.requestsThrottled.Get()) + assert.False(t, throttlerImpl.Throttle(100, "some_workload")) + assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some_workload"]) + assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some_workload"]) throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts()) @@ -148,14 +148,14 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) // The second throttle call should reject. - assert.True(t, throttlerImpl.Throttle(100)) - assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get()) - assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) + assert.True(t, throttlerImpl.Throttle(100, "some_workload")) + assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some_workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"]) // This call should not throttle due to priority. Check that's the case and counters agree. - assert.False(t, throttlerImpl.Throttle(0)) - assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Get()) - assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) + assert.False(t, throttlerImpl.Throttle(0, "some_workload")) + assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some_workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"]) throttlerImpl.Close() assert.Zero(t, throttlerImpl.throttlerRunning.Get()) assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts())