Skip to content

Commit

Permalink
Cherry-pick 4a1870a with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed Apr 4, 2024
1 parent 7495fd0 commit 1735391
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 2 deletions.
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestFKWorkflow(t *testing.T) {
defaultCell = vc.Cells[defaultCellName]
sourceKeyspace := "fksource"
shardName := "0"
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables

defer vc.TearDown(t)

Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package vreplication
import (
"testing"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func TestMoveTablesBuffering(t *testing.T) {
Expand Down
29 changes: 29 additions & 0 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,20 @@ func TestPartialMoveTablesBasic(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "target: customer.-80.replica", "Query was routed to the target before partial SwitchTraffic")

<<<<<<< HEAD
// We cannot Complete a partial move tables at the moment because
// it will find that all traffic has (obviously) not been switched.
err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", false)
=======
workflowExec := tstWorkflowExec
if flavor == workflowFlavorVtctl {
workflowExec = tstWorkflowExecVtctl
}

// We cannot Complete a partial move tables at the moment because
// it will find that all traffic has (obviously) not been switched.
err = workflowExec(t, "", workflowName, "", targetKs, "", workflowActionComplete, "", "", "", workflowExecOptsPartial80Dash)
>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636))
require.Error(t, err)

// Confirm global routing rules: -80 should still be be routed to customer
Expand Down Expand Up @@ -312,8 +323,13 @@ func TestPartialMoveTablesBasic(t *testing.T) {
for _, wf := range []string{"partialDash80", "partial80Dash"} {
// We switched traffic, so it's the reverse workflow we want to cancel.
reverseWf := wf + "_reverse"
<<<<<<< HEAD
reverseKs := sourceKs // customer
err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", false)
=======
reverseKs := sourceKeyspace
err = workflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", opts)
>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636))
require.NoError(t, err)

output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show")
Expand All @@ -337,4 +353,17 @@ func TestPartialMoveTablesBasic(t *testing.T) {
// Confirm that the shard routing rules are now gone.
require.Equal(t, emptyShardRoutingRules, getShardRoutingRules(t))

<<<<<<< HEAD
=======
// TestPartialMoveTablesBasic tests partial move tables by moving each
// customer shard -- -80,80- -- once a a time to customer2.
// We test with both the vtctlclient and vtctldclient flavors.
func TestPartialMoveTablesBasic(t *testing.T) {
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
for _, flavor := range workflowFlavors {
t.Run(workflowFlavorNames[flavor], func(t *testing.T) {
testPartialMoveTablesBasic(t, flavor)
})
}
>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636))
}
79 changes: 79 additions & 0 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,16 @@ func tstWorkflowAction(t *testing.T, action, tabletTypes, cells string) error {
return tstWorkflowExec(t, cells, workflowName, sourceKs, targetKs, tablesToMove, action, tabletTypes, "", "", false)
}

<<<<<<< HEAD
func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, sourceShards, targetShards string, atomicCopy bool) error {
=======
// tstWorkflowExec executes a MoveTables or Reshard workflow command using
// vtctldclient. If you need to use the legacy vtctlclient, use
// tstWorkflowExecVtctl instead.
func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes,
sourceShards, targetShards string, options *workflowExecOptions) error {

>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636))
var args []string
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "MoveTables")
Expand Down Expand Up @@ -136,7 +145,11 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
}
args = append(args, "--timeout=90s")
}
<<<<<<< HEAD
if action == workflowActionCreate && atomicCopy {
=======
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && action == workflowActionCreate && options.atomicCopy {
>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636))
args = append(args, "--atomic-copy")
}
if (action == workflowActionCreate || action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic) && cells != "" {
Expand All @@ -157,6 +170,72 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
return nil
}

// tstWorkflowExecVtctl executes a MoveTables or Reshard workflow command using
// vtctlclient. It should operate exactly the same way as tstWorkflowExec, but
// using the legacy client.
func tstWorkflowExecVtctl(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes,
sourceShards, targetShards string, options *workflowExecOptions) error {

var args []string
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "MoveTables")
} else {
args = append(args, "Reshard")
}

args = append(args, "--")

if BypassLagCheck {
args = append(args, "--max_replication_lag_allowed=2542087h")
}
if options.atomicCopy {
args = append(args, "--atomic-copy")
}
switch action {
case workflowActionCreate:
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "--source", sourceKs)
if tables != "" {
args = append(args, "--tables", tables)
} else {
args = append(args, "--all")
}
if sourceShards != "" {
args = append(args, "--source_shards", sourceShards)
}
} else {
args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards)
}
// Test new experimental --defer-secondary-keys flag
switch currentWorkflowType {
case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard:
if !options.atomicCopy && options.deferSecondaryKeys {
args = append(args, "--defer-secondary-keys")
}
args = append(args, "--initialize-target-sequences") // Only used for MoveTables
}
default:
if options.shardSubset != "" {
args = append(args, "--shards", options.shardSubset)
}
}
if cells != "" {
args = append(args, "--cells", cells)
}
if tabletTypes != "" {
args = append(args, "--tablet_types", tabletTypes)
}
args = append(args, "--timeout", time.Minute.String())
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
args = append(args, action, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
return fmt.Errorf("%s: %s", err, output)
}
return nil
}

func tstWorkflowSwitchReads(t *testing.T, tabletTypes, cells string) {
if tabletTypes == "" {
tabletTypes = "replica,rdonly"
Expand Down
148 changes: 148 additions & 0 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,24 @@ func (vmt *VtctlMoveTables) Create() {
}

func (vmt *VtctlMoveTables) SwitchReadsAndWrites() {
<<<<<<< HEAD
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionSwitchTraffic, "", "", "", vmt.atomicCopy)
=======
err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions)
>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636))
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) ReverseReadsAndWrites() {
<<<<<<< HEAD
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionReverseTraffic, "", "", "", vmt.atomicCopy)
=======
err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionReverseTraffic, "", "", "", defaultWorkflowExecOptions)
>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636))
require.NoError(vmt.vc.t, err)
}

Expand All @@ -114,6 +124,18 @@ func (vmt *VtctlMoveTables) Show() {
panic("implement me")
}

<<<<<<< HEAD
=======
func (vmt *VtctlMoveTables) exec(action string) {
options := &workflowExecOptions{
deferSecondaryKeys: false,
atomicCopy: vmt.atomicCopy,
}
err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, action, vmt.tabletTypes, vmt.sourceShards, "", options)
require.NoError(vmt.vc.t, err)
}
>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636))
func (vmt *VtctlMoveTables) SwitchReads() {
//TODO implement me
panic("implement me")
Expand Down Expand Up @@ -204,3 +226,129 @@ func (v VtctldMoveTables) Complete() {
//TODO implement me
panic("implement me")
}
<<<<<<< HEAD
=======

func (vrs *VtctlReshard) exec(action string) {
options := &workflowExecOptions{}
err := tstWorkflowExecVtctl(vrs.vc.t, "", vrs.workflowName, "", vrs.targetKeyspace,
"", action, vrs.tabletTypes, vrs.sourceShards, vrs.targetShards, options)
require.NoError(vrs.vc.t, err)
}

func (vrs *VtctlReshard) SwitchReads() {
//TODO implement me
panic("implement me")
}

func (vrs *VtctlReshard) SwitchWrites() {
//TODO implement me
panic("implement me")
}

func (vrs *VtctlReshard) Cancel() {
vrs.exec(workflowActionCancel)
}

func (vrs *VtctlReshard) Complete() {
vrs.exec(workflowActionComplete)
}

func (vrs *VtctlReshard) GetLastOutput() string {
return vrs.lastOutput
}

func (vrs *VtctlReshard) Start() {
panic("implement me")
}

func (vrs *VtctlReshard) Stop() {
panic("implement me")
}

var _ iReshard = (*VtctldReshard)(nil)

type VtctldReshard struct {
*reshardWorkflow
}

func newVtctldReshard(rs *reshardWorkflow) *VtctldReshard {
return &VtctldReshard{rs}
}

func (v VtctldReshard) Flavor() string {
return "vtctld"
}

func (v VtctldReshard) exec(args ...string) {
args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace}
args2 = append(args2, args...)
var err error
if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil {
v.vc.t.Fatalf("failed to create Reshard workflow: %v: %s", err, v.lastOutput)
}
}

func (v VtctldReshard) Create() {
args := []string{"Create"}
if v.sourceShards != "" {
args = append(args, "--source-shards="+v.sourceShards)
}
if v.targetShards != "" {
args = append(args, "--target-shards="+v.targetShards)
}
if v.skipSchemaCopy {
args = append(args, "--skip-schema-copy="+strconv.FormatBool(v.skipSchemaCopy))
}
args = append(args, v.createFlags...)
v.exec(args...)
}

func (v VtctldReshard) SwitchReadsAndWrites() {
args := []string{"SwitchTraffic"}
args = append(args, v.switchFlags...)
v.exec(args...)
}

func (v VtctldReshard) ReverseReadsAndWrites() {
v.exec("ReverseTraffic")
}

func (v VtctldReshard) Show() {
v.exec("Show")
}

func (v VtctldReshard) SwitchReads() {
//TODO implement me
panic("implement me")
}

func (v VtctldReshard) SwitchWrites() {
//TODO implement me
panic("implement me")
}

func (v VtctldReshard) Cancel() {
args := []string{"Cancel"}
args = append(args, v.cancelFlags...)
v.exec(args...)
}

func (v VtctldReshard) Complete() {
args := []string{"Complete"}
args = append(args, v.completeFlags...)
v.exec(args...)
}

func (v VtctldReshard) GetLastOutput() string {
return v.lastOutput
}

func (vrs *VtctldReshard) Start() {
vrs.exec("Start")
}

func (vrs *VtctldReshard) Stop() {
vrs.exec("Stop")
}
>>>>>>> 4a1870ad59 (VReplication: Get workflowFlavorVtctl endtoend testing working properly again (#15636))

0 comments on commit 1735391

Please sign in to comment.