
Commit

Self-review
Signed-off-by: Florent Poinsard <[email protected]>
frouioui committed Nov 29, 2024
1 parent cc02f31 commit 514a4ea
Showing 4 changed files with 118 additions and 89 deletions.
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/backupstorage/interface.go
@@ -91,7 +91,7 @@ type BackupHandle interface {
ReadFile(ctx context.Context, filename string) (io.ReadCloser, error)

// BackupErrorRecorder is embedded here to coordinate reporting and
// handling of errors among all the components involved in taking a backup.
// handling of errors among all the components involved in taking/restoring a backup.
errorsbackup.BackupErrorRecorder
}
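A rough sketch of the shape such a recorder could have, inferred only from the bh.RecordError, bh.ResetErrorForFile, and bh.GetFailedFiles calls that appear later in this diff; the real interface lives in the errorsbackup package and may differ:

package errorsbackupsketch // hypothetical sketch; the real interface lives in the errorsbackup package

// BackupErrorRecorder (sketch) coordinates per-file error reporting between
// the backup engine and the backup storage implementation.
type BackupErrorRecorder interface {
	// RecordError remembers that the given file failed with err.
	RecordError(filename string, err error)
	// ResetErrorForFile clears a recorded error before the file is retried.
	ResetErrorForFile(filename string)
	// GetFailedFiles returns the names of every file that recorded an error.
	GetFailedFiles() []string
}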

190 changes: 105 additions & 85 deletions go/vt/mysqlctl/builtinbackupengine.go
@@ -152,12 +152,12 @@ type FileEntry struct {
// for writing files in a temporary directory
ParentPath string

// AttemptNb specifies how many times we attempted to restore/backup this FileEntry.
// If we fail to restore/backup this FileEntry, we will retry up to maxRetriesPerFile (= 1) times.
// RetryNb specifies how many times we retried restoring/backing up this FileEntry.
// If we fail to restore/backup this FileEntry, we will retry up to maxRetriesPerFile times.
// Every time the builtin backup engine retries this file, we increment this field by 1.
// We don't need this information in the MANIFEST, and to avoid any compatibility issue
// we add the `json:"-"` tag to let Go know it can ignore the field.
AttemptNb int `json:"-"`
RetryNb int `json:"-"`
}
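To illustrate the `json:"-"` tag mentioned above, here is a small, self-contained sketch (struct and values are illustrative, not the real FileEntry) showing that the tagged field never reaches the serialized MANIFEST:

package main

import (
	"encoding/json"
	"fmt"
)

// fileEntry is an illustrative stand-in that mirrors the idea of FileEntry.RetryNb.
type fileEntry struct {
	Base    string
	Name    string
	RetryNb int `json:"-"` // excluded from JSON, so the MANIFEST format is unchanged
}

func main() {
	fe := fileEntry{Base: "Data", Name: "vt_test/db.ibd", RetryNb: 1}
	out, _ := json.MarshalIndent(fe, "", "  ")
	fmt.Println(string(out)) // RetryNb does not appear in the output
}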

func init() {
@@ -594,6 +594,8 @@ func (be *BuiltinBackupEngine) backupFiles(
mysqlVersion string,
incrDetails *IncrementalBackupDetails,
) (finalErr error) {
// backupFiles always waits for the AddFile calls to finish their work before returning, unless there has been a
// non-recoverable error in the process; in both cases we can cancel the context safely.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -611,7 +613,7 @@
}
params.Logger.Infof("found %v files to backup", len(fes))

// The error here can be ignored, it will be handled differently in the following if statement
// The error here can be ignored safely. Failed FileEntries are handled in the next 'if' statement.
_ = be.backupFileEntries(ctx, fes, bh, params)

// BackupHandle supports the BackupErrorRecorder interface for tracking errors
@@ -623,8 +625,10 @@
// error were encountered
// [here](https://github.com/vitessio/vitess/blob/d26b6c7975b12a87364e471e2e2dfa4e253c2a5b/go/vt/mysqlctl/s3backupstorage/s3.go#L139-L142).
//
// All the errors are grouped per file, if any file has failed, we will try backing up
// this file once more. If the retry fails, we definitively fail the backup process.
// All the errors are grouped per file; if one or more files failed, we back them up
// once more concurrently. If any of the retries fails, we fail fast by canceling the
// context and returning an error. There is no reason to continue processing the other
// retries if one of them failed.
if files := bh.GetFailedFiles(); len(files) > 0 {
newFes := make([]FileEntry, len(fes))
for _, file := range files {
@@ -637,7 +641,7 @@
Base: oldFes.Base,
Name: oldFes.Name,
ParentPath: oldFes.ParentPath,
AttemptNb: 1,
RetryNb: 1,
}
bh.ResetErrorForFile(file)
}
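Hedging heavily, since the exact lookup is elided in this view, the index-preserving retry slice built above can be pictured with a simplified, hypothetical helper like the following (the type and the matching logic are illustrative only, not the elided code):

package backupsketch

// fileEntry is a simplified stand-in for the real FileEntry type.
type fileEntry struct {
	Base, Name, ParentPath string
	RetryNb                int
}

// buildRetrySlice is a hypothetical illustration: the retry slice keeps the same
// length as the original so indexes still line up, failed entries are re-created
// with RetryNb set to 1, and every other slot stays zero-valued so the backup
// loop can skip it.
func buildRetrySlice(fes []fileEntry, failed []string) []fileEntry {
	newFes := make([]fileEntry, len(fes))
	for _, name := range failed {
		for i, fe := range fes {
			if fe.Name == name { // simplified match; the real code derives names differently
				newFes[i] = fileEntry{
					Base:       fe.Base,
					Name:       fe.Name,
					ParentPath: fe.ParentPath,
					RetryNb:    1,
				}
				break
			}
		}
	}
	return newFes
}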
@@ -649,8 +653,8 @@

// Backup the MANIFEST file and apply retry logic.
var manifestErr error
for currentAttempt := 0; currentAttempt <= maxRetriesPerFile; currentAttempt++ {
manifestErr = be.backupManifest(ctx, params, bh, backupPosition, purgedPosition, fromPosition, fromBackupName, serverUUID, mysqlVersion, incrDetails, fes, currentAttempt)
for currentRetry := 0; currentRetry <= maxRetriesPerFile; currentRetry++ {
manifestErr = be.backupManifest(ctx, params, bh, backupPosition, purgedPosition, fromPosition, fromBackupName, serverUUID, mysqlVersion, incrDetails, fes, currentRetry)
if manifestErr == nil {
break
}
@@ -662,6 +666,10 @@
return nil
}

// backupFileEntries iterates over a slice of FileEntry, backing them up concurrently up to the defined concurrency limit.
// Empty FileEntry values are ignored, which lets the retry mechanism pass a partially empty slice without
// disturbing the indexes of the FileEntries that need to be retried.
// This function leaves no background work behind: every call to bh.AddFile is either finished or canceled.
func (be *BuiltinBackupEngine) backupFileEntries(ctx context.Context, fes []FileEntry, bh backupstorage.BackupHandle, params BackupParams) error {
ctxCancel, cancel := context.WithCancel(ctx)
defer func() {
@@ -691,8 +699,8 @@ func (be *BuiltinBackupEngine) backupFileEntries(ctx context.Context, fes []File
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to backup file: %s, err: %s", fe.Name, acqErr.Error())
bh.RecordError(name, acqErr)
if fe.AttemptNb == maxRetriesPerFile {
// this is the last attempt and the file is marked as failed, we can cancel everything and fail fast.
if fe.RetryNb == maxRetriesPerFile {
// this is the last attempt and we have an error, so we can cancel everything and fail fast.
cancel()
}
return
@@ -715,8 +723,8 @@ func (be *BuiltinBackupEngine) backupFileEntries(ctx context.Context, fes []File
var errBackupFile error
if errBackupFile = be.backupFile(ctxCancel, params, bh, fe, name); errBackupFile != nil {
bh.RecordError(name, vterrors.Wrapf(errBackupFile, "failed to backup file '%s'", name))
if fe.AttemptNb == maxRetriesPerFile {
// this is the last attempt and the file is marked as failed, we can cancel everything and fail fast.
if fe.RetryNb == maxRetriesPerFile {
// this is the last attempt and we have an error, so we can cancel everything and fail fast.
cancel()
}
}
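The pattern used by backupFileEntries in the hunks above (and mirrored by restoreFileEntries further down) — a weighted semaphore bounding concurrent uploads, with the shared context canceled once a file fails on its final allowed attempt — can be boiled down to the sketch below. The function, parameter names, and the single retry counter are assumptions for illustration, not the engine's actual code:

package backupsketch

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

const maxRetries = 1 // assumed value, mirroring maxRetriesPerFile

// processAll runs work for every non-empty name, at most limit at a time.
// If a name fails on its last allowed retry, the shared context is canceled so
// the remaining goroutines bail out instead of doing useless work.
func processAll(ctx context.Context, names []string, retryNb int, limit int64, work func(context.Context, string) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	sem := semaphore.NewWeighted(limit)
	var wg sync.WaitGroup
	var mu sync.Mutex
	var errs []error

	for _, name := range names {
		if name == "" { // empty slots are skipped, matching the retry-slice trick
			continue
		}
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			if err := sem.Acquire(ctx, 1); err != nil {
				return // context canceled while waiting: fail fast
			}
			defer sem.Release(1)
			if err := work(ctx, name); err != nil {
				mu.Lock()
				errs = append(errs, fmt.Errorf("%s: %w", name, err))
				mu.Unlock()
				if retryNb == maxRetries {
					cancel() // last attempt failed: stop everything
				}
			}
		}(name)
	}
	wg.Wait()
	if len(errs) > 0 {
		return fmt.Errorf("%d file(s) failed", len(errs))
	}
	return nil
}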
@@ -768,8 +776,9 @@ func newBackupReader(filename string, maxSize int64, r io.Reader) *backupPipe {
}
}

func attemptToString(attempt int) string {
return fmt.Sprintf("(attempt %d/%d)", attempt+1, maxRetriesPerFile+1)
func retryToString(retry int) string {
// We convert the retry number to a 1-based attempt number so the log output reads more naturally.
return fmt.Sprintf("(attempt %d/%d)", retry+1, maxRetriesPerFile+1)
}
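For readability of the log lines, the helper shifts the zero-based retry counter to a one-based attempt counter. Assuming maxRetriesPerFile is 1 (the value mentioned in the previous FileEntry comment), a self-contained toy version behaves like this:

package main

import "fmt"

const maxRetriesPerFile = 1 // assumed value, per the earlier FileEntry comment

func retryToString(retry int) string {
	return fmt.Sprintf("(attempt %d/%d)", retry+1, maxRetriesPerFile+1)
}

func main() {
	fmt.Println(retryToString(0)) // (attempt 1/2) — the initial try
	fmt.Println(retryToString(1)) // (attempt 2/2) — the single allowed retry
}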

func (bp *backupPipe) Read(p []byte) (int, error) {
@@ -810,7 +819,7 @@ func (bp *backupPipe) HashString() string {
return hex.EncodeToString(bp.crc32.Sum(nil))
}

func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool, attemptStr string) {
func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool, retryStr string) {
messageStr := "restoring"
if !restore {
messageStr = "backing up"
@@ -820,21 +829,21 @@ func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration,
for {
select {
case <-ctx.Done():
logger.Infof("Canceled %s of %q file %s", messageStr, bp.filename, attemptStr)
logger.Infof("Canceled %s of %q file %s", messageStr, bp.filename, retryStr)
return
case <-bp.done:
logger.Infof("Completed %s %q %s", messageStr, bp.filename, attemptStr)
logger.Infof("Completed %s %q %s", messageStr, bp.filename, retryStr)
return
case <-bp.failed:
logger.Infof("Failed %s %q %s", messageStr, bp.filename, attemptStr)
logger.Infof("Failed %s %q %s", messageStr, bp.filename, retryStr)
return
case <-tick.C:
written := float64(atomic.LoadInt64(&bp.nn))
if bp.maxSize == 0 {
logger.Infof("%s %q %s: %.02fkb", messageStr, bp.filename, attemptStr, written/1024.0)
logger.Infof("%s %q %s: %.02fkb", messageStr, bp.filename, retryStr, written/1024.0)
} else {
maxSize := float64(bp.maxSize)
logger.Infof("%s %q %s: %.02f%% (%.02f/%.02fkb)", messageStr, bp.filename, attemptStr, 100.0*written/maxSize, written/1024.0, maxSize/1024.0)
logger.Infof("%s %q %s: %.02f%% (%.02f/%.02fkb)", messageStr, bp.filename, retryStr, 100.0*written/maxSize, written/1024.0, maxSize/1024.0)
}
}
}
@@ -873,12 +882,12 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
return err
}

attemptStr := attemptToString(fe.AttemptNb)
retryStr := retryToString(fe.RetryNb)
br := newBackupReader(fe.Name, fi.Size(), timedSource)
go br.ReportProgress(cancelableCtx, builtinBackupProgress, params.Logger, false /*restore*/, attemptStr)
go br.ReportProgress(cancelableCtx, builtinBackupProgress, params.Logger, false /*restore*/, retryStr)

// Open the destination file for writing, and a buffer.
params.Logger.Infof("Backing up file: %v %s", fe.Name, attemptStr)
params.Logger.Infof("Backing up file: %v %s", fe.Name, retryStr)
openDestAt := time.Now()
dest, err := bh.AddFile(ctx, name, fi.Size())
if err != nil {
@@ -938,7 +947,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
defer func() {
// Close gzip to flush it, after that all data is sent to writer.
closeCompressorAt := time.Now()
params.Logger.Infof("Closing compressor for file: %s %s", fe.Name, attemptStr)
params.Logger.Infof("Closing compressor for file: %s %s", fe.Name, retryStr)
if cerr := closer.Close(); err != nil {
cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", fe.Name)
params.Logger.Error(cerr)
@@ -984,60 +993,71 @@ func (be *BuiltinBackupEngine) backupManifest(
fes []FileEntry,
currentAttempt int,
) (finalErr error) {
attemptStr := attemptToString(currentAttempt)
params.Logger.Infof("Backing up file %s %s", backupManifestFileName, attemptStr)

// open the MANIFEST
wc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown)
if err != nil {
return vterrors.Wrapf(err, "cannot add %v to backup %s", backupManifestFileName, attemptStr)
}
retryStr := retryToString(currentAttempt)
params.Logger.Infof("Backing up file %s %s", backupManifestFileName, retryStr)
defer func() {
state := "Completed"
if finalErr != nil {
params.Logger.Infof("Failed backing up %s %s", backupManifestFileName, attemptStr)
} else {
params.Logger.Infof("Completed backing up %s %s", backupManifestFileName, attemptStr)
state = "Failed"
}
params.Logger.Infof("%s backing up %s %s", state, backupManifestFileName, retryStr)
}()

// JSON-encode and write the MANIFEST
bm := &builtinBackupManifest{
// Common base fields
BackupManifest: BackupManifest{
BackupName: bh.Name(),
BackupMethod: builtinBackupEngineName,
Position: backupPosition,
PurgedPosition: purgedPosition,
FromPosition: fromPosition,
FromBackup: fromBackupName,
Incremental: !fromPosition.IsZero(),
ServerUUID: serverUUID,
TabletAlias: params.TabletAlias,
Keyspace: params.Keyspace,
Shard: params.Shard,
BackupTime: params.BackupTime.UTC().Format(time.RFC3339),
FinishedTime: time.Now().UTC().Format(time.RFC3339),
MySQLVersion: mysqlVersion,
UpgradeSafe: params.UpgradeSafe,
IncrementalDetails: incrDetails,
},

// Builtin-specific fields
FileEntries: fes,
SkipCompress: !backupStorageCompress,
CompressionEngine: CompressionEngineName,
ExternalDecompressor: ManifestExternalDecompressorCmd,
}
data, err := json.MarshalIndent(bm, "", " ")
if err != nil {
return vterrors.Wrapf(err, "cannot JSON encode %v %s", backupManifestFileName, attemptStr)
}
if _, err := wc.Write(data); err != nil {
return vterrors.Wrapf(err, "cannot write %v %s", backupManifestFileName, attemptStr)
// Creating this function lets us ensure the writer is always closed, no matter what,
// and, on success, that it is closed before we call bh.EndBackup.
addAndWrite := func() (addAndWriteError error) {
// open the MANIFEST
wc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown)
if err != nil {
return vterrors.Wrapf(err, "cannot add %v to backup %s", backupManifestFileName, retryStr)
}
defer func() {
if err := wc.Close(); err != nil {
addAndWriteError = errors.Join(addAndWriteError, vterrors.Wrapf(err, "cannot close backup: %v", backupManifestFileName))
}
}()

// JSON-encode and write the MANIFEST
bm := &builtinBackupManifest{
// Common base fields
BackupManifest: BackupManifest{
BackupName: bh.Name(),
BackupMethod: builtinBackupEngineName,
Position: backupPosition,
PurgedPosition: purgedPosition,
FromPosition: fromPosition,
FromBackup: fromBackupName,
Incremental: !fromPosition.IsZero(),
ServerUUID: serverUUID,
TabletAlias: params.TabletAlias,
Keyspace: params.Keyspace,
Shard: params.Shard,
BackupTime: params.BackupTime.UTC().Format(time.RFC3339),
FinishedTime: time.Now().UTC().Format(time.RFC3339),
MySQLVersion: mysqlVersion,
UpgradeSafe: params.UpgradeSafe,
IncrementalDetails: incrDetails,
},

// Builtin-specific fields
FileEntries: fes,
SkipCompress: !backupStorageCompress,
CompressionEngine: CompressionEngineName,
ExternalDecompressor: ManifestExternalDecompressorCmd,
}
data, err := json.MarshalIndent(bm, "", " ")
if err != nil {
return vterrors.Wrapf(err, "cannot JSON encode %v %s", backupManifestFileName, retryStr)
}
if _, err := wc.Write(data); err != nil {
return vterrors.Wrapf(err, "cannot write %v %s", backupManifestFileName, retryStr)
}
return nil
}

if err := wc.Close(); err != nil {
return vterrors.Wrapf(err, "cannot close %v %s", backupManifestFileName, attemptStr)
err := addAndWrite()
if err != nil {
return err
}
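The reason for wrapping the work in addAndWrite is the named return value: an error from the deferred wc.Close() is joined into the result instead of being lost, and on success the writer is guaranteed to be closed before bh.EndBackup runs. In isolation the pattern looks roughly like this (the writer and error text are placeholders, not Vitess code):

package backupsketch

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// writeAll shows the named-return + deferred-Close pattern: a Close failure is
// joined into finalErr so it surfaces even when every write succeeded.
func writeAll(w io.WriteCloser, data string) (finalErr error) {
	defer func() {
		if cerr := w.Close(); cerr != nil {
			finalErr = errors.Join(finalErr, fmt.Errorf("cannot close writer: %w", cerr))
		}
	}()
	_, err := io.Copy(w, strings.NewReader(data))
	return err
}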

err = bh.EndBackup(ctx)
@@ -1129,21 +1149,21 @@ func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params Restor
}

func (be *BuiltinBackupEngine) restoreManifest(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (bm builtinBackupManifest, finalErr error) {
var attemptNb int
var retryNb int
defer func() {
state := "Completed"
if finalErr != nil {
state = "Failed"
}
params.Logger.Infof("%s restoring %s %s", state, backupManifestFileName, attemptToString(attemptNb))
params.Logger.Infof("%s restoring %s %s", state, backupManifestFileName, retryToString(retryNb))
}()

for ; attemptNb <= maxRetriesPerFile; attemptNb++ {
params.Logger.Infof("Restoring file %s %s", backupManifestFileName, attemptToString(attemptNb))
for ; retryNb <= maxRetriesPerFile; retryNb++ {
params.Logger.Infof("Restoring file %s %s", backupManifestFileName, retryToString(retryNb))
if finalErr = getBackupManifestInto(ctx, bh, &bm); finalErr == nil {
break
}
params.Logger.Infof("Failed restoring %s %s", backupManifestFileName, attemptToString(attemptNb))
params.Logger.Infof("Failed restoring %s %s", backupManifestFileName, retryToString(retryNb))
}
return
}
@@ -1182,7 +1202,7 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
Name: oldFes.Name,
ParentPath: oldFes.ParentPath,
Hash: oldFes.Hash,
AttemptNb: 1,
RetryNb: 1,
}
bh.ResetErrorForFile(file)
}
@@ -1215,8 +1235,8 @@ func (be *BuiltinBackupEngine) restoreFileEntries(ctx context.Context, fes []Fil
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error())
bh.RecordError(name, acqErr)
if fe.AttemptNb == maxRetriesPerFile {
// this is the last attempt and the file is marked as failed, we can cancel everything and fail fast.
if fe.RetryNb == maxRetriesPerFile {
// this is the last attempt and we have an error, so we can cancel everything and fail fast.
cancel()
}
return
Expand All @@ -1238,11 +1258,11 @@ func (be *BuiltinBackupEngine) restoreFileEntries(ctx context.Context, fes []Fil
fe.ParentPath = createdDir

// And restore the file.
params.Logger.Infof("Copying file %v: %v %s", name, fe.Name, attemptToString(fe.AttemptNb))
params.Logger.Infof("Copying file %v: %v %s", name, fe.Name, retryToString(fe.RetryNb))
if errRestore := be.restoreFile(ctxCancel, params, bh, fe, bm, name); errRestore != nil {
bh.RecordError(name, vterrors.Wrapf(errRestore, "failed to restore file %v to %v", name, fe.Name))
if fe.AttemptNb == maxRetriesPerFile {
// this is the last attempt and the file is marked as failed, we can cancel everything and fail fast.
if fe.RetryNb == maxRetriesPerFile {
// this is the last attempt and we have an error, so we can cancel everything and fail fast.
cancel()
}
}
@@ -1276,9 +1296,9 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
}()

// Create the backup/source reader and start reporting progress
attemptStr := attemptToString(fe.AttemptNb)
retryStr := retryToString(fe.RetryNb)
br := newBackupReader(fe.Name, 0, timedSource)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true, attemptStr)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true, retryStr)
defer func() {
if err := br.Close(finalErr == nil); err != nil {
finalErr = vterrors.Wrap(finalErr, "failed to close the source reader")