Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APPS-1228 Don't create multiple clients on every backup/restore #199

Merged
27 changes: 11 additions & 16 deletions pkg/service/aerospike_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (

const namespaceInfo = "namespaces"

// getAllNamespacesOfCluster retrieves a list of all namespaces in an Aerospike cluster.
// this function is called maximum once for each routine (on application startup)
// so it's ok to create client here.
func getAllNamespacesOfCluster(cluster *model.AerospikeCluster) ([]string, error) {
client, err := as.NewClientWithPolicyAndHost(cluster.ASClientPolicy(), cluster.ASClientHosts()...)

Expand All @@ -36,15 +39,13 @@ func getAllNamespacesOfCluster(cluster *model.AerospikeCluster) ([]string, error
return strings.Split(namespaces, ";"), nil
}

func getClusterConfiguration(cluster *model.AerospikeCluster) ([]asconfig.DotConf, error) {
activeHosts, err := getActiveHosts(cluster)
if err != nil {
return nil, err
}
func getClusterConfiguration(client *as.Client) []asconfig.DotConf {
activeHosts := getActiveHosts(client)

var outputs = make([]asconfig.DotConf, 0, len(activeHosts))
policy := client.Cluster().ClientPolicy()
for _, host := range activeHosts {
asInfo := info.NewAsInfo(logr.Logger{}, host, cluster.ASClientPolicy())
asInfo := info.NewAsInfo(logr.Logger{}, host, &policy)

conf, err := asconfig.GenerateConf(logr.Discard(), asInfo, true)
if err != nil {
Expand All @@ -61,22 +62,16 @@ func getClusterConfiguration(cluster *model.AerospikeCluster) ([]asconfig.DotCon
outputs = append(outputs, configAsString)
}

return outputs, nil
return outputs
}

func getActiveHosts(cluster *model.AerospikeCluster) ([]*as.Host, error) {
client, err := as.NewClientWithPolicyAndHost(cluster.ASClientPolicy(), cluster.ASClientHosts()...)
if err != nil {
return nil, err
}

defer client.Close()

func getActiveHosts(client *as.Client) []*as.Host {
var activeHosts []*as.Host
for _, node := range client.GetNodes() {
if node.IsActive() {
activeHosts = append(activeHosts, node.GetHost())
}
}
return activeHosts, nil

return activeHosts
}
13 changes: 7 additions & 6 deletions pkg/service/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (h *BackupHandler) runFullBackupInternal(ctx context.Context, now time.Time

h.cleanIncrementalBackups()

h.writeClusterConfiguration(now)
h.writeClusterConfiguration(client, now)
return nil
}

Expand Down Expand Up @@ -169,17 +169,18 @@ func (h *BackupHandler) waitForFullBackups(ctx context.Context, backupTimestamp
return nil
}

func (h *BackupHandler) writeClusterConfiguration(now time.Time) {
infos, err := getClusterConfiguration(h.cluster)
if err != nil || len(infos) == 0 {
slog.Warn("Could not read aerospike configuration", "err", err, "name", h.routineName)
func (h *BackupHandler) writeClusterConfiguration(client *aerospike.Client, now time.Time) {
infos := getClusterConfiguration(client)
if len(infos) == 0 {
slog.Warn("Could not read aerospike configuration", "name", h.routineName)
return
}

path := getConfigurationPath(h.backend.fullBackupsPath, h.backupFullPolicy, now)
for i, info := range infos {
confFilePath := fmt.Sprintf("%s/aerospike_%d.conf", path, i)
slog.Debug("Write aerospike configuration", "path", confFilePath)
err = h.backend.write(confFilePath, []byte(info))
err := h.backend.write(confFilePath, []byte(info))
if err != nil {
slog.Error("Failed to write configuration for the backup", "name", h.routineName, "err", err)
}
Expand Down
56 changes: 47 additions & 9 deletions pkg/service/restore_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"log/slog"
"path/filepath"
"sort"
"sync"
"time"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup/pkg/model"
"github.com/aerospike/backup/pkg/shared"
"github.com/aerospike/backup/pkg/util"
Expand Down Expand Up @@ -39,9 +41,21 @@ func (r *RestoreMemory) Restore(request *model.RestoreRequestInternal) (int, err
if err := validateStorageContainsBackup(request.SourceStorage); err != nil {
return 0, err
}

ctx := context.TODO()
go func() {
restoreResult, err := r.restoreService.RestoreRun(ctx, request)
client, aerr := aerospike.NewClientWithPolicyAndHost(
request.DestinationCuster.ASClientPolicy(),
request.DestinationCuster.ASClientHosts()...)
if aerr != nil {
err := fmt.Errorf("failed to connect to aerospike cluster, %w", aerr)
slog.Error("Failed to restore by timestamp", "cluster", request.RestoreRequest.DestinationCuster, "err", err)
r.restoreJobs.setFailed(jobID, err)
return
}
defer client.Close()

restoreResult, err := r.restoreService.RestoreRun(ctx, client, request)
if err != nil {
r.restoreJobs.setFailed(jobID, fmt.Errorf("failed restore operation: %w", err))
return
Expand Down Expand Up @@ -69,29 +83,51 @@ func (r *RestoreMemory) RestoreByTime(request *model.RestoreTimestampRequest) (i
return jobID, nil
}

func (r *RestoreMemory) restoreByTimeSync(ctx context.Context,
func (r *RestoreMemory) restoreByTimeSync(
ctx context.Context,
backend BackupListReader,
request *model.RestoreTimestampRequest,
jobID int,
fullBackups []model.BackupDetails,
) {
client, aerr := aerospike.NewClientWithPolicyAndHost(
request.DestinationCuster.ASClientPolicy(),
request.DestinationCuster.ASClientHosts()...)
if aerr != nil {
err := fmt.Errorf("failed to connect to aerospike cluster, %w", aerr)
slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log entry is different from the one written in Restore. Is it possible to extract this client initialization block into a method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point.

r.restoreJobs.setFailed(jobID, err)
return
}
defer client.Close()

var wg sync.WaitGroup

for _, nsBackup := range fullBackups {
if err := r.restoreNamespace(ctx, backend, request, jobID, nsBackup); err != nil {
slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err)
r.restoreJobs.setFailed(jobID, err)
return
}
wg.Add(1)
go func(nsBackup model.BackupDetails) {
defer wg.Done()
if err := r.restoreNamespace(ctx, client, backend, request, jobID, nsBackup); err != nil {
slog.Error("Failed to restore by timestamp", "routine", request.Routine, "err", err)
r.restoreJobs.setFailed(jobID, err)
return
}
}(nsBackup)
}

wg.Wait()

r.restoreJobs.setDone(jobID)
}

func (r *RestoreMemory) restoreNamespace(
ctx context.Context,
client *aerospike.Client,
backend BackupListReader,
request *model.RestoreTimestampRequest,
jobID int, fullBackup model.BackupDetails,
) error {
result, err := r.restoreFromPath(ctx, request, fullBackup.Key)
result, err := r.restoreFromPath(ctx, client, request, fullBackup.Key)
if err != nil {
return fmt.Errorf("could not restore full backup for namespace %s: %v", fullBackup.Namespace, err)
}
Expand All @@ -104,7 +140,7 @@ func (r *RestoreMemory) restoreNamespace(
}
slog.Info("Apply incremental backups", "size", len(incrementalBackups))
for _, incrBackup := range incrementalBackups {
result, err := r.restoreFromPath(ctx, request, incrBackup.Key)
result, err := r.restoreFromPath(ctx, client, request, incrBackup.Key)
if err != nil {
return fmt.Errorf("could not restore incremental backup %s: %v", *incrBackup.Key, err)
}
Expand All @@ -115,11 +151,13 @@ func (r *RestoreMemory) restoreNamespace(

func (r *RestoreMemory) restoreFromPath(
ctx context.Context,
client *aerospike.Client,
request *model.RestoreTimestampRequest,
backupPath *string,
) (*model.RestoreResult, error) {
restoreRequest := r.toRestoreRequest(request)
restoreResult, err := r.restoreService.RestoreRun(ctx,
client,
&model.RestoreRequestInternal{
RestoreRequest: *restoreRequest,
Dir: backupPath,
Expand Down
12 changes: 2 additions & 10 deletions pkg/shared/restore_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,12 @@ func NewRestoreGo() *RestoreGo {
return &RestoreGo{}
}

// RestoreRun calls the restore_run function from the asrestore shared library.
// RestoreRun calls the restore function from the asbackup library.
//
//nolint:funlen,gocritic
func (r *RestoreGo) RestoreRun(ctx context.Context, restoreRequest *model.RestoreRequestInternal,
func (r *RestoreGo) RestoreRun(ctx context.Context, client *a.Client, restoreRequest *model.RestoreRequestInternal,
) (*model.RestoreResult, error) {
var err error
client, err := a.NewClientWithPolicyAndHost(
restoreRequest.DestinationCuster.ASClientPolicy(),
restoreRequest.DestinationCuster.ASClientHosts()...)
if err != nil {
return nil, fmt.Errorf("failed to connect to aerospike cluster, %w", err)
}
defer client.Close()

backupClient, err := backup.NewClient(client, "1", slog.Default())
if err != nil {
return nil, fmt.Errorf("failed to create backup client, %w", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/shared/restore_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"time"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup/pkg/model"
)

Expand All @@ -22,7 +23,7 @@ func NewRestoreMock() *RestoreMock {
}

// RestoreRun mocks the interface method.
func (r *RestoreMock) RestoreRun(_ context.Context, restoreRequest *model.RestoreRequestInternal,
func (r *RestoreMock) RestoreRun(_ context.Context, _ *aerospike.Client, restoreRequest *model.RestoreRequestInternal,
) (*model.RestoreResult, error) {
if restoreRequest.DestinationCuster == nil {
return nil, fmt.Errorf("RestoreRun mock call without DestinationCuster provided, will fail")
Expand Down
3 changes: 2 additions & 1 deletion pkg/shared/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ type Backup interface {

// Restore represents a restore service.
type Restore interface {
RestoreRun(ctx context.Context, restoreRequest *model.RestoreRequestInternal) (*model.RestoreResult, error)
RestoreRun(ctx context.Context, client *aerospike.Client, restoreRequest *model.RestoreRequestInternal,
) (*model.RestoreResult, error)
}
Loading