Skip to content

Commit

Permalink
Throttler: fix race conditions in Operate() termination and in tests (#…
Browse files Browse the repository at this point in the history
…14971)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jan 18, 2024
1 parent 1b328bf commit 65e3b6e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 87 deletions.
26 changes: 17 additions & 9 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,25 +424,28 @@ func (throttler *Throttler) IsRunning() bool {

// Enable activates the throttler probes; when enabled, the throttler responds to check queries based on
// the collected metrics.
func (throttler *Throttler) Enable() bool {
// The function returns a WaitGroup that can be used to wait for the throttler to be fully disabled, ie when
// the Operate() goroutine function terminates and caches are invalidated.
func (throttler *Throttler) Enable() *sync.WaitGroup {
throttler.enableMutex.Lock()
defer throttler.enableMutex.Unlock()

if wasEnabled := throttler.isEnabled.Swap(true); wasEnabled {
log.Infof("Throttler: already enabled")
return false
return nil
}
log.Infof("Throttler: enabling")

wg := &sync.WaitGroup{}
var ctx context.Context
ctx, throttler.cancelEnableContext = context.WithCancel(context.Background())
throttler.check.SelfChecks(ctx)
throttler.Operate(ctx)
throttler.Operate(ctx, wg)

// Make a one-time request for a lease of heartbeats
go throttler.heartbeatWriter.RequestHeartbeats()

return true
return wg
}

// Disable deactivates the probes and associated operations. When disabled, the throttler responds to check
Expand All @@ -457,10 +460,6 @@ func (throttler *Throttler) Disable() bool {
}
log.Infof("Throttler: disabling")
// _ = throttler.updateConfig(ctx, false, throttler.MetricsThreshold.Get()) // TODO(shlomi)
throttler.aggregatedMetrics.Flush()
throttler.recentApps.Flush()
throttler.nonLowPriorityAppRequestsThrottled.Flush()
// we do not flush throttler.throttledApps because this is data submitted by the user; the user expects the data to survive a disable+enable

throttler.cancelEnableContext()
return true
Expand Down Expand Up @@ -641,7 +640,7 @@ func (throttler *Throttler) isDormant() bool {

// Operate is the main entry point for the throttler operation and logic. It will
// run the probes, collect metrics, refresh inventory, etc.
func (throttler *Throttler) Operate(ctx context.Context) {
func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
tickers := [](*timer.SuspendableTicker){}
addTicker := func(d time.Duration) *timer.SuspendableTicker {
t := timer.NewSuspendableTicker(d, false)
Expand All @@ -656,7 +655,16 @@ func (throttler *Throttler) Operate(ctx context.Context) {
throttledAppsTicker := addTicker(throttler.throttledAppsSnapshotInterval)
recentCheckTicker := addTicker(time.Second)

wg.Add(1)
go func() {
defer func() {
throttler.aggregatedMetrics.Flush()
throttler.recentApps.Flush()
throttler.nonLowPriorityAppRequestsThrottled.Flush()
wg.Done()
}()
// we do not flush throttler.throttledApps because this is data submitted by the user; the user expects the data to survive a disable+enable

defer log.Infof("Throttler: Operate terminated, tickers stopped")
for _, t := range tickers {
defer t.Stop()
Expand Down

This file was deleted.

55 changes: 53 additions & 2 deletions go/vt/vttablet/tabletserver/throttle/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,16 @@ func runThrottler(t *testing.T, throttler *Throttler, timeout time.Duration, f f
assert.True(t, throttler.IsOpen())
assert.False(t, throttler.IsEnabled())

ok := throttler.Enable()
wg := throttler.Enable()
require.NotNil(t, wg)
defer wg.Wait()
defer throttler.Disable()
assert.True(t, ok)
assert.True(t, throttler.IsEnabled())

// Enabling again does nothing:
wg2 := throttler.Enable()
assert.Nil(t, wg2)

if f != nil {
time.Sleep(timeout / 2)
f(t)
Expand Down Expand Up @@ -382,3 +387,49 @@ func TestProbesWhileOperating(t *testing.T) {
})
})
}

// TestProbesPostDisable runs the throttler for some time, and then investigates the internal throttler maps and values.
func TestProbesPostDisable(t *testing.T) {
throttler := newTestThrottler()
runThrottler(t, throttler, 2*time.Second, nil)

probes := throttler.mysqlInventory.ClustersProbes
assert.NotEmpty(t, probes)

selfProbes := probes[selfStoreName]
t.Run("self", func(t *testing.T) {
assert.NotEmpty(t, selfProbes)
require.Equal(t, 1, len(selfProbes)) // should always be true once refreshMySQLInventory() runs
probe, ok := selfProbes[""]
assert.True(t, ok)
assert.NotNil(t, probe)

assert.Equal(t, "", probe.Alias)
assert.Nil(t, probe.Tablet)
assert.Equal(t, "select 1", probe.MetricQuery)
assert.Zero(t, atomic.LoadInt64(&probe.QueryInProgress))
})

shardProbes := probes[shardStoreName]
t.Run("shard", func(t *testing.T) {
assert.NotEmpty(t, shardProbes)
assert.Equal(t, 2, len(shardProbes)) // see fake FindAllTabletAliasesInShard above
for _, probe := range shardProbes {
require.NotNil(t, probe)
assert.NotEmpty(t, probe.Alias)
assert.NotNil(t, probe.Tablet)
assert.Equal(t, "select 1", probe.MetricQuery)
assert.Zero(t, atomic.LoadInt64(&probe.QueryInProgress))
}
})

t.Run("metrics", func(t *testing.T) {
assert.Equal(t, 3, len(throttler.mysqlInventory.TabletMetrics)) // 1 self tablet + 2 shard tablets
})

t.Run("aggregated", func(t *testing.T) {
assert.Zero(t, throttler.aggregatedMetrics.ItemCount()) // flushed upon Disable()
aggr := throttler.aggregatedMetricsSnapshot()
assert.Empty(t, aggr)
})
}

0 comments on commit 65e3b6e

Please sign in to comment.