diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index 31886864f11..dba2d8fbd8b 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -52,6 +52,7 @@ func TestFKWorkflow(t *testing.T) { defaultCell = vc.Cells[defaultCellName] sourceKeyspace := "fksource" shardName := "0" + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables defer vc.TearDown(t) diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index e81e507f9f4..3171c35e35b 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -3,11 +3,11 @@ package vreplication import ( "testing" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) func TestMoveTablesBuffering(t *testing.T) { diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 9719474afef..40bb585495e 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -272,7 +272,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { // We cannot Complete a partial move tables at the moment because // it will find that all traffic has (obviously) not been switched. - err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", false) + err = tstWorkflowExecVtctl(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", false) require.Error(t, err) // Confirm global routing rules: -80 should still be be routed to customer @@ -313,7 +313,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { // We switched traffic, so it's the reverse workflow we want to cancel. reverseWf := wf + "_reverse" reverseKs := sourceKs // customer - err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", false) + err = tstWorkflowExecVtctl(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", false) require.NoError(t, err) output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show") diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index c32590e436b..3135b2c6a33 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -94,6 +94,9 @@ func tstWorkflowAction(t *testing.T, action, tabletTypes, cells string) error { return tstWorkflowExec(t, cells, workflowName, sourceKs, targetKs, tablesToMove, action, tabletTypes, "", "", false) } +// tstWorkflowExec executes a MoveTables or Reshard workflow command using +// vtctldclient. If you need to use the legacy vtctlclient, use +// tstWorkflowExecVtctl instead. func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, sourceShards, targetShards string, atomicCopy bool) error { var args []string if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { @@ -136,7 +139,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, } args = append(args, "--timeout=90s") } - if action == workflowActionCreate && atomicCopy { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && action == workflowActionCreate && atomicCopy { args = append(args, "--atomic-copy") } if (action == workflowActionCreate || action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic) && cells != "" { @@ -157,6 +160,66 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, return nil } +// tstWorkflowExecVtctl executes a MoveTables or Reshard workflow command using +// vtctlclient. It should operate exactly the same way as tstWorkflowExec, but +// using the legacy client. +func tstWorkflowExecVtctl(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, sourceShards, targetShards string, atomicCopy bool) error { + var args []string + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { + args = append(args, "MoveTables") + } else { + args = append(args, "Reshard") + } + + args = append(args, "--") + + if BypassLagCheck { + args = append(args, "--max_replication_lag_allowed=2542087h") + } + if atomicCopy { + args = append(args, "--atomic-copy") + } + switch action { + case workflowActionCreate: + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { + args = append(args, "--source", sourceKs) + if tables != "" { + args = append(args, "--tables", tables) + } else { + args = append(args, "--all") + } + if sourceShards != "" { + args = append(args, "--source_shards", sourceShards) + } + } else { + args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards) + } + // Test new experimental --defer-secondary-keys flag + switch currentWorkflowType { + case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard: + if !atomicCopy { + args = append(args, "--defer-secondary-keys") + } + args = append(args, "--initialize-target-sequences") // Only used for MoveTables + } + } + if cells != "" { + args = append(args, "--cells", cells) + } + if tabletTypes != "" { + args = append(args, "--tablet_types", tabletTypes) + } + args = append(args, "--timeout", time.Minute.String()) + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) + args = append(args, action, ksWorkflow) + output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...) + lastOutput = output + if err != nil { + return fmt.Errorf("%s: %s", err, output) + } + return nil +} + func tstWorkflowSwitchReads(t *testing.T, tabletTypes, cells string) { if tabletTypes == "" { tabletTypes = "replica,rdonly" diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 6bd0bbb19d8..fee98eaf3d7 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -92,19 +92,19 @@ func newVtctlMoveTables(mt *moveTables) *VtctlMoveTables { func (vmt *VtctlMoveTables) Create() { log.Infof("vmt is %+v", vmt.vc, vmt.tables) - err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, vmt.tables, workflowActionCreate, "", vmt.sourceShards, "", vmt.atomicCopy) require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) SwitchReadsAndWrites() { - err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, vmt.tables, workflowActionSwitchTraffic, "", "", "", vmt.atomicCopy) require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) ReverseReadsAndWrites() { - err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, vmt.tables, workflowActionReverseTraffic, "", "", "", vmt.atomicCopy) require.NoError(vmt.vc.t, err) } @@ -125,7 +125,7 @@ func (vmt *VtctlMoveTables) SwitchWrites() { } func (vmt *VtctlMoveTables) Cancel() { - err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, vmt.tables, workflowActionCancel, "", "", "", vmt.atomicCopy) require.NoError(vmt.vc.t, err) }