Skip to content

Commit

Permalink
APPS-982 Block parallel execution of backups for the same policy (#47)
Browse files Browse the repository at this point in the history
* check for full backup before run incremental

* add boolean lock

* lock on full backups

* codereview
  • Loading branch information
korotkov-aerospike authored Nov 9, 2023
1 parent 43d3342 commit d4ad103
Showing 1 changed file with 57 additions and 37 deletions.
94 changes: 57 additions & 37 deletions pkg/service/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"sync/atomic"
"time"

"github.com/aerospike/backup/pkg/model"
Expand All @@ -19,10 +20,11 @@ type BackupScheduler interface {

// BackupHandler handles a configured backup policy.
type BackupHandler struct {
backend BackupBackend
backupPolicy *model.BackupPolicy
cluster *model.AerospikeCluster
storage *model.BackupStorage
backend BackupBackend
backupPolicy *model.BackupPolicy
cluster *model.AerospikeCluster
storage *model.BackupStorage
fullBackupInProgress atomic.Bool
}

var _ BackupScheduler = (*BackupHandler)(nil)
Expand Down Expand Up @@ -71,24 +73,33 @@ loop:
}
// read the state first and check
state := h.backend.readState()
if h.isFullEligible(now, state.LastRun) {
backupRunFunc := func() {
backupService.BackupRun(h.backupPolicy, h.cluster, h.storage, shared.BackupOptions{})
}
out := stdIO.Capture(backupRunFunc)
util.LogCaptured(out)
slog.Debug("Completed full backup", "name", *h.backupPolicy.Name)

// increment backupCounter metric
backupCounter.Inc()

// update the state
h.updateBackupState(now, state)
// clean incremental backups
h.backend.CleanDir(model.IncrementalBackupDirectory)
} else {
if !h.isFullEligible(now, state.LastRun) {
slog.Debug("The full backup is not due to run yet", "name", *h.backupPolicy.Name)
break
}
if !h.fullBackupInProgress.CompareAndSwap(false, true) {
slog.Debug("Backup is currently in progress, skipping full backup", "name", *h.backupPolicy.Name)
break
}
backupRunFunc := func() {
backupService.BackupRun(h.backupPolicy, h.cluster, h.storage, shared.BackupOptions{})
}
out := stdIO.Capture(backupRunFunc)
util.LogCaptured(out)
slog.Debug("Completed full backup", "name", *h.backupPolicy.Name)

// increment backupCounter metric
backupCounter.Inc()

// update the state
h.updateBackupState(now, state)

// clean incremental backups
h.backend.CleanDir(model.IncrementalBackupDirectory)

// release the lock
h.fullBackupInProgress.Store(false)

case <-ctx.Done():
slog.Debug("ctx.Done in scheduleFullBackup")
break loop
Expand All @@ -112,25 +123,34 @@ loop:
}
// read the state first and check
state := h.backend.readState()
if h.isIncrementalEligible(now, state.LastIncrRun) {
backupRunFunc := func() {
opts := shared.BackupOptions{}
lastIncrRunEpoch := state.LastIncrRun.UnixNano()
opts.ModAfter = &lastIncrRunEpoch
backupService.BackupRun(h.backupPolicy, h.cluster, h.storage, opts)
}
out := stdIO.Capture(backupRunFunc)
util.LogCaptured(out)
slog.Debug("Completed incremental backup", "name", *h.backupPolicy.Name)

// increment incrBackupCounter metric
incrBackupCounter.Inc()

// update the state
h.updateIncrementalBackupState(now, state)
} else {
if state.LastRun == (time.Time{}) {
slog.Debug("Skip incremental backup until initial full backup is done", "name", *h.backupPolicy.Name)
break
}
if !h.isIncrementalEligible(now, state.LastIncrRun) {
slog.Debug("The incremental backup is not due to run yet", "name", *h.backupPolicy.Name)
break
}
if h.fullBackupInProgress.Load() {
slog.Debug("Full backup is currently in progress, skipping incremental backup", "name", *h.backupPolicy.Name)
break
}
backupRunFunc := func() {
opts := shared.BackupOptions{}
lastIncrRunEpoch := state.LastIncrRun.UnixNano()
opts.ModAfter = &lastIncrRunEpoch
backupService.BackupRun(h.backupPolicy, h.cluster, h.storage, opts)
}
out := stdIO.Capture(backupRunFunc)
util.LogCaptured(out)
slog.Debug("Completed incremental backup", "name", *h.backupPolicy.Name)

// increment incrBackupCounter metric
incrBackupCounter.Inc()

// update the state
h.updateIncrementalBackupState(now, state)

case <-ctx.Done():
slog.Debug("ctx.Done in scheduleIncrementalBackup")
break loop
Expand Down

0 comments on commit d4ad103

Please sign in to comment.