diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 43b2f96a064..3bcff5a02a1 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -24,7 +24,11 @@ import ( "sync" "time" + "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" + "vitess.io/vitess/go/ptr" + "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/concurrency" @@ -45,6 +49,7 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -54,6 +59,7 @@ const ( // For automatically created sequence tables, use a standard format // of tableName_seq. autoSequenceTableFormat = "%s_seq" + getNonEmptyTableQuery = "select 1 from %s limit 1" ) type materializer struct { @@ -290,6 +296,15 @@ func (mz *materializer) deploySchema() error { } } + // Check if any table being moved is already non-empty in the target keyspace. + // Skip this check for multi-tenant migrations. + if !mz.IsMultiTenantMigration() { + err := mz.validateEmptyTables() + if err != nil { + return vterrors.Wrap(err, "failed to validate that all target tables are empty") + } + } + err := forAllShards(mz.targetShards, func(target *topo.ShardInfo) error { allTables := []string{"/.*/"} @@ -544,6 +559,66 @@ func (mz *materializer) buildMaterializer() error { return nil } +// validateEmptyTables checks if all tables are empty across all target shards. +// It queries each shard's primary tablet and if any non-empty table is found, +// returns an error containing a list of non-empty tables. +func (mz *materializer) validateEmptyTables() error { + var mu sync.Mutex + isNonEmptyTable := map[string]bool{} + + err := forAllShards(mz.targetShards, func(shard *topo.ShardInfo) error { + primary := shard.PrimaryAlias + if primary == nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary tablet found for shard %s/%s", shard.Keyspace(), shard.ShardName()) + } + + ti, err := mz.ts.GetTablet(mz.ctx, primary) + if err != nil { + return err + } + + eg, groupCtx := errgroup.WithContext(mz.ctx) + eg.SetLimit(20) + + for _, ts := range mz.ms.TableSettings { + eg.Go(func() error { + table, err := sqlescape.EnsureEscaped(ts.TargetTable) + if err != nil { + return err + } + query := fmt.Sprintf(getNonEmptyTableQuery, table) + res, err := mz.tmc.ExecuteFetchAsAllPrivs(groupCtx, ti.Tablet, &tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest{ + Query: []byte(query), + MaxRows: 1, + }) + // Ignore table not found error + if err != nil && !IsTableDidNotExistError(err) { + return err + } + if res != nil && len(res.Rows) > 0 { + mu.Lock() + isNonEmptyTable[ts.TargetTable] = true + mu.Unlock() + } + return nil + }) + } + if err = eg.Wait(); err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + + nonEmptyTables := maps.Keys(isNonEmptyTable) + if len(nonEmptyTables) > 0 { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "non-empty tables found in target keyspace(%s): %s", mz.ms.TargetKeyspace, strings.Join(nonEmptyTables, ", ")) + } + return nil +} + func (mz *materializer) startStreams(ctx context.Context) error { return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error { targetPrimary, err := mz.ts.GetTablet(ctx, target.PrimaryAlias) diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index 500558bdd32..20374f7ef46 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -224,6 +225,7 @@ type testMaterializerTMClient struct { mu sync.Mutex vrQueries map[int][]*queryResult + fetchAsAllPrivsQueries map[int]map[string]*queryResult createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse // Used to confirm the number of times WorkflowDelete was called. @@ -243,6 +245,7 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe sourceShards: sourceShards, tableSettings: tableSettings, vrQueries: make(map[int][]*queryResult), + fetchAsAllPrivsQueries: make(map[int]map[string]*queryResult), createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse), getSchemaResponses: make(map[uint32]*tabletmanagerdatapb.SchemaDefinition), } @@ -370,6 +373,20 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r }) } +func (tmc *testMaterializerTMClient) expectFetchAsAllPrivsQuery(tabletID int, query string, result *sqltypes.Result) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + if tmc.fetchAsAllPrivsQueries[tabletID] == nil { + tmc.fetchAsAllPrivsQueries[tabletID] = make(map[string]*queryResult) + } + + tmc.fetchAsAllPrivsQueries[tabletID][query] = &queryResult{ + query: query, + result: sqltypes.ResultToProto3(result), + } +} + func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) { tmc.mu.Lock() defer tmc.mu.Unlock() @@ -420,7 +437,16 @@ func (tmc *testMaterializerTMClient) ExecuteFetchAsDba(ctx context.Context, tabl } func (tmc *testMaterializerTMClient) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest) (*querypb.QueryResult, error) { - return nil, nil + tmc.mu.Lock() + defer tmc.mu.Unlock() + + if resultsForTablet, ok := tmc.fetchAsAllPrivsQueries[int(tablet.Alias.Uid)]; ok { + if result, ok := resultsForTablet[string(req.Query)]; ok { + return result.result, result.err + } + } + + return nil, fmt.Errorf("%w: no ExecuteFetchAsAllPrivs result set for tablet %d", assert.AnError, int(tablet.Alias.Uid)) } // Note: ONLY breaks up change.SQL into individual statements and executes it. Does NOT fully implement ApplySchema. diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 20aa8b6df5c..fa3f25ca917 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" @@ -32,6 +33,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -51,6 +53,7 @@ const ( mzGetCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1" mzGetLatestCopyState = "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)" insertPrefix = `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys, options\) values ` + getNonEmptyTable = "select 1 from `t1` limit 1" ) var ( @@ -520,6 +523,7 @@ func TestMigrateVSchema(t *testing.T) { defer env.close() env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) + env.tmc.expectFetchAsAllPrivsQuery(200, getNonEmptyTable, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{}) @@ -578,6 +582,7 @@ func TestMoveTablesDDLFlag(t *testing.T) { // a circular dependency. // The TabletManager portion is tested in rpc_vreplication_test.go. env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) + env.tmc.expectFetchAsAllPrivsQuery(200, getNonEmptyTable, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{}) @@ -610,6 +615,7 @@ func TestMoveTablesDDLFlag(t *testing.T) { func TestShardedAutoIncHandling(t *testing.T) { tableName := "t1" tableDDL := fmt.Sprintf("create table %s (id int not null auto_increment primary key, c1 varchar(10))", tableName) + validateEmptyTableQuery := fmt.Sprintf("select 1 from `%s` limit 1", tableName) ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow", SourceKeyspace: "sourceks", @@ -623,14 +629,15 @@ func TestShardedAutoIncHandling(t *testing.T) { } type testcase struct { - name string - value vtctldatapb.ShardedAutoIncrementHandling - globalKeyspace string - targetShards []string - targetVSchema *vschemapb.Keyspace - wantTargetVSchema *vschemapb.Keyspace - expectQueries []string - expectErr string + name string + value vtctldatapb.ShardedAutoIncrementHandling + globalKeyspace string + targetShards []string + targetVSchema *vschemapb.Keyspace + wantTargetVSchema *vschemapb.Keyspace + expectQueries []string + expectAllPrivsQueries []string + expectErr string } testcases := []testcase{ { @@ -707,6 +714,9 @@ func TestShardedAutoIncHandling(t *testing.T) { expectQueries: []string{ tableDDL, // Unchanged }, + expectAllPrivsQueries: []string{ + validateEmptyTableQuery, + }, }, { name: "remove", @@ -755,6 +765,9 @@ func TestShardedAutoIncHandling(t *testing.T) { c1 varchar(10) )`, tableName), }, + expectAllPrivsQueries: []string{ + validateEmptyTableQuery, + }, }, { name: "replace, but vschema AutoIncrement already in place", @@ -811,6 +824,9 @@ func TestShardedAutoIncHandling(t *testing.T) { c1 varchar(10) )`, tableName), }, + expectAllPrivsQueries: []string{ + validateEmptyTableQuery, + }, }, { name: "replace", @@ -863,6 +879,9 @@ func TestShardedAutoIncHandling(t *testing.T) { c1 varchar(10) )`, tableName), }, + expectAllPrivsQueries: []string{ + validateEmptyTableQuery, + }, }, } @@ -882,6 +901,9 @@ func TestShardedAutoIncHandling(t *testing.T) { for _, query := range tc.expectQueries { env.tmc.expectVRQuery(uid, query, &sqltypes.Result{}) } + for _, query := range tc.expectAllPrivsQueries { + env.tmc.expectFetchAsAllPrivsQuery(uid, query, &sqltypes.Result{}) + } env.tmc.expectVRQuery(uid, mzGetCopyState, &sqltypes.Result{}) env.tmc.expectVRQuery(uid, mzGetLatestCopyState, &sqltypes.Result{}) env.tmc.SetGetSchemaResponse(uid, &tabletmanagerdatapb.SchemaDefinition{}) // So that the schema is copied from the source @@ -943,6 +965,7 @@ func TestMoveTablesNoRoutingRules(t *testing.T) { // a circular dependency. // The TabletManager portion is tested in rpc_vreplication_test.go. env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) + env.tmc.expectFetchAsAllPrivsQuery(200, getNonEmptyTable, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{}) @@ -1061,6 +1084,7 @@ func TestCreateLookupVindexFull(t *testing.T) { } env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) + env.tmc.expectFetchAsAllPrivsQuery(200, "select 1 from `lookup` limit 1", &sqltypes.Result{}) env.tmc.expectVRQuery(200, "/CREATE TABLE `lookup`", &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetCopyState, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzGetLatestCopyState, &sqltypes.Result{}) @@ -2500,12 +2524,13 @@ func TestCreateLookupVindexFailures(t *testing.T) { require.NoError(t, err) testcases := []struct { - description string - input *vschemapb.Keyspace - createRequest *createVReplicationWorkflowRequestResponse - vrepExecQueries []string - schemaAdditions []*tabletmanagerdatapb.TableDefinition - err string + description string + input *vschemapb.Keyspace + createRequest *createVReplicationWorkflowRequestResponse + vrepExecQueries []string + fetchAsAllPrivsQueries []string + schemaAdditions []*tabletmanagerdatapb.TableDefinition + err string }{ { description: "dup vindex", @@ -2777,7 +2802,12 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, }, }, - vrepExecQueries: []string{"CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)"}, + vrepExecQueries: []string{ + "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", + }, + fetchAsAllPrivsQueries: []string{ + "select 1 from `t1_lkp` limit 1", + }, createRequest: &createVReplicationWorkflowRequestResponse{ req: nil, // We don't care about defining it in this case res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{}, @@ -2810,6 +2840,9 @@ func TestCreateLookupVindexFailures(t *testing.T) { for _, vrq := range tcase.vrepExecQueries { env.tmc.expectVRQuery(int(tablet.Alias.Uid), vrq, &sqltypes.Result{}) } + for _, q := range tcase.fetchAsAllPrivsQueries { + env.tmc.expectFetchAsAllPrivsQuery(int(tablet.Alias.Uid), q, &sqltypes.Result{}) + } if tcase.createRequest != nil { env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tcase.createRequest) } @@ -3096,6 +3129,7 @@ func TestKeyRangesEqualOptimization(t *testing.T) { if tablet.Keyspace != targetKs || tablet.Type != topodatapb.TabletType_PRIMARY { continue } + env.tmc.expectFetchAsAllPrivsQuery(int(tablet.Alias.Uid), getNonEmptyTable, &sqltypes.Result{}) // If we are doing a partial MoveTables, we will only perform the workflow // stream creation / INSERT statment on the shard(s) we're migrating. if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) { @@ -3131,3 +3165,121 @@ func TestKeyRangesEqualOptimization(t *testing.T) { }) } } + +func TestValidateEmptyTables(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts := memorytopo.NewServer(ctx, "zone1") + defer ts.Close() + + ks := "test_keyspace" + shard1 := "-40" + shard2 := "40-80" + shard3 := "80-" + err := ts.CreateKeyspace(ctx, ks, &topodatapb.Keyspace{}) + require.NoError(t, err) + + err = ts.CreateShard(ctx, ks, shard1) + require.NoError(t, err) + err = ts.CreateShard(ctx, ks, shard2) + require.NoError(t, err) + err = ts.CreateShard(ctx, ks, shard3) + require.NoError(t, err) + + tablet1 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: ks, + Shard: shard1, + } + tablet2 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 200, + }, + Keyspace: ks, + Shard: shard2, + } + tablet3 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 300, + }, + Keyspace: ks, + Shard: shard3, + } + err = ts.CreateTablet(ctx, tablet1) + require.NoError(t, err) + err = ts.CreateTablet(ctx, tablet2) + require.NoError(t, err) + err = ts.CreateTablet(ctx, tablet3) + require.NoError(t, err) + + s1, err := ts.UpdateShardFields(ctx, ks, shard1, func(si *topo.ShardInfo) error { + si.Shard.PrimaryAlias = tablet1.Alias + return nil + }) + require.NoError(t, err) + s2, err := ts.UpdateShardFields(ctx, ks, shard2, func(si *topo.ShardInfo) error { + si.Shard.PrimaryAlias = tablet2.Alias + return nil + }) + require.NoError(t, err) + s3, err := ts.UpdateShardFields(ctx, ks, shard3, func(si *topo.ShardInfo) error { + si.Shard.PrimaryAlias = tablet3.Alias + return nil + }) + require.NoError(t, err) + + tableSettings := []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "table1", + }, + { + TargetTable: "table2", + }, + { + TargetTable: "table3", + }, + } + + tmc := newTestMaterializerTMClient(ks, []string{shard1, shard2, shard3}, tableSettings) + + table1Query := "select 1 from `table1` limit 1" + table2Query := "select 1 from `table2` limit 1" + table3Query := "select 1 from `table3` limit 1" + nonEmptyTableResult := &sqltypes.Result{Rows: []sqltypes.Row{{sqltypes.NewInt64(1)}}} + + tmc.expectFetchAsAllPrivsQuery(int(tablet1.Alias.Uid), table1Query, &sqltypes.Result{}) + tmc.expectFetchAsAllPrivsQuery(int(tablet2.Alias.Uid), table1Query, &sqltypes.Result{}) + tmc.expectFetchAsAllPrivsQuery(int(tablet3.Alias.Uid), table1Query, nonEmptyTableResult) + tmc.expectFetchAsAllPrivsQuery(int(tablet1.Alias.Uid), table2Query, &sqltypes.Result{}) + tmc.expectFetchAsAllPrivsQuery(int(tablet2.Alias.Uid), table2Query, &sqltypes.Result{}) + tmc.expectFetchAsAllPrivsQuery(int(tablet3.Alias.Uid), table2Query, &sqltypes.Result{}) + tmc.expectFetchAsAllPrivsQuery(int(tablet1.Alias.Uid), table3Query, nonEmptyTableResult) + tmc.expectFetchAsAllPrivsQuery(int(tablet2.Alias.Uid), table3Query, nonEmptyTableResult) + tmc.expectFetchAsAllPrivsQuery(int(tablet3.Alias.Uid), table3Query, nonEmptyTableResult) + + ms := &vtctldatapb.MaterializeSettings{ + TargetKeyspace: ks, + TableSettings: tableSettings, + } + + mz := &materializer{ + ctx: ctx, + ts: ts, + sourceTs: ts, + tmc: tmc, + ms: ms, + targetShards: []*topo.ShardInfo{s1, s2, s3}, + } + + err = mz.validateEmptyTables() + + assert.ErrorContains(t, err, "table1") + assert.NotContains(t, err.Error(), "table2") + // Check if the error message doesn't include duplicate tables + assert.Equal(t, strings.Count(err.Error(), "table3"), 1) +} diff --git a/go/vt/vtctl/workflow/utils_test.go b/go/vt/vtctl/workflow/utils_test.go index b315e1aa991..2094421e3c2 100644 --- a/go/vt/vtctl/workflow/utils_test.go +++ b/go/vt/vtctl/workflow/utils_test.go @@ -16,11 +16,12 @@ import ( "vitess.io/vitess/go/testfiles" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/vtctldata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/etcd2topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" + + "vitess.io/vitess/go/vt/proto/vtctldata" ) // TestCreateDefaultShardRoutingRules confirms that the default shard routing rules are created correctly for sharded diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index a6c15d604b9..6376f5ee2a7 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -484,8 +484,8 @@ func (tmc *fakeTMClient) VReplicationExec(ctx context.Context, tablet *topodatap } for qry, res := range tmc.vreQueries[int(tablet.Alias.Uid)] { if strings.HasPrefix(qry, "/") { - re := regexp.MustCompile(qry) - if re.MatchString(qry) { + re := regexp.MustCompile(qry[1:]) + if re.MatchString(query) { return res, nil } }