Skip to content

Commit

Permalink
Improve error handling in restore operation (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Dec 12, 2023
1 parent 7f63fe1 commit 5a87c9f
Showing 1 changed file with 50 additions and 20 deletions.
70 changes: 50 additions & 20 deletions pkg/service/restore_memory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package service

import (
"errors"
"fmt"
"log/slog"
"time"
Expand Down Expand Up @@ -64,29 +65,39 @@ func (r *RestoreMemory) doRestore(request *model.RestoreRequestInternal) bool {
func (r *RestoreMemory) RestoreByTime(request *model.RestoreTimestampRequest) int {
jobID := r.restoreJobs.newJob()
go func() {
backend := r.backends[request.Routine]
backend, found := r.backends[request.Routine]
if !found {
slog.Error("Backend not found for restore", "JobId", jobID, "routine", request.Routine)
r.restoreJobs.setFailed(jobID)
return
}

fullBackup, err := r.findLastFullBackup(backend, request)
if err != nil {
slog.Error(err.Error(), "JobId", jobID)
slog.Error("Could not find last full backup", "JobId", jobID, "routine", request.Routine,
"err", err)
r.restoreJobs.setFailed(jobID)
return
}

if err = r.restoreFullBackup(request, fullBackup.Key); err != nil {
slog.Error("Could not restore full backup", "routine", request.Routine)
slog.Error("Could not restore full backup", "JobId", jobID, "routine", request.Routine,
"err", err)
r.restoreJobs.setFailed(jobID)
return
}

incrementalBackups, err := r.findIncrementalBackups(backend, *fullBackup.LastModified)
if err != nil {
slog.Error(err.Error(), "JobId", jobID)
slog.Error("Could not find incremental backups", "JobId", jobID, "routine", request.Routine,
"err", err)
r.restoreJobs.setFailed(jobID)
return
}

if err = r.restoreIncrementalBackups(incrementalBackups, request); err != nil {
slog.Error(err.Error(), "JobId", jobID)
if err = r.restoreIncrementalBackups(request, incrementalBackups); err != nil {
slog.Error("Could not restore incremental backups", "JobId", jobID, "routine", request.Routine,
"err", err)
r.restoreJobs.setFailed(jobID)
return
}
Expand All @@ -96,17 +107,18 @@ func (r *RestoreMemory) RestoreByTime(request *model.RestoreTimestampRequest) in
return jobID
}

func (r *RestoreMemory) findLastFullBackup(backend BackupBackend,
request *model.RestoreTimestampRequest) (*model.BackupDetails, error) {
func (r *RestoreMemory) findLastFullBackup(
backend BackupBackend,
request *model.RestoreTimestampRequest,
) (*model.BackupDetails, error) {
fullBackupList, err := backend.FullBackupList()
if err != nil {
slog.Error("cannot read full backup list", "name", request.Routine, "error", err)
return nil, fmt.Errorf("cannot read full backup list for %s", request.Routine)
return nil, fmt.Errorf("cannot read full backup list")
}

fullBackup := latestFullBackupBeforeTime(fullBackupList, time.UnixMilli(request.Time))
if fullBackup == nil {
return nil, fmt.Errorf("no full backup found for %s at %d", request.Routine, request.Time)
return nil, fmt.Errorf("no full backup found at %d", request.Time)
}

return fullBackup, nil
Expand All @@ -125,7 +137,10 @@ func latestFullBackupBeforeTime(list []model.BackupDetails, time time.Time) *mod
return latestFullBackup
}

func (r *RestoreMemory) restoreFullBackup(request *model.RestoreTimestampRequest, key *string) error {
func (r *RestoreMemory) restoreFullBackup(
request *model.RestoreTimestampRequest,
key *string,
) error {
restoreRequest, err := r.toRestoreRequest(request)
if err != nil {
return err
Expand All @@ -135,28 +150,33 @@ func (r *RestoreMemory) restoreFullBackup(request *model.RestoreTimestampRequest
Dir: key,
})
if !fullRestoreOK {
return fmt.Errorf("could not restore full backup %s at %s", request.Routine, *key)
return fmt.Errorf("could not restore full backup at %s", *key)
}

return nil
}

func (r *RestoreMemory) findIncrementalBackups(backend BackupBackend, u time.Time) ([]model.BackupDetails, error) {
func (r *RestoreMemory) findIncrementalBackups(
backend BackupBackend,
since time.Time,
) ([]model.BackupDetails, error) {
allIncrementalBackupList, err := backend.IncrementalBackupList()
if err != nil {
return nil, err
}
var filteredIncrementalBackups []model.BackupDetails
for _, b := range allIncrementalBackupList {
if b.LastModified.After(u) {
if b.LastModified.After(since) {
filteredIncrementalBackups = append(filteredIncrementalBackups, b)
}
}
return filteredIncrementalBackups, nil
}

func (r *RestoreMemory) restoreIncrementalBackups(
incrementalBackups []model.BackupDetails, request *model.RestoreTimestampRequest) error {
request *model.RestoreTimestampRequest,
incrementalBackups []model.BackupDetails,
) error {
for _, b := range incrementalBackups {
restoreRequest, err := r.toRestoreRequest(request)
if err != nil {
Expand All @@ -167,16 +187,26 @@ func (r *RestoreMemory) restoreIncrementalBackups(
File: b.Key,
})
if !incrRestoreOK {
return fmt.Errorf("could not restore incremental backup %s at %s", request.Routine, *b.Key)
return fmt.Errorf("could not restore incremental backup at %s", *b.Key)
}
}
return nil
}

func (r *RestoreMemory) toRestoreRequest(request *model.RestoreTimestampRequest) (*model.RestoreRequest, error) {
routine := r.config.BackupRoutines[request.Routine]
storage := r.config.Storage[routine.Storage]
return model.NewRestoreRequest(request.DestinationCuster, request.Policy, storage)
routine, found := r.config.BackupRoutines[request.Routine]
if !found {
return nil, errors.New("routine not found")
}
storage, found := r.config.Storage[routine.Storage]
if !found {
return nil, errors.New("storage not found")
}
return model.NewRestoreRequest(
request.DestinationCuster,
request.Policy,
storage,
)
}

// JobStatus returns the status of the job with the given id.
Expand Down

0 comments on commit 5a87c9f

Please sign in to comment.