diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 206891e311e..abc6d760729 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -32,6 +32,7 @@ import ( "math" "sync" "time" + "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" @@ -224,6 +225,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration { return t.threadThrottlers[threadID].throttle(t.nowFunc()) } +// LastMaxLagNotIgnoredForTabletType returns the max of all the last replication lag values seen across all tablets of +// the provided type, excluding ignored tablets. +func (t *Throttler) LastMaxLagNotIgnoredForTabletType(tabletType topodata.TabletType) uint32 { + cache := t.maxReplicationLagModule.lagCacheByType(tabletType) + + var maxLag uint32 + cacheEntries := cache.entries + + for key, _ := range cacheEntries { + if cache.isIgnored(key) { + continue + } + + lag := cache.latest(key).Stats.ReplicationLagSeconds + if lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // ThreadFinished marks threadID as finished and redistributes the thread's // rate allotment across the other threads. // After ThreadFinished() is called, Throttle() must not be called anymore. diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 9f2f369ffd9..6841fa0a1b4 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -86,6 +86,7 @@ type ThrottlerInterface interface { GetConfiguration() *throttlerdatapb.Configuration UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error ResetConfiguration() + LastMaxLagNotIgnoredForTabletType(tabletType topodatapb.TabletType) uint32 } // TopologyWatcherInterface defines the public interface that is implemented by @@ -359,7 +360,19 @@ func (ts *txThrottlerStateImpl) throttle() bool { // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - return ts.throttler.Throttle(0 /* threadId */) > 0 + + var maxLag uint32 + + for _, tabletType := range ts.config.tabletTypes { + maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType) + if maxLagPerTabletType > maxLag { + maxLag = maxLagPerTabletType + } + } + + return ts.throttler.Throttle(0 /* threadId */) > 0 && + int64(maxLag) > ts.config.throttlerConfig.TargetReplicationLagSec + } func (ts *txThrottlerStateImpl) deallocateResources() {