diff --git a/cmd/backup/main.go b/cmd/backup/main.go index d15e7ad2..b430399c 100644 --- a/cmd/backup/main.go +++ b/cmd/backup/main.go @@ -62,10 +62,10 @@ func run() int { // get system ctx ctx := systemCtx() // schedule all configured backups - handlers := service.BuildBackupHandlers(config) - service.ScheduleHandlers(ctx, handlers) + schedulers := service.BuildBackupSchedulers(config) + service.ScheduleHandlers(ctx, schedulers) // run HTTP server - return runHTTPServer(ctx, handlers, config) + return runHTTPServer(ctx, schedulers, config) } err := rootCmd.Execute() diff --git a/pkg/service/backup_backend_local.go b/pkg/service/backup_backend_local.go index 5c70ee39..5a685bf2 100644 --- a/pkg/service/backup_backend_local.go +++ b/pkg/service/backup_backend_local.go @@ -5,6 +5,7 @@ import ( "io/fs" "os" "path/filepath" + "sync/atomic" "log/slog" @@ -17,22 +18,25 @@ import ( // BackupBackendLocal implements the BackupBackend interface by // saving state to the local file system. type BackupBackendLocal struct { - path string - stateFilePath string - backupPolicy *model.BackupPolicy + path string + stateFilePath string + backupPolicy *model.BackupPolicy + fullBackupInProgress *atomic.Bool // BackupBackend needs to know if full backup is running to filter it out } var _ BackupBackend = (*BackupBackendLocal)(nil) // NewBackupBackendLocal returns a new BackupBackendLocal instance. -func NewBackupBackendLocal(path string, backupPolicy *model.BackupPolicy) *BackupBackendLocal { +func NewBackupBackendLocal(path string, backupPolicy *model.BackupPolicy, + fullBackupInProgress *atomic.Bool) *BackupBackendLocal { prepareDirectory(path) prepareDirectory(path + "/" + model.IncrementalBackupDirectory) prepareDirectory(path + "/" + model.FullBackupDirectory) return &BackupBackendLocal{ - path: path, - stateFilePath: path + "/" + model.StateFileName, - backupPolicy: backupPolicy, + path: path, + stateFilePath: path + "/" + model.StateFileName, + backupPolicy: backupPolicy, + fullBackupInProgress: fullBackupInProgress, } } @@ -78,21 +82,28 @@ func (local *BackupBackendLocal) FullBackupList() ([]model.BackupDetails, error) return nil, err } + lastRun := local.readState().LastRun if local.backupPolicy.RemoveFiles != nil && *local.backupPolicy.RemoveFiles { // when use RemoveFiles = true, backup data is located in backupFolder folder itself - if len(entries) > 0 { - return []model.BackupDetails{{ - Key: ptr.String(backupFolder), - LastModified: &local.readState().LastRun, - }}, nil + if len(entries) == 0 { + return []model.BackupDetails{}, nil } - return []model.BackupDetails{}, nil + if local.fullBackupInProgress.Load() { + return []model.BackupDetails{}, nil + } + return []model.BackupDetails{{ + Key: ptr.String(backupFolder), + LastModified: &lastRun, + }}, nil } - backupDetails := make([]model.BackupDetails, 0) + backupDetails := make([]model.BackupDetails, 0, len(entries)) for _, e := range entries { if e.IsDir() { - backupDetails = append(backupDetails, toBackupDetails(e, backupFolder)) + details := toBackupDetails(e, backupFolder) + if details.LastModified.Before(lastRun) { + backupDetails = append(backupDetails, details) + } } } return backupDetails, nil @@ -105,11 +116,14 @@ func (local *BackupBackendLocal) IncrementalBackupList() ([]model.BackupDetails, if err != nil { return nil, err } - - backupDetails := make([]model.BackupDetails, 0) + lastIncrRun := local.readState().LastIncrRun + backupDetails := make([]model.BackupDetails, 0, len(entries)) for _, e := range entries { if !e.IsDir() { - backupDetails = append(backupDetails, toBackupDetails(e, backupFolder)) + details := toBackupDetails(e, backupFolder) + if details.LastModified.Before(lastIncrRun) { + backupDetails = append(backupDetails, details) + } } } return backupDetails, nil diff --git a/pkg/service/backup_backend_s3.go b/pkg/service/backup_backend_s3.go index a25028f8..6f090c82 100644 --- a/pkg/service/backup_backend_s3.go +++ b/pkg/service/backup_backend_s3.go @@ -2,6 +2,7 @@ package service import ( "log/slog" + "sync/atomic" "github.com/aerospike/backup/pkg/model" "github.com/aws/smithy-go/ptr" @@ -11,20 +12,22 @@ import ( // saving state to AWS S3. type BackupBackendS3 struct { *S3Context - stateFilePath string - backupPolicy *model.BackupPolicy + stateFilePath string + backupPolicy *model.BackupPolicy + fullBackupInProgress *atomic.Bool // BackupBackend needs to know if full backup is running to filter it out } var _ BackupBackend = (*BackupBackendS3)(nil) // NewBackupBackendS3 returns a new BackupBackendS3 instance. -func NewBackupBackendS3(storage *model.Storage, - backupPolicy *model.BackupPolicy) *BackupBackendS3 { +func NewBackupBackendS3(storage *model.Storage, backupPolicy *model.BackupPolicy, + fullBackupInProgress *atomic.Bool) *BackupBackendS3 { s3Context := NewS3Context(storage) return &BackupBackendS3{ - S3Context: s3Context, - stateFilePath: s3Context.Path + "/" + model.StateFileName, - backupPolicy: backupPolicy, + S3Context: s3Context, + stateFilePath: s3Context.Path + "/" + model.StateFileName, + backupPolicy: backupPolicy, + fullBackupInProgress: fullBackupInProgress, } } @@ -42,33 +45,39 @@ func (s *BackupBackendS3) writeState(state *model.BackupState) error { func (s *BackupBackendS3) FullBackupList() ([]model.BackupDetails, error) { backupFolder := s.Path + "/" + model.FullBackupDirectory + "/" s3prefix := "s3://" + s.bucket + lastRun := s.readState().LastRun if s.backupPolicy.RemoveFiles != nil && *s.backupPolicy.RemoveFiles { // when use RemoveFiles = true, backup data is located in backupFolder folder itself files, _ := s.listFiles(backupFolder) - if len(files) > 0 { - return []model.BackupDetails{{ - Key: ptr.String(s3prefix + backupFolder), - LastModified: &s.readState().LastRun, - Size: ptr.Int64(s.dirSize(backupFolder)), - }}, nil + if len(files) == 0 { + return []model.BackupDetails{}, nil } - return []model.BackupDetails{}, nil + if s.fullBackupInProgress.Load() { + return []model.BackupDetails{}, nil + } + return []model.BackupDetails{{ + Key: ptr.String(s3prefix + backupFolder), + LastModified: &lastRun, + Size: ptr.Int64(s.dirSize(backupFolder)), + }}, nil } subfolders, err := s.listFolders(backupFolder) if err != nil { return nil, err } - contents := make([]model.BackupDetails, len(subfolders)) - for i, subfolder := range subfolders { + result := make([]model.BackupDetails, 0, len(subfolders)) + for _, subfolder := range subfolders { details := model.BackupDetails{ Key: ptr.String(s3prefix + "/" + *subfolder.Prefix), LastModified: s.GetTime(subfolder), Size: ptr.Int64(s.dirSize(*subfolder.Prefix)), } - contents[i] = details + if details.LastModified.Before(lastRun) { + result = append(result, details) + } } - return contents, err + return result, err } func (s *BackupBackendS3) dirSize(path string) int64 { @@ -91,14 +100,17 @@ func (s *BackupBackendS3) IncrementalBackupList() ([]model.BackupDetails, error) if err != nil { return nil, err } - contents := make([]model.BackupDetails, len(list)) - for i, object := range list { + result := make([]model.BackupDetails, 0, len(list)) + lastIncrRun := s.readState().LastIncrRun + for _, object := range list { details := model.BackupDetails{ Key: ptr.String(s3prefix + "/" + *object.Key), LastModified: object.LastModified, Size: &object.Size, } - contents[i] = details + if details.LastModified.Before(lastIncrRun) { + result = append(result, details) + } } - return contents, err + return result, err } diff --git a/pkg/service/backup_handler.go b/pkg/service/backup_scheduler.go similarity index 74% rename from pkg/service/backup_handler.go rename to pkg/service/backup_scheduler.go index 8fd256fd..a9a31725 100644 --- a/pkg/service/backup_handler.go +++ b/pkg/service/backup_scheduler.go @@ -7,6 +7,8 @@ import ( "sync/atomic" "time" + "github.com/aerospike/backup/pkg/stdio" + "github.com/aerospike/backup/pkg/model" "github.com/aerospike/backup/pkg/shared" "github.com/aerospike/backup/pkg/util" @@ -27,15 +29,41 @@ type BackupHandler struct { cluster *model.AerospikeCluster storage *model.Storage state *model.BackupState - fullBackupInProgress atomic.Bool + fullBackupInProgress *atomic.Bool } +// stdIO captures standard output +var stdIO = &stdio.CgoStdio{} + +var backupService shared.Backup = shared.NewBackup() + var _ BackupScheduler = (*BackupHandler)(nil) var BackupScheduleTick = 1000 * time.Millisecond -// NewBackupHandler returns a new BackupHandler instance. -func NewBackupHandler(config *model.Config, backupRoutine *model.BackupRoutine) (*BackupHandler, error) { +// ScheduleHandlers schedules the configured backup policies. +func ScheduleHandlers(ctx context.Context, schedulers []BackupScheduler) { + for _, scheduler := range schedulers { + scheduler.Schedule(ctx) + } +} + +// BuildBackupSchedulers builds a list of BackupSchedulers according to +// the given configuration. +func BuildBackupSchedulers(config *model.Config) []BackupScheduler { + schedulers := make([]BackupScheduler, 0, len(config.BackupPolicies)) + for _, backupRoutine := range config.BackupRoutines { + scheduler, err := newBackupHandler(config, backupRoutine) + if err != nil { + panic(err) + } + schedulers = append(schedulers, scheduler) + } + return schedulers +} + +// newBackupHandler returns a new BackupHandler instance. +func newBackupHandler(config *model.Config, backupRoutine *model.BackupRoutine) (*BackupHandler, error) { cluster, found := config.AerospikeClusters[backupRoutine.SourceCluster] if !found { return nil, fmt.Errorf("cluster not found for %s", backupRoutine.SourceCluster) @@ -49,23 +77,25 @@ func NewBackupHandler(config *model.Config, backupRoutine *model.BackupRoutine) return nil, fmt.Errorf("backupPolicy not found for %s", backupRoutine.BackupPolicy) } + fullBackupInProgress := &atomic.Bool{} var backupBackend BackupBackend switch *storage.Type { case model.Local: - backupBackend = NewBackupBackendLocal(*storage.Path, backupPolicy) + backupBackend = NewBackupBackendLocal(*storage.Path, backupPolicy, fullBackupInProgress) case model.S3: - backupBackend = NewBackupBackendS3(storage, backupPolicy) + backupBackend = NewBackupBackendS3(storage, backupPolicy, fullBackupInProgress) default: return nil, fmt.Errorf("unsupported storage type: %d", *storage.Type) } return &BackupHandler{ - backend: backupBackend, - backupRoutine: backupRoutine, - backupPolicy: backupPolicy, - cluster: cluster, - storage: storage, - state: backupBackend.readState(), + backend: backupBackend, + backupRoutine: backupRoutine, + backupPolicy: backupPolicy, + cluster: cluster, + storage: storage, + state: backupBackend.readState(), + fullBackupInProgress: fullBackupInProgress, }, nil } @@ -104,19 +134,19 @@ func (h *BackupHandler) scheduleBackupPeriodically( func (h *BackupHandler) runFullBackup(now time.Time) { if isStaleTick(now) { - slog.Debug("Skipped full backup", "name", *h.backupPolicy.Name) + slog.Debug("Skipped full backup", "name", h.backupRoutine.Name) backupSkippedCounter.Inc() return } if !h.fullBackupInProgress.CompareAndSwap(false, true) { - slog.Debug("Backup is currently in progress, skipping full backup", "name", *h.backupPolicy.Name) + slog.Debug("Backup is currently in progress, skipping full backup", "name", h.backupRoutine.Name) return } // release the lock defer h.fullBackupInProgress.Store(false) if !h.isFullEligible(now, h.state.LastRun) { - slog.Debug("The full backup is not due to run yet", "name", *h.backupPolicy.Name) + slog.Debug("The full backup is not due to run yet", "name", h.backupRoutine.Name) return } backupRunFunc := func() { @@ -131,7 +161,7 @@ func (h *BackupHandler) runFullBackup(now time.Time) { } out := stdIO.Capture(backupRunFunc) util.LogCaptured(out) - slog.Debug("Completed full backup", "name", *h.backupPolicy.Name) + slog.Debug("Completed full backup", "name", h.backupRoutine.Name) // increment backupCounter metric backupCounter.Inc() @@ -145,22 +175,22 @@ func (h *BackupHandler) runFullBackup(now time.Time) { func (h *BackupHandler) runIncrementalBackup(now time.Time) { if isStaleTick(now) { - slog.Error("Skipped incremental backup", "name", *h.backupPolicy.Name) + slog.Error("Skipped incremental backup", "name", h.backupRoutine.Name) incrBackupSkippedCounter.Inc() return } // read the state first and check state := h.backend.readState() if state.LastRun == (time.Time{}) { - slog.Debug("Skip incremental backup until initial full backup is done", "name", *h.backupPolicy.Name) + slog.Debug("Skip incremental backup until initial full backup is done", "name", h.backupRoutine.Name) return } if !h.isIncrementalEligible(now, state.LastIncrRun) { - slog.Debug("The incremental backup is not due to run yet", "name", *h.backupPolicy.Name) + slog.Debug("The incremental backup is not due to run yet", "name", h.backupRoutine.Name) return } if h.fullBackupInProgress.Load() { - slog.Debug("Full backup is currently in progress, skipping incremental backup", "name", *h.backupPolicy.Name) + slog.Debug("Full backup is currently in progress, skipping incremental backup", "name", h.backupRoutine.Name) return } backupRunFunc := func() { @@ -177,7 +207,7 @@ func (h *BackupHandler) runIncrementalBackup(now time.Time) { } out := stdIO.Capture(backupRunFunc) util.LogCaptured(out) - slog.Debug("Completed incremental backup", "name", *h.backupPolicy.Name) + slog.Debug("Completed incremental backup", "name", h.backupRoutine.Name) // increment incrBackupCounter metric incrBackupCounter.Inc() diff --git a/pkg/service/scheduler.go b/pkg/service/scheduler.go deleted file mode 100644 index 254cc23c..00000000 --- a/pkg/service/scheduler.go +++ /dev/null @@ -1,35 +0,0 @@ -package service - -import ( - "context" - - "github.com/aerospike/backup/internal/util" - "github.com/aerospike/backup/pkg/model" - "github.com/aerospike/backup/pkg/shared" - "github.com/aerospike/backup/pkg/stdio" -) - -// backup service -var backupService shared.Backup = shared.NewBackup() - -// stdIO captures standard output -var stdIO *stdio.CgoStdio = &stdio.CgoStdio{} - -// ScheduleHandlers schedules the configured backup policies. -func ScheduleHandlers(ctx context.Context, handlers []BackupScheduler) { - for _, handler := range handlers { - handler.Schedule(ctx) - } -} - -// BuildBackupHandlers builds a list of BackupSchedulers according to -// the given configuration. -func BuildBackupHandlers(config *model.Config) []BackupScheduler { - schedulers := make([]BackupScheduler, 0, len(config.BackupPolicies)) - for _, backupRoutine := range config.BackupRoutines { - handler, err := NewBackupHandler(config, backupRoutine) - util.Check(err) - schedulers = append(schedulers, handler) - } - return schedulers -} diff --git a/pkg/shared/backup.go b/pkg/shared/backup.go index 7c9a9df2..d31a224e 100644 --- a/pkg/shared/backup.go +++ b/pkg/shared/backup.go @@ -153,7 +153,7 @@ func getIncrementalPath(storage *model.Storage) *string { } func timeSuffix() string { - return strconv.FormatInt(time.Now().Unix(), 10) + return strconv.FormatInt(time.Now().UnixMilli(), 10) } func printNodes(nodes []model.Node) *string {