diff --git a/config/config.yml b/config/config.yml index 1580efa1..5aacfea2 100644 --- a/config/config.yml +++ b/config/config.yml @@ -25,30 +25,29 @@ storage: backup-policies: keepFilesMaxRecordsPolicy: parallel: 8 - remove-files: KeepAll sealed: true retry-delay: 10_000 max-retries: 10 keepFilesPolicy: parallel: 8 - remove-files: KeepAll sealed: true removeFilesPolicy: parallel: 8 - remove-files: RemoveAll sealed: true + retention: + full: 1 + incremental: 0 removeIncrementalPolicy: parallel: 8 - remove-files: RemoveIncremental sealed: true + retention: + incremental: 0 keepFilesPolicySlow: parallel: 8 - remove-files: KeepAll bandwidth: 1 sealed: true encryptedCompressedPolicy128: parallel: 8 - remove-files: RemoveAll sealed: true encryption: key-file: encryptionKey @@ -58,7 +57,6 @@ backup-policies: mode: ZSTD encryptedCompressedPolicy256: parallel: 8 - remove-files: RemoveAll sealed: true encryption: key-file: encryptionKey @@ -68,11 +66,9 @@ backup-policies: mode: ZSTD notSealed: parallel: 8 - remove-files: KeepAll sealed: false noIndexesUdfsRecords: parallel: 8 - remove-files: KeepAll sealed: true no-indexes: true no-records: true diff --git a/docs/docs.go b/docs/docs.go index 4cf3758b..a0ae0f0c 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1810,16 +1810,11 @@ const docTemplate = `{ "type": "integer", "example": 1000 }, - "remove-files": { - "description": "Whether to clear the output directory (default: KeepAll).", - "enum": [ - "KeepAll", - "RemoveAll", - "RemoveIncremental" - ], + "retention": { + "description": "Specifies how long to retain full and incremental backups.", "allOf": [ { - "$ref": "#/definitions/dto.RemoveFilesType" + "$ref": "#/definitions/dto.RetentionPolicy" } ] }, @@ -1860,7 +1855,7 @@ const docTemplate = `{ "backup-policy": { "description": "The name of the corresponding backup policy.", "type": "string", - "example": "keepAll" + "example": "keepAllPolicy" }, "bin-list": { "description": "The list of backup bin names (optional, an 
empty list implies backing up all bins).", @@ -2325,20 +2320,6 @@ const docTemplate = `{ } } }, - "dto.RemoveFilesType": { - "description": "RemoveFilesType represents the type of the backup storage.", - "type": "string", - "enum": [ - "KeepAll", - "RemoveAll", - "RemoveIncremental" - ], - "x-enum-varnames": [ - "KeepAll", - "RemoveAll", - "RemoveIncremental" - ] - }, "dto.RestoreJobStatus": { "description": "RestoreJobStatus represents a restore job status.", "type": "object", @@ -2627,6 +2608,19 @@ const docTemplate = `{ } } }, + "dto.RetentionPolicy": { + "type": "object", + "properties": { + "full": { + "description": "Number of full backups to store:\n- If nil, retain all full backups.\n- If N is specified, retain the last N full backups.\n- The minimum value is 1.", + "type": "integer" + }, + "incremental": { + "description": "Number of full backups to store incremental backups for:\n- If nil, retain all incremental backups.\n- If N is specified, retain incremental backups for the last N full backups.\n- If set to 0, do not retain any incremental backups.\n- Must not exceed the value of FullBackups.", + "type": "integer" + } + } + }, "dto.RetryPolicy": { "description": "RetryPolicy defines the configuration for retry attempts in case of failures.", "type": "object", diff --git a/docs/openapi.json b/docs/openapi.json index 7972a5db..cb285452 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -2021,11 +2021,11 @@ "example" : 1000, "type" : "integer" }, - "remove-files" : { + "retention" : { "allOf" : [ { - "$ref" : "#/components/schemas/dto.RemoveFilesType" + "$ref" : "#/components/schemas/dto.RetentionPolicy" } ], - "description" : "Whether to clear the output directory (default: KeepAll).", + "description" : "Specifies how long to retain full and incremental backups.", "type" : "object" }, "retry-policy" : { @@ -2057,7 +2057,7 @@ "properties" : { "backup-policy" : { "description" : "The name of the corresponding backup policy.", - "example" : 
"keepAll", + "example" : "keepAllPolicy", "type" : "string" }, "bin-list" : { @@ -2462,12 +2462,6 @@ }, "type" : "object" }, - "dto.RemoveFilesType" : { - "description" : "RemoveFilesType represents the type of the backup storage.", - "enum" : [ "KeepAll", "RemoveAll", "RemoveIncremental" ], - "type" : "string", - "x-enum-varnames" : [ "KeepAll", "RemoveAll", "RemoveIncremental" ] - }, "dto.RestoreJobStatus" : { "description" : "RestoreJobStatus represents a restore job status.", "properties" : { @@ -2730,6 +2724,19 @@ "required" : [ "destination", "policy", "routine", "time" ], "type" : "object" }, + "dto.RetentionPolicy" : { + "properties" : { + "full" : { + "description" : "Number of full backups to store:\n- If nil, retain all full backups.\n- If N is specified, retain the last N full backups.\n- The minimum value is 1.", + "type" : "integer" + }, + "incremental" : { + "description" : "Number of full backups to store incremental backups for:\n- If nil, retain all incremental backups.\n- If N is specified, retain incremental backups for the last N full backups.\n- If set to 0, do not retain any incremental backups.\n- Must not exceed the value of FullBackups.", + "type" : "integer" + } + }, + "type" : "object" + }, "dto.RetryPolicy" : { "description" : "RetryPolicy defines the configuration for retry attempts in case of failures.", "properties" : { diff --git a/docs/openapi.yaml b/docs/openapi.yaml index 3c078b1a..6ea6a770 100644 --- a/docs/openapi.yaml +++ b/docs/openapi.yaml @@ -1420,7 +1420,6 @@ components: bandwidth: 10000 total-timeout: 2000 sealed: true - remove-files: "{}" encryption: "{}" parallel: 1 no-indexes: true @@ -1429,6 +1428,7 @@ components: file-limit: 1024 retry-policy: "{}" socket-timeout: 1000 + retention: "{}" properties: bandwidth: description: |- @@ -1471,10 +1471,10 @@ components: the records-per-second limit is not applied. 
example: 1000 type: integer - remove-files: + retention: allOf: - - $ref: '#/components/schemas/dto.RemoveFilesType' - description: "Whether to clear the output directory (default: KeepAll)." + - $ref: '#/components/schemas/dto.RetentionPolicy' + description: Specifies how long to retain full and incremental backups. type: object retry-policy: allOf: @@ -1512,7 +1512,7 @@ components: set-list: - set1 interval-cron: 0 0 * * * * - backup-policy: keepAll + backup-policy: keepAllPolicy node-list: - ":[" - ":[" @@ -1527,7 +1527,7 @@ components: properties: backup-policy: description: The name of the corresponding backup policy. - example: keepAll + example: keepAllPolicy type: string bin-list: description: "The list of backup bin names (optional, an empty list implies\ @@ -1653,7 +1653,6 @@ components: bandwidth: 10000 total-timeout: 2000 sealed: true - remove-files: "{}" encryption: "{}" parallel: 1 no-indexes: true @@ -1662,6 +1661,7 @@ components: file-limit: 1024 retry-policy: "{}" socket-timeout: 1000 + retention: "{}" aerospike-clusters: key: credentials: "{}" @@ -1704,7 +1704,7 @@ components: set-list: - set1 interval-cron: 0 0 * * * * - backup-policy: keepAll + backup-policy: keepAllPolicy node-list: - ":[" - ":[" @@ -1985,17 +1985,6 @@ components: type: string type: array type: object - dto.RemoveFilesType: - description: RemoveFilesType represents the type of the backup storage. - enum: - - KeepAll - - RemoveAll - - RemoveIncremental - type: string - x-enum-varnames: - - KeepAll - - RemoveAll - - RemoveIncremental dto.RestoreJobStatus: description: RestoreJobStatus represents a restore job status. example: @@ -2261,6 +2250,24 @@ components: - routine - time type: object + dto.RetentionPolicy: + properties: + full: + description: |- + Number of full backups to store: + - If nil, retain all full backups. + - If N is specified, retain the last N full backups. + - The minimum value is 1. 
+ type: integer + incremental: + description: |- + Number of full backups to store incremental backups for: + - If nil, retain all incremental backups. + - If N is specified, retain incremental backups for the last N full backups. + - If set to 0, do not retain any incremental backups. + - Must not exceed the value of FullBackups. + type: integer + type: object dto.RetryPolicy: description: RetryPolicy defines the configuration for retry attempts in case of failures. diff --git a/internal/server/handlers/config_policy_test.go b/internal/server/handlers/config_policy_test.go index 500284a2..57068ae5 100644 --- a/internal/server/handlers/config_policy_test.go +++ b/internal/server/handlers/config_policy_test.go @@ -19,9 +19,7 @@ const ( func testConfigBackupPolicy() *dto.BackupPolicy { testInt := 10 - keepFiles := dto.KeepAll return &dto.BackupPolicy{ - RemoveFiles: &keepFiles, Parallel: &testInt, SocketTimeout: &testInt, TotalTimeout: &testInt, diff --git a/pkg/dto/backup_policy.go b/pkg/dto/backup_policy.go index edafe21e..a5508313 100644 --- a/pkg/dto/backup_policy.go +++ b/pkg/dto/backup_policy.go @@ -9,20 +9,8 @@ import ( "github.com/aerospike/aerospike-backup-service/v2/pkg/model" ) -const ( - KeepAll RemoveFilesType = "KeepAll" - RemoveAll RemoveFilesType = "RemoveAll" - RemoveIncremental RemoveFilesType = "RemoveIncremental" -) - -// RemoveFilesType represents the type of the backup storage. -// @Description RemoveFilesType represents the type of the backup storage. -type RemoveFilesType string - // BackupPolicy represents a scheduled backup policy. // @Description BackupPolicy represents a scheduled backup policy. -// -//nolint:lll type BackupPolicy struct { // Maximum number of scan calls to run in parallel. Parallel *int `yaml:"parallel,omitempty" json:"parallel,omitempty" example:"1"` @@ -34,8 +22,8 @@ type BackupPolicy struct { // RetryPolicy defines the configuration for retry attempts in case of failures. // If nil, default policy is used. 
RetryPolicy *RetryPolicy `yaml:"retry-policy,omitempty" json:"retry-policy,omitempty"` - // Whether to clear the output directory (default: KeepAll). - RemoveFiles *RemoveFilesType `yaml:"remove-files,omitempty" json:"remove-files,omitempty" enums:"KeepAll,RemoveAll,RemoveIncremental"` + // Specifies how long to retain full and incremental backups. + RetentionPolicy *RetentionPolicy `yaml:"retention,omitempty" json:"retention,omitempty"` // Do not back up any record data (metadata or bin data). NoRecords *bool `yaml:"no-records,omitempty" json:"no-records,omitempty"` // Do not back up any secondary index definitions. @@ -101,9 +89,8 @@ func (p *BackupPolicy) Validate() error { if p.FileLimit != nil && *p.FileLimit < 0 { return fmt.Errorf("fileLimit %d invalid, should not be negative number", *p.FileLimit) } - if p.RemoveFiles != nil && - *p.RemoveFiles != KeepAll && *p.RemoveFiles != RemoveAll && *p.RemoveFiles != RemoveIncremental { - return fmt.Errorf("invalid RemoveFiles: %s. Possible values: KeepAll, RemoveAll, RemoveIncremental", *p.RemoveFiles) + if err := p.RetentionPolicy.Validate(); err != nil { + return fmt.Errorf("invalid retention policy: %w", err) } if err := p.EncryptionPolicy.Validate(); err != nil { return err @@ -120,7 +107,7 @@ func (p *BackupPolicy) ToModel() *model.BackupPolicy { SocketTimeout: millisToDuration(p.SocketTimeout), TotalTimeout: millisToDuration(p.TotalTimeout), RetryPolicy: p.RetryPolicy.ToModel(), - RemoveFiles: (*model.RemoveFilesType)(p.RemoveFiles), + RetentionPolicy: p.RetentionPolicy.toModel(), NoRecords: p.NoRecords, NoIndexes: p.NoIndexes, NoUdfs: p.NoUdfs, @@ -164,7 +151,7 @@ func (p *BackupPolicy) fromModel(m *model.BackupPolicy) { p.SocketTimeout = durationToMillis(m.SocketTimeout) p.TotalTimeout = durationToMillis(m.TotalTimeout) p.RetryPolicy = newRetryPolicyFromModel(m.RetryPolicy) - p.RemoveFiles = (*RemoveFilesType)(m.RemoveFiles) + p.RetentionPolicy = newRetentionPolicyFromModel(m.RetentionPolicy) p.NoRecords = 
m.NoRecords p.NoIndexes = m.NoIndexes p.NoUdfs = m.NoUdfs @@ -181,3 +168,61 @@ func (p *BackupPolicy) fromModel(m *model.BackupPolicy) { } p.Sealed = m.Sealed } + +// RetentionPolicy specifies how many full and incremental backups to keep. +type RetentionPolicy struct { + // Number of full backups to store: + // - If nil, retain all full backups. + // - If N is specified, retain the last N full backups. + // - The minimum value is 1. + FullBackups *int `json:"full,omitempty" yaml:"full,omitempty"` + + // Number of full backups to store incremental backups for: + // - If nil, retain all incremental backups. + // - If N is specified, retain incremental backups for the last N full backups. + // - If set to 0, do not retain any incremental backups. + // - Must not exceed the value of FullBackups. + IncrBackups *int `json:"incremental,omitempty" yaml:"incremental,omitempty"` +} + +func (rp *RetentionPolicy) Validate() error { + if rp == nil { + return nil + } + if rp.FullBackups != nil && *rp.FullBackups < 1 { + return fmt.Errorf("full backups retention %d is invalid, must be at least 1", *rp.FullBackups) + } + + if rp.IncrBackups != nil { + if *rp.IncrBackups < 0 { + return fmt.Errorf("incremental backups retention %d is invalid, cannot be negative", *rp.IncrBackups) + } + if rp.FullBackups != nil && *rp.IncrBackups > *rp.FullBackups { + return fmt.Errorf("incremental backups retention %d cannot exceed full backups retention %d", + *rp.IncrBackups, *rp.FullBackups) + } + } + + return nil +} +func (rp *RetentionPolicy) toModel() *model.RetentionPolicy { + if rp == nil { + return nil + } + + return &model.RetentionPolicy{ + FullBackups: rp.FullBackups, + IncrBackups: rp.IncrBackups, + } +} + +func newRetentionPolicyFromModel(m *model.RetentionPolicy) *RetentionPolicy { + if m == nil { + return nil + } + + return &RetentionPolicy{ + FullBackups: m.FullBackups, + IncrBackups: m.IncrBackups, + } +} diff --git a/pkg/dto/backup_policy_test.go b/pkg/dto/backup_policy_test.go 
index 880b9135..c4fe217c 100644 --- a/pkg/dto/backup_policy_test.go +++ b/pkg/dto/backup_policy_test.go @@ -3,6 +3,7 @@ package dto import ( "testing" + "github.com/aerospike/aerospike-backup-service/v2/pkg/util" "github.com/stretchr/testify/require" ) @@ -11,7 +12,6 @@ func TestBackupPolicyConversionIsLossless(t *testing.T) { socketTimeout := 5000 totalTimeout := 10000 retryPolicy := &RetryPolicy{MaxRetries: 3} - removeFiles := KeepAll noRecords := true noIndexes := false noUdfs := true @@ -22,11 +22,14 @@ func TestBackupPolicyConversionIsLossless(t *testing.T) { sealed := true original := &BackupPolicy{ - Parallel: ¶llel, - SocketTimeout: &socketTimeout, - TotalTimeout: &totalTimeout, - RetryPolicy: retryPolicy, - RemoveFiles: &removeFiles, + Parallel: ¶llel, + SocketTimeout: &socketTimeout, + TotalTimeout: &totalTimeout, + RetryPolicy: retryPolicy, + RetentionPolicy: &RetentionPolicy{ + FullBackups: util.Ptr(10), + IncrBackups: util.Ptr(5), + }, NoRecords: &noRecords, NoIndexes: &noIndexes, NoUdfs: &noUdfs, @@ -43,3 +46,59 @@ func TestBackupPolicyConversionIsLossless(t *testing.T) { require.Equal(t, original, result) } + +func TestRetentionPolicy_Validate(t *testing.T) { + tests := []struct { + name string + policy *RetentionPolicy + expectedErr string + }{ + { + name: "nil policy (no validation)", + policy: nil, + expectedErr: "", + }, + { + name: "valid policy with both full and incremental backups", + policy: &RetentionPolicy{FullBackups: util.Ptr(5), IncrBackups: util.Ptr(3)}, + expectedErr: "", + }, + { + name: "valid policy with only full backups", + policy: &RetentionPolicy{FullBackups: util.Ptr(2), IncrBackups: nil}, + expectedErr: "", + }, + { + name: "valid policy with only incremental backups set to zero", + policy: &RetentionPolicy{FullBackups: util.Ptr(3), IncrBackups: util.Ptr(0)}, + expectedErr: "", + }, + { + name: "invalid full backups: less than 1", + policy: &RetentionPolicy{FullBackups: util.Ptr(0), IncrBackups: util.Ptr(1)}, + expectedErr: 
"full backups retention 0 is invalid, must be at least 1", + }, + { + name: "invalid incremental backups: negative value", + policy: &RetentionPolicy{FullBackups: util.Ptr(3), IncrBackups: util.Ptr(-1)}, + expectedErr: "incremental backups retention -1 is invalid, cannot be negative", + }, + { + name: "invalid incremental backups: exceeds full backups", + policy: &RetentionPolicy{FullBackups: util.Ptr(3), IncrBackups: util.Ptr(5)}, + expectedErr: "incremental backups retention 5 cannot exceed full backups retention 3", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.policy.Validate() + if tt.expectedErr == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.EqualError(t, err, tt.expectedErr) + } + }) + } +} diff --git a/pkg/dto/backup_routine.go b/pkg/dto/backup_routine.go index a0d1b812..70fe9ee2 100644 --- a/pkg/dto/backup_routine.go +++ b/pkg/dto/backup_routine.go @@ -16,7 +16,7 @@ import ( //nolint:lll type BackupRoutine struct { // The name of the corresponding backup policy. - BackupPolicy string `yaml:"backup-policy,omitempty" json:"backup-policy,omitempty" example:"keepAll" validate:"required"` + BackupPolicy string `yaml:"backup-policy,omitempty" json:"backup-policy,omitempty" example:"keepAllPolicy" validate:"required"` // The name of the corresponding source cluster. SourceCluster string `yaml:"source-cluster,omitempty" json:"source-cluster,omitempty" example:"testCluster" validate:"required"` // The name of the corresponding storage provider configuration. 
diff --git a/pkg/model/backup_policy.go b/pkg/model/backup_policy.go index 5605fb7f..bba2d3bb 100644 --- a/pkg/model/backup_policy.go +++ b/pkg/model/backup_policy.go @@ -7,16 +7,6 @@ import ( "github.com/aerospike/backup-go/models" ) -const ( - KeepAll RemoveFilesType = "KeepAll" - RemoveAll RemoveFilesType = "RemoveAll" - RemoveIncremental RemoveFilesType = "RemoveIncremental" -) - -// RemoveFilesType represents the type of the backup storage. -// @Description RemoveFilesType represents the type of the backup storage. -type RemoveFilesType string - // BackupPolicy represents a scheduled backup policy. type BackupPolicy struct { // Maximum number of scan calls to run in parallel. @@ -28,8 +18,8 @@ type BackupPolicy struct { TotalTimeout *time.Duration // RetryPolicy defines the configuration for retry attempts in case of failures. RetryPolicy *models.RetryPolicy - // Whether to clear the output directory (default: KeepAll). - RemoveFiles *RemoveFilesType + // Specifies how long to retain full and incremental backups. + RetentionPolicy *RetentionPolicy // Do not back up any record data (metadata or bin data). NoRecords *bool // Do not back up any secondary index definitions. 
@@ -72,7 +62,7 @@ func (p *BackupPolicy) CopySMDDisabled() *BackupPolicy { SocketTimeout: p.SocketTimeout, TotalTimeout: p.TotalTimeout, RetryPolicy: p.RetryPolicy, - RemoveFiles: p.RemoveFiles, + RetentionPolicy: p.RetentionPolicy, NoRecords: p.NoRecords, NoIndexes: util.Ptr(true), NoUdfs: util.Ptr(true), @@ -128,12 +118,8 @@ func (p *BackupPolicy) GetSocketTimeoutOrDefault() time.Duration { return *p.SocketTimeout } -func (r *RemoveFilesType) RemoveFullBackup() bool { - // Full backups are deleted only if RemoveFiles is explicitly set to RemoveAll - return r != nil && *r == RemoveAll -} - -func (r *RemoveFilesType) RemoveIncrementalBackup() bool { - // Incremental backups are deleted only if RemoveFiles is explicitly set to RemoveAll or RemoveIncremental - return r != nil && (*r == RemoveIncremental || *r == RemoveAll) +// RetentionPolicy specifies how many full and incremental backups to keep. +type RetentionPolicy struct { + FullBackups *int // Number of full backups to store + IncrBackups *int // Number of full backups to store incremental backups for } diff --git a/pkg/service/backup_backend.go b/pkg/service/backup_backend.go index 07cc0529..2ab9068b 100644 --- a/pkg/service/backup_backend.go +++ b/pkg/service/backup_backend.go @@ -21,19 +21,16 @@ import ( // BackupBackend handles the backup management logic, employing a StorageAccessor // implementation for I/O operations. 
type BackupBackend struct { - storage model.Storage - routineName string - removeFullBackup bool + storage model.Storage + routineName string } var _ BackupListReader = (*BackupBackend)(nil) -func newBackend(routineName string, routine *model.BackupRoutine) *BackupBackend { - removeFullBackup := routine.BackupPolicy.RemoveFiles.RemoveFullBackup() +func newBackend(routineName string, storage model.Storage) *BackupBackend { return &BackupBackend{ - storage: routine.Storage, - routineName: routineName, - removeFullBackup: removeFullBackup, + storage: storage, + routineName: routineName, } } @@ -70,18 +67,19 @@ func (b *BackupBackend) writeBackupMetadata(ctx context.Context, path string, me // FullBackupList returns a list of available full backups. func (b *BackupBackend) FullBackupList(ctx context.Context, timeBounds model.TimeBounds, ) ([]model.BackupDetails, error) { - return b.readMetadataList(ctx, timeBounds, true) + return b.readMetadataList(ctx, timeBounds, jobTypeFull) } // IncrementalBackupList returns a list of available incremental backups. 
func (b *BackupBackend) IncrementalBackupList(ctx context.Context, timeBounds model.TimeBounds, ) ([]model.BackupDetails, error) { - return b.readMetadataList(ctx, timeBounds, false) + return b.readMetadataList(ctx, timeBounds, jobTypeIncremental) } -func (b *BackupBackend) readMetadataList(ctx context.Context, timebounds model.TimeBounds, isFullBackup bool, +func (b *BackupBackend) readMetadataList( + ctx context.Context, timebounds model.TimeBounds, backupType jobType, ) ([]model.BackupDetails, error) { - backupRoot := getBackupRootPath(b.routineName, isFullBackup) + backupRoot := getBackupRootPath(b.routineName, backupType) files, err := storage.ReadFiles(ctx, b.storage, backupRoot, metadataFile, timebounds.FromTime) if err != nil { if errors.Is(err, os.ErrNotExist) || strings.Contains(err.Error(), "is empty") { @@ -99,7 +97,7 @@ func (b *BackupBackend) readMetadataList(ctx context.Context, timebounds model.T if timebounds.Contains(metadata.Created) { backups = append(backups, model.BackupDetails{ BackupMetadata: *metadata, - Key: getKey(b.routineName, isFullBackup, metadata, b.removeFullBackup && isFullBackup), + Key: getKey(b.routineName, backupType, metadata), Storage: b.storage, }) } diff --git a/pkg/service/backup_backend_test.go b/pkg/service/backup_backend_test.go index b1e69e3d..7e3b8558 100644 --- a/pkg/service/backup_backend_test.go +++ b/pkg/service/backup_backend_test.go @@ -3,55 +3,28 @@ package service import ( "context" "os" - "strconv" "testing" "time" "github.com/aerospike/aerospike-backup-service/v2/pkg/model" + "github.com/stretchr/testify/require" ) -const tempFolder = "./tmp" - -func TestFullBackupRemoveFiles(t *testing.T) { +func TestFullBackupReadFiles(t *testing.T) { + tempFolder := t.TempDir() + name := "routine" backend := &BackupBackend{ - storage: &model.LocalStorage{Path: tempFolder}, - routineName: "routine", - removeFullBackup: true, - } - - path := backend.routineName + "/backup/data/source-ns1/" - _ = os.MkdirAll(path, 0744) - _ 
= backend.writeBackupMetadata(context.Background(), path, model.BackupMetadata{Created: time.UnixMilli(10)}) - - to := model.NewTimeBoundsTo(time.UnixMilli(1000)) - list, _ := backend.FullBackupList(context.Background(), to) - if len(list) != 1 { - t.Errorf("Expected list size 1, got %v", list) - } - t.Cleanup(func() { - _ = os.RemoveAll(tempFolder) - }) -} - -func TestFullBackupKeepFiles(t *testing.T) { - backend := &BackupBackend{ - storage: &model.LocalStorage{Path: tempFolder}, - routineName: "routine", - removeFullBackup: false, + storage: &model.LocalStorage{Path: tempFolder}, + routineName: name, } for _, t := range []int64{10, 20, 30} { - path := backend.routineName + "/backup/" + strconv.FormatInt(t, 10) + "/data/source-ns1/" + path := getFullPath(name, "source-ns1", time.UnixMilli(t)) _ = os.MkdirAll(path, 0744) _ = backend.writeBackupMetadata(context.Background(), path, model.BackupMetadata{Created: time.UnixMilli(t)}) } bounds := model.NewTimeBoundsTo(time.UnixMilli(25)) list, _ := backend.FullBackupList(context.Background(), bounds) - if len(list) != 2 { - t.Errorf("Expected list size 2, got %v", list) - } - t.Cleanup(func() { - _ = os.RemoveAll(tempFolder) - }) + require.Equal(t, 2, len(list)) } diff --git a/pkg/service/backup_backends_holder.go b/pkg/service/backup_backends_holder.go index 06a15cf7..acc4ec06 100644 --- a/pkg/service/backup_backends_holder.go +++ b/pkg/service/backup_backends_holder.go @@ -31,7 +31,7 @@ func (b *BackendHolderImpl) Init(config *model.Config) { routines := config.BackupRoutines b.data = make(map[string]*BackupBackend, len(routines)) for routineName, routine := range routines { - b.data[routineName] = newBackend(routineName, routine) + b.data[routineName] = newBackend(routineName, routine.Storage) } } diff --git a/pkg/service/backup_go.go b/pkg/service/backup_go.go index 8b3f2d58..d9068f1a 100644 --- a/pkg/service/backup_go.go +++ b/pkg/service/backup_go.go @@ -35,8 +35,7 @@ func (b *BackupGo) BackupRun( ) 
(BackupHandler, error) { config := makeBackupConfig(namespace, backupRoutine, backupPolicy, timebounds, secretAgent) - writerFactory, err := storage.CreateWriter(ctx, s, path, false, - backupPolicy.RemoveFiles.RemoveFullBackup(), false) + writerFactory, err := storage.CreateWriter(ctx, s, path, false, false, false) if err != nil { return nil, fmt.Errorf("failed to create backup writer, %w", err) } diff --git a/pkg/service/backup_list_reader.go b/pkg/service/backup_list_reader.go index 7ee3b286..b2f36433 100644 --- a/pkg/service/backup_list_reader.go +++ b/pkg/service/backup_list_reader.go @@ -27,6 +27,7 @@ type BackupListReader interface { FindLastFullBackup(toTime time.Time) ([]model.BackupDetails, error) // FindIncrementalBackupsForNamespace returns all incremental backups in given range, sorted by time. - FindIncrementalBackupsForNamespace(ctx context.Context, bounds model.TimeBounds, namespace string, + FindIncrementalBackupsForNamespace( + ctx context.Context, bounds model.TimeBounds, namespace string, ) ([]model.BackupDetails, error) } diff --git a/pkg/service/backup_routine_handler.go b/pkg/service/backup_routine_handler.go index 7014d478..4465f0d9 100644 --- a/pkg/service/backup_routine_handler.go +++ b/pkg/service/backup_routine_handler.go @@ -31,6 +31,7 @@ type BackupRoutineHandler struct { clientManager aerospike.ClientManager logger *slog.Logger clusterConfigWriter ClusterConfigWriter + retentionManager RetentionManager // backup handlers by namespace fullBackupHandlers map[string]CancelableBackupHandler @@ -70,8 +71,6 @@ type CancelableBackupHandler interface { type backupMetadataManager interface { // writeBackupMetadata writes backup metadata to storage after successful backup. writeBackupMetadata(ctx context.Context, path string, metadata model.BackupMetadata) error - // findLastRun scans storage for last backup time (on startup or config apply). 
- findLastRun(ctx context.Context) lastBackupRun } // ClusterConfigWriter handles writing cluster configuration to storage. @@ -88,7 +87,7 @@ func newBackupRoutineHandler( clientManager aerospike.ClientManager, backupService Backup, routineName string, - backupBackend backupMetadataManager, + backupBackend *BackupBackend, lastRun lastBackupRun, ) *BackupRoutineHandler { backupRoutine := config.BackupRoutines[routineName] @@ -119,6 +118,8 @@ func newBackupRoutineHandler( backupPolicy, logger), logger: logger, + retentionManager: NewBackupRetentionManager( + backupBackend, backupStorage, routineName, backupPolicy.RetentionPolicy), } } @@ -159,12 +160,13 @@ func (h *BackupRoutineHandler) runFullBackupInternal(ctx context.Context, now ti h.lastRun.full = now - if h.backupFullPolicy.RemoveFiles.RemoveIncrementalBackup() { - h.deleteFolder(ctx, getIncrementalRoot(h.routineName)) - } - h.clusterConfigWriter.Write(ctx, client.AerospikeClient(), now) + err = h.retentionManager.deleteOldBackups(ctx) + if err != nil { + return fmt.Errorf("failed to clean up old backups: %w", err) + } + return nil } @@ -194,7 +196,7 @@ func (h *BackupRoutineHandler) prepareCluster(retry executor) (*backup.Client, [ func (h *BackupRoutineHandler) startNamespaceBackup( ctx context.Context, namespace string, now time.Time, client *backup.Client, ) CancelableBackupHandler { - backupFolder := getFullPath(h.routineName, h.backupFullPolicy, namespace, now) + backupFolder := getFullPath(h.routineName, namespace, now) timebounds := h.createTimebounds(true, now) return startBackup( @@ -333,7 +335,7 @@ func (h *BackupRoutineHandler) runIncrementalBackupInternal(ctx context.Context, func (h *BackupRoutineHandler) startIncrementalNamespaceBackup( ctx context.Context, namespace string, now time.Time, client *backup.Client, ) CancelableBackupHandler { - backupFolder := getIncrementalPathForNamespace(h.routineName, namespace, now) + backupFolder := getIncrementalPath(h.routineName, namespace, now) timebounds 
:= h.createTimebounds(false, now) return startBackup( diff --git a/pkg/service/backup_routine_handler_test.go b/pkg/service/backup_routine_handler_test.go index 73690cff..119b08ea 100644 --- a/pkg/service/backup_routine_handler_test.go +++ b/pkg/service/backup_routine_handler_test.go @@ -77,11 +77,6 @@ func (m *mockMetadataWriter) writeBackupMetadata( return args.Error(0) } -func (m *mockMetadataWriter) findLastRun(context.Context) lastBackupRun { - args := m.Called() - return args.Get(0).(lastBackupRun) -} - type mockClusterConfigWriter struct { mock.Mock } @@ -90,11 +85,21 @@ func (m *mockClusterConfigWriter) Write(ctx context.Context, client backup.Aeros m.Called(ctx, client, timestamp) } +type mockRetentionManager struct { + mock.Mock +} + +func (m *mockRetentionManager) deleteOldBackups(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + func setupTestHandler( backupService *mockBackupService, clientManager *mockClientManager, metadataWriter *mockMetadataWriter, configWriter *mockClusterConfigWriter, + retentionManager *mockRetentionManager, ) *BackupRoutineHandler { return &BackupRoutineHandler{ namespaces: []string{"ns1", "ns2"}, @@ -112,6 +117,7 @@ func setupTestHandler( storage: &model.LocalStorage{Path: "/tmp"}, logger: slog.Default(), retry: &simpleExecutor{}, + retentionManager: retentionManager, } } @@ -120,8 +126,9 @@ func TestRunFullBackupInternal_Success(t *testing.T) { clientManager := clientManagerMock() metadataWriter := new(mockMetadataWriter) configWriter := new(mockClusterConfigWriter) + retentionManager := new(mockRetentionManager) - handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter) + handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) backupHandler := new(mockBackupHandler) backupHandler.On("Wait", mock.Anything).Return(nil) @@ -164,12 +171,15 @@ func TestRunFullBackupInternal_Success(t *testing.T) { mock.Anything, 
).Return() + retentionManager.On("deleteOldBackups", mock.Anything).Return(nil) + handler.runFullBackup(context.Background(), time.Now()) clientManager.AssertExpectations(t) backupService.AssertExpectations(t) metadataWriter.AssertExpectations(t) configWriter.AssertExpectations(t) + retentionManager.AssertExpectations(t) } func TestRunFullBackupInternal_WaitError(t *testing.T) { @@ -177,8 +187,9 @@ func TestRunFullBackupInternal_WaitError(t *testing.T) { clientManager := clientManagerMock() metadataWriter := new(mockMetadataWriter) configWriter := new(mockClusterConfigWriter) + retentionManager := new(mockRetentionManager) - handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter) + handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) backupHandler := new(mockBackupHandler) expectedErr := errors.New("wait error") @@ -208,8 +219,9 @@ func TestRunIncrementalBackup_NoFullBackupYet(t *testing.T) { clientManager := clientManagerMock() metadataWriter := new(mockMetadataWriter) configWriter := new(mockClusterConfigWriter) + retentionManager := new(mockRetentionManager) - handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter) + handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) handler.lastRun = lastBackupRun{} // Ensure empty lastRun handler.runIncrementalBackup(context.Background(), time.Now()) @@ -223,8 +235,9 @@ func TestRunIncrementalBackup_SkipIfFullBackupInProgress(t *testing.T) { clientManager := clientManagerMock() metadataWriter := new(mockMetadataWriter) configWriter := new(mockClusterConfigWriter) + retentionManager := new(mockRetentionManager) - handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter) + handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) handler.lastRun = lastBackupRun{ full: time.Now(), // 
Set last full run } @@ -242,8 +255,9 @@ func TestRunIncrementalBackup_SkipIfIncrementalBackupInProgress(t *testing.T) { clientManager := clientManagerMock() metadataWriter := new(mockMetadataWriter) configWriter := new(mockClusterConfigWriter) + retentionManager := new(mockRetentionManager) - handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter) + handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) handler.lastRun = lastBackupRun{ full: time.Now(), // Set last full run } @@ -261,8 +275,9 @@ func TestRunIncrementalBackup_ClientError(t *testing.T) { clientManager := new(mockClientManager) metadataWriter := new(mockMetadataWriter) configWriter := new(mockClusterConfigWriter) + retentionManager := new(mockRetentionManager) - handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter) + handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) handler.lastRun = lastBackupRun{ full: time.Now(), } @@ -281,8 +296,9 @@ func TestRunIncrementalBackup_Success(t *testing.T) { clientManager := clientManagerMock() metadataWriter := new(mockMetadataWriter) configWriter := new(mockClusterConfigWriter) + retentionManager := new(mockRetentionManager) - handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter) + handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) now := time.Now() lastRun := now.Add(-1 * time.Hour) handler.lastRun = lastBackupRun{ @@ -338,8 +354,9 @@ func TestRunFullBackup_PartialFailure(t *testing.T) { metadataWriter := new(mockMetadataWriter) configWriter := new(mockClusterConfigWriter) clientManager := clientManagerMock() + retentionManager := new(mockRetentionManager) - handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter) + handler := setupTestHandler(backupService, clientManager, 
metadataWriter, configWriter, retentionManager) successHandler := new(mockBackupHandler) successHandler.On("Wait", mock.Anything).Return(nil) @@ -378,6 +395,7 @@ func TestRunFullBackup_PartialFailure(t *testing.T) { mock.Anything, mock.Anything, ).Return(nil).Times(1) // Only for ns1 + retentionManager.On("deleteOldBackups", mock.Anything).Return(nil) // Run full backup and expect an error for one of the namespaces handler.runFullBackup(context.Background(), time.Now()) diff --git a/pkg/service/cluster_config_writer.go b/pkg/service/cluster_config_writer.go index 3bad9ff9..bdc4cd5b 100644 --- a/pkg/service/cluster_config_writer.go +++ b/pkg/service/cluster_config_writer.go @@ -45,7 +45,7 @@ func (w *DefaultClusterConfigWriter) Write( } for i, info := range infos { - confFilePath := getConfigurationPath(w.routineName, w.policy, timestamp, i) + confFilePath := getConfigurationPath(w.routineName, timestamp, i) err := storage.WriteFile(ctx, w.storage, confFilePath, []byte(info)) if err != nil { w.logger.Error("Failed to write cluster configuration backup", diff --git a/pkg/service/paths.go b/pkg/service/paths.go index d7d00121..a52eb45c 100644 --- a/pkg/service/paths.go +++ b/pkg/service/paths.go @@ -18,46 +18,34 @@ const ( dataDirectory = "data" ) -func getBackupRootPath(routineName string, isFullBackup bool) string { - if isFullBackup { +func getBackupRootPath(routineName string, backupType jobType) string { + if backupType == jobTypeFull { return filepath.Join(routineName, fullBackupDirectory) } + return filepath.Join(routineName, incrementalBackupDirectory) } -func getIncrementalRoot(routineName string) string { - return getBackupRootPath(routineName, false) +func getTimestampPath(routineName string, timestamp time.Time, backupType jobType) string { + return filepath.Join(getBackupRootPath(routineName, backupType), formatTimestamp(timestamp)) } -func getFullPath(routineName string, policy *model.BackupPolicy, namespace string, timestamp time.Time) string { - if 
policy.RemoveFiles.RemoveFullBackup() { - return filepath.Join(routineName, fullBackupDirectory, dataDirectory, namespace) - } +func getFullPath(routineName string, namespace string, timestamp time.Time) string { return filepath.Join(routineName, fullBackupDirectory, formatTimestamp(timestamp), dataDirectory, namespace) } -func getIncrementalPathForNamespace(routineName string, namespace string, timestamp time.Time) string { +func getIncrementalPath(routineName string, namespace string, timestamp time.Time) string { return filepath.Join(routineName, incrementalBackupDirectory, formatTimestamp(timestamp), dataDirectory, namespace) } -func getConfigurationPath(routineName string, policy *model.BackupPolicy, timestamp time.Time, index int) string { - if policy.RemoveFiles.RemoveFullBackup() { - return filepath.Join(routineName, fullBackupDirectory, configurationBackupDirectory, getConfigFileName(index)) - } +func getConfigurationPath(routineName string, timestamp time.Time, index int) string { return filepath.Join(routineName, fullBackupDirectory, formatTimestamp(timestamp), configurationBackupDirectory, getConfigFileName(index)) } -func getKey(routineName string, isFullBackup bool, metadata *model.BackupMetadata, noTimestampInPath bool) string { - backupDir := fullBackupDirectory - if !isFullBackup { - backupDir = incrementalBackupDirectory - } - - if noTimestampInPath { - return filepath.Join(routineName, backupDir, dataDirectory, metadata.Namespace) - } - return filepath.Join(routineName, backupDir, formatTimestamp(metadata.Created), dataDirectory, metadata.Namespace) +func getKey(routineName string, backupType jobType, metadata *model.BackupMetadata) string { + rootPath := getBackupRootPath(routineName, backupType) + return filepath.Join(rootPath, formatTimestamp(metadata.Created), dataDirectory, metadata.Namespace) } func formatTimestamp(t time.Time) string { diff --git a/pkg/service/retention_manager.go b/pkg/service/retention_manager.go new file mode 100644 
index 00000000..499e70bb --- /dev/null +++ b/pkg/service/retention_manager.go @@ -0,0 +1,130 @@ +package service + +import ( + "context" + "errors" + "fmt" + "slices" + "time" + + "github.com/aerospike/aerospike-backup-service/v2/pkg/model" + "github.com/aerospike/aerospike-backup-service/v2/pkg/service/storage" +) + +// RetentionManager defines the interface for deleting old backups. +type RetentionManager interface { + // deleteOldBackups deletes old backups based on the configured retention policy. + deleteOldBackups(ctx context.Context) error +} + +type RetentionManagerImpl struct { + backend BackupListReader + storage model.Storage + routineName string + policy *model.RetentionPolicy +} + +func NewBackupRetentionManager( + backend BackupListReader, + storage model.Storage, + routineName string, + policy *model.RetentionPolicy, +) RetentionManager { + return &RetentionManagerImpl{ + backend: backend, + storage: storage, + routineName: routineName, + policy: policy, + } +} + +func (e *RetentionManagerImpl) deleteOldBackups(ctx context.Context) error { + if e.policy == nil || (e.policy.FullBackups == nil && e.policy.IncrBackups == nil) { + return nil // Retention policy is not enabled, do nothing. 
+ } + + fullBackups, err := e.backend.FullBackupList(ctx, model.TimeBounds{}) + if err != nil { + return fmt.Errorf("failed to get full backups: %w", err) + } + + timestamps := getTimestamps(fullBackups) + if e.policy.FullBackups != nil { + if err := e.deleteFullBackups(ctx, timestamps, *e.policy.FullBackups); err != nil { + return fmt.Errorf("failed to delete excess full backups: %w", err) + } + } + + if e.policy.IncrBackups != nil { + if err := e.deleteIncrementalBackups(ctx, timestamps, *e.policy.IncrBackups); err != nil { + return fmt.Errorf("failed to delete excess incremental backups: %w", err) + } + } + + return nil +} + +func (e *RetentionManagerImpl) deleteFullBackups( + ctx context.Context, timestamps []time.Time, retainCount int, +) error { + if len(timestamps) <= retainCount { + return nil + } + + var errs error + for _, t := range timestamps[:len(timestamps)-retainCount] { + path := getTimestampPath(e.routineName, t, jobTypeFull) + if err := storage.DeleteFolder(ctx, e.storage, path); err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to delete folder at %v: %w", path, err)) + } + } + + return errs +} + +func (e *RetentionManagerImpl) deleteIncrementalBackups( + ctx context.Context, timestamps []time.Time, retainCount int, +) error { + if len(timestamps) <= retainCount { + return nil + } + + if retainCount == 0 { // Delete all incremental backups. 
+ return storage.DeleteFolder(ctx, e.storage, getBackupRootPath(e.routineName, jobTypeIncremental)) + } + + earliestToKeep := timestamps[len(timestamps)-retainCount] + incrBackups, err := e.backend.IncrementalBackupList(ctx, model.NewTimeBoundsTo(earliestToKeep)) + if err != nil { + return fmt.Errorf("failed to fetch incremental backups: %w", err) + } + + var errs error + for _, b := range incrBackups { + path := getTimestampPath(e.routineName, b.Created, jobTypeIncremental) + if err := storage.DeleteFolder(ctx, e.storage, path); err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to delete folder at %v: %w", path, err)) + } + } + + return errs +} + +func getTimestamps(backups []model.BackupDetails) []time.Time { + timeSet := make(map[time.Time]struct{}, len(backups)) + + for _, obj := range backups { + timeSet[obj.Created] = struct{}{} + } + + times := make([]time.Time, 0, len(timeSet)) + for t := range timeSet { + times = append(times, t) + } + + slices.SortFunc(times, func(a, b time.Time) int { + return a.Compare(b) + }) + + return times +} diff --git a/pkg/service/retention_manager_test.go b/pkg/service/retention_manager_test.go new file mode 100644 index 00000000..d3022bb6 --- /dev/null +++ b/pkg/service/retention_manager_test.go @@ -0,0 +1,227 @@ +package service + +import ( + "context" + "testing" + "time" + + "github.com/aerospike/aerospike-backup-service/v2/pkg/model" + "github.com/aerospike/aerospike-backup-service/v2/pkg/util" + "github.com/stretchr/testify/require" +) + +func TestMultipleBackups(t *testing.T) { + dir := t.TempDir() + s := &model.LocalStorage{Path: dir} + routineName := "routine1" + namespace := "ns1" + backend := newBackend(routineName, s) + + for _, t := range []int64{10, 20, 30, 40, 50} { + path := getFullPath(routineName, namespace, time.UnixMilli(t)) + metadata := model.BackupMetadata{Created: time.UnixMilli(t), Namespace: namespace} + _ = backend.writeBackupMetadata(context.Background(), path, metadata) + } + + for _, t := 
range []int64{22, 26, 32, 36, 42, 46, 52, 56} { + path := getIncrementalPath(routineName, namespace, time.UnixMilli(t)) + metadata := model.BackupMetadata{Created: time.UnixMilli(t), Namespace: namespace} + _ = backend.writeBackupMetadata(context.Background(), path, metadata) + } + + r := NewBackupRetentionManager( + backend, + s, + routineName, + &model.RetentionPolicy{ + FullBackups: util.Ptr(3), + IncrBackups: util.Ptr(2), + }, + ) + + ctx := context.Background() + err := r.deleteOldBackups(ctx) + require.NoError(t, err) + + full, _ := backend.FullBackupList(ctx, model.TimeBounds{}) + require.Equal(t, len(full), 3) // 30, 40, 50 + + incr, _ := backend.IncrementalBackupList(ctx, model.TimeBounds{}) + require.Len(t, incr, 4) // 42, 46, 52, 56 +} + +func TestZeroRetentionLimit(t *testing.T) { + dir := t.TempDir() + s := &model.LocalStorage{Path: dir} + routineName := "routine1" + namespace := "ns1" + backend := newBackend(routineName, s) + + for _, t := range []int64{10, 20, 30} { + path := getFullPath(routineName, namespace, time.UnixMilli(t)) + metadata := model.BackupMetadata{Created: time.UnixMilli(t), Namespace: namespace} + _ = backend.writeBackupMetadata(context.Background(), path, metadata) + } + + for _, t := range []int64{22, 26, 32, 36, 42, 46, 52, 56} { + path := getIncrementalPath(routineName, namespace, time.UnixMilli(t)) + metadata := model.BackupMetadata{Created: time.UnixMilli(t), Namespace: namespace} + _ = backend.writeBackupMetadata(context.Background(), path, metadata) + } + + r := NewBackupRetentionManager( + backend, + s, + routineName, + &model.RetentionPolicy{ + IncrBackups: util.Ptr(0), + }, + ) + + ctx := context.Background() + err := r.deleteOldBackups(ctx) + require.NoError(t, err) + + // all incremental backups should be deleted. 
+ + full, _ := backend.FullBackupList(ctx, model.TimeBounds{}) + require.Equal(t, len(full), 3) + + incr, _ := backend.IncrementalBackupList(ctx, model.TimeBounds{}) + require.Empty(t, incr) +} + +func TestNoBackups(t *testing.T) { + dir := t.TempDir() + s := &model.LocalStorage{Path: dir} + routineName := "routine1" + backend := newBackend(routineName, s) + + r := NewBackupRetentionManager( + backend, + s, + routineName, + &model.RetentionPolicy{ + FullBackups: util.Ptr(3), + IncrBackups: util.Ptr(2), + }, + ) + + ctx := context.Background() + err := r.deleteOldBackups(ctx) + require.NoError(t, err) + + // nothing should be deleted as there are no backups. + + full, _ := backend.FullBackupList(ctx, model.TimeBounds{}) + require.Empty(t, full) + + incr, _ := backend.IncrementalBackupList(ctx, model.TimeBounds{}) + require.Empty(t, incr) +} + +func TestExactRetentionLimit(t *testing.T) { + dir := t.TempDir() + s := &model.LocalStorage{Path: dir} + routineName := "routine1" + namespace := "ns1" + backend := newBackend(routineName, s) + + for _, t := range []int64{10, 20, 30} { + path := getFullPath(routineName, namespace, time.UnixMilli(t)) + metadata := model.BackupMetadata{Created: time.UnixMilli(t), Namespace: namespace} + _ = backend.writeBackupMetadata(context.Background(), path, metadata) + } + + for _, t := range []int64{32, 36} { + path := getIncrementalPath(routineName, namespace, time.UnixMilli(t)) + metadata := model.BackupMetadata{Created: time.UnixMilli(t), Namespace: namespace} + _ = backend.writeBackupMetadata(context.Background(), path, metadata) + } + + r := NewBackupRetentionManager( + backend, + s, + routineName, + &model.RetentionPolicy{ + FullBackups: util.Ptr(3), + IncrBackups: util.Ptr(2), + }, + ) + + ctx := context.Background() + err := r.deleteOldBackups(ctx) + require.NoError(t, err) + + // nothing should be deleted as retention limits are exactly met. 
+ + full, _ := backend.FullBackupList(ctx, model.TimeBounds{}) + require.Len(t, full, 3) + + incr, _ := backend.IncrementalBackupList(ctx, model.TimeBounds{}) + require.Len(t, incr, 2) +} + +func TestNilRetentionPolicy(t *testing.T) { + dir := t.TempDir() + s := &model.LocalStorage{Path: dir} + routineName := "routine1" + namespace := "ns1" + backend := newBackend(routineName, s) + + for _, t := range []int64{10, 20, 30} { + path := getFullPath(routineName, namespace, time.UnixMilli(t)) + metadata := model.BackupMetadata{Created: time.UnixMilli(t), Namespace: namespace} + _ = backend.writeBackupMetadata(context.Background(), path, metadata) + } + + r := NewBackupRetentionManager( + backend, + s, + routineName, + nil, // No retention policy + ) + + ctx := context.Background() + err := r.deleteOldBackups(ctx) + require.NoError(t, err) + + // Nothing should be deleted since the retention policy is nil. + + full, _ := backend.FullBackupList(ctx, model.TimeBounds{}) + require.Len(t, full, 3) +} + +func TestHighRetentionLimits(t *testing.T) { + dir := t.TempDir() + s := &model.LocalStorage{Path: dir} + routineName := "routine1" + namespace := "ns1" + backend := newBackend(routineName, s) + + for _, t := range []int64{10, 20, 30} { + path := getFullPath(routineName, namespace, time.UnixMilli(t)) + metadata := model.BackupMetadata{Created: time.UnixMilli(t), Namespace: namespace} + _ = backend.writeBackupMetadata(context.Background(), path, metadata) + } + + r := NewBackupRetentionManager( + backend, + s, + routineName, + &model.RetentionPolicy{ + FullBackups: util.Ptr(10), + IncrBackups: util.Ptr(10), + }, + ) + + ctx := context.Background() + err := r.deleteOldBackups(ctx) + require.NoError(t, err) + + // Retention limits are higher than the number of existing backups. + // Nothing should be deleted. + + full, _ := backend.FullBackupList(ctx, model.TimeBounds{}) + require.Len(t, full, 3) +}