Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass context object from the top level #198

Merged
merged 3 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -447,6 +450,8 @@ github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down Expand Up @@ -549,6 +554,8 @@ google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
Expand Down
15 changes: 9 additions & 6 deletions pkg/service/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (h *BackupHandler) runFullBackupInternal(ctx context.Context, now time.Time
clear(h.fullBackupHandlers)
}()

err = h.startFullBackupForAllNamespaces(now, client)
err = h.startFullBackupForAllNamespaces(ctx, now, client)
if err != nil {
return err
}
Expand All @@ -126,7 +126,8 @@ func (h *BackupHandler) runFullBackupInternal(ctx context.Context, now time.Time
return nil
}

func (h *BackupHandler) startFullBackupForAllNamespaces(upperBound time.Time, client *aerospike.Client) error {
func (h *BackupHandler) startFullBackupForAllNamespaces(
ctx context.Context, upperBound time.Time, client *aerospike.Client) error {
clear(h.fullBackupHandlers)

timebounds := model.TimeBounds{}
Expand All @@ -137,7 +138,7 @@ func (h *BackupHandler) startFullBackupForAllNamespaces(upperBound time.Time, cl
for _, namespace := range h.namespaces {
backupFolder := getFullPath(h.backend.fullBackupsPath, h.backupFullPolicy, namespace, upperBound)
backupPath := h.backend.wrapWithPrefix(backupFolder)
handler, err := backupService.BackupRun(h.backupRoutine, h.backupFullPolicy, client,
handler, err := backupService.BackupRun(ctx, h.backupRoutine, h.backupFullPolicy, client,
h.storage, h.secretAgent, timebounds, &namespace, backupPath)
if err != nil {
backupFailureCounter.Inc()
Expand Down Expand Up @@ -245,7 +246,7 @@ func (h *BackupHandler) runIncrementalBackup(ctx context.Context, now time.Time)
clear(h.incrBackupHandlers)
}()

h.startIncrementalBackupForAllNamespaces(client, now)
h.startIncrementalBackupForAllNamespaces(ctx, client, now)

h.waitForIncrementalBackups(ctx, now)
// increment incrBackupCounter metric
Expand All @@ -255,7 +256,9 @@ func (h *BackupHandler) runIncrementalBackup(ctx context.Context, now time.Time)
h.updateIncrementalBackupState(now)
}

func (h *BackupHandler) startIncrementalBackupForAllNamespaces(client *aerospike.Client, upperBound time.Time) {
func (h *BackupHandler) startIncrementalBackupForAllNamespaces(
ctx context.Context, client *aerospike.Client, upperBound time.Time) {

timebounds := model.NewTimeBoundsFrom(h.state.LastRun())
if h.backupFullPolicy.IsSealed() {
timebounds.ToTime = &upperBound
Expand All @@ -265,7 +268,7 @@ func (h *BackupHandler) startIncrementalBackupForAllNamespaces(client *aerospike
for _, namespace := range h.namespaces {
backupFolder := getIncrementalPath(h.backend.incrementalBackupsPath, namespace, upperBound)
backupPath := h.backend.wrapWithPrefix(backupFolder)
handler, err := backupService.BackupRun(
handler, err := backupService.BackupRun(ctx,
h.backupRoutine, h.backupIncrPolicy, client, h.storage, h.secretAgent, *timebounds, &namespace, backupPath)
if err != nil {
incrBackupFailureCounter.Inc()
Expand Down
29 changes: 19 additions & 10 deletions pkg/service/restore_memory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package service

import (
"context"
"fmt"
"log/slog"
"path/filepath"
Expand Down Expand Up @@ -38,15 +39,17 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err
if err := validateStorageContainsBackup(request.SourceStorage); err != nil {
return 0, err
}
ctx := context.TODO()
go func() {
restoreResult, err := r.restoreService.RestoreRun(request)
restoreResult, err := r.restoreService.RestoreRun(ctx, request)
if err != nil {
r.restoreJobs.setFailed(jobID, fmt.Errorf("failed restore operation: %w", err))
return
}
r.restoreJobs.increaseStats(jobID, restoreResult)
r.restoreJobs.setDone(jobID)
}()

return jobID, nil
}

Expand All @@ -60,17 +63,20 @@ func (r *RestoreMemory) RestoreByTime(request *model.RestoreTimestampRequest) (i
return 0, fmt.Errorf("last full backup not found: %v", err)
}
jobID := r.restoreJobs.newJob()
go r.restoreByTimeSync(reader, request, jobID, fullBackups)
ctx := context.TODO()
go r.restoreByTimeSync(ctx, reader, request, jobID, fullBackups)

return jobID, nil
}

func (r *RestoreMemory) restoreByTimeSync(backend BackupListReader,
func (r *RestoreMemory) restoreByTimeSync(ctx context.Context,
backend BackupListReader,
request *model.RestoreTimestampRequest,
jobID int,
fullBackups []model.BackupDetails,
) {
for _, nsBackup := range fullBackups {
if err := r.restoreNamespace(backend, request, jobID, nsBackup); err != nil {
if err := r.restoreNamespace(ctx, backend, request, jobID, nsBackup); err != nil {
slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err)
r.restoreJobs.setFailed(jobID, err)
return
Expand All @@ -80,11 +86,12 @@ func (r *RestoreMemory) restoreByTimeSync(backend BackupListReader,
}

func (r *RestoreMemory) restoreNamespace(
ctx context.Context,
backend BackupListReader,
request *model.RestoreTimestampRequest,
jobID int, fullBackup model.BackupDetails,
) error {
result, err := r.restoreFromPath(request, fullBackup.Key)
result, err := r.restoreFromPath(ctx, request, fullBackup.Key)
if err != nil {
return fmt.Errorf("could not restore full backup for namespace %s: %v", fullBackup.Namespace, err)
}
Expand All @@ -97,7 +104,7 @@ func (r *RestoreMemory) restoreNamespace(
}
slog.Info("Apply incremental backups", "size", len(incrementalBackups))
for _, incrBackup := range incrementalBackups {
result, err := r.restoreFromPath(request, incrBackup.Key)
result, err := r.restoreFromPath(ctx, request, incrBackup.Key)
if err != nil {
return fmt.Errorf("could not restore incremental backup %s: %v", *incrBackup.Key, err)
}
Expand All @@ -107,14 +114,16 @@ func (r *RestoreMemory) restoreNamespace(
}

func (r *RestoreMemory) restoreFromPath(
ctx context.Context,
request *model.RestoreTimestampRequest,
backupPath *string,
) (*model.RestoreResult, error) {
restoreRequest := r.toRestoreRequest(request)
restoreResult, err := r.restoreService.RestoreRun(&model.RestoreRequestInternal{
RestoreRequest: *restoreRequest,
Dir: backupPath,
})
restoreResult, err := r.restoreService.RestoreRun(ctx,
&model.RestoreRequestInternal{
RestoreRequest: *restoreRequest,
Dir: backupPath,
})
if err != nil {
return nil, fmt.Errorf("could not restore backup at %s: %w", *backupPath, err)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/shared/backup_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
// BackupRun calls the backup_run function from the asbackup shared library.
//
//nolint:funlen,gocritic
func (b *BackupGo) BackupRun(backupRoutine *model.BackupRoutine, backupPolicy *model.BackupPolicy,
func (b *BackupGo) BackupRun(ctx context.Context, backupRoutine *model.BackupRoutine, backupPolicy *model.BackupPolicy,
client *a.Client, storage *model.Storage, _ *model.SecretAgent,
timebounds model.TimeBounds, namespace *string, path *string) (*backup.BackupHandler, error) {

Check failure on line 33 in pkg/shared/backup_go.go

View workflow job for this annotation

GitHub Actions / lint

undefined: backup (typecheck)

backupClient, err := backup.NewClient(client, "1", slog.Default())
if err != nil {
Expand Down Expand Up @@ -93,7 +93,6 @@
}
}

ctx := context.TODO()
writerFactory, err := getWriter(ctx, path, storage)
if err != nil {
return nil, fmt.Errorf("failed to create backup writer, %w", err)
Expand All @@ -107,7 +106,7 @@
return handler, nil
}

func getWriter(ctx context.Context, path *string, storage *model.Storage) (backup.WriteFactory, error) {

Check failure on line 109 in pkg/shared/backup_go.go

View workflow job for this annotation

GitHub Actions / lint

undefined: backup (typecheck)
switch storage.Type {
case model.Local:
return local.NewDirectoryWriterFactory(*path, true)
Expand Down
4 changes: 2 additions & 2 deletions pkg/shared/restore_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func NewRestoreGo() *RestoreGo {
// RestoreRun calls the restore_run function from the asrestore shared library.
//
//nolint:funlen,gocritic
func (r *RestoreGo) RestoreRun(restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) {
func (r *RestoreGo) RestoreRun(ctx context.Context, restoreRequest *model.RestoreRequestInternal,
) (*model.RestoreResult, error) {
var err error
client, err := a.NewClientWithPolicyAndHost(
restoreRequest.DestinationCuster.ASClientPolicy(),
Expand Down Expand Up @@ -106,7 +107,6 @@ func (r *RestoreGo) RestoreRun(restoreRequest *model.RestoreRequestInternal) (*m
}
}

ctx := context.TODO()
reader, err := getReader(ctx, restoreRequest.Dir, restoreRequest.SourceStorage, config.DecoderFactory)
if err != nil {
return nil, fmt.Errorf("failed to create backup reader, %w", err)
Expand Down
4 changes: 3 additions & 1 deletion pkg/shared/restore_mock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package shared

import (
"context"
"fmt"
"log/slog"
"time"
Expand All @@ -21,7 +22,8 @@ func NewRestoreMock() *RestoreMock {
}

// RestoreRun mocks the interface method.
func (r *RestoreMock) RestoreRun(restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) {
func (r *RestoreMock) RestoreRun(_ context.Context, restoreRequest *model.RestoreRequestInternal,
) (*model.RestoreResult, error) {
if restoreRequest.DestinationCuster == nil {
return nil, fmt.Errorf("RestoreRun mock call without DestinationCuster provided, will fail")
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/shared/types.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package shared

import (
"context"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go"
"github.com/aerospike/backup/pkg/model"
)

// Backup represents a backup service.
type Backup interface {
BackupRun(backupRoutine *model.BackupRoutine,
BackupRun(ctx context.Context,
backupRoutine *model.BackupRoutine,
backupPolicy *model.BackupPolicy,
client *aerospike.Client,
storage *model.Storage,
Expand All @@ -16,10 +19,10 @@
timebounds model.TimeBounds,
namespace *string,
path *string,
) (*backup.BackupHandler, error)

Check failure on line 22 in pkg/shared/types.go

View workflow job for this annotation

GitHub Actions / lint

undefined: backup (typecheck)
}

// Restore represents a restore service.
type Restore interface {
RestoreRun(restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error)
RestoreRun(ctx context.Context, restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error)
}
Loading