Skip to content

Commit

Permalink
[release-17.0] TxThrottler: dont throttle unless lag (#14789) (#15189)
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo J. Ortega U <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Eduardo J. Ortega U <[email protected]>
  • Loading branch information
vitess-bot[bot] and ejortegau authored Mar 22, 2024
1 parent 1c0d866 commit a19be92
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 35 deletions.
23 changes: 23 additions & 0 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"

throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
)
Expand Down Expand Up @@ -224,6 +225,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
return t.threadThrottlers[threadID].throttle(t.nowFunc())
}

// MaxLag returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// ThreadFinished marks threadID as finished and redistributes the thread's
// rate allotment across the other threads.
// After ThreadFinished() is called, Throttle() must not be called anymore.
Expand Down
15 changes: 15 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 43 additions & 2 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -89,6 +92,7 @@ type ThrottlerInterface interface {
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
MaxLag(tabletType topodatapb.TabletType) uint32
}

// TopologyWatcherInterface defines the public interface that is implemented by
Expand Down Expand Up @@ -178,6 +182,10 @@ type txThrottlerState struct {

healthCheck discovery.HealthCheck
topologyWatchers []TopologyWatcherInterface

maxLag int64
done chan bool
waitForTermination sync.WaitGroup
}

// NewTxThrottler tries to construct a txThrottler from the
Expand Down Expand Up @@ -290,7 +298,7 @@ func (t *txThrottler) Throttle(priority int) (result bool) {

// Throttle according to both what the throttler state says and the priority. Workloads with lower priority value
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority
result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle()

t.requestsTotal.Add(1)
if result {
Expand Down Expand Up @@ -320,6 +328,7 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
result := &txThrottlerState{
config: config,
throttler: t,
done: make(chan bool, 2),
}
createTxThrottlerHealthCheck(topoServer, config, result, target.Cell)

Expand Down Expand Up @@ -353,6 +362,8 @@ func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerCo
case th := <-ch:
result.StatsUpdate(th)
}
result.waitForTermination.Add(1)
go result.updateMaxLag()
}
}(ctx)
}
Expand All @@ -365,7 +376,35 @@ func (ts *txThrottlerState) throttle() bool {
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

maxLag := atomic.LoadInt64(&ts.maxLag)

return maxLag > ts.config.throttlerConfig.TargetReplicationLagSec &&
ts.throttler.Throttle(0 /* threadId */) > 0
}

func (ts *txThrottlerState) updateMaxLag() {
defer ts.waitForTermination.Done()
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
ticker := time.NewTicker(time.Duration(ts.config.throttlerConfig.TargetReplicationLagSec/2) * time.Second)
defer ticker.Stop()
outerloop:
for {
select {
case <-ticker.C:
var maxLag uint32

for _, tabletType := range *ts.config.tabletTypes {
maxLagPerTabletType := ts.throttler.MaxLag(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}
atomic.StoreInt64(&ts.maxLag, int64(maxLag))
case <-ts.done:
break outerloop
}
}
}

func (ts *txThrottlerState) deallocateResources() {
Expand All @@ -380,6 +419,8 @@ func (ts *txThrottlerState) deallocateResources() {
ts.healthCheck.Close()
ts.healthCheck = nil

ts.done <- true
ts.waitForTermination.Wait()
// After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
Expand Down
98 changes: 65 additions & 33 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package txthrottler
//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface

import (
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -88,66 +89,97 @@ func TestEnabledThrottler(t *testing.T) {
return mockThrottler, nil
}

call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
call1 := mockThrottler.EXPECT().Throttle(0)
call1.Return(0 * time.Second)
var calls []*gomock.Call

call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
calls = append(calls, call)

// 1
call = mockThrottler.EXPECT().Throttle(0)
call.Return(0 * time.Second)
calls = append(calls, call)

tabletStats := &discovery.TabletHealth{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_REPLICA,
},
}
call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
call3 := mockThrottler.EXPECT().Throttle(0)
call3.Return(1 * time.Second)

call4 := mockThrottler.EXPECT().Throttle(0)
call4.Return(1 * time.Second)
calllast := mockThrottler.EXPECT().Close()
call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
calls = append(calls, call)

// 2
call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)

// 3
// Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first
// whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle()

call1.After(call0)
call2.After(call1)
call3.After(call2)
call4.After(call3)
calllast.After(call4)
// 4
// Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first
// whether there is lag or not, so no call to the underlying mockThrottler is issued.

call = mockThrottler.EXPECT().Close()
calls = append(calls, call)

for i := 1; i < len(calls); i++ {
calls[i].After(calls[i-1])
}

config := tabletenv.NewDefaultConfig()
config.EnableTxThrottler = true
config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"}
config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}

env := tabletenv.NewEnv(config, t.Name())
throttler, err := tryCreateTxThrottler(env, ts)
throttlerImpl, err := tryCreateTxThrottler(env, ts)
assert.Nil(t, err)
throttler.InitDBConfig(&querypb.Target{
throttlerImpl.InitDBConfig(&querypb.Target{
Keyspace: "keyspace",
Shard: "shard",
})
assert.Nil(t, throttler.Open())
assert.Equal(t, int64(1), throttler.throttlerRunning.Get())
assert.Nil(t, throttlerImpl.Open())
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())

// Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a
// way that will interfere with how we manipulate that value in our tests to evaluate different cases:
throttlerImpl.state.done <- true

assert.False(t, throttler.Throttle(100))
assert.Equal(t, int64(1), throttler.requestsTotal.Get())
assert.Zero(t, throttler.requestsThrottled.Get())
// 1 should not throttle due to return value of underlying Throttle(), despite high lag
atomic.StoreInt64(&throttlerImpl.state.maxLag, 20)
assert.False(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get())
assert.Zero(t, throttlerImpl.requestsThrottled.Get())

throttler.state.StatsUpdate(tabletStats) // This calls replication lag thing
throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing
rdonlyTabletStats := &discovery.TabletHealth{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_RDONLY,
},
}
// This call should not be forwarded to the go/vt/throttler.Throttler object.
throttler.state.StatsUpdate(rdonlyTabletStats)
// The second throttle call should reject.
assert.True(t, throttler.Throttle(100))
assert.Equal(t, int64(2), throttler.requestsTotal.Get())
assert.Equal(t, int64(1), throttler.requestsThrottled.Get())
throttlerImpl.state.StatsUpdate(rdonlyTabletStats)

// This call should not throttle due to priority. Check that's the case and counters agree.
assert.False(t, throttler.Throttle(0))
assert.Equal(t, int64(3), throttler.requestsTotal.Get())
assert.Equal(t, int64(1), throttler.requestsThrottled.Get())
throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
// 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100
assert.True(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get())
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())

// 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0
assert.False(t, throttlerImpl.Throttle(0))
assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Get())
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())

// 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag
atomic.StoreInt64(&throttlerImpl.state.maxLag, 1)
assert.False(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Get())
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())

throttlerImpl.Close()
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
}

func TestNewTxThrottler(t *testing.T) {
Expand Down

0 comments on commit a19be92

Please sign in to comment.