Skip to content

Commit

Permalink
APPS-1003 Filter backups in progress (#57)
Browse files Browse the repository at this point in the history
* filter backups

* use backup routine name in log

* move initialisation

* add comment

* merge scheduler file
  • Loading branch information
korotkov-aerospike authored Nov 21, 2023
1 parent 3403d66 commit e3656f1
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 99 deletions.
6 changes: 3 additions & 3 deletions cmd/backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ func run() int {
// get system ctx
ctx := systemCtx()
// schedule all configured backups
handlers := service.BuildBackupHandlers(config)
service.ScheduleHandlers(ctx, handlers)
schedulers := service.BuildBackupSchedulers(config)
service.ScheduleHandlers(ctx, schedulers)
// run HTTP server
return runHTTPServer(ctx, handlers, config)
return runHTTPServer(ctx, schedulers, config)
}

err := rootCmd.Execute()
Expand Down
50 changes: 32 additions & 18 deletions pkg/service/backup_backend_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/fs"
"os"
"path/filepath"
"sync/atomic"

"log/slog"

Expand All @@ -17,22 +18,25 @@ import (
// BackupBackendLocal implements the BackupBackend interface by
// saving state to the local file system.
type BackupBackendLocal struct {
path string
stateFilePath string
backupPolicy *model.BackupPolicy
path string
stateFilePath string
backupPolicy *model.BackupPolicy
fullBackupInProgress *atomic.Bool // BackupBackend needs to know if full backup is running to filter it out
}

var _ BackupBackend = (*BackupBackendLocal)(nil)

// NewBackupBackendLocal returns a new BackupBackendLocal instance.
func NewBackupBackendLocal(path string, backupPolicy *model.BackupPolicy) *BackupBackendLocal {
func NewBackupBackendLocal(path string, backupPolicy *model.BackupPolicy,
fullBackupInProgress *atomic.Bool) *BackupBackendLocal {
prepareDirectory(path)
prepareDirectory(path + "/" + model.IncrementalBackupDirectory)
prepareDirectory(path + "/" + model.FullBackupDirectory)
return &BackupBackendLocal{
path: path,
stateFilePath: path + "/" + model.StateFileName,
backupPolicy: backupPolicy,
path: path,
stateFilePath: path + "/" + model.StateFileName,
backupPolicy: backupPolicy,
fullBackupInProgress: fullBackupInProgress,
}
}

Expand Down Expand Up @@ -78,21 +82,28 @@ func (local *BackupBackendLocal) FullBackupList() ([]model.BackupDetails, error)
return nil, err
}

lastRun := local.readState().LastRun
if local.backupPolicy.RemoveFiles != nil && *local.backupPolicy.RemoveFiles {
// when RemoveFiles = true, backup data is located in the backupFolder itself
if len(entries) > 0 {
return []model.BackupDetails{{
Key: ptr.String(backupFolder),
LastModified: &local.readState().LastRun,
}}, nil
if len(entries) == 0 {
return []model.BackupDetails{}, nil
}
return []model.BackupDetails{}, nil
if local.fullBackupInProgress.Load() {
return []model.BackupDetails{}, nil
}
return []model.BackupDetails{{
Key: ptr.String(backupFolder),
LastModified: &lastRun,
}}, nil
}

backupDetails := make([]model.BackupDetails, 0)
backupDetails := make([]model.BackupDetails, 0, len(entries))
for _, e := range entries {
if e.IsDir() {
backupDetails = append(backupDetails, toBackupDetails(e, backupFolder))
details := toBackupDetails(e, backupFolder)
if details.LastModified.Before(lastRun) {
backupDetails = append(backupDetails, details)
}
}
}
return backupDetails, nil
Expand All @@ -105,11 +116,14 @@ func (local *BackupBackendLocal) IncrementalBackupList() ([]model.BackupDetails,
if err != nil {
return nil, err
}

backupDetails := make([]model.BackupDetails, 0)
lastIncrRun := local.readState().LastIncrRun
backupDetails := make([]model.BackupDetails, 0, len(entries))
for _, e := range entries {
if !e.IsDir() {
backupDetails = append(backupDetails, toBackupDetails(e, backupFolder))
details := toBackupDetails(e, backupFolder)
if details.LastModified.Before(lastIncrRun) {
backupDetails = append(backupDetails, details)
}
}
}
return backupDetails, nil
Expand Down
56 changes: 34 additions & 22 deletions pkg/service/backup_backend_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"log/slog"
"sync/atomic"

"github.com/aerospike/backup/pkg/model"
"github.com/aws/smithy-go/ptr"
Expand All @@ -11,20 +12,22 @@ import (
// saving state to AWS S3.
type BackupBackendS3 struct {
*S3Context
stateFilePath string
backupPolicy *model.BackupPolicy
stateFilePath string
backupPolicy *model.BackupPolicy
fullBackupInProgress *atomic.Bool // BackupBackend needs to know if full backup is running to filter it out
}

var _ BackupBackend = (*BackupBackendS3)(nil)

// NewBackupBackendS3 returns a new BackupBackendS3 instance.
func NewBackupBackendS3(storage *model.Storage,
backupPolicy *model.BackupPolicy) *BackupBackendS3 {
func NewBackupBackendS3(storage *model.Storage, backupPolicy *model.BackupPolicy,
fullBackupInProgress *atomic.Bool) *BackupBackendS3 {
s3Context := NewS3Context(storage)
return &BackupBackendS3{
S3Context: s3Context,
stateFilePath: s3Context.Path + "/" + model.StateFileName,
backupPolicy: backupPolicy,
S3Context: s3Context,
stateFilePath: s3Context.Path + "/" + model.StateFileName,
backupPolicy: backupPolicy,
fullBackupInProgress: fullBackupInProgress,
}
}

Expand All @@ -42,33 +45,39 @@ func (s *BackupBackendS3) writeState(state *model.BackupState) error {
func (s *BackupBackendS3) FullBackupList() ([]model.BackupDetails, error) {
backupFolder := s.Path + "/" + model.FullBackupDirectory + "/"
s3prefix := "s3://" + s.bucket
lastRun := s.readState().LastRun
if s.backupPolicy.RemoveFiles != nil && *s.backupPolicy.RemoveFiles {
// when RemoveFiles = true, backup data is located in the backupFolder itself
files, _ := s.listFiles(backupFolder)
if len(files) > 0 {
return []model.BackupDetails{{
Key: ptr.String(s3prefix + backupFolder),
LastModified: &s.readState().LastRun,
Size: ptr.Int64(s.dirSize(backupFolder)),
}}, nil
if len(files) == 0 {
return []model.BackupDetails{}, nil
}
return []model.BackupDetails{}, nil
if s.fullBackupInProgress.Load() {
return []model.BackupDetails{}, nil
}
return []model.BackupDetails{{
Key: ptr.String(s3prefix + backupFolder),
LastModified: &lastRun,
Size: ptr.Int64(s.dirSize(backupFolder)),
}}, nil
}

subfolders, err := s.listFolders(backupFolder)
if err != nil {
return nil, err
}
contents := make([]model.BackupDetails, len(subfolders))
for i, subfolder := range subfolders {
result := make([]model.BackupDetails, 0, len(subfolders))
for _, subfolder := range subfolders {
details := model.BackupDetails{
Key: ptr.String(s3prefix + "/" + *subfolder.Prefix),
LastModified: s.GetTime(subfolder),
Size: ptr.Int64(s.dirSize(*subfolder.Prefix)),
}
contents[i] = details
if details.LastModified.Before(lastRun) {
result = append(result, details)
}
}
return contents, err
return result, err
}

func (s *BackupBackendS3) dirSize(path string) int64 {
Expand All @@ -91,14 +100,17 @@ func (s *BackupBackendS3) IncrementalBackupList() ([]model.BackupDetails, error)
if err != nil {
return nil, err
}
contents := make([]model.BackupDetails, len(list))
for i, object := range list {
result := make([]model.BackupDetails, 0, len(list))
lastIncrRun := s.readState().LastIncrRun
for _, object := range list {
details := model.BackupDetails{
Key: ptr.String(s3prefix + "/" + *object.Key),
LastModified: object.LastModified,
Size: &object.Size,
}
contents[i] = details
if details.LastModified.Before(lastIncrRun) {
result = append(result, details)
}
}
return contents, err
return result, err
}
70 changes: 50 additions & 20 deletions pkg/service/backup_handler.go → pkg/service/backup_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync/atomic"
"time"

"github.com/aerospike/backup/pkg/stdio"

"github.com/aerospike/backup/pkg/model"
"github.com/aerospike/backup/pkg/shared"
"github.com/aerospike/backup/pkg/util"
Expand All @@ -27,15 +29,41 @@ type BackupHandler struct {
cluster *model.AerospikeCluster
storage *model.Storage
state *model.BackupState
fullBackupInProgress atomic.Bool
fullBackupInProgress *atomic.Bool
}

// stdIO captures standard output
var stdIO = &stdio.CgoStdio{}

var backupService shared.Backup = shared.NewBackup()

var _ BackupScheduler = (*BackupHandler)(nil)

var BackupScheduleTick = 1000 * time.Millisecond

// NewBackupHandler returns a new BackupHandler instance.
func NewBackupHandler(config *model.Config, backupRoutine *model.BackupRoutine) (*BackupHandler, error) {
// ScheduleHandlers schedules the configured backup policies.
func ScheduleHandlers(ctx context.Context, schedulers []BackupScheduler) {
for _, scheduler := range schedulers {
scheduler.Schedule(ctx)
}
}

// BuildBackupSchedulers builds a list of BackupSchedulers according to
// the given configuration.
func BuildBackupSchedulers(config *model.Config) []BackupScheduler {
schedulers := make([]BackupScheduler, 0, len(config.BackupPolicies))
for _, backupRoutine := range config.BackupRoutines {
scheduler, err := newBackupHandler(config, backupRoutine)
if err != nil {
panic(err)
}
schedulers = append(schedulers, scheduler)
}
return schedulers
}

// newBackupHandler returns a new BackupHandler instance.
func newBackupHandler(config *model.Config, backupRoutine *model.BackupRoutine) (*BackupHandler, error) {
cluster, found := config.AerospikeClusters[backupRoutine.SourceCluster]
if !found {
return nil, fmt.Errorf("cluster not found for %s", backupRoutine.SourceCluster)
Expand All @@ -49,23 +77,25 @@ func NewBackupHandler(config *model.Config, backupRoutine *model.BackupRoutine)
return nil, fmt.Errorf("backupPolicy not found for %s", backupRoutine.BackupPolicy)
}

fullBackupInProgress := &atomic.Bool{}
var backupBackend BackupBackend
switch *storage.Type {
case model.Local:
backupBackend = NewBackupBackendLocal(*storage.Path, backupPolicy)
backupBackend = NewBackupBackendLocal(*storage.Path, backupPolicy, fullBackupInProgress)
case model.S3:
backupBackend = NewBackupBackendS3(storage, backupPolicy)
backupBackend = NewBackupBackendS3(storage, backupPolicy, fullBackupInProgress)
default:
return nil, fmt.Errorf("unsupported storage type: %d", *storage.Type)
}

return &BackupHandler{
backend: backupBackend,
backupRoutine: backupRoutine,
backupPolicy: backupPolicy,
cluster: cluster,
storage: storage,
state: backupBackend.readState(),
backend: backupBackend,
backupRoutine: backupRoutine,
backupPolicy: backupPolicy,
cluster: cluster,
storage: storage,
state: backupBackend.readState(),
fullBackupInProgress: fullBackupInProgress,
}, nil
}

Expand Down Expand Up @@ -104,19 +134,19 @@ func (h *BackupHandler) scheduleBackupPeriodically(

func (h *BackupHandler) runFullBackup(now time.Time) {
if isStaleTick(now) {
slog.Debug("Skipped full backup", "name", *h.backupPolicy.Name)
slog.Debug("Skipped full backup", "name", h.backupRoutine.Name)
backupSkippedCounter.Inc()
return
}
if !h.fullBackupInProgress.CompareAndSwap(false, true) {
slog.Debug("Backup is currently in progress, skipping full backup", "name", *h.backupPolicy.Name)
slog.Debug("Backup is currently in progress, skipping full backup", "name", h.backupRoutine.Name)
return
}
// release the lock
defer h.fullBackupInProgress.Store(false)

if !h.isFullEligible(now, h.state.LastRun) {
slog.Debug("The full backup is not due to run yet", "name", *h.backupPolicy.Name)
slog.Debug("The full backup is not due to run yet", "name", h.backupRoutine.Name)
return
}
backupRunFunc := func() {
Expand All @@ -131,7 +161,7 @@ func (h *BackupHandler) runFullBackup(now time.Time) {
}
out := stdIO.Capture(backupRunFunc)
util.LogCaptured(out)
slog.Debug("Completed full backup", "name", *h.backupPolicy.Name)
slog.Debug("Completed full backup", "name", h.backupRoutine.Name)

// increment backupCounter metric
backupCounter.Inc()
Expand All @@ -145,22 +175,22 @@ func (h *BackupHandler) runFullBackup(now time.Time) {

func (h *BackupHandler) runIncrementalBackup(now time.Time) {
if isStaleTick(now) {
slog.Error("Skipped incremental backup", "name", *h.backupPolicy.Name)
slog.Error("Skipped incremental backup", "name", h.backupRoutine.Name)
incrBackupSkippedCounter.Inc()
return
}
// read the state first and check
state := h.backend.readState()
if state.LastRun == (time.Time{}) {
slog.Debug("Skip incremental backup until initial full backup is done", "name", *h.backupPolicy.Name)
slog.Debug("Skip incremental backup until initial full backup is done", "name", h.backupRoutine.Name)
return
}
if !h.isIncrementalEligible(now, state.LastIncrRun) {
slog.Debug("The incremental backup is not due to run yet", "name", *h.backupPolicy.Name)
slog.Debug("The incremental backup is not due to run yet", "name", h.backupRoutine.Name)
return
}
if h.fullBackupInProgress.Load() {
slog.Debug("Full backup is currently in progress, skipping incremental backup", "name", *h.backupPolicy.Name)
slog.Debug("Full backup is currently in progress, skipping incremental backup", "name", h.backupRoutine.Name)
return
}
backupRunFunc := func() {
Expand All @@ -177,7 +207,7 @@ func (h *BackupHandler) runIncrementalBackup(now time.Time) {
}
out := stdIO.Capture(backupRunFunc)
util.LogCaptured(out)
slog.Debug("Completed incremental backup", "name", *h.backupPolicy.Name)
slog.Debug("Completed incremental backup", "name", h.backupRoutine.Name)

// increment incrBackupCounter metric
incrBackupCounter.Inc()
Expand Down
Loading

0 comments on commit e3656f1

Please sign in to comment.