Skip to content

Commit

Permalink
vtgate: support filtering tablets by tablet-tags (vitessio#15911)
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>
  • Loading branch information
timvaillancourt and shlomi-noach committed Jun 4, 2024
1 parent b6812a7 commit d0e0931
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 4 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ Usage of vtgate:
--stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1)
--stream_buffer_size int the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. (default 32768)
--structured-logging whether to use structured logging (PlanetScale Log) logger or the original (glog) logger
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.
--tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.
--tablet_grpc_ca string the server ca to use to validate servers when connecting
--tablet_grpc_cert string the cert to use to connect
Expand Down
16 changes: 12 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -80,6 +81,9 @@ var (
// tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets.
tabletFilters []string

// tabletFilterTags are the tablet tag filters (as key:value pairs) to apply to the full set of tablets.
tabletFilterTags flagutil.StringMapValue

// refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo.
refreshInterval = 1 * time.Minute

Expand Down Expand Up @@ -161,6 +165,7 @@ func init() {

func registerDiscoveryFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&tabletFilters, "tablet_filters", []string{}, "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.")
fs.Var(&tabletFilterTags, "tablet-filter-tags", "Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.")
fs.Var((*topoproto.TabletTypeListFlag)(&AllowedTabletTypes), "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types.")
fs.StringSliceVar(&KeyspacesToWatch, "keyspaces_to_watch", []string{}, "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema.")
}
Expand Down Expand Up @@ -331,13 +336,13 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cellAliases: make(map[string]string),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
}

for _, c := range cells {
var filters TabletFilters
log.Infof("Setting up healthcheck for cell: %v", c)
if c == "" {
continue
Expand All @@ -351,11 +356,14 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
42 changes: 42 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,19 @@ type TabletFilter interface {
IsIncluded(tablet *topodata.Tablet) bool
}

// TabletFilters contains filters for tablets.
type TabletFilters []TabletFilter

// IsIncluded returns true if a tablet passes all filters.
func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool {
for _, filter := range tf {
if !filter.IsIncluded(tablet) {
return false
}
}
return true
}

// FilterByShard is a filter that filters tablets by
// keyspace/shard.
type FilterByShard struct {
Expand Down Expand Up @@ -394,3 +407,32 @@ func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}

// FilterByTabletTags is a filter that filters tablets by tablet tag key/values.
type FilterByTabletTags struct {
tags map[string]string
}

// NewFilterByTabletTags creates a new FilterByTabletTags. All tablets that match
// all tablet tags will be forwarded to the TopologyWatcher's consumer.
func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags {
return &FilterByTabletTags{
tags: tabletTags,
}
}

// IsIncluded returns true if the tablet's tags match what we expect.
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool {
if fbtg.tags == nil {
return true
}
if tablet.Tags == nil {
return false
}
for key, val := range fbtg.tags {
if tabletVal, found := tablet.Tags[key]; !found || tabletVal != val {
return false
}
}
return true
}
27 changes: 27 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,3 +614,30 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {

tw.Stop()
}

func TestNewFilterByTabletTags(t *testing.T) {
// no required tags == true
filter := NewFilterByTabletTags(nil)
assert.True(t, filter.IsIncluded(&topodatapb.Tablet{}))

tags := map[string]string{
"instance_type": "i3.xlarge",
"some_key": "some_value",
}
filter = NewFilterByTabletTags(tags)

assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: nil,
}))
assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: map[string]string{},
}))
assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: map[string]string{
"instance_type": "i3.xlarge",
},
}))
assert.True(t, filter.IsIncluded(&topodatapb.Tablet{
Tags: tags,
}))
}

0 comments on commit d0e0931

Please sign in to comment.