diff --git a/go/cmd/vtctldclient/command/vreplication/workflow/update.go b/go/cmd/vtctldclient/command/vreplication/workflow/update.go index 52df87acc8b..3152d3d4a65 100644 --- a/go/cmd/vtctldclient/command/vreplication/workflow/update.go +++ b/go/cmd/vtctldclient/command/vreplication/workflow/update.go @@ -112,6 +112,7 @@ func commandUpdate(cmd *cobra.Command, args []string) error { TabletSelectionPreference: tsp, OnDdl: binlogdatapb.OnDDLAction(onddl), Shards: baseOptions.Shards, + State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), // We don't allow changing this in the client command }, } diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index b14b649d6b7..c63fc245535 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -26,12 +26,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) const ( @@ -212,19 +214,42 @@ func tstWorkflowComplete(t *testing.T) error { // to primary,replica,rdonly (the only applicable types in these tests). func testWorkflowUpdate(t *testing.T) { tabletTypes := "primary,replica,rdonly" - // Test vtctlclient first + // Test vtctlclient first. _, err := vc.VtctlClient.ExecuteCommandWithOutput("workflow", "--", "--tablet-types", tabletTypes, "noexist.noexist", "update") require.Error(t, err, err) resp, err := vc.VtctlClient.ExecuteCommandWithOutput("workflow", "--", "--tablet-types", tabletTypes, ksWorkflow, "update") require.NoError(t, err) require.NotEmpty(t, resp) - // Test vtctldclient last + // Test vtctldclient last. _, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", "noexist", "update", "--workflow", "noexist", "--tablet-types", tabletTypes) require.Error(t, err) + // Change the tablet-types to rdonly. + resp, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", targetKs, "update", "--workflow", workflowName, "--tablet-types", "rdonly") + require.NoError(t, err, err) + // Confirm that we changed the workflow. + var ures vtctldatapb.WorkflowUpdateResponse + require.NoError(t, err) + err = protojson.Unmarshal([]byte(resp), &ures) + require.NoError(t, err) + require.Greater(t, len(ures.Details), 0) + require.True(t, ures.Details[0].Changed) + // Change tablet-types back to primary,replica,rdonly. resp, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", targetKs, "update", "--workflow", workflowName, "--tablet-types", tabletTypes) require.NoError(t, err, err) - require.NotEmpty(t, resp) + // Confirm that we changed the workflow. + err = protojson.Unmarshal([]byte(resp), &ures) + require.NoError(t, err) + require.Greater(t, len(ures.Details), 0) + require.True(t, ures.Details[0].Changed) + // Execute a no-op as tablet-types is already primary,replica,rdonly. + resp, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", targetKs, "update", "--workflow", workflowName, "--tablet-types", tabletTypes) + require.NoError(t, err, err) + // Confirm that we didn't change the workflow. + err = protojson.Unmarshal([]byte(resp), &ures) + require.NoError(t, err) + require.Greater(t, len(ures.Details), 0) + require.False(t, ures.Details[0].Changed) } func tstWorkflowCancel(t *testing.T) error { diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 919783e2f76..324cdda0a76 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -3829,6 +3829,7 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag TabletTypes: tabletTypes, TabletSelectionPreference: tsp, OnDdl: binlogdatapb.OnDDLAction(onddl), + State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), // We don't allow changing this in the client command } } results, err = wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun, rpcReq, *shards) // Only update currently uses the new RPC path diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index 9e1cf1a2471..a274da98fdf 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -409,6 +409,7 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil}, nil } + rowsAffected := uint64(0) for _, row := range res.Named().Rows { id := row.AsInt64("id", 0) cells := strings.Split(row.AsString("cell", ""), ",") @@ -472,11 +473,12 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta if err != nil { return nil, err } + rowsAffected += res.RowsAffected } return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{ Result: &querypb.QueryResult{ - RowsAffected: uint64(len(res.Rows)), + RowsAffected: rowsAffected, }, }, nil } @@ -499,7 +501,7 @@ func (tm *TabletManager) UpdateVReplicationWorkflows(ctx context.Context, req *t return &tabletmanagerdatapb.UpdateVReplicationWorkflowsResponse{ Result: &querypb.QueryResult{ - RowsAffected: uint64(len(res.Rows)), + RowsAffected: res.RowsAffected, }, }, nil }