diff --git a/go.sum b/go.sum index 6ce1df5e358..cc300ba3899 100644 --- a/go.sum +++ b/go.sum @@ -169,7 +169,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI= github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 206891e311e..c071f0694a6 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -33,6 +33,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" @@ -224,6 +226,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration { return t.threadThrottlers[threadID].throttle(t.nowFunc()) } +// MaxLag returns the max of all the last replication lag values seen across all tablets of +// the provided type, excluding ignored tablets. +func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { + cache := t.maxReplicationLagModule.lagCacheByType(tabletType) + + var maxLag uint32 + cacheEntries := cache.entries + + for key := range cacheEntries { + if cache.isIgnored(key) { + continue + } + + lag := cache.latest(key).Stats.ReplicationLagSeconds + if lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // ThreadFinished marks threadID as finished and redistributes the thread's // rate allotment across the other threads. // After ThreadFinished() is called, Throttle() must not be called anymore. diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index a3da535037e..76a02c121b7 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler (interfaces: ThrottlerInterface) +// Source: ./go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go // Package txthrottler is a generated GoMock package. package txthrottler @@ -12,6 +12,7 @@ import ( discovery "vitess.io/vitess/go/vt/discovery" throttlerdata "vitess.io/vitess/go/vt/proto/throttlerdata" + topodata "vitess.io/vitess/go/vt/proto/topodata" ) // MockThrottlerInterface is a mock of ThrottlerInterface interface. @@ -63,6 +64,20 @@ func (mr *MockThrottlerInterfaceMockRecorder) GetConfiguration() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).GetConfiguration)) } +// MaxLag mocks base method. +func (m *MockThrottlerInterface) MaxLag(tabletType topodata.TabletType) uint32 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MaxLag", tabletType) + ret0, _ := ret[0].(uint32) + return ret0 +} + +// MaxLag indicates an expected call of MaxLag. +func (mr *MockThrottlerInterfaceMockRecorder) MaxLag(tabletType interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxLag), tabletType) +} + // MaxRate mocks base method. func (m *MockThrottlerInterface) MaxRate() int64 { m.ctrl.T.Helper() @@ -78,15 +93,15 @@ func (mr *MockThrottlerInterfaceMockRecorder) MaxRate() *gomock.Call { } // RecordReplicationLag mocks base method. -func (m *MockThrottlerInterface) RecordReplicationLag(arg0 time.Time, arg1 *discovery.LegacyTabletStats) { +func (m *MockThrottlerInterface) RecordReplicationLag(time time.Time, ts *discovery.LegacyTabletStats) { m.ctrl.T.Helper() - m.ctrl.Call(m, "RecordReplicationLag", arg0, arg1) + m.ctrl.Call(m, "RecordReplicationLag", time, ts) } // RecordReplicationLag indicates an expected call of RecordReplicationLag. -func (mr *MockThrottlerInterfaceMockRecorder) RecordReplicationLag(arg0, arg1 any) *gomock.Call { +func (mr *MockThrottlerInterfaceMockRecorder) RecordReplicationLag(time, ts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordReplicationLag", reflect.TypeOf((*MockThrottlerInterface)(nil).RecordReplicationLag), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordReplicationLag", reflect.TypeOf((*MockThrottlerInterface)(nil).RecordReplicationLag), time, ts) } // ResetConfiguration mocks base method. @@ -102,53 +117,53 @@ func (mr *MockThrottlerInterfaceMockRecorder) ResetConfiguration() *gomock.Call } // SetMaxRate mocks base method. -func (m *MockThrottlerInterface) SetMaxRate(arg0 int64) { +func (m *MockThrottlerInterface) SetMaxRate(rate int64) { m.ctrl.T.Helper() - m.ctrl.Call(m, "SetMaxRate", arg0) + m.ctrl.Call(m, "SetMaxRate", rate) } // SetMaxRate indicates an expected call of SetMaxRate. -func (mr *MockThrottlerInterfaceMockRecorder) SetMaxRate(arg0 any) *gomock.Call { +func (mr *MockThrottlerInterfaceMockRecorder) SetMaxRate(rate interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxRate", reflect.TypeOf((*MockThrottlerInterface)(nil).SetMaxRate), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxRate", reflect.TypeOf((*MockThrottlerInterface)(nil).SetMaxRate), rate) } // ThreadFinished mocks base method. -func (m *MockThrottlerInterface) ThreadFinished(arg0 int) { +func (m *MockThrottlerInterface) ThreadFinished(threadID int) { m.ctrl.T.Helper() - m.ctrl.Call(m, "ThreadFinished", arg0) + m.ctrl.Call(m, "ThreadFinished", threadID) } // ThreadFinished indicates an expected call of ThreadFinished. -func (mr *MockThrottlerInterfaceMockRecorder) ThreadFinished(arg0 any) *gomock.Call { +func (mr *MockThrottlerInterfaceMockRecorder) ThreadFinished(threadID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThreadFinished", reflect.TypeOf((*MockThrottlerInterface)(nil).ThreadFinished), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThreadFinished", reflect.TypeOf((*MockThrottlerInterface)(nil).ThreadFinished), threadID) } // Throttle mocks base method. -func (m *MockThrottlerInterface) Throttle(arg0 int) time.Duration { +func (m *MockThrottlerInterface) Throttle(threadID int) time.Duration { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Throttle", arg0) + ret := m.ctrl.Call(m, "Throttle", threadID) ret0, _ := ret[0].(time.Duration) return ret0 } // Throttle indicates an expected call of Throttle. -func (mr *MockThrottlerInterfaceMockRecorder) Throttle(arg0 any) *gomock.Call { +func (mr *MockThrottlerInterfaceMockRecorder) Throttle(threadID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Throttle", reflect.TypeOf((*MockThrottlerInterface)(nil).Throttle), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Throttle", reflect.TypeOf((*MockThrottlerInterface)(nil).Throttle), threadID) } // UpdateConfiguration mocks base method. -func (m *MockThrottlerInterface) UpdateConfiguration(arg0 *throttlerdata.Configuration, arg1 bool) error { +func (m *MockThrottlerInterface) UpdateConfiguration(configuration *throttlerdata.Configuration, copyZeroValues bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateConfiguration", arg0, arg1) + ret := m.ctrl.Call(m, "UpdateConfiguration", configuration, copyZeroValues) ret0, _ := ret[0].(error) return ret0 } // UpdateConfiguration indicates an expected call of UpdateConfiguration. -func (mr *MockThrottlerInterfaceMockRecorder) UpdateConfiguration(arg0, arg1 any) *gomock.Call { +func (mr *MockThrottlerInterfaceMockRecorder) UpdateConfiguration(configuration, copyZeroValues interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).UpdateConfiguration), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).UpdateConfiguration), configuration, copyZeroValues) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 9f2f369ffd9..18593bc73cb 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -21,6 +21,7 @@ import ( "fmt" "math/rand" "sync" + "sync/atomic" "time" "google.golang.org/protobuf/encoding/prototext" @@ -86,6 +87,7 @@ type ThrottlerInterface interface { GetConfiguration() *throttlerdatapb.Configuration UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error ResetConfiguration() + MaxLag(tabletType topodatapb.TabletType) uint32 } // TopologyWatcherInterface defines the public interface that is implemented by @@ -184,6 +186,10 @@ type txThrottlerStateImpl struct { healthCheck discovery.LegacyHealthCheck topologyWatchers []TopologyWatcherInterface + + maxLag int64 + done chan bool + waitForTermination sync.WaitGroup } // NewTxThrottler tries to construct a txThrottler from the @@ -301,7 +307,7 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) { // Throttle according to both what the throttler state says and the priority. Workloads with lower priority value // are less likely to be throttled. - result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority + result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle() t.requestsTotal.Add(workload, 1) if result { @@ -331,6 +337,7 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar result := &txThrottlerStateImpl{ config: config, throttler: t, + done: make(chan bool, 1), } result.healthCheck = healthCheckFactory() result.healthCheck.SetListener(result, false /* sendDownEvents */) @@ -348,6 +355,10 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar discovery.DefaultTopologyWatcherRefreshInterval, discovery.DefaultTopoReadConcurrency)) } + + result.waitForTermination.Add(1) + go result.updateMaxLag() + return result, nil } @@ -359,7 +370,34 @@ func (ts *txThrottlerStateImpl) throttle() bool { // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - return ts.throttler.Throttle(0 /* threadId */) > 0 + + maxLag := atomic.LoadInt64(&ts.maxLag) + + return maxLag > ts.config.throttlerConfig.TargetReplicationLagSec && + ts.throttler.Throttle(0 /* threadId */) > 0 +} + +func (ts *txThrottlerStateImpl) updateMaxLag() { + defer ts.waitForTermination.Done() + // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value + ticker := time.NewTicker(time.Duration(ts.config.throttlerConfig.TargetReplicationLagSec/2) * time.Second) +outerloop: + for { + select { + case <-ticker.C: + var maxLag uint32 + + for _, tabletType := range ts.config.tabletTypes { + maxLagPerTabletType := ts.throttler.MaxLag(tabletType) + if maxLagPerTabletType > maxLag { + maxLag = maxLagPerTabletType + } + } + atomic.StoreInt64(&ts.maxLag, int64(maxLag)) + case <-ts.done: + break outerloop + } + } } func (ts *txThrottlerStateImpl) deallocateResources() { @@ -374,6 +412,8 @@ func (ts *txThrottlerStateImpl) deallocateResources() { ts.healthCheck.Close() ts.healthCheck = nil + ts.done <- true + ts.waitForTermination.Wait() // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not // to be executing, so we can safely close the throttler. ts.throttler.Close() diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 7b9e4bc98bd..a3e4d219da3 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -22,6 +22,7 @@ package txthrottler //go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface import ( + "sync/atomic" "testing" "time" @@ -84,32 +85,50 @@ func TestEnabledThrottler(t *testing.T) { } mockThrottler := NewMockThrottlerInterface(mockCtrl) + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { assert.Equal(t, 1, threadCount) return mockThrottler, nil } - call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) - call1 := mockThrottler.EXPECT().Throttle(0) - call1.Return(0 * time.Second) + var calls []*gomock.Call + + call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) + calls = append(calls, call) + + // 1 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(0 * time.Second) + calls = append(calls, call) + tabletStats := &discovery.LegacyTabletStats{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_REPLICA, }, } - call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) - call3 := mockThrottler.EXPECT().Throttle(0) - call3.Return(1 * time.Second) - call4 := mockThrottler.EXPECT().Throttle(0) - call4.Return(1 * time.Second) - calllast := mockThrottler.EXPECT().Close() + call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) + calls = append(calls, call) - call1.After(call0) - call2.After(call1) - call3.After(call2) - call4.After(call3) - calllast.After(call4) + // 2 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(1 * time.Second) + calls = append(calls, call) + + // 3 + // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first + // whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle() + + // 4 + // Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first + // whether there is lag or not, so no call to the underlying mockThrottler is issued. + + call = mockThrottler.EXPECT().Close() + calls = append(calls, call) + + for i := 1; i < len(calls); i++ { + calls[i].After(calls[i-1]) + } config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = true @@ -126,6 +145,14 @@ func TestEnabledThrottler(t *testing.T) { assert.Nil(t, throttler.Open()) assert.Equal(t, int64(1), throttler.throttlerRunning.Get()) + throttlerImpl, ok := throttler.state.(*txThrottlerStateImpl) + assert.True(t, ok) + // Stop the go routine that keeps updating the cached shard's max lag to preventi it from changing the value in a + // way that will interfere with how we manipulate that value in our tests to evaluate different cases: + throttlerImpl.done <- true + + // 1 should not throttle due to return value of underlying Throttle(), despite high lag + atomic.StoreInt64(&throttlerImpl.maxLag, 20) assert.False(t, throttler.Throttle(100, "some-workload")) assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"]) assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"]) @@ -138,15 +165,23 @@ func TestEnabledThrottler(t *testing.T) { } // This call should not be forwarded to the go/vt/throttler.Throttler object. hcListener.StatsUpdate(rdonlyTabletStats) - // The second throttle call should reject. + + // 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100 assert.True(t, throttler.Throttle(100, "some-workload")) assert.Equal(t, int64(2), throttler.requestsTotal.Counts()["some-workload"]) assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) - // This call should not throttle due to priority. Check that's the case and counters agree. + // 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0 assert.False(t, throttler.Throttle(0, "some-workload")) assert.Equal(t, int64(3), throttler.requestsTotal.Counts()["some-workload"]) assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) + + // 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag + atomic.StoreInt64(&throttlerImpl.maxLag, 1) + assert.False(t, throttler.Throttle(100, "some-workload")) + assert.Equal(t, int64(4), throttler.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) + throttler.Close() assert.Zero(t, throttler.throttlerRunning.Get()) }