From b58079449ec8a1a48b3c12f10feef36a01f532dd Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 01:11:03 +0200 Subject: [PATCH 01/17] `vtgate`: support filtering tablets by tablet-tags Signed-off-by: Tim Vaillancourt --- go/vt/discovery/healthcheck.go | 16 ++++++--- go/vt/discovery/topology_watcher.go | 42 +++++++++++++++++++++--- go/vt/discovery/topology_watcher_test.go | 25 +++++++++++--- 3 files changed, 70 insertions(+), 13 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 46d92c7364e..c64db9c1923 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -48,6 +48,7 @@ import ( "github.com/spf13/pflag" "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" @@ -82,6 +83,9 @@ var ( // tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets. tabletFilters []string + // tabletFilterTags are the key/values filters to apply to the full set of tablets. + tabletFilterTags flagutil.StringMapValue + // refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo. refreshInterval = 1 * time.Minute @@ -164,6 +168,7 @@ func init() { func registerDiscoveryFlags(fs *pflag.FlagSet) { fs.StringSliceVar(&tabletFilters, "tablet_filters", []string{}, "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.") + fs.Var(&tabletFilterTags, "tablet-filter-tags", "Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.") fs.Var((*topoproto.TabletTypeListFlag)(&AllowedTabletTypes), "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types.") fs.StringSliceVar(&KeyspacesToWatch, "keyspaces_to_watch", []string{}, "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema.") } @@ -337,7 +342,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur loadTabletsTrigger: make(chan struct{}), } var topoWatchers []*TopologyWatcher - var filter TabletFilter + var filters []TabletFilter cells := strings.Split(cellsToWatch, ",") if cellsToWatch == "" { cells = append(cells, localCell) @@ -357,11 +362,14 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if err != nil { log.Exitf("Cannot parse tablet_filters parameter: %v", err) } - filter = fbs + filters = []TabletFilter{fbs} } else if len(KeyspacesToWatch) > 0 { - filter = NewFilterByKeyspace(KeyspacesToWatch) + filters = []TabletFilter{NewFilterByKeyspace(KeyspacesToWatch)} + } + if len(tabletFilterTags) > 0 { + filters = append(filters, NewFilterByTabletTags(tabletFilterTags)) } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 0b69ecb6a63..0b8d87467ef 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -66,7 +66,7 @@ type TopologyWatcher struct { // set at construction time topoServer *topo.Server healthcheck HealthCheck - tabletFilter TabletFilter + tabletFilters []TabletFilter cell string refreshInterval time.Duration refreshKnownTablets bool @@ -92,11 +92,11 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f []TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, - tabletFilter: f, + tabletFilters: f, cell: cell, refreshInterval: refreshInterval, refreshKnownTablets: refreshKnownTablets, @@ -141,6 +141,16 @@ func (tw *TopologyWatcher) Stop() { tw.wg.Wait() } +// hasTabletFiltersMatch returns true if a tablet matches all tablet filters. +func (tw *TopologyWatcher) hasTabletFiltersMatch(tablet *topodata.Tablet) bool { + for _, tabletFilter := range tw.tabletFilters { + if !tabletFilter.IsIncluded(tablet) { + return false + } + } + return true +} + func (tw *TopologyWatcher) loadTablets() { newTablets := make(map[string]*tabletInfo) var partialResult bool @@ -198,7 +208,7 @@ func (tw *TopologyWatcher) loadTablets() { } for alias, newVal := range newTablets { - if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) { + if tw.tabletFilters != nil && !tw.hasTabletFiltersMatch(newVal.tablet) { continue } @@ -221,7 +231,7 @@ func (tw *TopologyWatcher) loadTablets() { } for _, val := range tw.tablets { - if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(val.tablet) { + if tw.tabletFilters != nil && !tw.hasTabletFiltersMatch(val.tablet) { continue } @@ -375,3 +385,25 @@ func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool { _, exist := fbk.keyspaces[tablet.Keyspace] return exist } + +// TODO: +type FilterByTabletTags struct { + tags map[string]string +} + +func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags { + return &FilterByTabletTags{tags: tabletTags} +} + +// IsIncluded returns true if the tablet's tags match what we expect. +func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool { + if tablet.Tags == nil { + return false + } + for key, val := range fbtg.tags { + if tabletVal, found := tablet.Tags[key]; !found || tabletVal != val { + return false + } + } + return true +} diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 95c6e44ec43..8ffb9b73b44 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -393,7 +393,7 @@ func TestFilterByKeyspace(t *testing.T) { ctx := utils.LeakCheckContext(t) hc := NewFakeHealthCheck(nil) - f := NewFilterByKeyspace(testKeyspacesToWatch) + f := []TabletFilter{NewFilterByKeyspace(testKeyspacesToWatch)} ts := memorytopo.NewServer(ctx, testCell) defer ts.Close() tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) @@ -414,7 +414,7 @@ func TestFilterByKeyspace(t *testing.T) { Shard: testShard, } - assert.Equal(t, test.expected, f.IsIncluded(tablet)) + assert.Equal(t, test.expected, tw.hasTabletFiltersMatch(tablet)) // Make this fatal because there is no point continuing if CreateTablet fails require.NoError(t, ts.CreateTablet(context.Background(), tablet)) @@ -443,7 +443,7 @@ func TestFilterByKeyspace(t *testing.T) { Keyspace: test.keyspace, Shard: testShard, } - assert.Equal(t, test.expected, f.IsIncluded(tabletReplacement)) + assert.Equal(t, test.expected, tw.hasTabletFiltersMatch(tabletReplacement)) require.NoError(t, ts.CreateTablet(context.Background(), tabletReplacement)) tw.loadTablets() @@ -476,7 +476,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { defer fhc.Close() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - f := NewFilterByKeyspace(testKeyspacesToWatch) + f := []TabletFilter{NewFilterByKeyspace(testKeyspacesToWatch)} tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) counts = checkOpCounts(t, counts, map[string]int64{}) @@ -578,6 +578,23 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { tw.Stop() } +func TestNewFilterByTabletTags(t *testing.T) { + tags := map[string]string{ + "instance_type": "i3.xlarge", + "some_key": "some_value", + } + filter := NewFilterByTabletTags(tags) + assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: nil, + })) + assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: map[string]string{}, + })) + assert.True(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: tags, + })) +} + func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) { ctx := utils.LeakCheckContext(t) From 7f27e66642cdc9492bc651cad5a6646ccdf316b8 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 01:21:28 +0200 Subject: [PATCH 02/17] cleanup Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher.go | 12 +++++++++--- go/vt/discovery/topology_watcher_test.go | 7 ++++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 0b8d87467ef..d53f8505ff2 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -386,18 +386,24 @@ func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool { return exist } -// TODO: +// FilterByTabletTags is a filter that filters tablets by tablet tag key/values. type FilterByTabletTags struct { tags map[string]string } +// NewFilterByTabletTags creates a new FilterByTabletTags. All tablets that match +// all tablet tags will be forwarded to the TopologyWatcher's consumer. func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags { - return &FilterByTabletTags{tags: tabletTags} + return &FilterByTabletTags{ + tags: tabletTags, + } } // IsIncluded returns true if the tablet's tags match what we expect. func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool { - if tablet.Tags == nil { + if fbtg.tags == nil { + return true + } else if tablet.Tags == nil { return false } for key, val := range fbtg.tags { diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 8ffb9b73b44..e76e2ef894a 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -579,11 +579,16 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { } func TestNewFilterByTabletTags(t *testing.T) { + // no required tags == true + filter := NewFilterByTabletTags(nil) + assert.True(t, filter.IsIncluded(&topodatapb.Tablet{})) + tags := map[string]string{ "instance_type": "i3.xlarge", "some_key": "some_value", } - filter := NewFilterByTabletTags(tags) + filter = NewFilterByTabletTags(tags) + assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ Tags: nil, })) From bc1f7b1f0202dbee5d41ab5836c0ad2bba7e2c1f Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 01:42:04 +0200 Subject: [PATCH 03/17] Fix comment Signed-off-by: Tim Vaillancourt --- go/vt/discovery/healthcheck.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index c64db9c1923..4a92504c624 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -83,7 +83,7 @@ var ( // tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets. tabletFilters []string - // tabletFilterTags are the key/values filters to apply to the full set of tablets. + // tabletFilterTags are the tablet tag filters (as key:value pairs) to apply to the full set of tablets. tabletFilterTags flagutil.StringMapValue // refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo. From 9a6eca522ade614290bea1ed3b005562145655a8 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 01:49:11 +0200 Subject: [PATCH 04/17] missing flags e2e updates Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtcombo.txt | 1 + go/flags/endtoend/vtgate.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index fd09f940b76..f5687e5b359 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -337,6 +337,7 @@ Flags: --stream_health_buffer_size uint max streaming health entries to buffer per streaming health client (default 20) --table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class --table_gc_lifecycle string States for a DROP TABLE garbage collection cycle. Default is 'hold,purge,evac,drop', use any subset ('drop' implicitly always included) (default "hold,purge,evac,drop") + --tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch. --tablet_dir string The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid. --tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch. --tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 7d0b3272cc8..b5aad44c5b6 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -195,6 +195,7 @@ Flags: --stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1) --stream_buffer_size int the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. (default 32768) --table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class + --tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch. --tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch. --tablet_grpc_ca string the server ca to use to validate servers when connecting --tablet_grpc_cert string the cert to use to connect From c8953e8192adf34d47cacd6f5da2e64130feec91 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 03:24:14 +0200 Subject: [PATCH 05/17] add TabletFilters abstraction Signed-off-by: Tim Vaillancourt --- go/vt/discovery/healthcheck.go | 6 ++--- go/vt/discovery/topology_watcher.go | 31 +++++++++++++----------- go/vt/discovery/topology_watcher_test.go | 6 ++--- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 4a92504c624..d8595de3573 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -342,7 +342,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur loadTabletsTrigger: make(chan struct{}), } var topoWatchers []*TopologyWatcher - var filters []TabletFilter + filters := make(TabletFilters, 0) cells := strings.Split(cellsToWatch, ",") if cellsToWatch == "" { cells = append(cells, localCell) @@ -362,9 +362,9 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if err != nil { log.Exitf("Cannot parse tablet_filters parameter: %v", err) } - filters = []TabletFilter{fbs} + filters = TabletFilters{fbs} } else if len(KeyspacesToWatch) > 0 { - filters = []TabletFilter{NewFilterByKeyspace(KeyspacesToWatch)} + filters = TabletFilters{NewFilterByKeyspace(KeyspacesToWatch)} } if len(tabletFilterTags) > 0 { filters = append(filters, NewFilterByTabletTags(tabletFilterTags)) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index d53f8505ff2..0dead6d6358 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -66,7 +66,7 @@ type TopologyWatcher struct { // set at construction time topoServer *topo.Server healthcheck HealthCheck - tabletFilters []TabletFilter + tabletFilters TabletFilters cell string refreshInterval time.Duration refreshKnownTablets bool @@ -92,7 +92,7 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f []TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilters, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, @@ -141,16 +141,6 @@ func (tw *TopologyWatcher) Stop() { tw.wg.Wait() } -// hasTabletFiltersMatch returns true if a tablet matches all tablet filters. -func (tw *TopologyWatcher) hasTabletFiltersMatch(tablet *topodata.Tablet) bool { - for _, tabletFilter := range tw.tabletFilters { - if !tabletFilter.IsIncluded(tablet) { - return false - } - } - return true -} - func (tw *TopologyWatcher) loadTablets() { newTablets := make(map[string]*tabletInfo) var partialResult bool @@ -208,7 +198,7 @@ func (tw *TopologyWatcher) loadTablets() { } for alias, newVal := range newTablets { - if tw.tabletFilters != nil && !tw.hasTabletFiltersMatch(newVal.tablet) { + if tw.tabletFilters != nil && !tw.tabletFilters.IsIncluded(newVal.tablet) { continue } @@ -231,7 +221,7 @@ func (tw *TopologyWatcher) loadTablets() { } for _, val := range tw.tablets { - if tw.tabletFilters != nil && !tw.hasTabletFiltersMatch(val.tablet) { + if tw.tabletFilters != nil && !tw.tabletFilters.IsIncluded(val.tablet) { continue } @@ -284,6 +274,19 @@ type TabletFilter interface { IsIncluded(tablet *topodata.Tablet) bool } +// TabletFilters contains filters for tablets. +type TabletFilters []TabletFilter + +// IsIncluded returns true if a tablet passes all filters. +func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool { + for _, filter := range tf { + if !filter.IsIncluded(tablet) { + return false + } + } + return true +} + // FilterByShard is a filter that filters tablets by // keyspace/shard. type FilterByShard struct { diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index e76e2ef894a..32ff2c96d44 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -393,7 +393,7 @@ func TestFilterByKeyspace(t *testing.T) { ctx := utils.LeakCheckContext(t) hc := NewFakeHealthCheck(nil) - f := []TabletFilter{NewFilterByKeyspace(testKeyspacesToWatch)} + f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} ts := memorytopo.NewServer(ctx, testCell) defer ts.Close() tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) @@ -414,7 +414,7 @@ func TestFilterByKeyspace(t *testing.T) { Shard: testShard, } - assert.Equal(t, test.expected, tw.hasTabletFiltersMatch(tablet)) + assert.Equal(t, test.expected, f.IsIncluded(tablet)) // Make this fatal because there is no point continuing if CreateTablet fails require.NoError(t, ts.CreateTablet(context.Background(), tablet)) @@ -443,7 +443,7 @@ func TestFilterByKeyspace(t *testing.T) { Keyspace: test.keyspace, Shard: testShard, } - assert.Equal(t, test.expected, tw.hasTabletFiltersMatch(tabletReplacement)) + assert.Equal(t, test.expected, f.IsIncluded(tabletReplacement)) require.NoError(t, ts.CreateTablet(context.Background(), tabletReplacement)) tw.loadTablets() From e55428c2749d9f41dc7fc3c13c6cb79a796b8064 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 03:26:16 +0200 Subject: [PATCH 06/17] just use 'var' Signed-off-by: Tim Vaillancourt --- go/vt/discovery/healthcheck.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index d8595de3573..3de4b9f8196 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -342,7 +342,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur loadTabletsTrigger: make(chan struct{}), } var topoWatchers []*TopologyWatcher - filters := make(TabletFilters, 0) + var filters TabletFilters cells := strings.Split(cellsToWatch, ",") if cellsToWatch == "" { cells = append(cells, localCell) From 56de44b57a16e584ab9c75e97f4e5873ccfea17a Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 03:28:04 +0200 Subject: [PATCH 07/17] []TabletFilter -> TabletFilters Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 32ff2c96d44..afa44f185b4 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -476,7 +476,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { defer fhc.Close() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - f := []TabletFilter{NewFilterByKeyspace(testKeyspacesToWatch)} + f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) counts = checkOpCounts(t, counts, map[string]int64{}) From 9f9cd1dec2c6ff640237e9a55d0b69e2d820983f Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 03:36:16 +0200 Subject: [PATCH 08/17] use TabletFilter interface Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 0dead6d6358..a09ded0052a 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -66,7 +66,7 @@ type TopologyWatcher struct { // set at construction time topoServer *topo.Server healthcheck HealthCheck - tabletFilters TabletFilters + tabletFilter TabletFilter cell string refreshInterval time.Duration refreshKnownTablets bool @@ -92,11 +92,11 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilters, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, - tabletFilters: f, + tabletFilter: f, cell: cell, refreshInterval: refreshInterval, refreshKnownTablets: refreshKnownTablets, @@ -198,7 +198,7 @@ func (tw *TopologyWatcher) loadTablets() { } for alias, newVal := range newTablets { - if tw.tabletFilters != nil && !tw.tabletFilters.IsIncluded(newVal.tablet) { + if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) { continue } @@ -221,7 +221,7 @@ func (tw *TopologyWatcher) loadTablets() { } for _, val := range tw.tablets { - if tw.tabletFilters != nil && !tw.tabletFilters.IsIncluded(val.tablet) { + if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(val.tablet) { continue } From 64b80ab4c9bac3f35162e3cb1261e9de4cc9340a Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 04:37:19 +0200 Subject: [PATCH 09/17] skip tablet-tag match if PRIMARY Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index a09ded0052a..ce3476593db 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -404,7 +404,7 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags { // IsIncluded returns true if the tablet's tags match what we expect. func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool { - if fbtg.tags == nil { + if fbtg.tags == nil || tablet.Type == topodata.TabletType_PRIMARY { return true } else if tablet.Tags == nil { return false From 2a3e5b351009502549f65f17ea916cee4fb11703 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 04:41:09 +0200 Subject: [PATCH 10/17] Add test for PRIMARY Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index afa44f185b4..43102d67df9 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -595,6 +595,10 @@ func TestNewFilterByTabletTags(t *testing.T) { assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ Tags: map[string]string{}, })) + assert.True(t, filter.IsIncluded(&topodatapb.Tablet{ + Type: topodatapb.TabletType_PRIMARY, + Tags: map[string]string{}, + })) assert.True(t, filter.IsIncluded(&topodatapb.Tablet{ Tags: tags, })) From 325b04a1b43aebff5639eef53927317f427068c5 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 04:55:21 +0200 Subject: [PATCH 11/17] move PRIMARY check to TabletFilters Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher.go | 5 ++++- go/vt/discovery/topology_watcher_test.go | 4 ---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index ce3476593db..adb0ca59d4c 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -279,6 +279,9 @@ type TabletFilters []TabletFilter // IsIncluded returns true if a tablet passes all filters. func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool { + if tablet.Type == topodata.TabletType_PRIMARY { + return true + } for _, filter := range tf { if !filter.IsIncluded(tablet) { return false @@ -404,7 +407,7 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags { // IsIncluded returns true if the tablet's tags match what we expect. func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool { - if fbtg.tags == nil || tablet.Type == topodata.TabletType_PRIMARY { + if fbtg.tags == nil { return true } else if tablet.Tags == nil { return false diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 43102d67df9..afa44f185b4 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -595,10 +595,6 @@ func TestNewFilterByTabletTags(t *testing.T) { assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ Tags: map[string]string{}, })) - assert.True(t, filter.IsIncluded(&topodatapb.Tablet{ - Type: topodatapb.TabletType_PRIMARY, - Tags: map[string]string{}, - })) assert.True(t, filter.IsIncluded(&topodatapb.Tablet{ Tags: tags, })) From 7954a3ad5aa39b0a90d34a2a7eac62ca743001d8 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 10 May 2024 05:11:30 +0200 Subject: [PATCH 12/17] remove PRIMARY filter bypass Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index adb0ca59d4c..a09ded0052a 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -279,9 +279,6 @@ type TabletFilters []TabletFilter // IsIncluded returns true if a tablet passes all filters. func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool { - if tablet.Type == topodata.TabletType_PRIMARY { - return true - } for _, filter := range tf { if !filter.IsIncluded(tablet) { return false From d3989cd13cf1e090f0dd8b9fe5891046235ed41d Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 13 May 2024 02:32:56 +0200 Subject: [PATCH 13/17] use append always Signed-off-by: Tim Vaillancourt --- go/vt/discovery/healthcheck.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 3de4b9f8196..bbef4fffd30 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -362,9 +362,9 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if err != nil { log.Exitf("Cannot parse tablet_filters parameter: %v", err) } - filters = TabletFilters{fbs} + filters = append(filters, fbs) } else if len(KeyspacesToWatch) > 0 { - filters = TabletFilters{NewFilterByKeyspace(KeyspacesToWatch)} + filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch)) } if len(tabletFilterTags) > 0 { filters = append(filters, NewFilterByTabletTags(tabletFilterTags)) From f1751ef2f9aed1ecab4359069ac7616ea3ca5f57 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 14 May 2024 23:41:53 +0200 Subject: [PATCH 14/17] additional test Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index afa44f185b4..1fcc0d3306b 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -595,6 +595,11 @@ func TestNewFilterByTabletTags(t *testing.T) { assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ Tags: map[string]string{}, })) + assert.True(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: map[string]string{ + "instance_type": "i3.xlarge", + }, + })) assert.True(t, filter.IsIncluded(&topodatapb.Tablet{ Tags: tags, })) From 55aa9e45826b89d4d647dad8327680fe8353300e Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sat, 1 Jun 2024 01:28:19 +0200 Subject: [PATCH 15/17] reset filters per loop of cells Signed-off-by: Tim Vaillancourt --- go/vt/discovery/healthcheck.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index bbef4fffd30..70799b0f6bc 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -342,13 +342,13 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur loadTabletsTrigger: make(chan struct{}), } var topoWatchers []*TopologyWatcher - var filters TabletFilters cells := strings.Split(cellsToWatch, ",") if cellsToWatch == "" { cells = append(cells, localCell) } for _, c := range cells { + var filters TabletFilters log.Infof("Setting up healthcheck for cell: %v", c) if c == "" { continue From 2f7a0a1f820fd9133d314e221185454bc5b85132 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sat, 1 Jun 2024 02:06:59 +0200 Subject: [PATCH 16/17] Fix partial match test Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 1fcc0d3306b..ad45ef92ebe 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -595,7 +595,7 @@ func TestNewFilterByTabletTags(t *testing.T) { assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ Tags: map[string]string{}, })) - assert.True(t, filter.IsIncluded(&topodatapb.Tablet{ + assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ Tags: map[string]string{ "instance_type": "i3.xlarge", }, From 7e38cf117454f1c1d6031cef33e420cf8b477162 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sun, 2 Jun 2024 16:25:54 +0200 Subject: [PATCH 17/17] Update go/vt/discovery/topology_watcher.go Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Signed-off-by: Tim Vaillancourt --- go/vt/discovery/topology_watcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index a09ded0052a..64346d524ad 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -406,7 +406,8 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags { func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool { if fbtg.tags == nil { return true - } else if tablet.Tags == nil { + } + if tablet.Tags == nil { return false } for key, val := range fbtg.tags {