Backup/restore: provision and restore a tablet with point-in-time recovery flags (#13964)

Signed-off-by: Shlomi Noach <[email protected]>
shlomi-noach authored Sep 28, 2023
1 parent 8e1fb8e commit c311c66
Showing 8 changed files with 154 additions and 26 deletions.
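For orientation before the file-by-file diff: the two new flags are meant to be combined with --restore_from_backup when a fresh tablet is provisioned, so that the tablet bootstraps itself from a full backup plus incremental backups up to the requested point. A minimal, purely illustrative sketch of such an invocation (the GTID position is made up, and every other required vttablet flag is omitted):

package main

import (
	"fmt"
	"strings"
)

// Illustrative only: the flag combination a newly provisioned tablet would be
// started with so that it bootstraps via point-in-time recovery. The GTID
// position below is a made-up example; all other required vttablet flags
// (topo, tablet alias, ports, my.cnf, ...) are omitted for brevity.
func main() {
	args := []string{
		"vttablet",
		"--restore_from_backup",
		"--restore-to-pos", "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615",
	}
	fmt.Println(strings.Join(args, " "))
}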
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtcombo.txt
@@ -300,6 +300,8 @@ Flags:
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
--restore-to-timestamp string (init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format. Example: '2006-01-02T15:04:05Z07:00'
--restore_concurrency int (init restore parameter) how many concurrent files to restore at once (default 4)
--restore_from_backup (init restore parameter) will check BackupStorage for a recent backup at startup and start there
--restore_from_backup_ts string (init restore parameter) if set, restore the latest backup taken at or before this timestamp. Example: '2021-04-29.133050'
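The --restore-to-timestamp flag expects an RFC3339 value. A small sketch of producing one in Go (the timestamp itself is arbitrary); note that the string in the help text above is Go's reference layout for the format rather than a literal timestamp:

package main

import (
	"fmt"
	"time"
)

// Sketch: format an arbitrary point in time as the RFC3339 string that
// --restore-to-timestamp expects. The '2006-01-02T15:04:05Z07:00' string in
// the help text above is Go's reference layout for this format.
func main() {
	ts := time.Date(2023, time.September, 28, 10, 30, 0, 0, time.UTC)
	fmt.Println(ts.Format(time.RFC3339)) // prints: 2023-09-28T10:30:00Z
}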
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
@@ -294,6 +294,8 @@ Flags:
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
--restore-to-timestamp string (init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format. Example: '2006-01-02T15:04:05Z07:00'
--restore_concurrency int (init restore parameter) how many concurrent files to restore at once (default 4)
--restore_from_backup (init restore parameter) will check BackupStorage for a recent backup at startup and start there
--restore_from_backup_ts string (init restore parameter) if set, restore the latest backup taken at or before this timestamp. Example: '2021-04-29.133050'
20 changes: 17 additions & 3 deletions go/test/endtoend/backup/pitr/backup_pitr_test.go
@@ -22,7 +22,21 @@ import (
backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"
)

// TestIncrementalBackupAndRestoreToPos
// TestIncrementalBackupAndRestoreToPos - tests incremental backups and restores.
// The general outline of the test:
// - Generate some schema with data
// - Take a full backup
// - Proceed to take a series of incremental backups. In between, inject data (insert rows), and keep record
// of which data (number of rows) is present in each backup, and at which position.
// - Expect backups success/failure per scenario
// - Next up, we start testing restores. Randomly pick recorded positions and restore to those points in time.
// - In each restore, expect to find the data (number of rows) recorded for said position
// - Some restores should fail because the position exceeds the last binlog
// - Do so for all recorded positions.
// - Then, a 2nd round where some backups are purged -- this tests to see that we're still able to find a restore path
// (of course we only delete backups that still leave us with valid restore paths).
// - Last, create a new tablet with --restore_from_backup --restore-to-pos and see that it bootstraps with restored data
// and that it ends up in DRAINED type
func TestIncrementalBackupAndRestoreToPos(t *testing.T) {
tcase := &backup.PITRTestCase{
Name: "BuiltinBackup",
@@ -45,8 +59,8 @@ func TestIncrementalBackupAndRestoreToPos(t *testing.T) {
// - Do so for all recorded timestamps.
// - Then, a 2nd round where some backups are purged -- this tests to see that we're still able to find a restore path
// (of course we only delete backups that still leave us with valid restore paths).
//
// All of the above is done for BuiltinBackup, XtraBackup, Mysqlctld (which is technically builtin)
// - Last, create a new tablet with --restore_from_backup --restore-to-timestamp and see that it bootstraps with restored data
// and that it ends up in DRAINED type
func TestIncrementalBackupAndRestoreToTimestamp(t *testing.T) {
tcase := &backup.PITRTestCase{
Name: "BuiltinBackup",
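The restore-verification loop described in the comments above boils down to: pick the recorded positions in random order, restore a replica to each, and compare the row count against what was recorded at backup time. A hypothetical condensed sketch (restoreReplicaToPos is an assumed stand-in for the framework's restore helper; backupPositions, rowsPerPosition and ReadRowsFromReplica mirror the names used in the test framework):

// Hypothetical condensed sketch of the restore-verification loop described in
// the test comments above; not the literal framework code.
func verifyRestores(t *testing.T, backupPositions []string, rowsPerPosition map[string]int) {
	for _, r := range rand.Perm(len(backupPositions)) {
		pos := backupPositions[r]
		t.Run(fmt.Sprintf("restore to pos %v", pos), func(t *testing.T) {
			restoreReplicaToPos(t, pos)       // assumed helper: RestoreFromBackup --restore-to-pos on a replica
			msgs := ReadRowsFromReplica(t, 0) // rows visible on the restored replica
			assert.Equalf(t, rowsPerPosition[pos], len(msgs), "messages: %v", msgs)
		})
	}
}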
39 changes: 31 additions & 8 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -61,6 +61,7 @@ var (
primary *cluster.Vttablet
replica1 *cluster.Vttablet
replica2 *cluster.Vttablet
replica3 *cluster.Vttablet
localCluster *cluster.LocalProcessCluster
newInitDBFile string
useXtrabackup bool
@@ -90,6 +91,7 @@ var (
primary key (id)
) Engine=InnoDB
`
SetupReplica3Tablet func(extraArgs []string) (*cluster.Vttablet, error)
)

type CompressionDetails struct {
@@ -170,9 +172,10 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
0: "primary",
1: "replica",
2: "rdonly",
3: "spare",
}
for i := 0; i < 3; i++ {
tabletType := tabletTypes[i]

createTablet := func(tabletType string) error {
tablet := localCluster.NewVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.VtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess.DbPassword = dbPassword
@@ -182,33 +185,40 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
if setupType == Mysqlctld {
mysqlctldProcess, err := cluster.MysqlCtldProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
if err != nil {
return 1, err
return err
}
tablet.MysqlctldProcess = *mysqlctldProcess
tablet.MysqlctldProcess.InitDBFile = newInitDBFile
tablet.MysqlctldProcess.ExtraArgs = extraArgs
tablet.MysqlctldProcess.Password = tablet.VttabletProcess.DbPassword
if err := tablet.MysqlctldProcess.Start(); err != nil {
return 1, err
return err
}
shard.Vttablets = append(shard.Vttablets, tablet)
continue
return nil
}

mysqlctlProcess, err := cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, localCluster.TmpDirectory)
if err != nil {
return 1, err
return err
}
tablet.MysqlctlProcess = *mysqlctlProcess
tablet.MysqlctlProcess.InitDBFile = newInitDBFile
tablet.MysqlctlProcess.ExtraArgs = extraArgs
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
return 1, err
return err
}
mysqlProcs = append(mysqlProcs, proc)

shard.Vttablets = append(shard.Vttablets, tablet)
return nil
}
for i := 0; i < 4; i++ {
tabletType := tabletTypes[i]
if err := createTablet(tabletType); err != nil {
return 1, err
}
}
for _, proc := range mysqlProcs {
if err := proc.Wait(); err != nil {
@@ -218,6 +228,7 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
primary = shard.Vttablets[0]
replica1 = shard.Vttablets[1]
replica2 = shard.Vttablets[2]
replica3 = shard.Vttablets[3]

if err := localCluster.VtctlclientProcess.InitTablet(primary, cell, keyspaceName, hostname, shard.Name); err != nil {
return 1, err
@@ -234,12 +245,20 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
return 1, err
}

for _, tablet := range []*cluster.Vttablet{primary, replica1, replica2} {
for _, tablet := range []*cluster.Vttablet{primary, replica1, replica2} { // we don't start replica3 yet
if err := tablet.VttabletProcess.Setup(); err != nil {
return 1, err
}
}

SetupReplica3Tablet = func(extraArgs []string) (*cluster.Vttablet, error) {
replica3.VttabletProcess.ExtraArgs = append(replica3.VttabletProcess.ExtraArgs, extraArgs...)
if err := replica3.VttabletProcess.Setup(); err != nil {
return replica3, err
}
return replica3, nil
}

if err := localCluster.VtctlclientProcess.InitShardPrimary(keyspaceName, shard.Name, cell, primary.TabletUID); err != nil {
return 1, err
}
@@ -1140,6 +1159,8 @@ func getReplica(t *testing.T, replicaIndex int) *cluster.Vttablet {
return replica1
case 1:
return replica2
case 2:
return replica3
default:
assert.Failf(t, "invalid replica index", "index=%d", replicaIndex)
return nil
@@ -1290,6 +1311,7 @@ func TestReplicaRestoreToPos(t *testing.T, replicaIndex int, restoreToPos replic
}
require.NoErrorf(t, err, "output: %v", output)
verifyTabletRestoreStats(t, replica.VttabletProcess.GetVars())
checkTabletType(t, replica.Alias, topodata.TabletType_DRAINED)
}

func TestReplicaRestoreToTimestamp(t *testing.T, restoreToTimestamp time.Time, expectError string) {
@@ -1303,6 +1325,7 @@ func TestReplicaRestoreToTimestamp(t *testing.T, restoreToTimestamp time.Time, e
}
require.NoErrorf(t, err, "output: %v", output)
verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars())
checkTabletType(t, replica1.Alias, topodata.TabletType_DRAINED)
}

func verifyTabletBackupStats(t *testing.T, vars map[string]any) {
53 changes: 53 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
@@ -34,6 +34,7 @@ import (

var (
gracefulPostBackupDuration = 10 * time.Millisecond
backupTimeoutDuration = 3 * time.Minute
)

const (
@@ -225,6 +226,7 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
})
}

sampleTestedBackupPos := ""
testRestores := func(t *testing.T) {
for _, r := range rand.Perm(len(backupPositions)) {
pos := backupPositions[r]
@@ -237,6 +239,9 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
count, ok := rowsPerPosition[pos]
require.True(t, ok)
assert.Equalf(t, count, len(msgs), "messages: %v", msgs)
if sampleTestedBackupPos == "" {
sampleTestedBackupPos = pos
}
})
}
}
@@ -252,6 +257,27 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
t.Run("PITR-2", func(t *testing.T) {
testRestores(t)
})
// Test that we can create a new tablet with --restore_from_backup --restore-to-pos and that it bootstraps
// via PITR and ends up in DRAINED type.
t.Run("init tablet PITR", func(t *testing.T) {
require.NotEmpty(t, sampleTestedBackupPos)

var tablet *cluster.Vttablet

t.Run(fmt.Sprintf("init from backup pos %s", sampleTestedBackupPos), func(t *testing.T) {
tablet, err = SetupReplica3Tablet([]string{"--restore-to-pos", sampleTestedBackupPos})
assert.NoError(t, err)
})
t.Run("wait for drained", func(t *testing.T) {
err = tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{"drained"}, backupTimeoutDuration)
assert.NoError(t, err)
})
t.Run(fmt.Sprintf("validate %d rows", rowsPerPosition[sampleTestedBackupPos]), func(t *testing.T) {
require.NotZero(t, rowsPerPosition[sampleTestedBackupPos])
msgs := ReadRowsFromReplica(t, 2)
assert.Equal(t, rowsPerPosition[sampleTestedBackupPos], len(msgs))
})
})
})
}

@@ -415,6 +441,7 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
})
}

sampleTestedBackupIndex := -1
testRestores := func(t *testing.T) {
numFailedRestores := 0
numSuccessfulRestores := 0
@@ -433,6 +460,9 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
msgs := ReadRowsFromReplica(t, 0)
assert.Equalf(t, testedBackup.rows, len(msgs), "messages: %v", msgs)
numSuccessfulRestores++
if sampleTestedBackupIndex < 0 {
sampleTestedBackupIndex = backupIndex
}
} else {
numFailedRestores++
}
@@ -454,6 +484,29 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
t.Run("PITR-2", func(t *testing.T) {
testRestores(t)
})
// Test that we can create a new tablet with --restore_from_backup --restore-to-timestamp and that it bootstraps
// via PITR and ends up in DRAINED type.
t.Run("init tablet PITR", func(t *testing.T) {
require.GreaterOrEqual(t, sampleTestedBackupIndex, 0)
sampleTestedBackup := testedBackups[sampleTestedBackupIndex]
restoreToTimestampArg := mysqlctl.FormatRFC3339(sampleTestedBackup.postTimestamp)

var tablet *cluster.Vttablet

t.Run(fmt.Sprintf("init from backup num %d", sampleTestedBackupIndex), func(t *testing.T) {
tablet, err = SetupReplica3Tablet([]string{"--restore-to-timestamp", restoreToTimestampArg})
assert.NoError(t, err)
})
t.Run("wait for drained", func(t *testing.T) {
err = tablet.VttabletProcess.WaitForTabletTypesForTimeout([]string{"drained"}, backupTimeoutDuration)
assert.NoError(t, err)
})
t.Run(fmt.Sprintf("validate %d rows", sampleTestedBackup.rows), func(t *testing.T) {
require.NotZero(t, sampleTestedBackup.rows)
msgs := ReadRowsFromReplica(t, 2)
assert.Equal(t, sampleTestedBackup.rows, len(msgs))
})
})
})
}

38 changes: 31 additions & 7 deletions go/vt/vttablet/tabletmanager/restore.go
@@ -71,7 +71,18 @@ func registerRestoreFlags(fs *pflag.FlagSet) {
}

var (
// Flags for PITR
// Flags for incremental restore (PITR) - new iteration
restoreToTimestampStr string
restoreToPos string
)

func registerIncrementalRestoreFlags(fs *pflag.FlagSet) {
fs.StringVar(&restoreToTimestampStr, "restore-to-timestamp", restoreToTimestampStr, "(init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format. Example: '2006-01-02T15:04:05Z07:00'")
fs.StringVar(&restoreToPos, "restore-to-pos", restoreToPos, "(init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups")
}

var (
// Flags for PITR - old iteration
binlogHost string
binlogPort int
binlogUser string
@@ -99,6 +110,9 @@ func init() {
servenv.OnParseFor("vtcombo", registerRestoreFlags)
servenv.OnParseFor("vttablet", registerRestoreFlags)

servenv.OnParseFor("vtcombo", registerIncrementalRestoreFlags)
servenv.OnParseFor("vttablet", registerIncrementalRestoreFlags)

servenv.OnParseFor("vtcombo", registerPointInTimeRestoreFlags)
servenv.OnParseFor("vttablet", registerPointInTimeRestoreFlags)

@@ -110,7 +124,14 @@ func init() {
// It will either work, fail gracefully, or return
// an error in case of a non-recoverable error.
// It takes the action lock so no RPC interferes.
func (tm *TabletManager) RestoreData(ctx context.Context, logger logutil.Logger, waitForBackupInterval time.Duration, deleteBeforeRestore bool, backupTime time.Time) error {
func (tm *TabletManager) RestoreData(
ctx context.Context,
logger logutil.Logger,
waitForBackupInterval time.Duration,
deleteBeforeRestore bool,
backupTime time.Time,
restoreToTimestamp time.Time,
restoreToPos string) error {
if err := tm.lock(ctx); err != nil {
return err
}
@@ -155,7 +176,9 @@ func (tm *TabletManager) RestoreData(ctx context.Context, logger logutil.Logger,
startTime = time.Now()

req := &tabletmanagerdatapb.RestoreFromBackupRequest{
BackupTime: protoutil.TimeToProto(backupTime),
BackupTime: protoutil.TimeToProto(backupTime),
RestoreToPos: restoreToPos,
RestoreToTimestamp: protoutil.TimeToProto(restoreToTimestamp),
}
err = tm.restoreDataLocked(ctx, logger, waitForBackupInterval, deleteBeforeRestore, req)
if err != nil {
@@ -207,17 +230,18 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
DryRun: request.DryRun,
Stats: backupstats.RestoreStats(),
}
if request.RestoreToPos != "" && !protoutil.TimeFromProto(request.RestoreToTimestamp).UTC().IsZero() {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--restore_to_pos and --restore_to_timestamp are mutually exclusive")
restoreToTimestamp := protoutil.TimeFromProto(request.RestoreToTimestamp).UTC()
if request.RestoreToPos != "" && !restoreToTimestamp.IsZero() {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--restore-to-pos and --restore-to-timestamp are mutually exclusive")
}
if request.RestoreToPos != "" {
pos, err := replication.DecodePosition(request.RestoreToPos)
if err != nil {
return vterrors.Wrapf(err, "restore failed: unable to decode --restore_to_pos: %s", request.RestoreToPos)
return vterrors.Wrapf(err, "restore failed: unable to decode --restore-to-pos: %s", request.RestoreToPos)
}
params.RestoreToPos = pos
}
if restoreToTimestamp := protoutil.TimeFromProto(request.RestoreToTimestamp).UTC(); !restoreToTimestamp.IsZero() {
if !restoreToTimestamp.IsZero() {
// Restore to given timestamp
params.RestoreToTimestamp = restoreToTimestamp
}
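One step not visible in this hunk is how the --restore-to-timestamp string becomes the time.Time that RestoreData now accepts. A hedged sketch of what that conversion presumably looks like at the call site (parseRestoreToTimestamp is an illustrative name, not necessarily the helper Vitess actually uses):

package tabletmanager

import "time"

// Illustrative sketch only: converting the --restore-to-timestamp flag value
// into the time.Time that RestoreData expects. The real call site is outside
// this diff and may use a different helper; a zero time means no
// timestamp-based PITR was requested.
func parseRestoreToTimestamp(s string) (time.Time, error) {
	if s == "" {
		return time.Time{}, nil
	}
	return time.Parse(time.RFC3339, s)
}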