diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index feb80c374e5..2bdd616a261 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -19,6 +19,9 @@ package topo import ( "context" "path" + "sync" + + "golang.org/x/sync/errgroup" "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/vt/vterrors" @@ -270,26 +273,77 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error { return nil } -// FindAllShardsInKeyspace reads and returns all the existing shards in -// a keyspace. It doesn't take any lock. -func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string) (map[string]*ShardInfo, error) { +// FindAllShardsInKeyspaceOptions controls the behavior of +// Server.FindAllShardsInKeyspace. +type FindAllShardsInKeyspaceOptions struct { + // Concurrency controls the maximum number of concurrent calls to GetShard. + // If <= 0, Concurrency is set to 1. + Concurrency int +} + +// FindAllShardsInKeyspace reads and returns all the existing shards in a +// keyspace. It doesn't take any lock. +// +// If opt is non-nil, it is used to configure the method's behavior. Otherwise, +// the default options are used. +func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt *FindAllShardsInKeyspaceOptions) (map[string]*ShardInfo, error) { + // Apply any necessary defaults. + if opt == nil { + opt = &FindAllShardsInKeyspaceOptions{} + } + if opt.Concurrency <= 0 { + opt.Concurrency = 1 + } + shards, err := ts.GetShardNames(ctx, keyspace) if err != nil { return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace) } - result := make(map[string]*ShardInfo, len(shards)) + // Keyspaces with a large number of shards and geographically distributed + // topo instances may experience significant latency fetching shard records. + // + // A prior version of this logic used unbounded concurrency to fetch shard + // records which resulted in overwhelming topo server instances: + // https://github.com/vitessio/vitess/pull/5436. + // + // However, removing the concurrency altogether can cause large operations + // to fail due to timeout. The caller chooses the appropriate concurrency + // level so that certain paths can be optimized (such as vtctld + // RebuildKeyspace calls, which do not run on every vttablet). + var ( + mu sync.Mutex + result = make(map[string]*ShardInfo, len(shards)) + ) + + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(opt.Concurrency) + for _, shard := range shards { - si, err := ts.GetShard(ctx, keyspace, shard) - if err != nil { - if IsErrType(err, NoNode) { + shard := shard + + eg.Go(func() error { + si, err := ts.GetShard(ctx, keyspace, shard) + switch { + case IsErrType(err, NoNode): log.Warningf("GetShard(%v, %v) returned ErrNoNode, consider checking the topology.", keyspace, shard) - } else { - return nil, vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard) + return nil + case err == nil: + mu.Lock() + result[shard] = si + mu.Unlock() + + return nil + default: + return vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard) } - } - result[shard] = si + }) } + + if err := eg.Wait(); err != nil { + return nil, err + } + return result, nil } @@ -319,7 +373,7 @@ func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*Sha // GetOnlyShard returns the single ShardInfo of an unsharded keyspace. func (ts *Server) GetOnlyShard(ctx context.Context, keyspace string) (*ShardInfo, error) { - allShards, err := ts.FindAllShardsInKeyspace(ctx, keyspace) + allShards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil) if err != nil { return nil, err } diff --git a/go/vt/topo/keyspace_external_test.go b/go/vt/topo/keyspace_external_test.go new file mode 100644 index 00000000000..064c4cba93b --- /dev/null +++ b/go/vt/topo/keyspace_external_test.go @@ -0,0 +1,89 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/key" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +func TestServerFindAllShardsInKeyspace(t *testing.T) { + tests := []struct { + name string + shards int + opt *topo.FindAllShardsInKeyspaceOptions + }{ + { + name: "negative concurrency", + shards: 1, + // Ensure this doesn't panic. + opt: &topo.FindAllShardsInKeyspaceOptions{Concurrency: -1}, + }, + { + name: "unsharded", + shards: 1, + // Make sure the defaults apply as expected. + opt: nil, + }, + { + name: "sharded", + shards: 32, + opt: &topo.FindAllShardsInKeyspaceOptions{Concurrency: 8}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts := memorytopo.NewServer(ctx) + defer ts.Close() + + // Create an ephemeral keyspace and generate shard records within + // the keyspace to fetch later. + const keyspace = "keyspace" + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})) + + shards, err := key.GenerateShardRanges(tt.shards) + require.NoError(t, err) + + for _, s := range shards { + require.NoError(t, ts.CreateShard(ctx, keyspace, s)) + } + + // Verify that we return a complete list of shards and that each + // key range is present in the output. + out, err := ts.FindAllShardsInKeyspace(ctx, keyspace, tt.opt) + require.NoError(t, err) + require.Len(t, out, tt.shards) + + for _, s := range shards { + if _, ok := out[s]; !ok { + t.Errorf("shard %q was not found", s) + } + } + }) + } +} diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 183ed409bbb..752001438f4 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -314,7 +314,14 @@ func (ts *Server) CreateShard(ctx context.Context, keyspace, shard string) (err // Set primary as serving only if its keyrange doesn't overlap // with other shards. This applies to unsharded keyspaces also value.IsPrimaryServing = true - sis, err := ts.FindAllShardsInKeyspace(ctx, keyspace) + sis, err := ts.FindAllShardsInKeyspace(ctx, keyspace, &FindAllShardsInKeyspaceOptions{ + // Assume that CreateShard may be called by many vttablets concurrently + // in a large, sharded keyspace. Do not apply concurrency to avoid + // overwhelming the toposerver. + // + // See: https://github.com/vitessio/vitess/pull/5436. + Concurrency: 1, + }) if err != nil && !IsErrType(err, NoNode) { return err } diff --git a/go/vt/topotools/rebuild_keyspace.go b/go/vt/topotools/rebuild_keyspace.go index d58ce0b7160..09df8b8fadc 100644 --- a/go/vt/topotools/rebuild_keyspace.go +++ b/go/vt/topotools/rebuild_keyspace.go @@ -64,7 +64,12 @@ func RebuildKeyspaceLocked(ctx context.Context, log logutil.Logger, ts *topo.Ser } } - shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace) + shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, &topo.FindAllShardsInKeyspaceOptions{ + // Fetch shard records concurrently to speed up the rebuild process. + // This call is invoked by the first tablet in a given keyspace or + // manually via vtctld, so there is little risk of a thundering herd. + Concurrency: 8, + }) if err != nil { return err } diff --git a/go/vt/topotools/split.go b/go/vt/topotools/split.go index ace3dda94a7..0671c2c5cb8 100644 --- a/go/vt/topotools/split.go +++ b/go/vt/topotools/split.go @@ -17,12 +17,11 @@ limitations under the License. package topotools import ( + "context" "errors" "fmt" "sort" - "context" - "vitess.io/vitess/go/vt/key" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" @@ -119,7 +118,7 @@ func OverlappingShardsForShard(os []*OverlappingShards, shardName string) *Overl // will return an error). // If shards don't perfectly overlap, they are not returned. func FindOverlappingShards(ctx context.Context, ts *topo.Server, keyspace string) ([]*OverlappingShards, error) { - shardMap, err := ts.FindAllShardsInKeyspace(ctx, keyspace) + shardMap, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil) if err != nil { return nil, err } diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index ef0e9db341b..2ece69876e8 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -1243,7 +1243,7 @@ func (s *VtctldServer) FindAllShardsInKeyspace(ctx context.Context, req *vtctlda span.Annotate("keyspace", req.Keyspace) - result, err := s.ts.FindAllShardsInKeyspace(ctx, req.Keyspace) + result, err := s.ts.FindAllShardsInKeyspace(ctx, req.Keyspace, nil) if err != nil { return nil, err } diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 1a723c6192c..4d1a3c5df4d 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -86,7 +86,7 @@ func getTablesInKeyspace(ctx context.Context, ts *topo.Server, tmc tmclient.Tabl // validateNewWorkflow ensures that the specified workflow doesn't already exist // in the keyspace. func validateNewWorkflow(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, keyspace, workflow string) error { - allshards, err := ts.FindAllShardsInKeyspace(ctx, keyspace) + allshards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil) if err != nil { return err } diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index c79ace5bdc3..b1e93fe2a01 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -124,7 +124,12 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error { // refreshAllShards refreshes all the shard records in the given keyspace. func refreshAllShards(ctx context.Context, keyspaceName string) error { - shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName) + shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{ + // Fetch shard records concurrently to speed up discovery. A typical + // Vitess cluster will have 1-3 vtorc instances deployed, so there is + // little risk of a thundering herd. + Concurrency: 8, + }) if err != nil { log.Error(err) return err diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 7f3f00da4f8..a5f7d6ae0bf 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -44,7 +44,7 @@ const ( // validateNewWorkflow ensures that the specified workflow doesn't already exist // in the keyspace. func (wr *Wrangler) validateNewWorkflow(ctx context.Context, keyspace, workflow string) error { - allshards, err := wr.ts.FindAllShardsInKeyspace(ctx, keyspace) + allshards, err := wr.ts.FindAllShardsInKeyspace(ctx, keyspace, nil) if err != nil { return err }