Skip to content

Commit

Permalink
txthrottler: add metrics for topoWatcher and healthCheckStreamer
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed May 25, 2023
1 parent 810dc19 commit f04827f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 28 deletions.
69 changes: 41 additions & 28 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ type txThrottler struct {
topoServer *topo.Server

// stats
throttlerRunning *stats.Gauge
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
throttlerRunning *stats.Gauge
topoWatchers *stats.GaugesWithSingleLabel
healthChecksReadTotal *stats.CountersWithSingleLabel
healthChecksRecordedTotal *stats.CountersWithSingleLabel
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
}

// txThrottlerConfig holds the parameters that need to be
Expand All @@ -168,7 +171,8 @@ type txThrottlerConfig struct {

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
config *txThrottlerConfig
config *txThrottlerConfig
txThrottler *txThrottler

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
Expand All @@ -177,7 +181,7 @@ type txThrottlerState struct {
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
topologyWatchers []TopologyWatcherInterface
topologyWatchers map[string]TopologyWatcherInterface
}

// NewTxThrottler tries to construct a txThrottler from the
Expand Down Expand Up @@ -238,11 +242,14 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott
}
}
return &txThrottler{
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read", "DbType"),
healthChecksRecordedTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded", "DbType"),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
}, nil
}

Expand All @@ -256,7 +263,7 @@ func (t *txThrottler) Open() (err error) {
}
log.Info("txThrottler: opening")
t.throttlerRunning.Set(1)
t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target)
t.state, err = newTxThrottlerState(t, t.config, t.target)
return err
}

Expand Down Expand Up @@ -300,7 +307,7 @@ func (t *txThrottler) Throttle(priority int) (result bool) {
return result
}

func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
Expand All @@ -318,24 +325,25 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
return nil, err
}
result := &txThrottlerState{
config: config,
throttler: t,
config: config,
throttler: t,
txThrottler: txThrottler,
}
createTxThrottlerHealthCheck(topoServer, config, result, target.Cell)
createTxThrottlerHealthCheck(txThrottler.topoServer, config, result, target.Cell)

result.topologyWatchers = make(
[]TopologyWatcherInterface, 0, len(config.healthCheckCells))
map[string]TopologyWatcherInterface, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
result.topologyWatchers = append(
result.topologyWatchers,
topologyWatcherFactory(
topoServer,
result.healthCheck,
cell,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency))
result.topologyWatchers[cell] = topologyWatcherFactory(
txThrottler.topoServer,
result.healthCheck,
cell,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency,
)
result.txThrottler.topoWatchers.Add(cell, 1)
}
return result, nil
}
Expand Down Expand Up @@ -372,8 +380,9 @@ func (ts *txThrottlerState) deallocateResources() {
// We don't really need to nil out the fields here
// as deallocateResources is not expected to be called
// more than once, but it doesn't hurt to do so.
for _, watcher := range ts.topologyWatchers {
for cell, watcher := range ts.topologyWatchers {
watcher.Stop()
ts.txThrottler.topoWatchers.Reset(cell)
}
ts.topologyWatchers = nil

Expand All @@ -392,11 +401,15 @@ func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {
return
}

tabletType := tabletStats.Target.TabletType
ts.txThrottler.healthChecksReadTotal.Add(tabletType.String(), 1)

// Monitor tablets for replication lag if they have a tablet
// type specified by the --tx_throttler_tablet_types flag.
for _, expectedTabletType := range *ts.config.tabletTypes {
if tabletStats.Target.TabletType == expectedTabletType {
if tabletType == expectedTabletType {
ts.throttler.RecordReplicationLag(time.Now(), tabletStats)
ts.txThrottler.healthChecksRecordedTotal.Add(tabletType.String(), 1)
return
}
}
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,24 @@ func TestEnabledThrottler(t *testing.T) {
})
assert.Nil(t, throttler.Open())
assert.Equal(t, int64(1), throttler.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttler.topoWatchers.Counts())

assert.False(t, throttler.Throttle(100))
assert.Equal(t, int64(1), throttler.requestsTotal.Get())
assert.Zero(t, throttler.requestsThrottled.Get())

throttler.state.StatsUpdate(tabletStats) // This calls replication lag thing
assert.Equal(t, map[string]int64{"REPLICA": 1}, throttler.healthChecksReadTotal.Counts())
assert.Equal(t, map[string]int64{"REPLICA": 1}, throttler.healthChecksRecordedTotal.Counts())
rdonlyTabletStats := &discovery.TabletHealth{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_RDONLY,
},
}
// This call should not be forwarded to the go/vt/throttler.Throttler object.
throttler.state.StatsUpdate(rdonlyTabletStats)
assert.Equal(t, map[string]int64{"REPLICA": 1, "RDONLY": 1}, throttler.healthChecksReadTotal.Counts())
assert.Equal(t, map[string]int64{"REPLICA": 1}, throttler.healthChecksRecordedTotal.Counts())
// The second throttle call should reject.
assert.True(t, throttler.Throttle(100))
assert.Equal(t, int64(2), throttler.requestsTotal.Get())
Expand All @@ -148,6 +153,7 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, int64(1), throttler.requestsThrottled.Get())
throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttler.topoWatchers.Counts())
}

func TestNewTxThrottler(t *testing.T) {
Expand Down

0 comments on commit f04827f

Please sign in to comment.