Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APPS-1243 Improve list backups performance #202

Merged
merged 15 commits into from
Aug 5, 2024
Merged
11 changes: 6 additions & 5 deletions internal/server/restore_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/aerospike/backup/pkg/model"
"github.com/aerospike/backup/pkg/service"
)

// @Summary Trigger an asynchronous full restore operation.
Expand Down Expand Up @@ -38,7 +39,7 @@ func (ws *HTTPServer) restoreFullHandler(w http.ResponseWriter, r *http.Request)
Dir: request.SourceStorage.Path,
}

jobID, err := ws.restoreService.Restore(requestInternal)
jobID, err := ws.restoreManager.Restore(requestInternal)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -77,7 +78,7 @@ func (ws *HTTPServer) restoreIncrementalHandler(w http.ResponseWriter, r *http.R
RestoreRequest: request,
Dir: request.SourceStorage.Path,
}
jobID, err := ws.restoreService.Restore(requestInternal)
jobID, err := ws.restoreManager.Restore(requestInternal)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -113,7 +114,7 @@ func (ws *HTTPServer) restoreByTimeHandler(w http.ResponseWriter, r *http.Reques
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
jobID, err := ws.restoreService.RestoreByTime(&request)
jobID, err := ws.restoreManager.RestoreByTime(&request)
if err != nil {
slog.Error("Restore by timestamp failed", "routine", request.Routine, "err", err)
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -148,7 +149,7 @@ func (ws *HTTPServer) restoreStatusHandler(w http.ResponseWriter, r *http.Reques
return
}
w.Header().Set("Content-Type", "application/json")
status, err := ws.restoreService.JobStatus(jobID)
status, err := ws.restoreManager.JobStatus(service.RestoreJobID(jobID))
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
} else {
Expand Down Expand Up @@ -198,7 +199,7 @@ func (ws *HTTPServer) retrieveConfig(w http.ResponseWriter, r *http.Request) {
return
}

buf, err := ws.restoreService.RetrieveConfiguration(name, time.UnixMilli(timestamp))
buf, err := ws.restoreManager.RetrieveConfiguration(name, time.UnixMilli(timestamp))
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
Expand Down
4 changes: 2 additions & 2 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type HTTPServer struct {
rateLimiter *IPRateLimiter
whiteList *ipWhiteList
scheduler quartz.Scheduler
restoreService service.RestoreService
restoreManager service.RestoreManager
backupBackends service.BackendsHolder
handlerHolder service.BackupHandlerHolder
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func NewHTTPServer(
rateLimiter: rateLimiter,
whiteList: newIPWhiteList(serverConfig.GetRateOrDefault().GetWhiteListOrDefault()),
scheduler: scheduler,
restoreService: service.NewRestoreMemory(backends, config, shared.NewRestoreGo()),
restoreManager: service.NewRestoreManager(backends, config, shared.NewRestoreGo()),
backupBackends: backends,
handlerHolder: handlerHolder,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/model/aerospike_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type AerospikeCluster struct {
Credentials *Credentials `yaml:"credentials,omitempty" json:"credentials,omitempty"`
// The cluster TLS configuration.
TLS *TLS `yaml:"tls,omitempty" json:"tls,omitempty"`
// specifies the size of the Aerospike Connection Queue per node
// Specifies the size of the Aerospike Connection Queue per node.
ConnectionQueueSize *int `yaml:"connection-queue-size,omitempty" json:"connection-queue-size,omitempty" example:"100"`
}

Expand Down
72 changes: 68 additions & 4 deletions pkg/service/backup_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
"fmt"
"log/slog"
"path/filepath"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/aerospike/backup/pkg/model"
"github.com/aerospike/backup/pkg/util"
"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -109,12 +112,67 @@ func (b *BackupBackend) FullBackupList(timebounds *model.TimeBounds) ([]model.Ba
return b.fromSubfolders(timebounds, b.fullBackupsPath)
}

// FindLastFullBackup returns last full backup prior to given time.
func (b *BackupBackend) FindLastFullBackup(toTime time.Time) ([]model.BackupDetails, error) {
reugn marked this conversation as resolved.
Show resolved Hide resolved
fullBackupList, err := b.FullBackupList(model.NewTimeBoundsTo(toTime))
if err != nil {
return nil, fmt.Errorf("cannot read full backup list: %w", err)
}

fullBackup := latestFullBackupBeforeTime(fullBackupList, toTime) // it's a list of namespaces
if len(fullBackup) == 0 {
return nil, fmt.Errorf("%w: %s", errBackupNotFound, toTime)
}
return fullBackup, nil
}

// latestFullBackupBeforeTime selects, among the backups created no later than
// upperBound, every entry that shares the most recent creation time.
// It returns nil when no backup qualifies.
func latestFullBackupBeforeTime(allBackups []model.BackupDetails, upperBound time.Time) []model.BackupDetails {
	var (
		newest   time.Time
		selected []model.BackupDetails
	)
	for idx := range allBackups {
		candidate := &allBackups[idx]
		created := candidate.Created

		switch {
		case created.After(upperBound):
			// Created past the upper bound: not eligible.
		case len(selected) == 0 || created.After(newest):
			// First eligible entry, or a strictly newer one: start a new group.
			newest = created
			selected = []model.BackupDetails{*candidate}
		case created.Equal(newest):
			// Same creation time as the current group: it belongs to it.
			selected = append(selected, *candidate)
		}
	}
	return selected
}

// FindIncrementalBackupsForNamespace returns all incremental backups in given range, sorted by time.
func (b *BackupBackend) FindIncrementalBackupsForNamespace(bounds *model.TimeBounds, namespace string,
reugn marked this conversation as resolved.
Show resolved Hide resolved
) ([]model.BackupDetails, error) {
allIncrementalBackupList, err := b.IncrementalBackupList(bounds)
if err != nil {
return nil, err
}
var filteredIncrementalBackups []model.BackupDetails
for _, b := range allIncrementalBackupList {
if b.Namespace == namespace {
filteredIncrementalBackups = append(filteredIncrementalBackups, b)
}
}
// Sort in place
sort.Slice(filteredIncrementalBackups, func(i, j int) bool {
return filteredIncrementalBackups[i].Created.Before(filteredIncrementalBackups[j].Created)
})

return filteredIncrementalBackups, nil
}

func (b *BackupBackend) detailsFromPaths(timebounds *model.TimeBounds, useCache bool,
paths ...string) []model.BackupDetails {
// each path contains a backup of specific time
backupDetails := make([]model.BackupDetails, 0, len(paths))
for _, path := range paths {
namespaces, err := b.lsDir(filepath.Join(path, model.DataDirectory))
namespaces, err := b.lsDir(filepath.Join(path, model.DataDirectory), nil)
if err != nil {
slog.Warn("Cannot list backup dir", "path", path, "err", err)
continue
Expand All @@ -133,12 +191,18 @@ func (b *BackupBackend) detailsFromPaths(timebounds *model.TimeBounds, useCache
return backupDetails
}

func (b *BackupBackend) fromSubfolders(timebounds *model.TimeBounds,
backupFolder string) ([]model.BackupDetails, error) {
subfolders, err := b.lsDir(backupFolder)
func (b *BackupBackend) fromSubfolders(timebounds *model.TimeBounds, backupFolder string,
) ([]model.BackupDetails, error) {
var after *string
if timebounds.FromTime != nil {
after = util.Ptr(formatTime(*timebounds.FromTime))
}

subfolders, err := b.lsDir(backupFolder, after)
if err != nil {
return nil, err
}

return b.detailsFromPaths(timebounds, true, subfolders...), nil
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/service/backup_list_reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package service

import "github.com/aerospike/backup/pkg/model"
import (
"time"

"github.com/aerospike/backup/pkg/model"
)

// BackupListReader allows reading the list of existing backups.
type BackupListReader interface {
Expand All @@ -16,4 +20,10 @@ type BackupListReader interface {

// ReadClusterConfiguration returns the backed-up cluster configuration as a compressed zip.
ReadClusterConfiguration(path string) ([]byte, error)

// FindLastFullBackup returns last full backup prior to given time.
FindLastFullBackup(toTime time.Time) ([]model.BackupDetails, error)

// FindIncrementalBackupsForNamespace returns all incremental backups in given range, sorted by time.
FindIncrementalBackupsForNamespace(bounds *model.TimeBounds, namespace string) ([]model.BackupDetails, error)
}
18 changes: 10 additions & 8 deletions pkg/service/jobs_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,29 @@ import (
"github.com/aerospike/backup/pkg/model"
)

// RestoreJobID identifies a restore job; JobsHolder uses it as the key of its
// job-status map.
type RestoreJobID int

type JobsHolder struct {
sync.Mutex
restoreJobs map[int]*model.RestoreJobStatus
restoreJobs map[RestoreJobID]*model.RestoreJobStatus
}

func NewJobsHolder() *JobsHolder {
return &JobsHolder{
restoreJobs: make(map[int]*model.RestoreJobStatus),
restoreJobs: make(map[RestoreJobID]*model.RestoreJobStatus),
}
}

func (h *JobsHolder) newJob() int {
func (h *JobsHolder) newJob() RestoreJobID {
// #nosec G404
jobID := rand.Int()
jobID := RestoreJobID(rand.Int())
h.Lock()
defer h.Unlock()
h.restoreJobs[jobID] = model.NewRestoreJobStatus()
return jobID
}

func (h *JobsHolder) getStatus(jobID int) (*model.RestoreJobStatus, error) {
func (h *JobsHolder) getStatus(jobID RestoreJobID) (*model.RestoreJobStatus, error) {
h.Lock()
defer h.Unlock()
jobStatus, exists := h.restoreJobs[jobID]
Expand All @@ -39,7 +41,7 @@ func (h *JobsHolder) getStatus(jobID int) (*model.RestoreJobStatus, error) {
return &copyJob, nil
}

func (h *JobsHolder) increaseStats(jobID int, newStats *model.RestoreResult) {
func (h *JobsHolder) increaseStats(jobID RestoreJobID, newStats *model.RestoreResult) {
h.Lock()
defer h.Unlock()
current, found := h.restoreJobs[jobID]
Expand All @@ -57,7 +59,7 @@ func (h *JobsHolder) increaseStats(jobID int, newStats *model.RestoreResult) {
}
}

func (h *JobsHolder) setDone(jobID int) {
func (h *JobsHolder) setDone(jobID RestoreJobID) {
h.Lock()
defer h.Unlock()
current, found := h.restoreJobs[jobID]
Expand All @@ -66,7 +68,7 @@ func (h *JobsHolder) setDone(jobID int) {
}
}

func (h *JobsHolder) setFailed(jobID int, err error) {
func (h *JobsHolder) setFailed(jobID RestoreJobID, err error) {
h.Lock()
defer h.Unlock()
current, found := h.restoreJobs[jobID]
Expand Down
50 changes: 38 additions & 12 deletions pkg/service/os_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"os"
"path/filepath"
"sort"
"strings"

"github.com/aerospike/backup/pkg/model"
Expand Down Expand Up @@ -78,23 +79,48 @@ func (o *OSDiskAccessor) write(filePath string, data []byte) error {
return os.WriteFile(filePath, data, 0600)
}

func (o *OSDiskAccessor) lsDir(path string) ([]string, error) {
content, err := os.ReadDir(path)
func (o *OSDiskAccessor) lsDir(path string, after *string) ([]string, error) {
var result []string

err := filepath.WalkDir(path, func(p string, d fs.DirEntry, err error) error {
if err != nil {
return err
}

// Skip the root directory
if p == path {
return nil
}

// We're only interested in directories
if !d.IsDir() {
return nil
}

// Get the relative path
relPath, err := filepath.Rel(path, p)
if err != nil {
return fmt.Errorf("error getting relative path: %w", err)
}

// If 'after' is set, skip directories that come before to it lexicographically
if after != nil && relPath < *after {
return nil
}

result = append(result, p)
return filepath.SkipDir // Don't descend into this directory
})

if err != nil {
if os.IsNotExist(err) {
return []string{}, nil
}
return nil, err
return nil, fmt.Errorf("error walking directory: %w", err)
}

var onlyDirs []string
for _, c := range content {
if c.IsDir() {
fullPath := filepath.Join(path, c.Name())
onlyDirs = append(onlyDirs, fullPath)
}
}
return onlyDirs, nil
sort.Strings(result) // Ensure consistent ordering
return result, nil
}

func (o *OSDiskAccessor) lsFiles(path string) ([]string, error) {
Expand Down Expand Up @@ -124,7 +150,7 @@ func (o *OSDiskAccessor) DeleteFolder(pathToDelete string) error {
}

parentDir := filepath.Dir(pathToDelete)
lsDir, err := o.lsDir(parentDir)
lsDir, err := o.lsDir(parentDir, nil)
if err != nil {
return err
}
Expand Down
Loading
Loading