diff --git a/go/protoutil/binlogsource.go b/go/protoutil/binlogsource.go new file mode 100644 index 00000000000..385f472c202 --- /dev/null +++ b/go/protoutil/binlogsource.go @@ -0,0 +1,60 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protoutil + +import ( + "slices" + "sort" + "strings" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +// SortBinlogSourceTables sorts the table related contents of the +// BinlogSource struct lexicographically by table name in order to +// produce consistent results. +func SortBinlogSourceTables(bls *binlogdatapb.BinlogSource) { + if bls == nil { + return + } + + // Sort the tables by name to ensure a consistent order. + slices.Sort(bls.Tables) + + if bls.Filter == nil || len(bls.Filter.Rules) == 0 { + return + } + sort.Slice(bls.Filter.Rules, func(i, j int) bool { + // Exclude filters should logically be processed first. + if bls.Filter.Rules[i].Filter == "exclude" && bls.Filter.Rules[j].Filter != "exclude" { + return true + } + if bls.Filter.Rules[j].Filter == "exclude" && bls.Filter.Rules[i].Filter != "exclude" { + return false + } + + // Remove preceding slash from the match string. + // That is used when the filter is a regular expression. + fi, _ := strings.CutPrefix(bls.Filter.Rules[i].Match, "/") + fj, _ := strings.CutPrefix(bls.Filter.Rules[j].Match, "/") + if fi != fj { + return fi < fj + } + + return bls.Filter.Rules[i].Filter < bls.Filter.Rules[j].Filter + }) +} diff --git a/go/protoutil/binlogsource_test.go b/go/protoutil/binlogsource_test.go new file mode 100644 index 00000000000..fe5564535bd --- /dev/null +++ b/go/protoutil/binlogsource_test.go @@ -0,0 +1,209 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protoutil + +import ( + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +func TestSortBinlogSourceTables(t *testing.T) { + tests := []struct { + name string + inSource *binlogdatapb.BinlogSource + outSource *binlogdatapb.BinlogSource + }{ + { + name: "Basic", + inSource: &binlogdatapb.BinlogSource{ + Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"}, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "ztable2", + }, + { + Match: "table3", + }, + { + Match: "/wuts", + }, + { + Match: "1table", + Filter: "a", + }, + { + Match: "1table", + }, + { + Match: "atable", + }, + }, + }, + }, + outSource: &binlogdatapb.BinlogSource{ + Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"}, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "1table", + }, + { + Match: "1table", + Filter: "a", + }, + { + Match: "atable", + }, + { + Match: "table3", + }, + { + Match: "/wuts", + }, + { + Match: "ztable2", + }, + }, + }, + }, + }, + { + name: "With excludes", + inSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "./*", + }, + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "ztable2", + }, + { + Match: "atable2", + }, + }, + }, + }, + outSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "./*", + }, + { + Match: "atable2", + }, + { + Match: "ztable2", + }, + }, + }, + }, + }, + { + name: "With excludes", + inSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "./*", + }, + }, + }, + }, + outSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "./*", + }, + }, + }, + }, + }, + { + name: "Nil", + inSource: nil, + outSource: nil, + }, + { + name: "No filter", + inSource: &binlogdatapb.BinlogSource{ + Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"}, + Filter: nil, + }, + outSource: &binlogdatapb.BinlogSource{ + Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"}, + Filter: nil, + }, + }, + { + name: "No filter rules", + inSource: &binlogdatapb.BinlogSource{ + Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"}, + Filter: &binlogdatapb.Filter{}, + }, + outSource: &binlogdatapb.BinlogSource{ + Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"}, + Filter: &binlogdatapb.Filter{}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + SortBinlogSourceTables(tt.inSource) + require.True(t, proto.Equal(tt.inSource, tt.outSource), "got: %s, want: %s", tt.inSource.String(), tt.outSource.String()) + }) + } +} diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index e7d0b714834..91605bff402 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -90,13 +90,14 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cells, uuid string, completedAtMin time.Time) *vdiffInfo { var info *vdiffInfo + var jsonStr string first := true previousProgress := vdiff2.ProgressReport{} ch := make(chan bool) go func() { for { time.Sleep(vdiffStatusCheckInterval) - _, jsonStr := performVDiff2Action(t, useVtctlclient, ksWorkflow, cells, "show", uuid, false) + _, jsonStr = performVDiff2Action(t, useVtctlclient, ksWorkflow, cells, "show", uuid, false) info = getVDiffInfo(jsonStr) require.NotNil(t, info) if info.State == "completed" { @@ -142,7 +143,7 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell case <-ch: return info case <-time.After(vdiffTimeout): - log.Errorf("VDiff never completed for UUID %s", uuid) + log.Errorf("VDiff never completed for UUID %s. Latest output: %s", uuid, jsonStr) require.FailNow(t, fmt.Sprintf("VDiff never completed for UUID %s", uuid)) return nil } diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index f651f3bb25c..88c4093f451 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -34,14 +34,13 @@ import ( "time" "github.com/spf13/pflag" - - "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/mysql/sqlerror" - "google.golang.org/protobuf/proto" "vitess.io/vitess/go/history" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/mysql/sqlerror" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" @@ -606,6 +605,7 @@ func ReadVRSettings(dbClient DBClient, uid int32) (VRSettings, error) { // the _vt.vreplication table. func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64, dbName string, workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) string { + protoutil.SortBinlogSourceTables(source) return fmt.Sprintf("insert into _vt.vreplication "+ "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys) "+ "values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v)", @@ -616,6 +616,7 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi // CreateVReplicationState returns a statement to create a stopped vreplication. func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position string, state binlogdatapb.VReplicationWorkflowState, dbName string, workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType) string { + protoutil.SortBinlogSourceTables(source) return fmt.Sprintf("insert into _vt.vreplication "+ "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type) "+ "values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d)", diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go index e36b546c1d2..18f10f25319 100644 --- a/go/vt/vtctl/workflow/resharder.go +++ b/go/vt/vtctl/workflow/resharder.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "slices" "sync" "time" @@ -281,8 +282,8 @@ func (rs *resharder) createStreams(ctx context.Context) error { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, targetPrimary.DbName()) - // copy excludeRules to prevent data race. - copyExcludeRules := append([]*binlogdatapb.Rule(nil), excludeRules...) + // Clone excludeRules to prevent data races. + copyExcludeRules := slices.Clone(excludeRules) for _, source := range rs.sourceShards { if !key.KeyRangeIntersect(target.KeyRange, source.KeyRange) { continue diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index 0319146321a..ee1907005a8 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -23,6 +23,7 @@ import ( "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/discovery" @@ -57,6 +58,7 @@ func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *ta } res := &sqltypes.Result{} for _, bls := range req.BinlogSource { + protoutil.SortBinlogSourceTables(bls) source, err := prototext.Marshal(bls) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 5ef9b4cd8c6..1db4e02b67b 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -45,7 +45,7 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" - "vitess.io/vitess/go/vt/proto/vttime" + vttimepb "vitess.io/vitess/go/vt/proto/vttime" ) const ( @@ -167,6 +167,86 @@ func TestCreateVReplicationWorkflow(t *testing.T) { query: fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1\"}} on_ddl:EXEC stop_after_copy:true source_time_zone:\"EDT\" target_time_zone:\"UTC\"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1)`, insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), }, + { + name: "binlog source order with include", + schema: &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "zt", + Columns: []string{"id"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id", "int64"), + }, + { + Name: "t1", + Columns: []string{"id", "c2"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id|c2", "int64|int64"), + }, + { + Name: "wut", + Columns: []string{"id"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id", "int64"), + }, + }, + }, + req: &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + IncludeTables: []string{"zt", "wut", "t1"}, + SourceTimeZone: "EDT", + OnDdl: binlogdatapb.OnDDLAction_EXEC.String(), + StopAfterCopy: true, + DropForeignKeys: true, + DeferSecondaryKeys: true, + AutoStart: true, + }, + query: fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1\"} rules:{match:\"wut\" filter:\"select * from wut\"} rules:{match:\"zt\" filter:\"select * from zt\"}} on_ddl:EXEC stop_after_copy:true source_time_zone:\"EDT\" target_time_zone:\"UTC\"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1)`, + insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), + }, + { + name: "binlog source order with all-tables", + schema: &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "zt", + Columns: []string{"id"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id", "int64"), + }, + { + Name: "t1", + Columns: []string{"id", "c2"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id|c2", "int64|int64"), + }, + { + Name: "wut", + Columns: []string{"id"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id", "int64"), + }, + }, + }, + req: &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + AllTables: true, + SourceTimeZone: "EDT", + OnDdl: binlogdatapb.OnDDLAction_EXEC.String(), + StopAfterCopy: true, + DropForeignKeys: true, + DeferSecondaryKeys: true, + AutoStart: true, + }, + query: fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1\"} rules:{match:\"wut\" filter:\"select * from wut\"} rules:{match:\"zt\" filter:\"select * from zt\"}} on_ddl:EXEC stop_after_copy:true source_time_zone:\"EDT\" target_time_zone:\"UTC\"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1)`, + insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), + }, } tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", @@ -426,7 +506,7 @@ func TestMoveTables(t *testing.T) { Keyspace: targetKs, Workflow: wf, Cells: tenv.cells, - MaxReplicationLagAllowed: &vttime.Duration{Seconds: 922337203}, + MaxReplicationLagAllowed: &vttimepb.Duration{Seconds: 922337203}, EnableReverseReplication: true, InitializeTargetSequences: true, Direction: int32(workflow.DirectionForward), @@ -447,7 +527,7 @@ func TestMoveTables(t *testing.T) { Keyspace: targetKs, Workflow: wf, Cells: tenv.cells, - MaxReplicationLagAllowed: &vttime.Duration{Seconds: 922337203}, + MaxReplicationLagAllowed: &vttimepb.Duration{Seconds: 922337203}, EnableReverseReplication: true, Direction: int32(workflow.DirectionBackward), }) @@ -476,7 +556,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { } selectQuery, err := parsed.GenerateQuery(bindVars, nil) require.NoError(t, err) - blsStr := fmt.Sprintf(`keyspace:"%s" shard:"%s" filter:{rules:{match:"customer" filter:"select * from customer"} rules:{match:"corder" filter:"select * from corder"}}`, + blsStr := fmt.Sprintf(`keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}`, keyspace, shard) selectRes := sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -509,7 +589,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { Cells: []string{"zone2"}, // TabletTypes is an empty value, so the current value should be cleared }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '' where id in (%d)`, keyspace, shard, "zone2", vreplID), }, { @@ -520,7 +600,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { Cells: []string{"zone3"}, TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, // So keep the current value of replica }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, keyspace, shard, "zone3", tabletTypes[0], vreplID), }, { @@ -531,7 +611,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { TabletSelectionPreference: tabletmanagerdatapb.TabletSelectionPreference_INORDER, TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA}, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '', tablet_types = '%s' where id in (%d)`, keyspace, shard, "in_order:rdonly,replica", vreplID), }, { @@ -542,7 +622,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { Cells: textutil.SimulatedNullStringSlice, // So keep the current value of zone1 TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, keyspace, shard, cells[0], "rdonly", vreplID), }, { @@ -552,7 +632,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), OnDdl: binlogdatapb.OnDDLAction_EXEC, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`, keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID), }, { @@ -564,7 +644,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_PRIMARY}, OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`, keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID), }, { @@ -576,7 +656,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt), }, - query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID), }, } diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go index ca548a9a478..e6c9a84d9e2 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go @@ -147,7 +147,7 @@ func TestVDiff(t *testing.T) { ), "NULL", ), nil) - vdenv.dbClient.ExpectRequest(fmt.Sprintf("select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = '%s' and table_name in ('t1')", vdiffDBName), sqltypes.MakeTestResult(sqltypes.MakeTestFields( + vdenv.dbClient.ExpectRequest(fmt.Sprintf("select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = '%s' and table_name in ('t1') order by table_name", vdiffDBName), sqltypes.MakeTestResult(sqltypes.MakeTestFields( "table_name|table_rows", "varchar|int64", ), @@ -192,7 +192,7 @@ func TestVDiff(t *testing.T) { vdenv.dbClient.ExpectRequest(`insert into _vt.vdiff_log(vdiff_id, message) values (1, 'completed: table \'t1\'')`, singleRowAffected, nil) vdenv.dbClient.ExpectRequest("update _vt.vdiff_table set state = 'completed' where vdiff_id = 1 and table_name = 't1'", singleRowAffected, nil) vdenv.dbClient.ExpectRequest(`insert into _vt.vdiff_log(vdiff_id, message) values (1, 'completed: table \'t1\'')`, singleRowAffected, nil) - vdenv.dbClient.ExpectRequest("select table_name as table_name from _vt.vdiff_table where vdiff_id = 1 and state != 'completed'", singleRowAffected, nil) + vdenv.dbClient.ExpectRequest("select table_name as table_name from _vt.vdiff_table where vdiff_id = 1 and state != 'completed' order by table_name", singleRowAffected, nil) vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = left('', 1024) , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil) vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: completed')", singleRowAffected, nil) diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index 31e686877a2..afb79b4e4b3 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -37,7 +37,7 @@ const ( vd.started_at as started_at, vdt.rows_compared as rows_compared, vd.completed_at as completed_at, IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) - where vd.id = %a` + where vd.id = %a order by table_name` // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`. // It also truncates the error if needed to ensure that we can save the state when the error text is very long. sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d" @@ -49,7 +49,7 @@ const ( sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a" sqlGetVDiffIDsByKeyspaceWorkflow = "select id as id from _vt.vdiff where keyspace = %a and workflow = %a" sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a" - sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)" + sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s) order by table_name" sqlNewVDiffTable = "insert into _vt.vdiff_table(vdiff_id, table_name, state, table_rows) values(%a, %a, 'pending', %a)" sqlGetVDiffTable = `select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report @@ -62,5 +62,5 @@ const ( sqlUpdateTableStateAndReport = "update _vt.vdiff_table set state = %a, rows_compared = %a, report = %a where vdiff_id = %a and table_name = %a" sqlUpdateTableMismatch = "update _vt.vdiff_table set mismatch = true where vdiff_id = %a and table_name = %a" - sqlGetIncompleteTables = "select table_name as table_name from _vt.vdiff_table where vdiff_id = %a and state != 'completed'" + sqlGetIncompleteTables = "select table_name as table_name from _vt.vdiff_table where vdiff_id = %a and state != 'completed' order by table_name" ) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index d658fea2a25..b3634e37ab5 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -112,7 +112,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { defer func() { unlock(&err) if err != nil { - log.Errorf("UnlockKeyspace %s failed: %v", targetKeyspace, lockErr) + log.Errorf("UnlockKeyspace %s failed: %v", targetKeyspace, err) } }() diff --git a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go index da1753a8444..62af8c9396d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go @@ -21,8 +21,10 @@ import ( "strings" "time" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/vt/throttler" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) // InsertGenerator generates a vreplication insert statement. @@ -50,6 +52,7 @@ func NewInsertGenerator(state binlogdatapb.VReplicationWorkflowState, dbname str // AddRow adds a row to the insert statement. func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSource, pos, cell, tabletTypes string, workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) { + protoutil.SortBinlogSourceTables(bls) fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v)", ig.prefix, encodeString(workflow), diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index 4779e607960..dfe51f71dbd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -21,10 +21,12 @@ import ( "fmt" "io" "math" + "slices" "strconv" "strings" "time" + "golang.org/x/exp/maps" "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/bytes2" @@ -230,9 +232,12 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error { if len(plan.TargetTables) != 0 { var buf strings.Builder buf.WriteString("insert into _vt.copy_state(vrepl_id, table_name) values ") + // Sort the tables by name to ensure a consistent order. + tableNames := maps.Keys(plan.TargetTables) + slices.Sort(tableNames) prefix := "" - for name := range plan.TargetTables { - fmt.Fprintf(&buf, "%s(%d, %s)", prefix, vc.vr.id, encodeString(name)) + for _, tableName := range tableNames { + fmt.Fprintf(&buf, "%s(%d, %s)", prefix, vc.vr.id, encodeString(tableName)) prefix = ", " } if _, err := vc.vr.dbClient.Execute(buf.String()); err != nil { @@ -256,8 +261,8 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error { len(plan.TargetTables))); err != nil { return err } - for name := range plan.TargetTables { - if err := vc.vr.stashSecondaryKeys(ctx, name); err != nil { + for _, tableName := range tableNames { + if err := vc.vr.stashSecondaryKeys(ctx, tableName); err != nil { return err } } @@ -294,7 +299,7 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error { // primary key that was copied. A nil Result means that nothing has been copied. // A table that was fully copied is removed from copyState. func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSettings) error { - qr, err := vc.vr.dbClient.Execute(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state group by vrepl_id, table_name)", vc.vr.id)) + qr, err := vc.vr.dbClient.Execute(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state group by vrepl_id, table_name) order by table_name", vc.vr.id)) if err != nil { return err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 0e35036321f..c32482641b2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "os" + "regexp" "strings" "testing" "time" @@ -562,15 +563,19 @@ func testPlayerCopyTables(t *testing.T) { defer deleteTablet(addTablet(100)) execStatements(t, []string{ + "create table ast1(id int, primary key(id))", "create table src1(id int, val varbinary(128), d decimal(8,0), j json, primary key(id))", "insert into src1 values(2, 'bbb', 1, '{\"foo\": \"bar\"}'), (1, 'aaa', 0, JSON_ARRAY(123456789012345678901234567890, \"abcd\")), (3, 'ccc', 2, 'null'), (4, 'ddd', 3, '{\"name\": \"matt\", \"size\": null}'), (5, 'eee', 4, null)", + fmt.Sprintf("create table %s.ast1(id int, primary key(id))", vrepldb), fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), val2 varbinary(128), d decimal(8,0), j json, primary key(id))", vrepldb), "create table yes(id int, val varbinary(128), primary key(id))", fmt.Sprintf("create table %s.yes(id int, val varbinary(128), primary key(id))", vrepldb), "create table no(id int, val varbinary(128), primary key(id))", }) defer execStatements(t, []string{ + "drop table ast1", "drop table src1", + fmt.Sprintf("drop table %s.ast1", vrepldb), fmt.Sprintf("drop table %s.dst1", vrepldb), "drop table yes", fmt.Sprintf("drop table %s.yes", vrepldb), @@ -582,6 +587,9 @@ func testPlayerCopyTables(t *testing.T) { Rules: []*binlogdatapb.Rule{{ Match: "dst1", Filter: "select id, val, val as val2, d, j from src1", + }, { + Match: "ast1", + Filter: "select * from ast1", }, { Match: "/yes", }}, @@ -595,9 +603,7 @@ func testPlayerCopyTables(t *testing.T) { } query := binlogplayer.CreateVReplicationState("test", bls, "", binlogdatapb.VReplicationWorkflowState_Init, playerEngine.dbName, 0, 0) qr, err := playerEngine.Exec(query) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer func() { query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID) if _, err := playerEngine.Exec(query); err != nil { @@ -607,15 +613,24 @@ func testPlayerCopyTables(t *testing.T) { }() expectDBClientQueries(t, qh.Expect( - "/insert into _vt.vreplication", + // Filters should be lexicographically ordered by name. + regexp.QuoteMeta("/insert into _vt.vreplication (workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type) values ('test', 'keyspace:\\\"vttest\\\" shard:\\\"0\\\" filter:{rules:{match:\\\"ast1\\\" filter:\\\"select * from ast1\\\"} rules:{match:\\\"dst1\\\" filter:\\\"select id, val, val as val2, d, j from src1\\\"} rules:{match:\\\"/yes\\\"}}'"), "/update _vt.vreplication set message='Picked source tablet.*", // Create the list of tables to copy and transition to Copying state. "begin", - "/insert into _vt.copy_state", + // The table names should be lexicographically ordered by name. + fmt.Sprintf("insert into _vt.copy_state(vrepl_id, table_name) values (%d, 'ast1'), (%d, 'dst1'), (%d, 'yes')", qr.InsertID, qr.InsertID, qr.InsertID), "/update _vt.vreplication set state='Copying'", "commit", // The first fast-forward has no starting point. So, it just saves the current position. "/update _vt.vreplication set pos=", + // Now the tables should be copied in lexicographical order: ast1, dst1, yes. + // Nothing to copy from ast1. Delete from copy_state. + "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*ast1", + // The next FF executes and updates the position before copying. + "begin", + "/update _vt.vreplication set pos=", + "commit", "begin", "insert into dst1(id,val,val2,d,j) values (1,'aaa','aaa',0,JSON_ARRAY(123456789012345678901234567890, _utf8mb4'abcd')), (2,'bbb','bbb',1,JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar')), (3,'ccc','ccc',2,CAST(_utf8mb4'null' as JSON)), (4,'ddd','ddd',3,JSON_OBJECT(_utf8mb4'name', _utf8mb4'matt', _utf8mb4'size', null)), (5,'eee','eee',4,null)", `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"id\\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\\"5\\"}'.*`,