diff --git a/examples/common/scripts/vttablet-up.sh b/examples/common/scripts/vttablet-up.sh index 56d212af218..3947ee8190f 100755 --- a/examples/common/scripts/vttablet-up.sh +++ b/examples/common/scripts/vttablet-up.sh @@ -56,6 +56,10 @@ vttablet \ --heartbeat_enable \ --heartbeat_interval=250ms \ --heartbeat_on_demand_duration=5s \ + --enable-tx-throttler \ + --tx-throttler-config "target_replication_lag_sec: 5 max_replication_lag_sec: 25 initial_rate: 10000 max_increase: 1 emergency_decrease: 0.5 min_duration_between_increases_sec: 2 max_duration_between_increases_sec: 5 min_duration_between_decreases_sec: 1 spread_backlog_across_sec: 20 age_bad_rate_after_sec: 60 bad_rate_increase: 0.1 max_rate_approach_threshold: 0.1" \ + --tx-throttler-healthcheck-cells zone1 \ + --tx-throttler-default-priority 0 \ > $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 & # Block waiting for the tablet to be listening diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 83a1c52225e..2a11d406b11 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/topodata" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" ) @@ -224,6 +225,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration { return t.threadThrottlers[threadID].throttle(t.nowFunc()) } +// LastMaxLagNotIgnoredForTabletType returns the max of all the last replication lag values seen across all tablets of +// the provided type, excluding ignored tablets. +func (t *Throttler) LastMaxLagNotIgnoredForTabletType(tabletType topodata.TabletType) uint32 { + cache := t.maxReplicationLagModule.lagCacheByType(tabletType) + + var maxLag uint32 + cacheEntries := cache.entries + + for key, _ := range cacheEntries { + if cache.isIgnored(key) { + continue + } + + lag := cache.latest(key).Stats.ReplicationLagSeconds + if lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // ThreadFinished marks threadID as finished and redistributes the thread's // rate allotment across the other threads. // After ThreadFinished() is called, Throttle() must not be called anymore. diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 78392a4b078..1c8d62d523b 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -81,6 +81,7 @@ type ThrottlerInterface interface { GetConfiguration() *throttlerdatapb.Configuration UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error ResetConfiguration() + LastMaxLagNotIgnoredForTabletType(tabletType topodatapb.TabletType) uint32 } // TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with @@ -356,7 +357,18 @@ func (ts *txThrottlerStateImpl) throttle() bool { // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - return ts.throttler.Throttle(0 /* threadId */) > 0 + + var maxLag uint32 + + for tabletType, _ := range ts.tabletTypes { + maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType) + if maxLagPerTabletType > maxLag { + maxLag = maxLagPerTabletType + } + } + + return ts.throttler.Throttle(0 /* threadId */) > 0 && + int64(maxLag) > ts.config.TxThrottlerConfig.TargetReplicationLagSec } func (ts *txThrottlerStateImpl) deallocateResources() {