diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 5e25be60f4f..5d6a5e32662 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -87,9 +87,6 @@ var ( // refreshKnownTablets tells us whether to process all tablets or only new tablets. refreshKnownTablets = true - // topoReadConcurrency tells us how many topo reads are allowed in parallel. - topoReadConcurrency int64 = 32 - // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond @@ -107,11 +104,6 @@ const ( DefaultHealthCheckRetryDelay = 5 * time.Second DefaultHealthCheckTimeout = 1 * time.Minute - // DefaultTopoReadConcurrency is used as the default value for the topoReadConcurrency parameter of a TopologyWatcher. - DefaultTopoReadConcurrency int = 5 - // DefaultTopologyWatcherRefreshInterval is used as the default value for - // the refresh interval of a topology watcher. - DefaultTopologyWatcherRefreshInterval = 1 * time.Minute // healthCheckTemplate is the HTML code to display a TabletsCacheStatusList, it takes a parameter for the title // as the template can be used for both HealthCheck's cache and healthy tablets list. healthCheckTemplate = ` @@ -176,7 +168,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") - fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") ParseTabletURLTemplateFromFlag() } @@ -362,7 +353,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur } else if len(KeyspacesToWatch) > 0 { filter = NewFilterByKeyspace(KeyspacesToWatch) } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index dbee11c1610..3945268f62e 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -70,7 +70,7 @@ type TopologyWatcher struct { cell string refreshInterval time.Duration refreshKnownTablets bool - concurrency int64 + concurrency int ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. @@ -92,7 +92,7 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. 
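With the healthcheck-local flag removed, the concurrency value now comes from the topo package, which registers --topo_read_concurrency via servenv.OnParseFor in the keyspace.go hunk below, and callers such as NewHealthCheck pass topo.DefaultConcurrency. A minimal stand-alone sketch of that pflag registration pattern, using a hypothetical flag-set name and a local variable in place of topo.DefaultConcurrency (the servenv wiring itself is not reproduced here):

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// defaultConcurrency mirrors topo.DefaultConcurrency: a package-level
// default that a binary's flag set can override at parse time.
var defaultConcurrency = 32

func main() {
	// Hypothetical flag set standing in for the servenv.OnParseFor hook.
	fs := pflag.NewFlagSet("example-binary", pflag.ExitOnError)
	fs.IntVar(&defaultConcurrency, "topo_read_concurrency", defaultConcurrency, "Concurrency of topo reads.")

	// Simulated command line; a real binary would pass os.Args[1:].
	_ = fs.Parse([]string{"--topo_read_concurrency=64"})
	fmt.Println(defaultConcurrency) // 64
}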
-func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index cd1691dd01e..4f0326f70b1 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -107,16 +107,12 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error { return nil } exec.keyspace = keyspace - shardNames, err := exec.ts.GetShardNames(ctx, keyspace) + shards, err := exec.ts.FindAllShardsInKeyspace(ctx, keyspace, nil) if err != nil { - return fmt.Errorf("unable to get shard names for keyspace: %s, error: %v", keyspace, err) + return fmt.Errorf("unable to get shards for keyspace: %s, error: %v", keyspace, err) } - exec.tablets = make([]*topodatapb.Tablet, len(shardNames)) - for i, shardName := range shardNames { - shardInfo, err := exec.ts.GetShard(ctx, keyspace, shardName) - if err != nil { - return fmt.Errorf("unable to get shard info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err) - } + exec.tablets = make([]*topodatapb.Tablet, 0, len(shards)) + for shardName, shardInfo := range shards { if !shardInfo.HasPrimary() { return fmt.Errorf("shard: %s does not have a primary", shardName) } @@ -124,7 +120,7 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error { if err != nil { return fmt.Errorf("unable to get primary tablet info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err) } - exec.tablets[i] = tabletInfo.Tablet + exec.tablets = append(exec.tablets, tabletInfo.Tablet) } if len(exec.tablets) == 0 { diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 4ff53e24204..9cbd167d174 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -19,11 +19,15 @@ package topo import ( "context" "path" + "sort" "sync" + "github.com/spf13/pflag" "golang.org/x/sync/errgroup" "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/event" @@ -34,7 +38,20 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) -// This file contains keyspace utility functions +// This file contains keyspace utility functions. + +// Default concurrency to use in order to avoid overwhelming the topo server. +var DefaultConcurrency = 32 + +func registerFlags(fs *pflag.FlagSet) { + fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.") +} + +func init() { + servenv.OnParseFor("vtcombo", registerFlags) + servenv.OnParseFor("vtctld", registerFlags) + servenv.OnParseFor("vtgate", registerFlags) +} // KeyspaceInfo is a meta struct that contains metadata to give the // data more context and convenience. This is the main way we interact @@ -188,12 +205,60 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt = &FindAllShardsInKeyspaceOptions{} } if opt.Concurrency <= 0 { - opt.Concurrency = 1 + opt.Concurrency = DefaultConcurrency + } + + // First try to get all shards using List if we can.
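For the closure that follows: List returns full key paths, and the shard name is the parent directory of the trailing Shard component. A stand-alone sketch of that extraction, reusing the sample key from the patch's own comment (the keyspace and shard names are illustrative):

package main

import (
	"fmt"
	"path"
)

func main() {
	// Example key as returned by the global topo's List for a shards prefix.
	shardKey := "/vitess/global/keyspaces/commerce/shards/-80/Shard"

	// path.Dir strips the trailing "Shard" file, path.Base keeps "-80".
	shardName := path.Base(path.Dir(shardKey))
	fmt.Println(shardName) // -80

	// In FindAllShardsInKeyspace the name is then validated with
	// ValidateShardName and the value is unmarshalled into a topodatapb.Shard.
}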
+ buildResultFromList := func(kvpairs []KVInfo) (map[string]*ShardInfo, error) { + result := make(map[string]*ShardInfo, len(kvpairs)) + for _, entry := range kvpairs { + // The shard key looks like this: /vitess/global/keyspaces/commerce/shards/-80/Shard + shardKey := string(entry.Key) + shardName := path.Base(path.Dir(shardKey)) // The base part of the dir is "-80" + // Validate the extracted shard name. + if _, _, err := ValidateShardName(shardName); err != nil { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): unexpected shard key/path %q contains invalid shard name/range %q", + keyspace, shardKey, shardName) + } + shard := &topodatapb.Shard{} + if err := shard.UnmarshalVT(entry.Value); err != nil { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): invalid data found for shard %q in %q", + keyspace, shardName, shardKey) + } + result[shardName] = &ShardInfo{ + keyspace: keyspace, + shardName: shardName, + version: entry.Version, + Shard: shard, + } + } + return result, nil + } + shardsPath := path.Join(KeyspacesPath, keyspace, ShardsPath) + listRes, err := ts.globalCell.List(ctx, shardsPath) + if err == nil { // We have everything we need to build the result + return buildResultFromList(listRes) + } + if IsErrType(err, NoNode) { + // The path doesn't exist, let's see if the keyspace exists. + if _, kerr := ts.GetKeyspace(ctx, keyspace); kerr != nil { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace) + } + // We simply have no shards. + return make(map[string]*ShardInfo, 0), nil + } + // Currently the ZooKeeper implementation does not support index prefix + // scans so we fall back to concurrently fetching the shards one by one. + // It is also possible that the response containing all shards is too + // large in which case we also fall back to the one by one fetch. + if !IsErrType(err, NoImplementation) && !IsErrType(err, ResourceExhausted) { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace) } + // Fall back to the shard by shard method. shards, err := ts.GetShardNames(ctx, keyspace) if err != nil { - return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace) + return nil, vterrors.Wrapf(err, "failed to get list of shard names for keyspace '%s'", keyspace) } // Keyspaces with a large number of shards and geographically distributed @@ -213,7 +278,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, ) eg, ctx := errgroup.WithContext(ctx) - eg.SetLimit(opt.Concurrency) + eg.SetLimit(int(opt.Concurrency)) for _, shard := range shards { shard := shard @@ -222,7 +287,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, si, err := ts.GetShard(ctx, keyspace, shard) switch { case IsErrType(err, NoNode): - log.Warningf("GetShard(%v, %v) returned ErrNoNode, consider checking the topology.", keyspace, shard) + log.Warningf("GetShard(%s, %s) returned ErrNoNode, consider checking the topology.", keyspace, shard) return nil case err == nil: mu.Lock() @@ -231,7 +296,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, return nil default: - return vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard) + return vterrors.Wrapf(err, "GetShard(%s, %s) failed", keyspace, shard) } }) } @@ -245,25 +310,26 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, // GetServingShards returns all shards where the primary is serving. 
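The shard-by-shard fallback above bounds its topo reads with an errgroup whose limit now defaults to DefaultConcurrency. A self-contained sketch of that bounded fan-out pattern, with placeholder work standing in for GetShard, illustrative shard names, and an illustrative limit of 32:

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx := context.Background()
	shards := []string{"-40", "40-80", "80-c0", "c0-"}
	result := make(map[string]int, len(shards))
	var mu sync.Mutex

	eg, ctx := errgroup.WithContext(ctx)
	eg.SetLimit(32) // stand-in for opt.Concurrency, which now defaults to DefaultConcurrency

	for i, shard := range shards {
		i, shard := i, shard // matches the diff's style; redundant on Go 1.22+
		eg.Go(func() error {
			// Placeholder for ts.GetShard(ctx, keyspace, shard).
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			mu.Lock()
			defer mu.Unlock()
			result[shard] = i
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(result)) // 4
}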
func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*ShardInfo, error) { - shards, err := ts.GetShardNames(ctx, keyspace) + shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil) if err != nil { return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace) } result := make([]*ShardInfo, 0, len(shards)) for _, shard := range shards { - si, err := ts.GetShard(ctx, keyspace, shard) - if err != nil { - return nil, vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard) - } - if !si.IsPrimaryServing { + if !shard.IsPrimaryServing { continue } - result = append(result, si) + result = append(result, shard) } if len(result) == 0 { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%v has no serving shards", keyspace) } + // Sort the shards by KeyRange for deterministic results. + sort.Slice(result, func(i, j int) bool { + return key.KeyRangeLess(result[i].KeyRange, result[j].KeyRange) + }) + return result, nil } diff --git a/go/vt/topo/keyspace_external_test.go b/go/vt/topo/keyspace_external_test.go index 064c4cba93b..4d62c052976 100644 --- a/go/vt/topo/keyspace_external_test.go +++ b/go/vt/topo/keyspace_external_test.go @@ -18,14 +18,17 @@ package topo_test import ( "context" + "fmt" + "slices" "testing" "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/key" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) func TestServerFindAllShardsInKeyspace(t *testing.T) { @@ -87,3 +90,94 @@ func TestServerFindAllShardsInKeyspace(t *testing.T) { }) } } + +func TestServerGetServingShards(t *testing.T) { + keyspace := "ks1" + errNoListImpl := topo.NewError(topo.NoImplementation, "don't be doing no listing round here") + + tests := []struct { + shards int // Number of shards to create + err string // Error message we expect, if any + fallback bool // Should we fallback to the shard by shard method + }{ + { + shards: 0, + err: fmt.Sprintf("%s has no serving shards", keyspace), + }, + { + shards: 2, + }, + { + shards: 128, + }, + { + shards: 512, + fallback: true, + }, + { + shards: 1024, + }, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("%d shards with fallback = %t", tt.shards, tt.fallback), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts, factory := memorytopo.NewServerAndFactory(ctx) + defer ts.Close() + stats := factory.GetCallStats() + require.NotNil(t, stats) + + if tt.fallback { + factory.SetListError(errNoListImpl) + } + + err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}) + require.NoError(t, err) + var shardNames []string + if tt.shards > 0 { + shardNames, err = key.GenerateShardRanges(tt.shards) + require.NoError(t, err) + for _, shardName := range shardNames { + err = ts.CreateShard(ctx, keyspace, shardName) + require.NoError(t, err) + } + } + + // Verify that we return a complete list of shards and that each + // key range is present in the output. 
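The sort.Slice/key.KeyRangeLess change in GetServingShards above is what makes its output deterministic, which the test below can then assert against shard by shard. A stand-alone sketch of the same comparator applied to a few hand-built key ranges (the byte values are illustrative):

package main

import (
	"fmt"
	"sort"

	"vitess.io/vitess/go/vt/key"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func main() {
	// Hand-built key ranges, deliberately out of order: 80-, -40, 40-80.
	ranges := []*topodatapb.KeyRange{
		{Start: []byte{0x80}},
		{End: []byte{0x40}},
		{Start: []byte{0x40}, End: []byte{0x80}},
	}

	// The same comparator GetServingShards now uses on its results.
	sort.Slice(ranges, func(i, j int) bool {
		return key.KeyRangeLess(ranges[i], ranges[j])
	})

	for _, kr := range ranges {
		fmt.Println(key.KeyRangeString(kr))
	}
	// Expected order: -40, 40-80, 80-
}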
+ stats.ResetAll() // We only want the stats for GetServingShards + shardInfos, err := ts.GetServingShards(ctx, keyspace) + if tt.err != "" { + require.EqualError(t, err, tt.err) + return + } + require.NoError(t, err) + require.Len(t, shardInfos, tt.shards) + for _, shardName := range shardNames { + f := func(si *topo.ShardInfo) bool { + return key.KeyRangeString(si.Shard.KeyRange) == shardName + } + require.True(t, slices.ContainsFunc(shardInfos, f), "shard %q was not found in the results", + shardName) + } + + // Now we check the stats based on the number of shards and whether or not + // we should have had a List error and fell back to the shard by shard method. + callcounts := stats.Counts() + require.NotNil(t, callcounts) + require.Equal(t, int64(1), callcounts["List"]) // We should always try + switch { + case tt.fallback: // We get the shards one by one from the list + require.Equal(t, int64(1), callcounts["ListDir"]) // GetShardNames + require.Equal(t, int64(tt.shards), callcounts["Get"]) // GetShard + case tt.shards < 1: // We use a Get to check that the keyspace exists + require.Equal(t, int64(0), callcounts["ListDir"]) + require.Equal(t, int64(1), callcounts["Get"]) + default: // We should not make any ListDir or Get calls + require.Equal(t, int64(0), callcounts["ListDir"]) + require.Equal(t, int64(0), callcounts["Get"]) + } + }) + } +} diff --git a/go/vt/topo/memorytopo/directory.go b/go/vt/topo/memorytopo/directory.go index f68c87a2166..8e673f474a6 100644 --- a/go/vt/topo/memorytopo/directory.go +++ b/go/vt/topo/memorytopo/directory.go @@ -27,6 +27,8 @@ import ( // ListDir is part of the topo.Conn interface. func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.DirEntry, error) { + c.factory.callstats.Add([]string{"ListDir"}, 1) + if err := c.dial(ctx); err != nil { return nil, err } diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index 868a2c53287..52bbe2e93ce 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -24,8 +24,10 @@ import ( "vitess.io/vitess/go/vt/topo" ) -// NewLeaderParticipation is part of the topo.Server interface +// NewLeaderParticipation is part of the topo.Conn interface. func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { + c.factory.callstats.Add([]string{"NewLeaderParticipation"}, 1) + if c.closed { return nil, ErrConnectionClosed } diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index cc19eb79011..800e7791afa 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -30,6 +30,8 @@ import ( // Create is part of topo.Conn interface. func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (topo.Version, error) { + c.factory.callstats.Add([]string{"Create"}, 1) + if err := c.dial(ctx); err != nil { return nil, err } @@ -74,6 +76,8 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to // Update is part of topo.Conn interface. func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, version topo.Version) (topo.Version, error) { + c.factory.callstats.Add([]string{"Update"}, 1) + if err := c.dial(ctx); err != nil { return nil, err } @@ -152,6 +156,8 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver // Get is part of topo.Conn interface. 
func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, error) { + c.factory.callstats.Add([]string{"Get"}, 1) + if err := c.dial(ctx); err != nil { return nil, nil, err } @@ -177,6 +183,8 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, // List is part of the topo.Conn interface. func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) { + c.factory.callstats.Add([]string{"List"}, 1) + if err := c.dial(ctx); err != nil { return nil, err } @@ -239,6 +247,8 @@ func gatherChildren(n *node, dirPath string) []topo.KVInfo { // Delete is part of topo.Conn interface. func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version) error { + c.factory.callstats.Add([]string{"Delete"}, 1) + if err := c.dial(ctx); err != nil { return err } diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index c15fb9099bb..0545ba8b182 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -42,11 +42,15 @@ type memoryTopoLockDescriptor struct { // TryLock is part of the topo.Conn interface. Its implementation is same as Lock func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + c.factory.callstats.Add([]string{"TryLock"}, 1) + return c.Lock(ctx, dirPath, contents) } // Lock is part of the topo.Conn interface. func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + c.factory.callstats.Add([]string{"Lock"}, 1) + return c.lock(ctx, dirPath, contents) } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index ae33bb73edd..55057b62468 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -26,6 +26,7 @@ import ( "strings" "sync" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -73,6 +74,9 @@ type Factory struct { // listErr is used for testing purposed to fake errors from // calls to List. listErr error + // callstats allows us to keep track of how many topo.Conn calls + // we make (Create, Get, Update, Delete, List, ListDir, etc). + callstats *stats.CountersWithMultiLabels } // HasGlobalReadOnlyCell is part of the topo.Factory interface. @@ -108,6 +112,10 @@ func (f *Factory) SetError(err error) { } } +func (f *Factory) GetCallStats() *stats.CountersWithMultiLabels { + return f.callstats +} + // Lock blocks all requests to the topo and is exposed to allow tests to // simulate an unresponsive topo server func (f *Factory) Lock() { @@ -144,6 +152,7 @@ func (c *Conn) dial(ctx context.Context) error { // Close is part of the topo.Conn interface. func (c *Conn) Close() { + c.factory.callstats.Add([]string{"Close"}, 1) c.closed = true } @@ -240,6 +249,7 @@ func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *F f := &Factory{ cells: make(map[string]*node), generation: uint64(rand.Int63n(1 << 60)), + callstats: stats.NewCountersWithMultiLabels("", "", []string{"Call"}), } f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil) diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 73b2d248434..8d9ef5cb54c 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -25,6 +25,8 @@ import ( // Watch is part of the topo.Conn interface. 
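The factory's new callstats counter (constructed in NewServerAndFactory above and incremented by each instrumented Conn method) is what the GetServingShards test reads back. A minimal sketch of how that CountersWithMultiLabels behaves, with illustrative method names and counts:

package main

import (
	"fmt"

	"vitess.io/vitess/go/stats"
)

func main() {
	// Mirrors the factory's constructor call: a single "Call" label,
	// counters keyed by the Conn method name.
	callstats := stats.NewCountersWithMultiLabels("", "", []string{"Call"})

	// What the instrumented Conn methods do on every invocation.
	callstats.Add([]string{"List"}, 1)
	callstats.Add([]string{"Get"}, 1)
	callstats.Add([]string{"Get"}, 1)

	// What the test reads back: a map keyed by the label value.
	fmt.Println(callstats.Counts()["List"], callstats.Counts()["Get"]) // 1 2

	// ResetAll clears the counters between test phases.
	callstats.ResetAll()
	fmt.Println(callstats.Counts()["Get"]) // 0
}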
func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { + c.factory.callstats.Add([]string{"Watch"}, 1) + if c.closed { return nil, nil, ErrConnectionClosed } @@ -75,6 +77,8 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c // WatchRecursive is part of the topo.Conn interface. func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + c.factory.callstats.Add([]string{"WatchRecursive"}, 1) + if c.closed { return nil, nil, ErrConnectionClosed } diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index cafe55ec5a9..f412233b43a 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -288,7 +288,7 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t // Server.FindAllShardsInKeyspace. type GetTabletsByCellOptions struct { // Concurrency controls the maximum number of concurrent calls to GetTablet. - Concurrency int64 + Concurrency int } // GetTabletsByCell returns all the tablets in the cell. @@ -527,14 +527,13 @@ func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb. wg sync.WaitGroup tabletMap = make(map[string]*TabletInfo) returnErr error - // Previously this was always run with unlimited concurrency, so 32 should be fine. - concurrency int64 = 32 ) + concurrency := DefaultConcurrency if opt != nil && opt.Concurrency > 0 { concurrency = opt.Concurrency } - var sem = semaphore.NewWeighted(concurrency) + var sem = semaphore.NewWeighted(int64(concurrency)) for _, tabletAlias := range tabletAliases { wg.Add(1) diff --git a/go/vt/topo/test/shard.go b/go/vt/topo/test/shard.go index b5c92c4a3ec..270b236b98d 100644 --- a/go/vt/topo/test/shard.go +++ b/go/vt/topo/test/shard.go @@ -22,7 +22,6 @@ import ( "time" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/topo" @@ -82,13 +81,23 @@ func checkShard(t *testing.T, ctx context.Context, ts *topo.Server) { t.Fatalf("shard.PrimaryAlias = %v, want %v", si.Shard.PrimaryAlias, other) } + // Test FindAllShardsInKeyspace. + require.NoError(t, err) + _, err = ts.FindAllShardsInKeyspace(ctx, "test_keyspace", nil) + require.NoError(t, err) + + // Test GetServingShards. 
+ require.NoError(t, err) + _, err = ts.GetServingShards(ctx, "test_keyspace") + require.NoError(t, err) + // test GetShardNames - shards, err := ts.GetShardNames(ctx, "test_keyspace") + shardNames, err := ts.GetShardNames(ctx, "test_keyspace") if err != nil { t.Errorf("GetShardNames: %v", err) } - if len(shards) != 1 || shards[0] != "b0-c0" { - t.Errorf(`GetShardNames: want [ "b0-c0" ], got %v`, shards) + if len(shardNames) != 1 || shardNames[0] != "b0-c0" { + t.Errorf(`GetShardNames: want [ "b0-c0" ], got %v`, shardNames) } if _, err := ts.GetShardNames(ctx, "test_keyspace666"); !topo.IsErrType(err, topo.NoNode) { diff --git a/go/vt/vtctl/endtoend/onlineddl_show_test.go b/go/vt/vtctl/endtoend/onlineddl_show_test.go index 500f33aa3ce..ed848c14be8 100644 --- a/go/vt/vtctl/endtoend/onlineddl_show_test.go +++ b/go/vt/vtctl/endtoend/onlineddl_show_test.go @@ -127,7 +127,7 @@ func onlineDDLTest(t *testing.T, args []string, expectedQuery string) { assert.NotEmpty(t, err.Error()) containsExpectedError := false expectedErrors := []string{ - "unable to get shard names for keyspace", + "unable to get shards for keyspace", "no ExecuteFetchAsDba results on fake TabletManagerClient", } for _, expect := range expectedErrors { diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 4ff6a21778b..3cd7ff43412 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -183,7 +183,7 @@ func TestVDiffCreate(t *testing.T) { { name: "no values", req: &vtctldatapb.VDiffCreateRequest{}, - wantErr: "node doesn't exist: keyspaces/shards", // We did not provide any keyspace or shard + wantErr: "FindAllShardsInKeyspace(): List: node doesn't exist: keyspaces/shards", // We did not provide any keyspace or shard }, } for _, tt := range tests { diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index cb35cc2d1ab..80b981026d8 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -326,7 +326,7 @@ func getMigrationID(targetKeyspace string, shardTablets []string) (int64, error) // // It returns ErrNoStreams if there are no targets found for the workflow. func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error) { - targetShards, err := ts.GetShardNames(ctx, targetKeyspace) + targetShards, err := ts.FindAllShardsInKeyspace(ctx, targetKeyspace, nil) if err != nil { return nil, err } @@ -344,18 +344,13 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag // stream. For example, if we're splitting -80 to [-40,40-80], only those // two target shards will have vreplication streams, and the other shards in // the target keyspace will not. - for _, targetShard := range targetShards { - si, err := ts.GetShard(ctx, targetKeyspace, targetShard) - if err != nil { - return nil, err - } - - if si.PrimaryAlias == nil { + for targetShardName, targetShard := range targetShards { + if targetShard.PrimaryAlias == nil { // This can happen if bad inputs are given. 
- return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "shard %v/%v doesn't have a primary set", targetKeyspace, targetShard) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "shard %v/%v doesn't have a primary set", targetKeyspace, targetShardName) } - primary, err := ts.GetTablet(ctx, si.PrimaryAlias) + primary, err := ts.GetTablet(ctx, targetShard.PrimaryAlias) if err != nil { return nil, err } @@ -372,7 +367,7 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag } target := &MigrationTarget{ - si: si, + si: targetShard, primary: primary, Sources: make(map[int32]*binlogdatapb.BinlogSource), } @@ -389,7 +384,7 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag target.Sources[stream.Id] = stream.Bls } - targets[targetShard] = target + targets[targetShardName] = target } if len(targets) == 0 {
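GetTabletMap in the tablet.go hunk above now sizes its weighted semaphore from DefaultConcurrency rather than a hard-coded local int64. A self-contained sketch of that semaphore-bounded fan-out, with illustrative tablet alias strings and placeholder work standing in for GetTablet:

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	aliases := []string{"zone1-0000000100", "zone1-0000000101", "zone1-0000000102"}

	concurrency := 32 // stand-in for topo.DefaultConcurrency
	sem := semaphore.NewWeighted(int64(concurrency))

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make(map[string]bool, len(aliases))
	)
	for _, alias := range aliases {
		wg.Add(1)
		go func(alias string) {
			defer wg.Done()
			if err := sem.Acquire(ctx, 1); err != nil {
				return // context cancelled
			}
			defer sem.Release(1)
			// Placeholder for ts.GetTablet(ctx, alias).
			mu.Lock()
			results[alias] = true
			mu.Unlock()
		}(alias)
	}
	wg.Wait()
	fmt.Println(len(results)) // 3
}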