Skip to content

Commit

Permalink
VDiff: Add control for start/resume and stop (#16654)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Sep 7, 2024
1 parent c768b62 commit 10710d8
Show file tree
Hide file tree
Showing 19 changed files with 1,447 additions and 563 deletions.
11 changes: 11 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtctl/vtctldclient"
Expand Down Expand Up @@ -146,6 +147,16 @@ func validateOnDDL(cmd *cobra.Command) error {
return nil
}

// ValidateShards verifies that every entry in shards is a valid key range
// according to key.IsValidKeyRange. It returns an error naming the first
// entry that fails the check, or nil when all entries pass (including when
// shards is empty).
func ValidateShards(shards []string) error {
	for i := 0; i < len(shards); i++ {
		// Stop at the first offender so the error names the exact bad value.
		if name := shards[i]; !key.IsValidKeyRange(name) {
			return fmt.Errorf("invalid shard: %q", name)
		}
	}
	return nil
}

func ParseAndValidateCreateOptions(cmd *cobra.Command) error {
if err := validateOnDDL(cmd); err != nil {
return err
Expand Down
24 changes: 19 additions & 5 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ var (
AutoRetry bool
MaxDiffDuration time.Duration
RowDiffColumnTruncateAt int64
AutoStart bool
}{}

deleteOptions = struct {
Arg string
}{}

resumeOptions = struct {
UUID uuid.UUID
UUID uuid.UUID
TargetShards []string
}{}

showOptions = struct {
Expand All @@ -86,7 +88,8 @@ var (
}{}

stopOptions = struct {
UUID uuid.UUID
UUID uuid.UUID
TargetShards []string
}{}

parseAndValidateCreate = func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -191,7 +194,8 @@ vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --targe
return fmt.Errorf("invalid UUID provided: %v", err)
}
resumeOptions.UUID = uuid
return nil

return common.ValidateShards(resumeOptions.TargetShards)
},
RunE: commandResume,
}
Expand Down Expand Up @@ -236,7 +240,8 @@ vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --targe
return fmt.Errorf("invalid UUID provided: %v", err)
}
stopOptions.UUID = uuid
return nil

return common.ValidateShards(stopOptions.TargetShards)
},
RunE: commandStop,
}
Expand Down Expand Up @@ -296,6 +301,7 @@ func commandCreate(cmd *cobra.Command, args []string) error {
MaxReportSampleRows: createOptions.MaxReportSampleRows,
MaxDiffDuration: protoutil.DurationToProto(createOptions.MaxDiffDuration),
RowDiffColumnTruncateAt: createOptions.RowDiffColumnTruncateAt,
AutoStart: &createOptions.AutoStart,
})

if err != nil {
Expand Down Expand Up @@ -379,6 +385,7 @@ func commandResume(cmd *cobra.Command, args []string) error {
Workflow: common.BaseOptions.Workflow,
TargetKeyspace: common.BaseOptions.TargetKeyspace,
Uuid: resumeOptions.UUID.String(),
TargetShards: resumeOptions.TargetShards,
})

if err != nil {
Expand Down Expand Up @@ -698,6 +705,9 @@ func buildSingleSummary(keyspace, workflow, uuid string, resp *vtctldatapb.VDiff
// Table summary information that must be accounted for across all shards.
{
table := row.AsString("table_name", "")
if table == "" { // This occurs when the table diff has not started on 1 or more shards
continue
}
// Create the global VDiff table summary object if it doesn't exist.
if _, ok := tableSummaryMap[table]; !ok {
tableSummaryMap[table] = tableSummary{
Expand Down Expand Up @@ -858,6 +868,7 @@ func commandStop(cmd *cobra.Command, args []string) error {
Workflow: common.BaseOptions.Workflow,
TargetKeyspace: common.BaseOptions.TargetKeyspace,
Uuid: stopOptions.UUID.String(),
TargetShards: stopOptions.TargetShards,
})

if err != nil {
Expand Down Expand Up @@ -887,18 +898,21 @@ func registerCommands(root *cobra.Command) {
create.Flags().BoolVar(&createOptions.Wait, "wait", false, "When creating or resuming a vdiff, wait for it to finish before exiting.")
create.Flags().DurationVar(&createOptions.WaitUpdateInterval, "wait-update-interval", time.Duration(1*time.Minute), "When waiting on a vdiff to finish, check and display the current status this often.")
create.Flags().BoolVar(&createOptions.AutoRetry, "auto-retry", true, "Should this vdiff automatically retry and continue in case of recoverable errors.")
create.Flags().BoolVar(&createOptions.UpdateTableStats, "update-table-stats", false, "Update the table statistics, using ANALYZE TABLE, on each table involved in the VDiff during initialization. This will ensure that progress estimates are as accurate as possible -- but it does involve locks and can potentially impact query processing on the target keyspace.")
create.Flags().BoolVar(&createOptions.UpdateTableStats, "update-table-stats", false, "Update the table statistics, using ANALYZE TABLE, on each table involved in the vdiff during initialization. This will ensure that progress estimates are as accurate as possible -- but it does involve locks and can potentially impact query processing on the target keyspace.")
create.Flags().DurationVar(&createOptions.MaxDiffDuration, "max-diff-duration", 0, "How long should an individual table diff run before being stopped and restarted in order to lessen the impact on tablets due to holding open database snapshots for long periods of time (0 is the default and means no time limit).")
create.Flags().Int64Var(&createOptions.RowDiffColumnTruncateAt, "row-diff-column-truncate-at", 128, "When showing row differences, truncate the non Primary Key column values to this length. A value less than 1 means do not truncate.")
create.Flags().BoolVar(&createOptions.AutoStart, "auto-start", true, "Start the vdiff upon creation. When false, the vdiff will be created but will not run until resumed.")
base.AddCommand(create)

base.AddCommand(delete)

resume.Flags().StringSliceVar(&resumeOptions.TargetShards, "target-shards", nil, "The target shards to resume the vdiff on; default is all shards.")
base.AddCommand(resume)

show.Flags().BoolVar(&showOptions.Verbose, "verbose", false, "Show verbose output in summaries")
base.AddCommand(show)

stop.Flags().StringSliceVar(&stopOptions.TargetShards, "target-shards", nil, "The target shards to stop the vdiff on; default is all shards.")
base.AddCommand(stop)
}

Expand Down
10 changes: 9 additions & 1 deletion go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
testCLIErrors(t, ksWorkflow, allCellNames)
}
if tc.testCLIFlagHandling {
// This creates and then deletes the vdiff so we don't increment the count.
testCLIFlagHandling(t, tc.targetKs, tc.workflow, cells[0])
tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create
}

checkVDiffCountStat(t, statsTablet, tc.vdiffCount)
Expand Down Expand Up @@ -370,6 +370,7 @@ func testCLIErrors(t *testing.T, ksWorkflow, cells string) {
// testCLIFlagHandling tests that the vtctldclient CLI flags are handled correctly
// from vtctldclient->vtctld->vttablet->mysqld.
func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell) {
false := false
expectedOptions := &tabletmanagerdatapb.VDiffOptions{
CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{
MaxRows: 999,
Expand All @@ -378,6 +379,7 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell
UpdateTableStats: true,
TimeoutSeconds: 60,
MaxDiffSeconds: 333,
AutoStart: &false,
},
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{
SourceCell: "zone1,zone2,zone3,zonefoosource",
Expand Down Expand Up @@ -406,6 +408,7 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell
fmt.Sprintf("--auto-retry=%t", expectedOptions.CoreOptions.AutoRetry),
fmt.Sprintf("--only-pks=%t", expectedOptions.ReportOptions.OnlyPks),
fmt.Sprintf("--row-diff-column-truncate-at=%d", expectedOptions.ReportOptions.RowDiffColumnTruncateAt),
fmt.Sprintf("--auto-start=%t", *expectedOptions.CoreOptions.AutoStart),
"--tablet-types-in-preference-order=false", // So tablet_types should not start with "in_order:", which is the default
"--format=json") // So we can easily grab the UUID
require.NoError(t, err, "vdiff command failed: %s", res)
Expand All @@ -430,6 +433,11 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell
err = protojson.Unmarshal(bytes, storedOptions)
require.NoError(t, err, "failed to unmarshal result %s to a %T: %v", string(bytes), storedOptions, err)
require.True(t, proto.Equal(expectedOptions, storedOptions), "stored options %v != expected options %v", storedOptions, expectedOptions)

// Delete this vdiff as we used --auto-start=false and thus it never starts and
// does not provide the normally expected show --verbose --format=json output.
_, output := performVDiff2Action(t, false, fmt.Sprintf("%s.%s", targetKs, workflowName), "", "delete", vduuid.String(), false)
require.Equal(t, "completed", gjson.Get(output, "Status").String())
})
}

Expand Down
Loading

0 comments on commit 10710d8

Please sign in to comment.