From c1ff0ae8ac8f0a70cc321d48e1fb6c39192b660f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 31 Oct 2024 17:02:36 -0400 Subject: [PATCH 1/7] Relax restrictions on Cancel and ReverseTraffic when writes not involved Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 1828b0af814..8086c1c0584 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -148,9 +148,9 @@ var ( // ErrMultipleTargetKeyspaces occurs when a workflow somehow has multiple // target keyspaces across different shard primaries. This should be // impossible. - ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow") - ErrWorkflowNotFullySwitched = errors.New("cannot complete workflow because you have not yet switched all read and write traffic") - ErrWorkflowPartiallySwitched = errors.New("cannot cancel workflow because you have already switched some or all read and write traffic") + ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow") + ErrWorkflowCompleteNotFullySwitched = errors.New("cannot complete workflow because you have not yet switched all read and write traffic") + ErrWorkflowDeleteWritesSwitched = errors.New("cannot delete workflow because you have already switched write traffic") ) // Server provides an API to work with Vitess workflows, like vreplication @@ -1754,7 +1754,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa } if !state.WritesSwitched || len(state.ReplicaCellsNotSwitched) > 0 || len(state.RdonlyCellsNotSwitched) > 0 { - return nil, ErrWorkflowNotFullySwitched + return nil, ErrWorkflowCompleteNotFullySwitched } var renameTable TableRemovalType if req.RenameTables { @@ -2129,10 +2129,12 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe } if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { - // Return an error if the workflow traffic is partially switched. - if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 { - return nil, ErrWorkflowPartiallySwitched + // Return an error if the write workflow traffic is switched. + if state.WritesSwitched { + return nil, ErrWorkflowDeleteWritesSwitched } + // If only reads have been switched, then we can delete the + // workflow and its routing rules. } // Lock the workflow for deletion. @@ -3300,14 +3302,6 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic") } - if direction == DirectionBackward && ts.IsMultiTenantMigration() { - // In a multi-tenant migration, multiple migrations would be writing to the same - // table, so we can't stop writes like we do with MoveTables, using denied tables, - // since it would block all other migrations as well as traffic for tenants which - // have already been migrated. - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations") - } - // We need this to know when there isn't a (non-FROZEN) reverse workflow to use. onlySwitchingReads := !startState.WritesSwitched && !switchPrimary @@ -3316,6 +3310,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor (direction == DirectionBackward && !startState.WritesSwitched) if direction == DirectionBackward && !onlySwitchingReads { + if ts.IsMultiTenantMigration() { + // In a multi-tenant migration, multiple migrations would be writing to the same + // table, so we can't stop writes like we do with MoveTables, using denied tables, + // since it would block all other migrations as well as traffic for tenants which + // have already been migrated. + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse write traffic for multi-tenant migrations") + } // This means that the main workflow is FROZEN and the reverse workflow // exists. So we update the starting state so that we're using the reverse // workflow and we can move forward with a normal traffic switch forward From 252b6c26efead52ea81050006c7e00e4569331cb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 31 Oct 2024 17:47:38 -0400 Subject: [PATCH 2/7] Add unit tests Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 43 ++-- go/vt/vtctl/workflow/server_test.go | 306 ++++++++++++++++++++++++++-- go/vt/vtctl/workflow/utils.go | 2 +- 3 files changed, 314 insertions(+), 37 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 8086c1c0584..14937d0d2d3 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2178,22 +2178,21 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe ts.workflowType) } // We need to delete the rows that the target tables would have for the tenant. - // We don't cleanup other related artifacts since they are not tied to the tenant. if !req.GetKeepData() { if err := s.deleteTenantData(ctx, ts, req.DeleteBatchSize); err != nil { return nil, vterrors.Wrapf(err, "failed to fully delete all migrated data for tenant %s, please retry the operation", ts.options.TenantId) } } - } else { - // Cleanup related data and artifacts. There are none for a LookupVindex workflow. - if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { - if _, err := s.dropTargets(ctx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil { - if topo.IsErrType(err, topo.NoNode) { - return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace()) - } - return nil, err + } + + // Cleanup related data and artifacts. There are none for a LookupVindex workflow. + if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { + if _, err := s.dropTargets(ctx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil { + if topo.IsErrType(err, topo.NoNode) { + return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace()) } + return nil, err } } @@ -3302,6 +3301,15 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic") } + if ts.IsMultiTenantMigration() { + // Multi-tenant migrations use keyspace routing rules, so we need to update the state + // using them. + err = updateKeyspaceRoutingState(ctx, ts.TopoServer(), ts.sourceKeyspace, ts.targetKeyspace, startState) + if err != nil { + return nil, vterrors.Wrap(err, "failed to update multi-tenant workflow state using keyspace routing rules") + } + } + // We need this to know when there isn't a (non-FROZEN) reverse workflow to use. onlySwitchingReads := !startState.WritesSwitched && !switchPrimary @@ -3398,6 +3406,15 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor resp.StartState = startState.String() s.Logger().Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState) _, currentState, err := s.getWorkflowState(ctx, ts.targetKeyspace, ts.workflow) + if ts.IsMultiTenantMigration() { + // Multi-tenant migrations use keyspace routing rules, so we need to update the state + // using them. + sourceKs, targetKs := ts.sourceKeyspace, ts.targetKeyspace + if TrafficSwitchDirection(req.Direction) == DirectionBackward { + sourceKs, targetKs = targetKs, sourceKs + } + err = updateKeyspaceRoutingState(ctx, ts.TopoServer(), sourceKs, targetKs, currentState) + } if err != nil { resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err) } else { @@ -3449,11 +3466,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // shard level traffic switching is all or nothing trafficSwitchingIsAllOrNothing = true case ts.MigrationType() == binlogdatapb.MigrationType_TABLES && ts.IsMultiTenantMigration(): - if direction == DirectionBackward { - return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, - "requesting reversal of read traffic for multi-tenant migrations is not supported")) - } - // For multi-tenant migrations, we only support switching traffic to all cells at once + // For multi-tenant migrations, we only support switching traffic to all cells at once. allCells, err := ts.TopoServer().GetCellInfoNames(ctx) if err != nil { return nil, err @@ -3477,7 +3490,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 { return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, - "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")) + "requesting reversal of read traffic for RDONLYs but RDONLY reads have not been switched")) } } diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index b7783fc2945..93045133daa 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -753,6 +753,160 @@ func TestWorkflowDelete(t *testing.T) { postFunc func(t *testing.T, env *testEnv) expectedLogs []string }{ + { + name: "delete workflow", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowDeleteRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + }, + expectedSourceQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + }, + }, + expectedTargetQueries: []*queryResult{ + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table1Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table2Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table3Name), + result: &querypb.QueryResult{}, + }, + }, + want: &vtctldatapb.WorkflowDeleteResponse{ + Summary: fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", + workflowName, targetKeyspaceName), + Details: []*vtctldatapb.WorkflowDeleteResponse_TabletInfo{ + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID}, + Deleted: true, + }, + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID + tabletUIDStep}, + Deleted: true, + }, + }, + }, + }, + { + name: "delete workflow with only reads switched", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowDeleteRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + }, + expectedSourceQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + }, + }, + expectedTargetQueries: []*queryResult{ + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table1Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table2Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table3Name), + result: &querypb.QueryResult{}, + }, + }, + preFunc: func(t *testing.T, env *testEnv) { + // Setup the routing rules as they would be after having previously done SwitchTraffic + // for replica and rdonly tablets. + env.updateTableRoutingRules(t, ctx, roTabletTypes, []string{table1Name, table2Name, table3Name}, + sourceKeyspaceName, targetKeyspaceName, targetKeyspaceName) + }, + want: &vtctldatapb.WorkflowDeleteResponse{ + Summary: fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", + workflowName, targetKeyspaceName), + Details: []*vtctldatapb.WorkflowDeleteResponse_TabletInfo{ + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID}, + Deleted: true, + }, + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID + tabletUIDStep}, + Deleted: true, + }, + }, + }, + }, + { + name: "delete workflow with writes switched", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowDeleteRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + }, + expectedSourceQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + }, + }, + expectedTargetQueries: []*queryResult{ + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table1Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table2Name), + result: &querypb.QueryResult{}, + }, + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table3Name), + result: &querypb.QueryResult{}, + }, + }, + preFunc: func(t *testing.T, env *testEnv) { + // Setup the routing rules as they would be after having previously + // done SwitchTraffic with for all tablet types. + env.updateTableRoutingRules(t, ctx, allTabletTypes, []string{table1Name, table2Name, table3Name}, + sourceKeyspaceName, targetKeyspaceName, targetKeyspaceName) + }, + postFunc: func(t *testing.T, env *testEnv) { + // Clear out the routing rules we put in place. + err := env.ts.SaveRoutingRules(ctx, &vschemapb.RoutingRules{}) + require.NoError(t, err) + }, + wantErr: ErrWorkflowDeleteWritesSwitched.Error(), + }, { name: "missing table", sourceKeyspace: &testKeyspace{ @@ -983,7 +1137,7 @@ func TestWorkflowDelete(t *testing.T) { wantErr: "unsupported workflow type \"Reshard\" for multi-tenant migration", }, { - name: "multi-tenant workflow without predicate ", + name: "multi-tenant workflow without predicate", sourceKeyspace: &testKeyspace{ KeyspaceName: sourceKeyspaceName, ShardNames: []string{"0"}, @@ -1339,6 +1493,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { name string sourceKeyspace, targetKeyspace *testKeyspace req *vtctldatapb.WorkflowSwitchTrafficRequest + multiTenant bool preFunc func(env *testEnv) want *vtctldatapb.WorkflowSwitchTrafficResponse wantErr bool @@ -1409,6 +1564,48 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { CurrentState: "Reads Not Switched. Writes Not Switched", }, }, + { + name: "backward for multi-tenant workflow and read-only tablets", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: roTabletTypes, + }, + multiTenant: true, + want: &vtctldatapb.WorkflowSwitchTrafficResponse{ + Summary: fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", targetKeyspaceName, workflowName), + StartState: "All Reads Switched. Writes Not Switched", + CurrentState: "Reads Not Switched. Writes Not Switched", + }, + }, + { + name: "backward for multi-tenant workflow for all tablet types", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + multiTenant: true, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: allTabletTypes, + }, + wantErr: true, + }, { name: "forward with tablet refresh error", sourceKeyspace: &testKeyspace{ @@ -1459,6 +1656,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { }, }, } + for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { require.NotNil(t, tc.sourceKeyspace) @@ -1467,6 +1665,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) defer env.close() env.tmc.schema = schema + if tc.req.Direction == int32(DirectionForward) { env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, copyTableQR) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, cutoverQR) @@ -1508,9 +1707,47 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, freezeReverseWFQR) } } + if tc.preFunc != nil { tc.preFunc(env) } + + if tc.multiTenant { + rwr := &readVReplicationWorkflowRequestResponse{ + req: &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{ + Workflow: workflowName, + }, + res: &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + Options: `{"tenant_id": "1"}`, // This is all we need for it to be considered a multi-tenant workflow + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + Id: 1, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: sourceKeyspaceName, + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "t1", + Filter: "select * from t1", + }, + }, + }, + }, + }, + }, + }, + } + env.tmc.expectReadVReplicationWorkflowRequestOnTargetTablets(rwr) + // Multi-tenant workflows also use keyspace routing rules. So we set those + // up as if we've already switched the traffic. + if tc.req.Direction == int32(DirectionBackward) { + err := changeKeyspaceRouting(ctx, env.ts, tc.req.TabletTypes, tc.sourceKeyspace.KeyspaceName, + tc.targetKeyspace.KeyspaceName, "SwitchTraffic") + require.NoError(t, err) + } + } + got, err := env.ws.WorkflowSwitchTraffic(ctx, tc.req) if tc.wantErr { require.Error(t, err) @@ -1519,34 +1756,61 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { require.NoError(t, err) require.Equal(t, tc.want.String(), got.String(), "Server.WorkflowSwitchTraffic() = %v, want %v", got, tc.want) - // Confirm that we have the expected routing rules. - rr, err := env.ts.GetRoutingRules(ctx) - require.NoError(t, err) - for _, rr := range rr.Rules { - _, rrTabletType, found := strings.Cut(rr.FromTable, "@") - if !found { // No @ is primary - rrTabletType = topodatapb.TabletType_PRIMARY.String() + if tc.multiTenant { // Confirm the keyspace routing rules + gotKrrs, err := env.ts.GetKeyspaceRoutingRules(ctx) + require.NoError(t, err) + sort.Slice(gotKrrs.Rules, func(i, j int) bool { + return gotKrrs.Rules[i].FromKeyspace < gotKrrs.Rules[j].FromKeyspace + }) + expectedKrrs := &vschemapb.KeyspaceRoutingRules{} + for _, tabletType := range tc.req.TabletTypes { + suffix := "" + if tabletType != topodatapb.TabletType_PRIMARY { + suffix = fmt.Sprintf("@%s", strings.ToLower(tabletType.String())) + } + toKs, fromKs := tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName + if tc.req.Direction == int32(DirectionBackward) { + fromKs, toKs = toKs, fromKs + } + expectedKrrs.Rules = append(expectedKrrs.Rules, &vschemapb.KeyspaceRoutingRule{ + FromKeyspace: fromKs + suffix, + ToKeyspace: toKs, + }) } - tabletType, err := topoproto.ParseTabletType(rrTabletType) + sort.Slice(expectedKrrs.Rules, func(i, j int) bool { + return expectedKrrs.Rules[i].FromKeyspace < expectedKrrs.Rules[j].FromKeyspace + }) + require.Equal(t, expectedKrrs.String(), gotKrrs.String()) + } else { // Confirm the [table] routing rules + rr, err := env.ts.GetRoutingRules(ctx) require.NoError(t, err) + for _, rr := range rr.Rules { + _, rrTabletType, found := strings.Cut(rr.FromTable, "@") + if !found { // No @ is primary + rrTabletType = topodatapb.TabletType_PRIMARY.String() + } + tabletType, err := topoproto.ParseTabletType(rrTabletType) + require.NoError(t, err) - var to string - if slices.Contains(tc.req.TabletTypes, tabletType) { - to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) - if tc.req.Direction == int32(DirectionBackward) { + var to string + if slices.Contains(tc.req.TabletTypes, tabletType) { + to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + if tc.req.Direction == int32(DirectionBackward) { + to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + } + } else { to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + if tc.req.Direction == int32(DirectionBackward) { + to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + } } - } else { - to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) - if tc.req.Direction == int32(DirectionBackward) { - to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + for _, tt := range rr.ToTables { + require.Equal(t, to, tt, "Additional info: tablet type: %s, rr.FromTable: %s, rr.ToTables: %v, to string: %s", + tabletType.String(), rr.FromTable, rr.ToTables, to) } } - for _, tt := range rr.ToTables { - require.Equal(t, to, tt, "Additional info: tablet type: %s, rr.FromTable: %s, rr.ToTables: %v, to string: %s", - tabletType.String(), rr.FromTable, rr.ToTables, to) - } } + // Confirm that we have the expected denied tables entries. if slices.Contains(tc.req.TabletTypes, topodatapb.TabletType_PRIMARY) { for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 5021e3938c8..3dc2c021578 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -910,7 +910,7 @@ func validateTenantId(dataType querypb.Type, value string) error { } func updateKeyspaceRoutingState(ctx context.Context, ts *topo.Server, sourceKeyspace, targetKeyspace string, state *State) error { - // For multi-tenant migrations, we only support switching traffic to all cells at once + // For multi-tenant migrations, we only support switching traffic to all cells at once. cells, err := ts.GetCellInfoNames(ctx) if err != nil { return err From 42d0a232068a72aee4bce755b6f25b9d81a07ea9 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 31 Oct 2024 22:31:10 -0400 Subject: [PATCH 3/7] Do NOT drop target tables on multi-tenant delete/cancel Signed-off-by: Matt Lord --- .../vreplication_vtctldclient_cli_test.go | 57 ++++++++++++++----- go/vt/vtctl/workflow/server.go | 6 +- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index 4b556955815..d209a66a831 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -304,6 +304,17 @@ func testMoveTablesFlags3(t *testing.T, sourceKeyspace, targetKeyspace string, t // Confirm that the source tables were renamed. require.True(t, checkTablesExist(t, "zone1-100", []string{"_customer2_old"})) require.False(t, checkTablesExist(t, "zone1-100", []string{"customer2"})) + + // Confirm that we can cancel a workflow after ONLY switching read traffic. + mt = createMoveTables(t, sourceKeyspace, targetKeyspace, workflowName, "customer", createFlags, nil, nil) + mt.Start() // Need to start because we set stop-after-copy to true. + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + for _, tab := range targetTabs { + catchup(t, tab, workflowName, "MoveTables") + } + mt.SwitchReads() + mt.Cancel() + confirmNoRoutingRules(t) } // Create two workflows in order to confirm that listing all workflows works. @@ -450,20 +461,36 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards "--all-cells", "--format=json", "--config-overrides", mapToCSV(overrides), } - rs := newReshard(vc, &reshardWorkflow{ - workflowInfo: &workflowInfo{ - vc: vc, - workflowName: workflowName, - targetKeyspace: keyspace, - }, - sourceShards: sourceShards, - targetShards: targetShards, - createFlags: createFlags, - }, workflowFlavorVtctld) + var rs iReshard + var wf iWorkflow + createWorkflow := func() { + rs = newReshard(vc, &reshardWorkflow{ + workflowInfo: &workflowInfo{ + vc: vc, + workflowName: workflowName, + targetKeyspace: keyspace, + }, + sourceShards: sourceShards, + targetShards: targetShards, + createFlags: createFlags, + }, workflowFlavorVtctld) + wf = rs.(iWorkflow) + rs.Create() + } + + // First test that we can create a workflow, switch ONLY reads, and then cancel it. + createWorkflow() + rs.SwitchReads() + validateReadsRouteToTarget(t, "replica") + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) + rs.Cancel() + confirmNoRoutingRules(t) + + createWorkflow() ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName) - wf := rs.(iWorkflow) - rs.Create() validateReshardResponse(rs) validateOverrides(t, targetTabs, overrides) workflowResponse := getWorkflow(keyspace, workflowName) @@ -769,8 +796,10 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules { } func confirmNoRoutingRules(t *testing.T) { - routingRulesResponse := getRoutingRules(t) - require.Zero(t, len(routingRulesResponse.Rules)) + rrRes := getRoutingRules(t) + require.Zero(t, len(rrRes.Rules)) + krrRes := getKeyspaceRoutingRules(t, vc) + require.Zero(t, len(krrRes.Rules)) } func confirmRoutingRulesExist(t *testing.T) { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 14937d0d2d3..d5473431e41 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2759,8 +2759,10 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData, if !keepData { switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: - if err := sw.removeTargetTables(ctx); err != nil { - return nil, err + if !ts.IsMultiTenantMigration() { + if err := sw.removeTargetTables(ctx); err != nil { + return nil, err + } } if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err From d8ad0ba8cadb7e50684c30219d7a06737763dadb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 31 Oct 2024 22:43:39 -0400 Subject: [PATCH 4/7] Fix e2e tests Signed-off-by: Matt Lord --- .../vreplication/multi_tenant_test.go | 12 +++++ .../vreplication_vtctldclient_cli_test.go | 45 +++++++------------ 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/go/test/endtoend/vreplication/multi_tenant_test.go b/go/test/endtoend/vreplication/multi_tenant_test.go index c82ee8a620f..6bceaeefc6e 100644 --- a/go/test/endtoend/vreplication/multi_tenant_test.go +++ b/go/test/endtoend/vreplication/multi_tenant_test.go @@ -214,6 +214,18 @@ func TestMultiTenantSimple(t *testing.T) { require.Zero(t, rowCount) }) + t.Run("cancel after switching reads", func(t *testing.T) { + // First let's test canceling the workflow after only switching reads + // to ensure that it properly cleans up all of the state. + createFunc() + mt.SwitchReads() + confirmOnlyReadsSwitched(t) + mt.Cancel() + confirmNoRoutingRules(t) + rowCount := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1")) + require.Zero(t, rowCount) + }) + // Create again and run it to completion. createFunc() diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index d209a66a831..4ee977c4d74 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -313,6 +313,11 @@ func testMoveTablesFlags3(t *testing.T, sourceKeyspace, targetKeyspace string, t catchup(t, tab, workflowName, "MoveTables") } mt.SwitchReads() + wf := mt.(iWorkflow) + validateReadsRouteToTarget(t, "replica") + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) mt.Cancel() confirmNoRoutingRules(t) } @@ -462,35 +467,19 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards "--config-overrides", mapToCSV(overrides), } - var rs iReshard - var wf iWorkflow - createWorkflow := func() { - rs = newReshard(vc, &reshardWorkflow{ - workflowInfo: &workflowInfo{ - vc: vc, - workflowName: workflowName, - targetKeyspace: keyspace, - }, - sourceShards: sourceShards, - targetShards: targetShards, - createFlags: createFlags, - }, workflowFlavorVtctld) - wf = rs.(iWorkflow) - rs.Create() - } - - // First test that we can create a workflow, switch ONLY reads, and then cancel it. - createWorkflow() - rs.SwitchReads() - validateReadsRouteToTarget(t, "replica") - validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) - validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) - confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) - rs.Cancel() - confirmNoRoutingRules(t) - - createWorkflow() + rs := newReshard(vc, &reshardWorkflow{ + workflowInfo: &workflowInfo{ + vc: vc, + workflowName: workflowName, + targetKeyspace: keyspace, + }, + sourceShards: sourceShards, + targetShards: targetShards, + createFlags: createFlags, + }, workflowFlavorVtctld) ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName) + wf := rs.(iWorkflow) + rs.Create() validateReshardResponse(rs) validateOverrides(t, targetTabs, overrides) workflowResponse := getWorkflow(keyspace, workflowName) From 5e937c27426db9951673b96d6e65268595d56e18 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 5 Nov 2024 15:04:32 -0500 Subject: [PATCH 5/7] Improve comment and eliminate test case cruft Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 2 +- go/vt/vtctl/workflow/server_test.go | 21 --------------------- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index d5473431e41..9aebee9d497 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2134,7 +2134,7 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe return nil, ErrWorkflowDeleteWritesSwitched } // If only reads have been switched, then we can delete the - // workflow and its routing rules. + // workflow and its related artifacts. } // Lock the workflow for deletion. diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 93045133daa..ae9f4a30b76 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -873,27 +873,6 @@ func TestWorkflowDelete(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, }, - expectedSourceQueries: []*queryResult{ - { - query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", - sourceKeyspaceName, ReverseWorkflowName(workflowName)), - result: &querypb.QueryResult{}, - }, - }, - expectedTargetQueries: []*queryResult{ - { - query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table1Name), - result: &querypb.QueryResult{}, - }, - { - query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table2Name), - result: &querypb.QueryResult{}, - }, - { - query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table3Name), - result: &querypb.QueryResult{}, - }, - }, preFunc: func(t *testing.T, env *testEnv) { // Setup the routing rules as they would be after having previously // done SwitchTraffic with for all tablet types. From 0d9bd27cfa9d8a77ebb5b23372a4be62a0997246 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 5 Nov 2024 15:15:19 -0500 Subject: [PATCH 6/7] Two more nitty nits (halp) Signed-off-by: Matt Lord --- .../vreplication/vreplication_vtctldclient_cli_test.go | 4 ++-- go/vt/vtctl/workflow/server_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index 4ee977c4d74..bb1a092cc54 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -786,9 +786,9 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules { func confirmNoRoutingRules(t *testing.T) { rrRes := getRoutingRules(t) - require.Zero(t, len(rrRes.Rules)) + require.Zero(t, rrRes.Rules) krrRes := getKeyspaceRoutingRules(t, vc) - require.Zero(t, len(krrRes.Rules)) + require.Zero(t, krrRes.Rules) } func confirmRoutingRulesExist(t *testing.T) { diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index ae9f4a30b76..dbe06ab1a47 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -879,12 +879,12 @@ func TestWorkflowDelete(t *testing.T) { env.updateTableRoutingRules(t, ctx, allTabletTypes, []string{table1Name, table2Name, table3Name}, sourceKeyspaceName, targetKeyspaceName, targetKeyspaceName) }, + wantErr: ErrWorkflowDeleteWritesSwitched.Error(), postFunc: func(t *testing.T, env *testEnv) { // Clear out the routing rules we put in place. err := env.ts.SaveRoutingRules(ctx, &vschemapb.RoutingRules{}) require.NoError(t, err) }, - wantErr: ErrWorkflowDeleteWritesSwitched.Error(), }, { name: "missing table", From d5ebfc9f1446e807b29df3b9246f9b14991c6f27 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 5 Nov 2024 16:13:32 -0500 Subject: [PATCH 7/7] Undo require.Zero test change as it fails with: Should be zero, but was [] Signed-off-by: Matt Lord --- .../vreplication/vreplication_vtctldclient_cli_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index bb1a092cc54..4ee977c4d74 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -786,9 +786,9 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules { func confirmNoRoutingRules(t *testing.T) { rrRes := getRoutingRules(t) - require.Zero(t, rrRes.Rules) + require.Zero(t, len(rrRes.Rules)) krrRes := getKeyspaceRoutingRules(t, vc) - require.Zero(t, krrRes.Rules) + require.Zero(t, len(krrRes.Rules)) } func confirmRoutingRulesExist(t *testing.T) {