From 7cd9e6047dd8fe3039f5412f1d9357e1b83932c8 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Fri, 9 Feb 2024 05:11:32 +0000 Subject: [PATCH 1/5] Cherry-pick 2b25639f250d929321416ae4824889f3f9d81c51 with conflicts --- go/vt/throttler/throttler.go | 23 ++++ .../txthrottler/mock_throttler_test.go | 15 +++ .../tabletserver/txthrottler/tx_throttler.go | 125 +++++++++++++++++- .../txthrottler/tx_throttler_test.go | 102 ++++++++++++-- 4 files changed, 249 insertions(+), 16 deletions(-) diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 83a1c52225e..68905db1ad5 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/topodata" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" ) @@ -224,6 +225,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration { return t.threadThrottlers[threadID].throttle(t.nowFunc()) } +// MaxLag returns the max of all the last replication lag values seen across all tablets of +// the provided type, excluding ignored tablets. +func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { + cache := t.maxReplicationLagModule.lagCacheByType(tabletType) + + var maxLag uint32 + cacheEntries := cache.entries + + for key := range cacheEntries { + if cache.isIgnored(key) { + continue + } + + lag := cache.latest(key).Stats.ReplicationLagSeconds + if lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // ThreadFinished marks threadID as finished and redistributes the thread's // rate allotment across the other threads. // After ThreadFinished() is called, Throttle() must not be called anymore. diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index 3ffb3a78a1a..aeb75d258a3 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -12,6 +12,7 @@ import ( discovery "vitess.io/vitess/go/vt/discovery" throttlerdata "vitess.io/vitess/go/vt/proto/throttlerdata" + topodata "vitess.io/vitess/go/vt/proto/topodata" ) // MockThrottlerInterface is a mock of ThrottlerInterface interface. @@ -63,6 +64,20 @@ func (mr *MockThrottlerInterfaceMockRecorder) GetConfiguration() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).GetConfiguration)) } +// MaxLag mocks base method. +func (m *MockThrottlerInterface) MaxLag(tabletType topodata.TabletType) uint32 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MaxLag", tabletType) + ret0, _ := ret[0].(uint32) + return ret0 +} + +// MaxLag indicates an expected call of LastMaxLagNotIgnoredForTabletType. +func (mr *MockThrottlerInterfaceMockRecorder) MaxLag(tabletType interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxLag), tabletType) +} + // MaxRate mocks base method. func (m *MockThrottlerInterface) MaxRate() int64 { m.ctrl.T.Helper() diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index bc5235593ac..b3ffec83014 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -22,6 +22,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "time" "google.golang.org/protobuf/encoding/prototext" @@ -89,6 +90,7 @@ type ThrottlerInterface interface { GetConfiguration() *throttlerdatapb.Configuration UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error ResetConfiguration() + MaxLag(tabletType topodatapb.TabletType) uint32 } // TopologyWatcherInterface defines the public interface that is implemented by @@ -177,7 +179,20 @@ type txThrottlerState struct { stopHealthCheck context.CancelFunc healthCheck discovery.HealthCheck +<<<<<<< HEAD topologyWatchers []TopologyWatcherInterface +======= + healthCheckChan chan *discovery.TabletHealth + healthCheckCells []string + cellsFromTopo bool + + // tabletTypes stores the tablet types for throttling + tabletTypes map[topodatapb.TabletType]bool + + maxLag int64 + done chan bool + waitForTermination sync.WaitGroup +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } // NewTxThrottler tries to construct a txThrottler from the @@ -290,7 +305,7 @@ func (t *txThrottler) Throttle(priority int) (result bool) { // Throttle according to both what the throttler state says and the priority. Workloads with lower priority value // are less likely to be throttled. - result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority + result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle() t.requestsTotal.Add(1) if result { @@ -323,6 +338,7 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar } createTxThrottlerHealthCheck(topoServer, config, result, target.Cell) +<<<<<<< HEAD result.topologyWatchers = make( []TopologyWatcherInterface, 0, len(config.healthCheckCells)) for _, cell := range config.healthCheckCells { @@ -336,12 +352,30 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar target.Shard, discovery.DefaultTopologyWatcherRefreshInterval, discovery.DefaultTopoReadConcurrency)) +======= + state := &txThrottlerStateImpl{ + config: config, + healthCheckCells: config.TxThrottlerHealthCheckCells, + tabletTypes: tabletTypes, + throttler: t, + txThrottler: txThrottler, + done: make(chan bool, 1), + } + + // get cells from topo if none defined in tabletenv config + if len(state.healthCheckCells) == 0 { + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + state.cellsFromTopo = true +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } return result, nil } func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) { ctx, cancel := context.WithCancel(context.Background()) +<<<<<<< HEAD result.stopHealthCheck = cancel result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells) ch := result.healthCheck.Subscribe() @@ -353,6 +387,59 @@ func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerCo case th := <-ch: result.StatsUpdate(th) } +======= + state.stopHealthCheck = cancel + state.initHealthCheckStream(txThrottler.topoServer, target) + go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) + state.waitForTermination.Add(1) + go state.updateMaxLag() + + return state, nil +} + +func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { + ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) + ts.healthCheckChan = ts.healthCheck.Subscribe() + +} + +func (ts *txThrottlerStateImpl) closeHealthCheckStream() { + if ts.healthCheck == nil { + return + } + ts.stopHealthCheck() + ts.healthCheck.Close() +} + +func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { + fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + + knownCells := fetchKnownCells(fetchCtx, topoServer, target) + if !reflect.DeepEqual(knownCells, ts.healthCheckCells) { + log.Info("txThrottler: restarting healthcheck stream due to topology cells update") + ts.healthCheckCells = knownCells + ts.closeHealthCheckStream() + ts.initHealthCheckStream(topoServer, target) + } +} + +func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { + var cellsUpdateTicks <-chan time.Time + if ts.cellsFromTopo { + ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) + cellsUpdateTicks = ticker.C + defer ticker.Stop() + } + for { + select { + case <-ctx.Done(): + return + case <-cellsUpdateTicks: + ts.updateHealthCheckCells(ctx, topoServer, target) + case th := <-ts.healthCheckChan: + ts.StatsUpdate(th) +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } }(ctx) } @@ -365,7 +452,35 @@ func (ts *txThrottlerState) throttle() bool { // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - return ts.throttler.Throttle(0 /* threadId */) > 0 + + maxLag := atomic.LoadInt64(&ts.maxLag) + + return maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec && + ts.throttler.Throttle(0 /* threadId */) > 0 +} + +func (ts *txThrottlerStateImpl) updateMaxLag() { + defer ts.waitForTermination.Done() + // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value + ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) + defer ticker.Stop() +outerloop: + for { + select { + case <-ticker.C: + var maxLag uint32 + + for tabletType := range ts.tabletTypes { + maxLagPerTabletType := ts.throttler.MaxLag(tabletType) + if maxLagPerTabletType > maxLag { + maxLag = maxLagPerTabletType + } + } + atomic.StoreInt64(&ts.maxLag, int64(maxLag)) + case <-ts.done: + break outerloop + } + } } func (ts *txThrottlerState) deallocateResources() { @@ -380,7 +495,13 @@ func (ts *txThrottlerState) deallocateResources() { ts.healthCheck.Close() ts.healthCheck = nil +<<<<<<< HEAD // After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not +======= + ts.done <- true + ts.waitForTermination.Wait() + // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) // to be executing, so we can safely close the throttler. ts.throttler.Close() ts.throttler = nil diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 4e95ebe7097..5c9b8c0fe97 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -22,6 +22,11 @@ package txthrottler //go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface import ( +<<<<<<< HEAD +======= + "context" + "sync/atomic" +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) "testing" "time" @@ -50,7 +55,11 @@ func TestDisabledThrottler(t *testing.T) { Shard: "shard", }) assert.Nil(t, throttler.Open()) +<<<<<<< HEAD assert.False(t, throttler.Throttle(0)) +======= + assert.False(t, throttler.Throttle(0, "some-workload")) +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) throttlerImpl, _ := throttler.(*txThrottler) assert.Zero(t, throttlerImpl.throttlerRunning.Get()) throttler.Close() @@ -88,27 +97,44 @@ func TestEnabledThrottler(t *testing.T) { return mockThrottler, nil } - call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) - call1 := mockThrottler.EXPECT().Throttle(0) - call1.Return(0 * time.Second) + var calls []*gomock.Call + + call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) + calls = append(calls, call) + + // 1 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(0 * time.Second) + calls = append(calls, call) + tabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_REPLICA, }, } - call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) - call3 := mockThrottler.EXPECT().Throttle(0) - call3.Return(1 * time.Second) - call4 := mockThrottler.EXPECT().Throttle(0) - call4.Return(1 * time.Second) - calllast := mockThrottler.EXPECT().Close() + call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) + calls = append(calls, call) + + // 2 + call = mockThrottler.EXPECT().Throttle(0) + call.Return(1 * time.Second) + calls = append(calls, call) + + // 3 + // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first + // whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle() - call1.After(call0) - call2.After(call1) - call3.After(call2) - call4.After(call3) - calllast.After(call4) + // 4 + // Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first + // whether there is lag or not, so no call to the underlying mockThrottler is issued. + + call = mockThrottler.EXPECT().Close() + calls = append(calls, call) + + for i := 1; i < len(calls); i++ { + calls[i].After(calls[i-1]) + } config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = true @@ -125,16 +151,39 @@ func TestEnabledThrottler(t *testing.T) { assert.Nil(t, throttler.Open()) assert.Equal(t, int64(1), throttler.throttlerRunning.Get()) +<<<<<<< HEAD assert.False(t, throttler.Throttle(100)) assert.Equal(t, int64(1), throttler.requestsTotal.Get()) assert.Zero(t, throttler.requestsThrottled.Get()) throttler.state.StatsUpdate(tabletStats) // This calls replication lag thing +======= + assert.Nil(t, throttlerImpl.Open()) + throttlerStateImpl, ok := throttlerImpl.state.(*txThrottlerStateImpl) + assert.True(t, ok) + assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) + assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) + + // Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a + // way that will interfere with how we manipulate that value in our tests to evaluate different cases: + throttlerStateImpl.done <- true + + // 1 should not throttle due to return value of underlying Throttle(), despite high lag + atomic.StoreInt64(&throttlerStateImpl.maxLag, 20) + assert.False(t, throttlerImpl.Throttle(100, "some-workload")) + assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_RDONLY, }, } +<<<<<<< HEAD // This call should not be forwarded to the go/vt/throttler.Throttler object. throttler.state.StatsUpdate(rdonlyTabletStats) // The second throttle call should reject. @@ -148,6 +197,31 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, int64(1), throttler.requestsThrottled.Get()) throttler.Close() assert.Zero(t, throttler.throttlerRunning.Get()) +======= + // This call should not be forwarded to the go/vt/throttlerImpl.Throttler object. + throttlerImpl.state.StatsUpdate(rdonlyTabletStats) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) + + // 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100 + assert.True(t, throttlerImpl.Throttle(100, "some-workload")) + assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + // 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0 + assert.False(t, throttlerImpl.Throttle(0, "some-workload")) + assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + // 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag + atomic.StoreInt64(&throttlerStateImpl.maxLag, 1) + assert.False(t, throttler.Throttle(100, "some-workload")) + assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + + throttler.Close() + assert.Zero(t, throttlerImpl.throttlerRunning.Get()) +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } func TestNewTxThrottler(t *testing.T) { From 17f44c21815d295a4fb8d812ccac3c7df5db1e20 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Fri, 9 Feb 2024 05:11:32 +0000 Subject: [PATCH 2/5] Cherry-pick 2b25639f250d929321416ae4824889f3f9d81c51 with conflicts --- .../tabletserver/txthrottler/tx_throttler.go | 36 +++++++++++++++++++ .../txthrottler/tx_throttler_test.go | 25 +++++++++++++ 2 files changed, 61 insertions(+) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index b3ffec83014..08bf7c227b6 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -91,6 +91,7 @@ type ThrottlerInterface interface { UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error ResetConfiguration() MaxLag(tabletType topodatapb.TabletType) uint32 +<<<<<<< HEAD } // TopologyWatcherInterface defines the public interface that is implemented by @@ -99,6 +100,8 @@ type ThrottlerInterface interface { type TopologyWatcherInterface interface { Start() Stop() +======= +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } // TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with @@ -192,6 +195,9 @@ type txThrottlerState struct { maxLag int64 done chan bool waitForTermination sync.WaitGroup +<<<<<<< HEAD +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) +======= >>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } @@ -457,6 +463,33 @@ func (ts *txThrottlerState) throttle() bool { return maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec && ts.throttler.Throttle(0 /* threadId */) > 0 +<<<<<<< HEAD +======= +} + +func (ts *txThrottlerStateImpl) updateMaxLag() { + defer ts.waitForTermination.Done() + // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value + ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) + defer ticker.Stop() +outerloop: + for { + select { + case <-ticker.C: + var maxLag uint32 + + for tabletType := range ts.tabletTypes { + maxLagPerTabletType := ts.throttler.MaxLag(tabletType) + if maxLagPerTabletType > maxLag { + maxLag = maxLagPerTabletType + } + } + atomic.StoreInt64(&ts.maxLag, int64(maxLag)) + case <-ts.done: + break outerloop + } + } +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } func (ts *txThrottlerStateImpl) updateMaxLag() { @@ -495,9 +528,12 @@ func (ts *txThrottlerState) deallocateResources() { ts.healthCheck.Close() ts.healthCheck = nil +<<<<<<< HEAD <<<<<<< HEAD // After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not ======= +======= +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) ts.done <- true ts.waitForTermination.Wait() // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 5c9b8c0fe97..6251ee4749d 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -26,6 +26,9 @@ import ( ======= "context" "sync/atomic" +<<<<<<< HEAD +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) +======= >>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) "testing" "time" @@ -55,8 +58,12 @@ func TestDisabledThrottler(t *testing.T) { Shard: "shard", }) assert.Nil(t, throttler.Open()) +<<<<<<< HEAD <<<<<<< HEAD assert.False(t, throttler.Throttle(0)) +======= + assert.False(t, throttler.Throttle(0, "some-workload")) +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) ======= assert.False(t, throttler.Throttle(0, "some-workload")) >>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) @@ -120,6 +127,24 @@ func TestEnabledThrottler(t *testing.T) { call = mockThrottler.EXPECT().Throttle(0) call.Return(1 * time.Second) calls = append(calls, call) +<<<<<<< HEAD +======= + + // 3 + // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first + // whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle() + + // 4 + // Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first + // whether there is lag or not, so no call to the underlying mockThrottler is issued. + + call = mockThrottler.EXPECT().Close() + calls = append(calls, call) + + for i := 1; i < len(calls); i++ { + calls[i].After(calls[i-1]) + } +>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) // 3 // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first From 1edee0a8ff484bd66d2447581ba5b5fd6b875901 Mon Sep 17 00:00:00 2001 From: "Eduardo J. Ortega U" <5791035+ejortegau@users.noreply.github.com> Date: Thu, 21 Mar 2024 15:48:50 +0100 Subject: [PATCH 3/5] Fix backport merge conflicts Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> --- .../tabletserver/txthrottler/tx_throttler.go | 135 ++---------------- .../txthrottler/tx_throttler_test.go | 103 +++---------- 2 files changed, 27 insertions(+), 211 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 08bf7c227b6..081feb1dcc7 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -24,6 +24,7 @@ import ( "sync" "sync/atomic" "time" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" @@ -91,7 +92,6 @@ type ThrottlerInterface interface { UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error ResetConfiguration() MaxLag(tabletType topodatapb.TabletType) uint32 -<<<<<<< HEAD } // TopologyWatcherInterface defines the public interface that is implemented by @@ -100,8 +100,6 @@ type ThrottlerInterface interface { type TopologyWatcherInterface interface { Start() Stop() -======= ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } // TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with @@ -182,23 +180,11 @@ type txThrottlerState struct { stopHealthCheck context.CancelFunc healthCheck discovery.HealthCheck -<<<<<<< HEAD topologyWatchers []TopologyWatcherInterface -======= - healthCheckChan chan *discovery.TabletHealth - healthCheckCells []string - cellsFromTopo bool - - // tabletTypes stores the tablet types for throttling - tabletTypes map[topodatapb.TabletType]bool maxLag int64 done chan bool waitForTermination sync.WaitGroup -<<<<<<< HEAD ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) -======= ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } // NewTxThrottler tries to construct a txThrottler from the @@ -341,10 +327,10 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar result := &txThrottlerState{ config: config, throttler: t, + done: make(chan bool, 2), } createTxThrottlerHealthCheck(topoServer, config, result, target.Cell) -<<<<<<< HEAD result.topologyWatchers = make( []TopologyWatcherInterface, 0, len(config.healthCheckCells)) for _, cell := range config.healthCheckCells { @@ -358,30 +344,12 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar target.Shard, discovery.DefaultTopologyWatcherRefreshInterval, discovery.DefaultTopoReadConcurrency)) -======= - state := &txThrottlerStateImpl{ - config: config, - healthCheckCells: config.TxThrottlerHealthCheckCells, - tabletTypes: tabletTypes, - throttler: t, - txThrottler: txThrottler, - done: make(chan bool, 1), - } - - // get cells from topo if none defined in tabletenv config - if len(state.healthCheckCells) == 0 { - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) - state.cellsFromTopo = true ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } return result, nil } func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) { ctx, cancel := context.WithCancel(context.Background()) -<<<<<<< HEAD result.stopHealthCheck = cancel result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells) ch := result.healthCheck.Subscribe() @@ -393,59 +361,8 @@ func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerCo case th := <-ch: result.StatsUpdate(th) } -======= - state.stopHealthCheck = cancel - state.initHealthCheckStream(txThrottler.topoServer, target) - go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) - state.waitForTermination.Add(1) - go state.updateMaxLag() - - return state, nil -} - -func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { - ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) - ts.healthCheckChan = ts.healthCheck.Subscribe() - -} - -func (ts *txThrottlerStateImpl) closeHealthCheckStream() { - if ts.healthCheck == nil { - return - } - ts.stopHealthCheck() - ts.healthCheck.Close() -} - -func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { - fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer cancel() - - knownCells := fetchKnownCells(fetchCtx, topoServer, target) - if !reflect.DeepEqual(knownCells, ts.healthCheckCells) { - log.Info("txThrottler: restarting healthcheck stream due to topology cells update") - ts.healthCheckCells = knownCells - ts.closeHealthCheckStream() - ts.initHealthCheckStream(topoServer, target) - } -} - -func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { - var cellsUpdateTicks <-chan time.Time - if ts.cellsFromTopo { - ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) - cellsUpdateTicks = ticker.C - defer ticker.Stop() - } - for { - select { - case <-ctx.Done(): - return - case <-cellsUpdateTicks: - ts.updateHealthCheckCells(ctx, topoServer, target) - case th := <-ts.healthCheckChan: - ts.StatsUpdate(th) ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) + result.waitForTermination.Add(1) + go result.updateMaxLag() } }(ctx) } @@ -461,16 +378,14 @@ func (ts *txThrottlerState) throttle() bool { maxLag := atomic.LoadInt64(&ts.maxLag) - return maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec && + return maxLag > ts.config.throttlerConfig.TargetReplicationLagSec && ts.throttler.Throttle(0 /* threadId */) > 0 -<<<<<<< HEAD -======= } -func (ts *txThrottlerStateImpl) updateMaxLag() { +func (ts *txThrottlerState) updateMaxLag() { defer ts.waitForTermination.Done() // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value - ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) + ticker := time.NewTicker(time.Duration(ts.config.throttlerConfig.TargetReplicationLagSec/2) * time.Second) defer ticker.Stop() outerloop: for { @@ -478,32 +393,7 @@ outerloop: case <-ticker.C: var maxLag uint32 - for tabletType := range ts.tabletTypes { - maxLagPerTabletType := ts.throttler.MaxLag(tabletType) - if maxLagPerTabletType > maxLag { - maxLag = maxLagPerTabletType - } - } - atomic.StoreInt64(&ts.maxLag, int64(maxLag)) - case <-ts.done: - break outerloop - } - } ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) -} - -func (ts *txThrottlerStateImpl) updateMaxLag() { - defer ts.waitForTermination.Done() - // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value - ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) - defer ticker.Stop() -outerloop: - for { - select { - case <-ticker.C: - var maxLag uint32 - - for tabletType := range ts.tabletTypes { + for _, tabletType := range *ts.config.tabletTypes { maxLagPerTabletType := ts.throttler.MaxLag(tabletType) if maxLagPerTabletType > maxLag { maxLag = maxLagPerTabletType @@ -528,16 +418,9 @@ func (ts *txThrottlerState) deallocateResources() { ts.healthCheck.Close() ts.healthCheck = nil -<<<<<<< HEAD -<<<<<<< HEAD - // After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not -======= -======= ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) ts.done <- true ts.waitForTermination.Wait() - // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) + // After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not // to be executing, so we can safely close the throttler. ts.throttler.Close() ts.throttler = nil diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 6251ee4749d..6d42869befa 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -22,14 +22,7 @@ package txthrottler //go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface import ( -<<<<<<< HEAD -======= - "context" "sync/atomic" -<<<<<<< HEAD ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) -======= ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) "testing" "time" @@ -58,15 +51,7 @@ func TestDisabledThrottler(t *testing.T) { Shard: "shard", }) assert.Nil(t, throttler.Open()) -<<<<<<< HEAD -<<<<<<< HEAD assert.False(t, throttler.Throttle(0)) -======= - assert.False(t, throttler.Throttle(0, "some-workload")) ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) -======= - assert.False(t, throttler.Throttle(0, "some-workload")) ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) throttlerImpl, _ := throttler.(*txThrottler) assert.Zero(t, throttlerImpl.throttlerRunning.Get()) throttler.Close() @@ -127,24 +112,6 @@ func TestEnabledThrottler(t *testing.T) { call = mockThrottler.EXPECT().Throttle(0) call.Return(1 * time.Second) calls = append(calls, call) -<<<<<<< HEAD -======= - - // 3 - // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first - // whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle() - - // 4 - // Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first - // whether there is lag or not, so no call to the underlying mockThrottler is issued. - - call = mockThrottler.EXPECT().Close() - calls = append(calls, call) - - for i := 1; i < len(calls); i++ { - calls[i].After(calls[i-1]) - } ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) // 3 // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first @@ -167,86 +134,52 @@ func TestEnabledThrottler(t *testing.T) { config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA} env := tabletenv.NewEnv(config, t.Name()) - throttler, err := tryCreateTxThrottler(env, ts) + throttlerImpl, err := tryCreateTxThrottler(env, ts) assert.Nil(t, err) - throttler.InitDBConfig(&querypb.Target{ + throttlerImpl.InitDBConfig(&querypb.Target{ Keyspace: "keyspace", Shard: "shard", }) - assert.Nil(t, throttler.Open()) - assert.Equal(t, int64(1), throttler.throttlerRunning.Get()) - -<<<<<<< HEAD - assert.False(t, throttler.Throttle(100)) - assert.Equal(t, int64(1), throttler.requestsTotal.Get()) - assert.Zero(t, throttler.requestsThrottled.Get()) - - throttler.state.StatsUpdate(tabletStats) // This calls replication lag thing -======= assert.Nil(t, throttlerImpl.Open()) - throttlerStateImpl, ok := throttlerImpl.state.(*txThrottlerStateImpl) - assert.True(t, ok) - assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) // Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a // way that will interfere with how we manipulate that value in our tests to evaluate different cases: - throttlerStateImpl.done <- true + throttlerImpl.state.done <- true // 1 should not throttle due to return value of underlying Throttle(), despite high lag - atomic.StoreInt64(&throttlerStateImpl.maxLag, 20) - assert.False(t, throttlerImpl.Throttle(100, "some-workload")) - assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some-workload"]) - assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some-workload"]) + atomic.StoreInt64(&throttlerImpl.state.maxLag, 20) + assert.False(t, throttlerImpl.Throttle(100)) + assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get()) + assert.Zero(t, throttlerImpl.requestsThrottled.Get()) throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing - assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts()) - assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_RDONLY, }, } -<<<<<<< HEAD // This call should not be forwarded to the go/vt/throttler.Throttler object. - throttler.state.StatsUpdate(rdonlyTabletStats) - // The second throttle call should reject. - assert.True(t, throttler.Throttle(100)) - assert.Equal(t, int64(2), throttler.requestsTotal.Get()) - assert.Equal(t, int64(1), throttler.requestsThrottled.Get()) - - // This call should not throttle due to priority. Check that's the case and counters agree. - assert.False(t, throttler.Throttle(0)) - assert.Equal(t, int64(3), throttler.requestsTotal.Get()) - assert.Equal(t, int64(1), throttler.requestsThrottled.Get()) - throttler.Close() - assert.Zero(t, throttler.throttlerRunning.Get()) -======= - // This call should not be forwarded to the go/vt/throttlerImpl.Throttler object. throttlerImpl.state.StatsUpdate(rdonlyTabletStats) - assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts()) - assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) // 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100 - assert.True(t, throttlerImpl.Throttle(100, "some-workload")) - assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some-workload"]) - assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + assert.True(t, throttlerImpl.Throttle(100)) + assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get()) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) // 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0 - assert.False(t, throttlerImpl.Throttle(0, "some-workload")) - assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some-workload"]) - assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + assert.False(t, throttlerImpl.Throttle(0)) + assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Get()) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) // 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag - atomic.StoreInt64(&throttlerStateImpl.maxLag, 1) - assert.False(t, throttler.Throttle(100, "some-workload")) - assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Counts()["some-workload"]) - assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) + atomic.StoreInt64(&throttlerImpl.state.maxLag, 1) + assert.False(t, throttlerImpl.Throttle(100)) + assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Get()) + assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) - throttler.Close() + throttlerImpl.Close() assert.Zero(t, throttlerImpl.throttlerRunning.Get()) ->>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789)) } func TestNewTxThrottler(t *testing.T) { From d6979100ad77298e84cdc53a7d2bd7da18b797c3 Mon Sep 17 00:00:00 2001 From: "Eduardo J. Ortega U" <5791035+ejortegau@users.noreply.github.com> Date: Thu, 21 Mar 2024 15:52:00 +0100 Subject: [PATCH 4/5] Empyt commit to trigger CI Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> From 03cc61e477a40d0ecdf862693a9b57218310a137 Mon Sep 17 00:00:00 2001 From: "Eduardo J. Ortega U" <5791035+ejortegau@users.noreply.github.com> Date: Thu, 21 Mar 2024 16:02:13 +0100 Subject: [PATCH 5/5] Fix linter errors Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> --- go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 081feb1dcc7..82cb074b3d7 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -24,6 +24,7 @@ import ( "sync" "sync/atomic" "time" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "google.golang.org/protobuf/encoding/prototext"