diff --git a/go/cmd/vtctldclient/command/vreplication/workflow/update.go b/go/cmd/vtctldclient/command/vreplication/workflow/update.go index 466d81e8be4..3d06ad3e64e 100644 --- a/go/cmd/vtctldclient/command/vreplication/workflow/update.go +++ b/go/cmd/vtctldclient/command/vreplication/workflow/update.go @@ -111,6 +111,7 @@ func commandUpdate(cmd *cobra.Command, args []string) error { TabletTypes: updateOptions.TabletTypes, TabletSelectionPreference: tsp, OnDdl: binlogdatapb.OnDDLAction(onddl), + State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), // We don't allow changing this in the client command }, } diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 93f43e9d1b6..c32590e436b 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -27,12 +27,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) const ( @@ -120,7 +122,9 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, // Test new experimental --defer-secondary-keys flag switch currentWorkflowType { case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard: - args = append(args, "--defer-secondary-keys") + if !atomicCopy { + args = append(args, "--defer-secondary-keys") + } } } if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && action == workflowActionSwitchTraffic { @@ -132,6 +136,9 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, } args = append(args, "--timeout=90s") } + if action == workflowActionCreate && atomicCopy { + args = append(args, "--atomic-copy") + } if (action == workflowActionCreate || action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic) && cells != "" { args = append(args, "--cells", cells) } @@ -194,19 +201,42 @@ func tstWorkflowComplete(t *testing.T) error { // to primary,replica,rdonly (the only applicable types in these tests). func testWorkflowUpdate(t *testing.T) { tabletTypes := "primary,replica,rdonly" - // Test vtctlclient first + // Test vtctlclient first. _, err := vc.VtctlClient.ExecuteCommandWithOutput("workflow", "--", "--tablet-types", tabletTypes, "noexist.noexist", "update") require.Error(t, err, err) resp, err := vc.VtctlClient.ExecuteCommandWithOutput("workflow", "--", "--tablet-types", tabletTypes, ksWorkflow, "update") require.NoError(t, err) require.NotEmpty(t, resp) - // Test vtctldclient last + // Test vtctldclient last. _, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", "noexist", "update", "--workflow", "noexist", "--tablet-types", tabletTypes) require.Error(t, err) + // Change the tablet-types to rdonly. + resp, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", targetKs, "update", "--workflow", workflowName, "--tablet-types", "rdonly") + require.NoError(t, err, err) + // Confirm that we changed the workflow. + var ures vtctldatapb.WorkflowUpdateResponse + require.NoError(t, err) + err = protojson.Unmarshal([]byte(resp), &ures) + require.NoError(t, err) + require.Greater(t, len(ures.Details), 0) + require.True(t, ures.Details[0].Changed) + // Change tablet-types back to primary,replica,rdonly. resp, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", targetKs, "update", "--workflow", workflowName, "--tablet-types", tabletTypes) require.NoError(t, err, err) - require.NotEmpty(t, resp) + // Confirm that we changed the workflow. + err = protojson.Unmarshal([]byte(resp), &ures) + require.NoError(t, err) + require.Greater(t, len(ures.Details), 0) + require.True(t, ures.Details[0].Changed) + // Execute a no-op as tablet-types is already primary,replica,rdonly. + resp, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", targetKs, "update", "--workflow", workflowName, "--tablet-types", tabletTypes) + require.NoError(t, err, err) + // Confirm that we didn't change the workflow. + err = protojson.Unmarshal([]byte(resp), &ures) + require.NoError(t, err) + require.Greater(t, len(ures.Details), 0) + require.False(t, ures.Details[0].Changed) } func tstWorkflowCancel(t *testing.T) error { diff --git a/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go b/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go index 383400cf7c5..cc6c8c13ef2 100644 --- a/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go +++ b/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go @@ -5663,6 +5663,12 @@ func (x *VDiffOptions) GetReportOptions() *VDiffReportOptions { return nil } +// UpdateVReplicationWorkflowRequest is used to update an existing VReplication +// workflow. Note that the following fields MUST have an explicit value provided +// if you do NOT wish to update the existing value to the given type's ZeroValue: +// cells, tablet_types, on_ddl, and state. +// TODO: leverage the optional modifier for these fields rather than using SimulatedNull +// values: https://github.com/vitessio/vitess/issues/15627 type UpdateVReplicationWorkflowRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 11d3cf85e68..a9776017e61 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -3816,6 +3816,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag TabletTypes: tabletTypes, TabletSelectionPreference: tsp, OnDdl: binlogdatapb.OnDDLAction(onddl), + State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), // We don't allow changing this in the client command } } results, err = wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun, rpcReq) // Only update currently uses the new RPC path diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index d2d10979cad..e16968fc5b9 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -257,6 +257,7 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil}, nil } + rowsAffected := uint64(0) for _, row := range res.Named().Rows { id := row.AsInt64("id", 0) cells := strings.Split(row.AsString("cell", ""), ",") @@ -317,11 +318,12 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta if err != nil { return nil, err } + rowsAffected += res.RowsAffected } return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{ Result: &querypb.QueryResult{ - RowsAffected: uint64(len(res.Rows)), + RowsAffected: rowsAffected, }, }, nil } diff --git a/proto/tabletmanagerdata.proto b/proto/tabletmanagerdata.proto index fc9f6fa97b9..b39b17ac07b 100644 --- a/proto/tabletmanagerdata.proto +++ b/proto/tabletmanagerdata.proto @@ -628,6 +628,12 @@ message VDiffOptions { VDiffReportOptions report_options = 3; } +// UpdateVReplicationWorkflowRequest is used to update an existing VReplication +// workflow. Note that the following fields MUST have an explicit value provided +// if you do NOT wish to update the existing value to the given type's ZeroValue: +// cells, tablet_types, on_ddl, and state. +// TODO: leverage the optional modifier for these fields rather than using SimulatedNull +// values: https://github.com/vitessio/vitess/issues/15627 message UpdateVReplicationWorkflowRequest { string workflow = 1; repeated string cells = 2;