From c16ece933384c5fa3002d50e8978add7130abbd8 Mon Sep 17 00:00:00 2001 From: Abhisek Dwivedi Date: Thu, 21 Mar 2024 16:07:01 +0530 Subject: [PATCH] Added support for batch scale-down --- api/v1/aerospikecluster_types.go | 3 + api/v1/aerospikecluster_validating_webhook.go | 77 +++++++------ api/v1/zz_generated.deepcopy.go | 5 + .../asdb.aerospike.com_aerospikeclusters.yaml | 14 +++ controllers/pod.go | 4 +- controllers/rack.go | 102 +++++++++++------- ..._aerospikeclusters.asdb.aerospike.com.yaml | 14 +++ test/batch_scaledown_pods_test.go | 1 + 8 files changed, 151 insertions(+), 69 deletions(-) create mode 100644 test/batch_scaledown_pods_test.go diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index 32a28c781..bea1f1e17 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -310,6 +310,9 @@ type RackConfig struct { //nolint:govet // for readability // RollingUpdateBatchSize is the percentage/number of rack pods that will be restarted simultaneously // +optional RollingUpdateBatchSize *intstr.IntOrString `json:"rollingUpdateBatchSize,omitempty"` + // ScaleDownBatchSize is the percentage/number of rack pods that will be scaled down simultaneously + // +optional + ScaleDownBatchSize *intstr.IntOrString `json:"scaleDownBatchSize,omitempty"` // MaxIgnorablePods is the maximum number/percentage of pending/failed pods in a rack that are ignored while // assessing cluster stability. Pods identified using this value are not considered part of the cluster. // Additionally, in SC mode clusters, these pods are removed from the roster. diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index fe340aee2..90df2675a 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -603,38 +603,15 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error { } // Validate batch upgrade/restart param - if c.Spec.RackConfig.RollingUpdateBatchSize != nil { - if err := validateIntOrStringField(c.Spec.RackConfig.RollingUpdateBatchSize, - "spec.rackConfig.rollingUpdateBatchSize"); err != nil { - return err - } - - if len(c.Spec.RackConfig.Racks) < 2 { - return fmt.Errorf("can not use rackConfig.RollingUpdateBatchSize when number of racks is less than two") - } - - nsConfsNamespaces := c.getNsConfsForNamespaces() - for ns, nsConf := range nsConfsNamespaces { - if !isNameExist(c.Spec.RackConfig.Namespaces, ns) { - return fmt.Errorf( - "can not use rackConfig.RollingUpdateBatchSize when there is any non-rack enabled namespace %s", ns, - ) - } - - if nsConf.noOfRacksForNamespaces <= 1 { - return fmt.Errorf( - "can not use rackConfig.RollingUpdateBatchSize when namespace `%s` is configured in only one rack", - ns, - ) - } + if err := c.validateBatchSize(c.Spec.RackConfig.RollingUpdateBatchSize, + "spec.rackConfig.rollingUpdateBatchSize"); err != nil { + return err + } - if nsConf.replicationFactor <= 1 { - return fmt.Errorf( - "can not use rackConfig.RollingUpdateBatchSize when namespace `%s` is configured with replication-factor 1", - ns, - ) - } - } + // Validate batch scaleDown param + if err := c.validateBatchSize(c.Spec.RackConfig.ScaleDownBatchSize, + "spec.rackConfig.scaleDownBatchSize"); err != nil { + return err } // Validate MaxIgnorablePods param @@ -2158,6 +2135,44 @@ func (c *AerospikeCluster) validateNetworkPolicy(namespace string) error { return nil } +func (c *AerospikeCluster) validateBatchSize(batchSize *intstr.IntOrString, 
fieldPath string) error { + if batchSize == nil { + return nil + } + + if err := validateIntOrStringField(batchSize, fieldPath); err != nil { + return err + } + + if len(c.Spec.RackConfig.Racks) < 2 { + return fmt.Errorf("can not use %s when number of racks is less than two", fieldPath) + } + + nsConfsNamespaces := c.getNsConfsForNamespaces() + for ns, nsConf := range nsConfsNamespaces { + if !isNameExist(c.Spec.RackConfig.Namespaces, ns) { + return fmt.Errorf( + "can not use %s when there is any non-rack enabled namespace %s", fieldPath, ns, + ) + } + + if nsConf.noOfRacksForNamespaces <= 1 { + return fmt.Errorf( + "can not use %s when namespace `%s` is configured in only one rack", fieldPath, ns, + ) + } + + if nsConf.replicationFactor <= 1 { + return fmt.Errorf( + "can not use %s when namespace `%s` is configured with replication-factor 1", fieldPath, + ns, + ) + } + } + + return nil +} + func validateIntOrStringField(value *intstr.IntOrString, fieldPath string) error { randomNumber := 100 // Just validate if value is valid number or string. diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 3bd576ec1..d4afc22b4 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -883,6 +883,11 @@ func (in *RackConfig) DeepCopyInto(out *RackConfig) { *out = new(intstr.IntOrString) **out = **in } + if in.ScaleDownBatchSize != nil { + in, out := &in.ScaleDownBatchSize, &out.ScaleDownBatchSize + *out = new(intstr.IntOrString) + **out = **in + } if in.MaxIgnorablePods != nil { in, out := &in.MaxIgnorablePods, &out.MaxIgnorablePods *out = new(intstr.IntOrString) diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index 4fe9c6c14..db7e166ce 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -8686,6 +8686,13 @@ spec: description: RollingUpdateBatchSize is the percentage/number of rack pods that will be restarted simultaneously x-kubernetes-int-or-string: true + scaleDownBatchSize: + anyOf: + - type: integer + - type: string + description: ScaleDownBatchSize is the percentage/number of rack + pods that will be scaled down simultaneously + x-kubernetes-int-or-string: true type: object rosterNodeBlockList: description: RosterNodeBlockList is a list of blocked nodeIDs from @@ -18125,6 +18132,13 @@ spec: description: RollingUpdateBatchSize is the percentage/number of rack pods that will be restarted simultaneously x-kubernetes-int-or-string: true + scaleDownBatchSize: + anyOf: + - type: integer + - type: string + description: ScaleDownBatchSize is the percentage/number of rack + pods that will be scaled down simultaneously + x-kubernetes-int-or-string: true type: object resources: description: 'Define resources requests and limits for Aerospike Server diff --git a/controllers/pod.go b/controllers/pod.go index 9509986b9..caa7fb3e5 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -810,10 +810,10 @@ func (r *SingleClusterReconciler) getClusterPodList() ( return podList, nil } -func (r *SingleClusterReconciler) isAnyPodInImageFailedState(podList []corev1.Pod, ignorablePodNames sets.Set[string], +func (r *SingleClusterReconciler) isAnyPodInImageFailedState(podList []*corev1.Pod, ignorablePodNames sets.Set[string], ) bool { for idx := range podList { - pod := &podList[idx] + pod := podList[idx] if ignorablePodNames.Has(pod.Name) { continue } diff --git a/controllers/rack.go 
b/controllers/rack.go index 087b56fef..913cbf5a9 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -599,12 +599,12 @@ func (r *SingleClusterReconciler) scaleUpRack( // No need for this? But if image is bad then new pod will also come up // with bad node. - podList, err := r.getRackPodList(rackState.Rack.ID) + podList, err := r.getOrderedRackPodList(rackState.Rack.ID) if err != nil { return found, reconcileError(fmt.Errorf("failed to list pods: %v", err)) } - if r.isAnyPodInImageFailedState(podList.Items, ignorablePodNames) { + if r.isAnyPodInImageFailedState(podList, ignorablePodNames) { return found, reconcileError(fmt.Errorf("cannot scale up AerospikeCluster. A pod is already in failed state")) } @@ -615,8 +615,8 @@ func (r *SingleClusterReconciler) scaleUpRack( // Ensure none of the to be launched pods are active. for _, newPodName := range newPodNames { - for idx := range podList.Items { - if podList.Items[idx].Name == newPodName { + for idx := range podList { + if podList[idx].Name == newPodName { return found, reconcileError( fmt.Errorf( "pod %s yet to be launched is still present", @@ -734,7 +734,7 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r podsBatchList[0] = podsToUpgrade } else { // Create batch of pods - podsBatchList = r.getPodsBatchToRestart(podsToUpgrade, len(podList)) + podsBatchList = r.getPodBatchToRestart(podsToUpgrade, len(podList)) } if len(podsBatchList) > 0 { @@ -800,8 +800,8 @@ func (r *SingleClusterReconciler) scaleDownRack( } r.Log.Info( - "ScaleDown AerospikeCluster statefulset", "desiredSz", desiredSize, - "currentSz", *found.Spec.Replicas, + "ScaleDown AerospikeCluster statefulset", "desiredSize", desiredSize, + "currentSize", *found.Spec.Replicas, ) r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeNormal, "RackScaleDown", @@ -810,32 +810,53 @@ func (r *SingleClusterReconciler) scaleDownRack( desiredSize, ) - oldPodList, err := r.getRackPodList(rackState.Rack.ID) + oldPodList, err := r.getOrderedRackPodList(rackState.Rack.ID) if err != nil { return found, reconcileError(fmt.Errorf("failed to list pods: %v", err)) } - if r.isAnyPodInImageFailedState(oldPodList.Items, ignorablePodNames) { + if r.isAnyPodInImageFailedState(oldPodList, ignorablePodNames) { return found, reconcileError(fmt.Errorf("cannot scale down AerospikeCluster. A pod is already in failed state")) } - // code flow will reach this stage only when found.Spec.Replicas > desiredSize - - // maintain list of removed pods. It will be used for alumni-reset and tip-clear - var pod *corev1.Pod + // Code flow will reach this stage only when found.Spec.Replicas > desiredSize + // Maintain a list of removed pods. It will be used for alumni-reset and tip-clear policy := r.getClientPolicy() + diffPods := *found.Spec.Replicas - desiredSize - podName := getSTSPodName(found.Name, *found.Spec.Replicas-1) + podsBatchList := r.getPodBatchToScaleDown(oldPodList[:diffPods], len(oldPodList)) - pod = utils.GetPod(podName, oldPodList.Items) + // Handle one batch + podsBatch := podsBatchList[0] - isPodRunningAndReady := utils.IsPodRunningAndReady(pod) + r.Log.Info( + "Calculated batch for Pod scale-down", + "rackPodList", getPodNames(oldPodList), + "podsBatch", getPodNames(podsBatch), + "scaleDownBatchSize", r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, + ) - // Ignore safe stop check if pod is not running. 
+ var ( + runningPods []*corev1.Pod + isAnyPodRunningAndReady bool + ) + + for idx := range podsBatch { + if utils.IsPodRunningAndReady(podsBatch[idx]) { + runningPods = append(runningPods, podsBatch[idx]) + isAnyPodRunningAndReady = true + + continue + } + + ignorablePodNames.Insert(podsBatch[idx].Name) + } + + // Ignore safe stop check if all pods in the batch are not running. // Ignore migrate-fill-delay if pod is not running. Deleting this pod will not lead to any migration. - if isPodRunningAndReady { - if res := r.waitForMultipleNodesSafeStopReady([]*corev1.Pod{pod}, ignorablePodNames); !res.isSuccess { + if isAnyPodRunningAndReady { + if res := r.waitForMultipleNodesSafeStopReady(runningPods, ignorablePodNames); !res.isSuccess { // The pod is running and is unsafe to terminate. return found, res } @@ -845,15 +866,14 @@ func (r *SingleClusterReconciler) scaleDownRack( // This check ensures that migrate-fill-delay is not set while processing failed racks. // setting migrate-fill-delay will fail if there are any failed pod if res := r.setMigrateFillDelay( - policy, &rackState.Rack.AerospikeConfig, true, - ignorablePodNames.Insert(pod.Name), + policy, &rackState.Rack.AerospikeConfig, true, ignorablePodNames, ); !res.isSuccess { return found, res } } // Update new object with new size - newSize := *found.Spec.Replicas - 1 + newSize := *found.Spec.Replicas - int32(len(podsBatch)) found.Spec.Replicas = &newSize if err = r.Client.Update( @@ -869,7 +889,7 @@ func (r *SingleClusterReconciler) scaleDownRack( // No need for these checks if pod was not running. // These checks will fail if there is any other pod in failed state. - if isPodRunningAndReady { + if isAnyPodRunningAndReady { // Wait for pods to get terminated if err = r.waitForSTSToBeReady(found, ignorablePodNames); err != nil { r.Log.Error(err, "Failed to wait for statefulset to be ready") @@ -918,18 +938,20 @@ func (r *SingleClusterReconciler) scaleDownRack( found = nFound - if err := r.cleanupPods([]string{podName}, rackState); err != nil { + podNames := getPodNames(podsBatch) + + if err := r.cleanupPods(podNames, rackState); err != nil { return nFound, reconcileError( fmt.Errorf( - "failed to cleanup pod %s: %v", podName, err, + "failed to cleanup pod %s: %v", podNames, err, ), ) } - r.Log.Info("Pod Removed", "podName", podName) + r.Log.Info("Pod Removed", "podNames", podNames) r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeNormal, "PodDeleted", - "[rack-%d] Deleted Pod %s", rackState.Rack.ID, pod.Name, + "[rack-%d] Deleted Pods %s", rackState.Rack.ID, podNames, ) r.Recorder.Eventf( @@ -972,12 +994,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, } } - pods := make([]corev1.Pod, 0, len(podList)) - for idx := range podList { - pods = append(pods, *podList[idx]) - } - - if len(failedPods) != 0 && r.isAnyPodInImageFailedState(pods, ignorablePodNames) { + if len(failedPods) != 0 && r.isAnyPodInImageFailedState(podList, ignorablePodNames) { return found, reconcileError( fmt.Errorf( "cannot Rolling restart AerospikeCluster. 
" + @@ -1027,7 +1044,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, podsBatchList[0] = podsToRestart } else { // Create batch of pods - podsBatchList = r.getPodsBatchToRestart(podsToRestart, len(podList)) + podsBatchList = r.getPodBatchToRestart(podsToRestart, len(podList)) } // Restart batch of pods @@ -1116,7 +1133,7 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1 } } - podsBatchList := r.getPodsBatchToRestart(podsToRestart, len(podList)) + podsBatchList := r.getPodBatchToRestart(podsToRestart, len(podList)) // Restart batch of pods if len(podsBatchList) > 0 { @@ -1760,7 +1777,7 @@ func getOriginalPath(path string) string { return path } -func (r *SingleClusterReconciler) getPodsBatchToRestart(podList []*corev1.Pod, rackSize int) [][]*corev1.Pod { +func (r *SingleClusterReconciler) getPodBatchToRestart(podList []*corev1.Pod, rackSize int) [][]*corev1.Pod { // Error is already handled in validation rollingUpdateBatchSize, _ := intstr.GetScaledValueFromIntOrPercent( r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, rackSize, false, @@ -1769,6 +1786,19 @@ func (r *SingleClusterReconciler) getPodsBatchToRestart(podList []*corev1.Pod, r return chunkBy(podList, rollingUpdateBatchSize) } +func (r *SingleClusterReconciler) getPodBatchToScaleDown(podList []*corev1.Pod, rackSize int) [][]*corev1.Pod { + // Error is already handled in validation + scaleDownBatchSize, _ := intstr.GetScaledValueFromIntOrPercent( + r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, rackSize, false, + ) + + if len(podList) < scaleDownBatchSize { + scaleDownBatchSize = len(podList) + } + + return chunkBy(podList, scaleDownBatchSize) +} + func chunkBy[T any](items []*T, chunkSize int) (chunks [][]*T) { if chunkSize <= 0 { chunkSize = 1 diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index 4fe9c6c14..db7e166ce 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -8686,6 +8686,13 @@ spec: description: RollingUpdateBatchSize is the percentage/number of rack pods that will be restarted simultaneously x-kubernetes-int-or-string: true + scaleDownBatchSize: + anyOf: + - type: integer + - type: string + description: ScaleDownBatchSize is the percentage/number of rack + pods that will be scaled down simultaneously + x-kubernetes-int-or-string: true type: object rosterNodeBlockList: description: RosterNodeBlockList is a list of blocked nodeIDs from @@ -18125,6 +18132,13 @@ spec: description: RollingUpdateBatchSize is the percentage/number of rack pods that will be restarted simultaneously x-kubernetes-int-or-string: true + scaleDownBatchSize: + anyOf: + - type: integer + - type: string + description: ScaleDownBatchSize is the percentage/number of rack + pods that will be scaled down simultaneously + x-kubernetes-int-or-string: true type: object resources: description: 'Define resources requests and limits for Aerospike Server diff --git a/test/batch_scaledown_pods_test.go b/test/batch_scaledown_pods_test.go new file mode 100644 index 000000000..56e540407 --- /dev/null +++ b/test/batch_scaledown_pods_test.go @@ -0,0 +1 @@ +package test