Skip to content

Commit

Permalink
TxThrottler only throttles if current lag is above threshold.
Browse files Browse the repository at this point in the history
This uses the lag caches in MaxReplicationLagModule to make the decision
whether to throttle or not, in addition to the result of the underlying
throttler struct.

Signed-off-by: Eduardo J. Ortega U <[email protected]>
  • Loading branch information
ejortegau committed Dec 15, 2023
1 parent 6b481a7 commit 231c2f6
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
23 changes: 23 additions & 0 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"

throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
)
Expand Down Expand Up @@ -224,6 +225,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
return t.threadThrottlers[threadID].throttle(t.nowFunc())
}

// LastMaxLagNotIgnoredForTabletType returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) LastMaxLagNotIgnoredForTabletType(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key, _ := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// ThreadFinished marks threadID as finished and redistributes the thread's
// rate allotment across the other threads.
// After ThreadFinished() is called, Throttle() must not be called anymore.
Expand Down
14 changes: 13 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type ThrottlerInterface interface {
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
LastMaxLagNotIgnoredForTabletType(tabletType topodatapb.TabletType) uint32
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
Expand Down Expand Up @@ -356,7 +357,18 @@ func (ts *txThrottlerStateImpl) throttle() bool {
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

var maxLag uint32

for tabletType, _ := range ts.tabletTypes {
maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}

return ts.throttler.Throttle(0 /* threadId */) > 0 &&
int64(maxLag) > ts.config.TxThrottlerConfig.TargetReplicationLagSec
}

func (ts *txThrottlerStateImpl) deallocateResources() {
Expand Down

0 comments on commit 231c2f6

Please sign in to comment.