From e6e353953ff969c65fc7661ff0fd1e9c59a673b3 Mon Sep 17 00:00:00 2001 From: Anton Korotkov Date: Sun, 28 Jul 2024 14:51:05 +0300 Subject: [PATCH 1/2] pass context from the top --- pkg/service/backup_handler.go | 15 +++++++++------ pkg/service/restore_memory.go | 29 +++++++++++++++++++---------- pkg/shared/backup_go.go | 3 +-- pkg/shared/restore_go.go | 4 ++-- pkg/shared/restore_mock.go | 4 +++- pkg/shared/types.go | 6 ++++-- 6 files changed, 38 insertions(+), 23 deletions(-) diff --git a/pkg/service/backup_handler.go b/pkg/service/backup_handler.go index 72bcfb34..6fb2339f 100644 --- a/pkg/service/backup_handler.go +++ b/pkg/service/backup_handler.go @@ -105,7 +105,7 @@ func (h *BackupHandler) runFullBackupInternal(ctx context.Context, now time.Time clear(h.fullBackupHandlers) }() - err = h.startFullBackupForAllNamespaces(now, client) + err = h.startFullBackupForAllNamespaces(ctx, now, client) if err != nil { return err } @@ -127,7 +127,8 @@ func (h *BackupHandler) runFullBackupInternal(ctx context.Context, now time.Time return nil } -func (h *BackupHandler) startFullBackupForAllNamespaces(upperBound time.Time, client *aerospike.Client) error { +func (h *BackupHandler) startFullBackupForAllNamespaces( + ctx context.Context, upperBound time.Time, client *aerospike.Client) error { clear(h.fullBackupHandlers) options := shared.BackupOptions{} @@ -138,7 +139,7 @@ func (h *BackupHandler) startFullBackupForAllNamespaces(upperBound time.Time, cl for _, namespace := range h.namespaces { backupFolder := getFullPath(h.backend.fullBackupsPath, h.backupFullPolicy, namespace, upperBound) backupPath := h.backend.wrapWithPrefix(backupFolder) - handler, err := backupService.BackupRun(h.backupRoutine, h.backupFullPolicy, client, + handler, err := backupService.BackupRun(ctx, h.backupRoutine, h.backupFullPolicy, client, h.storage, h.secretAgent, options, &namespace, backupPath) if err != nil { backupFailureCounter.Inc() @@ -247,7 +248,7 @@ func (h *BackupHandler) runIncrementalBackup(ctx context.Context, now time.Time) clear(h.incrBackupHandlers) }() - h.startIncrementalBackupForAllNamespaces(client, now) + h.startIncrementalBackupForAllNamespaces(ctx, client, now) h.waitForIncrementalBackups(ctx, now) // increment incrBackupCounter metric @@ -257,7 +258,9 @@ func (h *BackupHandler) runIncrementalBackup(ctx context.Context, now time.Time) h.updateIncrementalBackupState(now) } -func (h *BackupHandler) startIncrementalBackupForAllNamespaces(client *aerospike.Client, upperBound time.Time) { +func (h *BackupHandler) startIncrementalBackupForAllNamespaces( + ctx context.Context, client *aerospike.Client, upperBound time.Time) { + fromEpoch := h.state.LastRunEpoch() options := shared.BackupOptions{ ModAfter: util.Ptr(time.Unix(0, fromEpoch)), @@ -270,7 +273,7 @@ func (h *BackupHandler) startIncrementalBackupForAllNamespaces(client *aerospike for _, namespace := range h.namespaces { backupFolder := getIncrementalPath(h.backend.incrementalBackupsPath, namespace, upperBound) backupPath := h.backend.wrapWithPrefix(backupFolder) - handler, err := backupService.BackupRun( + handler, err := backupService.BackupRun(ctx, h.backupRoutine, h.backupIncrPolicy, client, h.storage, h.secretAgent, options, &namespace, backupPath) if err != nil { incrBackupFailureCounter.Inc() diff --git a/pkg/service/restore_memory.go b/pkg/service/restore_memory.go index bbe78a9f..d2e0fb79 100644 --- a/pkg/service/restore_memory.go +++ b/pkg/service/restore_memory.go @@ -1,6 +1,7 @@ package service import ( + "context" "fmt" "log/slog" "path/filepath" @@ -38,8 +39,9 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err if err := validateStorageContainsBackup(request.SourceStorage); err != nil { return 0, err } + ctx := context.TODO() go func() { - restoreResult, err := r.restoreService.RestoreRun(request) + restoreResult, err := r.restoreService.RestoreRun(ctx, request) if err != nil { r.restoreJobs.setFailed(jobID, fmt.Errorf("failed restore operation: %w", err)) return @@ -47,6 +49,7 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err r.restoreJobs.increaseStats(jobID, restoreResult) r.restoreJobs.setDone(jobID) }() + return jobID, nil } @@ -60,17 +63,20 @@ func (r *RestoreMemory) RestoreByTime(request *model.RestoreTimestampRequest) (i return 0, fmt.Errorf("last full backup not found: %v", err) } jobID := r.restoreJobs.newJob() - go r.restoreByTimeSync(reader, request, jobID, fullBackups) + ctx := context.TODO() + go r.restoreByTimeSync(ctx, reader, request, jobID, fullBackups) + return jobID, nil } -func (r *RestoreMemory) restoreByTimeSync(backend BackupListReader, +func (r *RestoreMemory) restoreByTimeSync(ctx context.Context, + backend BackupListReader, request *model.RestoreTimestampRequest, jobID int, fullBackups []model.BackupDetails, ) { for _, nsBackup := range fullBackups { - if err := r.restoreNamespace(backend, request, jobID, nsBackup); err != nil { + if err := r.restoreNamespace(ctx, backend, request, jobID, nsBackup); err != nil { slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err) r.restoreJobs.setFailed(jobID, err) return @@ -80,11 +86,12 @@ func (r *RestoreMemory) restoreByTimeSync(backend BackupListReader, } func (r *RestoreMemory) restoreNamespace( + ctx context.Context, backend BackupListReader, request *model.RestoreTimestampRequest, jobID int, fullBackup model.BackupDetails, ) error { - result, err := r.restoreFromPath(request, fullBackup.Key) + result, err := r.restoreFromPath(ctx, request, fullBackup.Key) if err != nil { return fmt.Errorf("could not restore full backup for namespace %s: %v", fullBackup.Namespace, err) } @@ -97,7 +104,7 @@ func (r *RestoreMemory) restoreNamespace( } slog.Info("Apply incremental backups", "size", len(incrementalBackups)) for _, incrBackup := range incrementalBackups { - result, err := r.restoreFromPath(request, incrBackup.Key) + result, err := r.restoreFromPath(ctx, request, incrBackup.Key) if err != nil { return fmt.Errorf("could not restore incremental backup %s: %v", *incrBackup.Key, err) } @@ -107,14 +114,16 @@ func (r *RestoreMemory) restoreNamespace( } func (r *RestoreMemory) restoreFromPath( + ctx context.Context, request *model.RestoreTimestampRequest, backupPath *string, ) (*model.RestoreResult, error) { restoreRequest := r.toRestoreRequest(request) - restoreResult, err := r.restoreService.RestoreRun(&model.RestoreRequestInternal{ - RestoreRequest: *restoreRequest, - Dir: backupPath, - }) + restoreResult, err := r.restoreService.RestoreRun(ctx, + &model.RestoreRequestInternal{ + RestoreRequest: *restoreRequest, + Dir: backupPath, + }) if err != nil { return nil, fmt.Errorf("could not restore backup at %s: %w", *backupPath, err) } diff --git a/pkg/shared/backup_go.go b/pkg/shared/backup_go.go index 1a6db9e8..bafe8fdf 100644 --- a/pkg/shared/backup_go.go +++ b/pkg/shared/backup_go.go @@ -28,7 +28,7 @@ func NewBackupGo() *BackupGo { // BackupRun calls the backup_run function from the asbackup shared library. // //nolint:funlen,gocritic -func (b *BackupGo) BackupRun(backupRoutine *model.BackupRoutine, backupPolicy *model.BackupPolicy, +func (b *BackupGo) BackupRun(ctx context.Context, backupRoutine *model.BackupRoutine, backupPolicy *model.BackupPolicy, client *a.Client, storage *model.Storage, _ *model.SecretAgent, opts BackupOptions, namespace *string, path *string) (*backup.BackupHandler, error) { @@ -93,7 +93,6 @@ func (b *BackupGo) BackupRun(backupRoutine *model.BackupRoutine, backupPolicy *m } } - ctx := context.TODO() writerFactory, err := getWriter(ctx, path, storage) if err != nil { return nil, fmt.Errorf("failed to create backup writer, %w", err) diff --git a/pkg/shared/restore_go.go b/pkg/shared/restore_go.go index 8c0736e2..f6638f94 100644 --- a/pkg/shared/restore_go.go +++ b/pkg/shared/restore_go.go @@ -29,7 +29,8 @@ func NewRestoreGo() *RestoreGo { // RestoreRun calls the restore_run function from the asrestore shared library. // //nolint:funlen,gocritic -func (r *RestoreGo) RestoreRun(restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) { +func (r *RestoreGo) RestoreRun(ctx context.Context, restoreRequest *model.RestoreRequestInternal, +) (*model.RestoreResult, error) { var err error client, err := a.NewClientWithPolicyAndHost( restoreRequest.DestinationCuster.ASClientPolicy(), @@ -106,7 +107,6 @@ func (r *RestoreGo) RestoreRun(restoreRequest *model.RestoreRequestInternal) (*m } } - ctx := context.TODO() reader, err := getReader(ctx, restoreRequest.Dir, restoreRequest.SourceStorage, config.DecoderFactory) if err != nil { return nil, fmt.Errorf("failed to create backup reader, %w", err) diff --git a/pkg/shared/restore_mock.go b/pkg/shared/restore_mock.go index 36a3ee66..a7d2229d 100644 --- a/pkg/shared/restore_mock.go +++ b/pkg/shared/restore_mock.go @@ -1,6 +1,7 @@ package shared import ( + "context" "fmt" "log/slog" "time" @@ -21,7 +22,8 @@ func NewRestoreMock() *RestoreMock { } // RestoreRun mocks the interface method. -func (r *RestoreMock) RestoreRun(restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) { +func (r *RestoreMock) RestoreRun(_ context.Context, restoreRequest *model.RestoreRequestInternal, +) (*model.RestoreResult, error) { if restoreRequest.DestinationCuster == nil { return nil, fmt.Errorf("RestoreRun mock call without DestinationCuster provided, will fail") } diff --git a/pkg/shared/types.go b/pkg/shared/types.go index 9d028298..6e75b302 100644 --- a/pkg/shared/types.go +++ b/pkg/shared/types.go @@ -1,6 +1,7 @@ package shared import ( + "context" "time" "github.com/aerospike/aerospike-client-go/v7" @@ -16,7 +17,8 @@ type BackupOptions struct { // Backup represents a backup service. type Backup interface { - BackupRun(backupRoutine *model.BackupRoutine, + BackupRun(ctx context.Context, + backupRoutine *model.BackupRoutine, backupPolicy *model.BackupPolicy, client *aerospike.Client, storage *model.Storage, @@ -29,5 +31,5 @@ type Backup interface { // Restore represents a restore service. type Restore interface { - RestoreRun(restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) + RestoreRun(ctx context.Context, restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) } From f91ff65afebb6f70056b7052ae0ae9a904507602 Mon Sep 17 00:00:00 2001 From: Anton Korotkov Date: Sun, 28 Jul 2024 15:23:51 +0300 Subject: [PATCH 2/2] merge --- go.sum | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/go.sum b/go.sum index 22f940f3..39516e9a 100644 --- a/go.sum +++ b/go.sum @@ -352,7 +352,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -447,6 +450,8 @@ github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= +github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -549,6 +554,8 @@ google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=