refac: Remove duplicate forAllShards methods from `vt/vtctl/workflow` (#17025)

Signed-off-by: Noble Mittal <[email protected]>
beingnoble03 authored Oct 31, 2024
1 parent 1b0a902 commit 1a4f2b9
Showing 5 changed files with 75 additions and 109 deletions.
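
In short: materializer.forAllTargets, resharder.forAll, and two single-use closures in server.go (forAllTargets and forAllSources) all duplicated the same "run a callback against every shard in parallel and aggregate the errors" logic. Every call site below now goes through the shared forAllShards helper, and validateSourceTablesExist moves from a Server method to a plain function in utils.go, where it picks up a unit test. The shared helper itself sits outside the hunks shown here; judging from the removed duplicates, it presumably looks like this sketch (package clause and imports added for completeness):

// Presumed shape of the shared forAllShards helper, reconstructed from the
// duplicates removed below; its actual definition is not part of this diff.
package workflow

import (
	"sync"

	"vitess.io/vitess/go/vt/concurrency"
	"vitess.io/vitess/go/vt/topo"
	"vitess.io/vitess/go/vt/vterrors"
)

// forAllShards runs f once per shard, each on its own goroutine, and
// returns the aggregate of whatever errors the callbacks recorded.
func forAllShards(shards []*topo.ShardInfo, f func(*topo.ShardInfo) error) error {
	var wg sync.WaitGroup
	allErrors := &concurrency.AllErrorRecorder{}
	for _, shard := range shards {
		wg.Add(1)
		go func(shard *topo.ShardInfo) {
			defer wg.Done()
			// Record rather than return, so one failing shard does not
			// stop the others from being visited.
			if err := f(shard); err != nil {
				allErrors.RecordError(err)
			}
		}(shard)
	}
	wg.Wait()
	return allErrors.AggrError(vterrors.Aggregate)
}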
22 changes: 2 additions & 20 deletions go/vt/vtctl/workflow/materializer.go
@@ -31,7 +31,6 @@ import (
 	"vitess.io/vitess/go/sqlescape"
 	"vitess.io/vitess/go/sqltypes"
 	"vitess.io/vitess/go/textutil"
-	"vitess.io/vitess/go/vt/concurrency"
 	"vitess.io/vitess/go/vt/key"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/mysqlctl/tmutils"
@@ -144,7 +143,7 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
 		return err
 	}
 
-	return mz.forAllTargets(func(target *topo.ShardInfo) error {
+	return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
 		targetPrimary, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias)
 		if err != nil {
 			return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias)
@@ -638,29 +637,12 @@ func (mz *materializer) startStreams(ctx context.Context) error {
 	})
 }
 
-func (mz *materializer) forAllTargets(f func(*topo.ShardInfo) error) error {
-	var wg sync.WaitGroup
-	allErrors := &concurrency.AllErrorRecorder{}
-	for _, target := range mz.targetShards {
-		wg.Add(1)
-		go func(target *topo.ShardInfo) {
-			defer wg.Done()
-
-			if err := f(target); err != nil {
-				allErrors.RecordError(err)
-			}
-		}(target)
-	}
-	wg.Wait()
-	return allErrors.AggrError(vterrors.Aggregate)
-}
-
 // checkTZConversion is a light-weight consistency check to validate that, if a source time zone is specified to MoveTables,
 // that the current primary has the time zone loaded in order to run the convert_tz() function used by VReplication to do the
 // datetime conversions. We only check the current primaries on each shard and note here that it is possible a new primary
 // gets elected: in this case user will either see errors during vreplication or vdiff will report mismatches.
 func (mz *materializer) checkTZConversion(ctx context.Context, tz string) error {
-	err := mz.forAllTargets(func(target *topo.ShardInfo) error {
+	err := forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
 		targetPrimary, err := mz.ts.GetTablet(ctx, target.PrimaryAlias)
 		if err != nil {
 			return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias)
28 changes: 5 additions & 23 deletions go/vt/vtctl/workflow/resharder.go
@@ -26,7 +26,6 @@ import (
 	"time"
 
 	"vitess.io/vitess/go/ptr"
-	"vitess.io/vitess/go/vt/concurrency"
 	"vitess.io/vitess/go/vt/discovery"
 	"vitess.io/vitess/go/vt/key"
 	"vitess.io/vitess/go/vt/proto/vtctldata"
@@ -146,7 +145,7 @@ func (s *Server) buildResharder(ctx context.Context, req *vtctldata.ReshardCreat
 // VReplication workflow streams as that is an invalid starting
 // state for the non-serving shards involved in a Reshard.
 func (rs *resharder) validateTargets(ctx context.Context) error {
-	err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
+	err := forAllShards(rs.targetShards, func(target *topo.ShardInfo) error {
 		targetPrimary := rs.targetPrimaries[target.ShardName()]
 		res, err := rs.s.tmc.HasVReplicationWorkflows(ctx, targetPrimary.Tablet, &tabletmanagerdatapb.HasVReplicationWorkflowsRequest{})
 		if err != nil {
@@ -162,7 +161,7 @@ func (rs *resharder) validateTargets(ctx context.Context) error {
 
 func (rs *resharder) readRefStreams(ctx context.Context) error {
 	var mu sync.Mutex
-	err := rs.forAll(rs.sourceShards, func(source *topo.ShardInfo) error {
+	err := forAllShards(rs.sourceShards, func(source *topo.ShardInfo) error {
 		sourcePrimary := rs.sourcePrimaries[source.ShardName()]
 
 		req := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{
@@ -268,7 +267,7 @@ func (rs *resharder) identifyRuleType(rule *binlogdatapb.Rule) (StreamType, erro
 
 func (rs *resharder) copySchema(ctx context.Context) error {
 	oneSource := rs.sourceShards[0].PrimaryAlias
-	err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
+	err := forAllShards(rs.targetShards, func(target *topo.ShardInfo) error {
 		return rs.s.CopySchemaShard(ctx, oneSource, []string{"/.*"}, nil, false, rs.keyspace, target.ShardName(), 1*time.Second, false)
 	})
 	return err
@@ -287,7 +286,7 @@ func (rs *resharder) createStreams(ctx context.Context) error {
 		}
 	}
 
-	err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
+	err := forAllShards(rs.targetShards, func(target *topo.ShardInfo) error {
 		targetPrimary := rs.targetPrimaries[target.ShardName()]
 
 		ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, targetPrimary.DbName())
@@ -339,7 +338,7 @@ func (rs *resharder) createStreams(ctx context.Context) error {
 }
 
 func (rs *resharder) startStreams(ctx context.Context) error {
-	err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
+	err := forAllShards(rs.targetShards, func(target *topo.ShardInfo) error {
 		targetPrimary := rs.targetPrimaries[target.ShardName()]
 		// This is the rare case where we truly want to update every stream/record
 		// because we've already confirmed that there were no existing workflows
@@ -357,20 +356,3 @@ func (rs *resharder) startStreams(ctx context.Context) error {
 	})
 	return err
 }
-
-func (rs *resharder) forAll(shards []*topo.ShardInfo, f func(*topo.ShardInfo) error) error {
-	var wg sync.WaitGroup
-	allErrors := &concurrency.AllErrorRecorder{}
-	for _, shard := range shards {
-		wg.Add(1)
-		go func(shard *topo.ShardInfo) {
-			defer wg.Done()
-
-			if err := f(shard); err != nil {
-				allErrors.RecordError(err)
-			}
-		}(shard)
-	}
-	wg.Wait()
-	return allErrors.AggrError(vterrors.Aggregate)
-}
71 changes: 5 additions & 66 deletions go/vt/vtctl/workflow/server.go
@@ -1198,25 +1198,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L
 		return nil, err
 	}
 
-	// Create a parallelizer function.
-	forAllTargets := func(f func(*topo.ShardInfo) error) error {
-		var wg sync.WaitGroup
-		allErrors := &concurrency.AllErrorRecorder{}
-		for _, targetShard := range targetShards {
-			wg.Add(1)
-			go func(targetShard *topo.ShardInfo) {
-				defer wg.Done()
-
-				if err := f(targetShard); err != nil {
-					allErrors.RecordError(err)
-				}
-			}(targetShard)
-		}
-		wg.Wait()
-		return allErrors.AggrError(vterrors.Aggregate)
-	}
-
-	err = forAllTargets(func(targetShard *topo.ShardInfo) error {
+	err = forAllShards(targetShards, func(targetShard *topo.ShardInfo) error {
 		targetPrimary, err := s.ts.GetTablet(ctx, targetShard.PrimaryAlias)
 		if err != nil {
 			return err
@@ -1412,7 +1394,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
 		return nil, err
 	}
 	if len(tables) > 0 {
-		err = s.validateSourceTablesExist(ctx, sourceKeyspace, ksTables, tables)
+		err = validateSourceTablesExist(ctx, sourceKeyspace, ksTables, tables)
 		if err != nil {
 			return nil, err
 		}
@@ -1424,7 +1406,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
 		}
 	}
 	if len(req.ExcludeTables) > 0 {
-		err = s.validateSourceTablesExist(ctx, sourceKeyspace, ksTables, req.ExcludeTables)
+		err = validateSourceTablesExist(ctx, sourceKeyspace, ksTables, req.ExcludeTables)
 		if err != nil {
 			return nil, err
 		}
@@ -2506,32 +2488,6 @@ func (s *Server) WorkflowUpdate(ctx context.Context, req *vtctldatapb.WorkflowUp
 	return response, nil
 }
 
-// validateSourceTablesExist validates that tables provided are present
-// in the source keyspace.
-func (s *Server) validateSourceTablesExist(ctx context.Context, sourceKeyspace string, ksTables, tables []string) error {
-	var missingTables []string
-	for _, table := range tables {
-		if schema.IsInternalOperationTableName(table) {
-			continue
-		}
-		found := false
-
-		for _, ksTable := range ksTables {
-			if table == ksTable {
-				found = true
-				break
-			}
-		}
-		if !found {
-			missingTables = append(missingTables, table)
-		}
-	}
-	if len(missingTables) > 0 {
-		return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "table(s) not found in source keyspace %s: %s", sourceKeyspace, strings.Join(missingTables, ","))
-	}
-	return nil
-}
-
 // addTablesToVSchema adds tables to an (unsharded) vschema if they are not already defined.
 // If copyVSchema is true then we copy over the vschema table definitions from the source,
 // otherwise we create empty ones.
@@ -2570,7 +2526,7 @@ func (s *Server) addTablesToVSchema(ctx context.Context, sourceKeyspace string,
 func (s *Server) collectTargetStreams(ctx context.Context, mz *materializer) ([]string, error) {
 	var shardTablets []string
 	var mu sync.Mutex
-	err := mz.forAllTargets(func(target *topo.ShardInfo) error {
+	err := forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
 		var err error
 		targetPrimary, err := s.ts.GetTablet(ctx, target.PrimaryAlias)
 		if err != nil {
@@ -2596,30 +2552,13 @@ func (s *Server) collectTargetStreams(ctx context.Context, mz *materializer) ([]
 }
 
 func (s *Server) checkIfPreviousJournalExists(ctx context.Context, mz *materializer, migrationID int64) (bool, []string, error) {
-	forAllSources := func(f func(*topo.ShardInfo) error) error {
-		var wg sync.WaitGroup
-		allErrors := &concurrency.AllErrorRecorder{}
-		for _, sourceShard := range mz.sourceShards {
-			wg.Add(1)
-			go func(sourceShard *topo.ShardInfo) {
-				defer wg.Done()
-
-				if err := f(sourceShard); err != nil {
-					allErrors.RecordError(err)
-				}
-			}(sourceShard)
-		}
-		wg.Wait()
-		return allErrors.AggrError(vterrors.Aggregate)
-	}
-
 	var (
 		mu      sync.Mutex
 		exists  bool
 		tablets []string
 	)
 
-	err := forAllSources(func(si *topo.ShardInfo) error {
+	err := forAllShards(mz.sourceShards, func(si *topo.ShardInfo) error {
 		tablet, err := s.ts.GetTablet(ctx, si.PrimaryAlias)
 		if err != nil {
 			return err
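
One detail worth noting in the call sites above: collectTargetStreams and checkIfPreviousJournalExists pair forAllShards with a sync.Mutex, because the callback runs on a separate goroutine per shard and the collected slices and flags are shared. A minimal sketch of that aggregation pattern, where collectPrimaries and lookup are hypothetical names rather than code from this diff:

// Sketch only: fan out one lookup per shard via forAllShards and gather
// the results under a mutex. lookup stands in for whatever each real call
// site does, e.g. resolving the shard's primary tablet.
func collectPrimaries(shards []*topo.ShardInfo, lookup func(*topo.ShardInfo) (string, error)) ([]string, error) {
	var (
		mu        sync.Mutex
		primaries []string
	)
	err := forAllShards(shards, func(si *topo.ShardInfo) error {
		name, err := lookup(si)
		if err != nil {
			return err
		}
		// Guard the shared slice: appends from concurrent goroutines
		// would otherwise race.
		mu.Lock()
		defer mu.Unlock()
		primaries = append(primaries, name)
		return nil
	})
	return primaries, err
}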
26 changes: 26 additions & 0 deletions go/vt/vtctl/workflow/utils.go
@@ -1019,3 +1019,29 @@ func applyTargetShards(ts *trafficSwitcher, targetShards []string) error {
 	}
 	return nil
 }
+
+// validateSourceTablesExist validates that tables provided are present
+// in the source keyspace.
+func validateSourceTablesExist(ctx context.Context, sourceKeyspace string, ksTables, tables []string) error {
+	var missingTables []string
+	for _, table := range tables {
+		if schema.IsInternalOperationTableName(table) {
+			continue
+		}
+		found := false
+
+		for _, ksTable := range ksTables {
+			if table == ksTable {
+				found = true
+				break
+			}
+		}
+		if !found {
+			missingTables = append(missingTables, table)
+		}
+	}
+	if len(missingTables) > 0 {
+		return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "table(s) not found in source keyspace %s: %s", sourceKeyspace, strings.Join(missingTables, ","))
+	}
+	return nil
+}
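
Dropping the *Server receiver is what makes this helper independently testable: it never touched Server state, so it can now be exercised directly, as the new unit test below does.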
37 changes: 37 additions & 0 deletions go/vt/vtctl/workflow/utils_test.go
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	clientv3 "go.etcd.io/etcd/client/v3"
 
@@ -244,3 +245,39 @@ func startEtcd(t *testing.T) string {
 
 	return clientAddr
 }
+
+func TestValidateSourceTablesExist(t *testing.T) {
+	ctx := context.Background()
+	ks := "source_keyspace"
+	ksTables := []string{"table1", "table2"}
+
+	testCases := []struct {
+		name        string
+		tables      []string
+		errContains string
+	}{
+		{
+			name:   "no error",
+			tables: []string{"table2"},
+		},
+		{
+			name:   "ignore internal table",
+			tables: []string{"_vt_hld_6ace8bcef73211ea87e9f875a4d24e90_20200915120410_", "table1", "table2"},
+		},
+		{
+			name:        "table not found error",
+			tables:      []string{"table3", "table1", "table2"},
+			errContains: "table3",
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			err := validateSourceTablesExist(ctx, ks, ksTables, tc.tables)
+			if tc.errContains != "" {
+				assert.ErrorContains(t, err, tc.errContains)
+			} else {
+				assert.NoError(t, err)
+			}
+		})
+	}
+}
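
The new table-driven test can be run on its own with `go test -run TestValidateSourceTablesExist ./go/vt/vtctl/workflow/` from the repository root.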
