Skip to content

Commit

Permalink
TxThrottler only throttles if current lag is above threshold.
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo J. Ortega U <[email protected]>
  • Loading branch information
ejortegau committed Dec 13, 2023
1 parent bebddba commit 39ad5e6
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
4 changes: 4 additions & 0 deletions examples/common/scripts/vttablet-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ vttablet \
--heartbeat_enable \
--heartbeat_interval=250ms \
--heartbeat_on_demand_duration=5s \
--enable-tx-throttler \
--tx-throttler-config "target_replication_lag_sec: 5 max_replication_lag_sec: 25 initial_rate: 10000 max_increase: 1 emergency_decrease: 0.5 min_duration_between_increases_sec: 2 max_duration_between_increases_sec: 5 min_duration_between_decreases_sec: 1 spread_backlog_across_sec: 20 age_bad_rate_after_sec: 60 bad_rate_increase: 0.1 max_rate_approach_threshold: 0.1" \
--tx-throttler-healthcheck-cells zone1 \
--tx-throttler-default-priority 0 \
> $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 &

# Block waiting for the tablet to be listening
Expand Down
23 changes: 23 additions & 0 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"

throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
)
Expand Down Expand Up @@ -224,6 +225,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
return t.threadThrottlers[threadID].throttle(t.nowFunc())
}

// LastMaxLagNotIgnoredForTabletType returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) LastMaxLagNotIgnoredForTabletType(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key, _ := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// ThreadFinished marks threadID as finished and redistributes the thread's
// rate allotment across the other threads.
// After ThreadFinished() is called, Throttle() must not be called anymore.
Expand Down
14 changes: 13 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type ThrottlerInterface interface {
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
LastMaxLagNotIgnoredForTabletType(tabletType topodatapb.TabletType) uint32
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
Expand Down Expand Up @@ -356,7 +357,18 @@ func (ts *txThrottlerStateImpl) throttle() bool {
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

var maxLag uint32

for tabletType, _ := range ts.tabletTypes {
maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}

return ts.throttler.Throttle(0 /* threadId */) > 0 &&
int64(maxLag) > ts.config.TxThrottlerConfig.TargetReplicationLagSec
}

func (ts *txThrottlerStateImpl) deallocateResources() {
Expand Down

0 comments on commit 39ad5e6

Please sign in to comment.