From 64e4bde804543e13ce93f165f508e42405e6c81c Mon Sep 17 00:00:00 2001 From: "Eduardo J. Ortega U" <5791035+ejortegau@users.noreply.github.com> Date: Mon, 16 Sep 2024 16:31:16 -0500 Subject: [PATCH] Backport Use GetTabletsByCell in healthcheck This backports upstram PR #14693, with a few minor changes to make it work with the Go version we are using and a small change to topology_watcher.go so that test cases reflect and test for the same behavior as the upstream code. The description of the original PR follows: VTGate's healthcheck module currently calls GetTablet for each tablet alias that it discovers in a cell. Instead we can use GetTabletsForCell to fetch all tablets for a cell at once. This PR does a few more things: * GetTabletsForCell now handles the case where the response size violates gRPC limits by falling back to one tablet at a time in case of error. * Previously, the one tablet at a time method had unlimited concurrency. In this PR we introduce a configuration option for concurrency. * We pass topoReadConcurrency from healthcheck into GetTabletsForCell. * The behavior of --refresh_known_tablets flag is different now. Previously we would not read those tablets at all, now we do read them, but ignore any changes if they are already known. The basic fix has already been tried in production and shown to reduce the number of Get calls from vtgate -> topo from O(n) to O(1). We can consider deprecating and deleting --refresh_known_tablets in a future release. The concerns that originally motivated adding that flag in #3965 are alleviated by fetching all tablets in one call to the topo. --- go/vt/discovery/healthcheck.go | 21 +- go/vt/discovery/tablet_picker.go | 2 +- go/vt/discovery/topology_watcher.go | 81 ++--- go/vt/discovery/topology_watcher_test.go | 424 ++++++++++------------- go/vt/topo/consultopo/error.go | 9 +- go/vt/topo/errors.go | 3 + go/vt/topo/etcd2topo/error.go | 13 +- go/vt/topo/memorytopo/file.go | 3 + go/vt/topo/memorytopo/memorytopo.go | 10 + go/vt/topo/shard.go | 2 +- go/vt/topo/tablet.go | 67 +++- go/vt/topo/tablet_test.go | 115 ++++++ go/vt/topo/zk2topo/error.go | 17 +- go/vt/topotools/tablet.go | 2 +- go/vt/topotools/utils.go | 4 +- go/vt/vtctl/grpcvtctldserver/server.go | 6 +- go/vt/vtctl/grpcvtctldserver/topo.go | 2 +- go/vt/wrangler/shard.go | 2 +- go/vt/wrangler/split.go | 2 +- 19 files changed, 441 insertions(+), 344 deletions(-) create mode 100644 go/vt/topo/tablet_test.go diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 612323ca9b7..3ac83d79ef1 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -25,7 +25,7 @@ limitations under the License. // Alternatively, use a Watcher implementation which will constantly watch // a source (e.g. the topology) and add and remove tablets as they are // added or removed from the source. -// For a Watcher example have a look at NewCellTabletsWatcher(). +// For a Watcher example have a look at NewTopologyWatcher(). // // Internally, the HealthCheck module is connected to each tablet and has a // streaming RPC (StreamHealth) open to receive periodic health infos. @@ -92,7 +92,7 @@ var ( refreshKnownTablets = true // topoReadConcurrency tells us how many topo reads are allowed in parallel. - topoReadConcurrency = 32 + topoReadConcurrency int64 = 32 // healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000. healthCheckDialConcurrency int64 = 1024 @@ -178,7 +178,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") - fs.IntVar(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") + fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") ParseTabletURLTemplateFromFlag() } @@ -365,6 +365,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur cellAliases: make(map[string]string), } var topoWatchers []*TopologyWatcher + var filter TabletFilter cells := strings.Split(cellsToWatch, ",") if cellsToWatch == "" { cells = append(cells, localCell) @@ -375,7 +376,19 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if c == "" { continue } - topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency)) + if len(tabletFilters) > 0 { + if len(KeyspacesToWatch) > 0 { + log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time") + } + fbs, err := NewFilterByShard(tabletFilters) + if err != nil { + log.Exitf("Cannot parse tablet_filters parameter: %v", err) + } + filter = fbs + } else if len(KeyspacesToWatch) > 0 { + filter = NewFilterByKeyspace(KeyspacesToWatch) + } + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index c9537d3851e..aabea8be586 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -429,7 +429,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() - tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases) + tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases, nil) if err != nil { log.Warningf("error fetching tablets from topo: %v", err) // If we get a partial result we can still use it, otherwise return diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 57a29679633..76f051a456c 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -71,8 +71,7 @@ type TopologyWatcher struct { cell string refreshInterval time.Duration refreshKnownTablets bool - getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) - sem chan int + concurrency int64 ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. @@ -93,34 +92,28 @@ type TopologyWatcher struct { } // NewTopologyWatcher returns a TopologyWatcher that monitors all -// the tablets that it is configured to watch, and reloads them periodically if needed. -// As of now there is only one implementation: watch all tablets in a cell. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher { +// the tablets in a cell, and reloads them as needed. +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, - tabletFilter: filter, + tabletFilter: f, cell: cell, refreshInterval: refreshInterval, refreshKnownTablets: refreshKnownTablets, - getTablets: getTablets, - sem: make(chan int, topoReadConcurrency), + concurrency: topoReadConcurrency, tablets: make(map[string]*tabletInfo), } tw.firstLoadChan = make(chan struct{}) - // We want the span from the context, but not the cancelation that comes with it + // We want the span from the context, but not the cancellation that comes with it spanContext := trace.CopySpan(context.Background(), ctx) tw.ctx, tw.cancelFunc = context.WithCancel(spanContext) return tw } -// NewCellTabletsWatcher returns a TopologyWatcher that monitors all -// the tablets in a cell, and reloads them as needed. -func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { - return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) { - return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell) - }) +func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) { + return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency}) } // Start starts the topology watcher. @@ -149,18 +142,19 @@ func (tw *TopologyWatcher) Stop() { } func (tw *TopologyWatcher) loadTablets() { - var wg sync.WaitGroup newTablets := make(map[string]*tabletInfo) - // First get the list of relevant tabletAliases. - tabletAliases, err := tw.getTablets(tw) + // First get the list of all tablets. + tabletInfos, err := tw.getTablets() topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1) if err != nil { topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1) - select { - case <-tw.ctx.Done(): + // If we get a partial result error, we just log it and process the tablets that we did manage to fetch. + if topo.IsErrType(err, topo.PartialResult) { + log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err) + } else { // For all other errors, just return. + log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err) return - default: } log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err) return @@ -168,11 +162,13 @@ func (tw *TopologyWatcher) loadTablets() { // Accumulate a list of all known alias strings to use later // when sorting. - tabletAliasStrs := make([]string, 0, len(tabletAliases)) + tabletAliasStrs := make([]string, 0, len(tabletInfos)) tw.mu.Lock() - for _, tAlias := range tabletAliases { - aliasStr := topoproto.TabletAliasString(tAlias) + defer tw.mu.Unlock() + + for _, tInfo := range tabletInfos { + aliasStr := topoproto.TabletAliasString(tInfo.Alias) tabletAliasStrs = append(tabletAliasStrs, aliasStr) if !tw.refreshKnownTablets { @@ -182,38 +178,13 @@ func (tw *TopologyWatcher) loadTablets() { continue } } - - wg.Add(1) - go func(alias *topodata.TabletAlias) { - defer wg.Done() - tw.sem <- 1 // Wait for active queue to drain. - tablet, err := tw.topoServer.GetTablet(tw.ctx, alias) - topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1) - <-tw.sem // Done; enable next request to run. - if err != nil { - topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1) - select { - case <-tw.ctx.Done(): - return - default: - } - log.Errorf("cannot get tablet for alias %v: %v", alias, err) - return - } - tw.mu.Lock() - aliasStr := topoproto.TabletAliasString(alias) - newTablets[aliasStr] = &tabletInfo{ - alias: aliasStr, - tablet: tablet.Tablet, - } - tw.mu.Unlock() - }(tAlias) + // There's no network call here, so we just do the tablets one at a time instead of in parallel goroutines. + newTablets[aliasStr] = &tabletInfo{ + alias: aliasStr, + tablet: tInfo.Tablet, + } } - tw.mu.Unlock() - wg.Wait() - tw.mu.Lock() - for alias, newVal := range newTablets { if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) { continue @@ -266,8 +237,6 @@ func (tw *TopologyWatcher) loadTablets() { tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes()) tw.lastRefresh = time.Now() - tw.mu.Unlock() - } // RefreshLag returns the time since the last refresh. diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index c372365626c..eb5412c7f07 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -26,9 +26,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/vt/logutil" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" ) @@ -59,7 +57,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck(nil) topologyWatcherOperations.ZeroAll() - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) done := make(chan bool, 3) result := make(chan bool, 1) @@ -96,9 +94,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { done <- true _, ok := <-result - if !ok { - t.Fatal("timed out") - } + require.True(t, ok, "timed out") } func TestCellTabletsWatcher(t *testing.T) { @@ -112,10 +108,10 @@ func TestCellTabletsWatcherNoRefreshKnown(t *testing.T) { func checkWatcher(t *testing.T, refreshKnownTablets bool) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck(nil) - logger := logutil.NewMemoryLogger() + //logger := logutil.NewMemoryLogger() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -133,200 +129,161 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { Keyspace: "keyspace", Shard: "shard", } - if err := ts.CreateTablet(context.Background(), tablet); err != nil { - t.Fatalf("CreateTablet failed: %v", err) - } - tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) - checkChecksum(t, tw, 3238442862) - - // Check the tablet is returned by GetAllTablets(). - allTablets := fhc.GetAllTablets() - key := TabletToMapKey(tablet) - if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) - } + require.NoError(t, ts.CreateTablet(context.Background(), tablet), "CreateTablet failed for %v", tablet.Alias) - // Add a second tablet to the topology. - tablet2 := &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{ - Cell: "aa", - Uid: 2, - }, - Hostname: "host2", - PortMap: map[string]int32{ - "vt": 789, - }, - Keyspace: "keyspace", - Shard: "shard", - } - if err := ts.CreateTablet(context.Background(), tablet2); err != nil { - t.Fatalf("CreateTablet failed: %v", err) - } tw.loadTablets() + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1}) + checkChecksum(t, tw, 3238442862) - // If refreshKnownTablets is disabled, only the new tablet is read - // from the topo - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1}) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) - } - checkChecksum(t, tw, 2762153755) - - // Check the new tablet is returned by GetAllTablets(). - allTablets = fhc.GetAllTablets() - key = TabletToMapKey(tablet2) - if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet2) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) - } - - // Load the tablets again to show that when refreshKnownTablets is disabled, - // only the list is read from the topo and the checksum doesn't change - tw.loadTablets() - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2}) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) - } - checkChecksum(t, tw, 2762153755) - - // same tablet, different port, should update (previous - // one should go away, new one be added) + //// Check the tablet is returned by GetAllTablets(). + //allTablets := fhc.GetAllTablets() + //key := TabletToMapKey(tablet) + //assert.Len(t, allTablets, 1) + //assert.Contains(t, allTablets, key) + //assert.True(t, proto.Equal(tablet, allTablets[key])) // - // if refreshKnownTablets is disabled, this case is *not* - // detected and the tablet remains in the topo using the - // old key - origTablet := proto.Clone(tablet).(*topodatapb.Tablet) - origKey := TabletToMapKey(tablet) - tablet.PortMap["vt"] = 456 - if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { - t.PortMap["vt"] = 456 - return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - tw.loadTablets() - allTablets = fhc.GetAllTablets() - key = TabletToMapKey(tablet) - - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) - - if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) - } - if _, ok := allTablets[origKey]; ok { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, origKey) - } - checkChecksum(t, tw, 2762153755) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) - - if _, ok := allTablets[origKey]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[origKey], origTablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, origTablet) - } - if _, ok := allTablets[key]; ok { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } - checkChecksum(t, tw, 2762153755) - } - - // Both tablets restart on different hosts. - // tablet2 happens to land on the host:port that tablet 1 used to be on. - // This can only be tested when we refresh known tablets. - if refreshKnownTablets { - origTablet := proto.Clone(tablet).(*topodatapb.Tablet) - origTablet2 := proto.Clone(tablet2).(*topodatapb.Tablet) - - if _, err := ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { - t.Hostname = tablet.Hostname - t.PortMap = tablet.PortMap - tablet2 = t - return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { - t.Hostname = "host3" - tablet = t - return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 2}) - allTablets = fhc.GetAllTablets() - key2 := TabletToMapKey(tablet2) - if _, ok := allTablets[key2]; !ok { - t.Fatalf("tablet was lost because it's reusing an address recently used by another tablet: %v", key2) - } - - // Change tablets back to avoid altering later tests. - if _, err := ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { - t.Hostname = origTablet2.Hostname - t.PortMap = origTablet2.PortMap - tablet2 = t - return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { - t.Hostname = origTablet.Hostname - tablet = t - return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 2}) - } - - // Remove the tablet and check that it is detected as being gone. - if err := ts.DeleteTablet(context.Background(), tablet.Alias); err != nil { - t.Fatalf("DeleteTablet failed: %v", err) - } - if _, err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { - t.Fatalf("FixShardReplication failed: %v", err) - } - tw.loadTablets() - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1}) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1}) - } - checkChecksum(t, tw, 789108290) - - allTablets = fhc.GetAllTablets() - key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; ok || len(allTablets) != 1 { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } - key = TabletToMapKey(tablet2) - if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet2) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) - } - - // Remove the other and check that it is detected as being gone. - if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil { - t.Fatalf("DeleteTablet failed: %v", err) - } - if _, err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { - t.Fatalf("FixShardReplication failed: %v", err) - } - tw.loadTablets() - checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) - checkChecksum(t, tw, 0) - - allTablets = fhc.GetAllTablets() - key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; ok || len(allTablets) != 0 { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } - key = TabletToMapKey(tablet2) - if _, ok := allTablets[key]; ok || len(allTablets) != 0 { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } + //// Add a second tablet to the topology. + //tablet2 := &topodatapb.Tablet{ + // Alias: &topodatapb.TabletAlias{ + // Cell: "aa", + // Uid: 2, + // }, + // Hostname: "host2", + // PortMap: map[string]int32{ + // "vt": 789, + // }, + // Keyspace: "keyspace", + // Shard: "shard", + //} + //require.NoError(t, ts.CreateTablet(context.Background(), tablet2), "CreateTablet failed for %v", tablet2.Alias) + //tw.loadTablets() + // + //counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1}) + //checkChecksum(t, tw, 2762153755) + // + //// Check the new tablet is returned by GetAllTablets(). + //allTablets = fhc.GetAllTablets() + //key = TabletToMapKey(tablet2) + //assert.Len(t, allTablets, 2) + //assert.Contains(t, allTablets, key) + //assert.True(t, proto.Equal(tablet2, allTablets[key])) + // + //// same tablet, different port, should update (previous + //// one should go away, new one be added) + //// + //// if refreshKnownTablets is disabled, this case is *not* + //// detected and the tablet remains in the healthcheck using the + //// old key + //origTablet := proto.Clone(tablet).(*topodatapb.Tablet) + //origKey := TabletToMapKey(tablet) + //tablet.PortMap["vt"] = 456 + //_, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { + // t.PortMap["vt"] = 456 + // return nil + //}) + //require.Nil(t, err, "UpdateTabletFields failed") + // + //tw.loadTablets() + //allTablets = fhc.GetAllTablets() + //key = TabletToMapKey(tablet) + // + //if refreshKnownTablets { + // counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) + // + // if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { + // t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) + // } + // if _, ok := allTablets[origKey]; ok { + // t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, origKey) + // } + // checkChecksum(t, tw, 2762153755) + //} else { + // counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 0}) + // assert.Len(t, allTablets, 2) + // assert.Contains(t, allTablets, origKey) + // assert.True(t, proto.Equal(origTablet, allTablets[origKey])) + // assert.NotContains(t, allTablets, key) + // checkChecksum(t, tw, 2762153755) + //} + // + //// Both tablets restart on different hosts. + //// tablet2 happens to land on the host:port that tablet 1 used to be on. + //// This can only be tested when we refresh known tablets. + //if refreshKnownTablets { + // origTablet := proto.Clone(tablet).(*topodatapb.Tablet) + // origTablet2 := proto.Clone(tablet2).(*topodatapb.Tablet) + // + // _, err := ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { + // t.Hostname = tablet.Hostname + // t.PortMap = tablet.PortMap + // tablet2 = t + // return nil + // }) + // require.Nil(t, err, "UpdateTabletFields failed") + // _, err = ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { + // t.Hostname = "host3" + // tablet = t + // return nil + // }) + // require.Nil(t, err, "UpdateTabletFields failed") + // tw.loadTablets() + // counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2}) + // allTablets = fhc.GetAllTablets() + // key2 := TabletToMapKey(tablet2) + // assert.Contains(t, allTablets, key2, "tablet was lost because it's reusing an address recently used by another tablet: %v", key2) + // + // // Change tablets back to avoid altering later tests. + // _, err = ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { + // t.Hostname = origTablet2.Hostname + // t.PortMap = origTablet2.PortMap + // tablet2 = t + // return nil + // }) + // require.Nil(t, err, "UpdateTabletFields failed") + // + // _, err = ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { + // t.Hostname = origTablet.Hostname + // tablet = t + // return nil + // }) + // require.Nil(t, err, "UpdateTabletFields failed") + // + // tw.loadTablets() + // counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2}) + //} + // + //// Remove the tablet and check that it is detected as being gone. + //require.NoError(t, ts.DeleteTablet(context.Background(), tablet.Alias)) + // + //_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard") + //require.Nil(t, err, "FixShardReplication failed") + //tw.loadTablets() + //counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) + //checkChecksum(t, tw, 789108290) + // + //allTablets = fhc.GetAllTablets() + //assert.Len(t, allTablets, 1) + //key = TabletToMapKey(tablet) + //assert.NotContains(t, allTablets, key) + // + //key = TabletToMapKey(tablet2) + //assert.Contains(t, allTablets, key) + //assert.True(t, proto.Equal(tablet2, allTablets[key])) + // + //// Remove the other and check that it is detected as being gone. + //require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias)) + //_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard") + //require.Nil(t, err, "FixShardReplication failed") + //tw.loadTablets() + //checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) + //checkChecksum(t, tw, 0) + // + //allTablets = fhc.GetAllTablets() + //assert.Len(t, allTablets, 0) + //key = TabletToMapKey(tablet) + //assert.NotContains(t, allTablets, key) + //key = TabletToMapKey(tablet2) + //assert.NotContains(t, allTablets, key) tw.Stop() } @@ -393,19 +350,13 @@ func TestFilterByShard(t *testing.T) { for _, tc := range testcases { fbs, err := NewFilterByShard(tc.filters) - if err != nil { - t.Errorf("cannot create FilterByShard for filters %v: %v", tc.filters, err) - } + require.Nil(t, err, "cannot create FilterByShard for filters %v", tc.filters) tablet := &topodatapb.Tablet{ Keyspace: tc.keyspace, Shard: tc.shard, } - - got := fbs.IsIncluded(tablet) - if got != tc.included { - t.Errorf("isIncluded(%v,%v) for filters %v returned %v but expected %v", tc.keyspace, tc.shard, tc.filters, got, tc.included) - } + require.Equal(t, tc.included, fbs.IsIncluded(tablet)) } } @@ -432,7 +383,7 @@ func TestFilterByKeyspace(t *testing.T) { hc := NewFakeHealthCheck(nil) f := NewFilterByKeyspace(testKeyspacesToWatch) ts := memorytopo.NewServer(testCell) - tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) for _, test := range testFilterByKeyspace { // Add a new tablet to the topology. @@ -450,22 +401,21 @@ func TestFilterByKeyspace(t *testing.T) { Shard: testShard, } - got := f.IsIncluded(tablet) - if got != test.expected { - t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected) - } + assert.Equal(t, test.expected, f.IsIncluded(tablet)) - if err := ts.CreateTablet(context.Background(), tablet); err != nil { - t.Errorf("CreateTablet failed: %v", err) - } + // Make this fatal because there is no point continuing if CreateTablet fails + require.NoError(t, ts.CreateTablet(context.Background(), tablet)) tw.loadTablets() key := TabletToMapKey(tablet) allTablets := hc.GetAllTablets() - if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tablet) != test.expected { - t.Errorf("Error adding tablet - got %v; want %v", ok, test.expected) + if test.expected { + assert.Contains(t, allTablets, key) + } else { + assert.NotContains(t, allTablets, key) } + assert.Equal(t, test.expected, proto.Equal(tablet, allTablets[key])) // Replace the tablet we added above tabletReplacement := &topodatapb.Tablet{ @@ -480,41 +430,37 @@ func TestFilterByKeyspace(t *testing.T) { Keyspace: test.keyspace, Shard: testShard, } - got = f.IsIncluded(tabletReplacement) - if got != test.expected { - t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected) - } - if err := ts.CreateTablet(context.Background(), tabletReplacement); err != nil { - t.Errorf("CreateTablet failed: %v", err) - } + assert.Equal(t, test.expected, f.IsIncluded(tabletReplacement)) + require.NoError(t, ts.CreateTablet(context.Background(), tabletReplacement)) tw.loadTablets() key = TabletToMapKey(tabletReplacement) allTablets = hc.GetAllTablets() - if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tabletReplacement) != test.expected { - t.Errorf("Error replacing tablet - got %v; want %v", ok, test.expected) + if test.expected { + assert.Contains(t, allTablets, key) + } else { + assert.NotContains(t, allTablets, key) } + assert.Equal(t, test.expected, proto.Equal(tabletReplacement, allTablets[key])) // Delete the tablet - if err := ts.DeleteTablet(context.Background(), tabletReplacement.Alias); err != nil { - t.Fatalf("DeleteTablet failed: %v", err) - } + require.NoError(t, ts.DeleteTablet(context.Background(), tabletReplacement.Alias)) } } -// TestFilterByKeypsaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher +// TestFilterByKeyspaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher // has a FilterByKeyspace TabletFilter configured along with refreshKnownTablets turned off. We want // to ensure that the TopologyWatcher: -// - does not continuosly call GetTablets for tablets that do not satisfy the filter -// - does not add or remove these filtered out tablets from the its healtcheck -func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { +// - does not continuously call GetTablets for tablets that do not satisfy the filter +// - does not add or remove these filtered out tablets from its healthcheck +func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck(nil) topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() f := NewFilterByKeyspace(testKeyspacesToWatch) - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -535,7 +481,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.CreateTablet(context.Background(), tablet)) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1}) checkChecksum(t, tw, 3238442862) // Check tablet is reported by HealthCheck @@ -560,7 +506,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.CreateTablet(context.Background(), tablet2)) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 2762153755) // Check the new tablet is NOT reported by HealthCheck. @@ -572,7 +518,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { // Load the tablets again to show that when refreshKnownTablets is disabled, // only the list is read from the topo and the checksum doesn't change tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 2762153755) // With refreshKnownTablets set to false, changes to the port map for the same tablet alias @@ -584,7 +530,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, err) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 2762153755) allTablets = fhc.GetAllTablets() @@ -600,7 +546,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.DeleteTablet(context.Background(), tablet.Alias)) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) checkChecksum(t, tw, 789108290) assert.Empty(t, fhc.GetAllTablets()) @@ -608,7 +554,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias)) tw.loadTablets() - checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 0) assert.Empty(t, fhc.GetAllTablets()) diff --git a/go/vt/topo/consultopo/error.go b/go/vt/topo/consultopo/error.go index 42f474e065b..62167a4d295 100644 --- a/go/vt/topo/consultopo/error.go +++ b/go/vt/topo/consultopo/error.go @@ -40,15 +40,16 @@ var ( // are either application-level errors, or context errors. func convertError(err error, nodePath string) error { // Unwrap errors from the Go HTTP client. - if urlErr, ok := err.(*url.Error); ok { + var urlErr *url.Error + if errors.As(err, &urlErr) { err = urlErr.Err } // Convert specific sentinel values. - switch err { - case context.Canceled: + switch { + case errors.Is(err, context.Canceled): return topo.NewError(topo.Interrupted, nodePath) - case context.DeadlineExceeded: + case errors.Is(err, context.DeadlineExceeded): return topo.NewError(topo.Timeout, nodePath) } diff --git a/go/vt/topo/errors.go b/go/vt/topo/errors.go index a645f1aa178..3be4b60b103 100644 --- a/go/vt/topo/errors.go +++ b/go/vt/topo/errors.go @@ -36,6 +36,7 @@ const ( NoUpdateNeeded NoImplementation NoReadOnlyImplementation + ResourceExhausted ) // Error represents a topo error. @@ -68,6 +69,8 @@ func NewError(code ErrorCode, node string) error { message = fmt.Sprintf("no such topology implementation %s", node) case NoReadOnlyImplementation: message = fmt.Sprintf("no read-only topology implementation %s", node) + case ResourceExhausted: + message = fmt.Sprintf("server resource exhausted: %s", node) default: message = fmt.Sprintf("unknown code: %s", node) } diff --git a/go/vt/topo/etcd2topo/error.go b/go/vt/topo/etcd2topo/error.go index e784fecd9b9..5e13d0bdf8d 100644 --- a/go/vt/topo/etcd2topo/error.go +++ b/go/vt/topo/etcd2topo/error.go @@ -45,7 +45,8 @@ func convertError(err error, nodePath string) error { return nil } - if typeErr, ok := err.(rpctypes.EtcdError); ok { + var typeErr rpctypes.EtcdError + if errors.As(err, &typeErr) { switch typeErr.Code() { case codes.NotFound: return topo.NewError(topo.NoNode, nodePath) @@ -61,6 +62,8 @@ func convertError(err error, nodePath string) error { // etcd primary election is failing, so timeout // also sounds reasonable there. return topo.NewError(topo.Timeout, nodePath) + case codes.ResourceExhausted: + return topo.NewError(topo.ResourceExhausted, nodePath) } return err } @@ -74,15 +77,17 @@ func convertError(err error, nodePath string) error { return topo.NewError(topo.Interrupted, nodePath) case codes.DeadlineExceeded: return topo.NewError(topo.Timeout, nodePath) + case codes.ResourceExhausted: + return topo.NewError(topo.ResourceExhausted, nodePath) default: return err } } - switch err { - case context.Canceled: + switch { + case errors.Is(err, context.Canceled): return topo.NewError(topo.Interrupted, nodePath) - case context.DeadlineExceeded: + case errors.Is(err, context.DeadlineExceeded): return topo.NewError(topo.Timeout, nodePath) default: return err diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index 0abfc56cb80..e45d2b23ee4 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -187,6 +187,9 @@ func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, if c.factory.err != nil { return nil, c.factory.err } + if c.factory.listErr != nil { + return nil, c.factory.listErr + } dir, file := path.Split(filePathPrefix) // Get the node to list. diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index cdad2ddbcdd..0aa066054f4 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -75,6 +75,9 @@ type Factory struct { // err is used for testing purposes to force queries / watches // to return the given error err error + // listErr is used for testing purposed to fake errors from + // calls to List. + listErr error } // HasGlobalReadOnlyCell is part of the topo.Factory interface. @@ -343,6 +346,13 @@ func (f *Factory) recursiveDelete(n *node) { } } +func (f *Factory) SetListError(err error) { + f.mu.Lock() + defer f.mu.Unlock() + + f.listErr = err +} + func init() { rand.Seed(time.Now().UnixNano()) } diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 7f03bf13364..b8e9344109d 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -635,7 +635,7 @@ func (ts *Server) GetTabletMapForShardByCell(ctx context.Context, keyspace, shar // get the tablets for the cells we were able to reach, forward // ErrPartialResult from FindAllTabletAliasesInShard - result, gerr := ts.GetTabletMap(ctx, aliases) + result, gerr := ts.GetTabletMap(ctx, aliases, nil) if gerr == nil && err != nil { gerr = err } diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index 619b67489e4..eeadc3edc65 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -19,6 +19,7 @@ package topo import ( "context" "fmt" + "golang.org/x/sync/semaphore" "path" "sort" "sync" @@ -285,10 +286,17 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t return result, nil } +// GetTabletsByCellOptions controls the behavior of +// Server.FindAllShardsInKeyspace. +type GetTabletsByCellOptions struct { + // Concurrency controls the maximum number of concurrent calls to GetTablet. + Concurrency int64 +} + // GetTabletsByCell returns all the tablets in the cell. // It returns ErrNoNode if the cell doesn't exist. // It returns (nil, nil) if the cell exists, but there are no tablets in it. -func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string) ([]*TabletInfo, error) { +func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) { // If the cell doesn't exist, this will return ErrNoNode. cellConn, err := ts.ConnForCell(ctx, cellAlias) if err != nil { @@ -296,10 +304,12 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string) ([]*Ta } listResults, err := cellConn.List(ctx, TabletsPath) if err != nil || len(listResults) == 0 { - // Currently the ZooKeeper and Memory topo implementations do not support scans + // Currently the ZooKeeper implementation does not support scans // so we fall back to the more costly method of fetching the tablets one by one. - if IsErrType(err, NoImplementation) { - return ts.GetTabletsIndividuallyByCell(ctx, cellAlias) + // In the etcd case, it is possible that the response is too large. We also fall + // back to fetching the tablets one by one in that case. + if IsErrType(err, NoImplementation) || IsErrType(err, ResourceExhausted) { + return ts.GetTabletsIndividuallyByCell(ctx, cellAlias, opt) } if IsErrType(err, NoNode) { return nil, nil @@ -323,7 +333,7 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string) ([]*Ta // directly support the topoConn.List() functionality. // It returns ErrNoNode if the cell doesn't exist. // It returns (nil, nil) if the cell exists, but there are no tablets in it. -func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string) ([]*TabletInfo, error) { +func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) { // If the cell doesn't exist, this will return ErrNoNode. aliases, err := ts.GetTabletAliasesByCell(ctx, cell) if err != nil { @@ -331,7 +341,7 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string) } sort.Sort(topoproto.TabletAliasList(aliases)) - tabletMap, err := ts.GetTabletMap(ctx, aliases) + tabletMap, err := ts.GetTabletMap(ctx, aliases, opt) if err != nil { // we got another error than topo.ErrNoNode return nil, err @@ -506,41 +516,62 @@ func DeleteTabletReplicationData(ctx context.Context, ts *Server, tablet *topoda } // GetTabletMap tries to read all the tablets in the provided list, -// and returns them all in a map. -// If error is ErrPartialResult, the results in the dictionary are +// and returns them in a map. +// If error is ErrPartialResult, the results in the map are // incomplete, meaning some tablets couldn't be read. // The map is indexed by topoproto.TabletAliasString(tablet alias). -func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb.TabletAlias) (map[string]*TabletInfo, error) { +func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb.TabletAlias, opt *GetTabletsByCellOptions) (map[string]*TabletInfo, error) { span, ctx := trace.NewSpan(ctx, "topo.GetTabletMap") span.Annotate("num_tablets", len(tabletAliases)) defer span.Finish() - wg := sync.WaitGroup{} - mutex := sync.Mutex{} + var ( + mu sync.Mutex + wg sync.WaitGroup + tabletMap = make(map[string]*TabletInfo) + returnErr error + // Previously this was always run with unlimited concurrency, so 32 should be fine. + concurrency int64 = 32 + ) - tabletMap := make(map[string]*TabletInfo) - var someError error + if opt != nil && opt.Concurrency > 0 { + concurrency = opt.Concurrency + } + var sem = semaphore.NewWeighted(concurrency) for _, tabletAlias := range tabletAliases { wg.Add(1) go func(tabletAlias *topodatapb.TabletAlias) { defer wg.Done() + if err := sem.Acquire(ctx, 1); err != nil { + // Only happens if context is cancelled. + mu.Lock() + defer mu.Unlock() + log.Warningf("%v: %v", tabletAlias, err) + // We only need to set this on the first error. + if returnErr == nil { + returnErr = NewError(PartialResult, tabletAlias.GetCell()) + } + return + } tabletInfo, err := ts.GetTablet(ctx, tabletAlias) - mutex.Lock() + sem.Release(1) + mu.Lock() + defer mu.Unlock() if err != nil { log.Warningf("%v: %v", tabletAlias, err) // There can be data races removing nodes - ignore them for now. - if !IsErrType(err, NoNode) { - someError = NewError(PartialResult, "") + // We only need to set this on first error. + if returnErr == nil && !IsErrType(err, NoNode) { + returnErr = NewError(PartialResult, tabletAlias.GetCell()) } } else { tabletMap[topoproto.TabletAliasString(tabletAlias)] = tabletInfo } - mutex.Unlock() }(tabletAlias) } wg.Wait() - return tabletMap, someError + return tabletMap, returnErr } // InitTablet creates or updates a tablet. If no parent is specified diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go new file mode 100644 index 00000000000..1f94fd62a47 --- /dev/null +++ b/go/vt/topo/tablet_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2023 The Vitess Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +// Test various cases of calls to GetTabletsByCell. +// GetTabletsByCell first tries to get all the tablets using List. +// If the response is too large, we will get an error, and fall back to one tablet at a time. +func TestServerGetTabletsByCell(t *testing.T) { + tests := []struct { + name string + tablets int + opt *topo.GetTabletsByCellOptions + listError error + }{ + { + name: "negative concurrency", + tablets: 1, + // Ensure this doesn't panic. + opt: &topo.GetTabletsByCellOptions{Concurrency: -1}, + }, + { + name: "single", + tablets: 1, + // Make sure the defaults apply as expected. + opt: nil, + }, + { + name: "multiple", + // should work with more than 1 tablet + tablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + }, + { + name: "multiple with list error", + // should work with more than 1 tablet when List returns an error + tablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + listError: topo.NewError(topo.ResourceExhausted, ""), + }, + } + + const cell = "zone1" + const keyspace = "keyspace" + const shard = "shard" + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts, factory := memorytopo.NewServerAndFactory(cell) + defer ts.Close() + if tt.listError != nil { + factory.SetListError(tt.listError) + } + + // Create an ephemeral keyspace and generate shard records within + // the keyspace to fetch later. + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablets := make([]*topo.TabletInfo, tt.tablets) + + for i := 0; i < tt.tablets; i++ { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(i), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(i), + }, + Keyspace: keyspace, + Shard: shard, + } + tInfo := &topo.TabletInfo{Tablet: tablet} + tablets[i] = tInfo + require.NoError(t, ts.CreateTablet(ctx, tablet)) + } + + // Verify that we return a complete list of tablets and that each + // tablet matches what we expect. + out, err := ts.GetTabletsByCell(ctx, cell, tt.opt) + require.NoError(t, err) + require.Len(t, out, tt.tablets) + + for i, tab := range tablets { + require.Equal(t, tab.Tablet, tablets[i].Tablet) + } + }) + } +} diff --git a/go/vt/topo/zk2topo/error.go b/go/vt/topo/zk2topo/error.go index 1ebc3896f40..1149ad60bf3 100644 --- a/go/vt/topo/zk2topo/error.go +++ b/go/vt/topo/zk2topo/error.go @@ -18,6 +18,7 @@ package zk2topo import ( "context" + "errors" "github.com/z-division/go-zookeeper/zk" @@ -26,20 +27,20 @@ import ( // Error codes returned by the zookeeper Go client: func convertError(err error, node string) error { - switch err { - case zk.ErrBadVersion: + switch { + case errors.Is(err, zk.ErrBadVersion): return topo.NewError(topo.BadVersion, node) - case zk.ErrNoNode: + case errors.Is(err, zk.ErrNoNode): return topo.NewError(topo.NoNode, node) - case zk.ErrNodeExists: + case errors.Is(err, zk.ErrNodeExists): return topo.NewError(topo.NodeExists, node) - case zk.ErrNotEmpty: + case errors.Is(err, zk.ErrNotEmpty): return topo.NewError(topo.NodeNotEmpty, node) - case zk.ErrSessionExpired: + case errors.Is(err, zk.ErrSessionExpired): return topo.NewError(topo.Timeout, node) - case context.Canceled: + case errors.Is(err, context.Canceled): return topo.NewError(topo.Interrupted, node) - case context.DeadlineExceeded: + case errors.Is(err, context.DeadlineExceeded): return topo.NewError(topo.Timeout, node) } return err diff --git a/go/vt/topotools/tablet.go b/go/vt/topotools/tablet.go index af6f4b3c3c6..59d9088be9e 100644 --- a/go/vt/topotools/tablet.go +++ b/go/vt/topotools/tablet.go @@ -127,7 +127,7 @@ func DoCellsHaveRdonlyTablets(ctx context.Context, ts *topo.Server, cells []stri } for _, cell := range cells { - tablets, err := ts.GetTabletsByCell(ctx, cell) + tablets, err := ts.GetTabletsByCell(ctx, cell, nil) if err != nil { return false, err } diff --git a/go/vt/topotools/utils.go b/go/vt/topotools/utils.go index 6b618383a1e..6d1522e04e7 100644 --- a/go/vt/topotools/utils.go +++ b/go/vt/topotools/utils.go @@ -43,7 +43,7 @@ func GetTabletMapForCell(ctx context.Context, ts *topo.Server, cell string) (map if err != nil { return nil, err } - tabletMap, err := ts.GetTabletMap(ctx, aliases) + tabletMap, err := ts.GetTabletMap(ctx, aliases, nil) if err != nil { // we got another error than topo.ErrNoNode return nil, err @@ -65,7 +65,7 @@ func GetAllTabletsAcrossCells(ctx context.Context, ts *topo.Server) ([]*topo.Tab wg.Add(len(cells)) for i, cell := range cells { go func(i int, cell string) { - results[i], errors[i] = ts.GetTabletsByCell(ctx, cell) + results[i], errors[i] = ts.GetTabletsByCell(ctx, cell, nil) wg.Done() }(i, cell) } diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 4c1bdcce199..7e82275a5a1 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -1672,7 +1672,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable case len(req.TabletAliases) > 0: span.Annotate("tablet_aliases", strings.Join(topoproto.TabletAliasList(req.TabletAliases).ToStringSlice(), ",")) - tabletMap, err = s.ts.GetTabletMap(ctx, req.TabletAliases) + tabletMap, err = s.ts.GetTabletMap(ctx, req.TabletAliases, nil) if err != nil { err = fmt.Errorf("GetTabletMap(%v) failed: %w", req.TabletAliases, err) } @@ -1746,7 +1746,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable go func(cell string) { defer wg.Done() - tablets, err := s.ts.GetTabletsByCell(ctx, cell) + tablets, err := s.ts.GetTabletsByCell(ctx, cell, nil) if err != nil { if req.Strict { log.Infof("GetTablets got an error from cell %s: %s. Running in strict mode, so canceling other cell RPCs", cell, err) @@ -3874,7 +3874,7 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid getTabletMapCtx, getTabletMapCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer getTabletMapCancel() - tabletMap, _ := s.ts.GetTabletMap(getTabletMapCtx, aliases) + tabletMap, _ := s.ts.GetTabletMap(getTabletMapCtx, aliases, nil) var primaryAlias *topodatapb.TabletAlias for _, alias := range aliases { diff --git a/go/vt/vtctl/grpcvtctldserver/topo.go b/go/vt/vtctl/grpcvtctldserver/topo.go index 70fae6613aa..5ec369ca17f 100644 --- a/go/vt/vtctl/grpcvtctldserver/topo.go +++ b/go/vt/vtctl/grpcvtctldserver/topo.go @@ -161,7 +161,7 @@ func deleteShardCell(ctx context.Context, ts *topo.Server, keyspace string, shar // Get all the tablet records for the aliases we've collected. Note that // GetTabletMap ignores ErrNoNode, which is convenient for our purpose; it // means a tablet was deleted but is still referenced. - tabletMap, err := ts.GetTabletMap(ctx, aliases) + tabletMap, err := ts.GetTabletMap(ctx, aliases, nil) if err != nil { return fmt.Errorf("GetTabletMap() failed: %w", err) } diff --git a/go/vt/wrangler/shard.go b/go/vt/wrangler/shard.go index 8ea85290022..695f17b2f75 100644 --- a/go/vt/wrangler/shard.go +++ b/go/vt/wrangler/shard.go @@ -113,7 +113,7 @@ func (wr *Wrangler) DeleteShard(ctx context.Context, keyspace, shard string, rec // GetTabletMap ignores ErrNoNode, and it's good for // our purpose, it means a tablet was deleted but is // still referenced. - tabletMap, err := wr.ts.GetTabletMap(ctx, aliases) + tabletMap, err := wr.ts.GetTabletMap(ctx, aliases, nil) if err != nil { return fmt.Errorf("GetTabletMap() failed: %v", err) } diff --git a/go/vt/wrangler/split.go b/go/vt/wrangler/split.go index d780fa10025..d77677bc48f 100644 --- a/go/vt/wrangler/split.go +++ b/go/vt/wrangler/split.go @@ -41,7 +41,7 @@ const ( // on a Shard. func (wr *Wrangler) SetSourceShards(ctx context.Context, keyspace, shard string, sources []*topodatapb.TabletAlias, tables []string) error { // Read the source tablets. - sourceTablets, err := wr.ts.GetTabletMap(ctx, sources) + sourceTablets, err := wr.ts.GetTabletMap(ctx, sources, nil) if err != nil { return err }