From 4e64e1aa3fe9d6ccca101453dac54d5603eb2be8 Mon Sep 17 00:00:00 2001
From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com>
Date: Wed, 27 Sep 2023 15:27:01 -0400
Subject: [PATCH 1/2] Cherry-pick 2f679aaab15220898a790b0f3b1ae27207b49f0b with conflicts

---
 .../command/vreplication/vdiff/vdiff.go | 889 ++++++++++++++++++
 go/test/endtoend/vreplication/vdiff2_test.go | 29 +-
 .../vreplication/vdiff_helper_test.go | 44 +
 .../tabletmanager/vdiff/engine_test.go | 5 +-
 .../tabletmanager/vdiff/table_differ.go | 53 +-
 5 files changed, 992 insertions(+), 28 deletions(-)
 create mode 100644 go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go

diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
new file mode 100644
index 00000000000..c5ac975494f
--- /dev/null
+++ b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
@@ -0,0 +1,889 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package vdiff
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"math"
+	"reflect"
+	"sort"
+	"strings"
+	// text/template rather than html/template: the summary is written to a
+	// terminal, so values must not be HTML-escaped.
+	"text/template"
+	"time"
+
+	"github.com/bndr/gotabulate"
+	"github.com/google/uuid"
+	"github.com/spf13/cobra"
+
+	"vitess.io/vitess/go/cmd/vtctldclient/cli"
+	"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
+	"vitess.io/vitess/go/protoutil"
+	"vitess.io/vitess/go/sqltypes"
+	"vitess.io/vitess/go/vt/vterrors"
+	"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
+
+	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
+	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
+	topoprotopb "vitess.io/vitess/go/vt/topo/topoproto"
+)
+
+var (
+	// tabletTypesDefault is used for create when --tablet-types was not
+	// given on the command line.
+	tabletTypesDefault = []topodatapb.TabletType{
+		topodatapb.TabletType_RDONLY,
+		topodatapb.TabletType_REPLICA,
+		topodatapb.TabletType_PRIMARY,
+	}
+
+	// createOptions holds the parsed flags and positional UUID for the
+	// create command.
+	createOptions = struct {
+		UUID                        uuid.UUID
+		SourceCells                 []string
+		TargetCells                 []string
+		TabletTypes                 []topodatapb.TabletType
+		Tables                      []string
+		Limit                       uint32 // We only accept positive values but pass on an int64
+		FilteredReplicationWaitTime time.Duration
+		DebugQuery                  bool
+		OnlyPKs                     bool
+		UpdateTableStats            bool
+		MaxExtraRowsToCompare       uint32 // We only accept positive values but pass on an int64
+		Wait                        bool
+		WaitUpdateInterval          time.Duration
+		AutoRetry                   bool
+	}{}
+
+	// deleteOptions holds the lower-cased argument ('all' or a UUID) for the
+	// delete command.
+	deleteOptions = struct {
+		Arg string
+	}{}
+
+	// resumeOptions holds the UUID of the vdiff to resume.
+	resumeOptions = struct {
+		UUID uuid.UUID
+	}{}
+
+	// showOptions holds the lower-cased argument ('all', 'last' or a UUID)
+	// and the verbose flag for the show command.
+	showOptions = struct {
+		Arg     string
+		Verbose bool
+	}{}
+
+	// stopOptions holds the UUID of the vdiff to stop.
+	stopOptions = struct {
+		UUID uuid.UUID
+	}{}
+
+	// parseAndValidateCreate is the PreRunE hook for create. It parses the
+	// optional UUID argument (generating one when absent), applies the
+	// default tablet types when the flag was not set, and trims whitespace
+	// from the elements of the list flags that were set.
+	parseAndValidateCreate = func(cmd *cobra.Command, args []string) error {
+		var err error
+		if len(args) == 1 { // Validate UUID if provided
+			if createOptions.UUID, err = uuid.Parse(args[0]); err != nil {
+				return fmt.Errorf("invalid UUID provided: %v", err)
+			}
+		} else { // Generate a UUID
+			createOptions.UUID = uuid.New()
+		}
+		if !cmd.Flags().Lookup("tablet-types").Changed {
+			createOptions.TabletTypes = tabletTypesDefault
+		}
+		if cmd.Flags().Lookup("source-cells").Changed {
+			for i, cell := range createOptions.SourceCells {
+				createOptions.SourceCells[i] = strings.TrimSpace(cell)
+			}
+		}
+		if cmd.Flags().Lookup("target-cells").Changed {
+			for i, cell := range createOptions.TargetCells {
+				createOptions.TargetCells[i] = strings.TrimSpace(cell)
+			}
+		}
+		if cmd.Flags().Lookup("tables").Changed {
+			for i, table := range createOptions.Tables {
+				createOptions.Tables[i] = strings.TrimSpace(table)
+			}
+		}
+		return nil
+	}
+
+	// base is the base command for all actions related to VDiff.
+	base = &cobra.Command{
+		Use:   "VDiff --workflow <workflow> --target-keyspace <keyspace> [command] [command-flags]",
+		Short: "Perform commands related to diffing tables involved in a VReplication workflow between the source and target.",
+		Long: `VDiff commands: create, resume, show, stop, and delete.
+See the --help output for each command for more details.`,
+		DisableFlagsInUseLine: true,
+		Aliases:               []string{"vdiff"},
+		Args:                  cobra.NoArgs,
+	}
+
+	// create makes a VDiffCreate gRPC call to a vtctld.
+	create = &cobra.Command{
+		Use:   "create",
+		Short: "Create and run a VDiff to compare the tables involved in a VReplication workflow between the source and target.",
+		Example: `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer create
+vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer create b3f59678-5241-11ee-be56-0242ac120002`,
+		SilenceUsage:          true,
+		DisableFlagsInUseLine: true,
+		Aliases:               []string{"Create"},
+		Args:                  cobra.MaximumNArgs(1),
+		PreRunE:               parseAndValidateCreate,
+		RunE:                  commandCreate,
+	}
+
+	// delete makes a VDiffDelete gRPC call to a vtctld.
+	delete = &cobra.Command{
+		Use:   "delete",
+		Short: "Delete VDiffs.",
+		Example: `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer delete a037a9e2-5628-11ee-8c99-0242ac120002
+vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer delete all`,
+		DisableFlagsInUseLine: true,
+		Aliases:               []string{"Delete"},
+		Args:                  cobra.ExactArgs(1),
+		PreRunE: func(cmd *cobra.Command, args []string) error {
+			larg := strings.ToLower(args[0])
+			switch larg {
+			case "all":
+			default:
+				if _, err := uuid.Parse(args[0]); err != nil {
+					return fmt.Errorf("invalid argument provided (%s), valid arguments are 'all' or a valid UUID",
+						args[0])
+				}
+			}
+			deleteOptions.Arg = larg
+			return nil
+		},
+		RunE: commandDelete,
+	}
+
+	// resume makes a VDiffResume gRPC call to a vtctld.
+	resume = &cobra.Command{
+		Use:                   "resume",
+		Short:                 "Resume a VDiff.",
+		Example:               `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer resume a037a9e2-5628-11ee-8c99-0242ac120002`,
+		DisableFlagsInUseLine: true,
+		Aliases:               []string{"Resume"},
+		Args:                  cobra.ExactArgs(1),
+		PreRunE: func(cmd *cobra.Command, args []string) error {
+			// Named id so the uuid package is not shadowed.
+			id, err := uuid.Parse(args[0])
+			if err != nil {
+				return fmt.Errorf("invalid UUID provided: %v", err)
+			}
+			resumeOptions.UUID = id
+			return nil
+		},
+		RunE: commandResume,
+	}
+
+	// show makes a VDiffShow gRPC call to a vtctld.
+	show = &cobra.Command{
+		Use:   "show",
+		Short: "Show the status of a VDiff.",
+		Example: `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer show last
+vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer show a037a9e2-5628-11ee-8c99-0242ac120002
+vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer show all`,
+		DisableFlagsInUseLine: true,
+		Aliases:               []string{"Show"},
+		Args:                  cobra.ExactArgs(1),
+		PreRunE: func(cmd *cobra.Command, args []string) error {
+			larg := strings.ToLower(args[0])
+			switch larg {
+			case "last", "all":
+			default:
+				if _, err := uuid.Parse(args[0]); err != nil {
+					return fmt.Errorf("invalid argument provided (%s), valid arguments are 'all', 'last', or a valid UUID",
+						args[0])
+				}
+			}
+			showOptions.Arg = larg
+			return nil
+		},
+		RunE: commandShow,
+	}
+
+	// stop makes a VDiffStop gRPC call to a vtctld.
+	stop = &cobra.Command{
+		Use:                   "stop",
+		Short:                 "Stop a running VDiff.",
+		Example:               `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer stop a037a9e2-5628-11ee-8c99-0242ac120002`,
+		DisableFlagsInUseLine: true,
+		Aliases:               []string{"Stop"},
+		Args:                  cobra.ExactArgs(1),
+		PreRunE: func(cmd *cobra.Command, args []string) error {
+			// Named id so the uuid package is not shadowed.
+			id, err := uuid.Parse(args[0])
+			if err != nil {
+				return fmt.Errorf("invalid UUID provided: %v", err)
+			}
+			stopOptions.UUID = id
+			return nil
+		},
+		RunE: commandStop,
+	}
+)
+
+// simpleResponse is the JSON shape printed by displaySimpleResponse.
+type simpleResponse struct {
+	Action vdiff.VDiffAction
+	Status string
+}
+
+// displaySimpleResponse displays a simple standard response for the
+// resume, stop, and delete commands after the client command completes
+// without an error.
+func displaySimpleResponse(out io.Writer, format string, action vdiff.VDiffAction) {
+	// Resume is reported as "scheduled"; every other action is reported as
+	// "completed".
+	status := "completed"
+	if action == vdiff.ResumeAction {
+		status = "scheduled"
+	}
+	if format == "json" {
+		resp := &simpleResponse{
+			Action: action,
+			Status: status,
+		}
+		// The marshal error is ignored here because this function has no
+		// error return.
+		jsonText, _ := cli.MarshalJSONPretty(resp)
+		fmt.Fprintln(out, string(jsonText))
+	} else {
+		fmt.Fprintf(out, "VDiff %s %s\n", action, status)
+	}
+}
+
+// commandCreate sends a VDiffCreate request built from the parsed create
+// options. Without --wait it prints the scheduled UUID (or the response as
+// JSON) to the command's output writer and returns. With --wait it calls
+// VDiffShow on every --wait-update-interval tick, printing a summary each
+// time, until the summary state is completed or the command context is done.
+func commandCreate(cmd *cobra.Command, args []string) error {
+	format, err := common.GetOutputFormat(cmd)
+	if err != nil {
+		return err
+	}
+	// time.NewTicker panics on a non-positive interval, so reject it up
+	// front, before the create request is sent.
+	if createOptions.Wait && createOptions.WaitUpdateInterval <= 0 {
+		return fmt.Errorf("invalid wait-update-interval provided (%v), it must be greater than zero",
+			createOptions.WaitUpdateInterval)
+	}
+	tsp := common.GetTabletSelectionPreference(cmd)
+	cli.FinishedParsing(cmd)
+
+	resp, err := common.GetClient().VDiffCreate(common.GetCommandCtx(), &vtctldatapb.VDiffCreateRequest{
+		Workflow:                    common.BaseOptions.Workflow,
+		TargetKeyspace:              common.BaseOptions.TargetKeyspace,
+		Uuid:                        createOptions.UUID.String(),
+		SourceCells:                 createOptions.SourceCells,
+		TargetCells:                 createOptions.TargetCells,
+		TabletTypes:                 createOptions.TabletTypes,
+		TabletSelectionPreference:   tsp,
+		Tables:                      createOptions.Tables,
+		Limit:                       int64(createOptions.Limit),
+		FilteredReplicationWaitTime: protoutil.DurationToProto(createOptions.FilteredReplicationWaitTime),
+		DebugQuery:                  createOptions.DebugQuery,
+		OnlyPKs:                     createOptions.OnlyPKs,
+		UpdateTableStats:            createOptions.UpdateTableStats,
+		MaxExtraRowsToCompare:       int64(createOptions.MaxExtraRowsToCompare),
+		Wait:                        createOptions.Wait,
+		WaitUpdateInterval:          protoutil.DurationToProto(createOptions.WaitUpdateInterval),
+		AutoRetry:                   createOptions.AutoRetry,
+	})
+
+	if err != nil {
+		return err
+	}
+
+	if !createOptions.Wait {
+		var data []byte
+		if format == "json" {
+			data, err = cli.MarshalJSONPretty(resp)
+			if err != nil {
+				return err
+			}
+		} else {
+			data = []byte(fmt.Sprintf("VDiff %s scheduled on target shards, use show to view progress", resp.UUID))
+		}
+		// Write to the command's writer, like every other command in this
+		// file, rather than directly to os.Stdout.
+		fmt.Fprintln(cmd.OutOrStdout(), string(data))
+		return nil
+	}
+
+	tkr := time.NewTicker(createOptions.WaitUpdateInterval)
+	defer tkr.Stop()
+	var state vdiff.VDiffState
+	ctx := common.GetCommandCtx()
+	vtctldClient := common.GetClient()
+	uuidStr := createOptions.UUID.String()
+	for {
+		select {
+		case <-ctx.Done():
+			return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
+		case <-tkr.C:
+			showResp, err := vtctldClient.VDiffShow(ctx, &vtctldatapb.VDiffShowRequest{
+				Workflow:       common.BaseOptions.Workflow,
+				TargetKeyspace: common.BaseOptions.TargetKeyspace,
+				Arg:            uuidStr,
+			})
+			if err != nil {
+				return err
+			}
+			if state, err = displayShowSingleSummary(cmd.OutOrStdout(), format, common.BaseOptions.TargetKeyspace, common.BaseOptions.Workflow, uuidStr, showResp, false); err != nil {
+				return err
+			}
+			// Only the completed state ends the wait; any other state
+			// keeps polling until the context is done.
+			if state == vdiff.CompletedState {
+				return nil
+			}
+		}
+	}
+}
+
+// commandDelete sends a VDiffDelete request for the validated delete
+// argument and prints the standard simple response on success.
+func commandDelete(cmd *cobra.Command, args []string) error {
+	format, err := common.GetOutputFormat(cmd)
+	if err != nil {
+		return err
+	}
+	cli.FinishedParsing(cmd)
+
+	_, err = common.GetClient().VDiffDelete(common.GetCommandCtx(), &vtctldatapb.VDiffDeleteRequest{
+		Workflow:       common.BaseOptions.Workflow,
+		TargetKeyspace: common.BaseOptions.TargetKeyspace,
+		Arg:            deleteOptions.Arg,
+	})
+
+	if err != nil {
+		return err
+	}
+
+	displaySimpleResponse(cmd.OutOrStdout(), format, vdiff.DeleteAction)
+
+	return nil
+}
+
+// commandResume sends a VDiffResume request for the validated UUID and
+// prints the standard simple response on success.
+func commandResume(cmd *cobra.Command, args []string) error {
+	format, err := common.GetOutputFormat(cmd)
+	if err != nil {
+		return err
+	}
+	cli.FinishedParsing(cmd)
+
+	_, err = common.GetClient().VDiffResume(common.GetCommandCtx(), &vtctldatapb.VDiffResumeRequest{
+		Workflow:       common.BaseOptions.Workflow,
+		TargetKeyspace: common.BaseOptions.TargetKeyspace,
+		Uuid:           resumeOptions.UUID.String(),
+	})
+
+	if err != nil {
+		return err
+	}
+
+	displaySimpleResponse(cmd.OutOrStdout(), format, vdiff.ResumeAction)
+
+	return nil
+}
+
+// tableSummary aggregates the current state of the table diff from all shards.
+type tableSummary struct {
+	TableName       string
+	State           vdiff.VDiffState
+	RowsCompared    int64
+	MatchingRows    int64
+	MismatchedRows  int64
+	ExtraRowsSource int64
+	ExtraRowsTarget int64
+	LastUpdated     string `json:"LastUpdated,omitempty"`
+}
+
+// summary aggregates the current state of the vdiff from all shards.
+type summary struct {
+	Workflow, Keyspace string
+	State              vdiff.VDiffState
+	UUID               string
+	RowsCompared       int64
+	HasMismatch        bool
+	Shards             string
+	StartedAt          string                                 `json:"StartedAt,omitempty"`
+	CompletedAt        string                                 `json:"CompletedAt,omitempty"`
+	TableSummaryMap    map[string]tableSummary                `json:"TableSummary,omitempty"`
+	Reports            map[string]map[string]vdiff.DiffReport `json:"Reports,omitempty"`
+	Errors             map[string]string                      `json:"Errors,omitempty"`
+	Progress           *vdiff.ProgressReport                  `json:"Progress,omitempty"`
+}
+
+// summaryTextTemplate is the template used to render a summary when the
+// output format is not json. Runs of blank lines it produces are collapsed
+// by displayShowSingleSummary.
+const summaryTextTemplate = `
+VDiff Summary for {{.Keyspace}}.{{.Workflow}} ({{.UUID}})
+State: {{.State}}
+{{if .Errors}}
+{{- range $shard, $error := .Errors}}
+ Error: (shard {{$shard}}) {{$error}}
+{{- end}}
+{{end}}
+RowsCompared: {{.RowsCompared}}
+HasMismatch: {{.HasMismatch}}
+StartedAt: {{.StartedAt}}
+{{if (eq .State "started")}}Progress: {{printf "%.2f" .Progress.Percentage}}%%{{if .Progress.ETA}}, ETA: {{.Progress.ETA}}{{end}}{{end}}
+{{if .CompletedAt}}CompletedAt: {{.CompletedAt}}{{end}}
+{{range $table := .TableSummaryMap}}
+Table {{$table.TableName}}:
+ State: {{$table.State}}
+ ProcessedRows: {{$table.RowsCompared}}
+ MatchingRows: {{$table.MatchingRows}}
+{{if $table.MismatchedRows}} MismatchedRows: {{$table.MismatchedRows}}{{end}}
+{{if $table.ExtraRowsSource}} ExtraRowsSource: {{$table.ExtraRowsSource}}{{end}}
+{{if $table.ExtraRowsTarget}} ExtraRowsTarget: {{$table.ExtraRowsTarget}}{{end}}
+{{end}}
+
+Use "--format=json" for more detailed output.
+`
+
+// listing is one row of the "show all" output: a vdiff on a single shard.
+type listing struct {
+	UUID, Workflow, Keyspace, Shard, State string
+}
+
+// String returns the listing as a single "Key: value" line.
+func (vdl *listing) String() string {
+	return fmt.Sprintf("UUID: %s, Workflow: %s, Keyspace: %s, Shard: %s, State: %s",
+		vdl.UUID, vdl.Workflow, vdl.Keyspace, vdl.Shard, vdl.State)
+}
+
+// getStructFieldNames returns the field names of the struct value s in
+// declaration order. s must be a struct value, not a pointer, as
+// reflect.Type.NumField panics for any other kind.
+func getStructFieldNames(s any) []string {
+	t := reflect.TypeOf(s)
+
+	names := make([]string, t.NumField())
+	for i := range names {
+		names[i] = t.Field(i).Name
+	}
+
+	return names
+}
+
+// buildListings renders the listings as a grid table whose first row holds
+// the listing field names. It returns "" when there are no listings.
+func buildListings(listings []*listing) string {
+	if len(listings) == 0 {
+		return ""
+	}
+	// Get the column headers.
+	fields := getStructFieldNames(listing{})
+	// The header is the first row.
+	lines := make([][]string, 0, len(listings)+1)
+	lines = append(lines, fields)
+	for _, l := range listings {
+		v := reflect.ValueOf(*l)
+		// Every row gets its own slice. Appending to one slice shared by
+		// all rows made each row also carry the cells of all the rows
+		// before it.
+		values := make([]string, 0, len(fields))
+		for _, field := range fields {
+			values = append(values, v.FieldByName(field).String())
+		}
+		lines = append(lines, values)
+	}
+	t := gotabulate.Create(lines)
+	return t.Render("grid")
+}
+
+// displayShowResponse writes the result of a VDiffShow call to out.
+// For the "all" argument it lists the vdiffs found in the response. For
+// "last" it takes the UUID from one of the tablet responses and then shows
+// that vdiff's summary; if that UUID cannot be parsed it reports that no
+// previous vdiff exists. For anything else actionArg itself must be the
+// UUID of the vdiff to summarize.
+func displayShowResponse(out io.Writer, format, keyspace, workflowName, actionArg string, resp *vtctldatapb.VDiffShowResponse, verbose bool) error {
+	var vdiffUUID uuid.UUID
+	var err error
+	switch actionArg {
+	case vdiff.AllActionArg:
+		return displayShowRecent(out, format, keyspace, workflowName, actionArg, resp)
+	case vdiff.LastActionArg:
+		// Only the first tablet response visited is consulted for the UUID.
+		for _, resp := range resp.TabletResponses {
+			vdiffUUID, err = uuid.Parse(resp.VdiffUuid)
+			if err != nil {
+				if format == "json" {
+					fmt.Fprintln(out, "{}")
+				} else {
+					fmt.Fprintf(out, "No previous vdiff found for %s.%s\n", keyspace, workflowName)
+				}
+				return nil
+			}
+			break
+		}
+		fallthrough
+	default:
+		if vdiffUUID == uuid.Nil { // Then it must be passed as the action arg
+			vdiffUUID, err = uuid.Parse(actionArg)
+			if err != nil {
+				return err
+			}
+		}
+		if len(resp.TabletResponses) == 0 {
+			return fmt.Errorf("no response received for vdiff show of %s.%s (%s)", keyspace, workflowName, vdiffUUID.String())
+		}
+		_, err = displayShowSingleSummary(out, format, keyspace, workflowName, vdiffUUID.String(), resp, verbose)
+		return err
+	}
+}
+
+// displayShowRecent writes the listings built from resp to out, as JSON
+// ("[]" when there are none) or as a text table (a "No vdiffs found"
+// message when there are none). subCommand is not used.
+func displayShowRecent(out io.Writer, format, keyspace, workflowName, subCommand string, resp *vtctldatapb.VDiffShowResponse) error {
+	output := ""
+	recentListings, err := buildRecentListings(resp)
+	if err != nil {
+		return err
+	}
+	if format == "json" {
+		jsonText, err := cli.MarshalJSONPretty(recentListings)
+		if err != nil {
+			return err
+		}
+		output = string(jsonText)
+		// A nil slice marshals to null; print an empty list instead.
+		if output == "null" {
+			output = "[]"
+		}
+	} else {
+		output = buildListings(recentListings)
+		if output == "" {
+			output = fmt.Sprintf("No vdiffs found for %s.%s", keyspace, workflowName)
+		}
+	}
+	fmt.Fprintln(out, output)
+	return nil
+}
+
+// buildRecentListings converts every row of every non-nil tablet response
+// in resp into a listing. The returned error is always nil.
+func buildRecentListings(resp *vtctldatapb.VDiffShowResponse) ([]*listing, error) {
+	var listings []*listing
+	for _, resp := range resp.TabletResponses {
+		if resp != nil && resp.Output != nil {
+			qr := sqltypes.Proto3ToResult(resp.Output)
+			for _, row := range qr.Named().Rows {
+				listings = append(listings, &listing{
+					UUID:     row["vdiff_uuid"].ToString(),
+					Workflow: row["workflow"].ToString(),
+					Keyspace: row["keyspace"].ToString(),
+					Shard:    row["shard"].ToString(),
+					State:    row["state"].ToString(),
+				})
+			}
+		}
+	}
+	return listings, nil
+}
+
+// displayShowSingleSummary builds the summary for one vdiff from resp,
+// writes it to out as JSON or as rendered template text, and returns the
+// summary's state. On error the state returned is whatever was known at
+// that point (vdiff.UnknownState if the summary could not be built).
+func displayShowSingleSummary(out io.Writer, format, keyspace, workflowName, uuid string, resp *vtctldatapb.VDiffShowResponse, verbose bool) (vdiff.VDiffState, error) {
+	state := vdiff.UnknownState
+	var output string
+	summary, err := buildSingleSummary(keyspace, workflowName, uuid, resp, verbose)
+	if err != nil {
+		return state, err
+	}
+	if summary == nil { // Should never happen
+		return state, fmt.Errorf("no report to show for vdiff %s.%s (%s)", keyspace, workflowName, uuid)
+	}
+	state = summary.State
+	if format == "json" {
+		jsonText, err := cli.MarshalJSONPretty(summary)
+		if err != nil {
+			return state, err
+		}
+		output = string(jsonText)
+	} else {
+		tmpl, err := template.New("summary").Parse(summaryTextTemplate)
+		if err != nil {
+			return state, err
+		}
+		sb := new(strings.Builder)
+		err = tmpl.Execute(sb, summary)
+		if err != nil {
+			return state, err
+		}
+		output = sb.String()
+		// Collapse runs of blank lines left behind by template actions
+		// that rendered nothing; repeat until nothing changes.
+		for {
+			str := strings.Replace(output, "\n\n", "\n", -1)
+			if output == str {
+				break
+			}
+			output = str
+		}
+	}
+	fmt.Fprintln(out, output)
+	return state, nil
+}
+
+// buildSingleSummary folds the per-shard, per-table rows in resp into one
+// summary for the vdiff identified by uuid. Unless verbose is set or a
+// mismatch was found, the per-table summaries and reports are dropped from
+// the result. An error is returned only when a row's report is not valid
+// JSON.
+func buildSingleSummary(keyspace, workflow, uuid string, resp *vtctldatapb.VDiffShowResponse, verbose bool) (*summary, error) {
+	summary := &summary{
+		Workflow:     workflow,
+		Keyspace:     keyspace,
+		UUID:         uuid,
+		State:        vdiff.UnknownState,
+		RowsCompared: 0,
+		StartedAt:    "",
+		CompletedAt:  "",
+		HasMismatch:  false,
+		Shards:       "",
+		Reports:      make(map[string]map[string]vdiff.DiffReport),
+		Errors:       make(map[string]string),
+		Progress:     nil,
+	}
+
+	var tableSummaryMap map[string]tableSummary
+	var reports map[string]map[string]vdiff.DiffReport
+	// Keep a tally of the states across all tables in all shards.
+	tableStateCounts := map[vdiff.VDiffState]int{
+		vdiff.UnknownState:   0,
+		vdiff.PendingState:   0,
+		vdiff.StartedState:   0,
+		vdiff.StoppedState:   0,
+		vdiff.ErrorState:     0,
+		vdiff.CompletedState: 0,
+	}
+	// Keep a tally of the summary states across all shards.
+	shardStateCounts := map[vdiff.VDiffState]int{
+		vdiff.UnknownState:   0,
+		vdiff.PendingState:   0,
+		vdiff.StartedState:   0,
+		vdiff.StoppedState:   0,
+		vdiff.ErrorState:     0,
+		vdiff.CompletedState: 0,
+	}
+	// Keep a tally of the approximate total rows to process as we'll use this for our progress
+	// report.
+	totalRowsToCompare := int64(0)
+	var shards []string
+	for shard, resp := range resp.TabletResponses {
+		first := true
+		if resp != nil && resp.Output != nil {
+			shards = append(shards, shard)
+			qr := sqltypes.Proto3ToResult(resp.Output)
+			if tableSummaryMap == nil {
+				tableSummaryMap = make(map[string]tableSummary, 0)
+				reports = make(map[string]map[string]vdiff.DiffReport, 0)
+			}
+			for _, row := range qr.Named().Rows {
+				// Update the global VDiff summary based on the per shard level summary.
+				// Since these values will be the same for all subsequent rows we only use
+				// the first row.
+				if first {
+					first = false
+					// Our timestamps are strings in `2022-06-26 20:43:25` format so we sort
+					// them lexicographically.
+					// We should use the earliest started_at across all shards.
+					if sa := row.AsString("started_at", ""); summary.StartedAt == "" || sa < summary.StartedAt {
+						summary.StartedAt = sa
+					}
+					// And we should use the latest completed_at across all shards.
+					if ca := row.AsString("completed_at", ""); summary.CompletedAt == "" || ca > summary.CompletedAt {
+						summary.CompletedAt = ca
+					}
+					// If we had an error on the shard, then let's add that to the summary.
+					if le := row.AsString("last_error", ""); le != "" {
+						summary.Errors[shard] = le
+					}
+					// Keep track of how many shards are marked as a specific state. We check
+					// this combined with the shard.table states to determine the VDiff summary
+					// state.
+					shardStateCounts[vdiff.VDiffState(strings.ToLower(row.AsString("vdiff_state", "")))]++
+				}
+
+				// Global VDiff summary updates that take into account the per table details
+				// per shard.
+				{
+					summary.RowsCompared += row.AsInt64("rows_compared", 0)
+					totalRowsToCompare += row.AsInt64("table_rows", 0)
+
+					// If we had a mismatch on any table on any shard then the global VDiff
+					// summary does too.
+					if mm, _ := row.ToBool("has_mismatch"); mm {
+						summary.HasMismatch = true
+					}
+				}
+
+				// Table summary information that must be accounted for across all shards.
+				{
+					table := row.AsString("table_name", "")
+					// Create the global VDiff table summary object if it doesn't exist.
+					if _, ok := tableSummaryMap[table]; !ok {
+						tableSummaryMap[table] = tableSummary{
+							TableName: table,
+							State:     vdiff.UnknownState,
+						}
+
+					}
+					ts := tableSummaryMap[table]
+					// This is the shard level VDiff table state.
+					sts := vdiff.VDiffState(strings.ToLower(row.AsString("table_state", "")))
+					tableStateCounts[sts]++
+
+					// The error state must be sticky, and we should not override any other
+					// known state with completed.
+					switch sts {
+					case vdiff.CompletedState:
+						if ts.State == vdiff.UnknownState {
+							ts.State = sts
+						}
+					case vdiff.ErrorState:
+						ts.State = sts
+					default:
+						if ts.State != vdiff.ErrorState {
+							ts.State = sts
+						}
+					}
+
+					diffReport := row.AsString("report", "")
+					dr := vdiff.DiffReport{}
+					if diffReport != "" {
+						err := json.Unmarshal([]byte(diffReport), &dr)
+						if err != nil {
+							return nil, err
+						}
+						ts.RowsCompared += dr.ProcessedRows
+						ts.MismatchedRows += dr.MismatchedRows
+						ts.MatchingRows += dr.MatchingRows
+						ts.ExtraRowsTarget += dr.ExtraRowsTarget
+						ts.ExtraRowsSource += dr.ExtraRowsSource
+					}
+					if _, ok := reports[table]; !ok {
+						reports[table] = make(map[string]vdiff.DiffReport)
+					}
+
+					reports[table][shard] = dr
+					tableSummaryMap[table] = ts
+				}
+			}
+		}
+	}
+
+	// The global VDiff summary should progress from pending->started->completed with
+	// stopped for any shard and error for any table being sticky for the global summary.
+	// We should only consider the VDiff to be complete if it's completed for every table
+	// on every shard.
+	if shardStateCounts[vdiff.StoppedState] > 0 {
+		summary.State = vdiff.StoppedState
+	} else if shardStateCounts[vdiff.ErrorState] > 0 || tableStateCounts[vdiff.ErrorState] > 0 {
+		summary.State = vdiff.ErrorState
+	} else if tableStateCounts[vdiff.StartedState] > 0 {
+		summary.State = vdiff.StartedState
+	} else if tableStateCounts[vdiff.PendingState] > 0 {
+		summary.State = vdiff.PendingState
+	} else if tableStateCounts[vdiff.CompletedState] == (len(tableSummaryMap) * len(shards)) {
+		// When doing shard consolidations/merges, we cannot rely solely on the
+		// vdiff_table state as there are N sources that we process rows from sequentially
+		// with each one writing to the shared _vt.vdiff_table record for the target shard.
+		// So we only mark the vdiff for the shard as completed when we've finished
+		// processing rows from all of the sources -- which is recorded by marking the
+		// vdiff done for the shard by setting _vt.vdiff.state = completed.
+		if shardStateCounts[vdiff.CompletedState] == len(shards) {
+			summary.State = vdiff.CompletedState
+		} else {
+			summary.State = vdiff.StartedState
+		}
+	} else {
+		summary.State = vdiff.UnknownState
+	}
+
+	// If the vdiff has been started then we can calculate the progress.
+	if summary.State == vdiff.StartedState {
+		buildProgressReport(summary, totalRowsToCompare)
+	}
+
+	sort.Strings(shards) // Sort for predictable output
+	summary.Shards = strings.Join(shards, ",")
+	summary.TableSummaryMap = tableSummaryMap
+	summary.Reports = reports
+	if !summary.HasMismatch && !verbose {
+		summary.Reports = nil
+		summary.TableSummaryMap = nil
+	}
+	// If we haven't completed the global VDiff then be sure to reflect that with no
+	// CompletedAt value.
+	if summary.State != vdiff.CompletedState {
+		summary.CompletedAt = ""
+	}
+	return summary, nil
+}
+
+// buildProgressReport sets summary.Progress from the rows compared so far
+// versus rowsToCompare. The percentage is capped at 100, and an ETA is only
+// set once at least 1% is done and the estimate is less than a year away.
+func buildProgressReport(summary *summary, rowsToCompare int64) {
+	report := &vdiff.ProgressReport{}
+	if summary.RowsCompared >= 1 {
+		// Round to 2 decimal points.
+		report.Percentage = math.Round(math.Min((float64(summary.RowsCompared)/float64(rowsToCompare))*100, 100.00)*100) / 100
+	}
+	if math.IsNaN(report.Percentage) {
+		report.Percentage = 0
+	}
+	pctToGo := math.Abs(report.Percentage - 100.00)
+	// A parse failure leaves startTime at the zero time; the resulting
+	// far-future ETA is then discarded by the one year cap below.
+	startTime, _ := time.Parse(vdiff.TimestampFormat, summary.StartedAt)
+	curTime := time.Now().UTC()
+	runTime := curTime.Unix() - startTime.Unix()
+	if report.Percentage >= 1 {
+		// Calculate how long 1% took, on avg, and multiply that by the % left.
+		eta := time.Unix(((int64(runTime)/int64(report.Percentage))*int64(pctToGo))+curTime.Unix(), 1).UTC()
+		// Cap the ETA at 1 year out to prevent providing nonsensical ETAs.
+		if eta.Before(time.Now().UTC().AddDate(1, 0, 0)) {
+			report.ETA = eta.Format(vdiff.TimestampFormat)
+		}
+	}
+	summary.Progress = report
+}
+
+// commandShow sends a VDiffShow request for the validated show argument and
+// displays the response.
+func commandShow(cmd *cobra.Command, args []string) error {
+	format, err := common.GetOutputFormat(cmd)
+	if err != nil {
+		return err
+	}
+	cli.FinishedParsing(cmd)
+
+	resp, err := common.GetClient().VDiffShow(common.GetCommandCtx(), &vtctldatapb.VDiffShowRequest{
+		Workflow:       common.BaseOptions.Workflow,
+		TargetKeyspace: common.BaseOptions.TargetKeyspace,
+		Arg:            showOptions.Arg,
+	})
+
+	if err != nil {
+		return err
+	}
+
+	if err = displayShowResponse(cmd.OutOrStdout(), format, common.BaseOptions.TargetKeyspace, common.BaseOptions.Workflow, showOptions.Arg, resp, showOptions.Verbose); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// commandStop sends a VDiffStop request for the validated UUID and prints
+// the standard simple response on success.
+func commandStop(cmd *cobra.Command, args []string) error {
+	format, err := common.GetOutputFormat(cmd)
+	if err != nil {
+		return err
+	}
+	cli.FinishedParsing(cmd)
+
+	_, err = common.GetClient().VDiffStop(common.GetCommandCtx(), &vtctldatapb.VDiffStopRequest{
+		Workflow:       common.BaseOptions.Workflow,
+		TargetKeyspace: common.BaseOptions.TargetKeyspace,
+		Uuid:           stopOptions.UUID.String(),
+	})
+
+	if err != nil {
+		return err
+	}
+
+	displaySimpleResponse(cmd.OutOrStdout(), format, vdiff.StopAction)
+
+	return nil
+}
+
+// registerVDiffCommands adds the VDiff base command to root, and the
+// create, delete, resume, show and stop sub-commands with their flags to
+// the base command.
+func registerVDiffCommands(root *cobra.Command) {
+	common.AddCommonFlags(base)
+	root.AddCommand(base)
+
+	create.Flags().StringSliceVar(&createOptions.SourceCells, "source-cells", nil, "The source cell(s) to compare from; default is any available cell.")
+	create.Flags().StringSliceVar(&createOptions.TargetCells, "target-cells", nil, "The target cell(s) to compare with; default is any available cell.")
+	create.Flags().Var((*topoprotopb.TabletTypeListFlag)(&createOptions.TabletTypes), "tablet-types", "Tablet types to use on the source and target.")
+	create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.")
+	create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.")
+	create.Flags().Uint32Var(&createOptions.Limit, "limit", math.MaxUint32, "Max rows to stop comparing after.")
+	create.Flags().BoolVar(&createOptions.DebugQuery, "debug-query", false, "Adds a mysql query to the report that can be used for further debugging.")
+	create.Flags().BoolVar(&createOptions.OnlyPKs, "only-pks", false, "When reporting missing rows, only show primary keys in the report.")
+	create.Flags().StringSliceVar(&createOptions.Tables, "tables", nil, "Only run vdiff for these tables in the workflow.")
+	create.Flags().Uint32Var(&createOptions.MaxExtraRowsToCompare, "max-extra-rows-to-compare", 1000, "If there are collation differences between the source and target, you can have rows that are identical but simply returned in a different order from MySQL. We will do a second pass to compare the rows for any actual differences in this case and this flag allows you to control the resources used for this operation.")
+	create.Flags().BoolVar(&createOptions.Wait, "wait", false, "When creating or resuming a vdiff, wait for it to finish before exiting.")
+	create.Flags().DurationVar(&createOptions.WaitUpdateInterval, "wait-update-interval", time.Duration(1*time.Minute), "When waiting on a vdiff to finish, check and display the current status this often.")
+	create.Flags().BoolVar(&createOptions.AutoRetry, "auto-retry", true, "Should this vdiff automatically retry and continue in case of recoverable errors.")
+	create.Flags().BoolVar(&createOptions.UpdateTableStats, "update-table-stats", false, "Update the table statistics, using ANALYZE TABLE, on each table involved in the VDiff during initialization. This will ensure that progress estimates are as accurate as possible -- but it does involve locks and can potentially impact query processing on the target keyspace.")
+	base.AddCommand(create)
+
+	base.AddCommand(delete)
+
+	base.AddCommand(resume)
+
+	show.Flags().BoolVar(&showOptions.Verbose, "verbose", false, "Show verbose output in summaries")
+	base.AddCommand(show)
+
+	base.AddCommand(stop)
+}
+
+// init registers the VDiff commands with the common command handler.
+func init() {
+	common.RegisterCommandHandler("VDiff", registerVDiffCommands)
+}
diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go
index 82e2b24f4b3..78a5728d9ee 100644
--- a/go/test/endtoend/vreplication/vdiff2_test.go
+++ b/go/test/endtoend/vreplication/vdiff2_test.go
@@ -103,8 +103,7 @@ var testCases = []*testCase{
 }
 
 func TestVDiff2(t *testing.T) {
-	allCellNames = "zone1"
-	defaultCellName := "zone1"
+	allCellNames = "zone5,zone1,zone2,zone3,zone4"
 	sourceKs := "product"
 	sourceShards := []string{"0"}
 	targetKs := "customer"
@@ -112,14 +111,19 @@ func TestVDiff2(t *testing.T) {
 	// This forces us to use multiple vstream packets even with small test tables
extraVTTabletArgs = []string{"--vstream_packet_size=1"} - vc = NewVitessCluster(t, "TestVDiff2", []string{allCellNames}, mainClusterConfig) + vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig) require.NotNil(t, vc) - defaultCell = vc.Cells[defaultCellName] - cells := []*Cell{defaultCell} + zone1 := vc.Cells["zone1"] + zone2 := vc.Cells["zone2"] + zone3 := vc.Cells["zone3"] + defaultCell = zone1 defer vc.TearDown(t) - vc.AddKeyspace(t, cells, sourceKs, strings.Join(sourceShards, ","), initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts) + // The primary tablet is only added in the first cell. + // We ONLY add primary tablets in this test. + _, err := vc.AddKeyspace(t, []*Cell{zone2, zone1, zone3}, sourceKs, strings.Join(sourceShards, ","), initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts) + require.NoError(t, err) vtgate = defaultCell.Vtgates[0] require.NotNil(t, vtgate) @@ -139,7 +143,9 @@ func TestVDiff2(t *testing.T) { generateMoreCustomers(t, sourceKs, 100) - _, err := vc.AddKeyspace(t, cells, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts) + // The primary tablet is only added in the first cell. + // We ONLY add primary tablets in this test. + tks, err := vc.AddKeyspace(t, []*Cell{zone3, zone1, zone2}, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts) require.NoError(t, err) for _, shard := range targetShards { require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard)) @@ -147,15 +153,15 @@ func TestVDiff2(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - testWorkflow(t, vc, tc, cells) + // Primary tablets for any new shards are added in the first cell. 
+ testWorkflow(t, vc, tc, tks, []*Cell{zone3, zone2, zone1}) }) } } -func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell) { +func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, cells []*Cell) { arrTargetShards := strings.Split(tc.targetShards, ",") if tc.typ == "Reshard" { - tks := vc.Cells[cells[0].Name].Keyspaces[tc.targetKs] require.NoError(t, vc.AddShards(t, cells, tks, tc.targetShards, 0, 0, tc.tabletBaseID, targetKsOpts)) for _, shard := range arrTargetShards { require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, tc.targetKs, shard)) @@ -168,6 +174,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell) if tc.typ == "Reshard" { args = append(args, "--source_shards", tc.sourceShards, "--target_shards", tc.targetShards) } + args = append(args, "--cells", allCellNames) args = append(args, "--tables", tc.tables) args = append(args, "Create") args = append(args, ksWorkflow) @@ -180,7 +187,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell) updateTableStats(t, tab, tc.tables) // need to do this in order to test progress reports } - vdiff(t, tc.targetKs, tc.workflow, cells[0].Name, true, true, nil) + vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil) if tc.autoRetryError { testAutoRetryError(t, tc, cells[0].Name) diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index 35b3b3f5d26..c2395c87c7d 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -173,6 +173,7 @@ func doVdiff2(t *testing.T, keyspace, workflow, cells string, want *expectedVDif func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg string, expectError bool, extraFlags ...string) (uuid string, output string) { var err error +<<<<<<< HEAD args := []string{"VDiff", "--", "--tablet_types=primary", 
"--source_cell=" + cells, "--format=json"} if len(extraFlags) > 0 { args = append(args, extraFlags...) @@ -184,6 +185,49 @@ func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg stri require.Nil(t, err) uuid = gjson.Get(output, "UUID").String() if action != "delete" && !(action == "show" && actionArg == "all") { // a UUID is not required +======= + targetKeyspace, workflowName, ok := strings.Cut(ksWorkflow, ".") + require.True(t, ok, "invalid keyspace.workflow value: %s", ksWorkflow) + + if useVtctlclient { + // This will always result in us using a PRIMARY tablet, which is all + // we start in many e2e tests, but it avoids the tablet picker logic + // where when you ONLY specify the PRIMARY type it then picks the + // shard's primary and ignores any cell settings. + args := []string{"VDiff", "--", "--tablet_types=in_order:primary,replica", "--source_cell=" + cells, "--format=json"} + if len(extraFlags) > 0 { + args = append(args, extraFlags...) + } + args = append(args, ksWorkflow, action, actionArg) + output, err = vc.VtctlClient.ExecuteCommandWithOutput(args...) + log.Infof("vdiff output: %+v (err: %+v)", output, err) + if !expectError { + require.Nil(t, err) + uuid = gjson.Get(output, "UUID").String() + if action != "delete" && !(action == "show" && actionArg == "all") { // A UUID is not required + require.NoError(t, err) + require.NotEmpty(t, uuid) + } + } + } else { + args := []string{"VDiff", "--target-keyspace", targetKeyspace, "--workflow", workflowName, "--format=json", action} + if strings.ToLower(action) == string(vdiff2.CreateAction) { + // This will always result in us using a PRIMARY tablet, which is all + // we start in many e2e tests, but it avoids the tablet picker logic + // where when you ONLY specify the PRIMARY type it then picks the + // shard's primary and ignores any cell settings. 
+ args = append(args, "--tablet-types=primary,replica", "--tablet-types-in-preference-order", "--source-cells="+cells) + } + if len(extraFlags) > 0 { + args = append(args, extraFlags...) + } + if actionArg != "" { + args = append(args, actionArg) + } + output, err = vc.VtctldClient.ExecuteCommandWithOutput(args...) + log.Infof("vdiff output: %+v (err: %+v)", output, err) + if !expectError { +>>>>>>> 2f679aaab1 (VDiff: properly split cell values in record when using TabletPicker (#14099)) require.NoError(t, err) require.NotEmpty(t, uuid) } diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go index cfb9651fb11..55a211475d2 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go @@ -19,6 +19,7 @@ package vdiff import ( "context" "fmt" + "strings" "testing" "github.com/google/uuid" @@ -106,8 +107,8 @@ func TestVDiff(t *testing.T) { MaxRows: 100, }, PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{ - SourceCell: tstenv.Cells[0], - TargetCell: tstenv.Cells[0], + SourceCell: strings.Join(tstenv.Cells, ","), + TargetCell: strings.Join(tstenv.Cells, ","), TabletTypes: "primary", }, ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{ diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index f3aa73aaee3..f19b39f1359 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "sync" "time" @@ -121,7 +122,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { } }() - if err := td.selectTablets(ctx, td.wd.opts.PickerOptions.SourceCell, td.wd.opts.PickerOptions.TabletTypes); err != nil { + if err := td.selectTablets(ctx); err != nil { return err } if err := td.syncSourceStreams(ctx); err != nil { @@ -199,16 +200,22 @@ func (td *tableDiffer) 
forEachSource(cb func(source *migrationSource) error) err return allErrors.AggrError(vterrors.Aggregate) } -func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes string) error { - var wg sync.WaitGroup - ct := td.wd.ct - var err1, err2 error +func (td *tableDiffer) selectTablets(ctx context.Context) error { + var ( + wg sync.WaitGroup + sourceErr, targetErr error + targetTablet *topodata.Tablet + ) + + // The cells from the vdiff record are a comma separated list. + sourceCells := strings.Split(td.wd.opts.PickerOptions.SourceCell, ",") + targetCells := strings.Split(td.wd.opts.PickerOptions.TargetCell, ",") // For Mount+Migrate, the source tablets will be in a different // Vitess cluster with its own TopoServer. - sourceTopoServer := ct.ts - if ct.externalCluster != "" { - extTS, err := ct.ts.OpenExternalVitessClusterServer(ctx, ct.externalCluster) + sourceTopoServer := td.wd.ct.ts + if td.wd.ct.externalCluster != "" { + extTS, err := td.wd.ct.ts.OpenExternalVitessClusterServer(ctx, td.wd.ct.externalCluster) if err != nil { return err } @@ -217,12 +224,17 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri wg.Add(1) go func() { defer wg.Done() +<<<<<<< HEAD err1 = td.forEachSource(func(source *migrationSource) error { tablet, err := pickTablet(ctx, sourceTopoServer, cell, ct.sourceKeyspace, source.shard, tabletTypes) +======= + sourceErr = td.forEachSource(func(source *migrationSource) error { + sourceTablet, err := pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.sourceKeyspace, source.shard, td.wd.opts.PickerOptions.TabletTypes) +>>>>>>> 2f679aaab1 (VDiff: properly split cell values in record when using TabletPicker (#14099)) if err != nil { return err } - source.tablet = tablet + source.tablet = sourceTablet return nil }) }() @@ -230,26 +242,37 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri wg.Add(1) go func() { defer wg.Done() 
+<<<<<<< HEAD tablet, err2 := pickTablet(ctx, ct.ts, td.wd.opts.PickerOptions.TargetCell, ct.vde.thisTablet.Keyspace, ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes) if err2 != nil { +======= + targetTablet, targetErr = pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.vde.thisTablet.Keyspace, + td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes) + if targetErr != nil { +>>>>>>> 2f679aaab1 (VDiff: properly split cell values in record when using TabletPicker (#14099)) return } - ct.targetShardStreamer = &shardStreamer{ - tablet: tablet, - shard: tablet.Shard, + td.wd.ct.targetShardStreamer = &shardStreamer{ + tablet: targetTablet, + shard: targetTablet.Shard, } }() wg.Wait() - if err1 != nil { - return err1 + if sourceErr != nil { + return sourceErr } - return err2 + return targetErr } +<<<<<<< HEAD func pickTablet(ctx context.Context, ts *topo.Server, cell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { tp, err := discovery.NewTabletPicker(ts, []string{cell}, keyspace, shard, tabletTypes) +======= +func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { + tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) +>>>>>>> 2f679aaab1 (VDiff: properly split cell values in record when using TabletPicker (#14099)) if err != nil { return nil, err } From a8f062066c165bcc6256a0118881fb1bd4a4d4b5 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 27 Sep 2023 15:47:31 -0400 Subject: [PATCH 2/2] Adjust for v16 Signed-off-by: Matt Lord --- .../command/vreplication/vdiff/vdiff.go | 889 ------------------ go/test/endtoend/vreplication/vdiff2_test.go | 4 +- .../vreplication/vdiff_helper_test.go | 50 +- .../tabletmanager/vdiff/table_differ.go | 26 +- 4 files changed, 12 insertions(+), 957 deletions(-) delete mode 100644 
go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go deleted file mode 100644 index c5ac975494f..00000000000 --- a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go +++ /dev/null @@ -1,889 +0,0 @@ -/* -Copyright 2023 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package vdiff - -import ( - "encoding/json" - "fmt" - "html/template" - "io" - "math" - "reflect" - "sort" - "strings" - "time" - - "github.com/bndr/gotabulate" - "github.com/google/uuid" - "github.com/spf13/cobra" - - "vitess.io/vitess/go/cmd/vtctldclient/cli" - "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common" - "vitess.io/vitess/go/protoutil" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - topoprotopb "vitess.io/vitess/go/vt/topo/topoproto" -) - -var ( - tabletTypesDefault = []topodatapb.TabletType{ - topodatapb.TabletType_RDONLY, - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_PRIMARY, - } - - createOptions = struct { - UUID uuid.UUID - SourceCells []string - TargetCells []string - TabletTypes []topodatapb.TabletType - Tables []string - Limit uint32 // We only accept positive values but pass on an int64 - 
FilteredReplicationWaitTime time.Duration - DebugQuery bool - OnlyPKs bool - UpdateTableStats bool - MaxExtraRowsToCompare uint32 // We only accept positive values but pass on an int64 - Wait bool - WaitUpdateInterval time.Duration - AutoRetry bool - }{} - - deleteOptions = struct { - Arg string - }{} - - resumeOptions = struct { - UUID uuid.UUID - }{} - - showOptions = struct { - Arg string - Verbose bool - }{} - - stopOptions = struct { - UUID uuid.UUID - }{} - - parseAndValidateCreate = func(cmd *cobra.Command, args []string) error { - var err error - if len(args) == 1 { // Validate UUID if provided - if createOptions.UUID, err = uuid.Parse(args[0]); err != nil { - return fmt.Errorf("invalid UUID provided: %v", err) - } - } else { // Generate a UUID - createOptions.UUID = uuid.New() - } - if !cmd.Flags().Lookup("tablet-types").Changed { - createOptions.TabletTypes = tabletTypesDefault - } - if cmd.Flags().Lookup("source-cells").Changed { - for i, cell := range createOptions.SourceCells { - createOptions.SourceCells[i] = strings.TrimSpace(cell) - } - } - if cmd.Flags().Lookup("target-cells").Changed { - for i, cell := range createOptions.TargetCells { - createOptions.TargetCells[i] = strings.TrimSpace(cell) - } - } - if cmd.Flags().Lookup("tables").Changed { - for i, table := range createOptions.Tables { - createOptions.Tables[i] = strings.TrimSpace(table) - } - } - return nil - } - - // base is the base command for all actions related to VDiff. - base = &cobra.Command{ - Use: "VDiff --workflow --keyspace [command] [command-flags]", - Short: "Perform commands related to diffing tables involved in a VReplication workflow between the source and target.", - Long: `VDiff commands: create, resume, show, stop, and delete. -See the --help output for each command for more details.`, - DisableFlagsInUseLine: true, - Aliases: []string{"vdiff"}, - Args: cobra.NoArgs, - } - - // create makes a VDiffCreate gRPC call to a vtctld. 
- create = &cobra.Command{ - Use: "create", - Short: "Create and run a VDiff to compare the tables involved in a VReplication workflow between the source and target.", - Example: `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer -vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace customer create b3f59678-5241-11ee-be56-0242ac120002`, - SilenceUsage: true, - DisableFlagsInUseLine: true, - Aliases: []string{"Create"}, - Args: cobra.MaximumNArgs(1), - PreRunE: parseAndValidateCreate, - RunE: commandCreate, - } - - // delete makes a VDiffDelete gRPC call to a vtctld. - delete = &cobra.Command{ - Use: "delete", - Short: "Delete VDiffs.", - Example: `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace delete a037a9e2-5628-11ee-8c99-0242ac120002 -vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace delete all`, - DisableFlagsInUseLine: true, - Aliases: []string{"Delete"}, - Args: cobra.ExactArgs(1), - PreRunE: func(cmd *cobra.Command, args []string) error { - larg := strings.ToLower(args[0]) - switch larg { - case "all": - default: - if _, err := uuid.Parse(args[0]); err != nil { - return fmt.Errorf("invalid argument provided (%s), valid arguments are 'all' or a valid UUID", - args[0]) - } - } - deleteOptions.Arg = larg - return nil - }, - RunE: commandDelete, - } - - // resume makes a VDiffResume gRPC call to a vtctld. 
- resume = &cobra.Command{ - Use: "resume", - Short: "Resume a VDiff.", - Example: `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace resume a037a9e2-5628-11ee-8c99-0242ac120002`, - DisableFlagsInUseLine: true, - Aliases: []string{"Resume"}, - Args: cobra.ExactArgs(1), - PreRunE: func(cmd *cobra.Command, args []string) error { - uuid, err := uuid.Parse(args[0]) - if err != nil { - return fmt.Errorf("invalid UUID provided: %v", err) - } - resumeOptions.UUID = uuid - return nil - }, - RunE: commandResume, - } - - // show makes a VDiffShow gRPC call to a vtctld. - show = &cobra.Command{ - Use: "show", - Short: "Show the status of a VDiff.", - Example: `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace show last -vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace show a037a9e2-5628-11ee-8c99-0242ac120002 -vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace show all`, - DisableFlagsInUseLine: true, - Aliases: []string{"Show"}, - Args: cobra.ExactArgs(1), - PreRunE: func(cmd *cobra.Command, args []string) error { - larg := strings.ToLower(args[0]) - switch larg { - case "last", "all": - default: - if _, err := uuid.Parse(args[0]); err != nil { - return fmt.Errorf("invalid argument provided (%s), valid arguments are 'all', 'last', or a valid UUID", - args[0]) - } - } - showOptions.Arg = larg - return nil - }, - RunE: commandShow, - } - - // stop makes a VDiffStop gRPC call to a vtctld. 
- stop = &cobra.Command{ - Use: "stop", - Short: "Stop a running VDiff.", - Example: `vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --target-keyspace stop a037a9e2-5628-11ee-8c99-0242ac120002`, - DisableFlagsInUseLine: true, - Aliases: []string{"Stop"}, - Args: cobra.ExactArgs(1), - PreRunE: func(cmd *cobra.Command, args []string) error { - uuid, err := uuid.Parse(args[0]) - if err != nil { - return fmt.Errorf("invalid UUID provided: %v", err) - } - stopOptions.UUID = uuid - return nil - }, - RunE: commandStop, - } -) - -type simpleResponse struct { - Action vdiff.VDiffAction - Status string -} - -// displaySimpleResponse displays a simple standard response for the -// resume, stop, and delete commands after the client command completes -// without an error. -func displaySimpleResponse(out io.Writer, format string, action vdiff.VDiffAction) { - status := "completed" - if action == vdiff.ResumeAction { - status = "scheduled" - } - if format == "json" { - resp := &simpleResponse{ - Action: action, - Status: status, - } - jsonText, _ := cli.MarshalJSONPretty(resp) - fmt.Fprintln(out, string(jsonText)) - } else { - fmt.Fprintf(out, "VDiff %s %s\n", action, status) - } -} - -func commandCreate(cmd *cobra.Command, args []string) error { - format, err := common.GetOutputFormat(cmd) - if err != nil { - return err - } - tsp := common.GetTabletSelectionPreference(cmd) - cli.FinishedParsing(cmd) - - resp, err := common.GetClient().VDiffCreate(common.GetCommandCtx(), &vtctldatapb.VDiffCreateRequest{ - Workflow: common.BaseOptions.Workflow, - TargetKeyspace: common.BaseOptions.TargetKeyspace, - Uuid: createOptions.UUID.String(), - SourceCells: createOptions.SourceCells, - TargetCells: createOptions.TargetCells, - TabletTypes: createOptions.TabletTypes, - TabletSelectionPreference: tsp, - Tables: createOptions.Tables, - Limit: int64(createOptions.Limit), - FilteredReplicationWaitTime: 
protoutil.DurationToProto(createOptions.FilteredReplicationWaitTime), - DebugQuery: createOptions.DebugQuery, - OnlyPKs: createOptions.OnlyPKs, - UpdateTableStats: createOptions.UpdateTableStats, - MaxExtraRowsToCompare: int64(createOptions.MaxExtraRowsToCompare), - Wait: createOptions.Wait, - WaitUpdateInterval: protoutil.DurationToProto(createOptions.WaitUpdateInterval), - AutoRetry: createOptions.AutoRetry, - }) - - if err != nil { - return err - } - - if createOptions.Wait { - tkr := time.NewTicker(createOptions.WaitUpdateInterval) - defer tkr.Stop() - var state vdiff.VDiffState - ctx := common.GetCommandCtx() - vtctldClient := common.GetClient() - uuidStr := createOptions.UUID.String() - for { - select { - case <-ctx.Done(): - return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") - case <-tkr.C: - resp, err := vtctldClient.VDiffShow(ctx, &vtctldatapb.VDiffShowRequest{ - Workflow: common.BaseOptions.Workflow, - TargetKeyspace: common.BaseOptions.TargetKeyspace, - Arg: uuidStr, - }) - if err != nil { - return err - } - if state, err = displayShowSingleSummary(cmd.OutOrStdout(), format, common.BaseOptions.TargetKeyspace, common.BaseOptions.Workflow, uuidStr, resp, false); err != nil { - return err - } - if state == vdiff.CompletedState { - return nil - } - } - } - } else { - var data []byte - if format == "json" { - data, err = cli.MarshalJSONPretty(resp) - if err != nil { - return err - } - } else { - data = []byte(fmt.Sprintf("VDiff %s scheduled on target shards, use show to view progress", resp.UUID)) - } - fmt.Println(string(data)) - } - - return nil -} - -func commandDelete(cmd *cobra.Command, args []string) error { - format, err := common.GetOutputFormat(cmd) - if err != nil { - return err - } - cli.FinishedParsing(cmd) - - _, err = common.GetClient().VDiffDelete(common.GetCommandCtx(), &vtctldatapb.VDiffDeleteRequest{ - Workflow: common.BaseOptions.Workflow, - TargetKeyspace: common.BaseOptions.TargetKeyspace, - Arg: deleteOptions.Arg, - }) 
- - if err != nil { - return err - } - - displaySimpleResponse(cmd.OutOrStdout(), format, vdiff.DeleteAction) - - return nil -} - -func commandResume(cmd *cobra.Command, args []string) error { - format, err := common.GetOutputFormat(cmd) - if err != nil { - return err - } - cli.FinishedParsing(cmd) - - _, err = common.GetClient().VDiffResume(common.GetCommandCtx(), &vtctldatapb.VDiffResumeRequest{ - Workflow: common.BaseOptions.Workflow, - TargetKeyspace: common.BaseOptions.TargetKeyspace, - Uuid: resumeOptions.UUID.String(), - }) - - if err != nil { - return err - } - - displaySimpleResponse(cmd.OutOrStdout(), format, vdiff.ResumeAction) - - return nil -} - -// tableSummary aggregates the current state of the table diff from all shards. -type tableSummary struct { - TableName string - State vdiff.VDiffState - RowsCompared int64 - MatchingRows int64 - MismatchedRows int64 - ExtraRowsSource int64 - ExtraRowsTarget int64 - LastUpdated string `json:"LastUpdated,omitempty"` -} - -// summary aggregates the current state of the vdiff from all shards. 
-type summary struct { - Workflow, Keyspace string - State vdiff.VDiffState - UUID string - RowsCompared int64 - HasMismatch bool - Shards string - StartedAt string `json:"StartedAt,omitempty"` - CompletedAt string `json:"CompletedAt,omitempty"` - TableSummaryMap map[string]tableSummary `json:"TableSummary,omitempty"` - Reports map[string]map[string]vdiff.DiffReport `json:"Reports,omitempty"` - Errors map[string]string `json:"Errors,omitempty"` - Progress *vdiff.ProgressReport `json:"Progress,omitempty"` -} - -const summaryTextTemplate = ` -VDiff Summary for {{.Keyspace}}.{{.Workflow}} ({{.UUID}}) -State: {{.State}} -{{if .Errors}} -{{- range $shard, $error := .Errors}} - Error: (shard {{$shard}}) {{$error}} -{{- end}} -{{end}} -RowsCompared: {{.RowsCompared}} -HasMismatch: {{.HasMismatch}} -StartedAt: {{.StartedAt}} -{{if (eq .State "started")}}Progress: {{printf "%.2f" .Progress.Percentage}}%%{{if .Progress.ETA}}, ETA: {{.Progress.ETA}}{{end}}{{end}} -{{if .CompletedAt}}CompletedAt: {{.CompletedAt}}{{end}} -{{range $table := .TableSummaryMap}} -Table {{$table.TableName}}: - State: {{$table.State}} - ProcessedRows: {{$table.RowsCompared}} - MatchingRows: {{$table.MatchingRows}} -{{if $table.MismatchedRows}} MismatchedRows: {{$table.MismatchedRows}}{{end}} -{{if $table.ExtraRowsSource}} ExtraRowsSource: {{$table.ExtraRowsSource}}{{end}} -{{if $table.ExtraRowsTarget}} ExtraRowsTarget: {{$table.ExtraRowsTarget}}{{end}} -{{end}} - -Use "--format=json" for more detailed output. 
-` - -type listing struct { - UUID, Workflow, Keyspace, Shard, State string -} - -func (vdl *listing) String() string { - return fmt.Sprintf("UUID: %s, Workflow: %s, Keyspace: %s, Shard: %s, State: %s", - vdl.UUID, vdl.Workflow, vdl.Keyspace, vdl.Shard, vdl.State) -} - -func getStructFieldNames(s any) []string { - t := reflect.TypeOf(s) - - names := make([]string, t.NumField()) - for i := range names { - names[i] = t.Field(i).Name - } - - return names -} - -func buildListings(listings []*listing) string { - var values []string - var lines [][]string - var result string - - if len(listings) == 0 { - return "" - } - // Get the column headers. - fields := getStructFieldNames(listing{}) - // The header is the first row. - lines = append(lines, fields) - for _, listing := range listings { - v := reflect.ValueOf(*listing) - for _, field := range fields { - values = append(values, v.FieldByName(field).String()) - } - lines = append(lines, values) - } - t := gotabulate.Create(lines) - result = t.Render("grid") - return result -} - -func displayShowResponse(out io.Writer, format, keyspace, workflowName, actionArg string, resp *vtctldatapb.VDiffShowResponse, verbose bool) error { - var vdiffUUID uuid.UUID - var err error - switch actionArg { - case vdiff.AllActionArg: - return displayShowRecent(out, format, keyspace, workflowName, actionArg, resp) - case vdiff.LastActionArg: - for _, resp := range resp.TabletResponses { - vdiffUUID, err = uuid.Parse(resp.VdiffUuid) - if err != nil { - if format == "json" { - fmt.Fprintln(out, "{}") - } else { - fmt.Fprintf(out, "No previous vdiff found for %s.%s\n", keyspace, workflowName) - } - return nil - } - break - } - fallthrough - default: - if vdiffUUID == uuid.Nil { // Then it must be passed as the action arg - vdiffUUID, err = uuid.Parse(actionArg) - if err != nil { - return err - } - } - if len(resp.TabletResponses) == 0 { - return fmt.Errorf("no response received for vdiff show of %s.%s (%s)", keyspace, workflowName, 
vdiffUUID.String()) - } - _, err = displayShowSingleSummary(out, format, keyspace, workflowName, vdiffUUID.String(), resp, verbose) - return err - } -} - -func displayShowRecent(out io.Writer, format, keyspace, workflowName, subCommand string, resp *vtctldatapb.VDiffShowResponse) error { - output := "" - recentListings, err := buildRecentListings(resp) - if err != nil { - return err - } - if format == "json" { - jsonText, err := cli.MarshalJSONPretty(recentListings) - if err != nil { - return err - } - output = string(jsonText) - if output == "null" { - output = "[]" - } - } else { - output = buildListings(recentListings) - if output == "" { - output = fmt.Sprintf("No vdiffs found for %s.%s", keyspace, workflowName) - } - } - fmt.Fprintln(out, output) - return nil -} - -func buildRecentListings(resp *vtctldatapb.VDiffShowResponse) ([]*listing, error) { - var listings []*listing - for _, resp := range resp.TabletResponses { - if resp != nil && resp.Output != nil { - qr := sqltypes.Proto3ToResult(resp.Output) - for _, row := range qr.Named().Rows { - listings = append(listings, &listing{ - UUID: row["vdiff_uuid"].ToString(), - Workflow: row["workflow"].ToString(), - Keyspace: row["keyspace"].ToString(), - Shard: row["shard"].ToString(), - State: row["state"].ToString(), - }) - } - } - } - return listings, nil -} - -func displayShowSingleSummary(out io.Writer, format, keyspace, workflowName, uuid string, resp *vtctldatapb.VDiffShowResponse, verbose bool) (vdiff.VDiffState, error) { - state := vdiff.UnknownState - var output string - summary, err := buildSingleSummary(keyspace, workflowName, uuid, resp, verbose) - if err != nil { - return state, err - } - if summary == nil { // Should never happen - return state, fmt.Errorf("no report to show for vdiff %s.%s (%s)", keyspace, workflowName, uuid) - } - state = summary.State - if format == "json" { - jsonText, err := cli.MarshalJSONPretty(summary) - if err != nil { - return state, err - } - output = string(jsonText) - } 
else { - tmpl, err := template.New("summary").Parse(summaryTextTemplate) - if err != nil { - return state, err - } - sb := new(strings.Builder) - err = tmpl.Execute(sb, summary) - if err != nil { - return state, err - } - output = sb.String() - for { - str := strings.Replace(output, "\n\n", "\n", -1) - if output == str { - break - } - output = str - } - } - fmt.Fprintln(out, output) - return state, nil -} - -func buildSingleSummary(keyspace, workflow, uuid string, resp *vtctldatapb.VDiffShowResponse, verbose bool) (*summary, error) { - summary := &summary{ - Workflow: workflow, - Keyspace: keyspace, - UUID: uuid, - State: vdiff.UnknownState, - RowsCompared: 0, - StartedAt: "", - CompletedAt: "", - HasMismatch: false, - Shards: "", - Reports: make(map[string]map[string]vdiff.DiffReport), - Errors: make(map[string]string), - Progress: nil, - } - - var tableSummaryMap map[string]tableSummary - var reports map[string]map[string]vdiff.DiffReport - // Keep a tally of the states across all tables in all shards. - tableStateCounts := map[vdiff.VDiffState]int{ - vdiff.UnknownState: 0, - vdiff.PendingState: 0, - vdiff.StartedState: 0, - vdiff.StoppedState: 0, - vdiff.ErrorState: 0, - vdiff.CompletedState: 0, - } - // Keep a tally of the summary states across all shards. - shardStateCounts := map[vdiff.VDiffState]int{ - vdiff.UnknownState: 0, - vdiff.PendingState: 0, - vdiff.StartedState: 0, - vdiff.StoppedState: 0, - vdiff.ErrorState: 0, - vdiff.CompletedState: 0, - } - // Keep a tally of the approximate total rows to process as we'll use this for our progress - // report. 
- totalRowsToCompare := int64(0) - var shards []string - for shard, resp := range resp.TabletResponses { - first := true - if resp != nil && resp.Output != nil { - shards = append(shards, shard) - qr := sqltypes.Proto3ToResult(resp.Output) - if tableSummaryMap == nil { - tableSummaryMap = make(map[string]tableSummary, 0) - reports = make(map[string]map[string]vdiff.DiffReport, 0) - } - for _, row := range qr.Named().Rows { - // Update the global VDiff summary based on the per shard level summary. - // Since these values will be the same for all subsequent rows we only use - // the first row. - if first { - first = false - // Our timestamps are strings in `2022-06-26 20:43:25` format so we sort - // them lexicographically. - // We should use the earliest started_at across all shards. - if sa := row.AsString("started_at", ""); summary.StartedAt == "" || sa < summary.StartedAt { - summary.StartedAt = sa - } - // And we should use the latest completed_at across all shards. - if ca := row.AsString("completed_at", ""); summary.CompletedAt == "" || ca > summary.CompletedAt { - summary.CompletedAt = ca - } - // If we had an error on the shard, then let's add that to the summary. - if le := row.AsString("last_error", ""); le != "" { - summary.Errors[shard] = le - } - // Keep track of how many shards are marked as a specific state. We check - // this combined with the shard.table states to determine the VDiff summary - // state. - shardStateCounts[vdiff.VDiffState(strings.ToLower(row.AsString("vdiff_state", "")))]++ - } - - // Global VDiff summary updates that take into account the per table details - // per shard. - { - summary.RowsCompared += row.AsInt64("rows_compared", 0) - totalRowsToCompare += row.AsInt64("table_rows", 0) - - // If we had a mismatch on any table on any shard then the global VDiff - // summary does too. 
- if mm, _ := row.ToBool("has_mismatch"); mm { - summary.HasMismatch = true - } - } - - // Table summary information that must be accounted for across all shards. - { - table := row.AsString("table_name", "") - // Create the global VDiff table summary object if it doesn't exist. - if _, ok := tableSummaryMap[table]; !ok { - tableSummaryMap[table] = tableSummary{ - TableName: table, - State: vdiff.UnknownState, - } - - } - ts := tableSummaryMap[table] - // This is the shard level VDiff table state. - sts := vdiff.VDiffState(strings.ToLower(row.AsString("table_state", ""))) - tableStateCounts[sts]++ - - // The error state must be sticky, and we should not override any other - // known state with completed. - switch sts { - case vdiff.CompletedState: - if ts.State == vdiff.UnknownState { - ts.State = sts - } - case vdiff.ErrorState: - ts.State = sts - default: - if ts.State != vdiff.ErrorState { - ts.State = sts - } - } - - diffReport := row.AsString("report", "") - dr := vdiff.DiffReport{} - if diffReport != "" { - err := json.Unmarshal([]byte(diffReport), &dr) - if err != nil { - return nil, err - } - ts.RowsCompared += dr.ProcessedRows - ts.MismatchedRows += dr.MismatchedRows - ts.MatchingRows += dr.MatchingRows - ts.ExtraRowsTarget += dr.ExtraRowsTarget - ts.ExtraRowsSource += dr.ExtraRowsSource - } - if _, ok := reports[table]; !ok { - reports[table] = make(map[string]vdiff.DiffReport) - } - - reports[table][shard] = dr - tableSummaryMap[table] = ts - } - } - } - } - - // The global VDiff summary should progress from pending->started->completed with - // stopped for any shard and error for any table being sticky for the global summary. - // We should only consider the VDiff to be complete if it's completed for every table - // on every shard. 
- if shardStateCounts[vdiff.StoppedState] > 0 { - summary.State = vdiff.StoppedState - } else if shardStateCounts[vdiff.ErrorState] > 0 || tableStateCounts[vdiff.ErrorState] > 0 { - summary.State = vdiff.ErrorState - } else if tableStateCounts[vdiff.StartedState] > 0 { - summary.State = vdiff.StartedState - } else if tableStateCounts[vdiff.PendingState] > 0 { - summary.State = vdiff.PendingState - } else if tableStateCounts[vdiff.CompletedState] == (len(tableSummaryMap) * len(shards)) { - // When doing shard consolidations/merges, we cannot rely solely on the - // vdiff_table state as there are N sources that we process rows from sequentially - // with each one writing to the shared _vt.vdiff_table record for the target shard. - // So we only mark the vdiff for the shard as completed when we've finished - // processing rows from all of the sources -- which is recorded by marking the - // vdiff done for the shard by setting _vt.vdiff.state = completed. - if shardStateCounts[vdiff.CompletedState] == len(shards) { - summary.State = vdiff.CompletedState - } else { - summary.State = vdiff.StartedState - } - } else { - summary.State = vdiff.UnknownState - } - - // If the vdiff has been started then we can calculate the progress. - if summary.State == vdiff.StartedState { - buildProgressReport(summary, totalRowsToCompare) - } - - sort.Strings(shards) // Sort for predictable output - summary.Shards = strings.Join(shards, ",") - summary.TableSummaryMap = tableSummaryMap - summary.Reports = reports - if !summary.HasMismatch && !verbose { - summary.Reports = nil - summary.TableSummaryMap = nil - } - // If we haven't completed the global VDiff then be sure to reflect that with no - // CompletedAt value. - if summary.State != vdiff.CompletedState { - summary.CompletedAt = "" - } - return summary, nil -} - -func buildProgressReport(summary *summary, rowsToCompare int64) { - report := &vdiff.ProgressReport{} - if summary.RowsCompared >= 1 { - // Round to 2 decimal points. 
- report.Percentage = math.Round(math.Min((float64(summary.RowsCompared)/float64(rowsToCompare))*100, 100.00)*100) / 100 - } - if math.IsNaN(report.Percentage) { - report.Percentage = 0 - } - pctToGo := math.Abs(report.Percentage - 100.00) - startTime, _ := time.Parse(vdiff.TimestampFormat, summary.StartedAt) - curTime := time.Now().UTC() - runTime := curTime.Unix() - startTime.Unix() - if report.Percentage >= 1 { - // Calculate how long 1% took, on avg, and multiply that by the % left. - eta := time.Unix(((int64(runTime)/int64(report.Percentage))*int64(pctToGo))+curTime.Unix(), 1).UTC() - // Cap the ETA at 1 year out to prevent providing nonsensical ETAs. - if eta.Before(time.Now().UTC().AddDate(1, 0, 0)) { - report.ETA = eta.Format(vdiff.TimestampFormat) - } - } - summary.Progress = report -} - -func commandShow(cmd *cobra.Command, args []string) error { - format, err := common.GetOutputFormat(cmd) - if err != nil { - return err - } - cli.FinishedParsing(cmd) - - resp, err := common.GetClient().VDiffShow(common.GetCommandCtx(), &vtctldatapb.VDiffShowRequest{ - Workflow: common.BaseOptions.Workflow, - TargetKeyspace: common.BaseOptions.TargetKeyspace, - Arg: showOptions.Arg, - }) - - if err != nil { - return err - } - - if err = displayShowResponse(cmd.OutOrStdout(), format, common.BaseOptions.TargetKeyspace, common.BaseOptions.Workflow, showOptions.Arg, resp, showOptions.Verbose); err != nil { - return err - } - - return nil -} - -func commandStop(cmd *cobra.Command, args []string) error { - format, err := common.GetOutputFormat(cmd) - if err != nil { - return err - } - cli.FinishedParsing(cmd) - - _, err = common.GetClient().VDiffStop(common.GetCommandCtx(), &vtctldatapb.VDiffStopRequest{ - Workflow: common.BaseOptions.Workflow, - TargetKeyspace: common.BaseOptions.TargetKeyspace, - Uuid: stopOptions.UUID.String(), - }) - - if err != nil { - return err - } - - displaySimpleResponse(cmd.OutOrStdout(), format, vdiff.StopAction) - - return nil -} - -func 
registerVDiffCommands(root *cobra.Command) { - common.AddCommonFlags(base) - root.AddCommand(base) - - create.Flags().StringSliceVar(&createOptions.SourceCells, "source-cells", nil, "The source cell(s) to compare from; default is any available cell.") - create.Flags().StringSliceVar(&createOptions.TargetCells, "target-cells", nil, "The target cell(s) to compare with; default is any available cell.") - create.Flags().Var((*topoprotopb.TabletTypeListFlag)(&createOptions.TabletTypes), "tablet-types", "Tablet types to use on the source and target.") - create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.") - create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.") - create.Flags().Uint32Var(&createOptions.Limit, "limit", math.MaxUint32, "Max rows to stop comparing after.") - create.Flags().BoolVar(&createOptions.DebugQuery, "debug-query", false, "Adds a mysql query to the report that can be used for further debugging.") - create.Flags().BoolVar(&createOptions.OnlyPKs, "only-pks", false, "When reporting missing rows, only show primary keys in the report.") - create.Flags().StringSliceVar(&createOptions.Tables, "tables", nil, "Only run vdiff for these tables in the workflow.") - create.Flags().Uint32Var(&createOptions.MaxExtraRowsToCompare, "max-extra-rows-to-compare", 1000, "If there are collation differences between the source and target, you can have rows that are identical but simply returned in a different order from MySQL. 
We will do a second pass to compare the rows for any actual differences in this case and this flag allows you to control the resources used for this operation.") - create.Flags().BoolVar(&createOptions.Wait, "wait", false, "When creating or resuming a vdiff, wait for it to finish before exiting.") - create.Flags().DurationVar(&createOptions.WaitUpdateInterval, "wait-update-interval", time.Duration(1*time.Minute), "When waiting on a vdiff to finish, check and display the current status this often.") - create.Flags().BoolVar(&createOptions.AutoRetry, "auto-retry", true, "Should this vdiff automatically retry and continue in case of recoverable errors.") - create.Flags().BoolVar(&createOptions.UpdateTableStats, "update-table-stats", false, "Update the table statistics, using ANALYZE TABLE, on each table involved in the VDiff during initialization. This will ensure that progress estimates are as accurate as possible -- but it does involve locks and can potentially impact query processing on the target keyspace.") - base.AddCommand(create) - - base.AddCommand(delete) - - base.AddCommand(resume) - - show.Flags().BoolVar(&showOptions.Verbose, "verbose", false, "Show verbose output in summaries") - base.AddCommand(show) - - base.AddCommand(stop) -} - -func init() { - common.RegisterCommandHandler("VDiff", registerVDiffCommands) -} diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 78a5728d9ee..0627e39b775 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -190,11 +190,11 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil) if tc.autoRetryError { - testAutoRetryError(t, tc, cells[0].Name) + testAutoRetryError(t, tc, allCellNames) } if tc.resume { - testResume(t, tc, cells[0].Name) + testResume(t, tc, allCellNames) } // These are done here so that we have a 
valid workflow to test the commands against diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index c2395c87c7d..6035063902d 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -173,8 +173,11 @@ func doVdiff2(t *testing.T, keyspace, workflow, cells string, want *expectedVDif func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg string, expectError bool, extraFlags ...string) (uuid string, output string) { var err error -<<<<<<< HEAD - args := []string{"VDiff", "--", "--tablet_types=primary", "--source_cell=" + cells, "--format=json"} + // This will always result in us using a PRIMARY tablet, which is all + // we start in many e2e tests, but it avoids the tablet picker logic + // where when you ONLY specify the PRIMARY type it then picks the + // shard's primary and ignores any cell settings. + args := []string{"VDiff", "--", "--tablet_types=in_order:primary,replica", "--source_cell=" + cells, "--format=json"} if len(extraFlags) > 0 { args = append(args, extraFlags...) } @@ -185,49 +188,6 @@ func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg stri require.Nil(t, err) uuid = gjson.Get(output, "UUID").String() if action != "delete" && !(action == "show" && actionArg == "all") { // a UUID is not required -======= - targetKeyspace, workflowName, ok := strings.Cut(ksWorkflow, ".") - require.True(t, ok, "invalid keyspace.workflow value: %s", ksWorkflow) - - if useVtctlclient { - // This will always result in us using a PRIMARY tablet, which is all - // we start in many e2e tests, but it avoids the tablet picker logic - // where when you ONLY specify the PRIMARY type it then picks the - // shard's primary and ignores any cell settings. 
- args := []string{"VDiff", "--", "--tablet_types=in_order:primary,replica", "--source_cell=" + cells, "--format=json"} - if len(extraFlags) > 0 { - args = append(args, extraFlags...) - } - args = append(args, ksWorkflow, action, actionArg) - output, err = vc.VtctlClient.ExecuteCommandWithOutput(args...) - log.Infof("vdiff output: %+v (err: %+v)", output, err) - if !expectError { - require.Nil(t, err) - uuid = gjson.Get(output, "UUID").String() - if action != "delete" && !(action == "show" && actionArg == "all") { // A UUID is not required - require.NoError(t, err) - require.NotEmpty(t, uuid) - } - } - } else { - args := []string{"VDiff", "--target-keyspace", targetKeyspace, "--workflow", workflowName, "--format=json", action} - if strings.ToLower(action) == string(vdiff2.CreateAction) { - // This will always result in us using a PRIMARY tablet, which is all - // we start in many e2e tests, but it avoids the tablet picker logic - // where when you ONLY specify the PRIMARY type it then picks the - // shard's primary and ignores any cell settings. - args = append(args, "--tablet-types=primary,replica", "--tablet-types-in-preference-order", "--source-cells="+cells) - } - if len(extraFlags) > 0 { - args = append(args, extraFlags...) - } - if actionArg != "" { - args = append(args, actionArg) - } - output, err = vc.VtctldClient.ExecuteCommandWithOutput(args...) 
- log.Infof("vdiff output: %+v (err: %+v)", output, err) - if !expectError { ->>>>>>> 2f679aaab1 (VDiff: properly split cell values in record when using TabletPicker (#14099)) require.NoError(t, err) require.NotEmpty(t, uuid) } diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index f19b39f1359..799f95d080e 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -224,13 +224,8 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() -<<<<<<< HEAD - err1 = td.forEachSource(func(source *migrationSource) error { - tablet, err := pickTablet(ctx, sourceTopoServer, cell, ct.sourceKeyspace, source.shard, tabletTypes) -======= sourceErr = td.forEachSource(func(source *migrationSource) error { - sourceTablet, err := pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.sourceKeyspace, source.shard, td.wd.opts.PickerOptions.TabletTypes) ->>>>>>> 2f679aaab1 (VDiff: properly split cell values in record when using TabletPicker (#14099)) + sourceTablet, err := pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.sourceKeyspace, source.shard, td.wd.opts.PickerOptions.TabletTypes) if err != nil { return err } @@ -242,15 +237,9 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() -<<<<<<< HEAD - tablet, err2 := pickTablet(ctx, ct.ts, td.wd.opts.PickerOptions.TargetCell, ct.vde.thisTablet.Keyspace, - ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes) - if err2 != nil { -======= - targetTablet, targetErr = pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.vde.thisTablet.Keyspace, - td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes) + targetTablet, targetErr = pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Keyspace, 
td.wd.ct.vde.thisTablet.Shard, + td.wd.opts.PickerOptions.TabletTypes) if targetErr != nil { ->>>>>>> 2f679aaab1 (VDiff: properly split cell values in record when using TabletPicker (#14099)) return } td.wd.ct.targetShardStreamer = &shardStreamer{ @@ -266,13 +255,8 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { return targetErr } -<<<<<<< HEAD -func pickTablet(ctx context.Context, ts *topo.Server, cell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { - tp, err := discovery.NewTabletPicker(ts, []string{cell}, keyspace, shard, tabletTypes) -======= -func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { - tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) ->>>>>>> 2f679aaab1 (VDiff: properly split cell values in record when using TabletPicker (#14099)) +func pickTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { + tp, err := discovery.NewTabletPicker(ts, cells, keyspace, shard, tabletTypes) if err != nil { return nil, err }