From 4e79a02d4fbc7136ec3d4097f3d4797bd3b0f558 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Thu, 19 Sep 2024 12:19:20 +0300
Subject: [PATCH 1/2] backup_pitr testing: validate rejoining replication stream

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 .../endtoend/backup/vtctlbackup/backup_utils.go    | 11 +++++++++++
 .../backup/vtctlbackup/pitr_test_framework.go      | 17 +++++++++++++----
 2 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
index 3a0150cc87c..5c69a110e15 100644
--- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go
+++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -1133,6 +1133,17 @@ func GetReplicaGtidPurged(t *testing.T, replicaIndex int) string {
 	return row.AsString("gtid_purged", "")
 }
 
+func ReconnectReplicaToPrimary(t *testing.T, replicaIndex int) {
+	query := fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='localhost', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", primary.MySQLPort)
+	replica := getReplica(t, replicaIndex)
+	_, err := replica.VttabletProcess.QueryTablet("stop replica", keyspaceName, true)
+	require.NoError(t, err)
+	_, err = replica.VttabletProcess.QueryTablet(query, keyspaceName, true)
+	require.NoError(t, err)
+	_, err = replica.VttabletProcess.QueryTablet("start replica", keyspaceName, true)
+	require.NoError(t, err)
+}
+
 func InsertRowOnPrimary(t *testing.T, hint string) {
 	if hint == "" {
 		hint = textutil.RandomHash()[:12]
diff --git a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
index 7f611d81ad6..2970e915ac1 100644
--- a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
+++ b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
@@ -68,21 +68,24 @@ type testedBackupTimestampInfo struct {
 	postTimestamp time.Time
 }
 
-func waitForReplica(t *testing.T, replicaIndex int) {
+// waitForReplica waits for the replica to have the same row set as the primary.
+func waitForReplica(t *testing.T, replicaIndex int) int {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
+	ticker := time.NewTicker(1 * time.Second)
+	defer ticker.Stop()
 	pMsgs := ReadRowsFromPrimary(t)
 	for {
 		rMsgs := ReadRowsFromReplica(t, replicaIndex)
 		if len(pMsgs) == len(rMsgs) {
 			// success
-			return
+			return len(pMsgs)
 		}
 		select {
 		case <-ctx.Done():
 			assert.FailNow(t, "timeout waiting for replica to catch up")
-			return
-		case <-time.After(time.Second):
+			return 0
+		case <-ticker.C:
 			//
 		}
 	}
@@ -289,6 +292,12 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
 				if sampleTestedBackupPos == "" {
 					sampleTestedBackupPos = pos
 				}
+				t.Run("post-pitr, wait for replica to catch up", func(t *testing.T) {
+					// Replica is DRAINED and does not have replication configuration.
+					// We now connect the replica to the primary and validate it's able to catch up.
+					ReconnectReplicaToPrimary(t, 0)
+					waitForReplica(t, 0)
+				})
 			})
 		}
 	}

From 7e77e864be51b2c6cf395a04f0ab8db7d5d2e239 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Thu, 19 Sep 2024 12:26:43 +0300
Subject: [PATCH 2/2] test replication stream in ExecTestIncrementalBackupAndRestoreToTimestamp

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
index 2970e915ac1..4c84c3e63bc 100644
--- a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
+++ b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
@@ -548,6 +548,12 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
 				if sampleTestedBackupIndex < 0 {
 					sampleTestedBackupIndex = backupIndex
 				}
+				t.Run("post-pitr, wait for replica to catch up", func(t *testing.T) {
+					// Replica is DRAINED and does not have replication configuration.
+					// We now connect the replica to the primary and validate it's able to catch up.
+					ReconnectReplicaToPrimary(t, 0)
+					waitForReplica(t, 0)
+				})
 			} else {
 				numFailedRestores++
 			}
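
Not part of the patches above: a minimal sketch of how the new helpers are meant to compose after a point-in-time restore, assuming the same vtctlbackup end-to-end test package. The test function name, the extra row insert, and the final assertion are illustrative assumptions, not code from the patches; the flow mirrors what the patches add inside t.Run, namely reconnecting the restored (DRAINED) replica to the current primary and polling until its row set matches.

package vtctlbackup

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// testReplicaRejoinsAfterPITR is a hypothetical helper, intended to be invoked
// via t.Run from an existing PITR test, mirroring the flow the patches add.
func testReplicaRejoinsAfterPITR(t *testing.T) {
	// Reconfigure the restored (DRAINED) replica to replicate from the current
	// primary using GTID auto-positioning, then restart replication.
	ReconnectReplicaToPrimary(t, 0)

	// Write one more row on the primary so there is fresh data to replicate.
	InsertRowOnPrimary(t, "post-pitr-row")

	// waitForReplica polls once per second until the replica's row count matches
	// the primary's (failing the test after its 10-second timeout) and returns
	// that row count.
	rowCount := waitForReplica(t, 0)
	assert.NotZero(t, rowCount)
}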