Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow Status: change logic to determine whether MoveTables writes are switched #16731

43 changes: 28 additions & 15 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ const (
tabletUIDStep = 10
)

var defaultTabletTypes = []topodatapb.TabletType{
topodatapb.TabletType_PRIMARY,
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}

type testKeyspace struct {
KeyspaceName string
ShardNames []string
Expand Down Expand Up @@ -198,30 +204,38 @@ func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspac
return tablet
}

// addTableRoutingRules adds routing rules from the test env's source keyspace to
// its target keyspace for the given tablet types and tables.
func (env *testEnv) addTableRoutingRules(t *testing.T, ctx context.Context, tabletTypes []topodatapb.TabletType, tables []string) {
ks := env.targetKeyspace.KeyspaceName
rules := make(map[string][]string, len(tables)*(len(tabletTypes)*3))
func (env *testEnv) saveRoutingRules(t *testing.T, rules map[string][]string) {
err := topotools.SaveRoutingRules(context.Background(), env.ts, rules)
require.NoError(t, err)
err = env.ts.RebuildSrvVSchema(context.Background(), nil)
require.NoError(t, err)
}

func (env *testEnv) updateTableRoutingRules(t *testing.T, ctx context.Context,
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
tabletTypes []topodatapb.TabletType, tables []string, sourceKeyspace, targetKeyspace, toKeyspace string) {

if len(tabletTypes) == 0 {
tabletTypes = defaultTabletTypes
}
rr, err := env.ts.GetRoutingRules(ctx)
require.NoError(t, err)
rules := topotools.GetRoutingRulesMap(rr)
for _, tabletType := range tabletTypes {
for _, tableName := range tables {
toTarget := []string{ks + "." + tableName}
toTarget := []string{toKeyspace + "." + tableName}
tt := strings.ToLower(tabletType.String())
if tabletType == topodatapb.TabletType_PRIMARY {
rules[tableName] = toTarget
rules[ks+"."+tableName] = toTarget
rules[env.sourceKeyspace.KeyspaceName+"."+tableName] = toTarget
rules[targetKeyspace+"."+tableName] = toTarget
rules[sourceKeyspace+"."+tableName] = toTarget
} else {
rules[tableName+"@"+tt] = toTarget
rules[ks+"."+tableName+"@"+tt] = toTarget
rules[env.sourceKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget
rules[targetKeyspace+"."+tableName+"@"+tt] = toTarget
rules[sourceKeyspace+"."+tableName+"@"+tt] = toTarget
}
}
}
err := topotools.SaveRoutingRules(ctx, env.ts, rules)
require.NoError(t, err)
err = env.ts.RebuildSrvVSchema(ctx, nil)
require.NoError(t, err)
env.saveRoutingRules(t, rules)
}

func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) {
Expand Down Expand Up @@ -301,7 +315,6 @@ func (tmc *testTMClient) GetWorkflowKey(keyspace, shard string) string {
func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if expect := tmc.readVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, req) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ReadVReplicationWorkflow request: got %+v, want %+v", req, expect)
Expand Down
24 changes: 15 additions & 9 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ func (s *Server) GetCellsWithShardReadsSwitched(
// keyspace.
func (s *Server) GetCellsWithTableReadsSwitched(
ctx context.Context,
keyspace string,
sourceKeyspace string,
targetKeyspace string,
deepthi marked this conversation as resolved.
Show resolved Hide resolved
table string,
tabletType topodatapb.TabletType,
) (cellsSwitched []string, cellsNotSwitched []string, err error) {
Expand Down Expand Up @@ -329,7 +330,7 @@ func (s *Server) GetCellsWithTableReadsSwitched(
)

for _, rule := range srvVSchema.RoutingRules.Rules {
ruleName := fmt.Sprintf("%s.%s@%s", keyspace, table, strings.ToLower(tabletType.String()))
ruleName := fmt.Sprintf("%s.%s@%s", sourceKeyspace, table, strings.ToLower(tabletType.String()))
if rule.FromTable == ruleName {
found = true

Expand All @@ -340,7 +341,7 @@ func (s *Server) GetCellsWithTableReadsSwitched(
return nil, nil, err
}

if ks == keyspace {
if ks != sourceKeyspace {
switched = true
break // if one table in the workflow switched, we are done.
}
Expand Down Expand Up @@ -944,6 +945,10 @@ ORDER BY
}, nil
}

func (s *Server) GetWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) {
return s.getWorkflowState(ctx, targetKeyspace, workflowName)
}

func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) {
ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName)
if err != nil {
Expand Down Expand Up @@ -1013,12 +1018,12 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
}
}
} else {
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY)
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY)
if err != nil {
return nil, nil, err
}

state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA)
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA)
if err != nil {
return nil, nil, err
}
Expand All @@ -1027,10 +1032,11 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
return nil, nil, err
}
for _, table := range ts.Tables() {
rr := globalRules[table]
// If a rule exists for the table and points to the target keyspace, then
// writes have been switched.
if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", targetKeyspace, table) {
// If a rule for the primary tablet type exists for any table and points to the target keyspace,
// then writes have been switched.
ruleKey := fmt.Sprintf("%s.%s", sourceKeyspace, table)
rr := globalRules[ruleKey]
if len(rr) > 0 && rr[0] != ruleKey {
state.WritesSwitched = true
break
}
Expand Down
38 changes: 24 additions & 14 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,6 @@ func TestMoveTablesComplete(t *testing.T) {
tableTemplate := "CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))"
sourceKeyspaceName := "sourceks"
targetKeyspaceName := "targetks"
tabletTypes := []topodatapb.TabletType{
topodatapb.TabletType_PRIMARY,
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
lockName := fmt.Sprintf("%s/%s", targetKeyspaceName, workflowName)
schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
table1Name: {
Expand Down Expand Up @@ -640,7 +635,8 @@ func TestMoveTablesComplete(t *testing.T) {
tc.preFunc(t, env)
}
// Setup the routing rules as they would be after having previously done SwitchTraffic.
env.addTableRoutingRules(t, ctx, tabletTypes, []string{table1Name, table2Name, table3Name})
env.updateTableRoutingRules(t, ctx, nil, []string{table1Name, table2Name, table3Name},
tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName)
got, err := env.ws.MoveTablesComplete(ctx, tc.req)
if tc.wantErr != "" {
require.EqualError(t, err, tc.wantErr)
Expand Down Expand Up @@ -1103,7 +1099,8 @@ func TestMoveTablesTrafficSwitching(t *testing.T) {
} else {
env.tmc.reverse.Store(true)
// Setup the routing rules as they would be after having previously done SwitchTraffic.
env.addTableRoutingRules(t, ctx, tabletTypes, []string{tableName})
env.updateTableRoutingRules(t, ctx, tabletTypes, []string{tableName},
tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName)
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR)
for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR)
Expand Down Expand Up @@ -1317,7 +1314,8 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) {
} else {
env.tmc.reverse.Store(true)
// Setup the routing rules as they would be after having previously done SwitchTraffic.
env.addTableRoutingRules(t, ctx, tabletTypes, tables)
env.updateTableRoutingRules(t, ctx, tabletTypes, tables,
tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName)
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR)
for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR)
Expand Down Expand Up @@ -1356,6 +1354,15 @@ func TestMirrorTraffic(t *testing.T) {
topodatapb.TabletType_RDONLY,
}

initialRoutingRules := map[string][]string{
fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", sourceKs, table1)},
fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", sourceKs, table2)},
fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", sourceKs, table1)},
fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", sourceKs, table2)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", sourceKs, table1)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", sourceKs, table2)},
}

tests := []struct {
name string

Expand Down Expand Up @@ -1443,8 +1450,8 @@ func TestMirrorTraffic(t *testing.T) {
Percent: 50.0,
},
routingRules: map[string][]string{
fmt.Sprintf("%s.%s@rdonly", targetKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)},
fmt.Sprintf("%s.%s@rdonly", targetKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table1): {fmt.Sprintf("%s.%s@rdonly", targetKs, table1)},
fmt.Sprintf("%s.%s@rdonly", sourceKs, table2): {fmt.Sprintf("%s.%s@rdonly", targetKs, table2)},
},
wantErr: "cannot mirror [rdonly] traffic for workflow src2target at this time: traffic for those tablet types is switched",
wantMirrorRules: make(map[string]map[string]float32),
Expand All @@ -1458,8 +1465,8 @@ func TestMirrorTraffic(t *testing.T) {
Percent: 50.0,
},
routingRules: map[string][]string{
fmt.Sprintf("%s.%s@replica", targetKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)},
fmt.Sprintf("%s.%s@replica", targetKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)},
fmt.Sprintf("%s.%s@replica", sourceKs, table1): {fmt.Sprintf("%s.%s@replica", targetKs, table1)},
fmt.Sprintf("%s.%s@replica", sourceKs, table2): {fmt.Sprintf("%s.%s@replica", targetKs, table2)},
},
wantErr: "cannot mirror [replica] traffic for workflow src2target at this time: traffic for those tablet types is switched",
wantMirrorRules: make(map[string]map[string]float32),
Expand All @@ -1473,8 +1480,8 @@ func TestMirrorTraffic(t *testing.T) {
Percent: 50.0,
},
routingRules: map[string][]string{
table1: {fmt.Sprintf("%s.%s", targetKs, table1)},
table2: {fmt.Sprintf("%s.%s", targetKs, table2)},
fmt.Sprintf("%s.%s", sourceKs, table1): {fmt.Sprintf("%s.%s", targetKs, table1)},
fmt.Sprintf("%s.%s", sourceKs, table2): {fmt.Sprintf("%s.%s", targetKs, table2)},
},
wantErr: "cannot mirror [primary] traffic for workflow src2target at this time: traffic for those tablet types is switched",
wantMirrorRules: make(map[string]map[string]float32),
Expand Down Expand Up @@ -1555,6 +1562,7 @@ func TestMirrorTraffic(t *testing.T) {
TabletTypes: tabletTypes,
Percent: 50.0,
},
routingRules: initialRoutingRules,
wantMirrorRules: map[string]map[string]float32{
fmt.Sprintf("%s.%s", sourceKs, table1): {
fmt.Sprintf("%s.%s", targetKs, table1): 50.0,
Expand Down Expand Up @@ -1589,6 +1597,7 @@ func TestMirrorTraffic(t *testing.T) {
TabletTypes: tabletTypes,
Percent: 50.0,
},
routingRules: initialRoutingRules,
wantMirrorRules: map[string]map[string]float32{
fmt.Sprintf("%s.%s", sourceKs, table1): {
fmt.Sprintf("%s.%s", targetKs, table1): 50.0,
Expand Down Expand Up @@ -1626,6 +1635,7 @@ func TestMirrorTraffic(t *testing.T) {
fmt.Sprintf("%s.%s", targetKs, table1): 25.0,
},
},
routingRules: initialRoutingRules,
req: &vtctldatapb.WorkflowMirrorTrafficRequest{
Keyspace: targetKs,
Workflow: workflow,
Expand Down
Loading
Loading