Skip to content

Commit

Permalink
go/vt/topo: enable concurrency for FindAllShardsInKeyspace (#14670)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Layher <[email protected]>
  • Loading branch information
mdlayher authored Dec 5, 2023
1 parent 3d53207 commit 7312b05
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 21 deletions.
78 changes: 66 additions & 12 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package topo
import (
"context"
"path"
"sync"

"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -270,26 +273,77 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error {
return nil
}

// FindAllShardsInKeyspace reads and returns all the existing shards in
// a keyspace. It doesn't take any lock.
func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string) (map[string]*ShardInfo, error) {
// FindAllShardsInKeyspaceOptions controls the behavior of
// Server.FindAllShardsInKeyspace.
type FindAllShardsInKeyspaceOptions struct {
// Concurrency controls the maximum number of concurrent calls to GetShard.
// If <= 0, Concurrency is set to 1.
Concurrency int
}

// FindAllShardsInKeyspace reads and returns all the existing shards in a
// keyspace. It doesn't take any lock.
//
// If opt is non-nil, it is used to configure the method's behavior. Otherwise,
// the default options are used.
func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt *FindAllShardsInKeyspaceOptions) (map[string]*ShardInfo, error) {
// Apply any necessary defaults.
if opt == nil {
opt = &FindAllShardsInKeyspaceOptions{}
}
if opt.Concurrency <= 0 {
opt.Concurrency = 1
}

shards, err := ts.GetShardNames(ctx, keyspace)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace)
}

result := make(map[string]*ShardInfo, len(shards))
// Keyspaces with a large number of shards and geographically distributed
// topo instances may experience significant latency fetching shard records.
//
// A prior version of this logic used unbounded concurrency to fetch shard
// records which resulted in overwhelming topo server instances:
// https://github.com/vitessio/vitess/pull/5436.
//
// However, removing the concurrency altogether can cause large operations
// to fail due to timeout. The caller chooses the appropriate concurrency
// level so that certain paths can be optimized (such as vtctld
// RebuildKeyspace calls, which do not run on every vttablet).
var (
mu sync.Mutex
result = make(map[string]*ShardInfo, len(shards))
)

eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(opt.Concurrency)

for _, shard := range shards {
si, err := ts.GetShard(ctx, keyspace, shard)
if err != nil {
if IsErrType(err, NoNode) {
shard := shard

eg.Go(func() error {
si, err := ts.GetShard(ctx, keyspace, shard)
switch {
case IsErrType(err, NoNode):
log.Warningf("GetShard(%v, %v) returned ErrNoNode, consider checking the topology.", keyspace, shard)
} else {
return nil, vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard)
return nil
case err == nil:
mu.Lock()
result[shard] = si
mu.Unlock()

return nil
default:
return vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard)
}
}
result[shard] = si
})
}

if err := eg.Wait(); err != nil {
return nil, err
}

return result, nil
}

Expand Down Expand Up @@ -319,7 +373,7 @@ func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*Sha

// GetOnlyShard returns the single ShardInfo of an unsharded keyspace.
func (ts *Server) GetOnlyShard(ctx context.Context, keyspace string) (*ShardInfo, error) {
allShards, err := ts.FindAllShardsInKeyspace(ctx, keyspace)
allShards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil {
return nil, err
}
Expand Down
89 changes: 89 additions & 0 deletions go/vt/topo/keyspace_external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2023 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topo_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/key"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
)

func TestServerFindAllShardsInKeyspace(t *testing.T) {
tests := []struct {
name string
shards int
opt *topo.FindAllShardsInKeyspaceOptions
}{
{
name: "negative concurrency",
shards: 1,
// Ensure this doesn't panic.
opt: &topo.FindAllShardsInKeyspaceOptions{Concurrency: -1},
},
{
name: "unsharded",
shards: 1,
// Make sure the defaults apply as expected.
opt: nil,
},
{
name: "sharded",
shards: 32,
opt: &topo.FindAllShardsInKeyspaceOptions{Concurrency: 8},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ts := memorytopo.NewServer(ctx)
defer ts.Close()

// Create an ephemeral keyspace and generate shard records within
// the keyspace to fetch later.
const keyspace = "keyspace"
require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}))

shards, err := key.GenerateShardRanges(tt.shards)
require.NoError(t, err)

for _, s := range shards {
require.NoError(t, ts.CreateShard(ctx, keyspace, s))
}

// Verify that we return a complete list of shards and that each
// key range is present in the output.
out, err := ts.FindAllShardsInKeyspace(ctx, keyspace, tt.opt)
require.NoError(t, err)
require.Len(t, out, tt.shards)

for _, s := range shards {
if _, ok := out[s]; !ok {
t.Errorf("shard %q was not found", s)
}
}
})
}
}
9 changes: 8 additions & 1 deletion go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,14 @@ func (ts *Server) CreateShard(ctx context.Context, keyspace, shard string) (err
// Set primary as serving only if its keyrange doesn't overlap
// with other shards. This applies to unsharded keyspaces also
value.IsPrimaryServing = true
sis, err := ts.FindAllShardsInKeyspace(ctx, keyspace)
sis, err := ts.FindAllShardsInKeyspace(ctx, keyspace, &FindAllShardsInKeyspaceOptions{
// Assume that CreateShard may be called by many vttablets concurrently
// in a large, sharded keyspace. Do not apply concurrency to avoid
// overwhelming the toposerver.
//
// See: https://github.com/vitessio/vitess/pull/5436.
Concurrency: 1,
})
if err != nil && !IsErrType(err, NoNode) {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion go/vt/topotools/rebuild_keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ func RebuildKeyspaceLocked(ctx context.Context, log logutil.Logger, ts *topo.Ser
}
}

shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace)
shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, &topo.FindAllShardsInKeyspaceOptions{
// Fetch shard records concurrently to speed up the rebuild process.
// This call is invoked by the first tablet in a given keyspace or
// manually via vtctld, so there is little risk of a thundering herd.
Concurrency: 8,
})
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions go/vt/topotools/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ limitations under the License.
package topotools

import (
"context"
"errors"
"fmt"
"sort"

"context"

"vitess.io/vitess/go/vt/key"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -119,7 +118,7 @@ func OverlappingShardsForShard(os []*OverlappingShards, shardName string) *Overl
// will return an error).
// If shards don't perfectly overlap, they are not returned.
func FindOverlappingShards(ctx context.Context, ts *topo.Server, keyspace string) ([]*OverlappingShards, error) {
shardMap, err := ts.FindAllShardsInKeyspace(ctx, keyspace)
shardMap, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,7 @@ func (s *VtctldServer) FindAllShardsInKeyspace(ctx context.Context, req *vtctlda

span.Annotate("keyspace", req.Keyspace)

result, err := s.ts.FindAllShardsInKeyspace(ctx, req.Keyspace)
result, err := s.ts.FindAllShardsInKeyspace(ctx, req.Keyspace, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func getTablesInKeyspace(ctx context.Context, ts *topo.Server, tmc tmclient.Tabl
// validateNewWorkflow ensures that the specified workflow doesn't already exist
// in the keyspace.
func validateNewWorkflow(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, keyspace, workflow string) error {
allshards, err := ts.FindAllShardsInKeyspace(ctx, keyspace)
allshards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error {

// refreshAllShards refreshes all the shard records in the given keyspace.
func refreshAllShards(ctx context.Context, keyspaceName string) error {
shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName)
shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{
// Fetch shard records concurrently to speed up discovery. A typical
// Vitess cluster will have 1-3 vtorc instances deployed, so there is
// little risk of a thundering herd.
Concurrency: 8,
})
if err != nil {
log.Error(err)
return err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
// validateNewWorkflow ensures that the specified workflow doesn't already exist
// in the keyspace.
func (wr *Wrangler) validateNewWorkflow(ctx context.Context, keyspace, workflow string) error {
allshards, err := wr.ts.FindAllShardsInKeyspace(ctx, keyspace)
allshards, err := wr.ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil {
return err
}
Expand Down

0 comments on commit 7312b05

Please sign in to comment.