Skip to content

Commit

Permalink
Utilize in callsites and improve test coverage
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 26, 2024
1 parent d4b0609 commit b6286ff
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 44 deletions.
16 changes: 8 additions & 8 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,15 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error {
return nil
}
exec.keyspace = keyspace
shardNames, err := exec.ts.GetShardNames(ctx, keyspace)
shards, err := exec.ts.FindAllShardsInKeyspace(ctx, keyspace, &topo.FindAllShardsInKeyspaceOptions{
Concurrency: topo.DefaultConcurrency, // Limit concurrency to avoid overwhelming the topo server.
})
if err != nil {
return fmt.Errorf("unable to get shard names for keyspace: %s, error: %v", keyspace, err)
return fmt.Errorf("unable to get shards for keyspace: %s, error: %v", keyspace, err)
}
exec.tablets = make([]*topodatapb.Tablet, len(shardNames))
for i, shardName := range shardNames {
shardInfo, err := exec.ts.GetShard(ctx, keyspace, shardName)
if err != nil {
return fmt.Errorf("unable to get shard info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err)
}
exec.tablets = make([]*topodatapb.Tablet, len(shards))
i := 0
for shardName, shardInfo := range shards {
if !shardInfo.HasPrimary() {
return fmt.Errorf("shard: %s does not have a primary", shardName)
}
Expand All @@ -125,6 +124,7 @@ func (exec *TabletExecutor) Open(ctx context.Context, keyspace string) error {
return fmt.Errorf("unable to get primary tablet info, keyspace: %s, shard: %s, error: %v", keyspace, shardName, err)
}
exec.tablets[i] = tabletInfo.Tablet
i++
}

if len(exec.tablets) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
// This uses a heuristic based on the number of vCPUs available -- where it's
// assumed that as larger machines are used for Vitess deployments they will
// be able to do more concurrently.
var defaultConcurrency = runtime.NumCPU()
var DefaultConcurrency = runtime.NumCPU()

// KeyspaceInfo is a meta struct that contains metadata to give the
// data more context and convenience. This is the main way we interact
Expand Down Expand Up @@ -294,7 +294,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
// GetServingShards returns all shards where the primary is serving.
func (ts *Server) GetServingShards(ctx context.Context, keyspace string) ([]*ShardInfo, error) {
shards, err := ts.FindAllShardsInKeyspace(ctx, keyspace, &FindAllShardsInKeyspaceOptions{
Concurrency: defaultConcurrency, // Limit concurrency to avoid overwhelming the topo server.
Concurrency: DefaultConcurrency, // Limit concurrency to avoid overwhelming the topo server.
})
if err != nil {
return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace)
Expand Down
63 changes: 63 additions & 0 deletions go/vt/topo/keyspace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topo

import (
"context"
"reflect"
"testing"
)

func TestGetServingShards(t *testing.T) {
type fields struct {
globalCell Conn
globalReadOnlyCell Conn
factory Factory
cellConns map[string]cellConn
}
type args struct {
ctx context.Context
keyspace string
}
tests := []struct {
name string
fields fields
args args
want []*ShardInfo
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts := &Server{
globalCell: tt.fields.globalCell,
globalReadOnlyCell: tt.fields.globalReadOnlyCell,
factory: tt.fields.factory,
cellConns: tt.fields.cellConns,
}
got, err := ts.GetServingShards(tt.args.ctx, tt.args.keyspace)
if (err != nil) != tt.wantErr {
t.Errorf("Server.GetServingShards() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Server.GetServingShards() = %v, want %v", got, tt.want)
}
})
}
}
12 changes: 9 additions & 3 deletions go/vt/topo/test/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/stretchr/testify/require"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -82,12 +81,19 @@ func checkShard(t *testing.T, ctx context.Context, ts *topo.Server) {
t.Fatalf("shard.PrimaryAlias = %v, want %v", si.Shard.PrimaryAlias, other)
}

// Test FindAllShardsInKeyspace.
require.NoError(t, err)
shards, err := ts.FindAllShardsInKeyspace(ctx, "test_keyspace", &topo.FindAllShardsInKeyspaceOptions{
Concurrency: topo.DefaultConcurrency,
})
require.NoError(t, err)

// test GetShardNames
shards, err := ts.GetShardNames(ctx, "test_keyspace")
shardNames, err := ts.GetShardNames(ctx, "test_keyspace")
if err != nil {
t.Errorf("GetShardNames: %v", err)
}
if len(shards) != 1 || shards[0] != "b0-c0" {
if len(shards) != 1 || shardNames[0] != "b0-c0" {
t.Errorf(`GetShardNames: want [ "b0-c0" ], got %v`, shards)
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/endtoend/onlineddl_show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func onlineDDLTest(t *testing.T, args []string, expectedQuery string) {
assert.NotEmpty(t, err.Error())
containsExpectedError := false
expectedErrors := []string{
"unable to get shard names for keyspace",
"unable to get shards for keyspace",
"no ExecuteFetchAsDba results on fake TabletManagerClient",
}
for _, expect := range expectedErrors {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestVDiffCreate(t *testing.T) {
{
name: "no values",
req: &vtctldatapb.VDiffCreateRequest{},
wantErr: "node doesn't exist: keyspaces/shards", // We did not provide any keyspace or shard
wantErr: "FindAllShardsInKeyspace(): List: node doesn't exist: keyspaces/shards", // We did not provide any keyspace or shard
},
}
for _, tt := range tests {
Expand Down
19 changes: 8 additions & 11 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ func getMigrationID(targetKeyspace string, shardTablets []string) (int64, error)
//
// It returns ErrNoStreams if there are no targets found for the workflow.
func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, targetKeyspace string, workflow string) (*TargetInfo, error) {
targetShards, err := ts.GetShardNames(ctx, targetKeyspace)
targetShards, err := ts.FindAllShardsInKeyspace(ctx, targetKeyspace, &topo.FindAllShardsInKeyspaceOptions{
Concurrency: topo.DefaultConcurrency,
})
if err != nil {
return nil, err
}
Expand All @@ -344,18 +346,13 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag
// stream. For example, if we're splitting -80 to [-40,40-80], only those
// two target shards will have vreplication streams, and the other shards in
// the target keyspace will not.
for _, targetShard := range targetShards {
si, err := ts.GetShard(ctx, targetKeyspace, targetShard)
if err != nil {
return nil, err
}

if si.PrimaryAlias == nil {
for targetShardName, targetShard := range targetShards {
if targetShard.PrimaryAlias == nil {
// This can happen if bad inputs are given.
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "shard %v/%v doesn't have a primary set", targetKeyspace, targetShard)
}

primary, err := ts.GetTablet(ctx, si.PrimaryAlias)
primary, err := ts.GetTablet(ctx, targetShard.PrimaryAlias)
if err != nil {
return nil, err
}
Expand All @@ -372,7 +369,7 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag
}

target := &MigrationTarget{
si: si,
si: targetShard,
primary: primary,
Sources: make(map[int32]*binlogdatapb.BinlogSource),
}
Expand All @@ -389,7 +386,7 @@ func BuildTargets(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManag
target.Sources[stream.Id] = stream.Bls
}

targets[targetShard] = target
targets[targetShardName] = target
}

if len(targets) == 0 {
Expand Down
37 changes: 19 additions & 18 deletions go/vt/wrangler/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ package wrangler

import (
"fmt"
"sort"
"sync"

"context"

"golang.org/x/exp/maps"

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -101,48 +103,47 @@ func (wr *Wrangler) ValidatePermissionsShard(ctx context.Context, keyspace, shar
// ValidatePermissionsKeyspace validates all the permissions are the same
// in a keyspace
func (wr *Wrangler) ValidatePermissionsKeyspace(ctx context.Context, keyspace string) error {
// find all the shards
shards, err := wr.ts.GetShardNames(ctx, keyspace)
// Find all the shards.
shards, err := wr.ts.FindAllShardsInKeyspace(ctx, keyspace, &topo.FindAllShardsInKeyspaceOptions{
Concurrency: topo.DefaultConcurrency, // Limit concurrency to avoid overwhelming the topo server.
})
if err != nil {
return err
}

// corner cases
// Corner cases.
if len(shards) == 0 {
return fmt.Errorf("no shards in keyspace %v", keyspace)
}
sort.Strings(shards)
if len(shards) == 1 {
return wr.ValidatePermissionsShard(ctx, keyspace, shards[0])
return wr.ValidatePermissionsShard(ctx, keyspace, maps.Keys(shards)[0])
}

// find the reference permissions using the first shard's primary
si, err := wr.ts.GetShard(ctx, keyspace, shards[0])
if err != nil {
return err
}
if !si.HasPrimary() {
return fmt.Errorf("no primary in shard %v/%v", keyspace, shards[0])
// Find the reference permissions using the first shard's primary.
shardName := maps.Keys(shards)[0]
shard := shards[shardName]
if !shard.HasPrimary() {
return fmt.Errorf("no primary in shard %v/%v", keyspace, shardName)
}
referenceAlias := si.PrimaryAlias
referenceAlias := shard.PrimaryAlias
log.Infof("Gathering permissions for reference primary %v", topoproto.TabletAliasString(referenceAlias))
referencePermissions, err := wr.GetPermissions(ctx, si.PrimaryAlias)
referencePermissions, err := wr.GetPermissions(ctx, shard.PrimaryAlias)
if err != nil {
return err
}

// then diff with all tablets but primary 0
er := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}
for _, shard := range shards {
aliases, err := wr.ts.FindAllTabletAliasesInShard(ctx, keyspace, shard)
for shardName, shard := range shards {
aliases, err := wr.ts.FindAllTabletAliasesInShard(ctx, keyspace, shardName)
if err != nil {
er.RecordError(err)
continue
}

for _, alias := range aliases {
if topoproto.TabletAliasEqual(alias, si.PrimaryAlias) {
if topoproto.TabletAliasEqual(alias, shard.PrimaryAlias) {
continue
}

Expand Down

0 comments on commit b6286ff

Please sign in to comment.