Skip to content

Commit

Permalink
tx throttler: healthcheck all cells if `--tx-throttler-healthcheck-ce…
Browse files Browse the repository at this point in the history
…lls` is undefined (vitessio#12477)

Signed-off-by: Tim Vaillancourt <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>
  • Loading branch information
timvaillancourt and shlomi-noach committed Apr 17, 2024
1 parent 7a380b8 commit 343c024
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 66 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ Usage of vttablet:
--tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100)
--tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells
--tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)
--tx-throttler-topo-refresh-interval duration The rate that the transaction throttler will refresh the topology to find cells. (default 5m0s)
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9")
--tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.
--unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s)
Expand Down
26 changes: 13 additions & 13 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
flagutil.DualFormatStringListVar(fs, &currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
fs.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information")
fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly.")
fs.DurationVar(&currentConfig.TxThrottlerTopoRefreshInterval, "tx-throttler-topo-refresh-interval", time.Minute*5, "The rate that the transaction throttler will refresh the topology to find cells.")

fs.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.")
fs.BoolVar(&enableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", false, "If true, hot row protection is not enforced but logs if transactions would have been queued.")
Expand Down Expand Up @@ -335,11 +336,12 @@ type TabletConfig struct {
TwoPCCoordinatorAddress string `json:"-"`
TwoPCAbandonAge Seconds `json:"-"`

EnableTxThrottler bool `json:"-"`
TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"`
EnableTxThrottler bool `json:"-"`
TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"`
TxThrottlerTopoRefreshInterval time.Duration `json:"-"`

EnableLagThrottler bool `json:"-"`

Expand Down Expand Up @@ -534,9 +536,6 @@ func (c *TabletConfig) verifyTxThrottlerConfig() error {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err)
}

if len(c.TxThrottlerHealthCheckCells) == 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "empty healthCheckCells given: %+v", c.TxThrottlerHealthCheckCells)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}
Expand Down Expand Up @@ -619,11 +618,12 @@ var defaultConfig = TabletConfig{
DeprecatedCacheResultFields: true,
SignalWhenSchemaChange: true,

EnableTxThrottler: false,
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA},
EnableTxThrottler: false,
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA},
TxThrottlerTopoRefreshInterval: time.Minute * 5,

EnableLagThrottler: false, // Feature flag; to switch to 'true' at some stage in the future

Expand Down
7 changes: 0 additions & 7 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,6 @@ func TestVerifyTxThrottlerConfig(t *testing.T) {
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{invalidMaxReplicationLagModuleConfig},
},
{
// enabled without cells defined
Name: "enabled without cells",
ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
},
{
// enabled with good config (default/replica tablet type)
Name: "enabled",
Expand Down
148 changes: 103 additions & 45 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package txthrottler
import (
"context"
"math/rand"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -101,6 +102,17 @@ type TopologyWatcherInterface interface {
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// fetchKnownCells gathers a list of known cells from the topology. On error,
// the cell of the local tablet will be used and an error is logged.
func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) []string {
cells, err := topoServer.GetKnownCells(ctx)
if err != nil {
log.Errorf("txThrottler: falling back to local cell due to error fetching cells from topology: %+v", err)
cells = []string{target.Cell}
}
return cells
}

// txThrottler implements TxThrottle for throttling transactions based on replication lag.
// It's a thin wrapper around the throttler found in vitess/go/vt/throttler.
// It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler.
Expand Down Expand Up @@ -165,6 +177,9 @@ type txThrottlerConfig struct {

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool

// rate to refresh topo for cells
topoRefreshInterval time.Duration
}

// txThrottlerState holds the state of an open TxThrottler object.
Expand All @@ -174,12 +189,15 @@ type txThrottlerState struct {

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
throttleMu sync.Mutex
throttler ThrottlerInterface
stopHealthCheck context.CancelFunc
throttleMu sync.Mutex
throttler ThrottlerInterface
stopHealthCheck context.CancelFunc
topologyWatchers map[string]TopologyWatcherInterface

healthCheck discovery.HealthCheck
topologyWatchers map[string]TopologyWatcherInterface
healthCheckChan chan *discovery.TabletHealth
healthCheckCells []string
cellsFromTopo bool
}

// NewTxThrottler tries to construct a txThrottler from the
Expand All @@ -201,10 +219,11 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
}

throttlerConfig = &txThrottlerConfig{
enabled: true,
tabletTypes: tabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
healthCheckCells: healthCheckCells,
enabled: true,
healthCheckCells: healthCheckCells,
tabletTypes: tabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
topoRefreshInterval: env.Config().TxThrottlerTopoRefreshInterval,
}

defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig)
Expand Down Expand Up @@ -301,44 +320,91 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
return nil, err
}
state := &txThrottlerState{
config: config,
throttler: t,
txThrottler: txThrottler,
config: config,
healthCheckCells: config.healthCheckCells,
throttler: t,
txThrottler: txThrottler,
}
createTxThrottlerHealthCheck(txThrottler.topoServer, config, state, target.Cell)

state.topologyWatchers = make(
map[string]TopologyWatcherInterface, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
state.topologyWatchers[cell] = topologyWatcherFactory(
txThrottler.topoServer,
state.healthCheck,

// get cells from topo if none defined in tabletenv config
if len(state.healthCheckCells) == 0 {
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target)
state.cellsFromTopo = true
}

ctx, cancel := context.WithCancel(context.Background())
state.stopHealthCheck = cancel
state.initHealthCheckStream(txThrottler.topoServer, target)
go state.healthChecksProcessor(ctx, txThrottler.topoServer, target)

return state, nil
}

func (ts *txThrottlerState) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) {
ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells)
ts.healthCheckChan = ts.healthCheck.Subscribe()

ts.topologyWatchers = make(
map[string]TopologyWatcherInterface, len(ts.healthCheckCells))
for _, cell := range ts.healthCheckCells {
ts.topologyWatchers[cell] = topologyWatcherFactory(
topoServer,
ts.healthCheck,
cell,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency,
)
txThrottler.topoWatchers.Add(cell, 1)
ts.txThrottler.topoWatchers.Add(cell, 1)
}
return state, nil
}

func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) {
ctx, cancel := context.WithCancel(context.Background())
result.stopHealthCheck = cancel
result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells)
ch := result.healthCheck.Subscribe()
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case th := <-ch:
result.StatsUpdate(th)
}
func (ts *txThrottlerState) closeHealthCheckStream() {
if ts.healthCheck == nil {
return
}
for cell, watcher := range ts.topologyWatchers {
watcher.Stop()
ts.txThrottler.topoWatchers.Reset(cell)
}
ts.topologyWatchers = nil
ts.stopHealthCheck()
ts.healthCheck.Close()
}

func (ts *txThrottlerState) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()

knownCells := fetchKnownCells(fetchCtx, topoServer, target)
if !reflect.DeepEqual(knownCells, ts.healthCheckCells) {
log.Info("txThrottler: restarting healthcheck stream due to topology cells update")
ts.healthCheckCells = knownCells
ts.closeHealthCheckStream()
ts.initHealthCheckStream(topoServer, target)
}
}

func (ts *txThrottlerState) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
var cellsUpdateTicks <-chan time.Time
if ts.cellsFromTopo {
ticker := time.NewTicker(ts.config.topoRefreshInterval)
cellsUpdateTicks = ticker.C
defer ticker.Stop()
}
for {
select {
case <-ctx.Done():
return
case <-cellsUpdateTicks:
ts.updateHealthCheckCells(ctx, topoServer, target)
case th := <-ts.healthCheckChan:
ts.StatsUpdate(th)
}
}(ctx)
}
}

func (ts *txThrottlerState) throttle() bool {
Expand All @@ -353,16 +419,8 @@ func (ts *txThrottlerState) throttle() bool {
}

func (ts *txThrottlerState) deallocateResources() {
// We don't really need to nil out the fields here
// as deallocateResources is not expected to be called
// more than once, but it doesn't hurt to do so.
for cell, watcher := range ts.topologyWatchers {
watcher.Stop()
ts.txThrottler.topoWatchers.Reset(cell)
}
ts.topologyWatchers = nil

ts.healthCheck.Close()
// Close healthcheck and topo watchers
ts.closeHealthCheckStream()
ts.healthCheck = nil

// After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not
Expand Down
15 changes: 14 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package txthrottler
//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -112,7 +113,6 @@ func TestEnabledThrottler(t *testing.T) {

config := tabletenv.NewDefaultConfig()
config.EnableTxThrottler = true
config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"}
config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}

env := tabletenv.NewEnv(config, t.Name())
Expand Down Expand Up @@ -161,6 +161,19 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts())
}

func TestFetchKnownCells(t *testing.T) {
{
ts := memorytopo.NewServer("cell1", "cell2")
cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"})
assert.Equal(t, []string{"cell1", "cell2"}, cells)
}
{
ts := memorytopo.NewServer()
cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"})
assert.Equal(t, []string{"cell1"}, cells)
}
}

func TestNewTxThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())
Expand Down

0 comments on commit 343c024

Please sign in to comment.