Skip to content

Commit

Permalink
Tablet throttler: empty list of probes on non-leader (#13926)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
shlomi-noach and mattlord authored Sep 11, 2023
1 parent 974579d commit 7aed9ed
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 4 deletions.
24 changes: 20 additions & 4 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ const (
ThrottleCheckSelf
)

// throttlerTopoService represents the functionality we expect from a TopoServer, abstracted so that
// it can be mocked in unit tests
type throttlerTopoService interface {
GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error)
FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error)
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)
}

// Throttler is the main entity in the throttling mechanism. This service runs, probes, collects data,
// aggregates, reads inventory, provides information, etc.
type Throttler struct {
Expand All @@ -125,7 +133,7 @@ type Throttler struct {
env tabletenv.Env
pool *connpool.Pool
tabletTypeFunc func() topodatapb.TabletType
ts *topo.Server
ts throttlerTopoService
srvTopoServer srvtopo.Server
heartbeatWriter heartbeat.HeartbeatWriter

Expand Down Expand Up @@ -602,10 +610,10 @@ func (throttler *Throttler) Operate(ctx context.Context) {
recentCheckTicker := addTicker(time.Second)

tmClient := tmclient.NewTabletManagerClient()
defer tmClient.Close()

go func() {
defer log.Infof("Throttler: Operate terminated, tickers stopped")
defer tmClient.Close()
for _, t := range tickers {
defer t.Stop()
// since we just started the tickers now, speed up the ticks by forcing an immediate tick
Expand Down Expand Up @@ -786,8 +794,10 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tm

var throttleMetricFunc func() *mysql.MySQLThrottleMetric
if clusterName == selfStoreName {
// Throttler is probing its own tablet's metrics:
throttleMetricFunc = throttler.generateSelfMySQLThrottleMetricFunc(ctx, probe)
} else {
// Throttler probing other tablets:
throttleMetricFunc = throttler.generateTabletHTTPProbeFunction(ctx, tmClient, clusterName, probe)
}
throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, throttleMetricFunc)
Expand All @@ -801,7 +811,6 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tm

// refreshMySQLInventory will re-structure the inventory based on reading config settings
func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {

// distribute the query/threshold from the throttler down to the cluster settings and from there to the probes
metricsQuery := throttler.GetMetricsQuery()
metricsThreshold := throttler.MetricsThreshold.Load()
Expand Down Expand Up @@ -844,13 +853,20 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
}

if clusterName == selfStoreName {
// special case: just looking at this tablet's MySQL server
// special case: just looking at this tablet's MySQL server.
// We will probe this "cluster" (of one server) is a special way.
addInstanceKey(nil, "", 0, mysql.SelfInstanceKey, clusterName, clusterSettings, clusterProbes.InstanceProbes)
throttler.mysqlClusterProbesChan <- clusterProbes
return
}
if !throttler.isLeader.Load() {
// This tablet may have used to be the primary, but it isn't now. It may have a recollection
// of previous clusters it used to probe. It may have recollection of specific probes for such clusters.
// This now ensures any existing cluster probes are overrridden with an empty list of probes.
// `clusterProbes` was created above as empty, and identificable via `clusterName`. This will in turn
// be used to overwrite throttler.mysqlInventory.ClustersProbes[clusterProbes.ClusterName] in
// updateMySQLClusterProbes().
throttler.mysqlClusterProbesChan <- clusterProbes
// not the leader (primary tablet)? Then no more work for us.
return
}
Expand Down
144 changes: 144 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,57 @@
package throttle

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
waitForProbesTimeout = 30 * time.Second
)

type FakeTopoServer struct {
}

func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) {
tablet := &topo.TabletInfo{
Tablet: &topodatapb.Tablet{
Alias: alias,
Hostname: "127.0.0.1",
MysqlHostname: "127.0.0.1",
MysqlPort: 3306,
PortMap: map[string]int32{"vt": 5000},
Type: topodatapb.TabletType_REPLICA,
},
}
return tablet, nil
}

func (ts *FakeTopoServer) FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) {
aliases := []*topodatapb.TabletAlias{
{Cell: "zone1", Uid: 100},
{Cell: "zone2", Uid: 101},
}
return aliases, nil
}

func (ts *FakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
ks := &topodatapb.SrvKeyspace{}
return ks, nil
}

type FakeHeartbeatWriter struct {
}

Expand Down Expand Up @@ -50,6 +94,7 @@ func TestIsAppThrottled(t *testing.T) {
}

func TestIsAppExempted(t *testing.T) {

throttler := Throttler{
throttledApps: cache.New(cache.NoExpiration, 0),
heartbeatWriter: FakeHeartbeatWriter{},
Expand All @@ -75,3 +120,102 @@ func TestIsAppExempted(t *testing.T) {
throttler.UnthrottleApp("schema-tracker") // meaningless. App is statically exempted
assert.True(t, throttler.IsAppExempted("schema-tracker"))
}

// TestRefreshMySQLInventory tests the behavior of the throttler's RefreshMySQLInventory() function, which
// is called periodically in actual throttler. For a given cluster name, it generates a list of probes
// the throttler will use to check metrics.
// On a "self" cluster, that list is expect to probe the tablet itself.
// On any other cluster, the list is expected to be empty if non-leader (only leader throttler, on a
// `PRIMARY` tablet, probes other tablets). On the leader, the list is expected to be non-empty.
func TestRefreshMySQLInventory(t *testing.T) {
metricsQuery := "select 1"
config.Settings().Stores.MySQL.Clusters = map[string]*config.MySQLClusterConfigurationSettings{
selfStoreName: {},
"ks1": {},
"ks2": {},
}
clusters := config.Settings().Stores.MySQL.Clusters
for _, s := range clusters {
s.MetricQuery = metricsQuery
s.ThrottleThreshold = &atomic.Uint64{}
s.ThrottleThreshold.Store(1)
}

throttler := &Throttler{
mysqlClusterProbesChan: make(chan *mysql.ClusterProbes),
mysqlClusterThresholds: cache.New(cache.NoExpiration, 0),
ts: &FakeTopoServer{},
mysqlInventory: mysql.NewInventory(),
}
throttler.metricsQuery.Store(metricsQuery)
throttler.initThrottleTabletTypes()

validateClusterProbes := func(t *testing.T, ctx context.Context) {
testName := fmt.Sprintf("leader=%t", throttler.isLeader.Load())
t.Run(testName, func(t *testing.T) {
// validateProbesCount expectes number of probes according to cluster name and throttler's leadership status
validateProbesCount := func(t *testing.T, clusterName string, probes *mysql.Probes) {
if clusterName == selfStoreName {
assert.Equal(t, 1, len(*probes))
} else if throttler.isLeader.Load() {
assert.NotZero(t, len(*probes))
} else {
assert.Empty(t, *probes)
}
}
t.Run("waiting for probes", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout)
defer cancel()
numClusterProbesResults := 0
for {
select {
case probes := <-throttler.mysqlClusterProbesChan:
// Worth noting that in this unit test, the throttler is _closed_. Its own Operate() function does
// not run, and therefore there is none but us to both populate `mysqlClusterProbesChan` as well as
// read from it. We do not compete here with any other goroutine.
assert.NotNil(t, probes)

throttler.updateMySQLClusterProbes(ctx, probes)

numClusterProbesResults++
validateProbesCount(t, probes.ClusterName, probes.InstanceProbes)

if numClusterProbesResults == len(clusters) {
// Achieved our goal
return
}
case <-ctx.Done():
assert.FailNowf(t, ctx.Err().Error(), "waiting for %d cluster probes", len(clusters))
}
}
})
t.Run("validating probes", func(t *testing.T) {
for clusterName := range clusters {
probes, ok := throttler.mysqlInventory.ClustersProbes[clusterName]
require.True(t, ok)
validateProbesCount(t, clusterName, probes)
}
})
})
}
//
ctx := context.Background()

t.Run("initial, not leader", func(t *testing.T) {
throttler.isLeader.Store(false)
throttler.refreshMySQLInventory(ctx)
validateClusterProbes(t, ctx)
})

t.Run("promote", func(t *testing.T) {
throttler.isLeader.Store(true)
throttler.refreshMySQLInventory(ctx)
validateClusterProbes(t, ctx)
})

t.Run("demote, expect cleanup", func(t *testing.T) {
throttler.isLeader.Store(false)
throttler.refreshMySQLInventory(ctx)
validateClusterProbes(t, ctx)
})
}

0 comments on commit 7aed9ed

Please sign in to comment.