[release-16.0] TxThrottler: dont throttle unless lag (#14789) #15188

Closed · wants to merge 2 commits
23 changes: 23 additions & 0 deletions go/vt/throttler/throttler.go
@@ -35,6 +35,7 @@ import (

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"

throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
)
@@ -212,6 +213,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
return t.threadThrottlers[threadID].throttle(t.nowFunc())
}

// MaxLag returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// ThreadFinished marks threadID as finished and redistributes the thread's
// rate allotment across the other threads.
// After ThreadFinished() is called, Throttle() must not be called anymore.
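
For intuition, here is a minimal standalone sketch of the same max-over-entries scan, with hypothetical entries and ignored maps standing in for the module's per-tablet-type lag cache and its ignored set:

package main

import "fmt"

type tabletStats struct{ ReplicationLagSeconds uint32 }

// maxLag mirrors Throttler.MaxLag above: the max lag across cache entries,
// skipping entries marked as ignored.
func maxLag(entries map[string]tabletStats, ignored map[string]bool) uint32 {
	var max uint32
	for key, st := range entries {
		if ignored[key] {
			continue
		}
		if st.ReplicationLagSeconds > max {
			max = st.ReplicationLagSeconds
		}
	}
	return max
}

func main() {
	entries := map[string]tabletStats{
		"cell1-0000000100": {ReplicationLagSeconds: 3},
		"cell1-0000000101": {ReplicationLagSeconds: 12},
		"cell2-0000000102": {ReplicationLagSeconds: 7},
	}
	ignored := map[string]bool{"cell1-0000000101": true}
	fmt.Println(maxLag(entries, ignored)) // 7: the ignored 12s tablet is skipped
}
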
15 changes: 15 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go

Some generated files are not rendered by default.

235 changes: 234 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
@@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
@@ -39,7 +40,73 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// These vars store the functions used to create the topo server, healthcheck,
// and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
throttlerFactory throttlerFactoryFunc
)

func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
}
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
}
}

func init() {
resetTxThrottlerFactories()
}
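
To make the mocking intent concrete, a hedged sketch of a test that swaps in a fake (fakeThrottler and the test itself are hypothetical; the repository's actual tests use the generated mock in mock_throttler_test.go). Embedding the interface means only the methods a test exercises need real implementations:

// fakeThrottler satisfies ThrottlerInterface by embedding it; unimplemented
// methods would panic if called, which is fine for a focused test.
type fakeThrottler struct{ ThrottlerInterface }

func (f *fakeThrottler) Throttle(threadID int) time.Duration            { return 0 } // never backs off
func (f *fakeThrottler) MaxLag(tabletType topodatapb.TabletType) uint32 { return 0 } // reports no lag
func (f *fakeThrottler) Close()                                         {}

func TestThrottleWithFake(t *testing.T) {
	defer resetTxThrottlerFactories() // restore the real factories afterwards
	throttlerFactory = func(name, unit string, threadCount int, maxRate int64,
		maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
		return &fakeThrottler{}, nil
	}
	// ... construct and Open() the transaction throttler under test; it now
	// drives the fake instead of a real go/vt/throttler.Throttler.
}
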

// TxThrottler defines the interface for the transaction throttler.
type TxThrottler interface {
InitDBConfig(target *querypb.Target)
Open() (err error)
Close()
Throttle(priority int, workload string) (result bool)
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Throttle(threadID int) time.Duration
ThreadFinished(threadID int)
Close()
MaxRate() int64
SetMaxRate(rate int64)
RecordReplicationLag(time time.Time, th *discovery.TabletHealth)
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
MaxLag(tabletType topodatapb.TabletType) uint32
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// fetchKnownCells gathers a list of known cells from the topology. On error,
// the cell of the local tablet will be used and an error is logged.
func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) []string {
cells, err := topoServer.GetKnownCells(ctx)
if err != nil {
log.Errorf("txThrottler: falling back to local cell due to error fetching cells from topology: %+v", err)
cells = []string{target.Cell}
}
return cells
}

// txThrottler implements TxThrottler for throttling transactions based on replication lag.
// It's a thin wrapper around the throttler found in vitess/go/vt/throttler.
// It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler.
//
@@ -173,7 +240,23 @@ type txThrottlerState struct {
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
healthCheckChan chan *discovery.TabletHealth
healthCheckCells []string
cellsFromTopo bool

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool

maxLag int64
done chan bool
waitForTermination sync.WaitGroup
}

// These vars store the functions used to create the topo server, healthcheck,
@@ -265,7 +348,21 @@ func (t *TxThrottler) Throttle() (result bool) {
if t.state == nil {
panic("BUG: Throttle() called on a closed TxThrottler")
}

// Throttle according to both what the throttler state says and the priority. Workloads with lower priority value
// are less likely to be throttled.
result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle()

t.requestsTotal.Add(workload, 1)
if result {
t.requestsThrottled.Add(workload, 1)
}

return result && !t.config.TxThrottlerDryRun
}
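
The priority gate above is probabilistic: rand.Intn(sqlparser.MaxPriorityValue) draws uniformly from [0, MaxPriorityValue), so a transaction with priority p is subject to throttling with probability p/MaxPriorityValue. A standalone sketch of just the gate, assuming MaxPriorityValue is 100:

package main

import (
	"fmt"
	"math/rand"
)

const maxPriorityValue = 100 // assumed value of sqlparser.MaxPriorityValue

// eligible reports whether a transaction with this priority is even
// considered for throttling on a given call.
func eligible(priority int) bool {
	return rand.Intn(maxPriorityValue) < priority
}

func main() {
	const trials = 100000
	for _, p := range []int{0, 25, 100} {
		hits := 0
		for i := 0; i < trials; i++ {
			if eligible(p) {
				hits++
			}
		}
		fmt.Printf("priority %3d: eligible ~%.0f%% of calls\n", p, 100*float64(hits)/trials)
	}
	// priority 0 is never throttled; priority 100 always defers to the
	// lag-based throttler decision.
}
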

func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) {
@@ -287,6 +384,7 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string
}
createTxThrottlerHealthCheck(config, result, cell)

@@ -300,12 +398,30 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string
state := &txThrottlerStateImpl{
config: config,
healthCheckCells: config.TxThrottlerHealthCheckCells,
tabletTypes: tabletTypes,
throttler: t,
txThrottler: txThrottler,
done: make(chan bool, 1),
}

// get cells from topo if none defined in tabletenv config
if len(state.healthCheckCells) == 0 {
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target)
state.cellsFromTopo = true
}
return result, nil
}

func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottlerState, cell string) {
ctx, cancel := context.WithCancel(context.Background())
@@ -317,6 +433,59 @@ func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottler
state.stopHealthCheck = cancel
state.initHealthCheckStream(txThrottler.topoServer, target)
go state.healthChecksProcessor(ctx, txThrottler.topoServer, target)
state.waitForTermination.Add(1)
go state.updateMaxLag()

return state, nil
}

func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) {
ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells)
ts.healthCheckChan = ts.healthCheck.Subscribe()
}

func (ts *txThrottlerStateImpl) closeHealthCheckStream() {
if ts.healthCheck == nil {
return
}
ts.stopHealthCheck()
ts.healthCheck.Close()
}

func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()

knownCells := fetchKnownCells(fetchCtx, topoServer, target)
if !reflect.DeepEqual(knownCells, ts.healthCheckCells) {
log.Info("txThrottler: restarting healthcheck stream due to topology cells update")
ts.healthCheckCells = knownCells
ts.closeHealthCheckStream()
ts.initHealthCheckStream(topoServer, target)
}
}

func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
var cellsUpdateTicks <-chan time.Time
if ts.cellsFromTopo {
ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval)
cellsUpdateTicks = ticker.C
defer ticker.Stop()
}
for {
select {
case <-ctx.Done():
return
case <-cellsUpdateTicks:
ts.updateHealthCheckCells(ctx, topoServer, target)
case th := <-ts.healthCheckChan:
ts.StatsUpdate(th)
}
}
}
@@ -328,7 +497,62 @@ func (ts *txThrottlerState) throttle() bool {
// Serialize calls to ts.throttler.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

maxLag := atomic.LoadInt64(&ts.maxLag)

return maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec &&
ts.throttler.Throttle(0 /* threadId */) > 0
}

func (ts *txThrottlerStateImpl) updateMaxLag() {
defer ts.waitForTermination.Done()
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second)
defer ticker.Stop()
outerloop:
for {
select {
case <-ticker.C:
var maxLag uint32

for tabletType := range ts.tabletTypes {
maxLagPerTabletType := ts.throttler.MaxLag(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}
atomic.StoreInt64(&ts.maxLag, int64(maxLag))
case <-ts.done:
break outerloop
}
}
}
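
A note on the interval arithmetic above: the division is integer division. Assuming the upstream default target lag of 2 seconds, the sampling interval works out to 1 second, i.e. two samples per target window; a 1-second target would truncate to zero, which time.NewTicker rejects:

package main

import (
	"fmt"
	"time"
)

func main() {
	targetLagSec := int64(2) // assumed default TargetReplicationLagSec
	interval := time.Duration(targetLagSec/2) * time.Second
	fmt.Println(interval) // 1s: the max-lag loop samples twice per target-lag window
	// With targetLagSec = 1, the division truncates to 0 and time.NewTicker(0)
	// panics, so sub-2s targets would need a floor on the interval.
}
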

func (ts *txThrottlerState) deallocateResources() {
@@ -343,7 +567,16 @@ func (ts *txThrottlerState) deallocateResources() {
ts.healthCheck.Close()
ts.healthCheck = nil

ts.done <- true
ts.waitForTermination.Wait()
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
ts.throttler = nil
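
The guarantee referenced in the comment above follows a standard Go shutdown handshake: signal the background goroutine over a channel, then block on the WaitGroup until it has returned. A minimal standalone sketch of the pattern:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	done := make(chan bool, 1)
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// periodic work; stands in for the max-lag refresh
			case <-done:
				return
			}
		}
	}()

	time.Sleep(25 * time.Millisecond)
	done <- true // mirrors ts.done <- true
	wg.Wait()    // mirrors ts.waitForTermination.Wait()
	fmt.Println("worker stopped; safe to close shared resources")
}
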