Skip to content

Commit

Permalink
Check workflow states and do a final vdiff in new tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Oct 14, 2024
1 parent 394ce81 commit 9dbb24e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -169,6 +170,7 @@ func getMoveTablesShowResponse(mt *iMoveTables) *vtctldatapb.GetWorkflowsRespons
// Validates some of the flags created from the previous test.
func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetKeyspace, workflowName string, targetTabs map[string]*cluster.VttabletProcess) {
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
wf := (*mt).(iWorkflow)
(*mt).Start() // Need to start because we set auto-start to false.
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String())
confirmNoRoutingRules(t)
Expand All @@ -193,65 +195,79 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK
validateReadsRouteToTarget(t, "replica")
validateRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)

(*mt).ReverseReads()
validateReadsRouteToSource(t, "replica")
validateRoutingRule(t, "customer", "replica", targetKs, sourceKs)
validateRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

(*mt).SwitchReadsAndWrites()
validateReadsRouteToTarget(t, "replica")
validateRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateWritesRouteToTarget(t)
validateRoutingRule(t, "customer", "", sourceKs, targetKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

(*mt).ReverseReadsAndWrites()
validateReadsRouteToSource(t, "replica")
validateRoutingRule(t, "customer", "replica", targetKs, sourceKs)
validateWritesRouteToSource(t)
validateRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)

(*mt).SwitchReadsAndWrites()
validateReadsRouteToTarget(t, "replica")
validateRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateWritesRouteToTarget(t)
validateRoutingRule(t, "customer", "", sourceKs, targetKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

(*mt).ReverseReads()
validateReadsRouteToSource(t, "replica")
validateRoutingRule(t, "customer", "replica", targetKs, sourceKs)
validateWritesRouteToTarget(t)
validateRoutingRule(t, "customer", "", sourceKs, targetKs)
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched)

(*mt).ReverseWrites()
validateReadsRouteToSource(t, "replica")
validateRoutingRule(t, "customer", "replica", targetKs, sourceKs)
validateWritesRouteToSource(t)
validateRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)

(*mt).SwitchReadsAndWrites()
validateReadsRouteToTarget(t, "replica")
validateRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateWritesRouteToTarget(t)
validateRoutingRule(t, "customer", "", sourceKs, targetKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

(*mt).ReverseWrites()
validateReadsRouteToTarget(t, "replica")
validateRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateWritesRouteToSource(t)
validateRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched)

(*mt).ReverseReads()
validateReadsRouteToSource(t, "replica")
validateRoutingRule(t, "customer", "replica", targetKs, sourceKs)
validateWritesRouteToSource(t)
validateRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

// Confirm that everything is still in sync after our switch fest.
vdiff(t, targetKeyspace, workflowName, "zone1", false, true, nil)

(*mt).SwitchReadsAndWrites()
validateReadsRouteToTarget(t, "replica")
validateRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateWritesRouteToTarget(t)
validateRoutingRule(t, "customer", "", sourceKs, targetKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

(*mt).Complete()
confirmRoutingRulesExist(t)
Expand Down Expand Up @@ -443,6 +459,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards
}, workflowFlavorVtctld)

ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName)
wf := rs.(iWorkflow)
rs.Create()
validateReshardResponse(rs)
validateOverrides(t, targetTabs, overrides)
Expand Down Expand Up @@ -491,56 +508,72 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards
vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil)
shardReadsRouteToTarget()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

rs.ReverseReadsAndWrites()
waitForLowLag(t, keyspace, workflowName)
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)
shardReadsRouteToSource()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)

rs.SwitchReads()
shardReadsRouteToTarget()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)

rs.ReverseReads()
shardReadsRouteToSource()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

rs.SwitchReadsAndWrites()
shardReadsRouteToTarget()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

rs.ReverseReadsAndWrites()
shardReadsRouteToSource()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)

rs.SwitchReadsAndWrites()
shardReadsRouteToTarget()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

rs.ReverseReads()
shardReadsRouteToSource()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched)

rs.ReverseWrites()
shardReadsRouteToSource()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)

rs.SwitchReadsAndWrites()
shardReadsRouteToTarget()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

rs.ReverseWrites()
shardReadsRouteToTarget()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched)

rs.ReverseReads()
shardReadsRouteToSource()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

// Confirm that everything is still in sync after our switch fest.
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)

rs.SwitchReadsAndWrites()
shardReadsRouteToTarget()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

rs.Complete()
}
Expand Down Expand Up @@ -910,3 +943,8 @@ func testOneRoutingRulesCommand(t *testing.T, typ string, rules string, validate
})
}
}

func confirmStates(t *testing.T, workflow *iWorkflow, startState, endState string) {
require.Contains(t, (*workflow).GetLastOutput(), fmt.Sprintf("Start State: %s", startState))
require.Contains(t, (*workflow).GetLastOutput(), fmt.Sprintf("Current State: %s", endState))
}
19 changes: 19 additions & 0 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type iWorkflow interface {
Flavor() string
GetLastOutput() string
Start()
Status()
Stop()
}

Expand Down Expand Up @@ -149,6 +150,11 @@ func (vmt *VtctlMoveTables) Show() {
panic("implement me")
}

func (vmt *VtctlMoveTables) Status() {
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
vmt.exec("Status")
}

func (vmt *VtctlMoveTables) exec(action string) {
options := &workflowExecOptions{
deferSecondaryKeys: false,
Expand Down Expand Up @@ -263,6 +269,10 @@ func (v VtctldMoveTables) Show() {
v.exec(args...)
}

func (v VtctldMoveTables) Status() {
v.exec("Status")
}

func (v VtctldMoveTables) SwitchReads() {
args := []string{"SwitchTraffic", "--tablet-types=rdonly,replica"}
args = append(args, v.switchFlags...)
Expand Down Expand Up @@ -379,6 +389,11 @@ func (vrs *VtctlReshard) MirrorTraffic() {
panic("implement me")
}

func (vrs *VtctlReshard) Status() {
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard
vrs.exec("Status")
}

func (vrs *VtctlReshard) SwitchReadsAndWrites() {
vrs.exec(workflowActionSwitchTraffic)
}
Expand Down Expand Up @@ -486,6 +501,10 @@ func (v VtctldReshard) Show() {
v.exec("Show")
}

func (v *VtctldReshard) Status() {
v.exec("Status")
}

func (v VtctldReshard) SwitchReads() {
args := []string{"SwitchTraffic"}
args = append(args, v.switchFlags...)
Expand Down

0 comments on commit 9dbb24e

Please sign in to comment.