Skip to content

Commit

Permalink
APPS-956 Fix race condition in backups (#49)
Browse files Browse the repository at this point in the history
* func instead of goto

* ticker inside of function

* ticker inside of function

* inline backup method

* add debug timestamp log

* save real finish time

* try run backup every second

* add mod before

* update every second

* store state in memory

* remove extra logs

* clear log

* codereview

* extract constant
  • Loading branch information
korotkov-aerospike authored Nov 14, 2023
1 parent 20412c8 commit a59ca3d
Showing 1 changed file with 103 additions and 107 deletions.
210 changes: 103 additions & 107 deletions pkg/service/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ type BackupHandler struct {
backupPolicy *model.BackupPolicy
cluster *model.AerospikeCluster
storage *model.BackupStorage
state *model.BackupState
fullBackupInProgress atomic.Bool
}

// Compile-time assertion that *BackupHandler satisfies the BackupScheduler interface.
var _ BackupScheduler = (*BackupHandler)(nil)

// BackupScheduleTick is the period of the ticker that drives the backup
// scheduling loop.
var BackupScheduleTick = time.Second

// NewBackupHandler returns a new BackupHandler instance.
func NewBackupHandler(config *model.Config, backupPolicy *model.BackupPolicy) (*BackupHandler, error) {
cluster, err := aerospikeClusterByName(*backupPolicy.SourceCluster, config.AerospikeClusters)
Expand All @@ -55,118 +58,111 @@ func NewBackupHandler(config *model.Config, backupPolicy *model.BackupPolicy) (*
backupPolicy: backupPolicy,
cluster: cluster,
storage: storage,
state: backupBackend.readState(),
}, nil
}

// scheduleFullBackup runs the full backup periodically.
func (h *BackupHandler) scheduleFullBackup(ctx context.Context) {
ticker := time.NewTicker(time.Duration(*h.backupPolicy.IntervalMillis) * time.Millisecond)
defer ticker.Stop()
loop:
for {
select {
case now := <-ticker.C:
if isStaleTick(now) {
slog.Error("Skipped full backup", "name", *h.backupPolicy.Name)
backupSkippedCounter.Inc()
break
}
// read the state first and check
state := h.backend.readState()
if !h.isFullEligible(now, state.LastRun) {
slog.Debug("The full backup is not due to run yet", "name", *h.backupPolicy.Name)
break
}
if !h.fullBackupInProgress.CompareAndSwap(false, true) {
slog.Debug("Backup is currently in progress, skipping full backup", "name", *h.backupPolicy.Name)
break
}
backupRunFunc := func() {
backupService.BackupRun(h.backupPolicy, h.cluster, h.storage, shared.BackupOptions{})
}
out := stdIO.Capture(backupRunFunc)
util.LogCaptured(out)
slog.Debug("Completed full backup", "name", *h.backupPolicy.Name)

// increment backupCounter metric
backupCounter.Inc()

// update the state
h.updateBackupState(now, state)

// clean incremental backups
h.backend.CleanDir(model.IncrementalBackupDirectory)

// release the lock
h.fullBackupInProgress.Store(false)
// Schedule starts the periodic schedulers for this handler's backup policy.
// The full-backup scheduler is always started; the incremental scheduler is
// started only when the policy sets a non-nil, positive IncrIntervalMillis.
func (h *BackupHandler) Schedule(ctx context.Context) {
slog.Info("Scheduling full backup", "name", *h.backupPolicy.Name)
h.scheduleBackupPeriodically(ctx, h.runFullBackup)

// NOTE(review): the next four lines (case / slog.Debug / break loop / closing
// brace) look like removed-side diff residue from the old scheduleFullBackup
// select loop rather than part of Schedule — confirm and drop.
case <-ctx.Done():
slog.Debug("ctx.Done in scheduleFullBackup")
break loop
}
// Incremental backups are optional: schedule them only for a configured, positive interval.
if h.backupPolicy.IncrIntervalMillis != nil && *h.backupPolicy.IncrIntervalMillis > 0 {
slog.Info("Scheduling incremental backup", "name", *h.backupPolicy.Name)
h.scheduleBackupPeriodically(ctx, h.runIncrementalBackup)
}
// NOTE(review): the "Exiting scheduling loop" log below also appears to be
// residue from the old scheduleFullBackup — confirm.
slog.Info("Exiting scheduling loop for full backup", "name", *h.backupPolicy.Name)
}

// scheduleBackup runs the incremental backup periodically.
func (h *BackupHandler) scheduleIncrementalBackup(ctx context.Context) {
ticker := time.NewTicker(time.Duration(*h.backupPolicy.IncrIntervalMillis) * time.Millisecond)
defer ticker.Stop()
loop:
for {
select {
case now := <-ticker.C:
if isStaleTick(now) {
slog.Error("Skipped incremental backup", "name", *h.backupPolicy.Name)
incrBackupSkippedCounter.Inc()
break
// scheduleBackupPeriodically starts two goroutines and returns: one calls
// backupFunc with the tick time on every BackupScheduleTick until ctx is done,
// the other calls backupFunc once right away with the current time.
// backupFunc itself decides whether a backup is actually due.
func (h *BackupHandler) scheduleBackupPeriodically(
ctx context.Context,
backupFunc func(time.Time)) {
go func() {
ticker := time.NewTicker(BackupScheduleTick)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
backupFunc(now)
case <-ctx.Done():
slog.Debug("ctx.Done in scheduleBackupPeriodically")
return
}
// NOTE(review): everything from here down to incrBackupCounter.Inc() looks
// like removed-side diff residue of the old scheduleIncrementalBackup loop
// body, not part of this function — confirm and drop.
// read the state first and check
state := h.backend.readState()
if state.LastRun == (time.Time{}) {
slog.Debug("Skip incremental backup until initial full backup is done", "name", *h.backupPolicy.Name)
break
}
if !h.isIncrementalEligible(now, state.LastIncrRun) {
slog.Debug("The incremental backup is not due to run yet", "name", *h.backupPolicy.Name)
break
}
if h.fullBackupInProgress.Load() {
slog.Debug("Full backup is currently in progress, skipping incremental backup", "name", *h.backupPolicy.Name)
break
}
backupRunFunc := func() {
opts := shared.BackupOptions{}
lastIncrRunEpoch := state.LastIncrRun.UnixNano()
opts.ModAfter = &lastIncrRunEpoch
backupService.BackupRun(h.backupPolicy, h.cluster, h.storage, opts)
}
out := stdIO.Capture(backupRunFunc)
util.LogCaptured(out)
slog.Debug("Completed incremental backup", "name", *h.backupPolicy.Name)

// increment incrBackupCounter metric
incrBackupCounter.Inc()
}
}()
// Run the backup immediately
go backupFunc(time.Now())
}

// update the state
h.updateIncrementalBackupState(now, state)
// runFullBackup performs one full backup attempt for the tick time now.
// It returns without running a backup when the tick is stale, when another
// full backup already holds the fullBackupInProgress flag, or when
// isFullEligible reports that a full backup is not yet due. Otherwise it runs
// the backup, increments backupCounter, calls updateBackupState and cleans the
// incremental backup directory.
func (h *BackupHandler) runFullBackup(now time.Time) {
if isStaleTick(now) {
slog.Debug("Skipped full backup", "name", *h.backupPolicy.Name)
backupSkippedCounter.Inc()
return
}
// Atomically claim the in-progress flag; a failed swap means a full backup is already running.
if !h.fullBackupInProgress.CompareAndSwap(false, true) {
slog.Debug("Backup is currently in progress, skipping full backup", "name", *h.backupPolicy.Name)
return
}
// release the lock
defer h.fullBackupInProgress.Store(false)

// NOTE(review): the next four lines (case / slog.Debug / break loop / closing
// brace) look like removed-side diff residue from the old
// scheduleIncrementalBackup select loop — confirm and drop.
case <-ctx.Done():
slog.Debug("ctx.Done in scheduleIncrementalBackup")
break loop
}
// Eligibility is evaluated against the in-memory state while the flag is held.
if !h.isFullEligible(now, h.state.LastRun) {
slog.Debug("The full backup is not due to run yet", "name", *h.backupPolicy.Name)
return
}
// NOTE(review): the "Exiting scheduling loop" log below also appears to be
// residue from the old scheduleIncrementalBackup — confirm.
slog.Info("Exiting scheduling loop for incremental backup", "name", *h.backupPolicy.Name)
backupRunFunc := func() {
backupService.BackupRun(h.backupPolicy, h.cluster, h.storage, shared.BackupOptions{})
}
out := stdIO.Capture(backupRunFunc)
util.LogCaptured(out)
slog.Debug("Completed full backup", "name", *h.backupPolicy.Name)

// increment backupCounter metric
backupCounter.Inc()

// update the state
h.updateBackupState()

// clean incremental backups
h.backend.CleanDir(model.IncrementalBackupDirectory)
}

// Schedule schedules backup for the defining policy.
func (h *BackupHandler) Schedule(ctx context.Context) {
slog.Info("Scheduling full backup", "name", *h.backupPolicy.Name)
go h.scheduleFullBackup(ctx)
if h.backupPolicy.IncrIntervalMillis != nil && *h.backupPolicy.IncrIntervalMillis > 0 {
slog.Info("Scheduling incremental backup", "name", *h.backupPolicy.Name)
go h.scheduleIncrementalBackup(ctx)
// runIncrementalBackup performs one incremental backup attempt for the tick
// time now.
//
// The attempt is skipped when the tick is stale, when the stored state records
// no full backup yet, when isIncrementalEligible reports that the incremental
// interval has not elapsed, or when a full backup is currently in progress.
// Otherwise it runs a backup with ModAfter set to the last incremental run,
// increments incrBackupCounter and calls updateIncrementalBackupState.
func (h *BackupHandler) runIncrementalBackup(now time.Time) {
	if isStaleTick(now) {
		slog.Error("Skipped incremental backup", "name", *h.backupPolicy.Name)
		incrBackupSkippedCounter.Inc()
		return
	}

	// Read the stored state first; every check below works on this snapshot.
	state := h.backend.readState()

	// Use IsZero rather than == (time.Time{}): == also compares the Location
	// and monotonic-clock fields, so a zero instant that carries a Location
	// would compare unequal and let an incremental backup run before any
	// full backup exists.
	if state.LastRun.IsZero() {
		slog.Debug("Skip incremental backup until initial full backup is done", "name", *h.backupPolicy.Name)
		return
	}
	if !h.isIncrementalEligible(now, state.LastIncrRun) {
		slog.Debug("The incremental backup is not due to run yet", "name", *h.backupPolicy.Name)
		return
	}
	if h.fullBackupInProgress.Load() {
		slog.Debug("Full backup is currently in progress, skipping incremental backup", "name", *h.backupPolicy.Name)
		return
	}

	// NOTE(review): before the first incremental run LastIncrRun is presumably
	// still the zero time, for which UnixNano() is documented as undefined
	// (year 1 does not fit in int64 nanoseconds) — confirm what lower bound
	// the first incremental backup should use.
	lastIncrRunEpoch := state.LastIncrRun.UnixNano()
	backupRunFunc := func() {
		opts := shared.BackupOptions{}
		opts.ModAfter = &lastIncrRunEpoch
		backupService.BackupRun(h.backupPolicy, h.cluster, h.storage, opts)
	}
	out := stdIO.Capture(backupRunFunc)
	util.LogCaptured(out)
	slog.Debug("Completed incremental backup", "name", *h.backupPolicy.Name)

	// increment incrBackupCounter metric
	incrBackupCounter.Inc()

	// update the state
	h.updateIncrementalBackupState()
}

func (h *BackupHandler) isFullEligible(n time.Time, t time.Time) bool {
Expand All @@ -177,19 +173,19 @@ func (h *BackupHandler) isIncrementalEligible(n time.Time, t time.Time) bool {
return n.UnixMilli()-t.UnixMilli() >= *h.backupPolicy.IncrIntervalMillis
}

func (h *BackupHandler) updateBackupState(now time.Time, state *model.BackupState) {
state.LastRun = now
state.Performed++
h.writeState(state)
func (h *BackupHandler) updateBackupState() {
h.state.LastRun = time.Now()
h.state.Performed++
h.writeState()
}

func (h *BackupHandler) updateIncrementalBackupState(now time.Time, state *model.BackupState) {
state.LastIncrRun = now
h.writeState(state)
func (h *BackupHandler) updateIncrementalBackupState() {
h.state.LastIncrRun = time.Now()
h.writeState()
}

func (h *BackupHandler) writeState(state *model.BackupState) {
if err := h.backend.writeState(state); err != nil {
func (h *BackupHandler) writeState() {
if err := h.backend.writeState(h.state); err != nil {
slog.Error("Failed to write state for the backup", "name", *h.backupPolicy.Name, "err", err)
}
}
Expand Down

0 comments on commit a59ca3d

Please sign in to comment.