From 06d326abcde2a3d3c9eb7cfbd910670384bc7363 Mon Sep 17 00:00:00 2001 From: Anton Korotkov Date: Sun, 28 Jul 2024 15:15:11 +0300 Subject: [PATCH 1/7] don't create client for cluster configuration requests --- pkg/service/aerospike_service.go | 27 +++++++++++---------------- pkg/service/backup_handler.go | 11 ++++++----- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/pkg/service/aerospike_service.go b/pkg/service/aerospike_service.go index fbdbdb91..7461192b 100644 --- a/pkg/service/aerospike_service.go +++ b/pkg/service/aerospike_service.go @@ -16,6 +16,9 @@ import ( const namespaceInfo = "namespaces" +// getAllNamespacesOfCluster retrieves a list of all namespaces in an Aerospike cluster. +// This function is called at most once per routine (on application startup), +// so it is OK to create a client here. func getAllNamespacesOfCluster(cluster *model.AerospikeCluster) ([]string, error) { client, err := as.NewClientWithPolicyAndHost(cluster.ASClientPolicy(), cluster.ASClientHosts()...) 
@@ -36,15 +39,13 @@ func getAllNamespacesOfCluster(cluster *model.AerospikeCluster) ([]string, error return strings.Split(namespaces, ";"), nil } -func getClusterConfiguration(cluster *model.AerospikeCluster) ([]asconfig.DotConf, error) { - activeHosts, err := getActiveHosts(cluster) - if err != nil { - return nil, err - } +func getClusterConfiguration(client *as.Client) []asconfig.DotConf { + activeHosts := getActiveHosts(client) var outputs = make([]asconfig.DotConf, 0, len(activeHosts)) + policy := client.Cluster().ClientPolicy() for _, host := range activeHosts { - asInfo := info.NewAsInfo(logr.Logger{}, host, cluster.ASClientPolicy()) + asInfo := info.NewAsInfo(logr.Logger{}, host, &policy) conf, err := asconfig.GenerateConf(logr.Discard(), asInfo, true) if err != nil { @@ -61,22 +62,16 @@ func getClusterConfiguration(cluster *model.AerospikeCluster) ([]asconfig.DotCon outputs = append(outputs, configAsString) } - return outputs, nil + return outputs } -func getActiveHosts(cluster *model.AerospikeCluster) ([]*as.Host, error) { - client, err := as.NewClientWithPolicyAndHost(cluster.ASClientPolicy(), cluster.ASClientHosts()...) 
- if err != nil { - return nil, err - } - - defer client.Close() - +func getActiveHosts(client *as.Client) []*as.Host { var activeHosts []*as.Host for _, node := range client.GetNodes() { if node.IsActive() { activeHosts = append(activeHosts, node.GetHost()) } } - return activeHosts, nil + + return activeHosts } diff --git a/pkg/service/backup_handler.go b/pkg/service/backup_handler.go index 72bcfb34..773407c0 100644 --- a/pkg/service/backup_handler.go +++ b/pkg/service/backup_handler.go @@ -123,7 +123,7 @@ func (h *BackupHandler) runFullBackupInternal(ctx context.Context, now time.Time h.cleanIncrementalBackups() - h.writeClusterConfiguration(now) + h.writeClusterConfiguration(client, now) return nil } @@ -169,12 +169,13 @@ func (h *BackupHandler) waitForFullBackups(ctx context.Context, backupTimestamp return nil } -func (h *BackupHandler) writeClusterConfiguration(now time.Time) { - infos, err := getClusterConfiguration(h.cluster) - if err != nil || len(infos) == 0 { - slog.Warn("Could not read aerospike configuration", "err", err, "name", h.routineName) +func (h *BackupHandler) writeClusterConfiguration(client *aerospike.Client, now time.Time) { + infos := getClusterConfiguration(client) + if len(infos) == 0 { + slog.Warn("Could not read aerospike configuration", "name", h.routineName) return } + path := getConfigurationPath(h.backend.fullBackupsPath, h.backupFullPolicy, now) h.backend.CreateFolder(path) for i, info := range infos { From f92f78128587a7489c937ab6644a9e7a0302b85e Mon Sep 17 00:00:00 2001 From: Anton Korotkov Date: Mon, 29 Jul 2024 10:34:57 +0300 Subject: [PATCH 2/7] reuse client on restore --- pkg/service/restore_memory.go | 62 +++++++++++++++++++++++++++-------- pkg/shared/restore_go.go | 10 +----- pkg/shared/restore_mock.go | 3 +- pkg/shared/types.go | 2 +- 4 files changed, 52 insertions(+), 25 deletions(-) diff --git a/pkg/service/restore_memory.go b/pkg/service/restore_memory.go index bbe78a9f..3c9606d1 100644 --- 
a/pkg/service/restore_memory.go +++ b/pkg/service/restore_memory.go @@ -2,9 +2,11 @@ package service import ( "fmt" + "github.com/aerospike/aerospike-client-go/v7" "log/slog" "path/filepath" "sort" + "sync" "time" "github.com/aerospike/backup/pkg/model" @@ -38,8 +40,17 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err if err := validateStorageContainsBackup(request.SourceStorage); err != nil { return 0, err } + + client, aerr := aerospike.NewClientWithPolicyAndHost( + request.DestinationCuster.ASClientPolicy(), + request.DestinationCuster.ASClientHosts()...) + if aerr != nil { + return 0, fmt.Errorf("failed to connect to aerospike cluster, %w", aerr) + } + go func() { - restoreResult, err := r.restoreService.RestoreRun(request) + defer client.Close() + restoreResult, err := r.restoreService.RestoreRun(client, request) if err != nil { r.restoreJobs.setFailed(jobID, fmt.Errorf("failed restore operation: %w", err)) return @@ -47,6 +58,7 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err r.restoreJobs.increaseStats(jobID, restoreResult) r.restoreJobs.setDone(jobID) }() + return jobID, nil } @@ -60,31 +72,50 @@ func (r *RestoreMemory) RestoreByTime(request *model.RestoreTimestampRequest) (i return 0, fmt.Errorf("last full backup not found: %v", err) } jobID := r.restoreJobs.newJob() - go r.restoreByTimeSync(reader, request, jobID, fullBackups) + client, aerr := aerospike.NewClientWithPolicyAndHost( + request.DestinationCuster.ASClientPolicy(), + request.DestinationCuster.ASClientHosts()...) 
+ if aerr != nil { + return 0, fmt.Errorf("failed to connect to aerospike cluster, %w", aerr) + } + go r.restoreByTimeSync(client, reader, request, jobID, fullBackups) return jobID, nil } -func (r *RestoreMemory) restoreByTimeSync(backend BackupListReader, +func (r *RestoreMemory) restoreByTimeSync( + client *aerospike.Client, + backend BackupListReader, request *model.RestoreTimestampRequest, jobID int, fullBackups []model.BackupDetails, ) { + var wg sync.WaitGroup + for _, nsBackup := range fullBackups { - if err := r.restoreNamespace(backend, request, jobID, nsBackup); err != nil { - slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err) - r.restoreJobs.setFailed(jobID, err) - return - } + wg.Add(1) + go func(nsBackup model.BackupDetails) { + defer wg.Done() + if err := r.restoreNamespace(client, backend, request, jobID, nsBackup); err != nil { + slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err) + r.restoreJobs.setFailed(jobID, err) + return + } + }(nsBackup) } + + wg.Wait() + r.restoreJobs.setDone(jobID) + client.Close() } func (r *RestoreMemory) restoreNamespace( + client *aerospike.Client, backend BackupListReader, request *model.RestoreTimestampRequest, jobID int, fullBackup model.BackupDetails, ) error { - result, err := r.restoreFromPath(request, fullBackup.Key) + result, err := r.restoreFromPath(client, request, fullBackup.Key) if err != nil { return fmt.Errorf("could not restore full backup for namespace %s: %v", fullBackup.Namespace, err) } @@ -97,7 +128,7 @@ func (r *RestoreMemory) restoreNamespace( } slog.Info("Apply incremental backups", "size", len(incrementalBackups)) for _, incrBackup := range incrementalBackups { - result, err := r.restoreFromPath(request, incrBackup.Key) + result, err := r.restoreFromPath(client, request, incrBackup.Key) if err != nil { return fmt.Errorf("could not restore incremental backup %s: %v", *incrBackup.Key, err) } @@ -107,14 +138,17 @@ func (r 
*RestoreMemory) restoreNamespace( } func (r *RestoreMemory) restoreFromPath( + client *aerospike.Client, request *model.RestoreTimestampRequest, backupPath *string, ) (*model.RestoreResult, error) { restoreRequest := r.toRestoreRequest(request) - restoreResult, err := r.restoreService.RestoreRun(&model.RestoreRequestInternal{ - RestoreRequest: *restoreRequest, - Dir: backupPath, - }) + restoreResult, err := r.restoreService.RestoreRun( + client, + &model.RestoreRequestInternal{ + RestoreRequest: *restoreRequest, + Dir: backupPath, + }) if err != nil { return nil, fmt.Errorf("could not restore backup at %s: %w", *backupPath, err) } diff --git a/pkg/shared/restore_go.go b/pkg/shared/restore_go.go index 8c0736e2..4382e63c 100644 --- a/pkg/shared/restore_go.go +++ b/pkg/shared/restore_go.go @@ -29,16 +29,8 @@ func NewRestoreGo() *RestoreGo { // RestoreRun calls the restore_run function from the asrestore shared library. // //nolint:funlen,gocritic -func (r *RestoreGo) RestoreRun(restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) { +func (r *RestoreGo) RestoreRun(client *a.Client, restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) { var err error - client, err := a.NewClientWithPolicyAndHost( - restoreRequest.DestinationCuster.ASClientPolicy(), - restoreRequest.DestinationCuster.ASClientHosts()...) 
- if err != nil { - return nil, fmt.Errorf("failed to connect to aerospike cluster, %w", err) - } - defer client.Close() - backupClient, err := backup.NewClient(client, "1", slog.Default()) if err != nil { return nil, fmt.Errorf("failed to create backup client, %w", err) diff --git a/pkg/shared/restore_mock.go b/pkg/shared/restore_mock.go index 36a3ee66..d4b37d7b 100644 --- a/pkg/shared/restore_mock.go +++ b/pkg/shared/restore_mock.go @@ -2,6 +2,7 @@ package shared import ( "fmt" + "github.com/aerospike/aerospike-client-go/v7" "log/slog" "time" @@ -21,7 +22,7 @@ func NewRestoreMock() *RestoreMock { } // RestoreRun mocks the interface method. -func (r *RestoreMock) RestoreRun(restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) { +func (r *RestoreMock) RestoreRun(_ *aerospike.Client, restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) { if restoreRequest.DestinationCuster == nil { return nil, fmt.Errorf("RestoreRun mock call without DestinationCuster provided, will fail") } diff --git a/pkg/shared/types.go b/pkg/shared/types.go index 9d028298..25d04540 100644 --- a/pkg/shared/types.go +++ b/pkg/shared/types.go @@ -29,5 +29,5 @@ type Backup interface { // Restore represents a restore service. 
type Restore interface { - RestoreRun(restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) + RestoreRun(client *aerospike.Client, restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) } From 7d12d39aff7ee4b2e4b0711a24ddc6c77a4e7e65 Mon Sep 17 00:00:00 2001 From: Anton Korotkov Date: Mon, 29 Jul 2024 11:22:36 +0300 Subject: [PATCH 3/7] linter warnings --- pkg/service/restore_memory.go | 2 +- pkg/shared/restore_go.go | 5 +++-- pkg/shared/restore_mock.go | 5 +++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/service/restore_memory.go b/pkg/service/restore_memory.go index 3c9606d1..146d9d9e 100644 --- a/pkg/service/restore_memory.go +++ b/pkg/service/restore_memory.go @@ -2,13 +2,13 @@ package service import ( "fmt" - "github.com/aerospike/aerospike-client-go/v7" "log/slog" "path/filepath" "sort" "sync" "time" + "github.com/aerospike/aerospike-client-go/v7" "github.com/aerospike/backup/pkg/model" "github.com/aerospike/backup/pkg/shared" "github.com/aerospike/backup/pkg/util" diff --git a/pkg/shared/restore_go.go b/pkg/shared/restore_go.go index 4382e63c..f15d0f3d 100644 --- a/pkg/shared/restore_go.go +++ b/pkg/shared/restore_go.go @@ -26,10 +26,11 @@ func NewRestoreGo() *RestoreGo { return &RestoreGo{} } -// RestoreRun calls the restore_run function from the asrestore shared library. +// RestoreRun calls the restore function from the asbackup library. 
// //nolint:funlen,gocritic -func (r *RestoreGo) RestoreRun(client *a.Client, restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) { +func (r *RestoreGo) RestoreRun(client *a.Client, restoreRequest *model.RestoreRequestInternal, +) (*model.RestoreResult, error) { var err error backupClient, err := backup.NewClient(client, "1", slog.Default()) if err != nil { diff --git a/pkg/shared/restore_mock.go b/pkg/shared/restore_mock.go index d4b37d7b..c3cb26a3 100644 --- a/pkg/shared/restore_mock.go +++ b/pkg/shared/restore_mock.go @@ -2,10 +2,10 @@ package shared import ( "fmt" - "github.com/aerospike/aerospike-client-go/v7" "log/slog" "time" + "github.com/aerospike/aerospike-client-go/v7" "github.com/aerospike/backup/pkg/model" ) @@ -22,7 +22,8 @@ func NewRestoreMock() *RestoreMock { } // RestoreRun mocks the interface method. -func (r *RestoreMock) RestoreRun(_ *aerospike.Client, restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) { +func (r *RestoreMock) RestoreRun(_ *aerospike.Client, restoreRequest *model.RestoreRequestInternal, +) (*model.RestoreResult, error) { if restoreRequest.DestinationCuster == nil { return nil, fmt.Errorf("RestoreRun mock call without DestinationCuster provided, will fail") } From d82b750c3f6ed1492c9601cc0db513fea19fc6c2 Mon Sep 17 00:00:00 2001 From: Anton Korotkov Date: Mon, 29 Jul 2024 11:53:19 +0300 Subject: [PATCH 4/7] merge --- pkg/service/backup_handler.go | 2 +- pkg/shared/types.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/service/backup_handler.go b/pkg/service/backup_handler.go index c9291ae8..d80d2420 100644 --- a/pkg/service/backup_handler.go +++ b/pkg/service/backup_handler.go @@ -180,7 +180,7 @@ func (h *BackupHandler) writeClusterConfiguration(client *aerospike.Client, now for i, info := range infos { confFilePath := fmt.Sprintf("%s/aerospike_%d.conf", path, i) slog.Debug("Write aerospike configuration", "path", confFilePath) - err = 
h.backend.write(confFilePath, []byte(info)) + err := h.backend.write(confFilePath, []byte(info)) if err != nil { slog.Error("Failed to write configuration for the backup", "name", h.routineName, "err", err) } diff --git a/pkg/shared/types.go b/pkg/shared/types.go index 7ba7e8eb..8b0e1ba1 100644 --- a/pkg/shared/types.go +++ b/pkg/shared/types.go @@ -24,5 +24,6 @@ type Backup interface { // Restore represents a restore service. type Restore interface { - RestoreRun(ctx context.Context, client *aerospike.Client, restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error) + RestoreRun(ctx context.Context, client *aerospike.Client, restoreRequest *model.RestoreRequestInternal, + ) (*model.RestoreResult, error) } From 28343a67fcd8cfa814dab2644bfea144d22bf1fb Mon Sep 17 00:00:00 2001 From: Anton Korotkov Date: Mon, 29 Jul 2024 12:38:32 +0300 Subject: [PATCH 5/7] create client in goroutine --- pkg/service/restore_memory.go | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/pkg/service/restore_memory.go b/pkg/service/restore_memory.go index 13e7a0d7..46f7b60c 100644 --- a/pkg/service/restore_memory.go +++ b/pkg/service/restore_memory.go @@ -42,15 +42,17 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err return 0, err } - client, aerr := aerospike.NewClientWithPolicyAndHost( - request.DestinationCuster.ASClientPolicy(), - request.DestinationCuster.ASClientHosts()...) - if aerr != nil { - return 0, fmt.Errorf("failed to connect to aerospike cluster, %w", aerr) - } - ctx := context.TODO() go func() { + client, aerr := aerospike.NewClientWithPolicyAndHost( + request.DestinationCuster.ASClientPolicy(), + request.DestinationCuster.ASClientHosts()...) 
+ if aerr != nil { + err := fmt.Errorf("failed to connect to aerospike cluster, %w", aerr) + slog.Error("Failed to restore by timestamp", "cluster", request.RestoreRequest.DestinationCuster, "err", err) + r.restoreJobs.setFailed(jobID, err) + return + } defer client.Close() restoreResult, err := r.restoreService.RestoreRun(ctx, client, request) if err != nil { @@ -74,26 +76,29 @@ func (r *RestoreMemory) RestoreByTime(request *model.RestoreTimestampRequest) (i return 0, fmt.Errorf("last full backup not found: %v", err) } jobID := r.restoreJobs.newJob() - client, aerr := aerospike.NewClientWithPolicyAndHost( - request.DestinationCuster.ASClientPolicy(), - request.DestinationCuster.ASClientHosts()...) - if aerr != nil { - return 0, fmt.Errorf("failed to connect to aerospike cluster, %w", aerr) - } ctx := context.TODO() - go r.restoreByTimeSync(ctx, client, reader, request, jobID, fullBackups) + go r.restoreByTimeSync(ctx, reader, request, jobID, fullBackups) return jobID, nil } func (r *RestoreMemory) restoreByTimeSync( ctx context.Context, - client *aerospike.Client, backend BackupListReader, request *model.RestoreTimestampRequest, jobID int, fullBackups []model.BackupDetails, ) { + client, aerr := aerospike.NewClientWithPolicyAndHost( + request.DestinationCuster.ASClientPolicy(), + request.DestinationCuster.ASClientHosts()...) 
+ if aerr != nil { + err := fmt.Errorf("failed to connect to aerospike cluster, %w", aerr) + slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err) + r.restoreJobs.setFailed(jobID, err) + return + } + var wg sync.WaitGroup for _, nsBackup := range fullBackups { From c819d8dbdfd7ad32b356cbf54eb1189aa6dea148 Mon Sep 17 00:00:00 2001 From: Anton Korotkov Date: Mon, 29 Jul 2024 12:41:23 +0300 Subject: [PATCH 6/7] use defer to close client --- pkg/service/restore_memory.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/service/restore_memory.go b/pkg/service/restore_memory.go index 46f7b60c..228dc269 100644 --- a/pkg/service/restore_memory.go +++ b/pkg/service/restore_memory.go @@ -54,6 +54,7 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err return } defer client.Close() + restoreResult, err := r.restoreService.RestoreRun(ctx, client, request) if err != nil { r.restoreJobs.setFailed(jobID, fmt.Errorf("failed restore operation: %w", err)) @@ -98,6 +99,7 @@ func (r *RestoreMemory) restoreByTimeSync( r.restoreJobs.setFailed(jobID, err) return } + defer client.Close() var wg sync.WaitGroup @@ -116,7 +118,6 @@ func (r *RestoreMemory) restoreByTimeSync( wg.Wait() r.restoreJobs.setDone(jobID) - client.Close() } func (r *RestoreMemory) restoreNamespace( From ebb3d1d6084536266621f2483a52befd1d0cdbf7 Mon Sep 17 00:00:00 2001 From: Anton Korotkov Date: Mon, 29 Jul 2024 15:53:10 +0300 Subject: [PATCH 7/7] extract init client method --- pkg/service/restore_memory.go | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/pkg/service/restore_memory.go b/pkg/service/restore_memory.go index 228dc269..cf9163a1 100644 --- a/pkg/service/restore_memory.go +++ b/pkg/service/restore_memory.go @@ -44,13 +44,8 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err ctx := context.TODO() go func() { - client, aerr := 
aerospike.NewClientWithPolicyAndHost( - request.DestinationCuster.ASClientPolicy(), - request.DestinationCuster.ASClientHosts()...) - if aerr != nil { - err := fmt.Errorf("failed to connect to aerospike cluster, %w", aerr) - slog.Error("Failed to restore by timestamp", "cluster", request.RestoreRequest.DestinationCuster, "err", err) - r.restoreJobs.setFailed(jobID, err) + client, err := r.initClient(request.DestinationCuster, jobID) + if err != nil { return } defer client.Close() @@ -67,6 +62,19 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err return jobID, nil } +func (r *RestoreMemory) initClient(cluster *model.AerospikeCluster, jobID int) (*aerospike.Client, error) { + client, aerr := aerospike.NewClientWithPolicyAndHost( + cluster.ASClientPolicy(), + cluster.ASClientHosts()...) + if aerr != nil { + err := fmt.Errorf("failed to connect to aerospike cluster, %w", aerr) + slog.Error("Failed to restore by timestamp", "cluster", cluster, "err", err) + r.restoreJobs.setFailed(jobID, err) + return nil, err + } + return client, nil +} + func (r *RestoreMemory) RestoreByTime(request *model.RestoreTimestampRequest) (int, error) { reader, found := r.backends.GetReader(request.Routine) if !found { @@ -90,13 +98,8 @@ func (r *RestoreMemory) restoreByTimeSync( jobID int, fullBackups []model.BackupDetails, ) { - client, aerr := aerospike.NewClientWithPolicyAndHost( - request.DestinationCuster.ASClientPolicy(), - request.DestinationCuster.ASClientHosts()...) 
- if aerr != nil { - err := fmt.Errorf("failed to connect to aerospike cluster, %w", aerr) - slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err) - r.restoreJobs.setFailed(jobID, err) + client, err := r.initClient(request.DestinationCuster, jobID) + if err != nil { return } defer client.Close() @@ -280,11 +283,11 @@ func validateStorageContainsBackup(storage *model.Storage) error { case model.Local: return validatePathContainsBackup(*storage.Path) case model.S3: - context, err := NewS3Context(storage) + s3context, err := NewS3Context(storage) if err != nil { return err } - return context.validateStorageContainsBackup() + return s3context.validateStorageContainsBackup() } return nil }