diff --git a/api/v1beta1/aerospikerestore_webhook.go b/api/v1beta1/aerospikerestore_webhook.go index 74a771a2..e47d24d8 100644 --- a/api/v1beta1/aerospikerestore_webhook.go +++ b/api/v1beta1/aerospikerestore_webhook.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/yaml" "github.com/abhishekdwivedi3060/aerospike-backup-service/pkg/model" + "github.com/aerospike/aerospike-kubernetes-operator/controllers/common" ) // log is for logging in this package. @@ -94,10 +95,24 @@ func (r *AerospikeRestore) ValidateDelete() (admission.Warnings, error) { } func (r *AerospikeRestore) validateRestoreConfig() error { + restoreConfigInMap := make(map[string]interface{}) + + if err := yaml.Unmarshal(r.Spec.Config.Raw, &restoreConfigInMap); err != nil { + return err + } + switch r.Spec.Type { case Full, Incremental: var restoreRequest model.RestoreRequest + if _, ok := restoreConfigInMap[common.RoutineKey]; ok { + return fmt.Errorf("routine key is not allowed in restore config for restore type %s", r.Spec.Type) + } + + if _, ok := restoreConfigInMap[common.TimeKey]; ok { + return fmt.Errorf("time key is not allowed in restore config for restore type %s", r.Spec.Type) + } + if err := yaml.Unmarshal(r.Spec.Config.Raw, &restoreRequest); err != nil { return err } @@ -107,6 +122,10 @@ func (r *AerospikeRestore) validateRestoreConfig() error { case TimeStamp: var restoreRequest model.RestoreTimestampRequest + if _, ok := restoreConfigInMap[common.SourceKey]; ok { + return fmt.Errorf("source key is not allowed in restore config for restore type %s", r.Spec.Type) + } + if err := yaml.Unmarshal(r.Spec.Config.Raw, &restoreRequest); err != nil { return err } diff --git a/config/samples/aerospikerestore.yaml b/config/samples/aerospikerestore.yaml index a07f2f31..209b2acd 100644 --- a/config/samples/aerospikerestore.yaml +++ b/config/samples/aerospikerestore.yaml @@ -28,6 +28,5 @@ spec: no-generation: true no-indexes: true source: - "path": "/localStorage/test-routine/backup/1719911170537/data/test" - "type": 0 - + "path": "/localStorage/aerospike-aerospikebackup-test-routine/backup/1722326391329/data/test" + "type": local diff --git a/controllers/common/constant.go b/controllers/common/constant.go index 990a1f58..b6ed628a 100644 --- a/controllers/common/constant.go +++ b/controllers/common/constant.go @@ -12,6 +12,13 @@ const ( BackupServiceConfigYAML = "aerospike-backup-service.yml" ) +// Restore config fields +const ( + RoutineKey = "routine" + TimeKey = "time" + SourceKey = "source" +) + const ( HTTPKey = "http" AerospikeBackupService = "aerospike-backup-service" diff --git a/pkg/backup-service/client.go b/pkg/backup-service/client.go index f7509f80..70992b55 100644 --- a/pkg/backup-service/client.go +++ b/pkg/backup-service/client.go @@ -568,7 +568,7 @@ func (c *Client) GetFullBackups() (map[string][]interface{}, error) { return backups, nil } -func (c *Client) GetFullBackupForRoutine(routineName string) ([]interface{}, error) { +func (c *Client) GetFullBackupsForRoutine(routineName string) ([]interface{}, error) { url := c.API(fmt.Sprintf("/backups/full/%s", routineName)) resp, err := http.Get(url) diff --git a/test/backup/backup_suite_test.go b/test/backup/backup_suite_test.go index c7c18147..e53be0e8 100644 --- a/test/backup/backup_suite_test.go +++ b/test/backup/backup_suite_test.go @@ -58,7 +58,10 @@ var _ = BeforeSuite( Expect(err).ToNot(HaveOccurred()) By("Deploy Aerospike Cluster") + cascadeDeleteTrue := true aeroCluster := cluster.CreateDummyAerospikeCluster(aerospikeNsNm, 2) + aeroCluster.Spec.Storage.BlockVolumePolicy.InputCascadeDelete = &cascadeDeleteTrue + aeroCluster.Spec.Storage.FileSystemVolumePolicy.InputCascadeDelete = &cascadeDeleteTrue err = cluster.DeployCluster(k8sClient, testCtx, aeroCluster) Expect(err).ToNot(HaveOccurred()) diff --git a/test/backup/backup_test.go b/test/backup/backup_test.go index ddcba97e..c2b33fad 100644 --- a/test/backup/backup_test.go +++ b/test/backup/backup_test.go @@ -24,20 +24,20 @@ var _ = Describe( ) AfterEach(func() { - Expect(deleteBackup(k8sClient, backup)).ToNot(HaveOccurred()) + Expect(DeleteBackup(k8sClient, backup)).ToNot(HaveOccurred()) }) Context( "When doing Invalid operations", func() { It("Should fail when wrong format backup config is given", func() { - backup, err = newBackup(backupNsNm) + backup, err = NewBackup(backupNsNm) Expect(err).ToNot(HaveOccurred()) badConfig, gErr := getWrongBackupConfBytes(namePrefix(backupNsNm)) Expect(gErr).ToNot(HaveOccurred()) backup.Spec.Config.Raw = badConfig - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) }) @@ -48,7 +48,7 @@ var _ = Describe( Expect(mErr).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("name should start with %s", namePrefix(backupNsNm))) }) @@ -63,12 +63,12 @@ var _ = Describe( Expect(mErr).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) }) It("Should fail when on-demand backup is given at the time of creation", func() { - backup, err = newBackup(backupNsNm) + backup, err = NewBackup(backupNsNm) Expect(err).ToNot(HaveOccurred()) backup.Spec.OnDemandBackups = []asdbv1beta1.OnDemandBackupSpec{ @@ -78,15 +78,15 @@ var _ = Describe( }, } - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) }) It("Should fail when non-existing routine is given in on-demand backup", func() { - backup, err = newBackup(backupNsNm) + backup, err = NewBackup(backupNsNm) Expect(err).ToNot(HaveOccurred()) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) backup, err = getBackupObj(k8sClient, backup.Name, backup.Namespace) @@ -104,20 +104,20 @@ var _ = Describe( }) It("Should fail when backup service is not present", func() { - backup, err = newBackup(backupNsNm) + backup, err = NewBackup(backupNsNm) Expect(err).ToNot(HaveOccurred()) backup.Spec.BackupService.Name = "wrong-backup-service" - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) }) It("Should fail when backup service reference is updated", func() { - backup, err = newBackup(backupNsNm) + backup, err = NewBackup(backupNsNm) Expect(err).ToNot(HaveOccurred()) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) backup, err = getBackupObj(k8sClient, backup.Name, backup.Namespace) @@ -140,7 +140,7 @@ var _ = Describe( Expect(mErr).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) }) @@ -155,7 +155,7 @@ var _ = Describe( Expect(mErr).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) }) @@ -170,7 +170,7 @@ var _ = Describe( Expect(mErr).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) }) @@ -185,7 +185,7 @@ var _ = Describe( Expect(mErr).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) }) @@ -201,7 +201,7 @@ var _ = Describe( Expect(mErr).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) Expect(err.Error()).To( ContainSubstring("service field cannot be specified in backup config")) @@ -221,7 +221,7 @@ var _ = Describe( Expect(mErr).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) Expect(err.Error()).To( ContainSubstring("backup-policies field cannot be specified in backup config")) @@ -240,7 +240,7 @@ var _ = Describe( Expect(mErr).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) Expect(err.Error()).To( ContainSubstring("storage field cannot be specified in backup config")) @@ -259,20 +259,20 @@ var _ = Describe( Expect(mErr).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).To(HaveOccurred()) Expect(err.Error()).To( ContainSubstring("secret-agent field cannot be specified in backup config")) }) It("Should fail when aerospike-cluster name is updated", func() { - backup, err = newBackup(backupNsNm) + backup, err = NewBackup(backupNsNm) Expect(err).ToNot(HaveOccurred()) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) - err = validateTriggeredBackup(k8sClient, backupServiceName, backupServiceNamespace, backup) + err = validateTriggeredBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) backup, err = getBackupObj(k8sClient, backup.Name, backup.Namespace) @@ -297,12 +297,12 @@ var _ = Describe( Context("When doing Valid operations", func() { It("Should trigger backup when correct backup config with local storage is given", func() { - backup, err = newBackup(backupNsNm) + backup, err = NewBackup(backupNsNm) Expect(err).ToNot(HaveOccurred()) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) - err = validateTriggeredBackup(k8sClient, backupServiceName, backupServiceNamespace, backup) + err = validateTriggeredBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) }) @@ -320,17 +320,17 @@ var _ = Describe( backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) - err = validateTriggeredBackup(k8sClient, backupServiceName, backupServiceNamespace, backup) + err = validateTriggeredBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) }) It("Should trigger on-demand backup when given", func() { - backup, err = newBackup(backupNsNm) + backup, err = NewBackup(backupNsNm) Expect(err).ToNot(HaveOccurred()) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) backup, err = getBackupObj(k8sClient, backup.Name, backup.Namespace) @@ -346,7 +346,7 @@ var _ = Describe( err = updateBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) - err = validateTriggeredBackup(k8sClient, backupServiceName, backupServiceNamespace, backup) + err = validateTriggeredBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) }) @@ -368,10 +368,10 @@ var _ = Describe( Expect(err).ToNot(HaveOccurred()) backup = newBackupWithConfig(backupNsNm, configBytes) - err = deployBackup(k8sClient, backup) + err = CreateBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) - err = validateTriggeredBackup(k8sClient, backupServiceName, backupServiceNamespace, backup) + err = validateTriggeredBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) backup, err = getBackupObj(k8sClient, backup.Name, backup.Namespace) @@ -388,7 +388,7 @@ var _ = Describe( err = updateBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) - err = validateTriggeredBackup(k8sClient, backupServiceName, backupServiceNamespace, backup) + err = validateTriggeredBackup(k8sClient, backup) Expect(err).ToNot(HaveOccurred()) }) diff --git a/test/backup/test_utils.go b/test/backup/test_utils.go index 73316560..79eead9c 100644 --- a/test/backup/test_utils.go +++ b/test/backup/test_utils.go @@ -4,8 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" - "net/http" "reflect" "time" @@ -22,6 +20,7 @@ import ( "github.com/abhishekdwivedi3060/aerospike-backup-service/pkg/model" asdbv1beta1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1beta1" "github.com/aerospike/aerospike-kubernetes-operator/controllers/common" + backup_service "github.com/aerospike/aerospike-kubernetes-operator/pkg/backup-service" ) const ( @@ -41,7 +40,7 @@ var aerospikeNsNm = types.NamespacedName{ Namespace: namespace, } -func newBackup(backupNsNm types.NamespacedName) (*asdbv1beta1.AerospikeBackup, error) { +func NewBackup(backupNsNm types.NamespacedName) (*asdbv1beta1.AerospikeBackup, error) { configBytes, err := getBackupConfBytes(namePrefix(backupNsNm)) if err != nil { return nil, err @@ -153,7 +152,7 @@ func getBackupObj(cl client.Client, name, namespace string) (*asdbv1beta1.Aerosp return &backup, nil } -func deployBackup(cl client.Client, backup *asdbv1beta1.AerospikeBackup) error { +func CreateBackup(cl client.Client, backup *asdbv1beta1.AerospikeBackup) error { if err := cl.Create(testCtx, backup); err != nil { return err } @@ -169,7 +168,7 @@ func updateBackup(cl client.Client, backup *asdbv1beta1.AerospikeBackup) error { return waitForBackup(cl, backup, timeout) } -func deleteBackup(cl client.Client, backup *asdbv1beta1.AerospikeBackup) error { +func DeleteBackup(cl client.Client, backup *asdbv1beta1.AerospikeBackup) error { if err := cl.Delete(testCtx, backup); err != nil && !k8serrors.IsNotFound(err) { return err } @@ -219,30 +218,30 @@ func waitForBackup(cl client.Client, backup *asdbv1beta1.AerospikeBackup, } // validateTriggeredBackup validates if the backup is triggered by checking the current config of backup-service -func validateTriggeredBackup(k8sClient client.Client, backupServiceName, backupServiceNamespace string, - backup *asdbv1beta1.AerospikeBackup) error { +func validateTriggeredBackup(k8sClient client.Client, backup *asdbv1beta1.AerospikeBackup) error { var backupK8sService corev1.Service - validateNewEntries := func(current *model.Config, desiredConfigMap map[string]interface{}, fieldPath string) error { - newCluster := desiredConfigMap[common.AerospikeClusterKey].(map[string]interface{}) + validateNewEntries := func(currentConfigInMap map[string]interface{}, desiredConfigInMap map[string]interface{}, + fieldPath string) error { + newCluster := desiredConfigInMap[common.AerospikeClusterKey].(map[string]interface{}) for clusterName := range newCluster { - if _, ok := current.AerospikeClusters[clusterName]; !ok { + if _, ok := currentConfigInMap[common.AerospikeClustersKey].(map[string]interface{})[clusterName]; !ok { return fmt.Errorf("cluster %s not found in %s backup config", clusterName, fieldPath) } } pkgLog.Info(fmt.Sprintf("Cluster info is found in %s backup config", fieldPath)) - routines := desiredConfigMap[common.BackupRoutinesKey].(map[string]interface{}) + routines := desiredConfigInMap[common.BackupRoutinesKey].(map[string]interface{}) for routineName := range routines { - if _, ok := current.BackupRoutines[routineName]; !ok { + if _, ok := currentConfigInMap[common.BackupRoutinesKey].(map[string]interface{})[routineName]; !ok { return fmt.Errorf("routine %s not found in %s backup config", routineName, fieldPath) } } - if len(routines) != len(current.BackupRoutines) { + if len(routines) != len(currentConfigInMap[common.BackupRoutinesKey].(map[string]interface{})) { return fmt.Errorf("backup routine count mismatch in %s backup config", fieldPath) } @@ -254,24 +253,27 @@ func validateTriggeredBackup(k8sClient client.Client, backupServiceName, backupS // Validate from backup service configmap var configmap corev1.ConfigMap if err := k8sClient.Get(testCtx, - types.NamespacedName{Name: backupServiceName, Namespace: backupServiceNamespace}, &configmap, + types.NamespacedName{ + Name: backup.Spec.BackupService.Name, + Namespace: backup.Spec.BackupService.Namespace, + }, &configmap, ); err != nil { return err } - var config model.Config + backupSvcConfig := make(map[string]interface{}) - if err := yaml.Unmarshal([]byte(configmap.Data[common.BackupServiceConfigYAML]), &config); err != nil { + if err := yaml.Unmarshal([]byte(configmap.Data[common.BackupServiceConfigYAML]), &backupSvcConfig); err != nil { return err } - desiredConfigMap := make(map[string]interface{}) + desiredConfigInMap := make(map[string]interface{}) - if err := yaml.Unmarshal(backup.Spec.Config.Raw, &desiredConfigMap); err != nil { + if err := yaml.Unmarshal(backup.Spec.Config.Raw, &desiredConfigInMap); err != nil { return err } - if err := validateNewEntries(&config, desiredConfigMap, "configMap"); err != nil { + if err := validateNewEntries(backupSvcConfig, desiredConfigInMap, "configMap"); err != nil { return err } @@ -279,7 +281,10 @@ func validateTriggeredBackup(k8sClient client.Client, backupServiceName, backupS if err := wait.PollUntilContextTimeout(testCtx, interval, timeout, true, func(ctx context.Context) (bool, error) { if err := k8sClient.Get(testCtx, - types.NamespacedName{Name: backupServiceName, Namespace: backupServiceNamespace}, + types.NamespacedName{ + Name: backup.Spec.BackupService.Name, + Namespace: backup.Spec.BackupService.Namespace, + }, &backupK8sService); err != nil { return false, err } @@ -293,42 +298,95 @@ func validateTriggeredBackup(k8sClient client.Client, backupServiceName, backupS return err } - var body []byte + serviceClient := backup_service.Client{ + Address: backupK8sService.Status.LoadBalancer.Ingress[0].IP, + Port: 8081, + } // Wait for Backup service to be ready if err := wait.PollUntilContextTimeout(testCtx, interval, timeout, true, func(ctx context.Context) (bool, error) { - resp, err := http.Get("http://" + backupK8sService.Status.LoadBalancer.Ingress[0].IP + ":8081/v1/config") + config, err := serviceClient.GetBackupServiceConfig() if err != nil { pkgLog.Error(err, "Failed to get backup service config") return false, nil } - defer resp.Body.Close() + backupSvcConfig = config + return true, nil + }); err != nil { + return err + } - if resp.StatusCode != http.StatusOK { - return false, fmt.Errorf("backup service config fetch failed with status code: %d", - resp.StatusCode) - } + return validateNewEntries(backupSvcConfig, desiredConfigInMap, "backup-service API") +} - // Validate the config - body, err = io.ReadAll(resp.Body) - if err != nil { +func namePrefix(nsNm types.NamespacedName) string { + return nsNm.Namespace + "-" + nsNm.Name +} + +func GetBackupDataPaths(k8sClient client.Client, backup *asdbv1beta1.AerospikeBackup) ([]string, error) { + var backupK8sService corev1.Service + + // Wait for Service LB IP to be populated + if err := wait.PollUntilContextTimeout(testCtx, interval, timeout, true, + func(ctx context.Context) (bool, error) { + if err := k8sClient.Get(testCtx, + types.NamespacedName{ + Name: backup.Spec.BackupService.Name, + Namespace: backup.Spec.BackupService.Namespace, + }, + &backupK8sService, + ); err != nil { return false, err } + if backupK8sService.Status.LoadBalancer.Ingress == nil { + return false, nil + } + return true, nil }); err != nil { - return err + return nil, err } - if err := yaml.Unmarshal(body, &config); err != nil { - return err + var ( + config model.Config + backupDataPaths []string + ) + + if err := yaml.Unmarshal(backup.Spec.Config.Raw, &config); err != nil { + return backupDataPaths, err } - return validateNewEntries(&config, desiredConfigMap, "backup-service API") -} + serviceClient := backup_service.Client{ + Address: backupK8sService.Status.LoadBalancer.Ingress[0].IP, + Port: 8081, + } -func namePrefix(nsNm types.NamespacedName) string { - return nsNm.Namespace + "-" + nsNm.Name + if err := wait.PollUntilContextTimeout(testCtx, interval, timeout, true, + func(ctx context.Context) (bool, error) { + for routineName := range config.BackupRoutines { + backups, err := serviceClient.GetFullBackupsForRoutine(routineName) + if err != nil { + return false, nil + } + + if len(backups) == 0 { + pkgLog.Info("No backups found for routine", "name", routineName) + return false, nil + } + + for idx := range backups { + backupMeta := backups[idx].(map[string]interface{}) + backupDataPaths = append(backupDataPaths, backupMeta["key"].(string)) + } + } + + return true, nil + }); err != nil { + return backupDataPaths, err + } + + return backupDataPaths, nil } diff --git a/test/cluster/cluster_helper.go b/test/cluster/cluster_helper.go index e730d320..8911c075 100644 --- a/test/cluster/cluster_helper.go +++ b/test/cluster/cluster_helper.go @@ -737,7 +737,6 @@ func deleteCluster( // Wait for all removed PVCs to be terminated. for { newPVCList, err := getAeroClusterPVCList(aeroCluster, k8sClient) - if err != nil { return fmt.Errorf("error getting PVCs: %v", err) } @@ -1521,7 +1520,6 @@ func getBasicStorageSpecObject() asdbv1.AerospikeStorageSpec { storage := asdbv1.AerospikeStorageSpec{ BlockVolumePolicy: asdbv1.AerospikePersistentVolumePolicySpec{ InputCascadeDelete: &cascadeDeleteFalse, - CascadeDelete: cascadeDeleteTrue, }, FileSystemVolumePolicy: asdbv1.AerospikePersistentVolumePolicySpec{ InputInitMethod: &aerospikeVolumeInitMethodDeleteFiles, diff --git a/test/restore/restore_suite_test.go b/test/restore/restore_suite_test.go new file mode 100644 index 00000000..e6b52cca --- /dev/null +++ b/test/restore/restore_suite_test.go @@ -0,0 +1,151 @@ +package restore + +import ( + "fmt" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gexec" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8Runtime "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" + asdbv1beta1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1beta1" + "github.com/aerospike/aerospike-kubernetes-operator/test" + "github.com/aerospike/aerospike-kubernetes-operator/test/backup" + backupservice "github.com/aerospike/aerospike-kubernetes-operator/test/backup_service" + "github.com/aerospike/aerospike-kubernetes-operator/test/cluster" +) + +var testEnv *envtest.Environment + +var k8sClient client.Client + +var scheme = k8Runtime.NewScheme() + +func TestRestore(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Restore Suite") +} + +var _ = BeforeSuite( + func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + By("Bootstrapping test environment") + var err error + + testEnv, _, k8sClient, _, err = test.BootStrapTestEnv(scheme) + Expect(err).NotTo(HaveOccurred()) + + By("Deploy Backup Service") + backupService, err := backupservice.NewBackupService() + Expect(err).ToNot(HaveOccurred()) + + backupService.Spec.Service = &asdbv1beta1.Service{ + Type: corev1.ServiceTypeLoadBalancer, + } + + backupServiceName = backupService.Name + backupServiceNamespace = backupService.Namespace + + err = backupservice.DeployBackupService(k8sClient, backupService) + Expect(err).ToNot(HaveOccurred()) + + cascadeDeleteTrue := true + + By(fmt.Sprintf("Deploy source Aerospike Cluster: %s", sourceAerospikeClusterNsNm.String())) + aeroCluster := cluster.CreateDummyAerospikeCluster(sourceAerospikeClusterNsNm, 2) + aeroCluster.Spec.Storage.BlockVolumePolicy.InputCascadeDelete = &cascadeDeleteTrue + aeroCluster.Spec.Storage.FileSystemVolumePolicy.InputCascadeDelete = &cascadeDeleteTrue + + err = cluster.DeployCluster(k8sClient, testCtx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + backupObj, err := backup.NewBackup(backupNsNm) + Expect(err).ToNot(HaveOccurred()) + + // Point to current suite's backup service + backupObj.Spec.BackupService.Name = backupServiceName + backupObj.Spec.BackupService.Namespace = backupServiceNamespace + + err = backup.CreateBackup(k8sClient, backupObj) + Expect(err).ToNot(HaveOccurred()) + + backupDataPaths, err := backup.GetBackupDataPaths(k8sClient, backupObj) + Expect(err).ToNot(HaveOccurred()) + + pkgLog.Info(fmt.Sprintf("BackupDataPaths: %v", backupDataPaths)) + Expect(backupDataPaths).ToNot(BeEmpty()) + + // Example backupDataPath = "/localStorage/test-sample-backup-test-routine/backup/1722353745635/data/test" + backupDataPath = backupDataPaths[0] + + By(fmt.Sprintf("Deploy destination Aerospike Cluster: %s", destinationAerospikeClusterNsNm.String())) + aeroCluster = cluster.CreateDummyAerospikeCluster(destinationAerospikeClusterNsNm, 2) + aeroCluster.Spec.Storage.BlockVolumePolicy.InputCascadeDelete = &cascadeDeleteTrue + aeroCluster.Spec.Storage.FileSystemVolumePolicy.InputCascadeDelete = &cascadeDeleteTrue + + err = cluster.DeployCluster(k8sClient, testCtx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }) + +var _ = AfterSuite( + func() { + By("Delete Aerospike Cluster") + aeroClusters := []asdbv1.AerospikeCluster{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: sourceAerospikeClusterNsNm.Name, + Namespace: sourceAerospikeClusterNsNm.Namespace, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: destinationAerospikeClusterNsNm.Name, + Namespace: destinationAerospikeClusterNsNm.Namespace, + }, + }, + } + + for idx := range aeroClusters { + aeroCluster := aeroClusters[idx] + err := cluster.DeleteCluster(k8sClient, testCtx, &aeroCluster) + Expect(err).ToNot(HaveOccurred()) + } + + By("Delete Backup") + backupObj := asdbv1beta1.AerospikeBackup{ + ObjectMeta: metav1.ObjectMeta{ + Name: backupNsNm.Name, + Namespace: backupNsNm.Namespace, + }, + } + + err := backup.DeleteBackup(k8sClient, &backupObj) + Expect(err).ToNot(HaveOccurred()) + + By("Delete Backup Service") + backupService := asdbv1beta1.AerospikeBackupService{ + ObjectMeta: metav1.ObjectMeta{ + Name: backupServiceName, + Namespace: backupServiceNamespace, + }, + } + + err = backupservice.DeleteBackupService(k8sClient, &backupService) + Expect(err).ToNot(HaveOccurred()) + + By("tearing down the test environment") + gexec.KillAndWait(5 * time.Second) + err = testEnv.Stop() + Expect(err).ToNot(HaveOccurred()) + }, +) diff --git a/test/restore/restore_test.go b/test/restore/restore_test.go new file mode 100644 index 00000000..cacbac9d --- /dev/null +++ b/test/restore/restore_test.go @@ -0,0 +1,178 @@ +package restore + +import ( + "encoding/json" + "strconv" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/types" + + asdbv1beta1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1beta1" + "github.com/aerospike/aerospike-kubernetes-operator/controllers/common" +) + +var _ = Describe( + "Restore Test", func() { + + var ( + restore *asdbv1beta1.AerospikeRestore + err error + restoreNsNm = types.NamespacedName{ + Namespace: namespace, + Name: "sample-restore", + } + ) + + AfterEach(func() { + Expect(deleteRestore(k8sClient, restore)).ToNot(HaveOccurred()) + }) + + Context( + "When doing Invalid operations", func() { + It("Should fail when wrong format restore config is given", func() { + config := getRestoreConfigInMap(backupDataPath) + + // change the format from a single element to slice + config["destination"] = []interface{}{config["destination"]} + + configBytes, mErr := json.Marshal(config) + Expect(mErr).ToNot(HaveOccurred()) + + restore = newRestoreWithConfig(restoreNsNm, asdbv1beta1.Full, configBytes) + err = createRestore(k8sClient, restore) + Expect(err).To(HaveOccurred()) + }) + + It("Should fail when spec is updated", func() { + restore, err = newRestore(restoreNsNm, asdbv1beta1.Full) + Expect(err).ToNot(HaveOccurred()) + + err = createRestore(k8sClient, restore) + Expect(err).ToNot(HaveOccurred()) + + restore, err = getRestoreObj(k8sClient, restoreNsNm) + Expect(err).ToNot(HaveOccurred()) + + restore.Spec.Type = asdbv1beta1.Incremental + + err = k8sClient.Update(testCtx, restore) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("AerospikeRestore Spec is immutable")) + }) + + It("Should fail restore when wrong backup path is given", func() { + config := getRestoreConfigInMap("wrong-backup-path") + + configBytes, mErr := json.Marshal(config) + Expect(mErr).ToNot(HaveOccurred()) + + restore = newRestoreWithConfig(restoreNsNm, asdbv1beta1.Full, configBytes) + + err = createRestoreWithTO(k8sClient, restore, 30*time.Second) + Expect(err).To(HaveOccurred()) + }) + + It("Should fail when routine/time is not given for TimeStamp restore type", func() { + // getRestoreConfigInMap returns restore config without a routine, time and with source type + restoreConfig := getRestoreConfigInMap(backupDataPath) + delete(restoreConfig, common.SourceKey) + + configBytes, mErr := json.Marshal(restoreConfig) + Expect(mErr).ToNot(HaveOccurred()) + + restore = newRestoreWithConfig(restoreNsNm, asdbv1beta1.TimeStamp, configBytes) + + err = createRestore(k8sClient, restore) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("restore point in time should be positive")) + }) + + It("Should fail when source is given for TimeStamp restore type", func() { + restore, err = newRestore(restoreNsNm, asdbv1beta1.TimeStamp) + Expect(err).ToNot(HaveOccurred()) + + err = createRestore(k8sClient, restore) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("source key is not allowed in restore config")) + }) + + It("Should fail when routine key is given for Full/Incremental restore type", func() { + restoreConfig := getRestoreConfigInMap(backupDataPath) + restoreConfig[common.RoutineKey] = "test-routine" + + configBytes, mErr := json.Marshal(restoreConfig) + Expect(mErr).ToNot(HaveOccurred()) + + restore = newRestoreWithConfig(restoreNsNm, asdbv1beta1.Full, configBytes) + + err = createRestore(k8sClient, restore) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("routine key is not allowed in restore config")) + }) + + It("Should fail when time key is given for Full/Incremental restore type", func() { + restoreConfig := getRestoreConfigInMap(backupDataPath) + restoreConfig[common.TimeKey] = 1722408895094 + + configBytes, mErr := json.Marshal(restoreConfig) + Expect(mErr).ToNot(HaveOccurred()) + + restore = newRestoreWithConfig(restoreNsNm, asdbv1beta1.Full, configBytes) + + err = createRestore(k8sClient, restore) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("time key is not allowed in restore config")) + }) + }) + + Context( + "When doing valid operations", func() { + It( + "Should complete restore for Full restore type", func() { + restore, err = newRestore(restoreNsNm, asdbv1beta1.Full) + Expect(err).ToNot(HaveOccurred()) + + err = createRestore(k8sClient, restore) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should complete restore for Incremental restore type", func() { + restore, err = newRestore(restoreNsNm, asdbv1beta1.Incremental) + Expect(err).ToNot(HaveOccurred()) + + err = createRestore(k8sClient, restore) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should complete restore for TimeStamp restore type", func() { + restoreConfig := getRestoreConfigInMap(backupDataPath) + delete(restoreConfig, common.SourceKey) + + parts := strings.Split(backupDataPath, "/") + + time := parts[len(parts)-3] + timeInt, err := strconv.Atoi(time) + Expect(err).ToNot(HaveOccurred()) + + // increase time by 1 millisecond to consider the latest backup under time bound + restoreConfig[common.TimeKey] = int64(timeInt) + 1 + restoreConfig[common.RoutineKey] = parts[len(parts)-5] + + configBytes, err := json.Marshal(restoreConfig) + Expect(err).ToNot(HaveOccurred()) + + restore = newRestoreWithConfig(restoreNsNm, asdbv1beta1.TimeStamp, configBytes) + + err = createRestore(k8sClient, restore) + Expect(err).ToNot(HaveOccurred()) + }, + ) + }) + }) diff --git a/test/restore/test_utils.go b/test/restore/test_utils.go new file mode 100644 index 00000000..0fe94ed5 --- /dev/null +++ b/test/restore/test_utils.go @@ -0,0 +1,236 @@ +package restore + +import ( + "context" + "encoding/json" + "fmt" + "time" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/abhishekdwivedi3060/aerospike-backup-service/pkg/model" + asdbv1beta1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1beta1" +) + +const ( + timeout = 2 * time.Minute + interval = 2 * time.Second + namespace = "test" +) + +var testCtx = context.TODO() + +var backupServiceName, backupServiceNamespace string + +var backupDataPath string + +var pkgLog = ctrl.Log.WithName("restore") + +var backupNsNm = types.NamespacedName{ + Name: "sample-backup", + Namespace: namespace, +} + +var sourceAerospikeClusterNsNm = types.NamespacedName{ + Name: "aerocluster", + Namespace: namespace, +} + +var destinationAerospikeClusterNsNm = types.NamespacedName{ + Name: "destination-aerocluster", + Namespace: namespace, +} + +func newRestore(restoreNsNm types.NamespacedName, restoreType asdbv1beta1.RestoreType, +) (*asdbv1beta1.AerospikeRestore, error) { + configBytes, err := getRestoreConfBytes(backupDataPath) + if err != nil { + return nil, err + } + + restore := newRestoreWithEmptyConfig(restoreNsNm, restoreType) + + restore.Spec.Config = runtime.RawExtension{ + Raw: configBytes, + } + + return restore, nil +} + +func newRestoreWithConfig(restoreNsNm types.NamespacedName, restoreType asdbv1beta1.RestoreType, configBytes []byte, +) *asdbv1beta1.AerospikeRestore { + restore := newRestoreWithEmptyConfig(restoreNsNm, restoreType) + + restore.Spec.Config = runtime.RawExtension{ + Raw: configBytes, + } + + return restore +} + +func newRestoreWithEmptyConfig(restoreNsNm types.NamespacedName, restoreType asdbv1beta1.RestoreType, +) *asdbv1beta1.AerospikeRestore { + return &asdbv1beta1.AerospikeRestore{ + ObjectMeta: metav1.ObjectMeta{ + Name: restoreNsNm.Name, + Namespace: restoreNsNm.Namespace, + }, + Spec: asdbv1beta1.AerospikeRestoreSpec{ + BackupService: asdbv1beta1.BackupService{ + Name: backupServiceName, + Namespace: backupServiceNamespace, + }, + Type: restoreType, + }, + } +} + +func getRestoreObj(cl client.Client, restoreNsNm types.NamespacedName) (*asdbv1beta1.AerospikeRestore, error) { + var restore asdbv1beta1.AerospikeRestore + + if err := cl.Get(testCtx, restoreNsNm, &restore); err != nil { + return nil, err + } + + return &restore, nil +} + +func createRestore(cl client.Client, restore *asdbv1beta1.AerospikeRestore) error { + if err := cl.Create(testCtx, restore); err != nil { + return err + } + + return waitForRestore(cl, restore, timeout) +} + +func createRestoreWithTO(cl client.Client, restore *asdbv1beta1.AerospikeRestore, timeout time.Duration) error { + if err := cl.Create(testCtx, restore); err != nil { + return err + } + + return waitForRestore(cl, restore, timeout) +} + +func deleteRestore(cl client.Client, restore *asdbv1beta1.AerospikeRestore) error { + if err := cl.Delete(testCtx, restore); err != nil && !k8serrors.IsNotFound(err) { + return err + } + + for { + _, err := getRestoreObj(cl, types.NamespacedName{ + Namespace: restore.Namespace, + Name: restore.Name, + }) + + if err != nil { + if k8serrors.IsNotFound(err) { + break + } + + return err + } + + time.Sleep(1 * time.Second) + } + + return nil +} + +func waitForRestore(cl client.Client, restore *asdbv1beta1.AerospikeRestore, + timeout time.Duration) error { + namespaceName := types.NamespacedName{ + Name: restore.Name, Namespace: restore.Namespace, + } + + if err := wait.PollUntilContextTimeout( + testCtx, 1*time.Second, + timeout, true, func(ctx context.Context) (bool, error) { + if err := cl.Get(ctx, namespaceName, restore); err != nil { + return false, nil + } + + if restore.Status.Phase != asdbv1beta1.AerospikeRestoreCompleted { + pkgLog.Info(fmt.Sprintf("Restore is in %s phase", restore.Status.Phase)) + return false, nil + } + + return true, nil + }, + ); err != nil { + return err + } + + pkgLog.Info(fmt.Sprintf("Restore is in %s phase", restore.Status.Phase)) + + if restore.Status.JobID == nil { + return fmt.Errorf("restore job id is not set") + } + + if restore.Status.RestoreResult.Raw == nil { + return fmt.Errorf("restore result is not set") + } + + var restoreResult model.RestoreJobStatus + + if err := json.Unmarshal(restore.Status.RestoreResult.Raw, &restoreResult); err != nil { + return err + } + + if restoreResult.Status != model.JobStatusDone { + return fmt.Errorf("restore job status is not done") + } + + if restoreResult.Error != "" { + return fmt.Errorf("restore job failed with error: %s", restoreResult.Error) + } + + return nil +} + +func getRestoreConfBytes(backupPath string) ([]byte, error) { + restoreConfig := getRestoreConfigInMap(backupPath) + + configBytes, err := json.Marshal(restoreConfig) + if err != nil { + return nil, err + } + + pkgLog.Info(string(configBytes)) + + return configBytes, nil +} + +func getRestoreConfigInMap(backupPath string) map[string]interface{} { + return map[string]interface{}{ + "destination": map[string]interface{}{ + "label": "destinationCluster", + "credentials": map[string]interface{}{ + "password": "admin123", + "user": "admin", + }, + "seed-nodes": []map[string]interface{}{ + { + "host-name": fmt.Sprintf("%s.%s.svc.cluster.local", + destinationAerospikeClusterNsNm.Name, destinationAerospikeClusterNsNm.Namespace, + ), + "port": 3000, + }, + }, + }, + "policy": map[string]interface{}{ + "parallel": 3, + "no-generation": true, + "no-indexes": true, + }, + "source": map[string]interface{}{ + "path": backupPath, + "type": "local", + }, + } +}