Skip to content

Commit

Permalink
VDiff: Support a max diff time for tables (#14786)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Dec 27, 2023
1 parent 1ee8f28 commit 4bb84ed
Show file tree
Hide file tree
Showing 19 changed files with 793 additions and 415 deletions.
3 changes: 3 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var (
Wait bool
WaitUpdateInterval time.Duration
AutoRetry bool
MaxDiffDuration time.Duration
}{}

deleteOptions = struct {
Expand Down Expand Up @@ -291,6 +292,7 @@ func commandCreate(cmd *cobra.Command, args []string) error {
WaitUpdateInterval: protoutil.DurationToProto(createOptions.WaitUpdateInterval),
AutoRetry: createOptions.AutoRetry,
MaxReportSampleRows: createOptions.MaxReportSampleRows,
MaxDiffDuration: protoutil.DurationToProto(createOptions.MaxDiffDuration),
})

if err != nil {
Expand Down Expand Up @@ -883,6 +885,7 @@ func registerCommands(root *cobra.Command) {
create.Flags().DurationVar(&createOptions.WaitUpdateInterval, "wait-update-interval", time.Duration(1*time.Minute), "When waiting on a vdiff to finish, check and display the current status this often.")
create.Flags().BoolVar(&createOptions.AutoRetry, "auto-retry", true, "Should this vdiff automatically retry and continue in case of recoverable errors.")
create.Flags().BoolVar(&createOptions.UpdateTableStats, "update-table-stats", false, "Update the table statistics, using ANALYZE TABLE, on each table involved in the VDiff during initialization. This will ensure that progress estimates are as accurate as possible -- but it does involve locks and can potentially impact query processing on the target keyspace.")
create.Flags().DurationVar(&createOptions.MaxDiffDuration, "max-diff-duration", 0, "How long should an individual table diff run before being stopped and restarted in order to lessen the impact on tablets due to holding open database snapshots for long periods of time (0 is the default and means no time limit).")
base.AddCommand(create)

base.AddCommand(delete)
Expand Down
113 changes: 86 additions & 27 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
)

Expand All @@ -50,6 +52,7 @@ type testCase struct {
testCLIErrors bool // test CLI errors against this workflow (only needs to be done once)
testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once)
testCLIFlagHandling bool // test vtctldclient flag handling from end-to-end
extraVDiffFlags map[string]string
}

const (
Expand All @@ -71,12 +74,15 @@ var testCases = []*testCase{
tabletBaseID: 200,
tables: "customer,Lead,Lead-1,nopk",
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(1991234, 'Testy McTester', 'soho')`,
retryInsert: `insert into customer(cid, name, typ) values(2005149100, 'Testy McTester', 'soho')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(1992234, 'Testy McTester (redux)', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(2005149200, 'Testy McTester (redux)', 'enterprise')`,
testCLIErrors: true, // test for errors in the simplest workflow
testCLICreateWait: true, // test wait on create feature against simplest workflow
testCLIFlagHandling: true, // test flag handling end-to-end against simplest workflow
extraVDiffFlags: map[string]string{
"--max-diff-duration": "2s",
},
},
{
name: "Reshard Merge/split 2 to 3",
Expand All @@ -88,9 +94,9 @@ var testCases = []*testCase{
targetShards: "-40,40-a0,a0-",
tabletBaseID: 400,
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(1993234, 'Testy McTester Jr', 'enterprise'), (1993235, 'Testy McTester II', 'enterprise')`,
retryInsert: `insert into customer(cid, name, typ) values(2005149300, 'Testy McTester Jr', 'enterprise'), (2005149350, 'Testy McTester II', 'enterprise')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(1994234, 'Testy McTester III', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(2005149400, 'Testy McTester III', 'enterprise')`,
stop: true,
},
{
Expand All @@ -103,9 +109,9 @@ var testCases = []*testCase{
targetShards: "0",
tabletBaseID: 700,
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(1995234, 'Testy McTester IV', 'enterprise')`,
retryInsert: `insert into customer(cid, name, typ) values(2005149500, 'Testy McTester IV', 'enterprise')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(1996234, 'Testy McTester V', 'enterprise'), (1996235, 'Testy McTester VI', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(2005149600, 'Testy McTester V', 'enterprise'), (2005149650, 'Testy McTester VI', 'enterprise')`,
stop: true,
},
}
Expand All @@ -116,8 +122,13 @@ func TestVDiff2(t *testing.T) {
sourceShards := []string{"0"}
targetKs := "customer"
targetShards := []string{"-80", "80-"}
// This forces us to use multiple vstream packets even with small test tables.
extraVTTabletArgs = []string{"--vstream_packet_size=1"}
extraVTTabletArgs = []string{
// This forces us to use multiple vstream packets even with small test tables.
"--vstream_packet_size=1",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}

vc = NewVitessCluster(t, &clusterOptions{cells: strings.Split(cellNames, ",")})
defer vc.TearDown()
Expand Down Expand Up @@ -181,12 +192,58 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
err := vc.VtctlClient.ExecuteCommand(args...)
require.NoError(t, err)

for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
waitForShardsToCatchup := func() {
for _, shard := range arrTargetShards {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
catchup(t, tab, tc.workflow, tc.typ)
}
}

vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
// Wait for the workflow to finish the copy phase and initially catch up.
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
waitForShardsToCatchup()

if diffDuration, ok := tc.extraVDiffFlags["--max-diff-duration"]; ok {
if !strings.Contains(tc.tables, "customer") {
require.Fail(t, "customer table must be included in the table list to test --max-diff-duration")
}
// Generate enough customer table data so that the table diff gets restarted.
dur, err := time.ParseDuration(diffDuration)
require.NoError(t, err, "could not parse --max-diff-duration %q: %v", diffDuration, err)
seconds := int64(dur.Seconds())
chunkSize := int64(100000)
perSecondCount := int64(1000000)
totalRowsToCreate := seconds * perSecondCount
for i := int64(0); i < totalRowsToCreate; i += chunkSize {
generateMoreCustomers(t, sourceKs, chunkSize)
}

// Wait for the workflow to catch up after all the inserts.
waitForShardsToCatchup()

// This flag is only implemented in vtctldclient.
doVtctldclientVDiff(t, tc.targetKs, tc.workflow, allCellNames, nil, "--max-diff-duration", diffDuration)

// Confirm that the customer table diff was restarted but not others.
tablet := vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0])
stat, err := getDebugVar(t, tablet.Port, []string{"VDiffRestartedTableDiffsCount"})
require.NoError(t, err, "failed to get VDiffRestartedTableDiffsCount stat: %v", err)
customerRestarts := gjson.Parse(stat).Get("customer").Int()
require.Greater(t, customerRestarts, int64(0), "expected VDiffRestartedTableDiffsCount stat to be greater than 0 for the customer table, got %d", customerRestarts)
leadRestarts := gjson.Parse(stat).Get("lead").Int()
require.Equal(t, int64(0), leadRestarts, "expected VDiffRestartedTableDiffsCount stat to be 0 for the Lead table, got %d", leadRestarts)

// Cleanup the created customer records so as not to slow down the rest of the test.
delstmt := fmt.Sprintf("delete from %s.customer order by cid desc limit %d", sourceKs, chunkSize)
for i := int64(0); i < totalRowsToCreate; i += chunkSize {
_, err := vtgateConn.ExecuteFetch(delstmt, int(chunkSize), false)

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_basic)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_basic)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_basic)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_migrate_vdiff2_convert_tz)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_migrate_vdiff2_convert_tz)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_migrate_vdiff2_convert_tz)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_migrate_vdiff2_convert_tz)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_partial_movetables_and_materialize)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_partial_movetables_and_materialize)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_partial_movetables_and_materialize)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_partial_movetables_and_materialize)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_partial_movetables_and_materialize)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_partial_movetables_and_materialize)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_partial_movetables_and_materialize)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_partial_movetables_and_materialize)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_partial_movetables_and_materialize)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_partial_movetables_and_materialize)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

undefined: vtgateConn (typecheck)

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_foreign_key_stress)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (schemadiff_vrepl)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (schemadiff_vrepl)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_v2)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (schemadiff_vrepl) mysql57

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (schemadiff_vrepl) mysql57

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_across_db_versions)

undefined: vtgateConn

Check failure on line 239 in go/test/endtoend/vreplication/vdiff2_test.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_across_db_versions)

undefined: vtgateConn
require.NoError(t, err, "failed to cleanup added customer records: %v", err)
}
// Wait for the workflow to catch up again on the deletes.
waitForShardsToCatchup()
} else {
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
}

if tc.autoRetryError {
testAutoRetryError(t, tc, allCellNames)
Expand All @@ -196,7 +253,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
testResume(t, tc, allCellNames)
}

// These are done here so that we have a valid workflow to test the commands against
// These are done here so that we have a valid workflow to test the commands against.
if tc.stop {
testStop(t, ksWorkflow, allCellNames)
}
Expand All @@ -212,7 +269,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,

testDelete(t, ksWorkflow, allCellNames)

// create another VDiff record to confirm it gets deleted when the workflow is completed
// Create another VDiff record to confirm it gets deleted when the workflow is completed.
ts := time.Now()
uuid, _ := performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false)
waitForVDiff2ToComplete(t, false, ksWorkflow, allCellNames, uuid, ts)
Expand All @@ -222,7 +279,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "Complete", ksWorkflow)
require.NoError(t, err)

// confirm the VDiff data is deleted for the workflow
// Confirm the VDiff data is deleted for the workflow.
testNoOrphanedData(t, tc.targetKs, tc.workflow, arrTargetShards)
}

Expand Down Expand Up @@ -254,6 +311,7 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell
AutoRetry: true,
UpdateTableStats: true,
TimeoutSeconds: 60,
MaxDiffSeconds: 333,
},
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{
SourceCell: "zone1,zone2,zone3,zonefoosource",
Expand All @@ -273,6 +331,7 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell
"--max-report-sample-rows", fmt.Sprintf("%d", expectedOptions.ReportOptions.MaxSampleRows),
"--max-extra-rows-to-compare", fmt.Sprintf("%d", expectedOptions.CoreOptions.MaxExtraRowsToCompare),
"--filtered-replication-wait-time", fmt.Sprintf("%v", time.Duration(expectedOptions.CoreOptions.TimeoutSeconds)*time.Second),
"--max-diff-duration", fmt.Sprintf("%v", time.Duration(expectedOptions.CoreOptions.MaxDiffSeconds)*time.Second),
"--source-cells", expectedOptions.PickerOptions.SourceCell,
"--target-cells", expectedOptions.PickerOptions.TargetCell,
"--tablet-types", expectedOptions.PickerOptions.TabletTypes,
Expand Down Expand Up @@ -370,13 +429,13 @@ func testResume(t *testing.T, tc *testCase, cells string) {
defer closeConn()
ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)

// confirm the last VDiff is in the expected completed state
// Confirm the last VDiff is in the expected completed state.
uuid, output := performVDiff2Action(t, false, ksWorkflow, cells, "show", "last", false)
jsonOutput := getVDiffInfo(output)
require.Equal(t, "completed", jsonOutput.State)
// save the number of rows compared in previous runs
// Save the number of rows compared in previous runs.
rowsCompared := jsonOutput.RowsCompared
ogTime := time.Now() // the completed_at should be later than this after resuming
ogTime := time.Now() // The completed_at should be later than this after resuming

expectedNewRows := int64(0)
if tc.resumeInsert != "" {
Expand Down Expand Up @@ -415,34 +474,34 @@ func testAutoRetryError(t *testing.T, tc *testCase, cells string) {
defer closeConn()
ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)

// confirm the last VDiff is in the expected completed state
// Confirm the last VDiff is in the expected completed state.
uuid, output := performVDiff2Action(t, false, ksWorkflow, cells, "show", "last", false)
jsonOutput := getVDiffInfo(output)
require.Equal(t, "completed", jsonOutput.State)
// save the number of rows compared in the first run
// Save the number of rows compared in the first run.
rowsCompared := jsonOutput.RowsCompared
ogTime := time.Now() // the completed_at should be later than this upon retry
ogTime := time.Now() // The completed_at should be later than this upon retry

// create new data since original VDiff run -- if requested -- to confirm that the rows
// compared is cumulative
// Create new data since original VDiff run -- if requested -- to confirm that the rows
// compared is cumulative.
expectedNewRows := int64(0)
if tc.retryInsert != "" {
res := execVtgateQuery(t, vtgateConn, tc.sourceKs, tc.retryInsert)
expectedNewRows = int64(res.RowsAffected)
}
expectedRows := rowsCompared + expectedNewRows

// update the VDiff to simulate an ephemeral error having occurred
// Update the VDiff to simulate an ephemeral error having occurred.
for _, shard := range strings.Split(tc.targetShards, ",") {
tab := vc.getPrimaryTablet(t, tc.targetKs, shard)
res, err := tab.QueryTabletWithDB(sqlparser.BuildParsedQuery(sqlSimulateError, sidecarDBIdentifier, sidecarDBIdentifier, encodeString(uuid)).Query, "vt_"+tc.targetKs)
require.NoError(t, err)
// should have updated the vdiff record and at least one vdiff_table record
// Should have updated the vdiff record and at least one vdiff_table record.
require.GreaterOrEqual(t, int(res.RowsAffected), 2)
}

// confirm that the VDiff was retried, able to complete, and we compared the expected
// number of rows in total (original run and retry)
// Confirm that the VDiff was retried, able to complete, and we compared the expected
// number of rows in total (original run and retry).
info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, ogTime)
require.NotNil(t, info)
require.False(t, info.HasMismatch)
Expand Down
12 changes: 8 additions & 4 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

const (
vdiffTimeout = 90 * time.Second // we can leverage auto retry on error with this longer-than-usual timeout
vdiffTimeout = 120 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
vdiffRetryTimeout = 30 * time.Second
vdiffStatusCheckInterval = 5 * time.Second
vdiffRetryInterval = 5 * time.Second
Expand Down Expand Up @@ -109,7 +109,7 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell
}
ch <- true
return
} else if info.State == "started" { // test the progress report
} else if info.State == "started" { // Test the progress report
// The ETA should always be in the future -- when we're able to estimate
// it -- and the progress percentage should only increase.
// The timestamp format allows us to compare them lexicographically.
Expand Down Expand Up @@ -154,11 +154,15 @@ type expectedVDiff2Result struct {
hasMismatch bool
}

func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result) {
func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result, extraFlags ...string) {
ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
t.Run(fmt.Sprintf("vtctldclient vdiff %s", ksWorkflow), func(t *testing.T) {
// update-table-stats is needed in order to test progress reports.
uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false, "--auto-retry", "--update-table-stats")
flags := []string{"--auto-retry", "--update-table-stats"}
if len(extraFlags) > 0 {
flags = append(flags, extraFlags...)
}
uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false, flags...)
info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, time.Time{})
require.NotNil(t, info)
require.Equal(t, workflow, info.Workflow)
Expand Down
Loading

0 comments on commit 4bb84ed

Please sign in to comment.