From ced11cc0385a39b2905decac3fa52a9295fc4c53 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 22 Mar 2024 09:45:47 +0100 Subject: [PATCH 1/4] Allow VDiff for OnlineDDLs. Add e2e test Signed-off-by: Rohit Nayak --- .../endtoend/cluster/vtctldclient_process.go | 11 ++ go/test/endtoend/vreplication/helper_test.go | 4 +- .../vreplication/vdiff_helper_test.go | 12 +- .../vreplication/vdiff_online_ddl_test.go | 146 ++++++++++++++++++ .../tabletmanager/vdiff/workflow_differ.go | 4 - test/config.json | 9 ++ 6 files changed, 177 insertions(+), 9 deletions(-) create mode 100644 go/test/endtoend/vreplication/vdiff_online_ddl_test.go diff --git a/go/test/endtoend/cluster/vtctldclient_process.go b/go/test/endtoend/cluster/vtctldclient_process.go index 2c6d6028ee0..4ed5acde518 100644 --- a/go/test/endtoend/cluster/vtctldclient_process.go +++ b/go/test/endtoend/cluster/vtctldclient_process.go @@ -298,3 +298,14 @@ func (vtctldclient *VtctldClientProcess) OnlineDDLShowRecent(Keyspace string) (r "recent", ) } + +// OnlineDDLShow responds with recent schema migration list +func (vtctldclient *VtctldClientProcess) OnlineDDLShow(keyspace, workflow string) (result string, err error) { + return vtctldclient.ExecuteCommandWithOutput( + "OnlineDDL", + "show", + "--json", + keyspace, + workflow, + ) +} diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index e187c8398b6..1c8b3a118bc 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -358,7 +358,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa log.Infof("Waiting for workflow %q to fully reach %q state", ksWorkflow, wantState) for { output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show") - require.NoError(t, err) + require.NoError(t, err, output) done = true state := "" result := gjson.Get(output, "ShardStatuses") @@ -522,7 +522,7 @@ func validateDryRunResults(t *testing.T, output string, want []string) { w = strings.TrimSpace(w[1:]) result := strings.HasPrefix(g, w) match = result - //t.Logf("Partial match |%v|%v|%v\n", w, g, match) + // t.Logf("Partial match |%v|%v|%v\n", w, g, match) } else { match = g == w } diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index 91605bff402..9911cd3e482 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -80,6 +80,7 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex } else { require.Equal(t, "completed", info.State, "vdiff results: %+v", info) require.False(t, info.HasMismatch, "vdiff results: %+v", info) + require.NotZero(t, info.RowsCompared) } if strings.Contains(t.Name(), "AcrossDBVersions") { log.Errorf("VDiff resume cannot be guaranteed between major MySQL versions due to implied collation differences, skipping resume test...") @@ -150,9 +151,10 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell } type expectedVDiff2Result struct { - state string - shards []string - hasMismatch bool + state string + shards []string + hasMismatch bool + minimumRowsCompared int64 } func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result, extraFlags ...string) { @@ -175,6 +177,10 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e } else { require.Equal(t, "completed", info.State, "vdiff results: %+v", info) require.False(t, info.HasMismatch, "vdiff results: %+v", info) + if want.minimumRowsCompared > 0 { + require.Greater(t, info.RowsCompared, want.minimumRowsCompared, + "not enough rows compared: want at least %d, got %d", want.minimumRowsCompared, info.RowsCompared) + } } if strings.Contains(t.Name(), "AcrossDBVersions") { log.Errorf("VDiff resume cannot be guaranteed between major MySQL versions due to implied collation differences, skipping resume test...") diff --git a/go/test/endtoend/vreplication/vdiff_online_ddl_test.go b/go/test/endtoend/vreplication/vdiff_online_ddl_test.go new file mode 100644 index 00000000000..7c8bd66492f --- /dev/null +++ b/go/test/endtoend/vreplication/vdiff_online_ddl_test.go @@ -0,0 +1,146 @@ +package vreplication + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/proto/vtctldata" +) + +// TestOnlineDDLVDiff is to run a vdiff on a table that is part of an OnlineDDL workflow. +func TestOnlineDDLVDiff(t *testing.T) { + setSidecarDBName("_vt") + defaultRdonly = 0 + defaultReplicas = 0 + defer func() { + defaultRdonly = 1 + defaultReplicas = 1 + }() + + vc = setupMinimalCluster(t) + defer vc.TearDown() + keyspace := "product" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + createQuery := "create table temp (id int, name varchar(100), blb blob, primary key (id))" + dropQuery := "drop table temp" + alterQuery := "alter table temp add column extra1 int not null default 0" + insertTemplate := "insert into temp (id, name, blb) values (%d, 'name%d', 'blb%d')" + updateTemplate := "update temp set name = 'name_%d' where id = %d" + execOnlineDDL(t, "direct", keyspace, createQuery) + defer execOnlineDDL(t, "direct", keyspace, dropQuery) + + var done = make(chan bool) + go populate(ctx, done, insertTemplate, updateTemplate) + + var output string + waitForAdditionalRows(t, keyspace, "temp", 100) + output = execOnlineDDL(t, "vitess --postpone-completion", keyspace, alterQuery) + uuid := strings.TrimSpace(output) + waitForAdditionalRows(t, keyspace, "temp", 200) + want := &expectedVDiff2Result{ + state: "completed", + minimumRowsCompared: 200, + hasMismatch: false, + shards: []string{"0"}, + } + doVtctldclientVDiff(t, keyspace, uuid, "zone1", want) + + cancel() + <-done +} + +func execOnlineDDL(t *testing.T, strategy, keyspace, query string) string { + output, err := vc.VtctldClient.ExecuteCommandWithOutput("ApplySchema", "--ddl-strategy", strategy, "--sql", query, keyspace) + require.NoError(t, err, output) + uuid := strings.TrimSpace(output) + if strategy != "direct" { + err = waitForCondition("online ddl to start", func() bool { + var response vtctldata.GetSchemaMigrationsResponse + output, err := vc.VtctldClient.OnlineDDLShow(keyspace, uuid) + require.NoError(t, err, output) + err = protojson.Unmarshal([]byte(output), &response) + if err != nil { + log.Errorf("error unmarshalling response: %v", err) + return false + } + if len(response.Migrations) > 0 && response.Migrations[0].Status == vtctldata.SchemaMigration_RUNNING { + return true + } + return false + }, 30*time.Second) + require.NoError(t, err) + uuid := strings.TrimSpace(output) + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, uuid), binlogdatapb.VReplicationWorkflowState_Running.String()) + } + return output +} + +func waitForAdditionalRows(t *testing.T, keyspace, table string, count int) { + vtgateConn, cancel := getVTGateConn() + defer cancel() + + numRowsStart := getNumRows(t, vtgateConn, keyspace, table) + shortCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + for { + switch { + case shortCtx.Err() != nil: + t.Fatalf("Timed out waiting for additional rows") + default: + numRows := getNumRows(t, vtgateConn, keyspace, table) + if numRows >= numRowsStart+count { + return + } + time.Sleep(10 * time.Millisecond) + } + } +} + +func getNumRows(t *testing.T, vtgateConn *mysql.Conn, keyspace, table string) int { + qr := execVtgateQuery(t, vtgateConn, keyspace, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)) + require.NotNil(t, qr) + numRows, err := strconv.Atoi(qr.Rows[0][0].ToString()) + require.NoError(t, err) + return numRows +} + +func populate(ctx context.Context, done chan bool, insertTemplate, updateTemplate string) { + defer close(done) + vtgateConn, closeConn := getVTGateConn() + defer closeConn() + id := 1 + for { + select { + case <-ctx.Done(): + log.Infof("load cancelled") + return + default: + query := fmt.Sprintf(insertTemplate, id, id, id) + _, err := vtgateConn.ExecuteFetch(query, 1, false) + if err != nil { + log.Errorf("error in insert: %v", err) + panic(err) + } + query = fmt.Sprintf(updateTemplate, id, id) + _, err = vtgateConn.ExecuteFetch(query, 1, false) + if err != nil { + log.Errorf("error in update: %v", err) + panic(err) + } + id++ + time.Sleep(10 * time.Millisecond) + } + } +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 97d2bd387cb..e4f3d2db727 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -31,7 +31,6 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" @@ -343,9 +342,6 @@ func (wd *workflowDiffer) buildPlan(dbClient binlogplayer.DBClient, filter *binl if len(specifiedTables) != 0 && !stringListContains(specifiedTables, table.Name) { continue } - if schema.IsInternalOperationTableName(table.Name) { - continue - } rule, err := vreplication.MatchTable(table.Name, filter) if err != nil { return err diff --git a/test/config.json b/test/config.json index 26efdee1f36..6db35dd8158 100644 --- a/test/config.json +++ b/test/config.json @@ -1076,6 +1076,15 @@ "RetryMax": 0, "Tags": [] }, + "vreplication_onlineddl_vdiff": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestOnlineDDLVDiff"], + "Command": [], + "Manual": false, + "Shard": "vreplication_cellalias", + "RetryMax": 2, + "Tags": [] + }, "vreplication_vschema_load": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVSchemaChangesUnderLoad"], From fe9689862a6a09e3b1d3a279d806731270d820c0 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 22 Mar 2024 10:14:07 +0100 Subject: [PATCH 2/4] Fixed incorrect change to test helper Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/vdiff_helper_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index 9911cd3e482..623c0d83c06 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -174,13 +174,13 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e require.Equal(t, want.state, info.State) require.Equal(t, strings.Join(want.shards, ","), info.Shards) require.Equal(t, want.hasMismatch, info.HasMismatch) - } else { - require.Equal(t, "completed", info.State, "vdiff results: %+v", info) - require.False(t, info.HasMismatch, "vdiff results: %+v", info) if want.minimumRowsCompared > 0 { require.Greater(t, info.RowsCompared, want.minimumRowsCompared, "not enough rows compared: want at least %d, got %d", want.minimumRowsCompared, info.RowsCompared) } + } else { + require.Equal(t, "completed", info.State, "vdiff results: %+v", info) + require.False(t, info.HasMismatch, "vdiff results: %+v", info) } if strings.Contains(t.Name(), "AcrossDBVersions") { log.Errorf("VDiff resume cannot be guaranteed between major MySQL versions due to implied collation differences, skipping resume test...") From a08bb2f9b387aaa8e14184ec1cba8eec3a85a9eb Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 22 Mar 2024 18:07:08 +0100 Subject: [PATCH 3/4] Ensure workflows in VDiff are in Running state to avoid unpredictable results from Starting a Stopped workflow, for example Signed-off-by: Rohit Nayak --- .../vreplication/vdiff_helper_test.go | 3 +- .../vreplication/vdiff_online_ddl_test.go | 48 +++++++++++-------- go/vt/vtctl/workflow/server.go | 34 +++++++++++++ 3 files changed, 63 insertions(+), 22 deletions(-) diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index 623c0d83c06..c35496500ff 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vdiff2 "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" ) @@ -193,7 +194,7 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a var err error targetKeyspace, workflowName, ok := strings.Cut(ksWorkflow, ".") require.True(t, ok, "invalid keyspace.workflow value: %s", ksWorkflow) - + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) if useVtctlclient { // This will always result in us using a PRIMARY tablet, which is all // we start in many e2e tests, but it avoids the tablet picker logic diff --git a/go/test/endtoend/vreplication/vdiff_online_ddl_test.go b/go/test/endtoend/vreplication/vdiff_online_ddl_test.go index 7c8bd66492f..6f97c17f957 100644 --- a/go/test/endtoend/vreplication/vdiff_online_ddl_test.go +++ b/go/test/endtoend/vreplication/vdiff_online_ddl_test.go @@ -13,8 +13,9 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/proto/vtctldata" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) // TestOnlineDDLVDiff is to run a vdiff on a table that is part of an OnlineDDL workflow. @@ -41,24 +42,28 @@ func TestOnlineDDLVDiff(t *testing.T) { execOnlineDDL(t, "direct", keyspace, createQuery) defer execOnlineDDL(t, "direct", keyspace, dropQuery) - var done = make(chan bool) - go populate(ctx, done, insertTemplate, updateTemplate) - var output string - waitForAdditionalRows(t, keyspace, "temp", 100) - output = execOnlineDDL(t, "vitess --postpone-completion", keyspace, alterQuery) - uuid := strings.TrimSpace(output) - waitForAdditionalRows(t, keyspace, "temp", 200) - want := &expectedVDiff2Result{ - state: "completed", - minimumRowsCompared: 200, - hasMismatch: false, - shards: []string{"0"}, - } - doVtctldclientVDiff(t, keyspace, uuid, "zone1", want) - cancel() - <-done + t.Run("OnlineDDL VDiff", func(t *testing.T) { + var done = make(chan bool) + go populate(ctx, done, insertTemplate, updateTemplate) + + waitForAdditionalRows(t, keyspace, "temp", 100) + output = execOnlineDDL(t, "vitess --postpone-completion", keyspace, alterQuery) + uuid := strings.TrimSpace(output) + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, uuid), binlogdatapb.VReplicationWorkflowState_Running.String()) + waitForAdditionalRows(t, keyspace, "temp", 200) + want := &expectedVDiff2Result{ + state: "completed", + minimumRowsCompared: 200, + hasMismatch: false, + shards: []string{"0"}, + } + doVtctldclientVDiff(t, keyspace, uuid, "zone1", want) + + cancel() + <-done + }) } func execOnlineDDL(t *testing.T, strategy, keyspace, query string) string { @@ -75,16 +80,17 @@ func execOnlineDDL(t *testing.T, strategy, keyspace, query string) string { log.Errorf("error unmarshalling response: %v", err) return false } - if len(response.Migrations) > 0 && response.Migrations[0].Status == vtctldata.SchemaMigration_RUNNING { + if len(response.Migrations) > 0 && + (response.Migrations[0].Status == vtctldata.SchemaMigration_RUNNING || + response.Migrations[0].Status == vtctldata.SchemaMigration_COMPLETE) { return true } return false }, 30*time.Second) require.NoError(t, err) - uuid := strings.TrimSpace(output) - waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, uuid), binlogdatapb.VReplicationWorkflowState_Running.String()) + } - return output + return uuid } func waitForAdditionalRows(t *testing.T, keyspace, table string, count int) { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 53ca966d026..0142bf45376 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1792,6 +1792,16 @@ func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRe req.TargetKeyspace, req.Workflow) } + workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running { + log.Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow) + } + err = ts.ForAllTargets(func(target *MigrationTarget) error { _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) return err @@ -3949,3 +3959,27 @@ func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCrea } return s.moveTablesCreate(ctx, moveTablesCreateRequest, binlogdatapb.VReplicationWorkflowType_Migrate) } + +// getWorkflowStatus gets the overall status of the workflow by checking the status of all the streams. If all streams are not +// in the same state, it returns the unknown state. +func (s *Server) getWorkflowStatus(ctx context.Context, keyspace string, workflow string) (binlogdatapb.VReplicationWorkflowState, error) { + workflowStatus := binlogdatapb.VReplicationWorkflowState_Unknown + wf, err := s.GetWorkflow(ctx, keyspace, workflow, false, nil) + if err != nil { + return workflowStatus, err + } + for _, shardStream := range wf.ShardStreams { + for _, stream := range shardStream.GetStreams() { + state, ok := binlogdatapb.VReplicationWorkflowState_value[stream.State] + if !ok { + return workflowStatus, fmt.Errorf("invalid state for stream %s of workflow %s.%s", stream.State, keyspace, workflow) + } + currentStatus := binlogdatapb.VReplicationWorkflowState(state) + if workflowStatus != binlogdatapb.VReplicationWorkflowState_Unknown && currentStatus != workflowStatus { + return binlogdatapb.VReplicationWorkflowState_Unknown, nil + } + workflowStatus = currentStatus + } + } + return workflowStatus, nil +} From c3fcd7375354de8595b6092674d0ba5b5f6c3ee5 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 8 Apr 2024 16:19:33 +0200 Subject: [PATCH 4/4] Address review comments. Check that online ddl is ready to complete before doing a vdiff on it Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/helper_test.go | 1 - .../vreplication/vdiff_helper_test.go | 6 +- .../vreplication/vdiff_online_ddl_test.go | 59 +++++++++++-------- go/vt/vtctl/workflow/server.go | 2 +- .../tabletmanager/vdiff/workflow_differ.go | 5 ++ 5 files changed, 42 insertions(+), 31 deletions(-) diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 1c8b3a118bc..29d8f518638 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -522,7 +522,6 @@ func validateDryRunResults(t *testing.T, output string, want []string) { w = strings.TrimSpace(w[1:]) result := strings.HasPrefix(g, w) match = result - // t.Logf("Partial match |%v|%v|%v\n", w, g, match) } else { match = g == w } diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index c35496500ff..53e19e56731 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -175,10 +175,8 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e require.Equal(t, want.state, info.State) require.Equal(t, strings.Join(want.shards, ","), info.Shards) require.Equal(t, want.hasMismatch, info.HasMismatch) - if want.minimumRowsCompared > 0 { - require.Greater(t, info.RowsCompared, want.minimumRowsCompared, - "not enough rows compared: want at least %d, got %d", want.minimumRowsCompared, info.RowsCompared) - } + require.GreaterOrEqual(t, info.RowsCompared, want.minimumRowsCompared, + "not enough rows compared: want at least %d, got %d", want.minimumRowsCompared, info.RowsCompared) } else { require.Equal(t, "completed", info.State, "vdiff results: %+v", info) require.False(t, info.HasMismatch, "vdiff results: %+v", info) diff --git a/go/test/endtoend/vreplication/vdiff_online_ddl_test.go b/go/test/endtoend/vreplication/vdiff_online_ddl_test.go index 6f97c17f957..bad1b840069 100644 --- a/go/test/endtoend/vreplication/vdiff_online_ddl_test.go +++ b/go/test/endtoend/vreplication/vdiff_online_ddl_test.go @@ -21,11 +21,13 @@ import ( // TestOnlineDDLVDiff is to run a vdiff on a table that is part of an OnlineDDL workflow. func TestOnlineDDLVDiff(t *testing.T) { setSidecarDBName("_vt") + originalRdonly := defaultRdonly + originalReplicas := defaultReplicas defaultRdonly = 0 defaultReplicas = 0 defer func() { - defaultRdonly = 1 - defaultReplicas = 1 + defaultRdonly = originalRdonly + defaultReplicas = originalReplicas }() vc = setupMinimalCluster(t) @@ -46,13 +48,23 @@ func TestOnlineDDLVDiff(t *testing.T) { t.Run("OnlineDDL VDiff", func(t *testing.T) { var done = make(chan bool) - go populate(ctx, done, insertTemplate, updateTemplate) + go populate(ctx, t, done, insertTemplate, updateTemplate) waitForAdditionalRows(t, keyspace, "temp", 100) output = execOnlineDDL(t, "vitess --postpone-completion", keyspace, alterQuery) uuid := strings.TrimSpace(output) waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, uuid), binlogdatapb.VReplicationWorkflowState_Running.String()) waitForAdditionalRows(t, keyspace, "temp", 200) + + require.NoError(t, waitForCondition("online ddl migration to be ready to complete", func() bool { + response := onlineDDLShow(t, keyspace, uuid) + if len(response.Migrations) > 0 && + response.Migrations[0].ReadyToComplete == true { + return true + } + return false + }, defaultTimeout)) + want := &expectedVDiff2Result{ state: "completed", minimumRowsCompared: 200, @@ -66,27 +78,29 @@ func TestOnlineDDLVDiff(t *testing.T) { }) } +func onlineDDLShow(t *testing.T, keyspace, uuid string) *vtctldata.GetSchemaMigrationsResponse { + var response vtctldata.GetSchemaMigrationsResponse + output, err := vc.VtctldClient.OnlineDDLShow(keyspace, uuid) + require.NoError(t, err, output) + err = protojson.Unmarshal([]byte(output), &response) + require.NoErrorf(t, err, "error unmarshalling OnlineDDL showresponse") + return &response +} + func execOnlineDDL(t *testing.T, strategy, keyspace, query string) string { output, err := vc.VtctldClient.ExecuteCommandWithOutput("ApplySchema", "--ddl-strategy", strategy, "--sql", query, keyspace) require.NoError(t, err, output) uuid := strings.TrimSpace(output) if strategy != "direct" { err = waitForCondition("online ddl to start", func() bool { - var response vtctldata.GetSchemaMigrationsResponse - output, err := vc.VtctldClient.OnlineDDLShow(keyspace, uuid) - require.NoError(t, err, output) - err = protojson.Unmarshal([]byte(output), &response) - if err != nil { - log.Errorf("error unmarshalling response: %v", err) - return false - } + response := onlineDDLShow(t, keyspace, uuid) if len(response.Migrations) > 0 && (response.Migrations[0].Status == vtctldata.SchemaMigration_RUNNING || response.Migrations[0].Status == vtctldata.SchemaMigration_COMPLETE) { return true } return false - }, 30*time.Second) + }, defaultTimeout) require.NoError(t, err) } @@ -98,18 +112,19 @@ func waitForAdditionalRows(t *testing.T, keyspace, table string, count int) { defer cancel() numRowsStart := getNumRows(t, vtgateConn, keyspace, table) - shortCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + numRows := 0 + shortCtx, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() for { switch { case shortCtx.Err() != nil: - t.Fatalf("Timed out waiting for additional rows") + require.FailNowf(t, "Timed out waiting for additional rows", "wanted %d rows, got %d rows", count, numRows) default: - numRows := getNumRows(t, vtgateConn, keyspace, table) + numRows = getNumRows(t, vtgateConn, keyspace, table) if numRows >= numRowsStart+count { return } - time.Sleep(10 * time.Millisecond) + time.Sleep(defaultTick) } } } @@ -122,7 +137,7 @@ func getNumRows(t *testing.T, vtgateConn *mysql.Conn, keyspace, table string) in return numRows } -func populate(ctx context.Context, done chan bool, insertTemplate, updateTemplate string) { +func populate(ctx context.Context, t *testing.T, done chan bool, insertTemplate, updateTemplate string) { defer close(done) vtgateConn, closeConn := getVTGateConn() defer closeConn() @@ -135,16 +150,10 @@ func populate(ctx context.Context, done chan bool, insertTemplate, updateTemplat default: query := fmt.Sprintf(insertTemplate, id, id, id) _, err := vtgateConn.ExecuteFetch(query, 1, false) - if err != nil { - log.Errorf("error in insert: %v", err) - panic(err) - } + require.NoErrorf(t, err, "error in insert") query = fmt.Sprintf(updateTemplate, id, id) _, err = vtgateConn.ExecuteFetch(query, 1, false) - if err != nil { - log.Errorf("error in update: %v", err) - panic(err) - } + require.NoErrorf(t, err, "error in update") id++ time.Sleep(10 * time.Millisecond) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 0142bf45376..4197269feb6 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3968,7 +3968,7 @@ func (s *Server) getWorkflowStatus(ctx context.Context, keyspace string, workflo if err != nil { return workflowStatus, err } - for _, shardStream := range wf.ShardStreams { + for _, shardStream := range wf.GetShardStreams() { for _, stream := range shardStream.GetStreams() { state, ok := binlogdatapb.VReplicationWorkflowState_value[stream.State] if !ok { diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index e4f3d2db727..56b8d663a3c 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -24,6 +24,8 @@ import ( "strings" "time" + "vitess.io/vitess/go/vt/schema" + "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/mysql/collations" @@ -342,6 +344,9 @@ func (wd *workflowDiffer) buildPlan(dbClient binlogplayer.DBClient, filter *binl if len(specifiedTables) != 0 && !stringListContains(specifiedTables, table.Name) { continue } + if schema.IsInternalOperationTableName(table.Name) && !schema.IsOnlineDDLTableName(table.Name) { + continue + } rule, err := vreplication.MatchTable(table.Name, filter) if err != nil { return err