Skip to content

Commit

Permalink
Cherry-pick 2b25639 with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed Feb 9, 2024
1 parent ec19c33 commit 58cd10c
Show file tree
Hide file tree
Showing 4 changed files with 350 additions and 4 deletions.
23 changes: 23 additions & 0 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"

throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
)
Expand Down Expand Up @@ -212,6 +213,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
return t.threadThrottlers[threadID].throttle(t.nowFunc())
}

// MaxLag returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
	lagCache := t.maxReplicationLagModule.lagCacheByType(tabletType)

	// Scan every cached entry and keep the worst (highest) lag seen,
	// skipping tablets the cache has marked as ignored.
	var highest uint32
	for key := range lagCache.entries {
		if lagCache.isIgnored(key) {
			continue
		}
		if lag := lagCache.latest(key).Stats.ReplicationLagSeconds; lag > highest {
			highest = lag
		}
	}
	return highest
}

// ThreadFinished marks threadID as finished and redistributes the thread's
// rate allotment across the other threads.
// After ThreadFinished() is called, Throttle() must not be called anymore.
Expand Down
15 changes: 15 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

202 changes: 201 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
Expand All @@ -39,7 +40,73 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

<<<<<<< HEAD
// TxThrottler throttles transactions based on replication lag.
=======
// These vars store the functions used to create the topo server, healthcheck,
// and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
throttlerFactory throttlerFactoryFunc
)

// resetTxThrottlerFactories installs the production implementations of the
// healthcheck and throttler factories. It exists as a separate function so
// that tests which override the factories with mocks can restore the defaults.
func resetTxThrottlerFactories() {
	healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
		// NewHealthCheck takes the cells to watch as a single comma-separated string.
		return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
	}
	throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
		return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
	}
}

// init wires up the default factory implementations at package load time.
func init() {
	resetTxThrottlerFactories()
}

// TxThrottler defines the interface for the transaction throttler.
type TxThrottler interface {
	// InitDBConfig stores the target to throttle for. (NOTE(review): exact
	// semantics are defined by the implementation, not visible here.)
	InitDBConfig(target *querypb.Target)
	// Open prepares the throttler for use.
	Open() (err error)
	// Close releases the resources acquired by Open.
	Close()
	// Throttle reports whether the current transaction should be throttled,
	// taking the request's priority and workload label into account.
	Throttle(priority int, workload string) (result bool)
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
	// Throttle returns how long thread threadID must back off before proceeding.
	Throttle(threadID int) time.Duration
	// ThreadFinished marks threadID as done so its rate can be redistributed.
	ThreadFinished(threadID int)
	// Close shuts the throttler down.
	Close()
	// MaxRate / SetMaxRate read and write the throttler's maximum rate.
	MaxRate() int64
	SetMaxRate(rate int64)
	// RecordReplicationLag feeds a tablet-health observation into the throttler.
	RecordReplicationLag(time time.Time, th *discovery.TabletHealth)
	// GetConfiguration / UpdateConfiguration / ResetConfiguration manage the
	// max-replication-lag module configuration.
	GetConfiguration() *throttlerdatapb.Configuration
	UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
	ResetConfiguration()
	// MaxLag returns the highest replication lag seen across all tablets of
	// the given type (see Throttler.MaxLag in go/vt/throttler).
	MaxLag(tabletType topodatapb.TabletType) uint32
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// fetchKnownCells gathers a list of known cells from the topology. On error,
// the cell of the local tablet will be used and an error is logged.
func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) []string {
	knownCells, err := topoServer.GetKnownCells(ctx)
	if err == nil {
		return knownCells
	}
	// Topology lookup failed: degrade gracefully to watching only the local cell.
	log.Errorf("txThrottler: falling back to local cell due to error fetching cells from topology: %+v", err)
	return []string{target.Cell}
}

// txThrottler implements TxThrottle for throttling transactions based on replication lag.
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
// It's a thin wrapper around the throttler found in vitess/go/vt/throttler.
// It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler.
//
Expand Down Expand Up @@ -173,7 +240,20 @@ type txThrottlerState struct {
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
<<<<<<< HEAD
topologyWatchers []TopologyWatcherInterface
=======
healthCheckChan chan *discovery.TabletHealth
healthCheckCells []string
cellsFromTopo bool

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool

maxLag int64
done chan bool
waitForTermination sync.WaitGroup
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}

// These vars store the functions used to create the topo server, healthcheck,
Expand Down Expand Up @@ -265,7 +345,21 @@ func (t *TxThrottler) Throttle() (result bool) {
if t.state == nil {
panic("BUG: Throttle() called on a closed TxThrottler")
}
<<<<<<< HEAD
return t.state.throttle()
=======

// Throttle according to both what the throttler state says and the priority. Workloads with lower priority value
// are less likely to be throttled.
result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle()

t.requestsTotal.Add(workload, 1)
if result {
t.requestsThrottled.Add(workload, 1)
}

return result && !t.config.TxThrottlerDryRun
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}

func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) {
Expand All @@ -287,6 +381,7 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string
}
createTxThrottlerHealthCheck(config, result, cell)

<<<<<<< HEAD
result.topologyWatchers = make(
[]TopologyWatcherInterface, 0, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
Expand All @@ -300,12 +395,30 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string
shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency))
=======
state := &txThrottlerStateImpl{
config: config,
healthCheckCells: config.TxThrottlerHealthCheckCells,
tabletTypes: tabletTypes,
throttler: t,
txThrottler: txThrottler,
done: make(chan bool, 1),
}

// get cells from topo if none defined in tabletenv config
if len(state.healthCheckCells) == 0 {
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target)
state.cellsFromTopo = true
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}
return result, nil
}

func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottlerState, cell string) {
ctx, cancel := context.WithCancel(context.Background())
<<<<<<< HEAD
result.stopHealthCheck = cancel
result.healthCheck = healthCheckFactory(config.topoServer, cell, config.healthCheckCells)
ch := result.healthCheck.Subscribe()
Expand All @@ -317,6 +430,59 @@ func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottler
case th := <-ch:
result.StatsUpdate(th)
}
=======
state.stopHealthCheck = cancel
state.initHealthCheckStream(txThrottler.topoServer, target)
go state.healthChecksProcessor(ctx, txThrottler.topoServer, target)
state.waitForTermination.Add(1)
go state.updateMaxLag()

return state, nil
}

// initHealthCheckStream creates a healthcheck (via the overridable
// healthCheckFactory) for the state's current cell list and subscribes to its
// tablet-health update channel, storing both on ts for the processor goroutine.
func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) {
	ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells)
	ts.healthCheckChan = ts.healthCheck.Subscribe()

}

// closeHealthCheckStream cancels the healthcheck processing context and closes
// the underlying healthcheck. It is a no-op when no stream was ever initialized.
func (ts *txThrottlerStateImpl) closeHealthCheckStream() {
	if ts.healthCheck != nil {
		ts.stopHealthCheck()
		ts.healthCheck.Close()
	}
}

// updateHealthCheckCells re-reads the known cells from the topology and, when
// the set differs from the one currently watched, tears down and re-creates
// the healthcheck stream against the new cell list.
func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
	fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
	defer cancel()

	knownCells := fetchKnownCells(fetchCtx, topoServer, target)
	if reflect.DeepEqual(knownCells, ts.healthCheckCells) {
		// Cell set unchanged: keep the existing stream.
		return
	}
	log.Info("txThrottler: restarting healthcheck stream due to topology cells update")
	ts.healthCheckCells = knownCells
	ts.closeHealthCheckStream()
	ts.initHealthCheckStream(topoServer, target)
}

func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
var cellsUpdateTicks <-chan time.Time
if ts.cellsFromTopo {
ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval)
cellsUpdateTicks = ticker.C
defer ticker.Stop()
}
for {
select {
case <-ctx.Done():
return
case <-cellsUpdateTicks:
ts.updateHealthCheckCells(ctx, topoServer, target)
case th := <-ts.healthCheckChan:
ts.StatsUpdate(th)
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}
}(ctx)
}
Expand All @@ -328,7 +494,35 @@ func (ts *txThrottlerState) throttle() bool {
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

maxLag := atomic.LoadInt64(&ts.maxLag)

return maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec &&
ts.throttler.Throttle(0 /* threadId */) > 0
}

// updateMaxLag periodically polls the wrapped throttler for the highest
// replication lag across every tablet type this throttler watches and
// publishes it atomically into ts.maxLag for throttle() to read. It runs
// until ts.done is signaled; callers wait for it via ts.waitForTermination.
func (ts *txThrottlerStateImpl) updateMaxLag() {
	defer ts.waitForTermination.Done()
	// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value.
	// Integer division makes this 0 when TargetReplicationLagSec == 1, and
	// time.NewTicker panics on a non-positive interval, so clamp to 1s.
	interval := time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second
	if interval <= 0 {
		interval = time.Second
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Take the worst lag over all monitored tablet types.
			var maxLag uint32
			for tabletType := range ts.tabletTypes {
				if lag := ts.throttler.MaxLag(tabletType); lag > maxLag {
					maxLag = lag
				}
			}
			atomic.StoreInt64(&ts.maxLag, int64(maxLag))
		case <-ts.done:
			return
		}
	}
}

func (ts *txThrottlerState) deallocateResources() {
Expand All @@ -343,7 +537,13 @@ func (ts *txThrottlerState) deallocateResources() {
ts.healthCheck.Close()
ts.healthCheck = nil

<<<<<<< HEAD
// After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not
=======
ts.done <- true
ts.waitForTermination.Wait()
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
ts.throttler = nil
Expand Down
Loading

0 comments on commit 58cd10c

Please sign in to comment.