Skip to content

Commit

Permalink
APPS-1228 Don't create multiple clients on every backup/restore (#199)
Browse files Browse the repository at this point in the history
* don't create client for cluster configuration requests

* reuse client on restore

* linter warnings

* merge

* create client in goroutine

* use defer to close client

* extract init client method
  • Loading branch information
korotkov-aerospike authored Jul 29, 2024
1 parent 2b3195c commit 55111c1
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 45 deletions.
27 changes: 11 additions & 16 deletions pkg/service/aerospike_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (

const namespaceInfo = "namespaces"

// getAllNamespacesOfCluster retrieves a list of all namespaces in an Aerospike cluster.
// This function is called at most once per routine (on application startup),
// so it is acceptable to create a client here.
func getAllNamespacesOfCluster(cluster *model.AerospikeCluster) ([]string, error) {
client, err := as.NewClientWithPolicyAndHost(cluster.ASClientPolicy(), cluster.ASClientHosts()...)

Expand All @@ -36,15 +39,13 @@ func getAllNamespacesOfCluster(cluster *model.AerospikeCluster) ([]string, error
return strings.Split(namespaces, ";"), nil
}

func getClusterConfiguration(cluster *model.AerospikeCluster) ([]asconfig.DotConf, error) {
activeHosts, err := getActiveHosts(cluster)
if err != nil {
return nil, err
}
func getClusterConfiguration(client *as.Client) []asconfig.DotConf {
activeHosts := getActiveHosts(client)

var outputs = make([]asconfig.DotConf, 0, len(activeHosts))
policy := client.Cluster().ClientPolicy()
for _, host := range activeHosts {
asInfo := info.NewAsInfo(logr.Logger{}, host, cluster.ASClientPolicy())
asInfo := info.NewAsInfo(logr.Logger{}, host, &policy)

conf, err := asconfig.GenerateConf(logr.Discard(), asInfo, true)
if err != nil {
Expand All @@ -61,22 +62,16 @@ func getClusterConfiguration(cluster *model.AerospikeCluster) ([]asconfig.DotCon
outputs = append(outputs, configAsString)
}

return outputs, nil
return outputs
}

func getActiveHosts(cluster *model.AerospikeCluster) ([]*as.Host, error) {
client, err := as.NewClientWithPolicyAndHost(cluster.ASClientPolicy(), cluster.ASClientHosts()...)
if err != nil {
return nil, err
}

defer client.Close()

// getActiveHosts returns the host of every node reported by client.GetNodes()
// for which IsActive() is true. The result is a nil slice when no node is active.
// The client is supplied by the caller and is not closed here.
func getActiveHosts(client *as.Client) []*as.Host {
var activeHosts []*as.Host
for _, node := range client.GetNodes() {
if node.IsActive() {
activeHosts = append(activeHosts, node.GetHost())
}
}
return activeHosts, nil

return activeHosts
}
13 changes: 7 additions & 6 deletions pkg/service/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (h *BackupHandler) runFullBackupInternal(ctx context.Context, now time.Time

h.cleanIncrementalBackups()

h.writeClusterConfiguration(now)
h.writeClusterConfiguration(client, now)
return nil
}

Expand Down Expand Up @@ -169,17 +169,18 @@ func (h *BackupHandler) waitForFullBackups(ctx context.Context, backupTimestamp
return nil
}

func (h *BackupHandler) writeClusterConfiguration(now time.Time) {
infos, err := getClusterConfiguration(h.cluster)
if err != nil || len(infos) == 0 {
slog.Warn("Could not read aerospike configuration", "err", err, "name", h.routineName)
func (h *BackupHandler) writeClusterConfiguration(client *aerospike.Client, now time.Time) {
infos := getClusterConfiguration(client)
if len(infos) == 0 {
slog.Warn("Could not read aerospike configuration", "name", h.routineName)
return
}

path := getConfigurationPath(h.backend.fullBackupsPath, h.backupFullPolicy, now)
for i, info := range infos {
confFilePath := fmt.Sprintf("%s/aerospike_%d.conf", path, i)
slog.Debug("Write aerospike configuration", "path", confFilePath)
err = h.backend.write(confFilePath, []byte(info))
err := h.backend.write(confFilePath, []byte(info))
if err != nil {
slog.Error("Failed to write configuration for the backup", "name", h.routineName, "err", err)
}
Expand Down
63 changes: 52 additions & 11 deletions pkg/service/restore_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"log/slog"
"path/filepath"
"sort"
"sync"
"time"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup/pkg/model"
"github.com/aerospike/backup/pkg/shared"
"github.com/aerospike/backup/pkg/util"
Expand Down Expand Up @@ -39,9 +41,16 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err
if err := validateStorageContainsBackup(request.SourceStorage); err != nil {
return 0, err
}

ctx := context.TODO()
go func() {
restoreResult, err := r.restoreService.RestoreRun(ctx, request)
client, err := r.initClient(request.DestinationCuster, jobID)
if err != nil {
return
}
defer client.Close()

restoreResult, err := r.restoreService.RestoreRun(ctx, client, request)
if err != nil {
r.restoreJobs.setFailed(jobID, fmt.Errorf("failed restore operation: %w", err))
return
Expand All @@ -53,6 +62,19 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err
return jobID, nil
}

// initClient connects to the given Aerospike cluster using the cluster's own
// client policy and hosts, and returns the connected client.
//
// When the connection fails, the error is wrapped, logged, and recorded on the
// restore job identified by jobID via restoreJobs.setFailed; the wrapped error
// is then returned with a nil client. Callers therefore only need to stop on
// error and must not mark the job as failed a second time.
//
// initClient does not close the client it returns; closing is left to the caller.
func (r *RestoreMemory) initClient(cluster *model.AerospikeCluster, jobID int) (*aerospike.Client, error) {
	client, aerr := aerospike.NewClientWithPolicyAndHost(
		cluster.ASClientPolicy(),
		cluster.ASClientHosts()...)
	if aerr != nil {
		err := fmt.Errorf("failed to connect to aerospike cluster, %w", aerr)
		// This helper serves both the plain restore and the restore-by-timestamp
		// paths, so the log message must not claim a specific one.
		// NOTE(review): the whole cluster value is logged; confirm it cannot
		// expose credentials.
		slog.Error("Failed to initialize aerospike client for restore",
			"cluster", cluster, "jobID", jobID, "err", err)
		r.restoreJobs.setFailed(jobID, err)

		return nil, err
	}

	return client, nil
}

func (r *RestoreMemory) RestoreByTime(request *model.RestoreTimestampRequest) (int, error) {
reader, found := r.backends.GetReader(request.Routine)
if !found {
Expand All @@ -69,29 +91,46 @@ func (r *RestoreMemory) RestoreByTime(request *model.RestoreTimestampRequest) (i
return jobID, nil
}

func (r *RestoreMemory) restoreByTimeSync(ctx context.Context,
// restoreByTimeSync restores every entry of fullBackups for the given job using
// a single client connected to request.DestinationCuster. In the added version
// each entry is restored by restoreNamespace in its own goroutine, and the
// function blocks on a WaitGroup until all of them have returned.
func (r *RestoreMemory) restoreByTimeSync(
ctx context.Context,
backend BackupListReader,
request *model.RestoreTimestampRequest,
jobID int,
fullBackups []model.BackupDetails,
) {
// initClient already logs the error and marks the job as failed, so there is
// nothing left to report here.
client, err := r.initClient(request.DestinationCuster, jobID)
if err != nil {
return
}
// The deferred Close runs when this function returns, i.e. after wg.Wait() below.
defer client.Close()

var wg sync.WaitGroup

for _, nsBackup := range fullBackups {
if err := r.restoreNamespace(ctx, backend, request, jobID, nsBackup); err != nil {
slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err)
r.restoreJobs.setFailed(jobID, err)
return
}
wg.Add(1)
go func(nsBackup model.BackupDetails) {
defer wg.Done()
if err := r.restoreNamespace(ctx, client, backend, request, jobID, nsBackup); err != nil {
slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err)
r.restoreJobs.setFailed(jobID, err)
return
}
}(nsBackup)
}

wg.Wait()

// NOTE(review): setDone is reached unconditionally once all goroutines finish,
// including when one of them already called setFailed above. Confirm that
// setDone cannot overwrite a failed status; otherwise track the failure and
// skip setDone in that case.
r.restoreJobs.setDone(jobID)
}

func (r *RestoreMemory) restoreNamespace(
ctx context.Context,
client *aerospike.Client,
backend BackupListReader,
request *model.RestoreTimestampRequest,
jobID int, fullBackup model.BackupDetails,
) error {
result, err := r.restoreFromPath(ctx, request, fullBackup.Key)
result, err := r.restoreFromPath(ctx, client, request, fullBackup.Key)
if err != nil {
return fmt.Errorf("could not restore full backup for namespace %s: %v", fullBackup.Namespace, err)
}
Expand All @@ -104,7 +143,7 @@ func (r *RestoreMemory) restoreNamespace(
}
slog.Info("Apply incremental backups", "size", len(incrementalBackups))
for _, incrBackup := range incrementalBackups {
result, err := r.restoreFromPath(ctx, request, incrBackup.Key)
result, err := r.restoreFromPath(ctx, client, request, incrBackup.Key)
if err != nil {
return fmt.Errorf("could not restore incremental backup %s: %v", *incrBackup.Key, err)
}
Expand All @@ -115,11 +154,13 @@ func (r *RestoreMemory) restoreNamespace(

func (r *RestoreMemory) restoreFromPath(
ctx context.Context,
client *aerospike.Client,
request *model.RestoreTimestampRequest,
backupPath *string,
) (*model.RestoreResult, error) {
restoreRequest := r.toRestoreRequest(request)
restoreResult, err := r.restoreService.RestoreRun(ctx,
client,
&model.RestoreRequestInternal{
RestoreRequest: *restoreRequest,
Dir: backupPath,
Expand Down Expand Up @@ -242,11 +283,11 @@ func validateStorageContainsBackup(storage *model.Storage) error {
case model.Local:
return validatePathContainsBackup(*storage.Path)
case model.S3:
context, err := NewS3Context(storage)
s3context, err := NewS3Context(storage)
if err != nil {
return err
}
return context.validateStorageContainsBackup()
return s3context.validateStorageContainsBackup()
}
return nil
}
12 changes: 2 additions & 10 deletions pkg/shared/restore_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,12 @@ func NewRestoreGo() *RestoreGo {
return &RestoreGo{}
}

// RestoreRun calls the restore_run function from the asrestore shared library.
// RestoreRun calls the restore function from the asbackup library.
//
//nolint:funlen,gocritic
func (r *RestoreGo) RestoreRun(ctx context.Context, restoreRequest *model.RestoreRequestInternal,
func (r *RestoreGo) RestoreRun(ctx context.Context, client *a.Client, restoreRequest *model.RestoreRequestInternal,
) (*model.RestoreResult, error) {
var err error
client, err := a.NewClientWithPolicyAndHost(
restoreRequest.DestinationCuster.ASClientPolicy(),
restoreRequest.DestinationCuster.ASClientHosts()...)
if err != nil {
return nil, fmt.Errorf("failed to connect to aerospike cluster, %w", err)
}
defer client.Close()

backupClient, err := backup.NewClient(client, "1", slog.Default())
if err != nil {
return nil, fmt.Errorf("failed to create backup client, %w", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/shared/restore_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"time"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup/pkg/model"
)

Expand All @@ -22,7 +23,7 @@ func NewRestoreMock() *RestoreMock {
}

// RestoreRun mocks the interface method.
func (r *RestoreMock) RestoreRun(_ context.Context, restoreRequest *model.RestoreRequestInternal,
func (r *RestoreMock) RestoreRun(_ context.Context, _ *aerospike.Client, restoreRequest *model.RestoreRequestInternal,
) (*model.RestoreResult, error) {
if restoreRequest.DestinationCuster == nil {
return nil, fmt.Errorf("RestoreRun mock call without DestinationCuster provided, will fail")
Expand Down
3 changes: 2 additions & 1 deletion pkg/shared/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ type Backup interface {

// Restore represents a restore service.
type Restore interface {
RestoreRun(ctx context.Context, restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error)
// RestoreRun runs the restore described by restoreRequest using the supplied
// client and returns the restore result or an error.
// NOTE(review): the callers shown in this commit create the client themselves
// and close it with defer client.Close() — confirm implementations must not close it.
RestoreRun(ctx context.Context, client *aerospike.Client, restoreRequest *model.RestoreRequestInternal,
) (*model.RestoreResult, error)
}

0 comments on commit 55111c1

Please sign in to comment.