Skip to content

Commit

Permalink
VReplication: Properly support cancel and delete for multi-tenant Mov…
Browse files Browse the repository at this point in the history
…eTables (#16906)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Oct 23, 2024
1 parent f812835 commit 17607fa
Show file tree
Hide file tree
Showing 34 changed files with 3,601 additions and 1,503 deletions.
7 changes: 7 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sort"

"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"vitess.io/vitess/go/cmd/vtctldclient/cli"

Expand All @@ -31,6 +33,7 @@ var CancelOptions = struct {
KeepData bool
KeepRoutingRules bool
Shards []string
DeleteBatchSize int64
}{}

func GetCancelCommand(opts *SubCommandsOpts) *cobra.Command {
Expand Down Expand Up @@ -60,9 +63,13 @@ func commandCancel(cmd *cobra.Command, args []string) error {
KeepData: CancelOptions.KeepData,
KeepRoutingRules: CancelOptions.KeepRoutingRules,
Shards: CancelOptions.Shards,
DeleteBatchSize: CancelOptions.DeleteBatchSize,
}
resp, err := GetClient().WorkflowDelete(GetCommandCtx(), req)
if err != nil {
if grpcerr, ok := status.FromError(err); ok && (grpcerr.Code() == codes.DeadlineExceeded) {
return fmt.Errorf("Cancel action timed out. Please try again and the work will pick back up where it left off. Note that you can control the timeout using the --action_timeout flag and the delete batch size with --delete-batch-size.")
}
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import (
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

// The default batch size to use when deleting tenant data
// if a multi-tenant migration is canceled or deleted.
const DefaultDeleteBatchSize = 1000

var (
// base is the base command for all actions related to MoveTables.
base = &cobra.Command{
Expand Down Expand Up @@ -108,6 +112,7 @@ func registerCommands(root *cobra.Command) {
cancel := common.GetCancelCommand(opts)
cancel.Flags().BoolVar(&common.CancelOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the MoveTables workflow in the target keyspace.")
cancel.Flags().BoolVar(&common.CancelOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the MoveTables workflow.")
cancel.Flags().Int64Var(&common.CancelOptions.DeleteBatchSize, "delete-batch-size", DefaultDeleteBatchSize, "When cleaning up the migrated data in tables moved as part of a multi-tenant workflow, delete the records in batches of this size.")
common.AddShardSubsetFlag(cancel, &common.CancelOptions.Shards)
base.AddCommand(cancel)
}
Expand Down
2 changes: 2 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/workflow/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
deleteOptions = struct {
KeepData bool
KeepRoutingRules bool
DeleteBatchSize int64
}{}

// delete makes a WorkflowDelete gRPC call to a vtctld.
Expand All @@ -55,6 +56,7 @@ func commandDelete(cmd *cobra.Command, args []string) error {
KeepData: deleteOptions.KeepData,
KeepRoutingRules: deleteOptions.KeepRoutingRules,
Shards: baseOptions.Shards,
DeleteBatchSize: deleteOptions.DeleteBatchSize,
}
resp, err := common.GetClient().WorkflowDelete(common.GetCommandCtx(), req)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/movetables"
"vitess.io/vitess/go/vt/topo/topoproto"
)

Expand Down Expand Up @@ -60,6 +61,7 @@ func registerCommands(root *cobra.Command) {
delete.MarkFlagRequired("workflow")
delete.Flags().BoolVar(&deleteOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the workflow in the target keyspace.")
delete.Flags().BoolVar(&deleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the workflow.")
delete.Flags().Int64Var(&deleteOptions.DeleteBatchSize, "delete-batch-size", movetables.DefaultDeleteBatchSize, "When cleaning up the migrated data in tables moved as part of a multi-tenant MoveTables workflow, delete the records in batches of this size.")
common.AddShardSubsetFlag(delete, &baseOptions.Shards)
base.AddCommand(delete)

Expand Down
24 changes: 19 additions & 5 deletions go/test/endtoend/vreplication/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,26 @@ func TestMultiTenantSimple(t *testing.T) {

require.Zero(t, len(getKeyspaceRoutingRules(t, vc).Rules))

mt.Create()
confirmKeyspacesRoutedTo(t, sourceKeyspace, "s1", "t1", nil)
validateKeyspaceRoutingRules(t, vc, initialRules)
createFunc := func() {
mt.Create()
confirmKeyspacesRoutedTo(t, sourceKeyspace, "s1", "t1", nil)
validateKeyspaceRoutingRules(t, vc, initialRules)

lastIndex = insertRows(lastIndex, sourceKeyspace)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, mt.workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
lastIndex = insertRows(lastIndex, sourceKeyspace)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, mt.workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
}

t.Run("cancel", func(t *testing.T) {
// First let's test canceling the workflow to ensure that it properly
// cleans up all of the data.
createFunc()
mt.Cancel()
rowCount := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1"))
require.Zero(t, rowCount)
})

// Create again and run it to completion.
createFunc()

vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
mt.SwitchReads()
Expand Down
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ func (v VtctldMoveTables) ReverseWrites() {
}

func (v VtctldMoveTables) Cancel() {
v.exec("Cancel")
args := []string{"Cancel", "--delete-batch-size=500"}
v.exec(args...)
}

func (v VtctldMoveTables) Complete() {
Expand Down
Loading

0 comments on commit 17607fa

Please sign in to comment.