From fe2bdd84e075385a6fbe73665f24661983302143 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 18 Jan 2024 14:51:20 +0100 Subject: [PATCH 1/8] Limit concurrency of .AddTablet() calls in topology watcher Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index e91925e386a..87e49cb9486 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -230,9 +230,11 @@ func (tw *TopologyWatcher) loadTablets() { topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1) } } else { + tw.sem <- 1 // Wait for active queue to drain. // This is a new tablet record, let's add it to the healthcheck tw.tabletRecorder.AddTablet(newVal.tablet) topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) + <-tw.sem // Done; enable next request to run } } From d55c5c2b9f9e31b48974a1a1767ddc143811d6b9 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 18 Jan 2024 15:06:49 +0100 Subject: [PATCH 2/8] Use a new flag Signed-off-by: Tim Vaillancourt --- go/vt/discovery/healthcheck.go | 4 +++- go/vt/discovery/topology_watcher.go | 12 +++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index ad93336e98e..cd45735003f 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -81,6 +81,8 @@ var ( refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes") // topoReadConcurrency tells us how many topo reads are allowed in parallel topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads") + // tabletDiscoveryConcurrency tells us how many tablets can be discovered in parallel + tabletDiscoveryConcurrency = flag.Int("tablet_discovery_concurrency", 1024, "concurrent tablet discoveries") ) // See the documentation for NewHealthCheck below for an explanation of these parameters. @@ -324,7 +326,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur } else if len(KeyspacesToWatch) > 0 { filter = NewFilterByKeyspace(KeyspacesToWatch) } - topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency)) + topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency, *tabletDiscoveryConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 87e49cb9486..641d2c14b2f 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -73,6 +73,7 @@ type TopologyWatcher struct { refreshKnownTablets bool getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) sem chan int + tabletDiscoverySem chan int ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. @@ -94,7 +95,7 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and starts refreshing. 
-func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, tabletRecorder: tr, @@ -104,6 +105,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR refreshKnownTablets: refreshKnownTablets, getTablets: getTablets, sem: make(chan int, topoReadConcurrency), + tabletDiscoverySem: make(chan int, tabletDiscoveryConcurrency), tablets: make(map[string]*tabletInfo), } tw.firstLoadChan = make(chan struct{}) @@ -116,8 +118,8 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR // NewCellTabletsWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and starts refreshing. -func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { - return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) { +func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int) *TopologyWatcher { + return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, tabletDiscoveryConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) { return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell) }) } @@ -230,11 +232,11 @@ func (tw *TopologyWatcher) loadTablets() { topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1) } } else { - tw.sem <- 1 // Wait for active queue to drain. + tw.tabletDiscoverySem <- 1 // Wait for active queue to drain. // This is a new tablet record, let's add it to the healthcheck tw.tabletRecorder.AddTablet(newVal.tablet) topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) - <-tw.sem // Done; enable next request to run + <-tw.tabletDiscoverySem // Done; enable next request to run } } From 45e249f6e0017ae21889b3f32c489cc27d7173f2 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 18 Jan 2024 15:20:21 +0100 Subject: [PATCH 3/8] Fix vtgate.txt Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtgate.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 95e015c91dd..95f7c89272f 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -313,6 +313,8 @@ Usage of vtgate: the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. 
(default 32768) --structured-logging whether to use structured logging (Zap) or the original (glog) logger + --tablet_discovery_concurrency int + concurrent tablet discoveries (default 128) --tablet_filters value Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch --tablet_grpc_ca string From 91ebfc22313babebbd8732c64ac8185898a3f0b9 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 18 Jan 2024 15:21:55 +0100 Subject: [PATCH 4/8] Fix vtgate.txt again Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtgate.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 95f7c89272f..f5331414339 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -314,7 +314,7 @@ Usage of vtgate: --structured-logging whether to use structured logging (Zap) or the original (glog) logger --tablet_discovery_concurrency int - concurrent tablet discoveries (default 128) + concurrent tablet discoveries (default 1024) --tablet_filters value Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch --tablet_grpc_ca string From 8953a309d5d35871a97fa30cf634a982e9b20050 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 18 Jan 2024 15:31:13 +0100 Subject: [PATCH 5/8] Fix unit test Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index dff8ba720c7..45d8b7a6fef 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -59,7 +59,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck(nil) topologyWatcherOperations.ZeroAll() - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) + tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5, 5) done := make(chan bool, 3) result := make(chan bool, 1) @@ -115,7 +115,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { logger := logutil.NewMemoryLogger() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5) + tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -432,7 +432,7 @@ func TestFilterByKeyspace(t *testing.T) { hc := NewFakeHealthCheck(nil) f := NewFilterByKeyspace(testKeyspacesToWatch) ts := memorytopo.NewServer(testCell) - tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) + tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5, 5) for _, test := range testFilterByKeyspace { // Add a new tablet to the topology. 
@@ -514,7 +514,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() f := NewFilterByKeyspace(testKeyspacesToWatch) - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) + tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) From 3987f1c516d4539678349600e474a89f61617a85 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 18 Jan 2024 15:43:22 +0100 Subject: [PATCH 6/8] go mod tidy Signed-off-by: Tim Vaillancourt --- go.sum | 1 - 1 file changed, 1 deletion(-) diff --git a/go.sum b/go.sum index 6ce1df5e358..cc300ba3899 100644 --- a/go.sum +++ b/go.sum @@ -169,7 +169,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI= github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= From 115238f1febc1d059b7cfb2e8ac6ba688131eb0b Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 18 Jan 2024 15:57:26 +0100 Subject: [PATCH 7/8] Fix e2e flag test Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vttablet.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index a09da5b87bc..af357ee474f 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -809,6 +809,8 @@ Usage of vttablet: YAML file config for tablet --tablet_dir string The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid. + --tablet_discovery_concurrency int + concurrent tablet discoveries (default 1024) --tablet_filters value Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch --tablet_grpc_ca string From 6331043ac6baaff45fb190a448d24a80b6cc8a67 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 19 Jan 2024 19:25:32 +0100 Subject: [PATCH 8/8] Move semaphore to `HealthCheckImpl` Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtgate.txt | 4 ++-- go/flags/endtoend/vttablet.txt | 4 ++-- go/vt/discovery/healthcheck.go | 11 ++++++++--- go/vt/discovery/topology_watcher.go | 10 +++------- go/vt/discovery/topology_watcher_test.go | 8 ++++---- 5 files changed, 19 insertions(+), 18 deletions(-) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index f5331414339..c9a92b5c6b9 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -121,6 +121,8 @@ Usage of vtgate: gRPC server permit client keepalive pings even when there are no active streams (RPCs) --grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal. 
+ --healthcheck_concurrency int + concurrent healthchecks (default 32) --healthcheck_retry_delay duration health check retry delay (default 2ms) --healthcheck_timeout duration @@ -313,8 +315,6 @@ Usage of vtgate: the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. (default 32768) --structured-logging whether to use structured logging (Zap) or the original (glog) logger - --tablet_discovery_concurrency int - concurrent tablet discoveries (default 1024) --tablet_filters value Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch --tablet_grpc_ca string diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index af357ee474f..6d48e412ec6 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -487,6 +487,8 @@ Usage of vttablet: gRPC server permit client keepalive pings even when there are no active streams (RPCs) --health_check_interval duration Interval between health checks (default 20s) + --healthcheck_concurrency int + concurrent healthchecks (default 32) --heartbeat_enable If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks. --heartbeat_interval duration @@ -809,8 +811,6 @@ Usage of vttablet: YAML file config for tablet --tablet_dir string The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid. - --tablet_discovery_concurrency int - concurrent tablet discoveries (default 1024) --tablet_filters value Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch --tablet_grpc_ca string diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index cd45735003f..acd29d7a8e6 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -81,8 +81,8 @@ var ( refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes") // topoReadConcurrency tells us how many topo reads are allowed in parallel topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads") - // tabletDiscoveryConcurrency tells us how many tablets can be discovered in parallel - tabletDiscoveryConcurrency = flag.Int("tablet_discovery_concurrency", 1024, "concurrent tablet discoveries") + // healthCheckConcurrency tells us how many tablets can be healthchecked in parallel + healthCheckConcurrency = flag.Int("healthcheck_concurrency", 32, "concurrent healthchecks") ) // See the documentation for NewHealthCheck below for an explanation of these parameters. @@ -262,6 +262,8 @@ type HealthCheckImpl struct { subMu sync.Mutex // subscribers subscribers map[chan *TabletHealth]struct{} + // healthCheckSem + healthCheckSem chan int } // NewHealthCheck creates a new HealthCheck object. 
@@ -296,6 +298,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur cell: localCell, retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, + healthCheckSem: make(chan int, *healthCheckConcurrency), healthByAlias: make(map[tabletAliasString]*tabletHealthCheck), healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth), healthy: make(map[KeyspaceShardTabletType][]*TabletHealth), @@ -326,7 +329,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur } else if len(KeyspacesToWatch) > 0 { filter = NewFilterByKeyspace(KeyspacesToWatch) } - topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency, *tabletDiscoveryConcurrency)) + topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency)) } hc.topoWatchers = topoWatchers @@ -386,9 +389,11 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) { } hc.healthData[key][tabletAliasString(tabletAlias)] = res + hc.healthCheckSem <- 1 // Wait for active queue to drain. hc.broadcast(res) hc.connsWG.Add(1) go thc.checkConn(hc) + <-hc.healthCheckSem } // RemoveTablet removes the tablet, and stops the health check. diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 641d2c14b2f..e91925e386a 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -73,7 +73,6 @@ type TopologyWatcher struct { refreshKnownTablets bool getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) sem chan int - tabletDiscoverySem chan int ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. @@ -95,7 +94,7 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and starts refreshing. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, tabletRecorder: tr, @@ -105,7 +104,6 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR refreshKnownTablets: refreshKnownTablets, getTablets: getTablets, sem: make(chan int, topoReadConcurrency), - tabletDiscoverySem: make(chan int, tabletDiscoveryConcurrency), tablets: make(map[string]*tabletInfo), } tw.firstLoadChan = make(chan struct{}) @@ -118,8 +116,8 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR // NewCellTabletsWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and starts refreshing. 
-func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int) *TopologyWatcher { - return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, tabletDiscoveryConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) { +func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { + return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) { return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell) }) } @@ -232,11 +230,9 @@ func (tw *TopologyWatcher) loadTablets() { topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1) } } else { - tw.tabletDiscoverySem <- 1 // Wait for active queue to drain. // This is a new tablet record, let's add it to the healthcheck tw.tabletRecorder.AddTablet(newVal.tablet) topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) - <-tw.tabletDiscoverySem // Done; enable next request to run } } diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 45d8b7a6fef..dff8ba720c7 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -59,7 +59,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck(nil) topologyWatcherOperations.ZeroAll() - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5, 5) + tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) done := make(chan bool, 3) result := make(chan bool, 1) @@ -115,7 +115,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { logger := logutil.NewMemoryLogger() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5, 5) + tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -432,7 +432,7 @@ func TestFilterByKeyspace(t *testing.T) { hc := NewFakeHealthCheck(nil) f := NewFilterByKeyspace(testKeyspacesToWatch) ts := memorytopo.NewServer(testCell) - tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5, 5) + tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) for _, test := range testFilterByKeyspace { // Add a new tablet to the topology. @@ -514,7 +514,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() f := NewFilterByKeyspace(testKeyspacesToWatch) - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5, 5) + tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0)
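
Note on the concurrency pattern used above: every variant of this change (the topology-watcher semaphore in patches 1-2, and the HealthCheckImpl semaphore in patch 8) relies on Go's buffered-channel counting-semaphore idiom. Sending on a channel with capacity N blocks once N operations are already in flight, and receiving from it releases a slot. The sketch below is illustrative only and is not code from the Vitess repository; the limiter type and the newLimiter/do names are invented for the example, but the channel usage mirrors the `chan int` style the diffs use.

package main

import (
	"fmt"
	"sync"
)

// limiter wraps the buffered-channel counting-semaphore idiom from the
// patches above: the channel's capacity is the maximum number of callers
// allowed to run fn at the same time.
type limiter struct {
	sem chan int
}

func newLimiter(concurrency int) *limiter {
	return &limiter{sem: make(chan int, concurrency)}
}

// do blocks until a semaphore slot is free, runs fn, then releases the slot.
func (l *limiter) do(fn func()) {
	l.sem <- 1                 // acquire: blocks once `concurrency` calls are in flight
	defer func() { <-l.sem }() // release the slot when fn returns
	fn()
}

func main() {
	l := newLimiter(3) // stand-in for e.g. --healthcheck_concurrency=3
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			l.do(func() {
				// stand-in for the AddTablet / broadcast work being rate-limited
				fmt.Println("discovered tablet", id)
			})
		}(i)
	}
	wg.Wait()
}

One design detail worth calling out: as the final patch reads, HealthCheckImpl.AddTablet releases its slot immediately after broadcasting and launching the checkConn goroutine, so --healthcheck_concurrency bounds how many AddTablet bookkeeping/broadcast steps run at once, not how many long-lived health-check connections exist concurrently.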