Added support for batch scale-down
abhishekdwivedi3060 committed Mar 25, 2024
1 parent 3b2f304 commit c16ece9
Showing 8 changed files with 151 additions and 69 deletions.
3 changes: 3 additions & 0 deletions api/v1/aerospikecluster_types.go
@@ -310,6 +310,9 @@ type RackConfig struct { //nolint:govet // for readability
// RollingUpdateBatchSize is the percentage/number of rack pods that will be restarted simultaneously
// +optional
RollingUpdateBatchSize *intstr.IntOrString `json:"rollingUpdateBatchSize,omitempty"`
// ScaleDownBatchSize is the percentage/number of rack pods that will be scaled down simultaneously
// +optional
ScaleDownBatchSize *intstr.IntOrString `json:"scaleDownBatchSize,omitempty"`
// MaxIgnorablePods is the maximum number/percentage of pending/failed pods in a rack that are ignored while
// assessing cluster stability. Pods identified using this value are not considered part of the cluster.
// Additionally, in SC mode clusters, these pods are removed from the roster.
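For context on the new ScaleDownBatchSize field above: like RollingUpdateBatchSize, it accepts either an absolute count or a percentage. A minimal sketch (not part of this commit) of how the two forms resolve to a pod count using the apimachinery helper the controller relies on; the rack size of 10 is an arbitrary example.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	rackSize := 10 // illustrative rack size

	// Absolute form: remove 2 pods per scale-down batch.
	byCount := intstr.FromInt(2)

	// Percentage form: remove 30% of the rack per scale-down batch.
	byPercent := intstr.FromString("30%")

	// The controller resolves either form to a concrete pod count with this
	// helper (rounding down), as getPodBatchToScaleDown does later in this diff.
	count, _ := intstr.GetScaledValueFromIntOrPercent(&byCount, rackSize, false)
	percent, _ := intstr.GetScaledValueFromIntOrPercent(&byPercent, rackSize, false)

	fmt.Println(count, percent) // 2 3
}
```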
77 changes: 46 additions & 31 deletions api/v1/aerospikecluster_validating_webhook.go
@@ -603,38 +603,15 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error {
}

// Validate batch upgrade/restart param
if c.Spec.RackConfig.RollingUpdateBatchSize != nil {
if err := validateIntOrStringField(c.Spec.RackConfig.RollingUpdateBatchSize,
"spec.rackConfig.rollingUpdateBatchSize"); err != nil {
return err
}

if len(c.Spec.RackConfig.Racks) < 2 {
return fmt.Errorf("can not use rackConfig.RollingUpdateBatchSize when number of racks is less than two")
}

nsConfsNamespaces := c.getNsConfsForNamespaces()
for ns, nsConf := range nsConfsNamespaces {
if !isNameExist(c.Spec.RackConfig.Namespaces, ns) {
return fmt.Errorf(
"can not use rackConfig.RollingUpdateBatchSize when there is any non-rack enabled namespace %s", ns,
)
}

if nsConf.noOfRacksForNamespaces <= 1 {
return fmt.Errorf(
"can not use rackConfig.RollingUpdateBatchSize when namespace `%s` is configured in only one rack",
ns,
)
}
if err := c.validateBatchSize(c.Spec.RackConfig.RollingUpdateBatchSize,
"spec.rackConfig.rollingUpdateBatchSize"); err != nil {
return err
}

if nsConf.replicationFactor <= 1 {
return fmt.Errorf(
"can not use rackConfig.RollingUpdateBatchSize when namespace `%s` is configured with replication-factor 1",
ns,
)
}
}
// Validate batch scaleDown param
if err := c.validateBatchSize(c.Spec.RackConfig.ScaleDownBatchSize,
"spec.rackConfig.scaleDownBatchSize"); err != nil {
return err
}

// Validate MaxIgnorablePods param
@@ -2158,6 +2135,44 @@ func (c *AerospikeCluster) validateNetworkPolicy(namespace string) error {
return nil
}

func (c *AerospikeCluster) validateBatchSize(batchSize *intstr.IntOrString, fieldPath string) error {
if batchSize == nil {
return nil
}

if err := validateIntOrStringField(batchSize, fieldPath); err != nil {
return err
}

if len(c.Spec.RackConfig.Racks) < 2 {
return fmt.Errorf("can not use %s when number of racks is less than two", fieldPath)
}

nsConfsNamespaces := c.getNsConfsForNamespaces()
for ns, nsConf := range nsConfsNamespaces {
if !isNameExist(c.Spec.RackConfig.Namespaces, ns) {
return fmt.Errorf(
"can not use %s when there is any non-rack enabled namespace %s", fieldPath, ns,
)
}

if nsConf.noOfRacksForNamespaces <= 1 {
return fmt.Errorf(
"can not use %s when namespace `%s` is configured in only one rack", fieldPath, ns,
)
}

if nsConf.replicationFactor <= 1 {
return fmt.Errorf(
"can not use %s when namespace `%s` is configured with replication-factor 1", fieldPath,
ns,
)
}
}

return nil
}

func validateIntOrStringField(value *intstr.IntOrString, fieldPath string) error {
randomNumber := 100
// Just validate if value is valid number or string.
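The new validateBatchSize above also enforces namespace constraints that need cluster internals; the standalone sketch below mirrors only its first two guards (nil means "not set", and batch options require at least two racks). checkBatchSize is a hypothetical helper written for illustration, not the operator's API.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

// checkBatchSize mirrors the first two guards of validateBatchSize.
func checkBatchSize(batchSize *intstr.IntOrString, fieldPath string, rackCount int) error {
	if batchSize == nil {
		return nil // the field is optional
	}

	// Accept a plain number or an "N%" string; reject anything else.
	if _, err := intstr.GetScaledValueFromIntOrPercent(batchSize, 100, false); err != nil {
		return fmt.Errorf("%s: %v", fieldPath, err)
	}

	// Batch scale-down only makes sense when data is spread across racks.
	if rackCount < 2 {
		return fmt.Errorf("can not use %s when number of racks is less than two", fieldPath)
	}

	return nil
}

func main() {
	bad := intstr.FromString("abc")
	fmt.Println(checkBatchSize(&bad, "spec.rackConfig.scaleDownBatchSize", 3)) // rejected: not a number or percentage

	ok := intstr.FromString("30%")
	fmt.Println(checkBatchSize(&ok, "spec.rackConfig.scaleDownBatchSize", 1)) // rejected: fewer than two racks
}
```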
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

14 changes: 14 additions & 0 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
@@ -8686,6 +8686,13 @@ spec:
description: RollingUpdateBatchSize is the percentage/number of
rack pods that will be restarted simultaneously
x-kubernetes-int-or-string: true
scaleDownBatchSize:
anyOf:
- type: integer
- type: string
description: ScaleDownBatchSize is the percentage/number of rack
pods that will be scaled down simultaneously
x-kubernetes-int-or-string: true
type: object
rosterNodeBlockList:
description: RosterNodeBlockList is a list of blocked nodeIDs from
@@ -18125,6 +18132,13 @@ spec:
description: RollingUpdateBatchSize is the percentage/number of
rack pods that will be restarted simultaneously
x-kubernetes-int-or-string: true
scaleDownBatchSize:
anyOf:
- type: integer
- type: string
description: ScaleDownBatchSize is the percentage/number of rack
pods that will be scaled down simultaneously
x-kubernetes-int-or-string: true
type: object
resources:
description: 'Define resources requests and limits for Aerospike Server
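Because of x-kubernetes-int-or-string, the API server accepts either a number or a percentage string for scaleDownBatchSize. A small sketch of how both JSON forms decode into intstr.IntOrString; rackConfigSketch is an illustrative struct carrying only the two batch fields, not the operator's RackConfig type.

```go
package main

import (
	"encoding/json"
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

// rackConfigSketch mirrors only the two batch fields for illustration.
type rackConfigSketch struct {
	RollingUpdateBatchSize *intstr.IntOrString `json:"rollingUpdateBatchSize,omitempty"`
	ScaleDownBatchSize     *intstr.IntOrString `json:"scaleDownBatchSize,omitempty"`
}

func main() {
	var cfg rackConfigSketch

	// A percentage string and a plain integer are both valid per the CRD schema.
	data := []byte(`{"rollingUpdateBatchSize": "50%", "scaleDownBatchSize": 2}`)
	if err := json.Unmarshal(data, &cfg); err != nil {
		panic(err)
	}

	fmt.Println(cfg.RollingUpdateBatchSize.String(), cfg.ScaleDownBatchSize.String()) // 50% 2
}
```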
4 changes: 2 additions & 2 deletions controllers/pod.go
@@ -810,10 +810,10 @@ func (r *SingleClusterReconciler) getClusterPodList() (
return podList, nil
}

func (r *SingleClusterReconciler) isAnyPodInImageFailedState(podList []corev1.Pod, ignorablePodNames sets.Set[string],
func (r *SingleClusterReconciler) isAnyPodInImageFailedState(podList []*corev1.Pod, ignorablePodNames sets.Set[string],
) bool {
for idx := range podList {
pod := &podList[idx]
pod := podList[idx]
if ignorablePodNames.Has(pod.Name) {
continue
}
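getOrderedRackPodList, which the callers now use, is not shown in this diff; it returns the []*corev1.Pod slice this signature now expects. The conversion from a listed PodList might look roughly like the sketch below; podPointers and the pod names are hypothetical, and sorting by pod ordinal is omitted.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// podPointers turns the items of a listed PodList into the []*corev1.Pod
// slice that isAnyPodInImageFailedState now accepts.
func podPointers(list *corev1.PodList) []*corev1.Pod {
	pods := make([]*corev1.Pod, 0, len(list.Items))
	for idx := range list.Items {
		// Address the slice element itself so each pointer refers to a distinct pod.
		pods = append(pods, &list.Items[idx])
	}

	return pods
}

func main() {
	list := &corev1.PodList{Items: []corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "aerocluster-1-0"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "aerocluster-1-1"}},
	}}

	for _, pod := range podPointers(list) {
		fmt.Println(pod.Name)
	}
}
```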
102 changes: 66 additions & 36 deletions controllers/rack.go
@@ -599,12 +599,12 @@ func (r *SingleClusterReconciler) scaleUpRack(

// No need for this? But if image is bad then new pod will also come up
// with bad node.
podList, err := r.getRackPodList(rackState.Rack.ID)
podList, err := r.getOrderedRackPodList(rackState.Rack.ID)
if err != nil {
return found, reconcileError(fmt.Errorf("failed to list pods: %v", err))
}

if r.isAnyPodInImageFailedState(podList.Items, ignorablePodNames) {
if r.isAnyPodInImageFailedState(podList, ignorablePodNames) {
return found, reconcileError(fmt.Errorf("cannot scale up AerospikeCluster. A pod is already in failed state"))
}

@@ -615,8 +615,8 @@

// Ensure none of the to be launched pods are active.
for _, newPodName := range newPodNames {
for idx := range podList.Items {
if podList.Items[idx].Name == newPodName {
for idx := range podList {
if podList[idx].Name == newPodName {
return found, reconcileError(
fmt.Errorf(
"pod %s yet to be launched is still present",
@@ -734,7 +734,7 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r
podsBatchList[0] = podsToUpgrade
} else {
// Create batch of pods
podsBatchList = r.getPodsBatchToRestart(podsToUpgrade, len(podList))
podsBatchList = r.getPodBatchToRestart(podsToUpgrade, len(podList))
}

if len(podsBatchList) > 0 {
Expand Down Expand Up @@ -800,8 +800,8 @@ func (r *SingleClusterReconciler) scaleDownRack(
}

r.Log.Info(
"ScaleDown AerospikeCluster statefulset", "desiredSz", desiredSize,
"currentSz", *found.Spec.Replicas,
"ScaleDown AerospikeCluster statefulset", "desiredSize", desiredSize,
"currentSize", *found.Spec.Replicas,
)
r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeNormal, "RackScaleDown",
@@ -810,32 +810,53 @@
desiredSize,
)

oldPodList, err := r.getRackPodList(rackState.Rack.ID)
oldPodList, err := r.getOrderedRackPodList(rackState.Rack.ID)
if err != nil {
return found, reconcileError(fmt.Errorf("failed to list pods: %v", err))
}

if r.isAnyPodInImageFailedState(oldPodList.Items, ignorablePodNames) {
if r.isAnyPodInImageFailedState(oldPodList, ignorablePodNames) {
return found, reconcileError(fmt.Errorf("cannot scale down AerospikeCluster. A pod is already in failed state"))
}

// code flow will reach this stage only when found.Spec.Replicas > desiredSize

// maintain list of removed pods. It will be used for alumni-reset and tip-clear
var pod *corev1.Pod
// Code flow will reach this stage only when found.Spec.Replicas > desiredSize
// Maintain a list of removed pods. It will be used for alumni-reset and tip-clear

policy := r.getClientPolicy()
diffPods := *found.Spec.Replicas - desiredSize

podName := getSTSPodName(found.Name, *found.Spec.Replicas-1)
podsBatchList := r.getPodBatchToScaleDown(oldPodList[:diffPods], len(oldPodList))

pod = utils.GetPod(podName, oldPodList.Items)
// Handle one batch
podsBatch := podsBatchList[0]

isPodRunningAndReady := utils.IsPodRunningAndReady(pod)
r.Log.Info(
"Calculated batch for Pod scale-down",
"rackPodList", getPodNames(oldPodList),
"podsBatch", getPodNames(podsBatch),
"scaleDownBatchSize", r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize,
)

// Ignore safe stop check if pod is not running.
var (
runningPods []*corev1.Pod
isAnyPodRunningAndReady bool
)

for idx := range podsBatch {
if utils.IsPodRunningAndReady(podsBatch[idx]) {
runningPods = append(runningPods, podsBatch[idx])
isAnyPodRunningAndReady = true

continue
}

ignorablePodNames.Insert(podsBatch[idx].Name)
}

// Ignore safe stop check if all pods in the batch are not running.
// Ignore migrate-fill-delay if pod is not running. Deleting this pod will not lead to any migration.
if isPodRunningAndReady {
if res := r.waitForMultipleNodesSafeStopReady([]*corev1.Pod{pod}, ignorablePodNames); !res.isSuccess {
if isAnyPodRunningAndReady {
if res := r.waitForMultipleNodesSafeStopReady(runningPods, ignorablePodNames); !res.isSuccess {
// The pod is running and is unsafe to terminate.
return found, res
}
Expand All @@ -845,15 +866,14 @@ func (r *SingleClusterReconciler) scaleDownRack(
// This check ensures that migrate-fill-delay is not set while processing failed racks.
// setting migrate-fill-delay will fail if there are any failed pod
if res := r.setMigrateFillDelay(
policy, &rackState.Rack.AerospikeConfig, true,
ignorablePodNames.Insert(pod.Name),
policy, &rackState.Rack.AerospikeConfig, true, ignorablePodNames,
); !res.isSuccess {
return found, res
}
}

// Update new object with new size
newSize := *found.Spec.Replicas - 1
newSize := *found.Spec.Replicas - int32(len(podsBatch))
found.Spec.Replicas = &newSize

if err = r.Client.Update(

// No need for these checks if pod was not running.
// These checks will fail if there is any other pod in failed state.
if isPodRunningAndReady {
if isAnyPodRunningAndReady {
// Wait for pods to get terminated
if err = r.waitForSTSToBeReady(found, ignorablePodNames); err != nil {
r.Log.Error(err, "Failed to wait for statefulset to be ready")
@@ -918,18 +938,20 @@

found = nFound

if err := r.cleanupPods([]string{podName}, rackState); err != nil {
podNames := getPodNames(podsBatch)

if err := r.cleanupPods(podNames, rackState); err != nil {
return nFound, reconcileError(
fmt.Errorf(
"failed to cleanup pod %s: %v", podName, err,
"failed to cleanup pod %s: %v", podNames, err,
),
)
}

r.Log.Info("Pod Removed", "podName", podName)
r.Log.Info("Pod Removed", "podNames", podNames)
r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeNormal, "PodDeleted",
"[rack-%d] Deleted Pod %s", rackState.Rack.ID, pod.Name,
"[rack-%d] Deleted Pods %s", rackState.Rack.ID, podNames,
)

r.Recorder.Eventf(
@@ -972,12 +994,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
}
}

pods := make([]corev1.Pod, 0, len(podList))
for idx := range podList {
pods = append(pods, *podList[idx])
}

if len(failedPods) != 0 && r.isAnyPodInImageFailedState(pods, ignorablePodNames) {
if len(failedPods) != 0 && r.isAnyPodInImageFailedState(podList, ignorablePodNames) {
return found, reconcileError(
fmt.Errorf(
"cannot Rolling restart AerospikeCluster. " +
@@ -1027,7 +1044,7 @@
podsBatchList[0] = podsToRestart
} else {
// Create batch of pods
podsBatchList = r.getPodsBatchToRestart(podsToRestart, len(podList))
podsBatchList = r.getPodBatchToRestart(podsToRestart, len(podList))
}

// Restart batch of pods
@@ -1116,7 +1133,7 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1
}
}

podsBatchList := r.getPodsBatchToRestart(podsToRestart, len(podList))
podsBatchList := r.getPodBatchToRestart(podsToRestart, len(podList))

// Restart batch of pods
if len(podsBatchList) > 0 {
@@ -1760,7 +1777,7 @@ func getOriginalPath(path string) string {
return path
}

func (r *SingleClusterReconciler) getPodsBatchToRestart(podList []*corev1.Pod, rackSize int) [][]*corev1.Pod {
func (r *SingleClusterReconciler) getPodBatchToRestart(podList []*corev1.Pod, rackSize int) [][]*corev1.Pod {
// Error is already handled in validation
rollingUpdateBatchSize, _ := intstr.GetScaledValueFromIntOrPercent(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, rackSize, false,
Expand All @@ -1769,6 +1786,19 @@ func (r *SingleClusterReconciler) getPodsBatchToRestart(podList []*corev1.Pod, r
return chunkBy(podList, rollingUpdateBatchSize)
}

func (r *SingleClusterReconciler) getPodBatchToScaleDown(podList []*corev1.Pod, rackSize int) [][]*corev1.Pod {
// Error is already handled in validation
scaleDownBatchSize, _ := intstr.GetScaledValueFromIntOrPercent(
r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, rackSize, false,
)

if len(podList) < scaleDownBatchSize {
scaleDownBatchSize = len(podList)
}

return chunkBy(podList, scaleDownBatchSize)
}

func chunkBy[T any](items []*T, chunkSize int) (chunks [][]*T) {
if chunkSize <= 0 {
chunkSize = 1
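getPodBatchToScaleDown scales ScaleDownBatchSize against the full rack size, caps it at the number of pods being removed, and hands the result to the generic chunkBy helper (whose body is truncated above). A self-contained sketch of that flow; the chunking logic, pod names, 40% batch size, and 10-pod rack are illustrative assumptions, not code from this commit.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

// chunkBy mirrors the generic helper above (using a plain slice instead of
// pointers): it splits items into consecutive chunks of at most chunkSize.
func chunkBy[T any](items []T, chunkSize int) (chunks [][]T) {
	if chunkSize <= 0 {
		chunkSize = 1
	}

	for chunkSize < len(items) {
		items, chunks = items[chunkSize:], append(chunks, items[0:chunkSize:chunkSize])
	}

	return append(chunks, items)
}

func main() {
	// Illustrative values: a 10-pod rack scaled down by 5 pods with a 40% batch size.
	rackSize := 10
	toRemove := []string{"pod-a", "pod-b", "pod-c", "pod-d", "pod-e"}

	batchSize := intstr.FromString("40%")
	perBatch, _ := intstr.GetScaledValueFromIntOrPercent(&batchSize, rackSize, false) // 4

	// Like getPodBatchToScaleDown, cap the batch at the number of pods being removed.
	if len(toRemove) < perBatch {
		perBatch = len(toRemove)
	}

	fmt.Println(chunkBy(toRemove, perBatch)) // [[pod-a pod-b pod-c pod-d] [pod-e]]
}
```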