Skip to content

Commit

Permalink
better clear incrementals
Browse files Browse the repository at this point in the history
  • Loading branch information
korotkov-aerospike committed Jul 31, 2024
1 parent f977ed6 commit 6c4d873
Showing 1 changed file with 39 additions and 26 deletions.
65 changes: 39 additions & 26 deletions pkg/service/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,9 @@ func newBackupHandler(config *model.Config, routineName string, backupBackend *B
secretAgent = config.SecretAgents[*backupRoutine.SecretAgent]
}

namespaces := backupRoutine.Namespaces
if len(namespaces) == 0 {
var err error
namespaces, err = getAllNamespacesOfCluster(cluster)
if err != nil {
return nil, fmt.Errorf("failed to get namespaces: %w", err)
}
namespaces, err := getNamespacesToBackup(backupRoutine.Namespaces, cluster)
if err != nil {
return nil, err
}

return &BackupHandler{
Expand All @@ -75,6 +71,14 @@ func newBackupHandler(config *model.Config, routineName string, backupBackend *B
}, nil
}

func getNamespacesToBackup(namespaces []string, cluster *model.AerospikeCluster) ([]string, error) {
if len(namespaces) == 0 {
return getAllNamespacesOfCluster(cluster)
}

return namespaces, nil
}

func (h *BackupHandler) runFullBackup(ctx context.Context, now time.Time) {
h.retry.retry(
func() error { return h.runFullBackupInternal(ctx, now) },
Expand Down Expand Up @@ -266,7 +270,7 @@ func (h *BackupHandler) startIncrementalBackupForAllNamespaces(

clear(h.incrBackupHandlers)
for _, namespace := range h.namespaces {
backupFolder := getIncrementalPath(h.backend.incrementalBackupsPath, namespace, upperBound)
backupFolder := getIncrementalPathForNamespace(h.backend.incrementalBackupsPath, namespace, upperBound)
backupPath := h.backend.wrapWithPrefix(backupFolder)
handler, err := backupService.BackupRun(ctx,
h.backupRoutine, h.backupIncrPolicy, client, h.storage, h.secretAgent, *timebounds, namespace, backupPath)
Expand All @@ -280,34 +284,38 @@ func (h *BackupHandler) startIncrementalBackupForAllNamespaces(

func (h *BackupHandler) waitForIncrementalBackups(ctx context.Context, backupTimestamp time.Time) {
startTime := time.Now() // startTime is only used to measure backup time
hasBackup := false
for namespace, handler := range h.incrBackupHandlers {
err := handler.Wait(ctx)
if err != nil {
slog.Warn("Failed incremental backup", "name", h.routineName, "err", err)
incrBackupFailureCounter.Inc()
}

backupFolder := getIncrementalPath(h.backend.incrementalBackupsPath, namespace, backupTimestamp)
backupFolder := getIncrementalPathForNamespace(h.backend.incrementalBackupsPath, namespace, backupTimestamp)
// delete if the backup file is empty
if handler.GetStats().IsEmpty() {
h.deleteEmptyBackup(backupFolder, h.routineName)
} else {
if err := h.writeBackupMetadata(handler.GetStats(), backupTimestamp, namespace, backupFolder); err != nil {
slog.Error("Could not write backup metadata", "name", h.routineName,
"folder", backupFolder, "err", err)
}
h.deleteEmptyBackup(backupFolder)
continue
}
if err := h.writeBackupMetadata(handler.GetStats(), backupTimestamp, namespace, backupFolder); err != nil {
slog.Error("Could not write backup metadata", "name", h.routineName,
"folder", backupFolder, "err", err)
}
hasBackup = true
}

if !hasBackup {
h.deleteEmptyBackup(getIncrementalPath(h.backend.incrementalBackupsPath, backupTimestamp))
}

incrBackupDurationGauge.Set(float64(time.Since(startTime).Milliseconds()))
}

func (h *BackupHandler) deleteEmptyBackup(path string, routineName string) {
func (h *BackupHandler) deleteEmptyBackup(path string) {
if err := h.backend.DeleteFolder(path); err != nil {
slog.Error("Failed to delete empty backup", "name", routineName,
slog.Error("Failed to delete folder", "name", h.routineName,
"path", path, "err", err)
} else {
slog.Debug("Deleted empty backup", "name", routineName, "path", path)
}
}

Expand All @@ -331,24 +339,29 @@ func getFullPath(fullBackupsPath string, backupPolicy *model.BackupPolicy, names
if backupPolicy.RemoveFiles.RemoveFullBackup() {
return fmt.Sprintf("%s/%s/%s", fullBackupsPath, model.DataDirectory, namespace)
}
return fmt.Sprintf("%s/%s/%s/%s", fullBackupsPath, timeSuffix(now), model.DataDirectory, namespace)

return fmt.Sprintf("%s/%s/%s/%s", fullBackupsPath, formatTime(now), model.DataDirectory, namespace)
}

func getIncrementalPath(incrBackupsPath string, t time.Time) string {
return fmt.Sprintf("%s/%s", incrBackupsPath, formatTime(t))
}

func getIncrementalPath(incrBackupsPath string, namespace string, now time.Time) string {
return fmt.Sprintf("%s/%s/%s/%s", incrBackupsPath, timeSuffix(now), model.DataDirectory, namespace)
func getIncrementalPathForNamespace(incrBackupsPath string, namespace string, t time.Time) string {
return fmt.Sprintf("%s/%s/%s", getIncrementalPath(incrBackupsPath, t), model.DataDirectory, namespace)
}

func getConfigurationPath(fullBackupsPath string, backupPolicy *model.BackupPolicy, now time.Time) string {
func getConfigurationPath(fullBackupsPath string, backupPolicy *model.BackupPolicy, t time.Time) string {
if backupPolicy.RemoveFiles.RemoveFullBackup() {
path := fmt.Sprintf("%s/%s", fullBackupsPath, model.ConfigurationBackupDirectory)
return path
}

return fmt.Sprintf("%s/%s/%s", fullBackupsPath, timeSuffix(now), model.ConfigurationBackupDirectory)
return fmt.Sprintf("%s/%s/%s", fullBackupsPath, formatTime(t), model.ConfigurationBackupDirectory)
}

func timeSuffix(now time.Time) string {
return strconv.FormatInt(now.UnixMilli(), 10)
func formatTime(t time.Time) string {
return strconv.FormatInt(t.UnixMilli(), 10)
}

func (h *BackupHandler) GetCurrentStat() *model.CurrentBackups {
Expand Down

0 comments on commit 6c4d873

Please sign in to comment.