From 46fb50ed99d598dd80351b2dbbe3472cf7853dbd Mon Sep 17 00:00:00 2001 From: Geet Date: Tue, 13 Aug 2024 07:15:51 +0000 Subject: [PATCH 1/2] AKS support for asyncdr --- pkg/asyncdr/asyncdr.go | 177 ++++++++++++++++++++++++++++------- tests/basic/async_dr_test.go | 92 ++++++++++++++++-- 2 files changed, 229 insertions(+), 40 deletions(-) diff --git a/pkg/asyncdr/asyncdr.go b/pkg/asyncdr/asyncdr.go index 320c7122a..ecff0551b 100644 --- a/pkg/asyncdr/asyncdr.go +++ b/pkg/asyncdr/asyncdr.go @@ -2,6 +2,7 @@ package asyncdr import ( "encoding/json" + "flag" "fmt" "io/ioutil" "os" @@ -10,11 +11,15 @@ import ( "time" crdv1 "github.com/kubernetes-incubator/external-storage/snapshot/pkg/apis/crd/v1" + oputils "github.com/libopenstorage/operator/drivers/storage/portworx/util" + opcorev1 "github.com/libopenstorage/operator/pkg/apis/core/v1" + storkdriver "github.com/libopenstorage/stork/drivers/volume" storkapi "github.com/libopenstorage/stork/pkg/apis/stork/v1alpha1" "github.com/libopenstorage/stork/pkg/k8sutils" "github.com/portworx/sched-ops/k8s/apiextensions" "github.com/portworx/sched-ops/k8s/apps" "github.com/portworx/sched-ops/k8s/core" + "github.com/portworx/sched-ops/k8s/operator" "github.com/portworx/sched-ops/k8s/storage" storkops "github.com/portworx/sched-ops/k8s/stork" "github.com/portworx/sched-ops/task" @@ -58,9 +63,9 @@ const ( tempDir = "/tmp" portworxProvisioner = "kubernetes.io/portworx-volume" DefaultScName = "async-sc" - FirstCluster = 0 - SecondCluster = 1 - ThirdCluster = 2 + FirstCluster = 0 + SecondCluster = 1 + ThirdCluster = 2 ) var ( @@ -287,7 +292,7 @@ func CreateSnapshotSchedule( pvcName string, scheduleName string, policyName string, - snapshotType string,) (*storkapi.VolumeSnapshotSchedule, error) { + snapshotType string) (*storkapi.VolumeSnapshotSchedule, error) { snapSched := &storkapi.VolumeSnapshotSchedule{ ObjectMeta: meta_v1.ObjectMeta{ @@ -320,7 +325,7 @@ func CreateSchedulePolicyWithRetain(policyName string, interval int, retain stor Policy: storkapi.SchedulePolicyItem{ Interval: &storkapi.IntervalPolicy{ IntervalMinutes: interval, - Retain: retain, + Retain: retain, }, }} schedPolicy, err = storkops.Instance().CreateSchedulePolicy(schedPolicy) @@ -338,12 +343,12 @@ func ValidateSnapshotScheduleCount(pvcs []string, schedNs, schedPol, snapshotTyp return fmt.Errorf("Error creating snapshot schedule: %v", err) } log.InfoD("SnapSchedule is %v", snapSchedule) - time.Sleep(30*time.Second) + time.Sleep(30 * time.Second) err = WaitForRetainSnapshotsSuccessful(scheduleName, schedNs, int(retain), snapInterval) if err != nil { return fmt.Errorf("Error waiting for retain snapshots: %v", err) } - time.Sleep(30*time.Second) + time.Sleep(30 * time.Second) schedule, err := storkops.Instance().GetSnapshotSchedule(scheduleName, schedNs) if err != nil { return fmt.Errorf("Failed to get snapshot schedule") @@ -358,39 +363,39 @@ func ValidateSnapshotScheduleCount(pvcs []string, schedNs, schedPol, snapshotTyp // WaitForRetainSnapshotsSuccessful waits for a certain number of snapshots to complete. func WaitForRetainSnapshotsSuccessful(snapSchedName string, schedNs string, retain int, snapInterval int) error { - for i := 0; i < retain + 1; i++ { - checkSuccessfulSnapshots := func() (interface{}, bool, error) { - snapSchedule, err := storkops.Instance().GetSnapshotSchedule(snapSchedName, schedNs) - if err != nil { - return nil, true, fmt.Errorf("failed to get snapshot schedule: %v", snapSchedName) - } + for i := 0; i < retain+1; i++ { + checkSuccessfulSnapshots := func() (interface{}, bool, error) { + snapSchedule, err := storkops.Instance().GetSnapshotSchedule(snapSchedName, schedNs) + if err != nil { + return nil, true, fmt.Errorf("failed to get snapshot schedule: %v", snapSchedName) + } currentIndex := i - if currentIndex >= len(snapSchedule.Status.Items["Interval"]) { - currentIndex = len(snapSchedule.Status.Items["Interval"]) - 1 - } - if snapSchedule.Status.Items["Interval"][currentIndex].Status != crdv1.VolumeSnapshotConditionReady { - return nil, true, fmt.Errorf("snapshot %v failed with status: %v", snapSchedule.Status.Items["Interval"][currentIndex].Name, snapSchedule.Status.Items["Interval"][currentIndex].Status) - } - return nil, false, nil - } - - snapStartTime := time.Now() - _, err := task.DoRetryWithTimeout(checkSuccessfulSnapshots, time.Minute*time.Duration(snapInterval*2), time.Second*10) - if err != nil { - return err - } - snapEndTime := time.Now() - snapTimeTaken := snapEndTime.Sub(snapStartTime) - snapIntervalMins := time.Minute * time.Duration(snapInterval) + if currentIndex >= len(snapSchedule.Status.Items["Interval"]) { + currentIndex = len(snapSchedule.Status.Items["Interval"]) - 1 + } + if snapSchedule.Status.Items["Interval"][currentIndex].Status != crdv1.VolumeSnapshotConditionReady { + return nil, true, fmt.Errorf("snapshot %v failed with status: %v", snapSchedule.Status.Items["Interval"][currentIndex].Name, snapSchedule.Status.Items["Interval"][currentIndex].Status) + } + return nil, false, nil + } + + snapStartTime := time.Now() + _, err := task.DoRetryWithTimeout(checkSuccessfulSnapshots, time.Minute*time.Duration(snapInterval*2), time.Second*10) + if err != nil { + return err + } + snapEndTime := time.Now() + snapTimeTaken := snapEndTime.Sub(snapStartTime) + snapIntervalMins := time.Minute * time.Duration(snapInterval) if i == retain { break } - log.Infof("Waiting for next snapshot interval to start. Time pending to start next snap trigger: %v", snapIntervalMins-snapTimeTaken) - time.Sleep(snapIntervalMins - snapTimeTaken + 10*time.Second) - } - return nil + log.Infof("Waiting for next snapshot interval to start. Time pending to start next snap trigger: %v", snapIntervalMins-snapTimeTaken) + time.Sleep(snapIntervalMins - snapTimeTaken + 10*time.Second) + } + return nil } func CreateSchedulePolicy(policyName string, interval int) (pol *storkapi.SchedulePolicy, err error) { @@ -756,3 +761,107 @@ func CheckDefaultPxStorageClass(name string) error { } return nil } + +func ChangePxServiceToLoadBalancer(internalLB bool) error { + // Check if service is already of type loadbalancer + const ( + pxStcServiceTypeKey = "portworx.io/service-type" + stcLoadBalancerValue = "LoadBalancer" + pxStcServiceKey = "service/portworx-service" + awsInternalLBKey = "service.beta.kubernetes.io/aws-load-balancer-internal" + awsInternalLBValue = "true" + awsLBTypeKey = "service.beta.kubernetes.io/aws-load-balancer-type" + awsLBTypeVal = "nlb" + awsNLBTargetTypeKey = "service.beta.kubernetes.io/aws-load-balancer-nlb-target-type" + awsNLBTargetTypeVal = "ip" + awsLBSubnetKey = "service.beta.kubernetes.io/aws-load-balancer-subnets" + pxServiceName = "portworx-service" + pxNamespace = "kube-system" + ) + + pxService, err := core.Instance().GetService(pxServiceName, pxNamespace) + if err != nil { + return fmt.Errorf("failed to get portworx service before changing it to load balancer: %v", err) + } + log.Infof("Current Service Type is %v, pxService.Spec.Type") + if pxService.Spec.Type == v1.ServiceTypeLoadBalancer { + log.InfoD("portworx service is already of type LoadBalancer") + return nil + } + + storageDriverName := flag.Lookup("storage-driver").Value.(flag.Getter).Get().(string) + log.Infof("Storage driver name: %v", storageDriverName) + + if storageDriverName == storkdriver.PortworxDriverName { + stc, err := operator.Instance().ListStorageClusters(pxNamespace) + if err != nil { + return fmt.Errorf("failed to list PX storage cluster on EKS/AKS/GKE: %v", err) + } + if len(stc.Items) > 0 { + pxStc := (*stc).Items[0] + // Change portworx service to LoadBalancer, since this is an EKS/AKS/GKE with PX operator install for Portworx + if pxStc.ObjectMeta.Annotations == nil { + pxStc.ObjectMeta.Annotations = make(map[string]string) + } + pxStc.ObjectMeta.Annotations[pxStcServiceTypeKey] = stcLoadBalancerValue + + if internalLB { + // Add LoadBalancer annotations specific to EKS ELB + if pxStc.Spec.Metadata == nil { + pxStc.Spec.Metadata = &opcorev1.Metadata{} + pxStc.Spec.Metadata.Annotations = make(map[string]map[string]string) + pxStc.Spec.Metadata.Annotations[pxStcServiceKey] = make(map[string]string) + } + if pxStc.Spec.Metadata.Annotations == nil { + pxStc.Spec.Metadata.Annotations = make(map[string]map[string]string) + } + pxStc.Spec.Metadata.Annotations[pxStcServiceKey] = make(map[string]string) + pxStc.Spec.Metadata.Annotations[pxStcServiceKey][awsInternalLBKey] = awsInternalLBValue + pxStc.Spec.Metadata.Annotations[pxStcServiceKey][awsLBTypeKey] = awsLBTypeVal + pxStc.Spec.Metadata.Annotations[pxStcServiceKey][awsNLBTargetTypeKey] = awsNLBTargetTypeVal + pxStc.Spec.Metadata.Annotations[pxStcServiceKey][awsLBSubnetKey] = os.Getenv("LB_SUBNET_KEY") + } + _, err = operator.Instance().UpdateStorageCluster(&pxStc) + + if err != nil { + return fmt.Errorf("failed to update PX service type to LoadBalancer on EKS: %v", err) + } + + } else { + return fmt.Errorf("No storage clusters found") + } + + time.Sleep(1 * time.Minute) + + // Check if service has been changed to type loadbalancer + pxService, err := core.Instance().GetService(pxServiceName, pxNamespace) + if err != nil { + return fmt.Errorf("failed to get portworx service after changing it to load balancer: %v", err) + } + if pxService.Spec.Type != v1.ServiceTypeLoadBalancer { + return fmt.Errorf("failed to set portworx service to type %s", v1.ServiceTypeLoadBalancer) + } + + } + return nil +} + +func IsCloud() (bool, string) { + stc, err := operator.Instance().ListStorageClusters("kube-system") + if len(stc.Items) > 0 && err == nil { + log.InfoD("Storage cluster name: %s", stc.Items[0].Name) + if len(stc.Items) > 0 { + if oputils.IsEKS(&stc.Items[0]) { + log.InfoD("EKS installation detected.") + return true, "eks" + } else if oputils.IsAKS(&stc.Items[0]) { + log.InfoD("AKS installation detected.") + return true, "aks" + } else if oputils.IsGKE(&stc.Items[0]) { + log.InfoD("GKE installation detected.") + return true, "gke" + } + } + } + return false, "" +} diff --git a/tests/basic/async_dr_test.go b/tests/basic/async_dr_test.go index 1b3395fdf..1f4a90fef 100644 --- a/tests/basic/async_dr_test.go +++ b/tests/basic/async_dr_test.go @@ -45,8 +45,10 @@ const ( defaultClusterPairDirNew = "cluster-pair-new" defaultClusterPairName = "remoteclusterpair" defaultClusterPairNameNew = "remoteclusterpairnew" - defaultBackupLocation = "s3" - defaultSecret = "s3secret" + azureSecret = "azuresecret" + azureBackupLocation = "azure" + googleSecret = "googlesecret" + googleBackupLocation = "google" defaultMigSchedName = "automation-migration-schedule-" migrationKey = "async-dr-" migrationSchedKey = "mig-sched-" @@ -55,7 +57,9 @@ const ( ) var ( - kubeConfigWritten bool + kubeConfigWritten bool + defaultBackupLocation = "s3" + defaultSecret = "s3secret" ) type failoverFailbackParam struct { @@ -647,6 +651,36 @@ var _ = Describe("{StorkctlPerformFailoverFailbackeckEsClusterwide}", func() { }) }) +var _ = Describe("{ChangePXSvc}", func() { + // testrailID = 297921 + // testrailID corresponds to: https://portworx.testrail.net/index.php?/cases/view/297921 + BeforeEach(func() { + if !kubeConfigWritten { + // Write kubeconfig files after reading from the config maps created by torpedo deploy script + WriteKubeconfigToFiles() + kubeConfigWritten = true + } + wantAllAfterSuiteActions = false + }) + + // JustBeforeEach(func() { + // StartTorpedoTest("Change PX Svc", "Change PX Svc", nil, testrailID) + // runID = testrailuttils.AddRunsToMilestone(testrailID) + // }) + + It("Change PX service type", func() { + Step("Change PX service type", func() { + err := asyncdr.ChangePxServiceToLoadBalancer(false) + log.Infof("Error is: %v", err) + }) + }) + + // JustAfterEach(func() { + // defer EndTorpedoTest() + // AfterEachTest(contexts, testrailID, runID) + // }) +}) + var _ = Describe("{UpgradeVolumeDriverDuringAppBkpRestore}", func() { BeforeEach(func() { if !kubeConfigWritten { @@ -676,7 +710,7 @@ var _ = Describe("{UpgradeVolumeDriverDuringAppBkpRestore}", func() { backupName = "storkbackup-" + time.Now().Format("15h03m05s") taskNamePrefix = "appbkprest-upgradepx" defaultNs = "kube-system" - timeout = 10 * time.Minute + timeout = 10 * time.Minute ) bkpNs, contexts := initialSetupApps(taskNamePrefix, true) storageNodes := node.GetStorageNodes() @@ -806,7 +840,7 @@ var _ = Describe("{UpgradeVolumeDriverDuringAsyncDrMigration}", func() { kubeConfigPath[cluster], err = GetCustomClusterConfigPath(cluster) log.FailOnError(err, "Getting error while fetching path for %v cluster, error is %v", cluster, err) } - + var migrationSchedName string var schdPol *storkapi.SchedulePolicy cpName := defaultClusterPairName + time.Now().Format("15h03m05s") @@ -1010,14 +1044,48 @@ func validateFailoverFailback(clusterType, taskNamePrefix string, single, skipSo "namespaces": migNamespaces, "kubeconfig": kubeConfigPathSrc, } + + isCloud, cloudName := asyncdr.IsCloud() + + if isCloud { + if err = asyncdr.ChangePxServiceToLoadBalancer(false); err != nil { + log.FailOnError(err, "failed to change PX service to LoadBalancer on source cluster") + } + err = SetDestinationKubeConfig() + log.FailOnError(err, "Failed to set destination kubeconfig") + if err = asyncdr.ChangePxServiceToLoadBalancer(false); err != nil { + log.FailOnError(err, "failed to change PX service to LoadBalancer on destination cluster") + } + err = SetSourceKubeConfig() + log.FailOnError(err, "Failed to set source kubeconfig") + } + + if cloudName == "aks" { + defaultSecret = azureSecret + defaultBackupLocation = azureBackupLocation + } else if cloudName == "gke" { + defaultSecret = googleSecret + defaultBackupLocation = googleBackupLocation + } + log.Infof("Creating clusterpair between first and second cluster") cpName := defaultClusterPairName + time.Now().Format("15h03m05s") + if clusterType == "asyncdr" { err = ScheduleBidirectionalClusterPair(cpName, defaultNs, "", storkapi.BackupLocationType(defaultBackupLocation), defaultSecret, "async-dr", asyncdr.FirstCluster, asyncdr.SecondCluster) } else { err = ScheduleBidirectionalClusterPair(cpName, defaultNs, "", "", "", "sync-dr", asyncdr.FirstCluster, asyncdr.SecondCluster) } + log.FailOnError(err, "Failed creating bidirectional cluster pair") + + if isCloud { + err := patchClusterPair(cpName, defaultNs, kubeConfigPathSrc) + log.FailOnError(err, "Failed patching cluster pair") + err = patchClusterPair(cpName, defaultNs, kubeConfigPathDest) + log.FailOnError(err, "Failed patching cluster pair") + } + log.Infof("Start migration schedule and perform failover") migrationSchedName := migrationSchedKey + time.Now().Format("15h03m05s") createMigSchdAndValidateMigration(migrationSchedName, cpName, defaultNs, kubeConfigPathSrc, extraArgs) @@ -1589,6 +1657,18 @@ func patchStashStrategy(crName string) error { return nil } +func patchClusterPair(cpName, cpNs, configPath string) error { + patch := []byte(`[{"op": "remove", "path": "/spec/options/mode"}]`) + cmd := fmt.Sprintf(`kubectl --kubeconfig %v patch clusterpair %v -n %v --type='json' -p='%s'`, configPath, cpName, cpNs, string(patch)) + log.Infof("Running command: %v", cmd) + _, err = exec.Command("sh", "-c", cmd).CombinedOutput() + if err != nil { + log.Infof("Error running command: %v and err is: %v", cmd, err) + return err + } + return nil +} + func extractActionName(output, action string) string { splitOutput := strings.Split(output, "\n") prefix := fmt.Sprintf("To check %s status use the command : `", action) @@ -1596,4 +1676,4 @@ func extractActionName(output, action string) string { getStatusCommand = strings.TrimSuffix(getStatusCommand, "`") getStatusCmdArgs := strings.Split(getStatusCommand, " ") return getStatusCmdArgs[3] -} \ No newline at end of file +} From 715d1ad6ca193c36d3a64047df189416d13ad726 Mon Sep 17 00:00:00 2001 From: Geet Jain Date: Tue, 20 Aug 2024 15:07:35 +0000 Subject: [PATCH 2/2] Updated for Asyncdr eks support --- Dockerfile | 3 ++ pkg/asyncdr/asyncdr.go | 2 +- tests/basic/async_dr_test.go | 62 +++++++++++++++--------------------- tests/common.go | 9 +++++- 4 files changed, 37 insertions(+), 39 deletions(-) diff --git a/Dockerfile b/Dockerfile index 0ecbc18ea..366ba9128 100644 --- a/Dockerfile +++ b/Dockerfile @@ -95,6 +95,9 @@ RUN apk add --no-cache openssh sshpass # Install dependancy for OCP 4.14 CLI RUN apk --update add gcompat +#Install aws-cli +RUN apk add aws-cli && aws --version + # Install yq RUN wget https://github.com/mikefarah/yq/releases/download/v4.25.1/yq_linux_amd64 -O /usr/bin/yq && \ chmod +x /usr/bin/yq diff --git a/pkg/asyncdr/asyncdr.go b/pkg/asyncdr/asyncdr.go index ecff0551b..e467bec05 100644 --- a/pkg/asyncdr/asyncdr.go +++ b/pkg/asyncdr/asyncdr.go @@ -766,7 +766,7 @@ func ChangePxServiceToLoadBalancer(internalLB bool) error { // Check if service is already of type loadbalancer const ( pxStcServiceTypeKey = "portworx.io/service-type" - stcLoadBalancerValue = "LoadBalancer" + stcLoadBalancerValue = "portworx-service:LoadBalancer" pxStcServiceKey = "service/portworx-service" awsInternalLBKey = "service.beta.kubernetes.io/aws-load-balancer-internal" awsInternalLBValue = "true" diff --git a/tests/basic/async_dr_test.go b/tests/basic/async_dr_test.go index 1f4a90fef..4e1291cab 100644 --- a/tests/basic/async_dr_test.go +++ b/tests/basic/async_dr_test.go @@ -235,7 +235,7 @@ var _ = Describe("{MigrateDeploymentMetroAsync}", func() { namespace := GetAppNamespace(ctx, taskName) migrationNamespaces = append(migrationNamespaces, namespace) log.Infof("Creating clusterpair between first and second cluster") - err = ScheduleBidirectionalClusterPair(defaultClusterPairName, namespace, "", "", "", "sync-dr", asyncdr.FirstCluster, asyncdr.SecondCluster) + err = ScheduleBidirectionalClusterPair(defaultClusterPairName, namespace, "", "", "", "sync-dr", asyncdr.FirstCluster, asyncdr.SecondCluster, nil) log.FailOnError(err, "Failed creating bidirectional cluster pair") } } @@ -268,7 +268,7 @@ var _ = Describe("{MigrateDeploymentMetroAsync}", func() { for i, currMigNamespace := range migrationNamespaces { log.Infof("Creating clusterpair between second and third cluster") - ScheduleBidirectionalClusterPair(defaultClusterPairNameNew, currMigNamespace, "", storkapi.BackupLocationType(defaultBackupLocation), defaultSecret, "async-dr", asyncdr.SecondCluster, asyncdr.ThirdCluster) + ScheduleBidirectionalClusterPair(defaultClusterPairNameNew, currMigNamespace, "", storkapi.BackupLocationType(defaultBackupLocation), defaultSecret, "async-dr", asyncdr.SecondCluster, asyncdr.ThirdCluster, nil) migrationName := migrationKey + fmt.Sprintf("%d", i) + time.Now().Format("15h03m05s") currMig, err := asyncdr.CreateMigration(migrationName, currMigNamespace, defaultClusterPairNameNew, currMigNamespace, &includeVolumesFlagAsync, &includeResourcesFlag, &startApplicationsFlag, nil) Expect(err).NotTo(HaveOccurred(), @@ -651,36 +651,6 @@ var _ = Describe("{StorkctlPerformFailoverFailbackeckEsClusterwide}", func() { }) }) -var _ = Describe("{ChangePXSvc}", func() { - // testrailID = 297921 - // testrailID corresponds to: https://portworx.testrail.net/index.php?/cases/view/297921 - BeforeEach(func() { - if !kubeConfigWritten { - // Write kubeconfig files after reading from the config maps created by torpedo deploy script - WriteKubeconfigToFiles() - kubeConfigWritten = true - } - wantAllAfterSuiteActions = false - }) - - // JustBeforeEach(func() { - // StartTorpedoTest("Change PX Svc", "Change PX Svc", nil, testrailID) - // runID = testrailuttils.AddRunsToMilestone(testrailID) - // }) - - It("Change PX service type", func() { - Step("Change PX service type", func() { - err := asyncdr.ChangePxServiceToLoadBalancer(false) - log.Infof("Error is: %v", err) - }) - }) - - // JustAfterEach(func() { - // defer EndTorpedoTest() - // AfterEachTest(contexts, testrailID, runID) - // }) -}) - var _ = Describe("{UpgradeVolumeDriverDuringAppBkpRestore}", func() { BeforeEach(func() { if !kubeConfigWritten { @@ -859,7 +829,7 @@ var _ = Describe("{UpgradeVolumeDriverDuringAsyncDrMigration}", func() { Step("create clusterpair and start migration", func() { log.InfoD("Creating clusterpair between first and second cluster") - err = ScheduleBidirectionalClusterPair(cpName, defaultNs, "", storkapi.BackupLocationType(defaultBackupLocation), defaultSecret, "async-dr", asyncdr.FirstCluster, asyncdr.SecondCluster) + err = ScheduleBidirectionalClusterPair(cpName, defaultNs, "", storkapi.BackupLocationType(defaultBackupLocation), defaultSecret, "async-dr", asyncdr.FirstCluster, asyncdr.SecondCluster, nil) log.FailOnError(err, "Failed creating bidirectional cluster pair") log.InfoD("Start migration schedule and perform failover") @@ -1046,16 +1016,29 @@ func validateFailoverFailback(clusterType, taskNamePrefix string, single, skipSo } isCloud, cloudName := asyncdr.IsCloud() + var srcEp, destEp string + extraArgsCp := map[string]string{} + const defaultPort = "9001" if isCloud { if err = asyncdr.ChangePxServiceToLoadBalancer(false); err != nil { log.FailOnError(err, "failed to change PX service to LoadBalancer on source cluster") } + if cloudName == "eks" { + pxService, err := core.Instance().GetService("portworx-service", "kube-system") + log.FailOnError(err, "failed to get px service") + srcEp = pxService.Status.LoadBalancer.Ingress[0].Hostname + } err = SetDestinationKubeConfig() log.FailOnError(err, "Failed to set destination kubeconfig") if err = asyncdr.ChangePxServiceToLoadBalancer(false); err != nil { log.FailOnError(err, "failed to change PX service to LoadBalancer on destination cluster") } + if cloudName == "eks" { + pxService, err := core.Instance().GetService("portworx-service", "kube-system") + log.FailOnError(err, "failed to get px service") + destEp = pxService.Status.LoadBalancer.Ingress[0].Hostname + } err = SetSourceKubeConfig() log.FailOnError(err, "Failed to set source kubeconfig") } @@ -1068,13 +1051,18 @@ func validateFailoverFailback(clusterType, taskNamePrefix string, single, skipSo defaultBackupLocation = googleBackupLocation } + if cloudName == "eks" { + extraArgsCp["src-ep"] = srcEp + ":" + defaultPort + extraArgsCp["dest-ep"] = destEp + ":" + defaultPort + } + log.Infof("Creating clusterpair between first and second cluster") cpName := defaultClusterPairName + time.Now().Format("15h03m05s") if clusterType == "asyncdr" { - err = ScheduleBidirectionalClusterPair(cpName, defaultNs, "", storkapi.BackupLocationType(defaultBackupLocation), defaultSecret, "async-dr", asyncdr.FirstCluster, asyncdr.SecondCluster) + err = ScheduleBidirectionalClusterPair(cpName, defaultNs, "", storkapi.BackupLocationType(defaultBackupLocation), defaultSecret, "async-dr", asyncdr.FirstCluster, asyncdr.SecondCluster, extraArgsCp) } else { - err = ScheduleBidirectionalClusterPair(cpName, defaultNs, "", "", "", "sync-dr", asyncdr.FirstCluster, asyncdr.SecondCluster) + err = ScheduleBidirectionalClusterPair(cpName, defaultNs, "", "", "", "sync-dr", asyncdr.FirstCluster, asyncdr.SecondCluster, extraArgsCp) } log.FailOnError(err, "Failed creating bidirectional cluster pair") @@ -1418,9 +1406,9 @@ func validateOperatorMigFailover(namespace, clusterType, opName, crName string, kubeConfigPathDest, err := GetCustomClusterConfigPath(asyncdr.SecondCluster) log.FailOnError(err, "Failed to get destination configPath: %v", err) if clusterType == "asyncdr" { - err = ScheduleBidirectionalClusterPair(cpName, namespace, "", storkapi.BackupLocationType(defaultBackupLocation), defaultSecret, "async-dr", asyncdr.FirstCluster, asyncdr.SecondCluster) + err = ScheduleBidirectionalClusterPair(cpName, namespace, "", storkapi.BackupLocationType(defaultBackupLocation), defaultSecret, "async-dr", asyncdr.FirstCluster, asyncdr.SecondCluster, nil) } else { - err = ScheduleBidirectionalClusterPair(cpName, namespace, "", "", "", "sync-dr", asyncdr.FirstCluster, asyncdr.SecondCluster) + err = ScheduleBidirectionalClusterPair(cpName, namespace, "", "", "", "sync-dr", asyncdr.FirstCluster, asyncdr.SecondCluster, nil) } log.FailOnError(err, "Failed creating bidirectional cluster pair") log.Infof("Start migration schedule and perform failover") diff --git a/tests/common.go b/tests/common.go index 69155d9f1..03257b09d 100644 --- a/tests/common.go +++ b/tests/common.go @@ -4113,7 +4113,7 @@ func CreateClusterPairFile(pairInfo map[string]string, skipStorage, resetConfig return addStorageOptions(pairInfo, clusterPairFileName) } -func ScheduleBidirectionalClusterPair(cpName, cpNamespace, projectMappings string, objectStoreType storkv1.BackupLocationType, secretName string, mode string, sourceCluster int, destCluster int) (err error) { +func ScheduleBidirectionalClusterPair(cpName, cpNamespace, projectMappings string, objectStoreType storkv1.BackupLocationType, secretName string, mode string, sourceCluster int, destCluster int, extraArgs map[string]string) (err error) { // var token string // Setting kubeconfig to source because we will create bidirectional cluster pair based on source as reference err = SetCustomKubeConfig(sourceCluster) @@ -4196,6 +4196,13 @@ func ScheduleBidirectionalClusterPair(cpName, cpNamespace, projectMappings strin } } + if extraArgs != nil { + for key, value := range extraArgs { + cmdArgs = append(cmdArgs, "--"+key) + cmdArgs = append(cmdArgs, value) + } + } + if projectMappings != "" { cmdArgs = append(cmdArgs, "--project-mappings") cmdArgs = append(cmdArgs, projectMappings)