Skip to content

Commit

Permalink
backup_pitr CI: validate rejoining replication stream (#16807)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Sep 23, 2024
1 parent 32edb28 commit 4c12794
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
11 changes: 11 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,17 @@ func GetReplicaGtidPurged(t *testing.T, replicaIndex int) string {
return row.AsString("gtid_purged", "")
}

func ReconnectReplicaToPrimary(t *testing.T, replicaIndex int) {
query := fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='localhost', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", primary.MySQLPort)
replica := getReplica(t, replicaIndex)
_, err := replica.VttabletProcess.QueryTablet("stop replica", keyspaceName, true)
require.NoError(t, err)
_, err = replica.VttabletProcess.QueryTablet(query, keyspaceName, true)
require.NoError(t, err)
_, err = replica.VttabletProcess.QueryTablet("start replica", keyspaceName, true)
require.NoError(t, err)
}

func InsertRowOnPrimary(t *testing.T, hint string) {
if hint == "" {
hint = textutil.RandomHash()[:12]
Expand Down
23 changes: 19 additions & 4 deletions go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,24 @@ type testedBackupTimestampInfo struct {
postTimestamp time.Time
}

func waitForReplica(t *testing.T, replicaIndex int) {
// waitForReplica waits for the replica to have same row set as on primary.
func waitForReplica(t *testing.T, replicaIndex int) int {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
pMsgs := ReadRowsFromPrimary(t)
for {
rMsgs := ReadRowsFromReplica(t, replicaIndex)
if len(pMsgs) == len(rMsgs) {
// success
return
return len(pMsgs)
}
select {
case <-ctx.Done():
assert.FailNow(t, "timeout waiting for replica to catch up")
return
case <-time.After(time.Second):
return 0
case <-ticker.C:
//
}
}
Expand Down Expand Up @@ -289,6 +292,12 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
if sampleTestedBackupPos == "" {
sampleTestedBackupPos = pos
}
t.Run("post-pitr, wait for replica to catch up", func(t *testing.T) {
// Replica is DRAINED and does not have replication configuration.
// We now connect the replica to the primary and validate it's able to catch up.
ReconnectReplicaToPrimary(t, 0)
waitForReplica(t, 0)
})
})
}
}
Expand Down Expand Up @@ -539,6 +548,12 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
if sampleTestedBackupIndex < 0 {
sampleTestedBackupIndex = backupIndex
}
t.Run("post-pitr, wait for replica to catch up", func(t *testing.T) {
// Replica is DRAINED and does not have replication configuration.
// We now connect the replica to the primary and validate it's able to catch up.
ReconnectReplicaToPrimary(t, 0)
waitForReplica(t, 0)
})
} else {
numFailedRestores++
}
Expand Down

0 comments on commit 4c12794

Please sign in to comment.