diff --git a/go/vt/mysqlctl/backup_blackbox_test.go b/go/vt/mysqlctl/backup_blackbox_test.go index 0d5051273aa..151048aa7cb 100644 --- a/go/vt/mysqlctl/backup_blackbox_test.go +++ b/go/vt/mysqlctl/backup_blackbox_test.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "io" "os" "path" "strconv" @@ -590,34 +591,42 @@ func TestExecuteRestoreWithTimedOutContext(t *testing.T) { } } -type writeCloseFailFirstWrite struct { +type rwCloseFailFirstCall struct { *bytes.Buffer - firstWriteDone bool + firstDone bool } -func (w *writeCloseFailFirstWrite) Write(p []byte) (n int, err error) { - if w.firstWriteDone { +func (w *rwCloseFailFirstCall) Write(p []byte) (n int, err error) { + if w.firstDone { return w.Buffer.Write(p) } - w.firstWriteDone = true + w.firstDone = true return 0, errors.New("failing first write") } -func (w *writeCloseFailFirstWrite) Close() error { +func (w *rwCloseFailFirstCall) Read(p []byte) (n int, err error) { + if w.firstDone { + return w.Buffer.Read(p) + } + w.firstDone = true + return 0, errors.New("failing first read") +} + +func (w *rwCloseFailFirstCall) Close() error { return nil } -func newWriteCloseFailFirstWrite(firstWriteDone bool) *writeCloseFailFirstWrite { - return &writeCloseFailFirstWrite{ - Buffer: bytes.NewBuffer(nil), - firstWriteDone: firstWriteDone, +func newWriteCloseFailFirstWrite(firstWriteDone bool) *rwCloseFailFirstCall { + return &rwCloseFailFirstCall{ + Buffer: bytes.NewBuffer(nil), + firstDone: firstWriteDone, } } func TestExecuteBackupFailToWriteEachFileOnlyOnce(t *testing.T) { ctx, backupRoot, keyspace, shard, ts := setupCluster(t, 2, 2) - bufferPerFiles := make(map[string]*writeCloseFailFirstWrite) + bufferPerFiles := make(map[string]*rwCloseFailFirstCall) be := &mysqlctl.BuiltinBackupEngine{} bh := &mysqlctl.FakeBackupHandle{} bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn { @@ -626,22 +635,6 @@ func TestExecuteBackupFailToWriteEachFileOnlyOnce(t *testing.T) { _, isRetry := bufferPerFiles[filename] newBuffer := newWriteCloseFailFirstWrite(isRetry) bufferPerFiles[filename] = newBuffer - - // if filename == "MANIFEST" { - // return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: writerCloserBuffer} - // } - // - // count := 0 - // for _, call := range bh.AddFileCalls { - // if call.Filename == filename { - // count++ - // } - // } - // // if it is the first time we call AddFile for this file, let's fail the call - // if count == 1 { - // return mysqlctl.FakeBackupHandleAddFileReturn{Err: fmt.Errorf("failed to AddFile on %s", filename)} - // } - // if it is the second time we call AddFile for this file, let's make it pass return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: newBuffer} } @@ -694,10 +687,10 @@ func TestExecuteBackupFailToWriteEachFileOnlyOnce(t *testing.T) { require.Equal(t, mysqlctl.BackupUsable, backupResult) } -func TestExecuteBackupFailToWriteFileEachTwice(t *testing.T) { +func TestExecuteBackupFailToWriteFileTwice(t *testing.T) { ctx, backupRoot, keyspace, shard, ts := setupCluster(t, 1, 1) - bufferPerFiles := make(map[string]*writeCloseFailFirstWrite) + bufferPerFiles := make(map[string]*rwCloseFailFirstCall) be := &mysqlctl.BuiltinBackupEngine{} bh := &mysqlctl.FakeBackupHandle{} bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn { @@ -716,6 +709,7 @@ func TestExecuteBackupFailToWriteFileEachTwice(t *testing.T) { mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} logger := logutil.NewMemoryLogger() + fakeStats := backupstats.NewFakeStats() backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ Logger: logger, Mysqld: mysqld, @@ -724,7 +718,7 @@ func TestExecuteBackupFailToWriteFileEachTwice(t *testing.T) { InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), DataDir: path.Join(backupRoot, "datadir"), }, - Stats: backupstats.NewFakeStats(), + Stats: fakeStats, Concurrency: 1, HookExtraEnv: map[string]string{}, TopoServer: ts, @@ -737,13 +731,271 @@ func TestExecuteBackupFailToWriteFileEachTwice(t *testing.T) { "Backing up file: test1/0.ibd (attempt 1/2)", "Backing up file: test1/0.ibd (attempt 2/2)", } - assertLogs(t, expectedLogs, logger) + ss := getStats(fakeStats) + require.Equal(t, 2, ss.destinationCloseStats) + require.Equal(t, 2, ss.destinationOpenStats) + require.Equal(t, 2, ss.destinationWriteStats) + require.Equal(t, 2, ss.sourceCloseStats) + require.Equal(t, 2, ss.sourceOpenStats) + require.Equal(t, 2, ss.sourceReadStats) + require.ErrorContains(t, err, "failing first write") require.Equal(t, mysqlctl.BackupUnusable, backupResult) } +func TestExecuteRestoreFailToReadEachFileOnlyOnce(t *testing.T) { + ctx, backupRoot, keyspace, shard, ts := setupCluster(t, 2, 2) + + be := &mysqlctl.BuiltinBackupEngine{} + bufferPerFiles := make(map[string]*rwCloseFailFirstCall) + bh := &mysqlctl.FakeBackupHandle{} + bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn { + // let's never make it fail for now + newBuffer := newWriteCloseFailFirstWrite(true) + bufferPerFiles[filename] = newBuffer + return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: newBuffer} + } + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP REPLICA", "START REPLICA", in that order. + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: backupstats.NewFakeStats(), + Concurrency: 1, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + MysqlShutdownTimeout: mysqlShutdownTimeout, + }, bh) + + require.NoError(t, err) + require.Equal(t, mysqlctl.BackupUsable, backupResult) + + // let's mark each file in the buffer as if it is their first read + for key := range bufferPerFiles { + bufferPerFiles[key].firstDone = false + } + + // Now try to restore the above backup. + fakeBh := &mysqlctl.FakeBackupHandle{} + fakeBh.ReadFileReturnF = func(ctx context.Context, filename string) (io.ReadCloser, error) { + return bufferPerFiles[filename], nil + } + + fakedb = fakesqldb.New(t) + defer fakedb.Close() + mysqld = mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + fakeStats := backupstats.NewFakeStats() + logger := logutil.NewMemoryLogger() + + restoreParams := mysqlctl.RestoreParams{ + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + BinLogPath: path.Join(backupRoot, "binlog"), + RelayLogPath: path.Join(backupRoot, "relaylog"), + RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"), + RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"), + }, + Logger: logger, + Mysqld: mysqld, + Concurrency: 1, + HookExtraEnv: map[string]string{}, + DeleteBeforeRestore: false, + DbName: "test", + Keyspace: "test", + Shard: "-", + StartTime: time.Now(), + RestoreToPos: replication.Position{}, + RestoreToTimestamp: time.Time{}, + DryRun: false, + Stats: fakeStats, + MysqlShutdownTimeout: mysqlShutdownTimeout, + } + + // Successful restore. + bm, err := be.ExecuteRestore(ctx, restoreParams, fakeBh) + assert.NoError(t, err) + assert.NotNil(t, bm) + + ss := getStats(fakeStats) + require.Equal(t, 8, ss.destinationCloseStats) + require.Equal(t, 8, ss.destinationOpenStats) + require.Equal(t, 4, ss.destinationWriteStats) + require.Equal(t, 8, ss.sourceCloseStats) + require.Equal(t, 8, ss.sourceOpenStats) + require.Equal(t, 8, ss.sourceReadStats) +} + +func TestExecuteRestoreFailToReadEachFileTwice(t *testing.T) { + ctx, backupRoot, keyspace, shard, ts := setupCluster(t, 2, 2) + + be := &mysqlctl.BuiltinBackupEngine{} + bufferPerFiles := make(map[string]*rwCloseFailFirstCall) + bh := &mysqlctl.FakeBackupHandle{} + bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn { + // let's never make it fail for now + newBuffer := newWriteCloseFailFirstWrite(true) + bufferPerFiles[filename] = newBuffer + return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: newBuffer} + } + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP REPLICA", "START REPLICA", in that order. + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: backupstats.NewFakeStats(), + Concurrency: 1, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + MysqlShutdownTimeout: mysqlShutdownTimeout, + }, bh) + + require.NoError(t, err) + require.Equal(t, mysqlctl.BackupUsable, backupResult) + + // Now try to restore the above backup. + fakeBh := &mysqlctl.FakeBackupHandle{} + fakeBh.ReadFileReturnF = func(ctx context.Context, filename string) (io.ReadCloser, error) { + // always make it fail, expect if it is the MANIFEST file, otherwise we won't start restoring the other files + buffer := bufferPerFiles[filename] + if filename != "MANIFEST" { + buffer.firstDone = false + } + return buffer, nil + } + + fakedb = fakesqldb.New(t) + defer fakedb.Close() + mysqld = mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + fakeStats := backupstats.NewFakeStats() + logger := logutil.NewMemoryLogger() + + restoreParams := mysqlctl.RestoreParams{ + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + BinLogPath: path.Join(backupRoot, "binlog"), + RelayLogPath: path.Join(backupRoot, "relaylog"), + RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"), + RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"), + }, + Logger: logger, + Mysqld: mysqld, + Concurrency: 1, + HookExtraEnv: map[string]string{}, + DeleteBeforeRestore: false, + DbName: "test", + Keyspace: "test", + Shard: "-", + StartTime: time.Now(), + RestoreToPos: replication.Position{}, + RestoreToTimestamp: time.Time{}, + DryRun: false, + Stats: fakeStats, + MysqlShutdownTimeout: mysqlShutdownTimeout, + } + + // Successful restore. + bm, err := be.ExecuteRestore(ctx, restoreParams, fakeBh) + assert.ErrorContains(t, err, "failing first read") + assert.Nil(t, bm) + + expectedLogs := []string{ + "Failed restoring \"test2/1.ibd\" (attempt 1/2)", + "Failed restoring \"test2/1.ibd\" (attempt 2/2)", + } + assertLogs(t, expectedLogs, logger) + + ss := getStats(fakeStats) + require.Equal(t, 2, ss.destinationCloseStats) + require.Equal(t, 2, ss.destinationOpenStats) + require.Equal(t, 0, ss.destinationWriteStats) + require.Equal(t, 2, ss.sourceCloseStats) + require.Equal(t, 2, ss.sourceOpenStats) + require.Equal(t, 2, ss.sourceReadStats) +} + +type statSummary struct { + destinationCloseStats int + destinationOpenStats int + destinationWriteStats int + sourceCloseStats int + sourceOpenStats int + sourceReadStats int +} + +func getStats(stats *backupstats.FakeStats) statSummary { + var ss statSummary + + for _, sr := range stats.ScopeReturns { + switch sr.ScopeV[backupstats.ScopeOperation] { + case "Destination:Close": + if len(sr.TimedIncrementCalls) > 0 { + ss.destinationCloseStats++ + } + case "Destination:Open": + if len(sr.TimedIncrementCalls) > 0 { + ss.destinationOpenStats++ + } + case "Destination:Write": + if len(sr.TimedIncrementBytesCalls) > 0 { + ss.destinationWriteStats++ + } + case "Source:Close": + if len(sr.TimedIncrementCalls) > 0 { + ss.sourceCloseStats++ + } + case "Source:Open": + if len(sr.TimedIncrementCalls) > 0 { + ss.sourceOpenStats++ + } + case "Source:Read": + if len(sr.TimedIncrementBytesCalls) > 0 { + ss.sourceReadStats++ + } + } + } + return ss +} + func assertLogs(t *testing.T, expectedLogs []string, logger *logutil.MemoryLogger) { for _, log := range expectedLogs { var found bool diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index d5822c944b2..0beb83036ee 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -1067,8 +1067,8 @@ func (be *BuiltinBackupEngine) executeRestoreIncrementalBackup(ctx context.Conte // we return the position from which replication should start // otherwise an error is returned func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error) { - var bm builtinBackupManifest - if err := getBackupManifestInto(ctx, bh, &bm); err != nil { + bm, err := be.restoreManifest(ctx, params, bh) + if err != nil { return nil, err } @@ -1077,7 +1077,6 @@ func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params Restor return nil, err } - var err error if bm.Incremental { err = be.executeRestoreIncrementalBackup(ctx, params, bh, bm) } else { @@ -1090,6 +1089,29 @@ func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params Restor return &bm.BackupManifest, nil } +func (be *BuiltinBackupEngine) restoreManifest(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (bm builtinBackupManifest, finalErr error) { + var attemptNb int + defer func() { + state := "Completed" + if finalErr != nil { + state = "Failed" + } + params.Logger.Infof("%s restoring %s %s", state, backupManifestFileName, attemptToString(attemptNb)) + }() + + for ; attemptNb <= maxRetriesPerFile; attemptNb++ { + params.Logger.Infof("Restoring file %s %s", backupManifestFileName, attemptToString(attemptNb)) + if finalErr = getBackupManifestInto(ctx, bh, &bm); finalErr == nil { + if ctxErr := ctx.Err(); ctxErr != nil { + finalErr = ctxErr + } + break + } + params.Logger.Infof("Failed restoring %s %s", backupManifestFileName, attemptToString(attemptNb)) + } + return +} + // restoreFiles will copy all the files from the BackupStorage to the // right place. func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, bm builtinBackupManifest) (createdDir string, err error) { @@ -1153,12 +1175,26 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP } fe.ParentPath = createdDir + // And restore the file. name := fmt.Sprintf("%v", i) - params.Logger.Infof("Copying file %v: %v", name, fe.Name) - err := be.restoreFile(ctxCancel, params, bh, fe, bm, name) - if err != nil { - rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name)) + var errRestore error + for fe.AttemptNb <= maxRetriesPerFile { + params.Logger.Infof("Copying file %v: %v %s", name, fe.Name, attemptToString(fe.AttemptNb)) + if errRestore = be.restoreFile(ctxCancel, params, bh, fe, bm, name); errRestore == nil { + // If the restore did not fail or the context was canceled, we do not need to retry. + // If the context was canceled, the next attempt will fail anyway, so we should break. + if ctxErr := ctxCancel.Err(); ctxErr != nil { + errRestore = ctxErr + } + break + } + + // We can try restoring again: the previous restore failed + fe.AttemptNb++ + } + if errRestore != nil { + rec.RecordError(vterrors.Wrapf(errRestore, "failed to restore file %v to %v after %d attempts", name, fe.Name, fe.AttemptNb)) cancel() } }(i) @@ -1171,6 +1207,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, bm builtinBackupManifest, name string) (finalErr error) { ctx, cancel := context.WithCancel(ctx) defer cancel() + // Open the source file for reading. openSourceAt := time.Now() source, err := bh.ReadFile(ctx, name) @@ -1188,9 +1225,15 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt)) }() + // Create the backup/source reader and start reporting progress attemptStr := attemptToString(fe.AttemptNb) - br := newBackupReader(name, 0, timedSource) + br := newBackupReader(fe.Name, 0, timedSource) go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true, attemptStr) + defer func() { + if err := br.Close(finalErr == nil); err != nil { + finalErr = vterrors.Wrap(finalErr, "failed to close the source reader") + } + }() var reader io.Reader = br // Open the destination file for writing. @@ -1276,10 +1319,6 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa return vterrors.Wrap(err, "failed to flush destination buffer") } - if err := br.Close(true); err != nil { - return vterrors.Wrap(err, "failed to close the source reader") - } - return nil }