diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 83a1c52225e..68905db1ad5 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/topodata" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" ) @@ -224,6 +225,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration { return t.threadThrottlers[threadID].throttle(t.nowFunc()) } +// MaxLag returns the max of all the last replication lag values seen across all tablets of +// the provided type, excluding ignored tablets. +func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { + cache := t.maxReplicationLagModule.lagCacheByType(tabletType) + + var maxLag uint32 + cacheEntries := cache.entries + + for key := range cacheEntries { + if cache.isIgnored(key) { + continue + } + + lag := cache.latest(key).Stats.ReplicationLagSeconds + if lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // ThreadFinished marks threadID as finished and redistributes the thread's // rate allotment across the other threads. // After ThreadFinished() is called, Throttle() must not be called anymore. diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index 3ffb3a78a1a..aeb75d258a3 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -12,6 +12,7 @@ import ( discovery "vitess.io/vitess/go/vt/discovery" throttlerdata "vitess.io/vitess/go/vt/proto/throttlerdata" + topodata "vitess.io/vitess/go/vt/proto/topodata" ) // MockThrottlerInterface is a mock of ThrottlerInterface interface. @@ -63,6 +64,20 @@ func (mr *MockThrottlerInterfaceMockRecorder) GetConfiguration() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).GetConfiguration)) } +// MaxLag mocks base method. +func (m *MockThrottlerInterface) MaxLag(tabletType topodata.TabletType) uint32 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MaxLag", tabletType) + ret0, _ := ret[0].(uint32) + return ret0 +} + +// MaxLag indicates an expected call of LastMaxLagNotIgnoredForTabletType. +func (mr *MockThrottlerInterfaceMockRecorder) MaxLag(tabletType interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxLag), tabletType) +} + // MaxRate mocks base method. func (m *MockThrottlerInterface) MaxRate() int64 { m.ctrl.T.Helper() diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index bc5235593ac..82cb074b3d7 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -22,8 +22,11 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "time" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" @@ -89,6 +92,7 @@ type ThrottlerInterface interface { GetConfiguration() *throttlerdatapb.Configuration UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error ResetConfiguration() + MaxLag(tabletType topodatapb.TabletType) uint32 } // TopologyWatcherInterface defines the public interface that is implemented by @@ -178,6 +182,10 @@ type txThrottlerState struct { healthCheck discovery.HealthCheck topologyWatchers []TopologyWatcherInterface + + maxLag int64 + done chan bool + waitForTermination sync.WaitGroup } // NewTxThrottler tries to construct a txThrottler from the @@ -290,7 +298,7 @@ func (t *txThrottler) Throttle(priority int) (result bool) { // Throttle according to both what the throttler state says and the priority. Workloads with lower priority value // are less likely to be throttled. - result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority + result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle() t.requestsTotal.Add(1) if result { @@ -320,6 +328,7 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar result := &txThrottlerState{ config: config, throttler: t, + done: make(chan bool, 2), } createTxThrottlerHealthCheck(topoServer, config, result, target.Cell) @@ -353,6 +362,8 @@ func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerCo case th := <-ch: result.StatsUpdate(th) } + result.waitForTermination.Add(1) + go result.updateMaxLag() } }(ctx) } @@ -365,7 +376,35 @@ func (ts *txThrottlerState) throttle() bool { // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - return ts.throttler.Throttle(0 /* threadId */) > 0 + + maxLag := atomic.LoadInt64(&ts.maxLag) + + return maxLag > ts.config.throttlerConfig.TargetReplicationLagSec && + ts.throttler.Throttle(0 /* threadId */) > 0 +} + +func (ts *txThrottlerState) updateMaxLag() { + defer ts.waitForTermination.Done() + // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value + ticker := time.NewTicker(time.Duration(ts.config.throttlerConfig.TargetReplicationLagSec/2) * time.Second) + defer ticker.Stop() +outerloop: + for { + select { + case <-ticker.C: + var maxLag uint32 + + for _, tabletType := range *ts.config.tabletTypes { + maxLagPerTabletType := ts.throttler.MaxLag(tabletType) + if maxLagPerTabletType > maxLag { + maxLag = maxLagPerTabletType + } + } + atomic.StoreInt64(&ts.maxLag, int64(maxLag)) + case <-ts.done: + break outerloop + } + } } func (ts *txThrottlerState) deallocateResources() { @@ -380,6 +419,8 @@ func (ts *txThrottlerState) deallocateResources() { ts.healthCheck.Close() ts.healthCheck = nil + ts.done <- true + ts.waitForTermination.Wait() // After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not // to be executing, so we can safely close the throttler. ts.throttler.Close() diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 4e95ebe7097..6d42869befa 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -22,6 +22,7 @@ package txthrottler //go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface import ( + "sync/atomic" "testing" "time" @@ -88,27 +89,44 @@ func TestEnabledThrottler(t *testing.T) { return mockThrottler, nil } - call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) - call1 := mockThrottler.EXPECT().Throttle(0) - call1.Return(0 * time.Second) + var calls []*gomock.Call + + call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) + calls = append(calls, call) + + // 1 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(0 * time.Second) + calls = append(calls, call) + tabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_REPLICA, }, } - call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) - call3 := mockThrottler.EXPECT().Throttle(0) - call3.Return(1 * time.Second) - call4 := mockThrottler.EXPECT().Throttle(0) - call4.Return(1 * time.Second) - calllast := mockThrottler.EXPECT().Close() + call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) + calls = append(calls, call) + + // 2 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(1 * time.Second) + calls = append(calls, call) + + // 3 + // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first + // whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle() - call1.After(call0) - call2.After(call1) - call3.After(call2) - call4.After(call3) - calllast.After(call4) + // 4 + // Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first + // whether there is lag or not, so no call to the underlying mockThrottler is issued. + + call = mockThrottler.EXPECT().Close() + calls = append(calls, call) + + for i := 1; i < len(calls); i++ { + calls[i].After(calls[i-1]) + } config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = true @@ -116,38 +134,52 @@ func TestEnabledThrottler(t *testing.T) { config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA} env := tabletenv.NewEnv(config, t.Name()) - throttler, err := tryCreateTxThrottler(env, ts) + throttlerImpl, err := tryCreateTxThrottler(env, ts) assert.Nil(t, err) - throttler.InitDBConfig(&querypb.Target{ + throttlerImpl.InitDBConfig(&querypb.Target{ Keyspace: "keyspace", Shard: "shard", }) - assert.Nil(t, throttler.Open()) - assert.Equal(t, int64(1), throttler.throttlerRunning.Get()) + assert.Nil(t, throttlerImpl.Open()) + assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) + + // Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a + // way that will interfere with how we manipulate that value in our tests to evaluate different cases: + throttlerImpl.state.done <- true - assert.False(t, throttler.Throttle(100)) - assert.Equal(t, int64(1), throttler.requestsTotal.Get()) - assert.Zero(t, throttler.requestsThrottled.Get()) + // 1 should not throttle due to return value of underlying Throttle(), despite high lag + atomic.StoreInt64(&throttlerImpl.state.maxLag, 20) + assert.False(t, throttlerImpl.Throttle(100)) + assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get()) + assert.Zero(t, throttlerImpl.requestsThrottled.Get()) - throttler.state.StatsUpdate(tabletStats) // This calls replication lag thing + throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_RDONLY, }, } // This call should not be forwarded to the go/vt/throttler.Throttler object. - throttler.state.StatsUpdate(rdonlyTabletStats) - // The second throttle call should reject. - assert.True(t, throttler.Throttle(100)) - assert.Equal(t, int64(2), throttler.requestsTotal.Get()) - assert.Equal(t, int64(1), throttler.requestsThrottled.Get()) + throttlerImpl.state.StatsUpdate(rdonlyTabletStats) - // This call should not throttle due to priority. Check that's the case and counters agree. - assert.False(t, throttler.Throttle(0)) - assert.Equal(t, int64(3), throttler.requestsTotal.Get()) - assert.Equal(t, int64(1), throttler.requestsThrottled.Get()) - throttler.Close() - assert.Zero(t, throttler.throttlerRunning.Get()) + // 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100 + assert.True(t, throttlerImpl.Throttle(100)) + assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get()) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) + + // 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0 + assert.False(t, throttlerImpl.Throttle(0)) + assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Get()) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) + + // 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag + atomic.StoreInt64(&throttlerImpl.state.maxLag, 1) + assert.False(t, throttlerImpl.Throttle(100)) + assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Get()) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) + + throttlerImpl.Close() + assert.Zero(t, throttlerImpl.throttlerRunning.Get()) } func TestNewTxThrottler(t *testing.T) {