Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vtgate: support filtering tablets by tablet-tags #15911

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ Flags:
--stream_health_buffer_size uint max streaming health entries to buffer per streaming health client (default 20)
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--table_gc_lifecycle string States for a DROP TABLE garbage collection cycle. Default is 'hold,purge,evac,drop', use any subset ('drop' implicitly always included) (default "hold,purge,evac,drop")
--tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.
--tablet_dir string The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid.
--tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.
--tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ Flags:
--stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1)
--stream_buffer_size int the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. (default 32768)
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.
--tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.
--tablet_grpc_ca string the server ca to use to validate servers when connecting
--tablet_grpc_cert string the cert to use to connect
Expand Down
16 changes: 12 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -82,6 +83,9 @@ var (
// tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets.
tabletFilters []string

// tabletFilterTags are the tablet tag filters (as key:value pairs) to apply to the full set of tablets.
tabletFilterTags flagutil.StringMapValue

// refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo.
refreshInterval = 1 * time.Minute

Expand Down Expand Up @@ -164,6 +168,7 @@ func init() {

func registerDiscoveryFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&tabletFilters, "tablet_filters", []string{}, "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.")
fs.Var(&tabletFilterTags, "tablet-filter-tags", "Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.")
fs.Var((*topoproto.TabletTypeListFlag)(&AllowedTabletTypes), "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types.")
fs.StringSliceVar(&KeyspacesToWatch, "keyspaces_to_watch", []string{}, "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema.")
}
Expand Down Expand Up @@ -337,7 +342,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
loadTabletsTrigger: make(chan struct{}),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
var filters TabletFilters
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
Expand All @@ -357,11 +362,14 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we reset the filters slice here? Otherwise we keep appending the same filters over and over again for each cells.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frouioui good catch! yes, this should be a reset each iteration of the loop, updated 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frouioui re-requested review on the fix 🙇

}

hc.topoWatchers = topoWatchers
Expand Down
41 changes: 41 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,19 @@ type TabletFilter interface {
IsIncluded(tablet *topodata.Tablet) bool
}

// TabletFilters contains filters for tablets.
type TabletFilters []TabletFilter

// IsIncluded returns true if a tablet passes all filters.
func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool {
for _, filter := range tf {
if !filter.IsIncluded(tablet) {
return false
}
}
return true
}

// FilterByShard is a filter that filters tablets by
// keyspace/shard.
type FilterByShard struct {
Expand Down Expand Up @@ -375,3 +388,31 @@ func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}

// FilterByTabletTags is a filter that filters tablets by tablet tag key/values.
type FilterByTabletTags struct {
tags map[string]string
}

// NewFilterByTabletTags creates a new FilterByTabletTags. All tablets that match
// all tablet tags will be forwarded to the TopologyWatcher's consumer.
func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags {
return &FilterByTabletTags{
tags: tabletTags,
}
}

// IsIncluded returns true if the tablet's tags match what we expect.
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool {
if fbtg.tags == nil {
return true
} else if tablet.Tags == nil {
return false
}
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
for key, val := range fbtg.tags {
if tabletVal, found := tablet.Tags[key]; !found || tabletVal != val {
return false
}
}
return true
}
31 changes: 29 additions & 2 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestFilterByKeyspace(t *testing.T) {
ctx := utils.LeakCheckContext(t)

hc := NewFakeHealthCheck(nil)
f := NewFilterByKeyspace(testKeyspacesToWatch)
f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
ts := memorytopo.NewServer(ctx, testCell)
defer ts.Close()
tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5)
Expand Down Expand Up @@ -476,7 +476,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
f := NewFilterByKeyspace(testKeyspacesToWatch)
f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5)

counts = checkOpCounts(t, counts, map[string]int64{})
Expand Down Expand Up @@ -578,6 +578,33 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
tw.Stop()
}

func TestNewFilterByTabletTags(t *testing.T) {
// no required tags == true
filter := NewFilterByTabletTags(nil)
assert.True(t, filter.IsIncluded(&topodatapb.Tablet{}))

tags := map[string]string{
"instance_type": "i3.xlarge",
"some_key": "some_value",
}
filter = NewFilterByTabletTags(tags)

assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: nil,
}))
assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: map[string]string{},
}))
assert.True(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: map[string]string{
"instance_type": "i3.xlarge",
},
}))
assert.True(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: tags,
}))
}

func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
ctx := utils.LeakCheckContext(t)

Expand Down
Loading