diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go
index 3bcff5a02a1..3f4115579eb 100644
--- a/go/vt/vtctl/workflow/materializer.go
+++ b/go/vt/vtctl/workflow/materializer.go
@@ -31,7 +31,6 @@ import (
 	"vitess.io/vitess/go/sqlescape"
 	"vitess.io/vitess/go/sqltypes"
 	"vitess.io/vitess/go/textutil"
-	"vitess.io/vitess/go/vt/concurrency"
 	"vitess.io/vitess/go/vt/key"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/mysqlctl/tmutils"
@@ -144,7 +143,7 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
 		return err
 	}
 
-	return mz.forAllTargets(func(target *topo.ShardInfo) error {
+	return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
 		targetPrimary, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias)
 		if err != nil {
 			return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias)
@@ -638,29 +637,12 @@ func (mz *materializer) startStreams(ctx context.Context) error {
 	})
 }
 
-func (mz *materializer) forAllTargets(f func(*topo.ShardInfo) error) error {
-	var wg sync.WaitGroup
-	allErrors := &concurrency.AllErrorRecorder{}
-	for _, target := range mz.targetShards {
-		wg.Add(1)
-		go func(target *topo.ShardInfo) {
-			defer wg.Done()
-
-			if err := f(target); err != nil {
-				allErrors.RecordError(err)
-			}
-		}(target)
-	}
-	wg.Wait()
-	return allErrors.AggrError(vterrors.Aggregate)
-}
-
 // checkTZConversion is a light-weight consistency check to validate that, if a source time zone is specified to MoveTables,
 // that the current primary has the time zone loaded in order to run the convert_tz() function used by VReplication to do the
 // datetime conversions. We only check the current primaries on each shard and note here that it is possible a new primary
 // gets elected: in this case user will either see errors during vreplication or vdiff will report mismatches.
 func (mz *materializer) checkTZConversion(ctx context.Context, tz string) error {
-	err := mz.forAllTargets(func(target *topo.ShardInfo) error {
+	err := forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
 		targetPrimary, err := mz.ts.GetTablet(ctx, target.PrimaryAlias)
 		if err != nil {
 			return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias)
diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go
index 57c860bdfaf..e3f7380af69 100644
--- a/go/vt/vtctl/workflow/resharder.go
+++ b/go/vt/vtctl/workflow/resharder.go
@@ -26,7 +26,6 @@ import (
 	"time"
 
 	"vitess.io/vitess/go/ptr"
-	"vitess.io/vitess/go/vt/concurrency"
 	"vitess.io/vitess/go/vt/discovery"
 	"vitess.io/vitess/go/vt/key"
 	"vitess.io/vitess/go/vt/proto/vtctldata"
@@ -146,7 +145,7 @@ func (s *Server) buildResharder(ctx context.Context, req *vtctldata.ReshardCreat
 // VReplication workflow streams as that is an invalid starting
 // state for the non-serving shards involved in a Reshard.
 func (rs *resharder) validateTargets(ctx context.Context) error {
-	err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
+	err := forAllShards(rs.targetShards, func(target *topo.ShardInfo) error {
 		targetPrimary := rs.targetPrimaries[target.ShardName()]
 		res, err := rs.s.tmc.HasVReplicationWorkflows(ctx, targetPrimary.Tablet, &tabletmanagerdatapb.HasVReplicationWorkflowsRequest{})
 		if err != nil {
@@ -162,7 +161,7 @@ func (rs *resharder) validateTargets(ctx context.Context) error {
 
 func (rs *resharder) readRefStreams(ctx context.Context) error {
 	var mu sync.Mutex
-	err := rs.forAll(rs.sourceShards, func(source *topo.ShardInfo) error {
+	err := forAllShards(rs.sourceShards, func(source *topo.ShardInfo) error {
 		sourcePrimary := rs.sourcePrimaries[source.ShardName()]
 
 		req := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{
@@ -268,7 +267,7 @@ func (rs *resharder) identifyRuleType(rule *binlogdatapb.Rule) (StreamType, erro
 
 func (rs *resharder) copySchema(ctx context.Context) error {
 	oneSource := rs.sourceShards[0].PrimaryAlias
-	err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
+	err := forAllShards(rs.targetShards, func(target *topo.ShardInfo) error {
 		return rs.s.CopySchemaShard(ctx, oneSource, []string{"/.*"}, nil, false, rs.keyspace, target.ShardName(), 1*time.Second, false)
 	})
 	return err
@@ -287,7 +286,7 @@ func (rs *resharder) createStreams(ctx context.Context) error {
 		}
 	}
 
-	err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
+	err := forAllShards(rs.targetShards, func(target *topo.ShardInfo) error {
 		targetPrimary := rs.targetPrimaries[target.ShardName()]
 
 		ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, targetPrimary.DbName())
@@ -339,7 +338,7 @@ func (rs *resharder) createStreams(ctx context.Context) error {
 }
 
 func (rs *resharder) startStreams(ctx context.Context) error {
-	err := rs.forAll(rs.targetShards, func(target *topo.ShardInfo) error {
+	err := forAllShards(rs.targetShards, func(target *topo.ShardInfo) error {
 		targetPrimary := rs.targetPrimaries[target.ShardName()]
 		// This is the rare case where we truly want to update every stream/record
 		// because we've already confirmed that there were no existing workflows
@@ -357,20 +356,3 @@ func (rs *resharder) startStreams(ctx context.Context) error {
 	})
 	return err
 }
-
-func (rs *resharder) forAll(shards []*topo.ShardInfo, f func(*topo.ShardInfo) error) error {
-	var wg sync.WaitGroup
-	allErrors := &concurrency.AllErrorRecorder{}
-	for _, shard := range shards {
-		wg.Add(1)
-		go func(shard *topo.ShardInfo) {
-			defer wg.Done()
-
-			if err := f(shard); err != nil {
-				allErrors.RecordError(err)
-			}
-		}(shard)
-	}
-	wg.Wait()
-	return allErrors.AggrError(vterrors.Aggregate)
-}
diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index 1828b0af814..b47d99118ce 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -1198,25 +1198,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L
 		return nil, err
 	}
 
-	// Create a parallelizer function.
-	forAllTargets := func(f func(*topo.ShardInfo) error) error {
-		var wg sync.WaitGroup
-		allErrors := &concurrency.AllErrorRecorder{}
-		for _, targetShard := range targetShards {
-			wg.Add(1)
-			go func(targetShard *topo.ShardInfo) {
-				defer wg.Done()
-
-				if err := f(targetShard); err != nil {
-					allErrors.RecordError(err)
-				}
-			}(targetShard)
-		}
-		wg.Wait()
-		return allErrors.AggrError(vterrors.Aggregate)
-	}
-
-	err = forAllTargets(func(targetShard *topo.ShardInfo) error {
+	err = forAllShards(targetShards, func(targetShard *topo.ShardInfo) error {
 		targetPrimary, err := s.ts.GetTablet(ctx, targetShard.PrimaryAlias)
 		if err != nil {
 			return err
@@ -1412,7 +1394,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
 			return nil, err
 		}
 		if len(tables) > 0 {
-			err = s.validateSourceTablesExist(ctx, sourceKeyspace, ksTables, tables)
+			err = validateSourceTablesExist(ctx, sourceKeyspace, ksTables, tables)
 			if err != nil {
 				return nil, err
 			}
@@ -1424,7 +1406,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
 			}
 		}
 		if len(req.ExcludeTables) > 0 {
-			err = s.validateSourceTablesExist(ctx, sourceKeyspace, ksTables, req.ExcludeTables)
+			err = validateSourceTablesExist(ctx, sourceKeyspace, ksTables, req.ExcludeTables)
 			if err != nil {
 				return nil, err
 			}
@@ -2506,32 +2488,6 @@ func (s *Server) WorkflowUpdate(ctx context.Context, req *vtctldatapb.WorkflowUp
 	return response, nil
 }
 
-// validateSourceTablesExist validates that tables provided are present
-// in the source keyspace.
-func (s *Server) validateSourceTablesExist(ctx context.Context, sourceKeyspace string, ksTables, tables []string) error {
-	var missingTables []string
-	for _, table := range tables {
-		if schema.IsInternalOperationTableName(table) {
-			continue
-		}
-		found := false
-
-		for _, ksTable := range ksTables {
-			if table == ksTable {
-				found = true
-				break
-			}
-		}
-		if !found {
-			missingTables = append(missingTables, table)
-		}
-	}
-	if len(missingTables) > 0 {
-		return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "table(s) not found in source keyspace %s: %s", sourceKeyspace, strings.Join(missingTables, ","))
-	}
-	return nil
-}
-
 // addTablesToVSchema adds tables to an (unsharded) vschema if they are not already defined.
 // If copyVSchema is true then we copy over the vschema table definitions from the source,
 // otherwise we create empty ones.
@@ -2570,7 +2526,7 @@ func (s *Server) addTablesToVSchema(ctx context.Context, sourceKeyspace string,
 func (s *Server) collectTargetStreams(ctx context.Context, mz *materializer) ([]string, error) {
 	var shardTablets []string
 	var mu sync.Mutex
-	err := mz.forAllTargets(func(target *topo.ShardInfo) error {
+	err := forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
 		var err error
 		targetPrimary, err := s.ts.GetTablet(ctx, target.PrimaryAlias)
 		if err != nil {
@@ -2596,30 +2552,13 @@ func (s *Server) collectTargetStreams(ctx context.Context, mz *materializer) ([]
 }
 
 func (s *Server) checkIfPreviousJournalExists(ctx context.Context, mz *materializer, migrationID int64) (bool, []string, error) {
-	forAllSources := func(f func(*topo.ShardInfo) error) error {
-		var wg sync.WaitGroup
-		allErrors := &concurrency.AllErrorRecorder{}
-		for _, sourceShard := range mz.sourceShards {
-			wg.Add(1)
-			go func(sourceShard *topo.ShardInfo) {
-				defer wg.Done()
-
-				if err := f(sourceShard); err != nil {
-					allErrors.RecordError(err)
-				}
-			}(sourceShard)
-		}
-		wg.Wait()
-		return allErrors.AggrError(vterrors.Aggregate)
-	}
-
 	var (
 		mu      sync.Mutex
 		exists  bool
 		tablets []string
 	)
 
-	err := forAllSources(func(si *topo.ShardInfo) error {
+	err := forAllShards(mz.sourceShards, func(si *topo.ShardInfo) error {
 		tablet, err := s.ts.GetTablet(ctx, si.PrimaryAlias)
 		if err != nil {
 			return err
diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go
index 5021e3938c8..2850dd1678e 100644
--- a/go/vt/vtctl/workflow/utils.go
+++ b/go/vt/vtctl/workflow/utils.go
@@ -1019,3 +1019,29 @@ func applyTargetShards(ts *trafficSwitcher, targetShards []string) error {
 	}
 	return nil
 }
+
+// validateSourceTablesExist validates that tables provided are present
+// in the source keyspace.
+func validateSourceTablesExist(ctx context.Context, sourceKeyspace string, ksTables, tables []string) error {
+	var missingTables []string
+	for _, table := range tables {
+		if schema.IsInternalOperationTableName(table) {
+			continue
+		}
+		found := false
+
+		for _, ksTable := range ksTables {
+			if table == ksTable {
+				found = true
+				break
+			}
+		}
+		if !found {
+			missingTables = append(missingTables, table)
+		}
+	}
+	if len(missingTables) > 0 {
+		return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "table(s) not found in source keyspace %s: %s", sourceKeyspace, strings.Join(missingTables, ","))
+	}
+	return nil
+}
diff --git a/go/vt/vtctl/workflow/utils_test.go b/go/vt/vtctl/workflow/utils_test.go
index 2094421e3c2..8458cf60995 100644
--- a/go/vt/vtctl/workflow/utils_test.go
+++ b/go/vt/vtctl/workflow/utils_test.go
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	clientv3 "go.etcd.io/etcd/client/v3"
 
@@ -244,3 +245,39 @@ func startEtcd(t *testing.T) string {
 
 	return clientAddr
 }
+
+func TestValidateSourceTablesExist(t *testing.T) {
+	ctx := context.Background()
+	ks := "source_keyspace"
+	ksTables := []string{"table1", "table2"}
+
+	testCases := []struct {
+		name        string
+		tables      []string
+		errContains string
+	}{
+		{
+			name:   "no error",
+			tables: []string{"table2"},
+		},
+		{
+			name:   "ignore internal table",
+			tables: []string{"_vt_hld_6ace8bcef73211ea87e9f875a4d24e90_20200915120410_", "table1", "table2"},
+		},
+		{
+			name:        "table not found error",
+			tables:      []string{"table3", "table1", "table2"},
+			errContains: "table3",
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			err := validateSourceTablesExist(ctx, ks, ksTables, tc.tables)
+			if tc.errContains != "" {
+				assert.ErrorContains(t, err, tc.errContains)
+			} else {
+				assert.NoError(t, err)
+			}
+		})
+	}
+}
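Note (not part of the diff): every refactored call site above now goes through a shared forAllShards helper in the workflow package, but its definition is not included in these hunks. As a rough sketch only, mirroring the per-struct forAll/forAllTargets/forAllSources helpers that this change removes, it presumably looks something like the Go code below; the exact signature, error wrapping, and whether the real helper keeps concurrency.AllErrorRecorder or switches to something like errgroup are assumptions, not confirmed by this diff.

package workflow

import (
	"sync"

	"vitess.io/vitess/go/vt/concurrency"
	"vitess.io/vitess/go/vt/topo"
	"vitess.io/vitess/go/vt/vterrors"
)

// forAllShards is a hypothetical reconstruction of the shared helper: it runs f
// against every shard concurrently, records any per-shard errors, and returns
// them aggregated, exactly as the removed helpers did.
func forAllShards(shards []*topo.ShardInfo, f func(*topo.ShardInfo) error) error {
	var wg sync.WaitGroup
	allErrors := &concurrency.AllErrorRecorder{}
	for _, shard := range shards {
		wg.Add(1)
		go func(shard *topo.ShardInfo) {
			defer wg.Done()
			if err := f(shard); err != nil {
				allErrors.RecordError(err)
			}
		}(shard)
	}
	wg.Wait()
	return allErrors.AggrError(vterrors.Aggregate)
}

Call sites then simply pass the relevant shard slice (mz.targetShards, rs.sourceShards, targetShards, and so on) together with the per-shard callback, which is what removes the duplicated parallelizer boilerplate from materializer.go, resharder.go, and server.go.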