Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slack vitess r14.0.5 dsdefense throttle only if lag 2 #172

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -224,6 +226,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
return t.threadThrottlers[threadID].throttle(t.nowFunc())
}

// LastMaxLagNotIgnoredForTabletType returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) LastMaxLagNotIgnoredForTabletType(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// ThreadFinished marks threadID as finished and redistributes the thread's
// rate allotment across the other threads.
// After ThreadFinished() is called, Throttle() must not be called anymore.
Expand Down
57 changes: 36 additions & 21 deletions go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type ThrottlerInterface interface {
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
LastMaxLagNotIgnoredForTabletType(tabletType topodatapb.TabletType) uint32
}

// TopologyWatcherInterface defines the public interface that is implemented by
Expand Down Expand Up @@ -359,7 +360,19 @@ func (ts *txThrottlerStateImpl) throttle() bool {
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

var maxLag uint32

for _, tabletType := range ts.config.tabletTypes {
maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}

return ts.throttler.Throttle(0 /* threadId */) > 0 &&
int64(maxLag) > ts.config.throttlerConfig.TargetReplicationLagSec

}

func (ts *txThrottlerStateImpl) deallocateResources() {
Expand Down
65 changes: 51 additions & 14 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,27 +89,58 @@ func TestEnabledThrottler(t *testing.T) {
return mockThrottler, nil
}

call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
call1 := mockThrottler.EXPECT().Throttle(0)
call1.Return(0 * time.Second)
var calls []*gomock.Call

call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
calls = append(calls, call)

call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA)
call.Return(uint32(20))
calls = append(calls, call)

call = mockThrottler.EXPECT().Throttle(0)
call.Return(0 * time.Second)
calls = append(calls, call)

tabletStats := &discovery.LegacyTabletStats{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_REPLICA,
},
}
call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
call3 := mockThrottler.EXPECT().Throttle(0)
call3.Return(1 * time.Second)

call4 := mockThrottler.EXPECT().Throttle(0)
call4.Return(1 * time.Second)
calllast := mockThrottler.EXPECT().Close()
call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
calls = append(calls, call)

call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA)
call.Return(uint32(20))
calls = append(calls, call)

call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)

call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA)
call.Return(uint32(20))
calls = append(calls, call)

call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)

call1.After(call0)
call2.After(call1)
call3.After(call2)
call4.After(call3)
calllast.After(call4)
call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA)
call.Return(uint32(1))
calls = append(calls, call)

call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)

call = mockThrottler.EXPECT().Close()
calls = append(calls, call)

for i := 1; i < len(calls); i++ {
calls[i].After(calls[i-1])
}

config := tabletenv.NewDefaultConfig()
config.EnableTxThrottler = true
Expand Down Expand Up @@ -147,6 +178,12 @@ func TestEnabledThrottler(t *testing.T) {
assert.False(t, throttler.Throttle(0, "some-workload"))
assert.Equal(t, int64(3), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

// This call should not throttle despite priority. Check that's the case and counters agree.
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(4), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
}
Expand Down
Loading