diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index 86b87d52f21..a379239c246 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -31,6 +31,7 @@ import ( "vitess.io/vitess/go/json2" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -169,6 +170,7 @@ func getMoveTablesShowResponse(mt *iMoveTables) *vtctldatapb.GetWorkflowsRespons // Validates some of the flags created from the previous test. func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetKeyspace, workflowName string, targetTabs map[string]*cluster.VttabletProcess) { ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName) + wf := (*mt).(iWorkflow) (*mt).Start() // Need to start because we set auto-start to false. waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String()) confirmNoRoutingRules(t) @@ -193,65 +195,79 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) (*mt).ReverseReads() validateReadsRouteToSource(t, "replica") validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) validateRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).ReverseReadsAndWrites() validateReadsRouteToSource(t, "replica") validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToSource(t) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) validateRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).ReverseReads() validateReadsRouteToSource(t, "replica") validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToTarget(t) validateRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched) (*mt).ReverseWrites() validateReadsRouteToSource(t, "replica") validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToSource(t) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) validateRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).ReverseWrites() validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToSource(t) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched) (*mt).ReverseReads() validateReadsRouteToSource(t, "replica") validateRoutingRule(t, "customer", "replica", targetKs, sourceKs) validateWritesRouteToSource(t) validateRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + + // Confirm that everything is still in sync after our switch fest. + vdiff(t, targetKeyspace, workflowName, "zone1", false, true, nil) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") validateRoutingRule(t, "customer", "replica", sourceKs, targetKs) validateWritesRouteToTarget(t) validateRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) (*mt).Complete() confirmRoutingRulesExist(t) @@ -443,6 +459,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards }, workflowFlavorVtctld) ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName) + wf := rs.(iWorkflow) rs.Create() validateReshardResponse(rs) validateOverrides(t, targetTabs, overrides) @@ -491,56 +508,72 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil) shardReadsRouteToTarget() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.ReverseReadsAndWrites() waitForLowLag(t, keyspace, workflowName) vdiff(t, keyspace, workflowName, "zone1", false, true, nil) shardReadsRouteToSource() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) rs.SwitchReads() shardReadsRouteToTarget() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) rs.ReverseReads() shardReadsRouteToSource() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) rs.SwitchReadsAndWrites() shardReadsRouteToTarget() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.ReverseReadsAndWrites() shardReadsRouteToSource() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) rs.SwitchReadsAndWrites() shardReadsRouteToTarget() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.ReverseReads() shardReadsRouteToSource() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched) rs.ReverseWrites() shardReadsRouteToSource() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) rs.SwitchReadsAndWrites() shardReadsRouteToTarget() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.ReverseWrites() shardReadsRouteToTarget() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched) rs.ReverseReads() shardReadsRouteToSource() shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + + // Confirm that everything is still in sync after our switch fest. + vdiff(t, keyspace, workflowName, "zone1", false, true, nil) rs.SwitchReadsAndWrites() shardReadsRouteToTarget() shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.Complete() } @@ -910,3 +943,8 @@ func testOneRoutingRulesCommand(t *testing.T, typ string, rules string, validate }) } } + +func confirmStates(t *testing.T, workflow *iWorkflow, startState, endState string) { + require.Contains(t, (*workflow).GetLastOutput(), fmt.Sprintf("Start State: %s", startState)) + require.Contains(t, (*workflow).GetLastOutput(), fmt.Sprintf("Current State: %s", endState)) +} diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index e46bc6acda0..c91eeddcfd0 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -42,6 +42,7 @@ type iWorkflow interface { Flavor() string GetLastOutput() string Start() + Status() Stop() } @@ -149,6 +150,11 @@ func (vmt *VtctlMoveTables) Show() { panic("implement me") } +func (vmt *VtctlMoveTables) Status() { + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables + vmt.exec("Status") +} + func (vmt *VtctlMoveTables) exec(action string) { options := &workflowExecOptions{ deferSecondaryKeys: false, @@ -263,6 +269,10 @@ func (v VtctldMoveTables) Show() { v.exec(args...) } +func (v VtctldMoveTables) Status() { + v.exec("Status") +} + func (v VtctldMoveTables) SwitchReads() { args := []string{"SwitchTraffic", "--tablet-types=rdonly,replica"} args = append(args, v.switchFlags...) @@ -379,6 +389,11 @@ func (vrs *VtctlReshard) MirrorTraffic() { panic("implement me") } +func (vrs *VtctlReshard) Status() { + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard + vrs.exec("Status") +} + func (vrs *VtctlReshard) SwitchReadsAndWrites() { vrs.exec(workflowActionSwitchTraffic) } @@ -486,6 +501,10 @@ func (v VtctldReshard) Show() { v.exec("Show") } +func (v *VtctldReshard) Status() { + v.exec("Status") +} + func (v VtctldReshard) SwitchReads() { args := []string{"SwitchTraffic"} args = append(args, v.switchFlags...)