diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go index 726da479b56..722578fe411 100644 --- a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go +++ b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go @@ -57,13 +57,13 @@ var ( TargetCells []string TabletTypes []topodatapb.TabletType Tables []string - Limit uint32 // We only accept positive values but pass on an int64 + Limit int64 FilteredReplicationWaitTime time.Duration DebugQuery bool - MaxReportSampleRows uint32 // We only accept positive values but pass on an int64 + MaxReportSampleRows int64 OnlyPKs bool UpdateTableStats bool - MaxExtraRowsToCompare uint32 // We only accept positive values but pass on an int64 + MaxExtraRowsToCompare int64 Wait bool WaitUpdateInterval time.Duration AutoRetry bool @@ -113,6 +113,16 @@ var ( createOptions.Tables[i] = strings.TrimSpace(table) } } + // Enforce non-negative values for limits and max options. + if createOptions.Limit < 1 { + return fmt.Errorf("--limit must be a positive value") + } + if createOptions.MaxReportSampleRows < 0 { + return fmt.Errorf("--max-report-sample-rows must not be a negative value") + } + if createOptions.MaxExtraRowsToCompare < 0 { + return fmt.Errorf("--max-extra-rows-to-compare must not be a negative value") + } return nil } @@ -271,16 +281,16 @@ func commandCreate(cmd *cobra.Command, args []string) error { TabletTypes: createOptions.TabletTypes, TabletSelectionPreference: tsp, Tables: createOptions.Tables, - Limit: int64(createOptions.Limit), + Limit: createOptions.Limit, FilteredReplicationWaitTime: protoutil.DurationToProto(createOptions.FilteredReplicationWaitTime), DebugQuery: createOptions.DebugQuery, OnlyPKs: createOptions.OnlyPKs, UpdateTableStats: createOptions.UpdateTableStats, - MaxExtraRowsToCompare: int64(createOptions.MaxExtraRowsToCompare), + MaxExtraRowsToCompare: createOptions.MaxExtraRowsToCompare, Wait: createOptions.Wait, WaitUpdateInterval: protoutil.DurationToProto(createOptions.WaitUpdateInterval), AutoRetry: createOptions.AutoRetry, - MaxReportSampleRows: int64(createOptions.MaxReportSampleRows), + MaxReportSampleRows: createOptions.MaxReportSampleRows, }) if err != nil { @@ -863,12 +873,12 @@ func registerCommands(root *cobra.Command) { create.Flags().Var((*topoprotopb.TabletTypeListFlag)(&createOptions.TabletTypes), "tablet-types", "Tablet types to use on the source and target.") create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.") create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.") - create.Flags().Uint32Var(&createOptions.Limit, "limit", math.MaxUint32, "Max rows to stop comparing after.") + create.Flags().Int64Var(&createOptions.Limit, "limit", math.MaxInt64, "Max rows to stop comparing after.") create.Flags().BoolVar(&createOptions.DebugQuery, "debug-query", false, "Adds a mysql query to the report that can be used for further debugging.") - create.Flags().Uint32Var(&createOptions.MaxReportSampleRows, "max-report-sample-rows", 10, "Maximum number of row differences to report (0 for all differences). NOTE: when increasing this value it is highly recommended to also specify --only-pks") + create.Flags().Int64Var(&createOptions.MaxReportSampleRows, "max-report-sample-rows", 10, "Maximum number of row differences to report (0 for all differences). NOTE: when increasing this value it is highly recommended to also specify --only-pks") create.Flags().BoolVar(&createOptions.OnlyPKs, "only-pks", false, "When reporting missing rows, only show primary keys in the report.") create.Flags().StringSliceVar(&createOptions.Tables, "tables", nil, "Only run vdiff for these tables in the workflow.") - create.Flags().Uint32Var(&createOptions.MaxExtraRowsToCompare, "max-extra-rows-to-compare", 1000, "If there are collation differences between the source and target, you can have rows that are identical but simply returned in a different order from MySQL. We will do a second pass to compare the rows for any actual differences in this case and this flag allows you to control the resources used for this operation.") + create.Flags().Int64Var(&createOptions.MaxExtraRowsToCompare, "max-extra-rows-to-compare", 1000, "If there are collation differences between the source and target, you can have rows that are identical but simply returned in a different order from MySQL. We will do a second pass to compare the rows for any actual differences in this case and this flag allows you to control the resources used for this operation.") create.Flags().BoolVar(&createOptions.Wait, "wait", false, "When creating or resuming a vdiff, wait for it to finish before exiting.") create.Flags().DurationVar(&createOptions.WaitUpdateInterval, "wait-update-interval", time.Duration(1*time.Minute), "When waiting on a vdiff to finish, check and display the current status this often.") create.Flags().BoolVar(&createOptions.AutoRetry, "auto-retry", true, "Should this vdiff automatically retry and continue in case of recoverable errors.") diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index eb96985af57..b4753750871 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -22,11 +22,17 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" + "golang.org/x/exp/maps" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/sqlparser" + + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" ) type testCase struct { @@ -40,10 +46,11 @@ type testCase struct { retryInsert string resume bool // test resume functionality with this workflow // If testing resume, what new rows should be diff'd. These rows must have a PK > all initial rows and retry rows. - resumeInsert string - stop bool // test stop functionality with this workflow - testCLIErrors bool // test CLI errors against this workflow (only needs to be done once) - testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once) + resumeInsert string + stop bool // test stop functionality with this workflow + testCLIErrors bool // test CLI errors against this workflow (only needs to be done once) + testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once) + testCLIFlagHandling bool // test vtctldclient flag handling from end-to-end } const ( @@ -55,21 +62,22 @@ const ( var testCases = []*testCase{ { - name: "MoveTables/unsharded to two shards", - workflow: "p1c2", - typ: "MoveTables", - sourceKs: "product", - targetKs: "customer", - sourceShards: "0", - targetShards: "-80,80-", - tabletBaseID: 200, - tables: "customer,Lead,Lead-1", - autoRetryError: true, - retryInsert: `insert into customer(cid, name, typ) values(1991234, 'Testy McTester', 'soho')`, - resume: true, - resumeInsert: `insert into customer(cid, name, typ) values(1992234, 'Testy McTester (redux)', 'enterprise')`, - testCLIErrors: true, // test for errors in the simplest workflow - testCLICreateWait: true, // test wait on create feature against simplest workflow + name: "MoveTables/unsharded to two shards", + workflow: "p1c2", + typ: "MoveTables", + sourceKs: "product", + targetKs: "customer", + sourceShards: "0", + targetShards: "-80,80-", + tabletBaseID: 200, + tables: "customer,Lead,Lead-1", + autoRetryError: true, + retryInsert: `insert into customer(cid, name, typ) values(1991234, 'Testy McTester', 'soho')`, + resume: true, + resumeInsert: `insert into customer(cid, name, typ) values(1992234, 'Testy McTester (redux)', 'enterprise')`, + testCLIErrors: true, // test for errors in the simplest workflow + testCLICreateWait: true, // test wait on create feature against simplest workflow + testCLIFlagHandling: true, // test flag handling end-to-end against simplest workflow }, { name: "Reshard Merge/split 2 to 3", @@ -207,6 +215,9 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, if tc.testCLIErrors { testCLIErrors(t, ksWorkflow, allCellNames) } + if tc.testCLIFlagHandling { + testCLIFlagHandling(t, tc.targetKs, tc.workflow, cells[0]) + } testDelete(t, ksWorkflow, allCellNames) @@ -242,6 +253,68 @@ func testCLIErrors(t *testing.T, ksWorkflow, cells string) { }) } +// testCLIFlagHandling tests that the vtctldclient CLI flags are handled correctly +// from vtctldclient->vtctld->vttablet->mysqld. +func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell) { + expectedOptions := &tabletmanagerdatapb.VDiffOptions{ + CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ + MaxRows: 999, + MaxExtraRowsToCompare: 777, + AutoRetry: true, + UpdateTableStats: true, + TimeoutSeconds: 60, + }, + PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{ + SourceCell: "zone1,zone2,zone3,zonefoosource", + TargetCell: "zone1,zone2,zone3,zonefootarget", + TabletTypes: "replica,primary,rdonly", + }, + ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{ + MaxSampleRows: 888, + OnlyPks: true, + }, + } + + t.Run("Client flag handling", func(t *testing.T) { + res, err := vc.VtctldClient.ExecuteCommandWithOutput("vdiff", "--target-keyspace", targetKs, "--workflow", workflowName, + "create", + "--limit", fmt.Sprintf("%d", expectedOptions.CoreOptions.MaxRows), + "--max-report-sample-rows", fmt.Sprintf("%d", expectedOptions.ReportOptions.MaxSampleRows), + "--max-extra-rows-to-compare", fmt.Sprintf("%d", expectedOptions.CoreOptions.MaxExtraRowsToCompare), + "--filtered-replication-wait-time", fmt.Sprintf("%v", time.Duration(expectedOptions.CoreOptions.TimeoutSeconds)*time.Second), + "--source-cells", expectedOptions.PickerOptions.SourceCell, + "--target-cells", expectedOptions.PickerOptions.TargetCell, + "--tablet-types", expectedOptions.PickerOptions.TabletTypes, + fmt.Sprintf("--update-table-stats=%t", expectedOptions.CoreOptions.UpdateTableStats), + fmt.Sprintf("--auto-retry=%t", expectedOptions.CoreOptions.AutoRetry), + fmt.Sprintf("--only-pks=%t", expectedOptions.ReportOptions.OnlyPks), + "--tablet-types-in-preference-order=false", // So tablet_types should not start with "in_order:", which is the default + "--format=json") // So we can easily grab the UUID + require.NoError(t, err, "vdiff command failed: %s", res) + jsonRes := gjson.Parse(res) + vduuid, err := uuid.Parse(jsonRes.Get("UUID").String()) + require.NoError(t, err, "invalid UUID: %s", jsonRes.Get("UUID").String()) + + // Confirm that the options were passed through and saved correctly. + query := sqlparser.BuildParsedQuery("select options from %s.vdiff where vdiff_uuid = %s", + sidecarDBIdentifier, encodeString(vduuid.String())).Query + tablets := vc.getVttabletsInKeyspace(t, cell, targetKs, "PRIMARY") + require.Greater(t, len(tablets), 0, "no primary tablets found in keyspace %s", targetKs) + tablet := maps.Values(tablets)[0] + qres, err := tablet.QueryTablet(query, targetKs, false) + require.NoError(t, err, "query %q failed: %v", query, err) + require.NotNil(t, qres, "query %q returned nil result", query) // Should never happen + require.Equal(t, 1, len(qres.Rows), "query %q returned %d rows, expected 1", query, len(qres.Rows)) + require.Equal(t, 1, len(qres.Rows[0]), "query %q returned %d columns, expected 1", query, len(qres.Rows[0])) + storedOptions := &tabletmanagerdatapb.VDiffOptions{} + bytes, err := qres.Rows[0][0].ToBytes() + require.NoError(t, err, "failed to convert result %+v to bytes: %v", qres.Rows[0], err) + err = protojson.Unmarshal(bytes, storedOptions) + require.NoError(t, err, "failed to unmarshal result %s to a %T: %v", string(bytes), storedOptions, err) + require.True(t, proto.Equal(expectedOptions, storedOptions), "stored options %v != expected options %v", storedOptions, expectedOptions) + }) +} + func testDelete(t *testing.T, ksWorkflow, cells string) { t.Run("Delete", func(t *testing.T) { // Let's be sure that we have at least 3 unique VDiffs. diff --git a/go/vt/vtctl/grpcvtctldclient/client_gen.go b/go/vt/vtctl/grpcvtctldclient/client_gen.go index 4020afa3307..c1d251487fd 100644 --- a/go/vt/vtctl/grpcvtctldclient/client_gen.go +++ b/go/vt/vtctl/grpcvtctldclient/client_gen.go @@ -128,15 +128,6 @@ func (client *gRPCVtctldClient) CleanupSchemaMigration(ctx context.Context, in * return client.c.CleanupSchemaMigration(ctx, in, opts...) } -// ForceCutOverSchemaMigration is part of the vtctlservicepb.VtctldClient interface. -func (client *gRPCVtctldClient) ForceCutOverSchemaMigration(ctx context.Context, in *vtctldatapb.ForceCutOverSchemaMigrationRequest, opts ...grpc.CallOption) (*vtctldatapb.ForceCutOverSchemaMigrationResponse, error) { - if client.c == nil { - return nil, status.Error(codes.Unavailable, connClosedMsg) - } - - return client.c.ForceCutOverSchemaMigration(ctx, in, opts...) -} - // CompleteSchemaMigration is part of the vtctlservicepb.VtctldClient interface. func (client *gRPCVtctldClient) CompleteSchemaMigration(ctx context.Context, in *vtctldatapb.CompleteSchemaMigrationRequest, opts ...grpc.CallOption) (*vtctldatapb.CompleteSchemaMigrationResponse, error) { if client.c == nil { @@ -263,6 +254,15 @@ func (client *gRPCVtctldClient) FindAllShardsInKeyspace(ctx context.Context, in return client.c.FindAllShardsInKeyspace(ctx, in, opts...) } +// ForceCutOverSchemaMigration is part of the vtctlservicepb.VtctldClient interface. +func (client *gRPCVtctldClient) ForceCutOverSchemaMigration(ctx context.Context, in *vtctldatapb.ForceCutOverSchemaMigrationRequest, opts ...grpc.CallOption) (*vtctldatapb.ForceCutOverSchemaMigrationResponse, error) { + if client.c == nil { + return nil, status.Error(codes.Unavailable, connClosedMsg) + } + + return client.c.ForceCutOverSchemaMigration(ctx, in, opts...) +} + // GetBackups is part of the vtctlservicepb.VtctldClient interface. func (client *gRPCVtctldClient) GetBackups(ctx context.Context, in *vtctldatapb.GetBackupsRequest, opts ...grpc.CallOption) (*vtctldatapb.GetBackupsResponse, error) { if client.c == nil { diff --git a/go/vt/vtctl/localvtctldclient/client_gen.go b/go/vt/vtctl/localvtctldclient/client_gen.go index 019ce619054..cbde68f1b27 100644 --- a/go/vt/vtctl/localvtctldclient/client_gen.go +++ b/go/vt/vtctl/localvtctldclient/client_gen.go @@ -176,11 +176,6 @@ func (client *localVtctldClient) CleanupSchemaMigration(ctx context.Context, in return client.s.CleanupSchemaMigration(ctx, in) } -// ForceCutOverSchemaMigration is part of the vtctlservicepb.VtctldClient interface. -func (client *localVtctldClient) ForceCutOverSchemaMigration(ctx context.Context, in *vtctldatapb.ForceCutOverSchemaMigrationRequest, opts ...grpc.CallOption) (*vtctldatapb.ForceCutOverSchemaMigrationResponse, error) { - return client.s.ForceCutOverSchemaMigration(ctx, in) -} - // CompleteSchemaMigration is part of the vtctlservicepb.VtctldClient interface. func (client *localVtctldClient) CompleteSchemaMigration(ctx context.Context, in *vtctldatapb.CompleteSchemaMigrationRequest, opts ...grpc.CallOption) (*vtctldatapb.CompleteSchemaMigrationResponse, error) { return client.s.CompleteSchemaMigration(ctx, in) @@ -251,6 +246,11 @@ func (client *localVtctldClient) FindAllShardsInKeyspace(ctx context.Context, in return client.s.FindAllShardsInKeyspace(ctx, in) } +// ForceCutOverSchemaMigration is part of the vtctlservicepb.VtctldClient interface. +func (client *localVtctldClient) ForceCutOverSchemaMigration(ctx context.Context, in *vtctldatapb.ForceCutOverSchemaMigrationRequest, opts ...grpc.CallOption) (*vtctldatapb.ForceCutOverSchemaMigrationResponse, error) { + return client.s.ForceCutOverSchemaMigration(ctx, in) +} + // GetBackups is part of the vtctlservicepb.VtctldClient interface. func (client *localVtctldClient) GetBackups(ctx context.Context, in *vtctldatapb.GetBackupsRequest, opts ...grpc.CallOption) (*vtctldatapb.GetBackupsResponse, error) { return client.s.GetBackups(ctx, in) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index d15e9dc1bbd..5302b33edb8 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1632,7 +1632,7 @@ func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRe CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ Tables: strings.Join(req.Tables, ","), AutoRetry: req.AutoRetry, - MaxRows: req.MaxExtraRowsToCompare, + MaxRows: req.Limit, TimeoutSeconds: req.FilteredReplicationWaitTime.Seconds, MaxExtraRowsToCompare: req.MaxExtraRowsToCompare, UpdateTableStats: req.UpdateTableStats,