From 12643913eac46d5fa646af2d5032cf88e01b31ef Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 26 Jan 2024 10:54:31 -0500 Subject: [PATCH 01/22] Make GetServingShards concurrent Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 4ff53e24204..b5e7b6b2274 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -19,11 +19,13 @@ package topo import ( "context" "path" + "sort" "sync" "golang.org/x/sync/errgroup" "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/event" @@ -245,25 +247,26 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, // GetServingShards returns all shards where the primary is serving. func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*ShardInfo, error) { - shards, err := ts.GetShardNames(ctx, keyspace) + shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil) if err != nil { return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace) } result := make([]*ShardInfo, 0, len(shards)) for _, shard := range shards { - si, err := ts.GetShard(ctx, keyspace, shard) - if err != nil { - return nil, vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard) - } - if !si.IsPrimaryServing { + if !shard.IsPrimaryServing { continue } - result = append(result, si) + result = append(result, shard) } if len(result) == 0 { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%v has no serving shards", keyspace) } + // Sort the shards by KeyRange for deterministic results. + sort.Slice(result, func(i, j int) bool { + return key.KeyRangeLess(result[i].KeyRange, result[j].KeyRange) + }) + return result, nil } From b26f7cf01ffbc981dc6f1b059daa473cb51ef6c9 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 26 Jan 2024 11:37:22 -0500 Subject: [PATCH 02/22] Get all shards in a keyspace via List when we can Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index b5e7b6b2274..667eeda9384 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -193,9 +193,44 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt.Concurrency = 1 } + // First try to get all shards using List if we can. + shardsPath := path.Join(KeyspacesPath, keyspace, ShardsPath) + listResults, err := ts.globalCell.List(ctx, shardsPath) + if err != nil || len(listResults) == 0 { + if IsErrType(err, NoNode) { + return make(map[string]*ShardInfo, 0), nil // No shards + } + // Currently the ZooKeeper implementation does not support scans + // so we fall back to concurrently fetching the shards one by one. + // It is possible that the response is too large in which case we + // also fall back to the one by one fetch in that case. + if !IsErrType(err, NoImplementation) && !IsErrType(err, ResourceExhausted) { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%v): List", keyspace) + } + // Continue on with the shard by shard method. + } else { + result := make(map[string]*ShardInfo, len(listResults)) + for _, entry := range listResults { + // The key looks like this: /vitess/global/keyspaces/commerce/shards/-80/Shard + shardName := path.Base(path.Dir(string(entry.Key))) // The base part of the dir is "-80" + shard := &topodatapb.Shard{} + if err = shard.UnmarshalVT(entry.Value); err != nil { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%v): bad shard data", keyspace) + } + result[shardName] = &ShardInfo{ + keyspace: keyspace, + shardName: shardName, + version: entry.Version, + Shard: shard, + } + } + return result, nil + } + + // Fall back to the shard by shard method. shards, err := ts.GetShardNames(ctx, keyspace) if err != nil { - return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace) + return nil, vterrors.Wrapf(err, "failed to get list of shard names for keyspace '%v'", keyspace) } // Keyspaces with a large number of shards and geographically distributed From ca2ea86b5b76d966878341d024d5f69b22360b6b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 26 Jan 2024 12:14:32 -0500 Subject: [PATCH 03/22] Return error when keyspace doesn't exist Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 667eeda9384..369a6c33429 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -198,7 +198,14 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, listResults, err := ts.globalCell.List(ctx, shardsPath) if err != nil || len(listResults) == 0 { if IsErrType(err, NoNode) { - return make(map[string]*ShardInfo, 0), nil // No shards + // The path doesn't exist, let's see if the keyspace + // is here or not. + _, kerr := ts.GetKeyspace(ctx, keyspace) + if kerr == nil { + // Keyspace is here, means no shards. + return make(map[string]*ShardInfo, 0), nil // No shards + } + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%v): List", keyspace) } // Currently the ZooKeeper implementation does not support scans // so we fall back to concurrently fetching the shards one by one. From cc1aec4431582e70d8dcaedfdc401b1c4fcc459c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 26 Jan 2024 12:26:14 -0500 Subject: [PATCH 04/22] Use default concurrency based on vCPUs Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 369a6c33429..8dd9a41f23a 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -19,6 +19,7 @@ package topo import ( "context" "path" + "runtime" "sort" "sync" @@ -38,6 +39,12 @@ import ( // This file contains keyspace utility functions +// Default concurrency to use in order to avoid overhwelming the topo server. +// This uses a heuristic based on the number of vCPUs available -- where it's +// assumed that as larger machines are used for Vitess deployments they will +// be able to do more concurrently. +var defaultConcurrency = runtime.NumCPU() + // KeyspaceInfo is a meta struct that contains metadata to give the // data more context and convenience. This is the main way we interact // with a keyspace. @@ -198,12 +205,11 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, listResults, err := ts.globalCell.List(ctx, shardsPath) if err != nil || len(listResults) == 0 { if IsErrType(err, NoNode) { - // The path doesn't exist, let's see if the keyspace - // is here or not. + // The path doesn't exist, let's see if the keyspace exists. _, kerr := ts.GetKeyspace(ctx, keyspace) if kerr == nil { - // Keyspace is here, means no shards. - return make(map[string]*ShardInfo, 0), nil // No shards + // We simply have no shards. + return make(map[string]*ShardInfo, 0), nil } return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%v): List", keyspace) } @@ -289,7 +295,9 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, // GetServingShards returns all shards where the primary is serving. func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*ShardInfo, error) { - shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil) + shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, &FindAllShardsInKeyspaceOptions{ + Concurrency: defaultConcurrency, // Limit concurrency to avoid overwhelming the topo server. + }) if err != nil { return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace) } From d4b06091a486a20f788d8be3d74c7aeb1911910c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 26 Jan 2024 13:06:14 -0500 Subject: [PATCH 05/22] Improve the logic in processing list results Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 44 ++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 8dd9a41f23a..b8ea34f3692 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -203,32 +203,14 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, // First try to get all shards using List if we can. shardsPath := path.Join(KeyspacesPath, keyspace, ShardsPath) listResults, err := ts.globalCell.List(ctx, shardsPath) - if err != nil || len(listResults) == 0 { - if IsErrType(err, NoNode) { - // The path doesn't exist, let's see if the keyspace exists. - _, kerr := ts.GetKeyspace(ctx, keyspace) - if kerr == nil { - // We simply have no shards. - return make(map[string]*ShardInfo, 0), nil - } - return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%v): List", keyspace) - } - // Currently the ZooKeeper implementation does not support scans - // so we fall back to concurrently fetching the shards one by one. - // It is possible that the response is too large in which case we - // also fall back to the one by one fetch in that case. - if !IsErrType(err, NoImplementation) && !IsErrType(err, ResourceExhausted) { - return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%v): List", keyspace) - } - // Continue on with the shard by shard method. - } else { + if err == nil { // We have everything we need to build the result result := make(map[string]*ShardInfo, len(listResults)) for _, entry := range listResults { // The key looks like this: /vitess/global/keyspaces/commerce/shards/-80/Shard shardName := path.Base(path.Dir(string(entry.Key))) // The base part of the dir is "-80" shard := &topodatapb.Shard{} if err = shard.UnmarshalVT(entry.Value); err != nil { - return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%v): bad shard data", keyspace) + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): bad shard data", keyspace) } result[shardName] = &ShardInfo{ keyspace: keyspace, @@ -239,11 +221,27 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, } return result, nil } + if IsErrType(err, NoNode) { + // The path doesn't exist, let's see if the keyspace exists. + _, kerr := ts.GetKeyspace(ctx, keyspace) + if kerr == nil { + // We simply have no shards. + return make(map[string]*ShardInfo, 0), nil + } + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace) + } + // Currently the ZooKeeper implementation does not support scans so we + // fall back to concurrently fetching the shards one by one. + // It is also possible that the response containing all shards is too + // large in which case we also fall back to the one by one fetch. + if !IsErrType(err, NoImplementation) && !IsErrType(err, ResourceExhausted) { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace) + } // Fall back to the shard by shard method. shards, err := ts.GetShardNames(ctx, keyspace) if err != nil { - return nil, vterrors.Wrapf(err, "failed to get list of shard names for keyspace '%v'", keyspace) + return nil, vterrors.Wrapf(err, "failed to get list of shard names for keyspace '%s'", keyspace) } // Keyspaces with a large number of shards and geographically distributed @@ -272,7 +270,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, si, err := ts.GetShard(ctx, keyspace, shard) switch { case IsErrType(err, NoNode): - log.Warningf("GetShard(%v, %v) returned ErrNoNode, consider checking the topology.", keyspace, shard) + log.Warningf("GetShard(%s, %s) returned ErrNoNode, consider checking the topology.", keyspace, shard) return nil case err == nil: mu.Lock() @@ -281,7 +279,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, return nil default: - return vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard) + return vterrors.Wrapf(err, "GetShard(%s, %s) failed", keyspace, shard) } }) } From b6286ff2e7a1958636bf39cae34fab743b008f86 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 26 Jan 2024 15:12:49 -0500 Subject: [PATCH 06/22] Utilize in callsites and improve test coverage Signed-off-by: Matt Lord --- go/vt/schemamanager/tablet_executor.go | 16 +++--- go/vt/topo/keyspace.go | 4 +- go/vt/topo/keyspace_test.go | 63 +++++++++++++++++++++ go/vt/topo/test/shard.go | 12 +++- go/vt/vtctl/endtoend/onlineddl_show_test.go | 2 +- go/vt/vtctl/workflow/server_test.go | 2 +- go/vt/vtctl/workflow/utils.go | 19 +++---- go/vt/wrangler/permissions.go | 37 ++++++------ 8 files changed, 111 insertions(+), 44 deletions(-) create mode 100644 go/vt/topo/keyspace_test.go diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index cd1691dd01e..85e14fb51a9 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -107,16 +107,15 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error { return nil } exec.keyspace = keyspace - shardNames, err := exec.ts.GetShardNames(ctx, keyspace) + shards, err := exec.ts.FindAllShardsInKeyspace(ctx, keyspace, &topo.FindAllShardsInKeyspaceOptions{ + Concurrency: topo.DefaultConcurrency, // Limit concurrency to avoid overwhelming the topo server. + }) if err != nil { - return fmt.Errorf("unable to get shard names for keyspace: %s, error: %v", keyspace, err) + return fmt.Errorf("unable to get shards for keyspace: %s, error: %v", keyspace, err) } - exec.tablets = make([]*topodatapb.Tablet, len(shardNames)) - for i, shardName := range shardNames { - shardInfo, err := exec.ts.GetShard(ctx, keyspace, shardName) - if err != nil { - return fmt.Errorf("unable to get shard info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err) - } + exec.tablets = make([]*topodatapb.Tablet, len(shards)) + i := 0 + for shardName, shardInfo := range shards { if !shardInfo.HasPrimary() { return fmt.Errorf("shard: %s does not have a primary", shardName) } @@ -125,6 +124,7 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error { return fmt.Errorf("unable to get primary tablet info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err) } exec.tablets[i] = tabletInfo.Tablet + i++ } if len(exec.tablets) == 0 { diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index b8ea34f3692..c67c831c9ec 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -43,7 +43,7 @@ import ( // This uses a heuristic based on the number of vCPUs available -- where it's // assumed that as larger machines are used for Vitess deployments they will // be able to do more concurrently. -var defaultConcurrency = runtime.NumCPU() +var DefaultConcurrency = runtime.NumCPU() // KeyspaceInfo is a meta struct that contains metadata to give the // data more context and convenience. This is the main way we interact @@ -294,7 +294,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, // GetServingShards returns all shards where the primary is serving. func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*ShardInfo, error) { shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, &FindAllShardsInKeyspaceOptions{ - Concurrency: defaultConcurrency, // Limit concurrency to avoid overwhelming the topo server. + Concurrency: DefaultConcurrency, // Limit concurrency to avoid overwhelming the topo server. }) if err != nil { return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace) diff --git a/go/vt/topo/keyspace_test.go b/go/vt/topo/keyspace_test.go new file mode 100644 index 00000000000..e7d3f0fc294 --- /dev/null +++ b/go/vt/topo/keyspace_test.go @@ -0,0 +1,63 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo + +import ( + "context" + "reflect" + "testing" +) + +func TestGetServingShards(t *testing.T) { + type fields struct { + globalCell Conn + globalReadOnlyCell Conn + factory Factory + cellConns map[string]cellConn + } + type args struct { + ctx context.Context + keyspace string + } + tests := []struct { + name string + fields fields + args args + want []*ShardInfo + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts := &Server{ + globalCell: tt.fields.globalCell, + globalReadOnlyCell: tt.fields.globalReadOnlyCell, + factory: tt.fields.factory, + cellConns: tt.fields.cellConns, + } + got, err := ts.GetServingShards(tt.args.ctx, tt.args.keyspace) + if (err != nil) != tt.wantErr { + t.Errorf("Server.GetServingShards() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("Server.GetServingShards() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/go/vt/topo/test/shard.go b/go/vt/topo/test/shard.go index b5c92c4a3ec..fdf5d378d03 100644 --- a/go/vt/topo/test/shard.go +++ b/go/vt/topo/test/shard.go @@ -22,7 +22,6 @@ import ( "time" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/topo" @@ -82,12 +81,19 @@ func checkShard(t *testing.T, ctx context.Context, ts *topo.Server) { t.Fatalf("shard.PrimaryAlias = %v, want %v", si.Shard.PrimaryAlias, other) } + // Test FindAllShardsInKeyspace. + require.NoError(t, err) + shards, err := ts.FindAllShardsInKeyspace(ctx, "test_keyspace", &topo.FindAllShardsInKeyspaceOptions{ + Concurrency: topo.DefaultConcurrency, + }) + require.NoError(t, err) + // test GetShardNames - shards, err := ts.GetShardNames(ctx, "test_keyspace") + shardNames, err := ts.GetShardNames(ctx, "test_keyspace") if err != nil { t.Errorf("GetShardNames: %v", err) } - if len(shards) != 1 || shards[0] != "b0-c0" { + if len(shards) != 1 || shardNames[0] != "b0-c0" { t.Errorf(`GetShardNames: want [ "b0-c0" ], got %v`, shards) } diff --git a/go/vt/vtctl/endtoend/onlineddl_show_test.go b/go/vt/vtctl/endtoend/onlineddl_show_test.go index 500f33aa3ce..ed848c14be8 100644 --- a/go/vt/vtctl/endtoend/onlineddl_show_test.go +++ b/go/vt/vtctl/endtoend/onlineddl_show_test.go @@ -127,7 +127,7 @@ func onlineDDLTest(t *testing.T, args []string, expectedQuery string) { assert.NotEmpty(t, err.Error()) containsExpectedError := false expectedErrors := []string{ - "unable to get shard names for keyspace", + "unable to get shards for keyspace", "no ExecuteFetchAsDba results on fake TabletManagerClient", } for _, expect := range expectedErrors { diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 4ff6a21778b..3cd7ff43412 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -183,7 +183,7 @@ func TestVDiffCreate(t *testing.T) { { name: "no values", req: &vtctldatapb.VDiffCreateRequest{}, - wantErr: "node doesn't exist: keyspaces/shards", // We did not provide any keyspace or shard + wantErr: "FindAllShardsInKeyspace(): List: node doesn't exist: keyspaces/shards", // We did not provide any keyspace or shard }, } for _, tt := range tests { diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index cb35cc2d1ab..c0f0c9be6e2 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -326,7 +326,9 @@ func getMigrationID(targetKeyspace string, shardTablets []string) (int64, error) // // It returns ErrNoStreams if there are no targets found for the workflow. func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error) { - targetShards, err := ts.GetShardNames(ctx, targetKeyspace) + targetShards, err := ts.FindAllShardsInKeyspace(ctx, targetKeyspace, &topo.FindAllShardsInKeyspaceOptions{ + Concurrency: topo.DefaultConcurrency, + }) if err != nil { return nil, err } @@ -344,18 +346,13 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag // stream. For example, if we're splitting -80 to [-40,40-80], only those // two target shards will have vreplication streams, and the other shards in // the target keyspace will not. - for _, targetShard := range targetShards { - si, err := ts.GetShard(ctx, targetKeyspace, targetShard) - if err != nil { - return nil, err - } - - if si.PrimaryAlias == nil { + for targetShardName, targetShard := range targetShards { + if targetShard.PrimaryAlias == nil { // This can happen if bad inputs are given. return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "shard %v/%v doesn't have a primary set", targetKeyspace, targetShard) } - primary, err := ts.GetTablet(ctx, si.PrimaryAlias) + primary, err := ts.GetTablet(ctx, targetShard.PrimaryAlias) if err != nil { return nil, err } @@ -372,7 +369,7 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag } target := &MigrationTarget{ - si: si, + si: targetShard, primary: primary, Sources: make(map[int32]*binlogdatapb.BinlogSource), } @@ -389,7 +386,7 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag target.Sources[stream.Id] = stream.Bls } - targets[targetShard] = target + targets[targetShardName] = target } if len(targets) == 0 { diff --git a/go/vt/wrangler/permissions.go b/go/vt/wrangler/permissions.go index 652ae7f44a3..987b09cfce1 100644 --- a/go/vt/wrangler/permissions.go +++ b/go/vt/wrangler/permissions.go @@ -18,14 +18,16 @@ package wrangler import ( "fmt" - "sort" "sync" "context" + "golang.org/x/exp/maps" + "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -101,32 +103,31 @@ func (wr *Wrangler) ValidatePermissionsShard(ctx context.Context, keyspace, shar // ValidatePermissionsKeyspace validates all the permissions are the same // in a keyspace func (wr *Wrangler) ValidatePermissionsKeyspace(ctx context.Context, keyspace string) error { - // find all the shards - shards, err := wr.ts.GetShardNames(ctx, keyspace) + // Find all the shards. + shards, err := wr.ts.FindAllShardsInKeyspace(ctx, keyspace, &topo.FindAllShardsInKeyspaceOptions{ + Concurrency: topo.DefaultConcurrency, // Limit concurrency to avoid overwhelming the topo server. + }) if err != nil { return err } - // corner cases + // Corner cases. if len(shards) == 0 { return fmt.Errorf("no shards in keyspace %v", keyspace) } - sort.Strings(shards) if len(shards) == 1 { - return wr.ValidatePermissionsShard(ctx, keyspace, shards[0]) + return wr.ValidatePermissionsShard(ctx, keyspace, maps.Keys(shards)[0]) } - // find the reference permissions using the first shard's primary - si, err := wr.ts.GetShard(ctx, keyspace, shards[0]) - if err != nil { - return err - } - if !si.HasPrimary() { - return fmt.Errorf("no primary in shard %v/%v", keyspace, shards[0]) + // Find the reference permissions using the first shard's primary. + shardName := maps.Keys(shards)[0] + shard := shards[shardName] + if !shard.HasPrimary() { + return fmt.Errorf("no primary in shard %v/%v", keyspace, shardName) } - referenceAlias := si.PrimaryAlias + referenceAlias := shard.PrimaryAlias log.Infof("Gathering permissions for reference primary %v", topoproto.TabletAliasString(referenceAlias)) - referencePermissions, err := wr.GetPermissions(ctx, si.PrimaryAlias) + referencePermissions, err := wr.GetPermissions(ctx, shard.PrimaryAlias) if err != nil { return err } @@ -134,15 +135,15 @@ func (wr *Wrangler) ValidatePermissionsKeyspace(ctx context.Context, keyspace st // then diff with all tablets but primary 0 er := concurrency.AllErrorRecorder{} wg := sync.WaitGroup{} - for _, shard := range shards { - aliases, err := wr.ts.FindAllTabletAliasesInShard(ctx, keyspace, shard) + for shardName, shard := range shards { + aliases, err := wr.ts.FindAllTabletAliasesInShard(ctx, keyspace, shardName) if err != nil { er.RecordError(err) continue } for _, alias := range aliases { - if topoproto.TabletAliasEqual(alias, si.PrimaryAlias) { + if topoproto.TabletAliasEqual(alias, shard.PrimaryAlias) { continue } From c26678a281a6c601a1e1da831d8c4a1882893fe2 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 26 Jan 2024 17:17:06 -0500 Subject: [PATCH 07/22] Add dedicated unit test Signed-off-by: Matt Lord --- go/vt/schemamanager/tablet_executor.go | 6 +- go/vt/topo/keyspace_external_test.go | 84 +++++++++++++++++++++++++- go/vt/topo/keyspace_test.go | 63 ------------------- go/vt/topo/memorytopo/directory.go | 2 + go/vt/topo/memorytopo/file.go | 10 +++ go/vt/topo/memorytopo/lock.go | 2 + go/vt/topo/memorytopo/memorytopo.go | 11 ++++ go/vt/topo/memorytopo/watch.go | 4 ++ 8 files changed, 114 insertions(+), 68 deletions(-) delete mode 100644 go/vt/topo/keyspace_test.go diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index 85e14fb51a9..42f3805dd87 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -113,8 +113,7 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error { if err != nil { return fmt.Errorf("unable to get shards for keyspace: %s, error: %v", keyspace, err) } - exec.tablets = make([]*topodatapb.Tablet, len(shards)) - i := 0 + exec.tablets = make([]*topodatapb.Tablet, 0, len(shards)) for shardName, shardInfo := range shards { if !shardInfo.HasPrimary() { return fmt.Errorf("shard: %s does not have a primary", shardName) @@ -123,8 +122,7 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error { if err != nil { return fmt.Errorf("unable to get primary tablet info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err) } - exec.tablets[i] = tabletInfo.Tablet - i++ + exec.tablets = append(exec.tablets, tabletInfo.Tablet) } if len(exec.tablets) == 0 { diff --git a/go/vt/topo/keyspace_external_test.go b/go/vt/topo/keyspace_external_test.go index 064c4cba93b..f16a6af722e 100644 --- a/go/vt/topo/keyspace_external_test.go +++ b/go/vt/topo/keyspace_external_test.go @@ -18,14 +18,17 @@ package topo_test import ( "context" + "fmt" + "slices" "testing" "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/key" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) func TestServerFindAllShardsInKeyspace(t *testing.T) { @@ -87,3 +90,82 @@ func TestServerFindAllShardsInKeyspace(t *testing.T) { }) } } + +func TestServerGetServingShards(t *testing.T) { + keyspace := "ks1" + listErr := topo.NewError(topo.NoImplementation, "don't be doing no listing round here") + tests := []struct { + shards int // Number of shards to create + err string // Error message we expect, if any + fallback bool // Should we fallback to the shard by shard method + }{ + { + shards: 0, + err: fmt.Sprintf("%s has no serving shards", keyspace), + }, + { + shards: 2, + }, + { + shards: 128, + }, + { + shards: 512, + fallback: true, + }, + { + shards: 1024, + }, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("%d shards with fallback = %t", tt.shards, tt.fallback), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts, factory := memorytopo.NewServerAndFactory(ctx) + defer ts.Close() + stats := factory.GetStats() + + if tt.fallback { + factory.SetListError(listErr) + } + + err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}) + require.NoError(t, err) + var shardNames []string + if tt.shards > 0 { + shardNames, err = key.GenerateShardRanges(tt.shards) + require.NoError(t, err) + for _, shardName := range shardNames { + err = ts.CreateShard(ctx, keyspace, shardName) + require.NoError(t, err) + } + } + + // Verify that we return a complete list of shards and that each + // key range is present in the output. + stats.ResetAll() // We only want the stats for GetServingShards + shardInfos, err := ts.GetServingShards(ctx, keyspace) + if tt.err != "" { + require.EqualError(t, err, tt.err) + } else { + require.NoError(t, err) + } + require.Len(t, shardInfos, tt.shards) + + for _, shardName := range shardNames { + f := func(si *topo.ShardInfo) bool { + return key.KeyRangeString(si.Shard.KeyRange) == shardName + } + require.True(t, slices.ContainsFunc(shardInfos, f), "shard %q was not found", shardName) + } + + // Now we check the stats based on the number of shards and whether or not + // we should have had a List error and fell back to the shard by shard method. + require.Equal(t, int64(1), stats.Counts()["List"]) // We should always try + if tt.fallback { + require.Equal(t, int64(tt.shards), stats.Counts()["Get"]) + } + }) + } +} diff --git a/go/vt/topo/keyspace_test.go b/go/vt/topo/keyspace_test.go deleted file mode 100644 index e7d3f0fc294..00000000000 --- a/go/vt/topo/keyspace_test.go +++ /dev/null @@ -1,63 +0,0 @@ -/* -Copyright 2024 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package topo - -import ( - "context" - "reflect" - "testing" -) - -func TestGetServingShards(t *testing.T) { - type fields struct { - globalCell Conn - globalReadOnlyCell Conn - factory Factory - cellConns map[string]cellConn - } - type args struct { - ctx context.Context - keyspace string - } - tests := []struct { - name string - fields fields - args args - want []*ShardInfo - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ts := &Server{ - globalCell: tt.fields.globalCell, - globalReadOnlyCell: tt.fields.globalReadOnlyCell, - factory: tt.fields.factory, - cellConns: tt.fields.cellConns, - } - got, err := ts.GetServingShards(tt.args.ctx, tt.args.keyspace) - if (err != nil) != tt.wantErr { - t.Errorf("Server.GetServingShards() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("Server.GetServingShards() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/go/vt/topo/memorytopo/directory.go b/go/vt/topo/memorytopo/directory.go index f68c87a2166..aceb3a3423f 100644 --- a/go/vt/topo/memorytopo/directory.go +++ b/go/vt/topo/memorytopo/directory.go @@ -27,6 +27,8 @@ import ( // ListDir is part of the topo.Conn interface. func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.DirEntry, error) { + c.factory.stats.Add([]string{"ListDir"}, 1) + if err := c.dial(ctx); err != nil { return nil, err } diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index cc19eb79011..9de07801e5c 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -30,6 +30,8 @@ import ( // Create is part of topo.Conn interface. func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (topo.Version, error) { + c.factory.stats.Add([]string{"Create"}, 1) + if err := c.dial(ctx); err != nil { return nil, err } @@ -74,6 +76,8 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to // Update is part of topo.Conn interface. func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, version topo.Version) (topo.Version, error) { + c.factory.stats.Add([]string{"Update"}, 1) + if err := c.dial(ctx); err != nil { return nil, err } @@ -152,6 +156,8 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver // Get is part of topo.Conn interface. func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, error) { + c.factory.stats.Add([]string{"Get"}, 1) + if err := c.dial(ctx); err != nil { return nil, nil, err } @@ -177,6 +183,8 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, // List is part of the topo.Conn interface. func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) { + c.factory.stats.Add([]string{"List"}, 1) + if err := c.dial(ctx); err != nil { return nil, err } @@ -239,6 +247,8 @@ func gatherChildren(n *node, dirPath string) []topo.KVInfo { // Delete is part of topo.Conn interface. func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version) error { + c.factory.stats.Add([]string{"Delete"}, 1) + if err := c.dial(ctx); err != nil { return err } diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index c15fb9099bb..6a6fc9b5d2e 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -42,11 +42,13 @@ type memoryTopoLockDescriptor struct { // TryLock is part of the topo.Conn interface. Its implementation is same as Lock func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + c.factory.stats.Add([]string{"TryLock"}, 1) return c.Lock(ctx, dirPath, contents) } // Lock is part of the topo.Conn interface. func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + c.factory.stats.Add([]string{"Lock"}, 1) return c.lock(ctx, dirPath, contents) } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index ae33bb73edd..3910815c91f 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -26,6 +26,7 @@ import ( "strings" "sync" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -73,6 +74,8 @@ type Factory struct { // listErr is used for testing purposed to fake errors from // calls to List. listErr error + // stats allows us to keep track of how many calls we make. + stats *stats.CountersWithMultiLabels } // HasGlobalReadOnlyCell is part of the topo.Factory interface. @@ -108,6 +111,10 @@ func (f *Factory) SetError(err error) { } } +func (f *Factory) GetStats() *stats.CountersWithMultiLabels { + return f.stats +} + // Lock blocks all requests to the topo and is exposed to allow tests to // simulate an unresponsive topo server func (f *Factory) Lock() { @@ -145,6 +152,9 @@ func (c *Conn) dial(ctx context.Context) error { // Close is part of the topo.Conn interface. func (c *Conn) Close() { c.closed = true + if c.factory != nil { + c.factory.stats = nil + } } type watch struct { @@ -240,6 +250,7 @@ func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *F f := &Factory{ cells: make(map[string]*node), generation: uint64(rand.Int63n(1 << 60)), + stats: stats.NewCountersWithMultiLabels("", "", []string{"operation"}), } f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil) diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 73b2d248434..241abda7649 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -25,6 +25,8 @@ import ( // Watch is part of the topo.Conn interface. func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { + c.factory.stats.Add([]string{"Watch"}, 1) + if c.closed { return nil, nil, ErrConnectionClosed } @@ -75,6 +77,8 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c // WatchRecursive is part of the topo.Conn interface. func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { + c.factory.stats.Add([]string{"WatchRecursive"}, 1) + if c.closed { return nil, nil, ErrConnectionClosed } From fbefa4e7a2af3d39933becaed2e86784b1623f17 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 26 Jan 2024 17:45:47 -0500 Subject: [PATCH 08/22] This change was not needed (may be more expensive) Signed-off-by: Matt Lord --- go/vt/wrangler/permissions.go | 37 +++++++++++++++++------------------ 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/go/vt/wrangler/permissions.go b/go/vt/wrangler/permissions.go index 987b09cfce1..652ae7f44a3 100644 --- a/go/vt/wrangler/permissions.go +++ b/go/vt/wrangler/permissions.go @@ -18,16 +18,14 @@ package wrangler import ( "fmt" + "sort" "sync" "context" - "golang.org/x/exp/maps" - "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl/tmutils" - "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -103,31 +101,32 @@ func (wr *Wrangler) ValidatePermissionsShard(ctx context.Context, keyspace, shar // ValidatePermissionsKeyspace validates all the permissions are the same // in a keyspace func (wr *Wrangler) ValidatePermissionsKeyspace(ctx context.Context, keyspace string) error { - // Find all the shards. - shards, err := wr.ts.FindAllShardsInKeyspace(ctx, keyspace, &topo.FindAllShardsInKeyspaceOptions{ - Concurrency: topo.DefaultConcurrency, // Limit concurrency to avoid overwhelming the topo server. - }) + // find all the shards + shards, err := wr.ts.GetShardNames(ctx, keyspace) if err != nil { return err } - // Corner cases. + // corner cases if len(shards) == 0 { return fmt.Errorf("no shards in keyspace %v", keyspace) } + sort.Strings(shards) if len(shards) == 1 { - return wr.ValidatePermissionsShard(ctx, keyspace, maps.Keys(shards)[0]) + return wr.ValidatePermissionsShard(ctx, keyspace, shards[0]) } - // Find the reference permissions using the first shard's primary. - shardName := maps.Keys(shards)[0] - shard := shards[shardName] - if !shard.HasPrimary() { - return fmt.Errorf("no primary in shard %v/%v", keyspace, shardName) + // find the reference permissions using the first shard's primary + si, err := wr.ts.GetShard(ctx, keyspace, shards[0]) + if err != nil { + return err + } + if !si.HasPrimary() { + return fmt.Errorf("no primary in shard %v/%v", keyspace, shards[0]) } - referenceAlias := shard.PrimaryAlias + referenceAlias := si.PrimaryAlias log.Infof("Gathering permissions for reference primary %v", topoproto.TabletAliasString(referenceAlias)) - referencePermissions, err := wr.GetPermissions(ctx, shard.PrimaryAlias) + referencePermissions, err := wr.GetPermissions(ctx, si.PrimaryAlias) if err != nil { return err } @@ -135,15 +134,15 @@ func (wr *Wrangler) ValidatePermissionsKeyspace(ctx context.Context, keyspace st // then diff with all tablets but primary 0 er := concurrency.AllErrorRecorder{} wg := sync.WaitGroup{} - for shardName, shard := range shards { - aliases, err := wr.ts.FindAllTabletAliasesInShard(ctx, keyspace, shardName) + for _, shard := range shards { + aliases, err := wr.ts.FindAllTabletAliasesInShard(ctx, keyspace, shard) if err != nil { er.RecordError(err) continue } for _, alias := range aliases { - if topoproto.TabletAliasEqual(alias, shard.PrimaryAlias) { + if topoproto.TabletAliasEqual(alias, si.PrimaryAlias) { continue } From 6d7a440677d8438b1aeaab6a15f4bd20714dfe11 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 26 Jan 2024 19:38:17 -0500 Subject: [PATCH 09/22] Nit from self review Signed-off-by: Matt Lord --- go/vt/topo/memorytopo/memorytopo.go | 4 ++-- go/vt/topo/test/shard.go | 11 ++++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index 3910815c91f..d5b87457ff3 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -152,8 +152,8 @@ func (c *Conn) dial(ctx context.Context) error { // Close is part of the topo.Conn interface. func (c *Conn) Close() { c.closed = true - if c.factory != nil { - c.factory.stats = nil + if c.factory != nil && c.factory.stats != nil { + c.factory.stats.ResetAll() } } diff --git a/go/vt/topo/test/shard.go b/go/vt/topo/test/shard.go index fdf5d378d03..93ea77d7365 100644 --- a/go/vt/topo/test/shard.go +++ b/go/vt/topo/test/shard.go @@ -83,18 +83,23 @@ func checkShard(t *testing.T, ctx context.Context, ts *topo.Server) { // Test FindAllShardsInKeyspace. require.NoError(t, err) - shards, err := ts.FindAllShardsInKeyspace(ctx, "test_keyspace", &topo.FindAllShardsInKeyspaceOptions{ + _, err = ts.FindAllShardsInKeyspace(ctx, "test_keyspace", &topo.FindAllShardsInKeyspaceOptions{ Concurrency: topo.DefaultConcurrency, }) require.NoError(t, err) + // Test GetServingShards. + require.NoError(t, err) + _, err = ts.GetServingShards(ctx, "test_keyspace") + require.NoError(t, err) + // test GetShardNames shardNames, err := ts.GetShardNames(ctx, "test_keyspace") if err != nil { t.Errorf("GetShardNames: %v", err) } - if len(shards) != 1 || shardNames[0] != "b0-c0" { - t.Errorf(`GetShardNames: want [ "b0-c0" ], got %v`, shards) + if len(shardNames) != 1 || shardNames[0] != "b0-c0" { + t.Errorf(`GetShardNames: want [ "b0-c0" ], got %v`, shardNames) } if _, err := ts.GetShardNames(ctx, "test_keyspace666"); !topo.IsErrType(err, topo.NoNode) { From 4871d816d5949d0f76d038638aa5b279eba81a24 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 26 Jan 2024 22:56:16 -0500 Subject: [PATCH 10/22] Small improvements to the unit test Signed-off-by: Matt Lord --- go/vt/topo/keyspace_external_test.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/go/vt/topo/keyspace_external_test.go b/go/vt/topo/keyspace_external_test.go index f16a6af722e..2614b72e254 100644 --- a/go/vt/topo/keyspace_external_test.go +++ b/go/vt/topo/keyspace_external_test.go @@ -93,7 +93,7 @@ func TestServerFindAllShardsInKeyspace(t *testing.T) { func TestServerGetServingShards(t *testing.T) { keyspace := "ks1" - listErr := topo.NewError(topo.NoImplementation, "don't be doing no listing round here") + noListImplErr := topo.NewError(topo.NoImplementation, "don't be doing no listing round here") tests := []struct { shards int // Number of shards to create err string // Error message we expect, if any @@ -127,7 +127,7 @@ func TestServerGetServingShards(t *testing.T) { stats := factory.GetStats() if tt.fallback { - factory.SetListError(listErr) + factory.SetListError(noListImplErr) } err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}) @@ -148,11 +148,11 @@ func TestServerGetServingShards(t *testing.T) { shardInfos, err := ts.GetServingShards(ctx, keyspace) if tt.err != "" { require.EqualError(t, err, tt.err) + return } else { require.NoError(t, err) } require.Len(t, shardInfos, tt.shards) - for _, shardName := range shardNames { f := func(si *topo.ShardInfo) bool { return key.KeyRangeString(si.Shard.KeyRange) == shardName @@ -163,8 +163,16 @@ func TestServerGetServingShards(t *testing.T) { // Now we check the stats based on the number of shards and whether or not // we should have had a List error and fell back to the shard by shard method. require.Equal(t, int64(1), stats.Counts()["List"]) // We should always try - if tt.fallback { - require.Equal(t, int64(tt.shards), stats.Counts()["Get"]) + switch { + case tt.fallback: // We get the shards one by one from the list + require.Equal(t, int64(1), stats.Counts()["ListDir"]) // GetShardNames + require.Equal(t, int64(tt.shards), stats.Counts()["Get"]) // GetShard + case tt.shards < 1: // We use a Get to check that the keyspace exists + require.Equal(t, int64(0), stats.Counts()["ListDir"]) + require.Equal(t, int64(1), stats.Counts()["Get"]) + default: // We should not make any ListDir or Get calls + require.Equal(t, int64(0), stats.Counts()["ListDir"]) + require.Equal(t, int64(0), stats.Counts()["Get"]) } }) } From 1e36795b9bc062a46c4d74e5017436216748ef0c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 27 Jan 2024 10:04:23 -0500 Subject: [PATCH 11/22] Minor improvements after self review Signed-off-by: Matt Lord --- go/vt/topo/keyspace_external_test.go | 7 ++++--- go/vt/topo/memorytopo/directory.go | 2 +- go/vt/topo/memorytopo/election.go | 4 +++- go/vt/topo/memorytopo/file.go | 10 +++++----- go/vt/topo/memorytopo/lock.go | 6 ++++-- go/vt/topo/memorytopo/memorytopo.go | 15 +++++++-------- go/vt/topo/memorytopo/watch.go | 4 ++-- 7 files changed, 26 insertions(+), 22 deletions(-) diff --git a/go/vt/topo/keyspace_external_test.go b/go/vt/topo/keyspace_external_test.go index 2614b72e254..1ba71d224b6 100644 --- a/go/vt/topo/keyspace_external_test.go +++ b/go/vt/topo/keyspace_external_test.go @@ -93,7 +93,8 @@ func TestServerFindAllShardsInKeyspace(t *testing.T) { func TestServerGetServingShards(t *testing.T) { keyspace := "ks1" - noListImplErr := topo.NewError(topo.NoImplementation, "don't be doing no listing round here") + errNoListImpl := topo.NewError(topo.NoImplementation, "don't be doing no listing round here") + tests := []struct { shards int // Number of shards to create err string // Error message we expect, if any @@ -124,10 +125,10 @@ func TestServerGetServingShards(t *testing.T) { defer cancel() ts, factory := memorytopo.NewServerAndFactory(ctx) defer ts.Close() - stats := factory.GetStats() + stats := factory.GetCallStats() if tt.fallback { - factory.SetListError(noListImplErr) + factory.SetListError(errNoListImpl) } err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}) diff --git a/go/vt/topo/memorytopo/directory.go b/go/vt/topo/memorytopo/directory.go index aceb3a3423f..8e673f474a6 100644 --- a/go/vt/topo/memorytopo/directory.go +++ b/go/vt/topo/memorytopo/directory.go @@ -27,7 +27,7 @@ import ( // ListDir is part of the topo.Conn interface. func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.DirEntry, error) { - c.factory.stats.Add([]string{"ListDir"}, 1) + c.factory.callstats.Add([]string{"ListDir"}, 1) if err := c.dial(ctx); err != nil { return nil, err diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index 868a2c53287..52bbe2e93ce 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -24,8 +24,10 @@ import ( "vitess.io/vitess/go/vt/topo" ) -// NewLeaderParticipation is part of the topo.Server interface +// NewLeaderParticipation is part of the topo.Conn interface. func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation, error) { + c.factory.callstats.Add([]string{"NewLeaderParticipation"}, 1) + if c.closed { return nil, ErrConnectionClosed } diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index 9de07801e5c..800e7791afa 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -30,7 +30,7 @@ import ( // Create is part of topo.Conn interface. func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (topo.Version, error) { - c.factory.stats.Add([]string{"Create"}, 1) + c.factory.callstats.Add([]string{"Create"}, 1) if err := c.dial(ctx); err != nil { return nil, err @@ -76,7 +76,7 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to // Update is part of topo.Conn interface. func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, version topo.Version) (topo.Version, error) { - c.factory.stats.Add([]string{"Update"}, 1) + c.factory.callstats.Add([]string{"Update"}, 1) if err := c.dial(ctx); err != nil { return nil, err @@ -156,7 +156,7 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver // Get is part of topo.Conn interface. func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, error) { - c.factory.stats.Add([]string{"Get"}, 1) + c.factory.callstats.Add([]string{"Get"}, 1) if err := c.dial(ctx); err != nil { return nil, nil, err @@ -183,7 +183,7 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, // List is part of the topo.Conn interface. func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) { - c.factory.stats.Add([]string{"List"}, 1) + c.factory.callstats.Add([]string{"List"}, 1) if err := c.dial(ctx); err != nil { return nil, err @@ -247,7 +247,7 @@ func gatherChildren(n *node, dirPath string) []topo.KVInfo { // Delete is part of topo.Conn interface. func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version) error { - c.factory.stats.Add([]string{"Delete"}, 1) + c.factory.callstats.Add([]string{"Delete"}, 1) if err := c.dial(ctx); err != nil { return err diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index 6a6fc9b5d2e..0545ba8b182 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -42,13 +42,15 @@ type memoryTopoLockDescriptor struct { // TryLock is part of the topo.Conn interface. Its implementation is same as Lock func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { - c.factory.stats.Add([]string{"TryLock"}, 1) + c.factory.callstats.Add([]string{"TryLock"}, 1) + return c.Lock(ctx, dirPath, contents) } // Lock is part of the topo.Conn interface. func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { - c.factory.stats.Add([]string{"Lock"}, 1) + c.factory.callstats.Add([]string{"Lock"}, 1) + return c.lock(ctx, dirPath, contents) } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index d5b87457ff3..b8ddd5f5348 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -74,8 +74,9 @@ type Factory struct { // listErr is used for testing purposed to fake errors from // calls to List. listErr error - // stats allows us to keep track of how many calls we make. - stats *stats.CountersWithMultiLabels + // callstats allows us to keep track of how many topo.Conn calls + // we make (Create, Get, Update, Delete, List, ListDir, etc). + callstats *stats.CountersWithMultiLabels } // HasGlobalReadOnlyCell is part of the topo.Factory interface. @@ -111,8 +112,8 @@ func (f *Factory) SetError(err error) { } } -func (f *Factory) GetStats() *stats.CountersWithMultiLabels { - return f.stats +func (f *Factory) GetCallStats() *stats.CountersWithMultiLabels { + return f.callstats } // Lock blocks all requests to the topo and is exposed to allow tests to @@ -151,10 +152,8 @@ func (c *Conn) dial(ctx context.Context) error { // Close is part of the topo.Conn interface. func (c *Conn) Close() { + c.factory.callstats.Add([]string{"Close"}, 1) c.closed = true - if c.factory != nil && c.factory.stats != nil { - c.factory.stats.ResetAll() - } } type watch struct { @@ -250,7 +249,7 @@ func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *F f := &Factory{ cells: make(map[string]*node), generation: uint64(rand.Int63n(1 << 60)), - stats: stats.NewCountersWithMultiLabels("", "", []string{"operation"}), + callstats: stats.NewCountersWithMultiLabels("", "", []string{"calls"}), } f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil) diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 241abda7649..8d9ef5cb54c 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -25,7 +25,7 @@ import ( // Watch is part of the topo.Conn interface. func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, error) { - c.factory.stats.Add([]string{"Watch"}, 1) + c.factory.callstats.Add([]string{"Watch"}, 1) if c.closed { return nil, nil, ErrConnectionClosed @@ -77,7 +77,7 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c // WatchRecursive is part of the topo.Conn interface. func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) { - c.factory.stats.Add([]string{"WatchRecursive"}, 1) + c.factory.callstats.Add([]string{"WatchRecursive"}, 1) if c.closed { return nil, nil, ErrConnectionClosed From 8a9f8d535a9b1ce9b2243a57bc947d6885f2691a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 27 Jan 2024 10:29:33 -0500 Subject: [PATCH 12/22] Add keyrange validity check Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index c67c831c9ec..0538c273163 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -208,6 +208,10 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, for _, entry := range listResults { // The key looks like this: /vitess/global/keyspaces/commerce/shards/-80/Shard shardName := path.Base(path.Dir(string(entry.Key))) // The base part of the dir is "-80" + if !key.IsValidKeyRange(shardName) { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): unexpected shard key/path %q contains invalid shard name/range %q", + keyspace, string(entry.Key), shardName) + } shard := &topodatapb.Shard{} if err = shard.UnmarshalVT(entry.Value); err != nil { return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): bad shard data", keyspace) From 1041e599af0b8f6e60eced93526e97ee182bdda2 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 27 Jan 2024 11:05:35 -0500 Subject: [PATCH 13/22] Support legacy sequences as shard names Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 0538c273163..c8b8ebee292 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -21,6 +21,7 @@ import ( "path" "runtime" "sort" + "strconv" "sync" "golang.org/x/sync/errgroup" @@ -208,9 +209,15 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, for _, entry := range listResults { // The key looks like this: /vitess/global/keyspaces/commerce/shards/-80/Shard shardName := path.Base(path.Dir(string(entry.Key))) // The base part of the dir is "-80" + // Validate the shard name. if !key.IsValidKeyRange(shardName) { - return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): unexpected shard key/path %q contains invalid shard name/range %q", - keyspace, string(entry.Key), shardName) + // Accept legacy shard names that are not valid key ranges but instead use + // names that are simply numbered sequences: 0,1,2,3... This is e.g. used + // in various tests today. + if _, err := strconv.Atoi(shardName); err != nil { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): unexpected shard key/path %q contains invalid shard name/range %q", + keyspace, string(entry.Key), shardName) + } } shard := &topodatapb.Shard{} if err = shard.UnmarshalVT(entry.Value); err != nil { From cebde7adc16a3221a29574cbfaa57bead9016b24 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 27 Jan 2024 11:20:39 -0500 Subject: [PATCH 14/22] Use existing ValidateShardName function Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index c8b8ebee292..b13e0724d6f 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -21,7 +21,6 @@ import ( "path" "runtime" "sort" - "strconv" "sync" "golang.org/x/sync/errgroup" @@ -207,17 +206,13 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, if err == nil { // We have everything we need to build the result result := make(map[string]*ShardInfo, len(listResults)) for _, entry := range listResults { - // The key looks like this: /vitess/global/keyspaces/commerce/shards/-80/Shard - shardName := path.Base(path.Dir(string(entry.Key))) // The base part of the dir is "-80" - // Validate the shard name. - if !key.IsValidKeyRange(shardName) { - // Accept legacy shard names that are not valid key ranges but instead use - // names that are simply numbered sequences: 0,1,2,3... This is e.g. used - // in various tests today. - if _, err := strconv.Atoi(shardName); err != nil { - return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): unexpected shard key/path %q contains invalid shard name/range %q", - keyspace, string(entry.Key), shardName) - } + // The shard key looks like this: /vitess/global/keyspaces/commerce/shards/-80/Shard + shardKey := string(entry.Key) + shardName := path.Base(path.Dir(shardKey)) // The base part of the dir is "-80" + // Validate the extracted shard name. + if _, _, err := ValidateShardName(shardName); err != nil { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): unexpected shard key/path %q contains invalid shard name/range %q", + keyspace, shardKey, shardName) } shard := &topodatapb.Shard{} if err = shard.UnmarshalVT(entry.Value); err != nil { From 4cd8a23b4d4d8631c941754593c50c6219400d3f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 27 Jan 2024 11:36:48 -0500 Subject: [PATCH 15/22] Can't stop nitting... (send help) Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 5 +++-- go/vt/topo/keyspace_external_test.go | 23 +++++++++++++---------- go/vt/topo/memorytopo/memorytopo.go | 2 +- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index b13e0724d6f..7005b2071d4 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -210,13 +210,14 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, shardKey := string(entry.Key) shardName := path.Base(path.Dir(shardKey)) // The base part of the dir is "-80" // Validate the extracted shard name. - if _, _, err := ValidateShardName(shardName); err != nil { + if _, _, err = ValidateShardName(shardName); err != nil { return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): unexpected shard key/path %q contains invalid shard name/range %q", keyspace, shardKey, shardName) } shard := &topodatapb.Shard{} if err = shard.UnmarshalVT(entry.Value); err != nil { - return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): bad shard data", keyspace) + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): invalid data found for shard %q in %q", + keyspace, shardName, shardKey) } result[shardName] = &ShardInfo{ keyspace: keyspace, diff --git a/go/vt/topo/keyspace_external_test.go b/go/vt/topo/keyspace_external_test.go index 1ba71d224b6..4d62c052976 100644 --- a/go/vt/topo/keyspace_external_test.go +++ b/go/vt/topo/keyspace_external_test.go @@ -126,6 +126,7 @@ func TestServerGetServingShards(t *testing.T) { ts, factory := memorytopo.NewServerAndFactory(ctx) defer ts.Close() stats := factory.GetCallStats() + require.NotNil(t, stats) if tt.fallback { factory.SetListError(errNoListImpl) @@ -150,30 +151,32 @@ func TestServerGetServingShards(t *testing.T) { if tt.err != "" { require.EqualError(t, err, tt.err) return - } else { - require.NoError(t, err) } + require.NoError(t, err) require.Len(t, shardInfos, tt.shards) for _, shardName := range shardNames { f := func(si *topo.ShardInfo) bool { return key.KeyRangeString(si.Shard.KeyRange) == shardName } - require.True(t, slices.ContainsFunc(shardInfos, f), "shard %q was not found", shardName) + require.True(t, slices.ContainsFunc(shardInfos, f), "shard %q was not found in the results", + shardName) } // Now we check the stats based on the number of shards and whether or not // we should have had a List error and fell back to the shard by shard method. - require.Equal(t, int64(1), stats.Counts()["List"]) // We should always try + callcounts := stats.Counts() + require.NotNil(t, callcounts) + require.Equal(t, int64(1), callcounts["List"]) // We should always try switch { case tt.fallback: // We get the shards one by one from the list - require.Equal(t, int64(1), stats.Counts()["ListDir"]) // GetShardNames - require.Equal(t, int64(tt.shards), stats.Counts()["Get"]) // GetShard + require.Equal(t, int64(1), callcounts["ListDir"]) // GetShardNames + require.Equal(t, int64(tt.shards), callcounts["Get"]) // GetShard case tt.shards < 1: // We use a Get to check that the keyspace exists - require.Equal(t, int64(0), stats.Counts()["ListDir"]) - require.Equal(t, int64(1), stats.Counts()["Get"]) + require.Equal(t, int64(0), callcounts["ListDir"]) + require.Equal(t, int64(1), callcounts["Get"]) default: // We should not make any ListDir or Get calls - require.Equal(t, int64(0), stats.Counts()["ListDir"]) - require.Equal(t, int64(0), stats.Counts()["Get"]) + require.Equal(t, int64(0), callcounts["ListDir"]) + require.Equal(t, int64(0), callcounts["Get"]) } }) } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index b8ddd5f5348..55057b62468 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -249,7 +249,7 @@ func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *F f := &Factory{ cells: make(map[string]*node), generation: uint64(rand.Int63n(1 << 60)), - callstats: stats.NewCountersWithMultiLabels("", "", []string{"calls"}), + callstats: stats.NewCountersWithMultiLabels("", "", []string{"Call"}), } f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil) From c7a27d0b60d25421cb8e6185a4b32b67f758cc83 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 29 Jan 2024 10:09:57 -0500 Subject: [PATCH 16/22] Address review comments Signed-off-by: Matt Lord --- go/vt/discovery/healthcheck.go | 8 +------- go/vt/topo/keyspace.go | 33 ++++++++++++++++++++------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 5e25be60f4f..bfd321a17bc 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -87,9 +87,6 @@ var ( // refreshKnownTablets tells us whether to process all tablets or only new tablets. refreshKnownTablets = true - // topoReadConcurrency tells us how many topo reads are allowed in parallel. - topoReadConcurrency int64 = 32 - // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond @@ -107,8 +104,6 @@ const ( DefaultHealthCheckRetryDelay = 5 * time.Second DefaultHealthCheckTimeout = 1 * time.Minute - // DefaultTopoReadConcurrency is used as the default value for the topoReadConcurrency parameter of a TopologyWatcher. - DefaultTopoReadConcurrency int = 5 // DefaultTopologyWatcherRefreshInterval is used as the default value for // the refresh interval of a topology watcher. DefaultTopologyWatcherRefreshInterval = 1 * time.Minute @@ -176,7 +171,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") - fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") ParseTabletURLTemplateFromFlag() } @@ -362,7 +356,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur } else if len(KeyspacesToWatch) > 0 { filter = NewFilterByKeyspace(KeyspacesToWatch) } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 7005b2071d4..d13040bb864 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -19,14 +19,15 @@ package topo import ( "context" "path" - "runtime" "sort" "sync" + "github.com/spf13/pflag" "golang.org/x/sync/errgroup" "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/event" @@ -37,13 +38,20 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) -// This file contains keyspace utility functions +// This file contains keyspace utility functions. // Default concurrency to use in order to avoid overhwelming the topo server. -// This uses a heuristic based on the number of vCPUs available -- where it's -// assumed that as larger machines are used for Vitess deployments they will -// be able to do more concurrently. -var DefaultConcurrency = runtime.NumCPU() +var DefaultConcurrency int64 + +func registerFlags(fs *pflag.FlagSet) { + fs.Int64Var(&DefaultConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") +} + +func init() { + servenv.OnParseFor("vtcombo", registerFlags) + servenv.OnParseFor("vtctld", registerFlags) + servenv.OnParseFor("vtgate", registerFlags) +} // KeyspaceInfo is a meta struct that contains metadata to give the // data more context and convenience. This is the main way we interact @@ -183,7 +191,7 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error { type FindAllShardsInKeyspaceOptions struct { // Concurrency controls the maximum number of concurrent calls to GetShard. // If <= 0, Concurrency is set to 1. - Concurrency int + Concurrency int64 } // FindAllShardsInKeyspace reads and returns all the existing shards in a @@ -230,12 +238,11 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, } if IsErrType(err, NoNode) { // The path doesn't exist, let's see if the keyspace exists. - _, kerr := ts.GetKeyspace(ctx, keyspace) - if kerr == nil { - // We simply have no shards. - return make(map[string]*ShardInfo, 0), nil + if _, kerr := ts.GetKeyspace(ctx, keyspace); kerr != nil { + return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace) } - return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): List", keyspace) + // We simply have no shards. + return make(map[string]*ShardInfo, 0), nil } // Currently the ZooKeeper implementation does not support scans so we // fall back to concurrently fetching the shards one by one. @@ -268,7 +275,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, ) eg, ctx := errgroup.WithContext(ctx) - eg.SetLimit(opt.Concurrency) + eg.SetLimit(int(opt.Concurrency)) for _, shard := range shards { shard := shard From 8a0638e4b5130d8dd279791f7db504aac31843f0 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 29 Jan 2024 10:15:05 -0500 Subject: [PATCH 17/22] Address review comment Signed-off-by: Matt Lord --- go/vt/schemamanager/tablet_executor.go | 4 +--- go/vt/topo/keyspace.go | 2 +- go/vt/topo/test/shard.go | 4 +--- go/vt/vtctl/workflow/utils.go | 4 +--- 4 files changed, 4 insertions(+), 10 deletions(-) diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index 42f3805dd87..4f0326f70b1 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -107,9 +107,7 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error { return nil } exec.keyspace = keyspace - shards, err := exec.ts.FindAllShardsInKeyspace(ctx, keyspace, &topo.FindAllShardsInKeyspaceOptions{ - Concurrency: topo.DefaultConcurrency, // Limit concurrency to avoid overwhelming the topo server. - }) + shards, err := exec.ts.FindAllShardsInKeyspace(ctx, keyspace, nil) if err != nil { return fmt.Errorf("unable to get shards for keyspace: %s, error: %v", keyspace, err) } diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index d13040bb864..9b0494cfa6c 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -205,7 +205,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, opt = &FindAllShardsInKeyspaceOptions{} } if opt.Concurrency <= 0 { - opt.Concurrency = 1 + opt.Concurrency = DefaultConcurrency } // First try to get all shards using List if we can. diff --git a/go/vt/topo/test/shard.go b/go/vt/topo/test/shard.go index 93ea77d7365..270b236b98d 100644 --- a/go/vt/topo/test/shard.go +++ b/go/vt/topo/test/shard.go @@ -83,9 +83,7 @@ func checkShard(t *testing.T, ctx context.Context, ts *topo.Server) { // Test FindAllShardsInKeyspace. require.NoError(t, err) - _, err = ts.FindAllShardsInKeyspace(ctx, "test_keyspace", &topo.FindAllShardsInKeyspaceOptions{ - Concurrency: topo.DefaultConcurrency, - }) + _, err = ts.FindAllShardsInKeyspace(ctx, "test_keyspace", nil) require.NoError(t, err) // Test GetServingShards. diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index c0f0c9be6e2..80b981026d8 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -326,9 +326,7 @@ func getMigrationID(targetKeyspace string, shardTablets []string) (int64, error) // // It returns ErrNoStreams if there are no targets found for the workflow. func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error) { - targetShards, err := ts.FindAllShardsInKeyspace(ctx, targetKeyspace, &topo.FindAllShardsInKeyspaceOptions{ - Concurrency: topo.DefaultConcurrency, - }) + targetShards, err := ts.FindAllShardsInKeyspace(ctx, targetKeyspace, nil) if err != nil { return nil, err } From 3fb3e0421146a117fc2979b148265ed365adf9df Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 29 Jan 2024 10:26:07 -0500 Subject: [PATCH 18/22] Address review comment Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 9b0494cfa6c..e8159945dd6 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -209,21 +209,19 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, } // First try to get all shards using List if we can. - shardsPath := path.Join(KeyspacesPath, keyspace, ShardsPath) - listResults, err := ts.globalCell.List(ctx, shardsPath) - if err == nil { // We have everything we need to build the result - result := make(map[string]*ShardInfo, len(listResults)) - for _, entry := range listResults { + buildResultFromList := func(kvpairs []KVInfo) (map[string]*ShardInfo, error) { + result := make(map[string]*ShardInfo, len(kvpairs)) + for _, entry := range kvpairs { // The shard key looks like this: /vitess/global/keyspaces/commerce/shards/-80/Shard shardKey := string(entry.Key) shardName := path.Base(path.Dir(shardKey)) // The base part of the dir is "-80" // Validate the extracted shard name. - if _, _, err = ValidateShardName(shardName); err != nil { + if _, _, err := ValidateShardName(shardName); err != nil { return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): unexpected shard key/path %q contains invalid shard name/range %q", keyspace, shardKey, shardName) } shard := &topodatapb.Shard{} - if err = shard.UnmarshalVT(entry.Value); err != nil { + if err := shard.UnmarshalVT(entry.Value); err != nil { return nil, vterrors.Wrapf(err, "FindAllShardsInKeyspace(%s): invalid data found for shard %q in %q", keyspace, shardName, shardKey) } @@ -236,6 +234,11 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, } return result, nil } + shardsPath := path.Join(KeyspacesPath, keyspace, ShardsPath) + listRes, err := ts.globalCell.List(ctx, shardsPath) + if err == nil { // We have everything we need to build the result + return buildResultFromList(listRes) + } if IsErrType(err, NoNode) { // The path doesn't exist, let's see if the keyspace exists. if _, kerr := ts.GetKeyspace(ctx, keyspace); kerr != nil { @@ -244,8 +247,8 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, // We simply have no shards. return make(map[string]*ShardInfo, 0), nil } - // Currently the ZooKeeper implementation does not support scans so we - // fall back to concurrently fetching the shards one by one. + // Currently the ZooKeeper implementation does not support index prefix + // scans so we fall back to concurrently fetching the shards one by one. // It is also possible that the response containing all shards is too // large in which case we also fall back to the one by one fetch. if !IsErrType(err, NoImplementation) && !IsErrType(err, ResourceExhausted) { From 76e49a0498171a901c3ff34e1931e20691095e7a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 29 Jan 2024 12:18:38 -0500 Subject: [PATCH 19/22] Remove unnecessary opt usage spot (and kick CI) Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index e8159945dd6..8433f543027 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -310,9 +310,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string, // GetServingShards returns all shards where the primary is serving. func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*ShardInfo, error) { - shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, &FindAllShardsInKeyspaceOptions{ - Concurrency: DefaultConcurrency, // Limit concurrency to avoid overwhelming the topo server. - }) + shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, nil) if err != nil { return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace) } From 99a7f86fbee1f6e4fa226984e93d20f80ee978ec Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 29 Jan 2024 13:43:20 -0500 Subject: [PATCH 20/22] Move topo_read_concurrency to int Signed-off-by: Matt Lord --- go/vt/discovery/topology_watcher.go | 4 ++-- go/vt/topo/keyspace.go | 6 +++--- go/vt/topo/tablet.go | 7 +++---- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index dbee11c1610..3945268f62e 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -70,7 +70,7 @@ type TopologyWatcher struct { cell string refreshInterval time.Duration refreshKnownTablets bool - concurrency int64 + concurrency int ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. @@ -92,7 +92,7 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher { +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 8433f543027..685d83d4b80 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -41,10 +41,10 @@ import ( // This file contains keyspace utility functions. // Default concurrency to use in order to avoid overhwelming the topo server. -var DefaultConcurrency int64 +var DefaultConcurrency int func registerFlags(fs *pflag.FlagSet) { - fs.Int64Var(&DefaultConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") + fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") } func init() { @@ -191,7 +191,7 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error { type FindAllShardsInKeyspaceOptions struct { // Concurrency controls the maximum number of concurrent calls to GetShard. // If <= 0, Concurrency is set to 1. - Concurrency int64 + Concurrency int } // FindAllShardsInKeyspace reads and returns all the existing shards in a diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index cafe55ec5a9..f412233b43a 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -288,7 +288,7 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t // Server.FindAllShardsInKeyspace. type GetTabletsByCellOptions struct { // Concurrency controls the maximum number of concurrent calls to GetTablet. - Concurrency int64 + Concurrency int } // GetTabletsByCell returns all the tablets in the cell. @@ -527,14 +527,13 @@ func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb. wg sync.WaitGroup tabletMap = make(map[string]*TabletInfo) returnErr error - // Previously this was always run with unlimited concurrency, so 32 should be fine. - concurrency int64 = 32 ) + concurrency := DefaultConcurrency if opt != nil && opt.Concurrency > 0 { concurrency = opt.Concurrency } - var sem = semaphore.NewWeighted(concurrency) + var sem = semaphore.NewWeighted(int64(concurrency)) for _, tabletAlias := range tabletAliases { wg.Add(1) From 484c7dcd46ceeea420a31dd2779c304622f94d39 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 29 Jan 2024 16:10:27 -0500 Subject: [PATCH 21/22] Restore the old hardcoded default value for binaries w/o the flag Signed-off-by: Matt Lord --- go/vt/topo/keyspace.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 685d83d4b80..9cbd167d174 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -41,10 +41,10 @@ import ( // This file contains keyspace utility functions. // Default concurrency to use in order to avoid overhwelming the topo server. -var DefaultConcurrency int +var DefaultConcurrency = 32 func registerFlags(fs *pflag.FlagSet) { - fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") + fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.") } func init() { From 3cdba406ec19a8fff7c110d3e98133c0a1d6b5e4 Mon Sep 17 00:00:00 2001 From: deepthi Date: Tue, 30 Jan 2024 12:48:33 +0530 Subject: [PATCH 22/22] healthcheck: remove unused constant Signed-off-by: deepthi --- go/vt/discovery/healthcheck.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index bfd321a17bc..5d6a5e32662 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -104,9 +104,6 @@ const ( DefaultHealthCheckRetryDelay = 5 * time.Second DefaultHealthCheckTimeout = 1 * time.Minute - // DefaultTopologyWatcherRefreshInterval is used as the default value for - // the refresh interval of a topology watcher. - DefaultTopologyWatcherRefreshInterval = 1 * time.Minute // healthCheckTemplate is the HTML code to display a TabletsCacheStatusList, it takes a parameter for the title // as the template can be used for both HealthCheck's cache and healthy tablets list. healthCheckTemplate = `