From 514a4ea161e9ad688daa586c5d1e1977af0ceb73 Mon Sep 17 00:00:00 2001
From: Florent Poinsard
Date: Fri, 29 Nov 2024 16:30:12 -0600
Subject: [PATCH] Self-review

Signed-off-by: Florent Poinsard
---
 go/vt/mysqlctl/backupstorage/interface.go |   2 +-
 go/vt/mysqlctl/builtinbackupengine.go     | 190 ++++++++++--------
 go/vt/mysqlctl/errorsbackup/errors.go     |   7 +-
 .../s3_backup_endtoend_test.go}           |   8 +-
 4 files changed, 118 insertions(+), 89 deletions(-)
 rename go/vt/mysqlctl/s3backupstorage/{integration/s3_backup_integration_test.go => endtoend/s3_backup_endtoend_test.go} (98%)

diff --git a/go/vt/mysqlctl/backupstorage/interface.go b/go/vt/mysqlctl/backupstorage/interface.go
index 9398485473b..1b9ff5ea2bf 100644
--- a/go/vt/mysqlctl/backupstorage/interface.go
+++ b/go/vt/mysqlctl/backupstorage/interface.go
@@ -91,7 +91,7 @@ type BackupHandle interface {
 	ReadFile(ctx context.Context, filename string) (io.ReadCloser, error)
 
 	// BackupErrorRecorder is embedded here to coordinate reporting and
-	// handling of errors among all the components involved in taking a backup.
+	// handling of errors among all the components involved in taking/restoring a backup.
 	errorsbackup.BackupErrorRecorder
 }
 
diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go
index 5319f7e9610..474cf670a7e 100644
--- a/go/vt/mysqlctl/builtinbackupengine.go
+++ b/go/vt/mysqlctl/builtinbackupengine.go
@@ -152,12 +152,12 @@ type FileEntry struct {
 	// for writing files in a temporary directory
 	ParentPath string
 
-	// AttemptNb specifies how many times we attempted to restore/backup this FileEntry.
-	// If we fail to restore/backup this FileEntry, we will retry up to maxRetriesPerFile (= 1) times.
+	// RetryNb specifies how many times we retried restoring/backing up this FileEntry.
+	// If we fail to restore/backup this FileEntry, we will retry up to maxRetriesPerFile times.
 	// Every time the builtin backup engine retries this file, we increment this field by 1.
 	// We don't care about adding this information to the MANIFEST and also to not cause any compatibility issue
 	// we are adding the - json tag to let Go know it can ignore the field.
-	AttemptNb int `json:"-"`
+	RetryNb int `json:"-"`
 }
 
 func init() {
@@ -594,6 +594,8 @@ func (be *BuiltinBackupEngine) backupFiles(
 	mysqlVersion string,
 	incrDetails *IncrementalBackupDetails,
 ) (finalErr error) {
+	// backupFiles always waits for the AddFile calls to finish their work before returning, unless there has been a
+	// non-recoverable error in the process; in both cases we can cancel the context safely.
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
@@ -611,7 +613,7 @@ func (be *BuiltinBackupEngine) backupFiles(
 	}
 	params.Logger.Infof("found %v files to backup", len(fes))
 
-	// The error here can be ignored, it will be handled differently in the following if statement
+	// The error here can be ignored safely. Failed FileEntries are handled in the next 'if' statement.
 	_ = be.backupFileEntries(ctx, fes, bh, params)
 
 	// BackupHandle supports the BackupErrorRecorder interface for tracking errors
@@ -623,8 +625,10 @@ func (be *BuiltinBackupEngine) backupFiles(
 	// error were encountered
 	// [here](https://github.com/vitessio/vitess/blob/d26b6c7975b12a87364e471e2e2dfa4e253c2a5b/go/vt/mysqlctl/s3backupstorage/s3.go#L139-L142).
 	//
-	// All the errors are grouped per file, if any file has failed, we will try backing up
-	// this file once more. If the retry fails, we definitively fail the backup process.
+	// All the errors are grouped per file. If one or more files failed, we back them up
+	// once more, concurrently. If any of the retries fails, we fail fast by canceling the context
+	// and returning an error: there is no reason to keep processing the other retries once
+	// one of them has failed.
 	if files := bh.GetFailedFiles(); len(files) > 0 {
 		newFes := make([]FileEntry, len(fes))
 		for _, file := range files {
@@ -637,7 +641,7 @@ func (be *BuiltinBackupEngine) backupFiles(
 				Base:       oldFes.Base,
 				Name:       oldFes.Name,
 				ParentPath: oldFes.ParentPath,
-				AttemptNb:  1,
+				RetryNb:    1,
 			}
 			bh.ResetErrorForFile(file)
 		}
@@ -649,8 +653,8 @@ func (be *BuiltinBackupEngine) backupFiles(
 
 	// Backup the MANIFEST file and apply retry logic.
 	var manifestErr error
-	for currentAttempt := 0; currentAttempt <= maxRetriesPerFile; currentAttempt++ {
-		manifestErr = be.backupManifest(ctx, params, bh, backupPosition, purgedPosition, fromPosition, fromBackupName, serverUUID, mysqlVersion, incrDetails, fes, currentAttempt)
+	for currentRetry := 0; currentRetry <= maxRetriesPerFile; currentRetry++ {
+		manifestErr = be.backupManifest(ctx, params, bh, backupPosition, purgedPosition, fromPosition, fromBackupName, serverUUID, mysqlVersion, incrDetails, fes, currentRetry)
 		if manifestErr == nil {
 			break
 		}
@@ -662,6 +666,10 @@ func (be *BuiltinBackupEngine) backupFiles(
 	return nil
 }
 
+// backupFileEntries iterates over a slice of FileEntry and backs the entries up concurrently, up to the defined concurrency limit.
+// Empty FileEntry values are ignored, which lets the retry mechanism pass a partially empty slice without
+// shifting the indexes of the entries that still need to be retried.
+// This function leaves no background work behind: every call to bh.AddFile is either finished or canceled by the time it returns.
 func (be *BuiltinBackupEngine) backupFileEntries(ctx context.Context, fes []FileEntry, bh backupstorage.BackupHandle, params BackupParams) error {
 	ctxCancel, cancel := context.WithCancel(ctx)
 	defer func() {
@@ -691,8 +699,8 @@ func (be *BuiltinBackupEngine) backupFileEntries(ctx context.Context, fes []File
 			if acqErr != nil {
 				log.Errorf("Unable to acquire semaphore needed to backup file: %s, err: %s", fe.Name, acqErr.Error())
 				bh.RecordError(name, acqErr)
-				if fe.AttemptNb == maxRetriesPerFile {
-					// this is the last attempt and the file is marked as failed, we can cancel everything and fail fast.
+				if fe.RetryNb == maxRetriesPerFile {
+					// this is the last attempt and we have an error, so we can cancel everything and fail fast.
 					cancel()
 				}
 				return
@@ -715,8 +723,8 @@ func (be *BuiltinBackupEngine) backupFileEntries(ctx context.Context, fes []File
 		var errBackupFile error
 		if errBackupFile = be.backupFile(ctxCancel, params, bh, fe, name); errBackupFile != nil {
 			bh.RecordError(name, vterrors.Wrapf(errBackupFile, "failed to backup file '%s'", name))
-			if fe.AttemptNb == maxRetriesPerFile {
-				// this is the last attempt and the file is marked as failed, we can cancel everything and fail fast.
+			if fe.RetryNb == maxRetriesPerFile {
+				// this is the last attempt and we have an error, so we can cancel everything and fail fast.
 				cancel()
 			}
 		}
@@ -768,8 +776,9 @@ func newBackupReader(filename string, maxSize int64, r io.Reader) *backupPipe {
 	}
 }
 
-func attemptToString(attempt int) string {
-	return fmt.Sprintf("(attempt %d/%d)", attempt+1, maxRetriesPerFile+1)
+func retryToString(retry int) string {
+	// We convert the retry number to an attempt number, increasing retry by one, so it looks more human friendly
+	return fmt.Sprintf("(attempt %d/%d)", retry+1, maxRetriesPerFile+1)
 }
 
 func (bp *backupPipe) Read(p []byte) (int, error) {
@@ -810,7 +819,7 @@ func (bp *backupPipe) HashString() string {
 	return hex.EncodeToString(bp.crc32.Sum(nil))
 }
-func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool, attemptStr string) {
+func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool, retryStr string) {
 	messageStr := "restoring"
 	if !restore {
 		messageStr = "backing up"
 	}
@@ -820,21 +829,21 @@ func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration,
 	for {
 		select {
 		case <-ctx.Done():
-			logger.Infof("Canceled %s of %q file %s", messageStr, bp.filename, attemptStr)
+			logger.Infof("Canceled %s of %q file %s", messageStr, bp.filename, retryStr)
 			return
 		case <-bp.done:
-			logger.Infof("Completed %s %q %s", messageStr, bp.filename, attemptStr)
+			logger.Infof("Completed %s %q %s", messageStr, bp.filename, retryStr)
 			return
 		case <-bp.failed:
-			logger.Infof("Failed %s %q %s", messageStr, bp.filename, attemptStr)
+			logger.Infof("Failed %s %q %s", messageStr, bp.filename, retryStr)
 			return
 		case <-tick.C:
 			written := float64(atomic.LoadInt64(&bp.nn))
 			if bp.maxSize == 0 {
-				logger.Infof("%s %q %s: %.02fkb", messageStr, bp.filename, attemptStr, written/1024.0)
+				logger.Infof("%s %q %s: %.02fkb", messageStr, bp.filename, retryStr, written/1024.0)
 			} else {
 				maxSize := float64(bp.maxSize)
-				logger.Infof("%s %q %s: %.02f%% (%.02f/%.02fkb)", messageStr, bp.filename, attemptStr, 100.0*written/maxSize, written/1024.0, maxSize/1024.0)
+				logger.Infof("%s %q %s: %.02f%% (%.02f/%.02fkb)", messageStr, bp.filename, retryStr, 100.0*written/maxSize, written/1024.0, maxSize/1024.0)
 			}
 		}
 	}
@@ -873,12 +882,12 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
 		return err
 	}
 
-	attemptStr := attemptToString(fe.AttemptNb)
+	retryStr := retryToString(fe.RetryNb)
 	br := newBackupReader(fe.Name, fi.Size(), timedSource)
-	go br.ReportProgress(cancelableCtx, builtinBackupProgress, params.Logger, false /*restore*/, attemptStr)
+	go br.ReportProgress(cancelableCtx, builtinBackupProgress, params.Logger, false /*restore*/, retryStr)
 
 	// Open the destination file for writing, and a buffer.
-	params.Logger.Infof("Backing up file: %v %s", fe.Name, attemptStr)
+	params.Logger.Infof("Backing up file: %v %s", fe.Name, retryStr)
 	openDestAt := time.Now()
 	dest, err := bh.AddFile(ctx, name, fi.Size())
 	if err != nil {
@@ -938,7 +947,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
 	defer func() {
 		// Close gzip to flush it, after that all data is sent to writer.
 		closeCompressorAt := time.Now()
-		params.Logger.Infof("Closing compressor for file: %s %s", fe.Name, attemptStr)
+		params.Logger.Infof("Closing compressor for file: %s %s", fe.Name, retryStr)
 		if cerr := closer.Close(); err != nil {
 			cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", fe.Name)
 			params.Logger.Error(cerr)
@@ -984,60 +993,71 @@ func (be *BuiltinBackupEngine) backupManifest(
 	fes []FileEntry,
 	currentAttempt int,
 ) (finalErr error) {
-	attemptStr := attemptToString(currentAttempt)
-	params.Logger.Infof("Backing up file %s %s", backupManifestFileName, attemptStr)
-
-	// open the MANIFEST
-	wc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown)
-	if err != nil {
-		return vterrors.Wrapf(err, "cannot add %v to backup %s", backupManifestFileName, attemptStr)
-	}
+	retryStr := retryToString(currentAttempt)
+	params.Logger.Infof("Backing up file %s %s", backupManifestFileName, retryStr)
 
 	defer func() {
+		state := "Completed"
 		if finalErr != nil {
-			params.Logger.Infof("Failed backing up %s %s", backupManifestFileName, attemptStr)
-		} else {
-			params.Logger.Infof("Completed backing up %s %s", backupManifestFileName, attemptStr)
+			state = "Failed"
 		}
+		params.Logger.Infof("%s backing up %s %s", state, backupManifestFileName, retryStr)
 	}()
 
-	// JSON-encode and write the MANIFEST
-	bm := &builtinBackupManifest{
-		// Common base fields
-		BackupManifest: BackupManifest{
-			BackupName:         bh.Name(),
-			BackupMethod:       builtinBackupEngineName,
-			Position:           backupPosition,
-			PurgedPosition:     purgedPosition,
-			FromPosition:       fromPosition,
-			FromBackup:         fromBackupName,
-			Incremental:        !fromPosition.IsZero(),
-			ServerUUID:         serverUUID,
-			TabletAlias:        params.TabletAlias,
-			Keyspace:           params.Keyspace,
-			Shard:              params.Shard,
-			BackupTime:         params.BackupTime.UTC().Format(time.RFC3339),
-			FinishedTime:       time.Now().UTC().Format(time.RFC3339),
-			MySQLVersion:       mysqlVersion,
-			UpgradeSafe:        params.UpgradeSafe,
-			IncrementalDetails: incrDetails,
-		},
-
-		// Builtin-specific fields
-		FileEntries:          fes,
-		SkipCompress:         !backupStorageCompress,
-		CompressionEngine:    CompressionEngineName,
-		ExternalDecompressor: ManifestExternalDecompressorCmd,
-	}
-	data, err := json.MarshalIndent(bm, "", " ")
-	if err != nil {
-		return vterrors.Wrapf(err, "cannot JSON encode %v %s", backupManifestFileName, attemptStr)
-	}
-	if _, err := wc.Write(data); err != nil {
-		return vterrors.Wrapf(err, "cannot write %v %s", backupManifestFileName, attemptStr)
+	// Wrapping this logic in its own function ensures that we always close the writer no matter what,
+	// and, in the case of success, that we close it before calling bh.EndBackup.
+	addAndWrite := func() (addAndWriteError error) {
+		// open the MANIFEST
+		wc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown)
+		if err != nil {
+			return vterrors.Wrapf(err, "cannot add %v to backup %s", backupManifestFileName, retryStr)
+		}
+		defer func() {
+			if err := wc.Close(); err != nil {
+				addAndWriteError = errors.Join(addAndWriteError, vterrors.Wrapf(err, "cannot close backup: %v", backupManifestFileName))
+			}
+		}()
+
+		// JSON-encode and write the MANIFEST
+		bm := &builtinBackupManifest{
+			// Common base fields
+			BackupManifest: BackupManifest{
+				BackupName:         bh.Name(),
+				BackupMethod:       builtinBackupEngineName,
+				Position:           backupPosition,
+				PurgedPosition:     purgedPosition,
+				FromPosition:       fromPosition,
+				FromBackup:         fromBackupName,
+				Incremental:        !fromPosition.IsZero(),
+				ServerUUID:         serverUUID,
+				TabletAlias:        params.TabletAlias,
+				Keyspace:           params.Keyspace,
+				Shard:              params.Shard,
+				BackupTime:         params.BackupTime.UTC().Format(time.RFC3339),
+				FinishedTime:       time.Now().UTC().Format(time.RFC3339),
+				MySQLVersion:       mysqlVersion,
+				UpgradeSafe:        params.UpgradeSafe,
+				IncrementalDetails: incrDetails,
+			},
+
+			// Builtin-specific fields
+			FileEntries:          fes,
+			SkipCompress:         !backupStorageCompress,
+			CompressionEngine:    CompressionEngineName,
+			ExternalDecompressor: ManifestExternalDecompressorCmd,
+		}
+		data, err := json.MarshalIndent(bm, "", " ")
+		if err != nil {
+			return vterrors.Wrapf(err, "cannot JSON encode %v %s", backupManifestFileName, retryStr)
+		}
+		if _, err := wc.Write(data); err != nil {
+			return vterrors.Wrapf(err, "cannot write %v %s", backupManifestFileName, retryStr)
+		}
+		return nil
 	}
-	if err := wc.Close(); err != nil {
-		return vterrors.Wrapf(err, "cannot close %v %s", backupManifestFileName, attemptStr)
+	err := addAndWrite()
+	if err != nil {
+		return err
 	}
 
 	err = bh.EndBackup(ctx)
@@ -1129,21 +1149,21 @@ func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params Restor
 }
 
 func (be *BuiltinBackupEngine) restoreManifest(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (bm builtinBackupManifest, finalErr error) {
-	var attemptNb int
+	var retryNb int
 	defer func() {
 		state := "Completed"
 		if finalErr != nil {
 			state = "Failed"
 		}
-		params.Logger.Infof("%s restoring %s %s", state, backupManifestFileName, attemptToString(attemptNb))
+		params.Logger.Infof("%s restoring %s %s", state, backupManifestFileName, retryToString(retryNb))
 	}()
 
-	for ; attemptNb <= maxRetriesPerFile; attemptNb++ {
-		params.Logger.Infof("Restoring file %s %s", backupManifestFileName, attemptToString(attemptNb))
+	for ; retryNb <= maxRetriesPerFile; retryNb++ {
+		params.Logger.Infof("Restoring file %s %s", backupManifestFileName, retryToString(retryNb))
 		if finalErr = getBackupManifestInto(ctx, bh, &bm); finalErr == nil {
 			break
 		}
-		params.Logger.Infof("Failed restoring %s %s", backupManifestFileName, attemptToString(attemptNb))
+		params.Logger.Infof("Failed restoring %s %s", backupManifestFileName, retryToString(retryNb))
 	}
 	return
 }
@@ -1182,7 +1202,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
 				Name:       oldFes.Name,
 				ParentPath: oldFes.ParentPath,
 				Hash:       oldFes.Hash,
-				AttemptNb:  1,
+				RetryNb:    1,
 			}
 			bh.ResetErrorForFile(file)
 		}
@@ -1215,8 +1235,8 @@ func (be *BuiltinBackupEngine) restoreFileEntries(ctx context.Context, fes []Fil
 			if acqErr != nil {
 				log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error())
 				bh.RecordError(name, acqErr)
-				if fe.AttemptNb == maxRetriesPerFile {
-					// this is the last attempt and the file is marked as failed, we can cancel everything and fail fast.
+				if fe.RetryNb == maxRetriesPerFile {
+					// this is the last attempt and we have an error, so we can cancel everything and fail fast.
 					cancel()
 				}
 				return
@@ -1238,11 +1258,11 @@ func (be *BuiltinBackupEngine) restoreFileEntries(ctx context.Context, fes []Fil
 			fe.ParentPath = createdDir
 
 			// And restore the file.
-			params.Logger.Infof("Copying file %v: %v %s", name, fe.Name, attemptToString(fe.AttemptNb))
+			params.Logger.Infof("Copying file %v: %v %s", name, fe.Name, retryToString(fe.RetryNb))
 			if errRestore := be.restoreFile(ctxCancel, params, bh, fe, bm, name); errRestore != nil {
 				bh.RecordError(name, vterrors.Wrapf(errRestore, "failed to restore file %v to %v", name, fe.Name))
-				if fe.AttemptNb == maxRetriesPerFile {
-					// this is the last attempt and the file is marked as failed, we can cancel everything and fail fast.
+				if fe.RetryNb == maxRetriesPerFile {
+					// this is the last attempt and we have an error, so we can cancel everything and fail fast.
 					cancel()
 				}
 			}
@@ -1276,9 +1296,9 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
 	}()
 
 	// Create the backup/source reader and start reporting progress
-	attemptStr := attemptToString(fe.AttemptNb)
+	retryStr := retryToString(fe.RetryNb)
 	br := newBackupReader(fe.Name, 0, timedSource)
-	go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true, attemptStr)
+	go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true, retryStr)
 	defer func() {
 		if err := br.Close(finalErr == nil); err != nil {
 			finalErr = vterrors.Wrap(finalErr, "failed to close the source reader")
diff --git a/go/vt/mysqlctl/errorsbackup/errors.go b/go/vt/mysqlctl/errorsbackup/errors.go
index 0f0122a498d..5e425f24074 100644
--- a/go/vt/mysqlctl/errorsbackup/errors.go
+++ b/go/vt/mysqlctl/errorsbackup/errors.go
@@ -30,7 +30,9 @@ type BackupErrorRecorder interface {
 	ResetErrorForFile(string)
 }
 
-// PerFileErrorRecorder records all the errors.
+// PerFileErrorRecorder records errors and groups them by filename.
+// This is particularly useful when processing several files at the same time
+// and wanting to know which files failed.
 type PerFileErrorRecorder struct {
 	mu     sync.Mutex
 	errors map[string][]error
@@ -59,6 +61,7 @@ func (pfer *PerFileErrorRecorder) HasErrors() bool {
 	return len(pfer.errors) > 0
 }
 
+// Error returns all the errors that were recorded.
 func (pfer *PerFileErrorRecorder) Error() error {
 	pfer.mu.Lock()
 	defer pfer.mu.Unlock()
@@ -78,6 +81,7 @@ func (pfer *PerFileErrorRecorder) Error() error {
 	return errors.New(strings.Join(errs, "; "))
 }
 
+// GetFailedFiles returns a slice of filenames, each of which has at least one error.
 func (pfer *PerFileErrorRecorder) GetFailedFiles() []string {
 	pfer.mu.Lock()
 	defer pfer.mu.Unlock()
@@ -91,6 +95,7 @@ func (pfer *PerFileErrorRecorder) GetFailedFiles() []string {
 	return files
 }
 
+// ResetErrorForFile removes all the errors of a given file.
 func (pfer *PerFileErrorRecorder) ResetErrorForFile(filename string) {
 	pfer.mu.Lock()
 	defer pfer.mu.Unlock()
diff --git a/go/vt/mysqlctl/s3backupstorage/integration/s3_backup_integration_test.go b/go/vt/mysqlctl/s3backupstorage/endtoend/s3_backup_endtoend_test.go
similarity index 98%
rename from go/vt/mysqlctl/s3backupstorage/integration/s3_backup_integration_test.go
rename to go/vt/mysqlctl/s3backupstorage/endtoend/s3_backup_endtoend_test.go
index 158458e1677..0bb81dc2c1c 100644
--- a/go/vt/mysqlctl/s3backupstorage/integration/s3_backup_integration_test.go
+++ b/go/vt/mysqlctl/s3backupstorage/endtoend/s3_backup_endtoend_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package integration
+package endtoend
 
 import (
 	"context"
@@ -48,7 +48,11 @@ import (
 
 	Minio is almost a drop-in replacement for AWS S3, if you want to run these
 	tests against a true AWS S3 Bucket, you can do so by not running the TestMain
-	and setting the 'AWS_*' environment variable to your own values.
+	and setting the 'AWS_*' environment variables to your own values.
+
+	This package and file are named 'endtoend', but this is really more of an integration test.
+	However, we don't want our CI infra to mistake it for a regular unit test,
+	hence the rename to 'endtoend'.
 */
 
 func TestMain(m *testing.M) {