From 43d3342443ff70946c0f608d6a24c8ebb11a4589 Mon Sep 17 00:00:00 2001 From: Anton Korotkov <106995168+korotkov-aerospike@users.noreply.github.com> Date: Thu, 9 Nov 2023 12:10:09 +0200 Subject: [PATCH] APPS-973 Fix incremental restore from S3 (#45) * add s3 prefix * add error message * add error message * add error message * add error message * add error message * add error message * add error message * add error message * add success message * change order * add logs * add logs * add logs * add logs * add logs * remove useless logs * remove unused logs * try look at modAfter * clean code * clean code * clean code * add test --- pkg/service/backup_backend_s3.go | 3 ++- pkg/service/backup_handler.go | 4 ++-- pkg/service/s3_context.go | 5 +++-- pkg/shared/backup.go | 12 ++++++++---- pkg/util/log.go | 2 +- pkg/util/log_test.go | 13 +++++++++---- 6 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pkg/service/backup_backend_s3.go b/pkg/service/backup_backend_s3.go index d4b22c53..2a26a755 100644 --- a/pkg/service/backup_backend_s3.go +++ b/pkg/service/backup_backend_s3.go @@ -86,6 +86,7 @@ func (s *BackupBackendS3) dirSize(path string) int64 { // IncrementalBackupList returns a list of available incremental backups. func (s *BackupBackendS3) IncrementalBackupList() ([]model.BackupDetails, error) { + s3prefix := "s3://" + s.bucket list, err := s.listFiles(s.Path + "/" + model.IncrementalBackupDirectory) if err != nil { return nil, err @@ -93,7 +94,7 @@ func (s *BackupBackendS3) IncrementalBackupList() ([]model.BackupDetails, error) contents := make([]model.BackupDetails, len(list)) for i, object := range list { details := model.BackupDetails{ - Key: object.Key, + Key: ptr.String(s3prefix + "/" + *object.Key), LastModified: object.LastModified, Size: &object.Size, } diff --git a/pkg/service/backup_handler.go b/pkg/service/backup_handler.go index f0752ad0..73c9e66d 100644 --- a/pkg/service/backup_handler.go +++ b/pkg/service/backup_handler.go @@ -141,12 +141,12 @@ loop: // Schedule schedules backup for the defining policy. func (h *BackupHandler) Schedule(ctx context.Context) { + slog.Info("Scheduling full backup", "name", *h.backupPolicy.Name) + go h.scheduleFullBackup(ctx) if h.backupPolicy.IncrIntervalMillis != nil && *h.backupPolicy.IncrIntervalMillis > 0 { slog.Info("Scheduling incremental backup", "name", *h.backupPolicy.Name) go h.scheduleIncrementalBackup(ctx) } - slog.Info("Scheduling full backup", "name", *h.backupPolicy.Name) - go h.scheduleFullBackup(ctx) } func (h *BackupHandler) isFullEligible(n time.Time, t time.Time) bool { diff --git a/pkg/service/s3_context.go b/pkg/service/s3_context.go index 6042a0f1..c08c8d6d 100644 --- a/pkg/service/s3_context.go +++ b/pkg/service/s3_context.go @@ -113,9 +113,10 @@ func (s *S3Context) writeFile(filePath string, v any) error { if err != nil { slog.Warn("Couldn't upload file", "path", filePath, "bucket", s.bucket, "err", err) + return err } - - return err + slog.Debug("File written", "path", filePath, "bucket", s.bucket) + return nil } // listFiles returns all files in the given s3 prefix path. diff --git a/pkg/shared/backup.go b/pkg/shared/backup.go index e8b89dfc..90db2418 100644 --- a/pkg/shared/backup.go +++ b/pkg/shared/backup.go @@ -48,8 +48,12 @@ func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *mode // lock to restrict parallel execution (shared library limitation) b.Lock() defer b.Unlock() - - slog.Debug(fmt.Sprintf("Starting backup for %s", *backupPolicy.Name)) + isIncremental := opts.ModAfter != nil + if isIncremental { + slog.Debug(fmt.Sprintf("Starting incremental backup for %s", *backupPolicy.Name)) + } else { + slog.Debug(fmt.Sprintf("Starting full backup for %s", *backupPolicy.Name)) + } backupConfig := C.backup_config_t{} C.backup_config_default(&backupConfig) @@ -99,7 +103,7 @@ func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *mode setCString(&backupConfig.s3_region, storage.S3Region) setCString(&backupConfig.s3_profile, storage.S3Profile) - if opts.ModAfter != nil { + if isIncremental { // for incremental backup setCLong(&backupConfig.mod_after, opts.ModAfter) setCString(&backupConfig.output_file, getIncrementalPath(storage)) @@ -118,7 +122,7 @@ func (b *BackupShared) BackupRun(backupPolicy *model.BackupPolicy, cluster *mode C.cf_free(unsafe.Pointer(backupStatus)) success = true } else { - slog.Warn("Failed backup operation", "policy", backupPolicy.Name) + slog.Warn("Failed backup operation", "policy", *backupPolicy.Name) } // destroy the backup_config diff --git a/pkg/util/log.go b/pkg/util/log.go index 462ec69a..f4f15bd3 100644 --- a/pkg/util/log.go +++ b/pkg/util/log.go @@ -9,7 +9,7 @@ import ( "log/slog" ) -var libLogRegex = regexp.MustCompile(`^(.+)\s\[(\D+)\]\s\[(\d+)\]\s(.*)$`) +var libLogRegex = regexp.MustCompile(`^(.+)\s\[(\D+)\]\s\[\s*(\d+)\]\s(.*)$`) // LogCaptured logs the captured std output from the shared libraries. func LogCaptured(out string) { diff --git a/pkg/util/log_test.go b/pkg/util/log_test.go index 5950f504..1f37af36 100644 --- a/pkg/util/log_test.go +++ b/pkg/util/log_test.go @@ -5,9 +5,14 @@ import ( ) func Test_libLogRegex(t *testing.T) { - e := "2023-01-01 00:00:00 UTC [INF] [74813] Starting backup for 4096 partitions from 0 to 4095" - match := libLogRegex.FindStringSubmatch(e) - if len(match) != 5 { - t.Fatal("libLogRegex error") + tests := []string{ + "2023-01-01 00:00:00 UTC [INF] [74813] Starting backup for 4096 partitions from 0 to 4095", + "2023-11-08 09:44:48 GMT [ERR] [ 14] The max S3 object size is 5497558138880"} + + for _, e := range tests { + match := libLogRegex.FindStringSubmatch(e) + if len(match) != 5 { + t.Fatal("libLogRegex error") + } } }