Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow Status: change logic to determine whether MoveTables writes are switched #16731

37 changes: 22 additions & 15 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,30 +198,38 @@ func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspac
return tablet
}

// addTableRoutingRules adds routing rules from the test env's source keyspace to
// its target keyspace for the given tablet types and tables.
func (env *testEnv) addTableRoutingRules(t *testing.T, ctx context.Context, tabletTypes []topodatapb.TabletType, tables []string) {
ks := env.targetKeyspace.KeyspaceName
rules := make(map[string][]string, len(tables)*(len(tabletTypes)*3))
func (env *testEnv) saveRoutingRules(t *testing.T, rules map[string][]string) {
err := topotools.SaveRoutingRules(context.Background(), env.ts, rules)
require.NoError(t, err)
err = env.ts.RebuildSrvVSchema(context.Background(), nil)
require.NoError(t, err)
}

func (env *testEnv) updateTableRoutingRules(t *testing.T, ctx context.Context,
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
tabletTypes []topodatapb.TabletType, tables []string, sourceKeyspace, targetKeyspace, toKeyspace string) {

if len(tabletTypes) == 0 {
tabletTypes = []topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
}
rr, err := env.ts.GetRoutingRules(ctx)
require.NoError(t, err)
rules := topotools.GetRoutingRulesMap(rr)
for _, tabletType := range tabletTypes {
for _, tableName := range tables {
toTarget := []string{ks + "." + tableName}
toTarget := []string{toKeyspace + "." + tableName}
tt := strings.ToLower(tabletType.String())
if tabletType == topodatapb.TabletType_PRIMARY {
rules[tableName] = toTarget
rules[ks+"."+tableName] = toTarget
rules[env.sourceKeyspace.KeyspaceName+"."+tableName] = toTarget
rules[targetKeyspace+"."+tableName] = toTarget
rules[sourceKeyspace+"."+tableName] = toTarget
} else {
rules[tableName+"@"+tt] = toTarget
rules[ks+"."+tableName+"@"+tt] = toTarget
rules[env.sourceKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget
rules[targetKeyspace+"."+tableName+"@"+tt] = toTarget
rules[sourceKeyspace+"."+tableName+"@"+tt] = toTarget
}
}
}
err := topotools.SaveRoutingRules(ctx, env.ts, rules)
require.NoError(t, err)
err = env.ts.RebuildSrvVSchema(ctx, nil)
require.NoError(t, err)
env.saveRoutingRules(t, rules)
}

func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) {
Expand Down Expand Up @@ -301,7 +309,6 @@ func (tmc *testTMClient) GetWorkflowKey(keyspace, shard string) string {
func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if expect := tmc.readVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, req) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ReadVReplicationWorkflow request: got %+v, want %+v", req, expect)
Expand Down
23 changes: 14 additions & 9 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ func (s *Server) GetCellsWithShardReadsSwitched(
// keyspace.
func (s *Server) GetCellsWithTableReadsSwitched(
ctx context.Context,
keyspace string,
sourceKeyspace string,
targetKeyspace string,
deepthi marked this conversation as resolved.
Show resolved Hide resolved
table string,
tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error) {
Expand Down Expand Up @@ -329,7 +330,7 @@ func (s *Server) GetCellsWithTableReadsSwitched(
)

for _, rule := range srvVSchema.RoutingRules.Rules {
ruleName := fmt.Sprintf("%s.%s@%s", keyspace, table, strings.ToLower(tabletType.String()))
ruleName := fmt.Sprintf("%s.%s@%s", sourceKeyspace, table, strings.ToLower(tabletType.String()))
if rule.FromTable == ruleName {
found = true

Expand All @@ -340,7 +341,7 @@ func (s *Server) GetCellsWithTableReadsSwitched(
return nil, nil, err
}

if ks == keyspace {
if ks != sourceKeyspace {
switched = true
break // if one table in the workflow switched, we are done.
}
Expand Down Expand Up @@ -944,6 +945,10 @@ ORDER BY
}, nil
}

func (s *Server) GetWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) {
return s.getWorkflowState(ctx, targetKeyspace, workflowName)
}

func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) {
ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName)
if err != nil {
Expand Down Expand Up @@ -1013,12 +1018,12 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
}
}
} else {
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY)
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY)
if err != nil {
return nil, nil, err
}

state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA)
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA)
if err != nil {
return nil, nil, err
}
Expand All @@ -1027,10 +1032,10 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
return nil, nil, err
}
for _, table := range ts.Tables() {
rr := globalRules[table]
// If a rule exists for the table and points to the target keyspace, then
// writes have been switched.
if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", targetKeyspace, table) {
// If a rule for the primary tablet type exists for any table and points to the target keyspace,
// then writes have been switched.
rr := globalRules[fmt.Sprintf("%s.%s", sourceKeyspace, table)]
if len(rr) > 0 && rr[0] != fmt.Sprintf("%s.%s", sourceKeyspace, table) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can calculate this key once. That's more efficient and prevents any accidental drift.

state.WritesSwitched = true
break
}
Expand Down
38 changes: 24 additions & 14 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,6 @@ func TestMoveTablesComplete(t *testing.T) {
tableTemplate := "CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))"
sourceKeyspaceName := "sourceks"
targetKeyspaceName := "targetks"
tabletTypes := []topodatapb.TabletType{
topodatapb.TabletType_PRIMARY,
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
lockName := fmt.Sprintf("%s/%s", targetKeyspaceName, workflowName)
schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
table1Name: {
Expand Down Expand Up @@ -640,7 +635,8 @@ func TestMoveTablesComplete(t *testing.T) {
tc.preFunc(t, env)
}
// Setup the routing rules as they would be after having previously done SwitchTraffic.
env.addTableRoutingRules(t, ctx, tabletTypes, []string{table1Name, table2Name, table3Name})
env.updateTableRoutingRules(t, ctx, nil, []string{table1Name, table2Name, table3Name},
tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName)
got, err := env.ws.MoveTablesComplete(ctx, tc.req)
if tc.wantErr != "" {
require.EqualError(t, err, tc.wantErr)
Expand Down Expand Up @@ -1103,7 +1099,8 @@ func TestMoveTablesTrafficSwitching(t *testing.T) {
} else {
env.tmc.reverse.Store(true)
// Setup the routing rules as they would be after having previously done SwitchTraffic.
env.addTableRoutingRules(t, ctx, tabletTypes, []string{tableName})
env.updateTableRoutingRules(t, ctx, tabletTypes, []string{tableName},
tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName)
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR)
for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR)
Expand Down Expand Up @@ -1317,7 +1314,8 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) {
} else {
env.tmc.reverse.Store(true)
// Setup the routing rules as they would be after having previously done SwitchTraffic.
env.addTableRoutingRules(t, ctx, tabletTypes, tables)
env.updateTableRoutingRules(t, ctx, tabletTypes, tables,
tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName)
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR)
for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR)
Expand Down Expand Up @@ -1356,6 +1354,15 @@ func TestMirrorTraffic(t *testing.T) {
topodatapb.TabletType_RDONLY,
}

initialRoutingRules := map[string][]string{
fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", sourceKs, table1)},
fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", sourceKs, table2)},
fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", sourceKs, table1)},
fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", sourceKs, table2)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", sourceKs, table1)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", sourceKs, table2)},
}

tests := []struct {
name string

Expand Down Expand Up @@ -1443,8 +1450,8 @@ func TestMirrorTraffic(t *testing.T) {
Percent: 50.0,
},
routingRules: map[string][]string{
fmt.Sprintf("%s.%s@rdonly", targetKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)},
fmt.Sprintf("%s.%s@rdonly", targetKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)},
},
wantErr: "cannot mirror [rdonly] traffic for workflow src2target at this time: traffic for those tablet types is switched",
wantMirrorRules: make(map[string]map[string]float32),
Expand All @@ -1458,8 +1465,8 @@ func TestMirrorTraffic(t *testing.T) {
Percent: 50.0,
},
routingRules: map[string][]string{
fmt.Sprintf("%s.%s@replica", targetKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)},
fmt.Sprintf("%s.%s@replica", targetKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)},
fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)},
fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)},
},
wantErr: "cannot mirror [replica] traffic for workflow src2target at this time: traffic for those tablet types is switched",
wantMirrorRules: make(map[string]map[string]float32),
Expand All @@ -1473,8 +1480,8 @@ func TestMirrorTraffic(t *testing.T) {
Percent: 50.0,
},
routingRules: map[string][]string{
table1: {fmt.Sprintf("%s.%s", targetKs, table1)},
table2: {fmt.Sprintf("%s.%s", targetKs, table2)},
fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", targetKs, table1)},
fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", targetKs, table2)},
},
wantErr: "cannot mirror [primary] traffic for workflow src2target at this time: traffic for those tablet types is switched",
wantMirrorRules: make(map[string]map[string]float32),
Expand Down Expand Up @@ -1555,6 +1562,7 @@ func TestMirrorTraffic(t *testing.T) {
TabletTypes: tabletTypes,
Percent: 50.0,
},
routingRules: initialRoutingRules,
wantMirrorRules: map[string]map[string]float32{
fmt.Sprintf("%s.%s", sourceKs, table1): {
fmt.Sprintf("%s.%s", targetKs, table1): 50.0,
Expand Down Expand Up @@ -1589,6 +1597,7 @@ func TestMirrorTraffic(t *testing.T) {
TabletTypes: tabletTypes,
Percent: 50.0,
},
routingRules: initialRoutingRules,
wantMirrorRules: map[string]map[string]float32{
fmt.Sprintf("%s.%s", sourceKs, table1): {
fmt.Sprintf("%s.%s", targetKs, table1): 50.0,
Expand Down Expand Up @@ -1626,6 +1635,7 @@ func TestMirrorTraffic(t *testing.T) {
fmt.Sprintf("%s.%s", targetKs, table1): 25.0,
},
},
routingRules: initialRoutingRules,
req: &vtctldatapb.WorkflowMirrorTrafficRequest{
Keyspace: targetKs,
Workflow: workflow,
Expand Down
Loading
Loading