Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet throttler: remove cached metric associated with removed tablet #16555

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
primaryStimulatorRateLimiter.Stop()
throttler.aggregatedMetrics.Flush()
throttler.recentApps.Flush()
clear(throttler.inventory.TabletMetrics)
}()
// we do not flush throttler.throttledApps because this is data submitted by the user; the user expects the data to survive a disable+enable

Expand Down Expand Up @@ -842,7 +843,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
}
case probes := <-throttler.clusterProbesChan:
// incoming structural update, sparse, as result of refreshInventory()
throttler.updateClusterProbes(ctx, probes)
throttler.updateClusterProbes(probes)
case <-metricsAggregateTicker.C:
if throttler.IsOpen() {
throttler.aggregateMetrics()
Expand Down Expand Up @@ -1116,10 +1117,25 @@ func (throttler *Throttler) refreshInventory(ctx context.Context) error {
}

// synchronous update of inventory
func (throttler *Throttler) updateClusterProbes(ctx context.Context, clusterProbes *base.ClusterProbes) error {
func (throttler *Throttler) updateClusterProbes(clusterProbes *base.ClusterProbes) error {
throttler.inventory.ClustersProbes = clusterProbes.TabletProbes
throttler.inventory.IgnoreHostsCount = clusterProbes.IgnoreHostsCount
throttler.inventory.IgnoreHostsThreshold = clusterProbes.IgnoreHostsThreshold

for alias := range throttler.inventory.TabletMetrics {
if alias == "" {
// *this* tablet uses the empty alias to identify itself.
continue
}
if _, found := clusterProbes.TabletProbes[alias]; !found {
// There seems to be a metric stored for some alias, say zone1-0000000102,
// but there is no alias for this probe in the new clusterProbes. This
// suggests that the corresponding tablet has been removed, or its type was changed
// (e.g. from REPLICA to RDONLY). We should therefore remove this cached metric.
delete(throttler.inventory.TabletMetrics, alias)
}
}

return nil
}

Expand Down
81 changes: 75 additions & 6 deletions go/vt/vttablet/tabletserver/throttle/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var (
Value: 5.1,
},
}
nonPrimaryTabletType atomic.Int32
)

const (
Expand Down Expand Up @@ -151,7 +152,11 @@ type FakeTopoServer struct {
func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) {
tabletType := topodatapb.TabletType_PRIMARY
if alias.Uid != 100 {
tabletType = topodatapb.TabletType_REPLICA
val := topodatapb.TabletType(nonPrimaryTabletType.Load())
if val == topodatapb.TabletType_UNKNOWN {
val = topodatapb.TabletType_REPLICA
}
tabletType = val
}
tablet := &topo.TabletInfo{
Tablet: &topodatapb.Tablet{
Expand Down Expand Up @@ -1156,9 +1161,9 @@ func TestRefreshInventory(t *testing.T) {
// validateProbesCount expects number of probes according to cluster name and throttler's leadership status
validateProbesCount := func(t *testing.T, probes base.Probes) {
if throttler.isLeader.Load() {
assert.Equal(t, 3, len(probes))
assert.Len(t, probes, 3)
} else {
assert.Equal(t, 1, len(probes))
assert.Len(t, probes, 1)
}
}
t.Run("waiting for probes", func(t *testing.T) {
Expand All @@ -1171,7 +1176,7 @@ func TestRefreshInventory(t *testing.T) {
// not run, and therefore there is none but us to both populate `clusterProbesChan` as well as
// read from it. We do not compete here with any other goroutine.
assert.NotNil(t, probes)
throttler.updateClusterProbes(ctx, probes)
throttler.updateClusterProbes(probes)
validateProbesCount(t, probes.TabletProbes)
// Achieved our goal
return
Expand Down Expand Up @@ -1488,6 +1493,70 @@ func TestProbesWhileOperating(t *testing.T) {
})
})
})

t.Run("metrics", func(t *testing.T) {
var results base.TabletResultMap
<-runSerialFunction(t, ctx, throttler, func(ctx context.Context) {
results = maps.Clone(throttler.inventory.TabletMetrics)
})
assert.Len(t, results, 3) // 1 self tablet + 2 shard tablets
assert.Contains(t, results, "", "TabletMetrics: %+v", results) // primary self identifies with empty alias
assert.Contains(t, results, "fakezone1-0000000101", "TabletMetrics: %+v", results)
assert.Contains(t, results, "fakezone2-0000000102", "TabletMetrics: %+v", results)
})

t.Run("no REPLICA probes", func(t *testing.T) {
nonPrimaryTabletType.Store(int32(topodatapb.TabletType_RDONLY))
defer nonPrimaryTabletType.Store(int32(topodatapb.TabletType_REPLICA))

t.Run("waiting for inventory metrics", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout)
defer cancel()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
var results base.TabletResultMap
<-runSerialFunction(t, ctx, throttler, func(ctx context.Context) {
results = maps.Clone(throttler.inventory.TabletMetrics)
})
if len(results) == 1 {
// That's what we were waiting for. Good.
assert.Contains(t, results, "", "TabletMetrics: %+v", results) // primary self identifies with empty alias
return
}

select {
case <-ticker.C:
case <-ctx.Done():
assert.FailNowf(t, ctx.Err().Error(), "waiting for inventory metrics")
}
}
})
})
t.Run("again with probes", func(t *testing.T) {
t.Run("waiting for inventory metrics", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout)
defer cancel()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
var results base.TabletResultMap
<-runSerialFunction(t, ctx, throttler, func(ctx context.Context) {
results = maps.Clone(throttler.inventory.TabletMetrics)
})
if len(results) == 3 {
// That's what we were waiting for. Good.
return
}

select {
case <-ticker.C:
case <-ctx.Done():
assert.FailNowf(t, ctx.Err().Error(), "waiting for inventory metrics")
}
}
})
})
})
}

Expand Down Expand Up @@ -1603,7 +1672,7 @@ func TestProbesPostDisable(t *testing.T) {
})

t.Run("metrics", func(t *testing.T) {
assert.Equal(t, 3, len(throttler.inventory.TabletMetrics)) // 1 self tablet + 2 shard tablets
assert.Empty(t, throttler.inventory.TabletMetrics) // map has been cleared
})

t.Run("aggregated", func(t *testing.T) {
Expand Down Expand Up @@ -2103,7 +2172,7 @@ func TestReplica(t *testing.T) {
defer throttler.appCheckedMetrics.Delete(testAppName.String())
checkResult := throttler.Check(ctx, testAppName.String(), nil, flags)
require.NotNil(t, checkResult)
assert.Equal(t, 3, len(checkResult.Metrics))
assert.Len(t, checkResult.Metrics, 3)
})
t.Run("client, OK", func(t *testing.T) {
client := NewBackgroundClient(throttler, throttlerapp.TestingName, base.UndefinedScope)
Expand Down
Loading