diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index 3a0150cc87c..5c69a110e15 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -1133,6 +1133,17 @@ func GetReplicaGtidPurged(t *testing.T, replicaIndex int) string { return row.AsString("gtid_purged", "") } +func ReconnectReplicaToPrimary(t *testing.T, replicaIndex int) { + query := fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='localhost', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", primary.MySQLPort) + replica := getReplica(t, replicaIndex) + _, err := replica.VttabletProcess.QueryTablet("stop replica", keyspaceName, true) + require.NoError(t, err) + _, err = replica.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.NoError(t, err) + _, err = replica.VttabletProcess.QueryTablet("start replica", keyspaceName, true) + require.NoError(t, err) +} + func InsertRowOnPrimary(t *testing.T, hint string) { if hint == "" { hint = textutil.RandomHash()[:12] diff --git a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go index 7f611d81ad6..4c84c3e63bc 100644 --- a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go +++ b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go @@ -68,21 +68,24 @@ type testedBackupTimestampInfo struct { postTimestamp time.Time } -func waitForReplica(t *testing.T, replicaIndex int) { +// waitForReplica waits for the replica to have same row set as on primary. +func waitForReplica(t *testing.T, replicaIndex int) int { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() pMsgs := ReadRowsFromPrimary(t) for { rMsgs := ReadRowsFromReplica(t, replicaIndex) if len(pMsgs) == len(rMsgs) { // success - return + return len(pMsgs) } select { case <-ctx.Done(): assert.FailNow(t, "timeout waiting for replica to catch up") - return - case <-time.After(time.Second): + return 0 + case <-ticker.C: // } } @@ -289,6 +292,12 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) if sampleTestedBackupPos == "" { sampleTestedBackupPos = pos } + t.Run("post-pitr, wait for replica to catch up", func(t *testing.T) { + // Replica is DRAINED and does not have replication configuration. + // We now connect the replica to the primary and validate it's able to catch up. + ReconnectReplicaToPrimary(t, 0) + waitForReplica(t, 0) + }) }) } } @@ -539,6 +548,12 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes if sampleTestedBackupIndex < 0 { sampleTestedBackupIndex = backupIndex } + t.Run("post-pitr, wait for replica to catch up", func(t *testing.T) { + // Replica is DRAINED and does not have replication configuration. + // We now connect the replica to the primary and validate it's able to catch up. + ReconnectReplicaToPrimary(t, 0) + waitForReplica(t, 0) + }) } else { numFailedRestores++ }