Skip to content

Commit

Permalink
TxThrottler only throttles if current lag is above threshold.
Browse files Browse the repository at this point in the history
This the lag caches in MaxReplicationLagModule

Signed-off-by: Eduardo J. Ortega U <[email protected]>
  • Loading branch information
ejortegau committed Dec 13, 2023
1 parent f1517e5 commit 714ae9d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
23 changes: 23 additions & 0 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"math"
"sync"
"time"
"vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -224,6 +225,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
return t.threadThrottlers[threadID].throttle(t.nowFunc())
}

// LastMaxLagNotIgnoredForTabletType returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) LastMaxLagNotIgnoredForTabletType(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key, _ := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// ThreadFinished marks threadID as finished and redistributes the thread's
// rate allotment across the other threads.
// After ThreadFinished() is called, Throttle() must not be called anymore.
Expand Down
15 changes: 14 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type ThrottlerInterface interface {
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
LastMaxLagNotIgnoredForTabletType(tabletType topodatapb.TabletType) uint32
}

// TopologyWatcherInterface defines the public interface that is implemented by
Expand Down Expand Up @@ -359,7 +360,19 @@ func (ts *txThrottlerStateImpl) throttle() bool {
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

var maxLag uint32

for _, tabletType := range ts.config.tabletTypes {
maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}

return ts.throttler.Throttle(0 /* threadId */) > 0 &&
int64(maxLag) > ts.config.throttlerConfig.TargetReplicationLagSec

}

func (ts *txThrottlerStateImpl) deallocateResources() {
Expand Down

0 comments on commit 714ae9d

Please sign in to comment.