Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slack vitess r14.0.5 dsdefense throttle only if lag 2 #172

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI=
github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
Expand Down
24 changes: 24 additions & 0 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -224,6 +226,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
return t.threadThrottlers[threadID].throttle(t.nowFunc())
}

// MaxLag returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// ThreadFinished marks threadID as finished and redistributes the thread's
// rate allotment across the other threads.
// After ThreadFinished() is called, Throttle() must not be called anymore.
Expand Down
57 changes: 36 additions & 21 deletions go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 42 additions & 2 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"google.golang.org/protobuf/encoding/prototext"
Expand Down Expand Up @@ -86,6 +87,7 @@ type ThrottlerInterface interface {
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
MaxLag(tabletType topodatapb.TabletType) uint32
}

// TopologyWatcherInterface defines the public interface that is implemented by
Expand Down Expand Up @@ -184,6 +186,10 @@ type txThrottlerStateImpl struct {

healthCheck discovery.LegacyHealthCheck
topologyWatchers []TopologyWatcherInterface

maxLag int64
done chan bool
waitForTermination sync.WaitGroup
}

// NewTxThrottler tries to construct a txThrottler from the
Expand Down Expand Up @@ -301,7 +307,7 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) {

// Throttle according to both what the throttler state says and the priority. Workloads with lower priority value
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority
result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle()

t.requestsTotal.Add(workload, 1)
if result {
Expand Down Expand Up @@ -331,6 +337,7 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
result := &txThrottlerStateImpl{
config: config,
throttler: t,
done: make(chan bool, 1),
}
result.healthCheck = healthCheckFactory()
result.healthCheck.SetListener(result, false /* sendDownEvents */)
Expand All @@ -348,6 +355,10 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency))
}

result.waitForTermination.Add(1)
go result.updateMaxLag()

return result, nil
}

Expand All @@ -359,7 +370,34 @@ func (ts *txThrottlerStateImpl) throttle() bool {
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

maxLag := atomic.LoadInt64(&ts.maxLag)

return maxLag > ts.config.throttlerConfig.TargetReplicationLagSec &&
ts.throttler.Throttle(0 /* threadId */) > 0
}

func (ts *txThrottlerStateImpl) updateMaxLag() {
defer ts.waitForTermination.Done()
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
ticker := time.NewTicker(time.Duration(ts.config.throttlerConfig.TargetReplicationLagSec/2) * time.Second)
outerloop:
for {
select {
case <-ticker.C:
var maxLag uint32

for _, tabletType := range ts.config.tabletTypes {
maxLagPerTabletType := ts.throttler.MaxLag(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}
atomic.StoreInt64(&ts.maxLag, int64(maxLag))
case <-ts.done:
break outerloop
}
}
}

func (ts *txThrottlerStateImpl) deallocateResources() {
Expand All @@ -374,6 +412,8 @@ func (ts *txThrottlerStateImpl) deallocateResources() {
ts.healthCheck.Close()
ts.healthCheck = nil

ts.done <- true
ts.waitForTermination.Wait()
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
Expand Down
67 changes: 51 additions & 16 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package txthrottler
//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface

import (
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -84,32 +85,50 @@ func TestEnabledThrottler(t *testing.T) {
}

mockThrottler := NewMockThrottlerInterface(mockCtrl)

throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
assert.Equal(t, 1, threadCount)
return mockThrottler, nil
}

call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
call1 := mockThrottler.EXPECT().Throttle(0)
call1.Return(0 * time.Second)
var calls []*gomock.Call

call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
calls = append(calls, call)

// 1
call = mockThrottler.EXPECT().Throttle(0)
call.Return(0 * time.Second)
calls = append(calls, call)

tabletStats := &discovery.LegacyTabletStats{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_REPLICA,
},
}
call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
call3 := mockThrottler.EXPECT().Throttle(0)
call3.Return(1 * time.Second)

call4 := mockThrottler.EXPECT().Throttle(0)
call4.Return(1 * time.Second)
calllast := mockThrottler.EXPECT().Close()
call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
calls = append(calls, call)

call1.After(call0)
call2.After(call1)
call3.After(call2)
call4.After(call3)
calllast.After(call4)
// 2
call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)

// 3
// Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first
// whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle()

// 4
// Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first
// whether there is lag or not, so no call to the underlying mockThrottler is issued.

call = mockThrottler.EXPECT().Close()
calls = append(calls, call)

for i := 1; i < len(calls); i++ {
calls[i].After(calls[i-1])
}

config := tabletenv.NewDefaultConfig()
config.EnableTxThrottler = true
Expand All @@ -126,6 +145,14 @@ func TestEnabledThrottler(t *testing.T) {
assert.Nil(t, throttler.Open())
assert.Equal(t, int64(1), throttler.throttlerRunning.Get())

throttlerImpl, ok := throttler.state.(*txThrottlerStateImpl)
assert.True(t, ok)
// Stop the go routine that keeps updating the cached shard's max lag to preventi it from changing the value in a
// way that will interfere with how we manipulate that value in our tests to evaluate different cases:
throttlerImpl.done <- true

// 1 should not throttle due to return value of underlying Throttle(), despite high lag
atomic.StoreInt64(&throttlerImpl.maxLag, 20)
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"])
assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"])
Expand All @@ -138,15 +165,23 @@ func TestEnabledThrottler(t *testing.T) {
}
// This call should not be forwarded to the go/vt/throttler.Throttler object.
hcListener.StatsUpdate(rdonlyTabletStats)
// The second throttle call should reject.

// 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100
assert.True(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(2), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

// This call should not throttle due to priority. Check that's the case and counters agree.
// 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0
assert.False(t, throttler.Throttle(0, "some-workload"))
assert.Equal(t, int64(3), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

// 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag
atomic.StoreInt64(&throttlerImpl.maxLag, 1)
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(4), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
}
Expand Down
Loading