Skip to content

Commit

Permalink
APPS-973 Fix incremental restore from S3 (#45)
Browse files Browse the repository at this point in the history
* add s3 prefix

* add error message

* add error message

* add error message

* add error message

* add error message

* add error message

* add error message

* add error message

* add success message

* change order

* add logs

* add logs

* add logs

* add logs

* add logs

* remove useless logs

* remove unused logs

* try look at modAfter

* clean code

* clean code

* clean code

* add test
  • Loading branch information
korotkov-aerospike authored Nov 9, 2023
1 parent 3c9b5ea commit 43d3342
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 14 deletions.
3 changes: 2 additions & 1 deletion pkg/service/backup_backend_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,15 @@ func (s *BackupBackendS3) dirSize(path string) int64 {

// IncrementalBackupList returns a list of available incremental backups.
func (s *BackupBackendS3) IncrementalBackupList() ([]model.BackupDetails, error) {
s3prefix := "s3://" + s.bucket
list, err := s.listFiles(s.Path + "/" + model.IncrementalBackupDirectory)
if err != nil {
return nil, err
}
contents := make([]model.BackupDetails, len(list))
for i, object := range list {
details := model.BackupDetails{
Key: object.Key,
Key: ptr.String(s3prefix + "/" + *object.Key),
LastModified: object.LastModified,
Size: &object.Size,
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ loop:

// Schedule schedules backup for the defining policy.
func (h *BackupHandler) Schedule(ctx context.Context) {
slog.Info("Scheduling full backup", "name", *h.backupPolicy.Name)
go h.scheduleFullBackup(ctx)
if h.backupPolicy.IncrIntervalMillis != nil && *h.backupPolicy.IncrIntervalMillis > 0 {
slog.Info("Scheduling incremental backup", "name", *h.backupPolicy.Name)
go h.scheduleIncrementalBackup(ctx)
}
slog.Info("Scheduling full backup", "name", *h.backupPolicy.Name)
go h.scheduleFullBackup(ctx)
}

func (h *BackupHandler) isFullEligible(n time.Time, t time.Time) bool {
Expand Down
5 changes: 3 additions & 2 deletions pkg/service/s3_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ func (s *S3Context) writeFile(filePath string, v any) error {
if err != nil {
slog.Warn("Couldn't upload file", "path", filePath,
"bucket", s.bucket, "err", err)
return err
}

return err
slog.Debug("File written", "path", filePath, "bucket", s.bucket)
return nil
}

// listFiles returns all files in the given s3 prefix path.
Expand Down
12 changes: 8 additions & 4 deletions pkg/shared/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *mode
// lock to restrict parallel execution (shared library limitation)
b.Lock()
defer b.Unlock()

slog.Debug(fmt.Sprintf("Starting backup for %s", *backupPolicy.Name))
isIncremental := opts.ModAfter != nil
if isIncremental {
slog.Debug(fmt.Sprintf("Starting incremental backup for %s", *backupPolicy.Name))
} else {
slog.Debug(fmt.Sprintf("Starting full backup for %s", *backupPolicy.Name))
}

backupConfig := C.backup_config_t{}
C.backup_config_default(&backupConfig)
Expand Down Expand Up @@ -99,7 +103,7 @@ func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *mode
setCString(&backupConfig.s3_region, storage.S3Region)
setCString(&backupConfig.s3_profile, storage.S3Profile)

if opts.ModAfter != nil {
if isIncremental {
// for incremental backup
setCLong(&backupConfig.mod_after, opts.ModAfter)
setCString(&backupConfig.output_file, getIncrementalPath(storage))
Expand All @@ -118,7 +122,7 @@ func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *mode
C.cf_free(unsafe.Pointer(backupStatus))
success = true
} else {
slog.Warn("Failed backup operation", "policy", backupPolicy.Name)
slog.Warn("Failed backup operation", "policy", *backupPolicy.Name)
}

// destroy the backup_config
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"log/slog"
)

var libLogRegex = regexp.MustCompile(`^(.+)\s\[(\D+)\]\s\[(\d+)\]\s(.*)$`)
var libLogRegex = regexp.MustCompile(`^(.+)\s\[(\D+)\]\s\[\s*(\d+)\]\s(.*)$`)

// LogCaptured logs the captured std output from the shared libraries.
func LogCaptured(out string) {
Expand Down
13 changes: 9 additions & 4 deletions pkg/util/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ import (
)

func Test_libLogRegex(t *testing.T) {
e := "2023-01-01 00:00:00 UTC [INF] [74813] Starting backup for 4096 partitions from 0 to 4095"
match := libLogRegex.FindStringSubmatch(e)
if len(match) != 5 {
t.Fatal("libLogRegex error")
tests := []string{
"2023-01-01 00:00:00 UTC [INF] [74813] Starting backup for 4096 partitions from 0 to 4095",
"2023-11-08 09:44:48 GMT [ERR] [ 14] The max S3 object size is 5497558138880"}

for _, e := range tests {
match := libLogRegex.FindStringSubmatch(e)
if len(match) != 5 {
t.Fatal("libLogRegex error")
}
}
}

0 comments on commit 43d3342

Please sign in to comment.