diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt
index f5331414339..c9a92b5c6b9 100644
--- a/go/flags/endtoend/vtgate.txt
+++ b/go/flags/endtoend/vtgate.txt
@@ -121,6 +121,8 @@ Usage of vtgate:
     gRPC server permit client keepalive pings even when there are no active streams (RPCs)
   --grpc_use_effective_callerid
     If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
+  --healthcheck_concurrency int
+    concurrent healthchecks (default 32)
   --healthcheck_retry_delay duration
     health check retry delay (default 2ms)
   --healthcheck_timeout duration
@@ -313,8 +315,6 @@ Usage of vtgate:
     the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. (default 32768)
   --structured-logging
     whether to use structured logging (Zap) or the original (glog) logger
-  --tablet_discovery_concurrency int
-    concurrent tablet discoveries (default 1024)
   --tablet_filters value
     Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch
   --tablet_grpc_ca string
diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt
index af357ee474f..6d48e412ec6 100644
--- a/go/flags/endtoend/vttablet.txt
+++ b/go/flags/endtoend/vttablet.txt
@@ -487,6 +487,8 @@ Usage of vttablet:
     gRPC server permit client keepalive pings even when there are no active streams (RPCs)
   --health_check_interval duration
     Interval between health checks (default 20s)
+  --healthcheck_concurrency int
+    concurrent healthchecks (default 32)
   --heartbeat_enable
     If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.
   --heartbeat_interval duration
@@ -809,8 +811,6 @@ Usage of vttablet:
     YAML file config for tablet
   --tablet_dir string
     The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid.
-  --tablet_discovery_concurrency int
-    concurrent tablet discoveries (default 1024)
   --tablet_filters value
     Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch
   --tablet_grpc_ca string
diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go
index cd45735003f..acd29d7a8e6 100644
--- a/go/vt/discovery/healthcheck.go
+++ b/go/vt/discovery/healthcheck.go
@@ -81,8 +81,8 @@ var (
 	refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
 	// topoReadConcurrency tells us how many topo reads are allowed in parallel
 	topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")
-	// tabletDiscoveryConcurrency tells us how many tablets can be discovered in parallel
-	tabletDiscoveryConcurrency = flag.Int("tablet_discovery_concurrency", 1024, "concurrent tablet discoveries")
+	// healthCheckConcurrency tells us how many tablets can be healthchecked in parallel
+	healthCheckConcurrency = flag.Int("healthcheck_concurrency", 32, "concurrent healthchecks")
 )
 
 // See the documentation for NewHealthCheck below for an explanation of these parameters.
@@ -262,6 +262,8 @@ type HealthCheckImpl struct {
 	subMu sync.Mutex
 	// subscribers
 	subscribers map[chan *TabletHealth]struct{}
+	// healthCheckSem bounds how many healthchecks may run in parallel
+	healthCheckSem chan int
 }
 
 // NewHealthCheck creates a new HealthCheck object.
@@ -296,6 +298,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
 		cell:               localCell,
 		retryDelay:         retryDelay,
 		healthCheckTimeout: healthCheckTimeout,
+		healthCheckSem:     make(chan int, *healthCheckConcurrency),
 		healthByAlias:      make(map[tabletAliasString]*tabletHealthCheck),
 		healthData:         make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
 		healthy:            make(map[KeyspaceShardTabletType][]*TabletHealth),
@@ -326,7 +329,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
 		} else if len(KeyspacesToWatch) > 0 {
 			filter = NewFilterByKeyspace(KeyspacesToWatch)
 		}
-		topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency, *tabletDiscoveryConcurrency))
+		topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency))
 	}
 
 	hc.topoWatchers = topoWatchers
@@ -386,9 +389,11 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
 	}
 	hc.healthData[key][tabletAliasString(tabletAlias)] = res
 
+	hc.healthCheckSem <- 1 // Wait for active queue to drain.
 	hc.broadcast(res)
 	hc.connsWG.Add(1)
 	go thc.checkConn(hc)
+	<-hc.healthCheckSem
 }
 
 // RemoveTablet removes the tablet, and stops the health check.
diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go
index 641d2c14b2f..e91925e386a 100644
--- a/go/vt/discovery/topology_watcher.go
+++ b/go/vt/discovery/topology_watcher.go
@@ -73,7 +73,6 @@ type TopologyWatcher struct {
 	refreshKnownTablets bool
 	getTablets          func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)
 	sem                 chan int
-	tabletDiscoverySem  chan int
 	ctx                 context.Context
 	cancelFunc          context.CancelFunc
 	// wg keeps track of all launched Go routines.
@@ -95,7 +94,7 @@ type TopologyWatcher struct {
 
 // NewTopologyWatcher returns a TopologyWatcher that monitors all
 // the tablets in a cell, and starts refreshing.
-func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
+func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
 	tw := &TopologyWatcher{
 		topoServer:          topoServer,
 		tabletRecorder:      tr,
@@ -105,7 +104,6 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR
 		refreshKnownTablets: refreshKnownTablets,
 		getTablets:          getTablets,
 		sem:                 make(chan int, topoReadConcurrency),
-		tabletDiscoverySem:  make(chan int, tabletDiscoveryConcurrency),
 		tablets:             make(map[string]*tabletInfo),
 	}
 	tw.firstLoadChan = make(chan struct{})
@@ -118,8 +116,8 @@
 
 // NewCellTabletsWatcher returns a TopologyWatcher that monitors all
 // the tablets in a cell, and starts refreshing.
-func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int) *TopologyWatcher {
-	return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, tabletDiscoveryConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
+func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
+	return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
 		return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
 	})
 }
@@ -232,11 +230,9 @@ func (tw *TopologyWatcher) loadTablets() {
 				topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
 			}
 		} else {
-			tw.tabletDiscoverySem <- 1 // Wait for active queue to drain.
 			// This is a new tablet record, let's add it to the healthcheck
 			tw.tabletRecorder.AddTablet(newVal.tablet)
 			topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
-			<-tw.tabletDiscoverySem // Done; enable next request to run
 		}
 	}
 
diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go
index 45d8b7a6fef..dff8ba720c7 100644
--- a/go/vt/discovery/topology_watcher_test.go
+++ b/go/vt/discovery/topology_watcher_test.go
@@ -59,7 +59,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) {
 	ts := memorytopo.NewServer("aa")
 	fhc := NewFakeHealthCheck(nil)
 	topologyWatcherOperations.ZeroAll()
-	tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5, 5)
+	tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5)
 
 	done := make(chan bool, 3)
 	result := make(chan bool, 1)
@@ -115,7 +115,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
 	logger := logutil.NewMemoryLogger()
 	topologyWatcherOperations.ZeroAll()
 	counts := topologyWatcherOperations.Counts()
-	tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5, 5)
+	tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5)
 
 	counts = checkOpCounts(t, counts, map[string]int64{})
 	checkChecksum(t, tw, 0)
@@ -432,7 +432,7 @@ func TestFilterByKeyspace(t *testing.T) {
 	hc := NewFakeHealthCheck(nil)
 	f := NewFilterByKeyspace(testKeyspacesToWatch)
 	ts := memorytopo.NewServer(testCell)
-	tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5, 5)
+	tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5)
 
 	for _, test := range testFilterByKeyspace {
 		// Add a new tablet to the topology.
@@ -514,7 +514,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {
 	topologyWatcherOperations.ZeroAll()
 	counts := topologyWatcherOperations.Counts()
 	f := NewFilterByKeyspace(testKeyspacesToWatch)
-	tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5, 5)
+	tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5)
 
 	counts = checkOpCounts(t, counts, map[string]int64{})
 	checkChecksum(t, tw, 0)
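
Note on the pattern above: healthCheckSem is a buffered channel used as a counting semaphore, sized by --healthcheck_concurrency, in place of the old tabletDiscoverySem in the topology watcher. The sketch below is a minimal, self-contained illustration of that buffered-channel semaphore pattern and is not Vitess code; names such as maxConcurrent and checkTablet are hypothetical stand-ins.

package main

import (
	"fmt"
	"sync"
	"time"
)

// checkTablet is a hypothetical stand-in for the per-tablet healthcheck
// setup work whose parallelism we want to bound.
func checkTablet(id int) {
	time.Sleep(50 * time.Millisecond)
	fmt.Println("checked tablet", id)
}

func main() {
	const maxConcurrent = 4              // stands in for --healthcheck_concurrency
	sem := make(chan int, maxConcurrent) // buffered channel acting as a counting semaphore

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- 1                 // acquire: blocks once maxConcurrent checks are in flight
			defer func() { <-sem }() // release the slot when this check finishes
			checkTablet(id)
		}(i)
	}
	wg.Wait()
}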