diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index 31886864f11..dba2d8fbd8b 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -52,6 +52,7 @@ func TestFKWorkflow(t *testing.T) { defaultCell = vc.Cells[defaultCellName] sourceKeyspace := "fksource" shardName := "0" + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables defer vc.TearDown(t) diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index e81e507f9f4..3171c35e35b 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -3,11 +3,11 @@ package vreplication import ( "testing" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) func TestMoveTablesBuffering(t *testing.T) { diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 9719474afef..a253a43c3de 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -270,9 +270,20 @@ func TestPartialMoveTablesBasic(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "target: customer.-80.replica", "Query was routed to the target before partial SwitchTraffic") +<<<<<<< HEAD // We cannot Complete a partial move tables at the moment because // it will find that all traffic has (obviously) not been switched. err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", false) +======= + workflowExec := tstWorkflowExec + if flavor == workflowFlavorVtctl { + workflowExec = tstWorkflowExecVtctl + } + + // We cannot Complete a partial move tables at the moment because + // it will find that all traffic has (obviously) not been switched. + err = workflowExec(t, "", workflowName, "", targetKs, "", workflowActionComplete, "", "", "", workflowExecOptsPartial80Dash) +>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636)) require.Error(t, err) // Confirm global routing rules: -80 should still be be routed to customer @@ -312,8 +323,13 @@ func TestPartialMoveTablesBasic(t *testing.T) { for _, wf := range []string{"partialDash80", "partial80Dash"} { // We switched traffic, so it's the reverse workflow we want to cancel. reverseWf := wf + "_reverse" +<<<<<<< HEAD reverseKs := sourceKs // customer err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", false) +======= + reverseKs := sourceKeyspace + err = workflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", opts) +>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636)) require.NoError(t, err) output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show") @@ -337,4 +353,17 @@ func TestPartialMoveTablesBasic(t *testing.T) { // Confirm that the shard routing rules are now gone. require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t)) +<<<<<<< HEAD +======= +// TestPartialMoveTablesBasic tests partial move tables by moving each +// customer shard -- -80,80- -- once a a time to customer2. +// We test with both the vtctlclient and vtctldclient flavors. +func TestPartialMoveTablesBasic(t *testing.T) { + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables + for _, flavor := range workflowFlavors { + t.Run(workflowFlavorNames[flavor], func(t *testing.T) { + testPartialMoveTablesBasic(t, flavor) + }) + } +>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636)) } diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index c32590e436b..d4aba89ecc4 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -94,7 +94,16 @@ func tstWorkflowAction(t *testing.T, action, tabletTypes, cells string) error { return tstWorkflowExec(t, cells, workflowName, sourceKs, targetKs, tablesToMove, action, tabletTypes, "", "", false) } +<<<<<<< HEAD func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, sourceShards, targetShards string, atomicCopy bool) error { +======= +// tstWorkflowExec executes a MoveTables or Reshard workflow command using +// vtctldclient. If you need to use the legacy vtctlclient, use +// tstWorkflowExecVtctl instead. +func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, + sourceShards, targetShards string, options *workflowExecOptions) error { + +>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636)) var args []string if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { args = append(args, "MoveTables") @@ -136,7 +145,11 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, } args = append(args, "--timeout=90s") } +<<<<<<< HEAD if action == workflowActionCreate && atomicCopy { +======= + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && action == workflowActionCreate && options.atomicCopy { +>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636)) args = append(args, "--atomic-copy") } if (action == workflowActionCreate || action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic) && cells != "" { @@ -157,6 +170,72 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, return nil } +// tstWorkflowExecVtctl executes a MoveTables or Reshard workflow command using +// vtctlclient. It should operate exactly the same way as tstWorkflowExec, but +// using the legacy client. +func tstWorkflowExecVtctl(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, + sourceShards, targetShards string, options *workflowExecOptions) error { + + var args []string + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { + args = append(args, "MoveTables") + } else { + args = append(args, "Reshard") + } + + args = append(args, "--") + + if BypassLagCheck { + args = append(args, "--max_replication_lag_allowed=2542087h") + } + if options.atomicCopy { + args = append(args, "--atomic-copy") + } + switch action { + case workflowActionCreate: + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { + args = append(args, "--source", sourceKs) + if tables != "" { + args = append(args, "--tables", tables) + } else { + args = append(args, "--all") + } + if sourceShards != "" { + args = append(args, "--source_shards", sourceShards) + } + } else { + args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards) + } + // Test new experimental --defer-secondary-keys flag + switch currentWorkflowType { + case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard: + if !options.atomicCopy && options.deferSecondaryKeys { + args = append(args, "--defer-secondary-keys") + } + args = append(args, "--initialize-target-sequences") // Only used for MoveTables + } + default: + if options.shardSubset != "" { + args = append(args, "--shards", options.shardSubset) + } + } + if cells != "" { + args = append(args, "--cells", cells) + } + if tabletTypes != "" { + args = append(args, "--tablet_types", tabletTypes) + } + args = append(args, "--timeout", time.Minute.String()) + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) + args = append(args, action, ksWorkflow) + output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...) + lastOutput = output + if err != nil { + return fmt.Errorf("%s: %s", err, output) + } + return nil +} + func tstWorkflowSwitchReads(t *testing.T, tabletTypes, cells string) { if tabletTypes == "" { tabletTypes = "replica,rdonly" diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 6bd0bbb19d8..dd4c1e038bb 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -98,14 +98,24 @@ func (vmt *VtctlMoveTables) Create() { } func (vmt *VtctlMoveTables) SwitchReadsAndWrites() { +<<<<<<< HEAD err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, vmt.tables, workflowActionSwitchTraffic, "", "", "", vmt.atomicCopy) +======= + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions) +>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636)) require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) ReverseReadsAndWrites() { +<<<<<<< HEAD err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, vmt.tables, workflowActionReverseTraffic, "", "", "", vmt.atomicCopy) +======= + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionReverseTraffic, "", "", "", defaultWorkflowExecOptions) +>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636)) require.NoError(vmt.vc.t, err) } @@ -114,6 +124,18 @@ func (vmt *VtctlMoveTables) Show() { panic("implement me") } +<<<<<<< HEAD +======= +func (vmt *VtctlMoveTables) exec(action string) { + options := &workflowExecOptions{ + deferSecondaryKeys: false, + atomicCopy: vmt.atomicCopy, + } + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, action, vmt.tabletTypes, vmt.sourceShards, "", options) + require.NoError(vmt.vc.t, err) +} +>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636)) func (vmt *VtctlMoveTables) SwitchReads() { //TODO implement me panic("implement me") @@ -204,3 +226,129 @@ func (v VtctldMoveTables) Complete() { //TODO implement me panic("implement me") } +<<<<<<< HEAD +======= + +func (vrs *VtctlReshard) exec(action string) { + options := &workflowExecOptions{} + err := tstWorkflowExecVtctl(vrs.vc.t, "", vrs.workflowName, "", vrs.targetKeyspace, + "", action, vrs.tabletTypes, vrs.sourceShards, vrs.targetShards, options) + require.NoError(vrs.vc.t, err) +} + +func (vrs *VtctlReshard) SwitchReads() { + //TODO implement me + panic("implement me") +} + +func (vrs *VtctlReshard) SwitchWrites() { + //TODO implement me + panic("implement me") +} + +func (vrs *VtctlReshard) Cancel() { + vrs.exec(workflowActionCancel) +} + +func (vrs *VtctlReshard) Complete() { + vrs.exec(workflowActionComplete) +} + +func (vrs *VtctlReshard) GetLastOutput() string { + return vrs.lastOutput +} + +func (vrs *VtctlReshard) Start() { + panic("implement me") +} + +func (vrs *VtctlReshard) Stop() { + panic("implement me") +} + +var _ iReshard = (*VtctldReshard)(nil) + +type VtctldReshard struct { + *reshardWorkflow +} + +func newVtctldReshard(rs *reshardWorkflow) *VtctldReshard { + return &VtctldReshard{rs} +} + +func (v VtctldReshard) Flavor() string { + return "vtctld" +} + +func (v VtctldReshard) exec(args ...string) { + args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace} + args2 = append(args2, args...) + var err error + if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil { + v.vc.t.Fatalf("failed to create Reshard workflow: %v: %s", err, v.lastOutput) + } +} + +func (v VtctldReshard) Create() { + args := []string{"Create"} + if v.sourceShards != "" { + args = append(args, "--source-shards="+v.sourceShards) + } + if v.targetShards != "" { + args = append(args, "--target-shards="+v.targetShards) + } + if v.skipSchemaCopy { + args = append(args, "--skip-schema-copy="+strconv.FormatBool(v.skipSchemaCopy)) + } + args = append(args, v.createFlags...) + v.exec(args...) +} + +func (v VtctldReshard) SwitchReadsAndWrites() { + args := []string{"SwitchTraffic"} + args = append(args, v.switchFlags...) + v.exec(args...) +} + +func (v VtctldReshard) ReverseReadsAndWrites() { + v.exec("ReverseTraffic") +} + +func (v VtctldReshard) Show() { + v.exec("Show") +} + +func (v VtctldReshard) SwitchReads() { + //TODO implement me + panic("implement me") +} + +func (v VtctldReshard) SwitchWrites() { + //TODO implement me + panic("implement me") +} + +func (v VtctldReshard) Cancel() { + args := []string{"Cancel"} + args = append(args, v.cancelFlags...) + v.exec(args...) +} + +func (v VtctldReshard) Complete() { + args := []string{"Complete"} + args = append(args, v.completeFlags...) + v.exec(args...) +} + +func (v VtctldReshard) GetLastOutput() string { + return v.lastOutput +} + +func (vrs *VtctldReshard) Start() { + vrs.exec("Start") +} + +func (vrs *VtctldReshard) Stop() { + vrs.exec("Stop") +} +>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636))