diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index 089bed614..9346610f5 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -72,6 +72,7 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Seeds Finder Services" SeedsFinderServices SeedsFinderServices `json:"seedsFinderServices,omitempty"` // RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Roster Node BlockList" RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"` } @@ -276,6 +277,17 @@ type RackConfig struct { //nolint:govet // for readability // RollingUpdateBatchSize is the percentage/number of rack pods that will be restarted simultaneously // +optional RollingUpdateBatchSize *intstr.IntOrString `json:"rollingUpdateBatchSize,omitempty"` + // MaxIgnorablePods is the maximum number/percentage of pending/failed pods in a rack that are ignored while + // assessing cluster stability. Pods identified using this value are not considered part of the cluster. + // Additionally, in SC mode clusters, these pods are removed from the roster. + // This is particularly useful when some pods are stuck in pending/failed state due to any scheduling issues and + // cannot be fixed by simply updating the CR. + // It enables the operator to perform specific operations on the cluster, like changing Aerospike configurations, + // without being hindered by these problematic pods. + // Remember to set MaxIgnorablePods back to 0 once the required operation is done. + // This makes sure that later on, all pods are properly counted when evaluating the cluster stability. + // +optional + MaxIgnorablePods *intstr.IntOrString `json:"maxIgnorablePods,omitempty"` } // Rack specifies single rack config diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index e98c12f8a..96d8e9454 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -605,24 +605,11 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error { // Validate batch upgrade/restart param if c.Spec.RackConfig.RollingUpdateBatchSize != nil { - // Just validate if RollingUpdateBatchSize is valid number or string. - randomNumber := 100 - - count, err := intstr.GetScaledValueFromIntOrPercent( - c.Spec.RackConfig.RollingUpdateBatchSize, randomNumber, false, - ) - if err != nil { + if err := validateIntOrStringField(c.Spec.RackConfig.RollingUpdateBatchSize, + "spec.rackConfig.rollingUpdateBatchSize"); err != nil { return err } - // Only negative is not allowed. Any big number can be given. - if count < 0 { - return fmt.Errorf( - "can not use negative rackConfig.RollingUpdateBatchSize %s", - c.Spec.RackConfig.RollingUpdateBatchSize.String(), - ) - } - if len(c.Spec.RackConfig.Racks) < 2 { return fmt.Errorf("can not use rackConfig.RollingUpdateBatchSize when number of racks is less than two") } @@ -651,6 +638,13 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error { } } + // Validate MaxIgnorablePods param + if c.Spec.RackConfig.MaxIgnorablePods != nil { + if err := validateIntOrStringField(c.Spec.RackConfig.MaxIgnorablePods, + "spec.rackConfig.maxIgnorablePods"); err != nil { + return err + } + } // TODO: should not use batch if racks are less than replication-factor return nil } @@ -2176,3 +2170,23 @@ func (c *AerospikeCluster) validateNetworkPolicy(namespace string) error { return nil } + +func validateIntOrStringField(value *intstr.IntOrString, fieldPath string) error { + randomNumber := 100 + // Just validate if value is valid number or string. + count, err := intstr.GetScaledValueFromIntOrPercent(value, randomNumber, false) + if err != nil { + return err + } + + // Only negative is not allowed. Any big number can be given. + if count < 0 { + return fmt.Errorf("can not use negative %s: %s", fieldPath, value.String()) + } + + if value.Type == intstr.String && count > 100 { + return fmt.Errorf("%s: %s must not be greater than 100 percent", fieldPath, value.String()) + } + + return nil +} diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index a68fa7d32..6696bf022 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -848,6 +848,11 @@ func (in *RackConfig) DeepCopyInto(out *RackConfig) { *out = new(intstr.IntOrString) **out = **in } + if in.MaxIgnorablePods != nil { + in, out := &in.MaxIgnorablePods, &out.MaxIgnorablePods + *out = new(intstr.IntOrString) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RackConfig. diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index a60371e1c..266989bf2 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -4593,6 +4593,24 @@ spec: Aerospike cluster. Pods will be deployed in given racks based on given configuration properties: + maxIgnorablePods: + anyOf: + - type: integer + - type: string + description: MaxIgnorablePods is the maximum number/percentage + of pending/failed pods in a rack that are ignored while assessing + cluster stability. Pods identified using this value are not + considered part of the cluster. Additionally, in SC mode clusters, + these pods are removed from the roster. This is particularly + useful when some pods are stuck in pending/failed state due + to any scheduling issues and cannot be fixed by simply updating + the CR. It enables the operator to perform specific operations + on the cluster, like changing Aerospike configurations, without + being hindered by these problematic pods. Remember to set MaxIgnorablePods + back to 0 once the required operation is done. This makes sure + that later on, all pods are properly counted when evaluating + the cluster stability. + x-kubernetes-int-or-string: true namespaces: description: List of Aerospike namespaces for which rack feature will be enabled @@ -13330,6 +13348,24 @@ spec: given configuration nullable: true properties: + maxIgnorablePods: + anyOf: + - type: integer + - type: string + description: MaxIgnorablePods is the maximum number/percentage + of pending/failed pods in a rack that are ignored while assessing + cluster stability. Pods identified using this value are not + considered part of the cluster. Additionally, in SC mode clusters, + these pods are removed from the roster. This is particularly + useful when some pods are stuck in pending/failed state due + to any scheduling issues and cannot be fixed by simply updating + the CR. It enables the operator to perform specific operations + on the cluster, like changing Aerospike configurations, without + being hindered by these problematic pods. Remember to set MaxIgnorablePods + back to 0 once the required operation is done. This makes sure + that later on, all pods are properly counted when evaluating + the cluster stability. + x-kubernetes-int-or-string: true namespaces: description: List of Aerospike namespaces for which rack feature will be enabled diff --git a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml index 53aa7c3f5..ed26895a5 100644 --- a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml @@ -60,6 +60,10 @@ spec: cluster. Pods will be deployed in given racks based on given configuration displayName: Rack Config path: rackConfig + - description: RosterNodeBlockList is a list of blocked nodeIDs from roster + in a strong-consistency setup + displayName: Roster Node BlockList + path: rosterNodeBlockList - description: SeedsFinderServices creates additional Kubernetes service that allow clients to discover Aerospike cluster nodes. displayName: Seeds Finder Services diff --git a/controllers/aero_info_calls.go b/controllers/aero_info_calls.go index 0978ee2b3..093da495b 100644 --- a/controllers/aero_info_calls.go +++ b/controllers/aero_info_calls.go @@ -18,6 +18,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" as "github.com/aerospike/aerospike-client-go/v6" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" @@ -29,24 +30,25 @@ import ( // Aerospike helper // ------------------------------------------------------------------------------------ -// waitForMultipleNodesSafeStopReady waits util the input pods is safe to stop, -// skipping pods that are not running and present in ignorablePods for stability check. -// The ignorablePods list should be a list of failed or pending pods that are going to be -// deleted eventually and are safe to ignore in stability checks. +// waitForMultipleNodesSafeStopReady waits until the input pods are safe to stop, +// skipping pods that are not running and present in ignorablePodNames for stability check. +// The ignorablePodNames is the list of failed or pending pods that are either:: +// 1. going to be deleted eventually and are safe to ignore in stability checks +// 2. given in ignorePodList by the user and are safe to ignore in stability checks func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady( - pods []*corev1.Pod, ignorablePods []corev1.Pod, + pods []*corev1.Pod, ignorablePodNames sets.Set[string], ) reconcileResult { if len(pods) == 0 { return reconcileSuccess() } // Remove a node only if cluster is stable - if err := r.waitForAllSTSToBeReady(); err != nil { + if err := r.waitForAllSTSToBeReady(ignorablePodNames); err != nil { return reconcileError(fmt.Errorf("failed to wait for cluster to be ready: %v", err)) } // This doesn't make actual connection, only objects having connection info are created - allHostConns, err := r.newAllHostConnWithOption(ignorablePods) + allHostConns, err := r.newAllHostConnWithOption(ignorablePodNames) if err != nil { return reconcileError(fmt.Errorf("failed to get hostConn for aerospike cluster nodes: %v", err)) } @@ -64,12 +66,12 @@ func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady( } // Setup roster after migration. - if err = r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, ignorablePods); err != nil { + if err = r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, ignorablePodNames); err != nil { r.Log.Error(err, "Failed to set roster for cluster") return reconcileRequeueAfter(1) } - if err := r.quiescePods(policy, allHostConns, pods, ignorablePods); err != nil { + if err := r.quiescePods(policy, allHostConns, pods, ignorablePodNames); err != nil { return reconcileError(err) } @@ -77,7 +79,7 @@ func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady( } func (r *SingleClusterReconciler) quiescePods( - policy *as.ClientPolicy, allHostConns []*deployment.HostConn, pods []*corev1.Pod, ignorablePods []corev1.Pod, + policy *as.ClientPolicy, allHostConns []*deployment.HostConn, pods []*corev1.Pod, ignorablePodNames sets.Set[string], ) error { podList := make([]corev1.Pod, 0, len(pods)) @@ -85,7 +87,7 @@ func (r *SingleClusterReconciler) quiescePods( podList = append(podList, *pods[idx]) } - selectedHostConns, err := r.newPodsHostConnWithOption(podList, ignorablePods) + selectedHostConns, err := r.newPodsHostConnWithOption(podList, ignorablePodNames) if err != nil { return err } @@ -173,15 +175,9 @@ func (r *SingleClusterReconciler) alumniReset(pod *corev1.Pod) error { return asConn.AlumniReset(r.getClientPolicy()) } -func (r *SingleClusterReconciler) newAllHostConn() ( - []*deployment.HostConn, error, -) { - return r.newAllHostConnWithOption(nil) -} - // newAllHostConnWithOption returns connections to all pods in the cluster skipping pods that are not running and // present in ignorablePods. -func (r *SingleClusterReconciler) newAllHostConnWithOption(ignorablePods []corev1.Pod) ( +func (r *SingleClusterReconciler) newAllHostConnWithOption(ignorablePodNames sets.Set[string]) ( []*deployment.HostConn, error, ) { podList, err := r.getClusterPodList() @@ -193,12 +189,12 @@ func (r *SingleClusterReconciler) newAllHostConnWithOption(ignorablePods []corev return nil, fmt.Errorf("pod list empty") } - return r.newPodsHostConnWithOption(podList.Items, ignorablePods) + return r.newPodsHostConnWithOption(podList.Items, ignorablePodNames) } // newPodsHostConnWithOption returns connections to all pods given skipping pods that are not running and // present in ignorablePods. -func (r *SingleClusterReconciler) newPodsHostConnWithOption(pods, ignorablePods []corev1.Pod) ( +func (r *SingleClusterReconciler) newPodsHostConnWithOption(pods []corev1.Pod, ignorablePodNames sets.Set[string]) ( []*deployment.HostConn, error, ) { hostConns := make([]*deployment.HostConn, 0, len(pods)) @@ -211,8 +207,7 @@ func (r *SingleClusterReconciler) newPodsHostConnWithOption(pods, ignorablePods // Checking if all the container in the pod are ready or not if !utils.IsPodRunningAndReady(pod) { - ignorablePod := utils.GetPod(pod.Name, ignorablePods) - if ignorablePod != nil { + if ignorablePodNames.Has(pod.Name) { // This pod is not running and ignorable. r.Log.Info( "Ignoring info call on non-running pod ", "pod", pod.Name, @@ -259,7 +254,7 @@ func hostID(hostName string, hostPort int) string { func (r *SingleClusterReconciler) setMigrateFillDelay( policy *as.ClientPolicy, - asConfig *asdbv1.AerospikeConfigSpec, setToZero bool, ignorablePods []corev1.Pod, + asConfig *asdbv1.AerospikeConfigSpec, setToZero bool, ignorablePodNames sets.Set[string], ) reconcileResult { migrateFillDelay, err := asdbv1.GetMigrateFillDelay(asConfig) if err != nil { @@ -286,7 +281,7 @@ func (r *SingleClusterReconciler) setMigrateFillDelay( } // This doesn't make actual connection, only objects having connection info are created - allHostConns, err := r.newAllHostConnWithOption(ignorablePods) + allHostConns, err := r.newAllHostConnWithOption(ignorablePodNames) if err != nil { return reconcileError( fmt.Errorf( diff --git a/controllers/pod.go b/controllers/pod.go index 8574b3c1c..1244e6f3e 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -13,6 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" @@ -51,7 +52,7 @@ func mergeRestartType(current, incoming RestartType) RestartType { // Fetching RestartType of all pods, based on the operation being performed. func (r *SingleClusterReconciler) getRollingRestartTypeMap( - rackState *RackState, pods []*corev1.Pod, + rackState *RackState, pods []*corev1.Pod, ignorablePodNames sets.Set[string], ) (map[string]RestartType, error) { var addedNSDevices []string @@ -65,9 +66,13 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap( requiredConfHash := confMap.Data[aerospikeConfHashFileName] for idx := range pods { + if ignorablePodNames.Has(pods[idx].Name) { + continue + } + podStatus := r.aeroCluster.Status.Pods[pods[idx].Name] if addedNSDevices == nil && podStatus.AerospikeConfigHash != requiredConfHash { - // Fetching all block devices that has been added in namespaces. + // Fetching all block devices that have been added in namespaces. addedNSDevices, err = r.getNSAddedDevices(rackState) if err != nil { return nil, err @@ -146,10 +151,10 @@ func (r *SingleClusterReconciler) getRollingRestartTypePod( } func (r *SingleClusterReconciler) rollingRestartPods( - rackState *RackState, podsToRestart []*corev1.Pod, ignorablePods []corev1.Pod, + rackState *RackState, podsToRestart []*corev1.Pod, ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType, ) reconcileResult { - failedPods, activePods := getFailedAndActivePods(podsToRestart) + failedPods, activePods := getFailedAndActivePods(podsToRestart, ignorablePodNames) // If already dead node (failed pod) then no need to check node safety, migration if len(failedPods) != 0 { @@ -163,7 +168,7 @@ func (r *SingleClusterReconciler) rollingRestartPods( if len(activePods) != 0 { r.Log.Info("Restart active pods", "pods", getPodNames(activePods)) - if res := r.waitForMultipleNodesSafeStopReady(activePods, ignorablePods); !res.isSuccess { + if res := r.waitForMultipleNodesSafeStopReady(activePods, ignorablePodNames); !res.isSuccess { return res } @@ -345,9 +350,14 @@ func (r *SingleClusterReconciler) ensurePodsRunningAndReady(podsToCheck []*corev return reconcileRequeueAfter(10) } -func getFailedAndActivePods(pods []*corev1.Pod) (failedPods, activePods []*corev1.Pod) { +func getFailedAndActivePods(pods []*corev1.Pod, ignorablePodNames sets.Set[string], +) (failedPods, activePods []*corev1.Pod) { for idx := range pods { pod := pods[idx] + if ignorablePodNames.Has(pod.Name) { + continue + } + if err := utils.CheckPodFailed(pod); err != nil { failedPods = append(failedPods, pod) continue @@ -360,9 +370,9 @@ func getFailedAndActivePods(pods []*corev1.Pod) (failedPods, activePods []*corev } func (r *SingleClusterReconciler) safelyDeletePodsAndEnsureImageUpdated( - rackState *RackState, podsToUpdate []*corev1.Pod, ignorablePods []corev1.Pod, + rackState *RackState, podsToUpdate []*corev1.Pod, ignorablePodNames sets.Set[string], ) reconcileResult { - failedPods, activePods := getFailedAndActivePods(podsToUpdate) + failedPods, activePods := getFailedAndActivePods(podsToUpdate, ignorablePodNames) // If already dead node (failed pod) then no need to check node safety, migration if len(failedPods) != 0 { @@ -376,7 +386,7 @@ func (r *SingleClusterReconciler) safelyDeletePodsAndEnsureImageUpdated( if len(activePods) != 0 { r.Log.Info("Restart active pods with updated container image", "pods", getPodNames(activePods)) - if res := r.waitForMultipleNodesSafeStopReady(activePods, ignorablePods); !res.isSuccess { + if res := r.waitForMultipleNodesSafeStopReady(activePods, ignorablePodNames); !res.isSuccess { return res } @@ -415,7 +425,7 @@ func (r *SingleClusterReconciler) deletePodAndEnsureImageUpdated( func (r *SingleClusterReconciler) ensurePodsImageUpdated(podsToCheck []*corev1.Pod) reconcileResult { podNames := getPodNames(podsToCheck) - updatedPods := map[string]bool{} + updatedPods := sets.Set[string]{} const ( maxRetries = 6 @@ -428,7 +438,7 @@ func (r *SingleClusterReconciler) ensurePodsImageUpdated(podsToCheck []*corev1.P ) for _, pod := range podsToCheck { - if updatedPods[pod.Name] { + if updatedPods.Has(pod.Name) { continue } @@ -451,7 +461,7 @@ func (r *SingleClusterReconciler) ensurePodsImageUpdated(podsToCheck []*corev1.P break } - updatedPods[pod.Name] = true + updatedPods.Insert(pod.Name) r.Log.Info("Pod is upgraded/downgraded", "podName", pod.Name) } @@ -638,12 +648,14 @@ func (r *SingleClusterReconciler) cleanupDanglingPodsRack(sts *appsv1.StatefulSe return nil } -// getIgnorablePods returns pods from racksToDelete that are currently not running and can be ignored in stability -// checks. -func (r *SingleClusterReconciler) getIgnorablePods(racksToDelete []asdbv1.Rack) ( - []corev1.Pod, error, +// getIgnorablePods returns pods: +// 1. From racksToDelete that are currently not running and can be ignored in stability checks. +// 2. Failed/pending pods from the configuredRacks identified using maxIgnorablePods field and +// can be ignored from stability checks. +func (r *SingleClusterReconciler) getIgnorablePods(racksToDelete []asdbv1.Rack, configuredRacks []RackState) ( + sets.Set[string], error, ) { - var ignorablePods []corev1.Pod + ignorablePodNames := sets.Set[string]{} for rackIdx := range racksToDelete { rackPods, err := r.getRackPodList(racksToDelete[rackIdx].ID) @@ -654,14 +666,56 @@ func (r *SingleClusterReconciler) getIgnorablePods(racksToDelete []asdbv1.Rack) for podIdx := range rackPods.Items { pod := rackPods.Items[podIdx] if !utils.IsPodRunningAndReady(&pod) { - ignorablePods = append(ignorablePods, pod) + ignorablePodNames.Insert(pod.Name) } } } - return ignorablePods, nil -} + for idx := range configuredRacks { + rack := &configuredRacks[idx] + + failedAllowed, _ := intstr.GetScaledValueFromIntOrPercent( + r.aeroCluster.Spec.RackConfig.MaxIgnorablePods, rack.Size, false, + ) + + podList, err := r.getRackPodList(rack.Rack.ID) + if err != nil { + return nil, err + } + + var ( + failedPod []string + pendingPod []string + ) + + for podIdx := range podList.Items { + pod := &podList.Items[podIdx] + if !utils.IsPodRunningAndReady(pod) { + if utils.IsPodReasonUnschedulable(pod) { + pendingPod = append(pendingPod, pod.Name) + continue + } + + failedPod = append(failedPod, pod.Name) + } + } + + // prepend pendingPod to failedPod + failedPod = append(pendingPod, failedPod...) + + for podIdx := range failedPod { + if failedAllowed <= 0 { + break + } + + ignorablePodNames.Insert(failedPod[podIdx]) + failedAllowed-- + } + } + + return ignorablePodNames, nil +} func (r *SingleClusterReconciler) getPodsPVCList( podNames []string, rackID int, ) ([]corev1.PersistentVolumeClaim, error) { @@ -706,9 +760,14 @@ func (r *SingleClusterReconciler) getClusterPodList() ( return podList, nil } -func (r *SingleClusterReconciler) isAnyPodInImageFailedState(podList []corev1.Pod) bool { +func (r *SingleClusterReconciler) isAnyPodInImageFailedState(podList []corev1.Pod, ignorablePodNames sets.Set[string], +) bool { for idx := range podList { pod := &podList[idx] + if ignorablePodNames.Has(pod.Name) { + continue + } + // TODO: Should we use checkPodFailed or CheckPodImageFailed? // scaleDown, rollingRestart should work even if node is crashed // If node was crashed due to wrong config then only rollingRestart can bring it back. diff --git a/controllers/rack.go b/controllers/rack.go index 2c13fcd1b..66e269349 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -12,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" @@ -44,19 +45,14 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { rackIDsToDelete = append(rackIDsToDelete, racksToDelete[idx].ID) } - ignorablePods, err := r.getIgnorablePods(racksToDelete) + ignorablePodNames, err := r.getIgnorablePods(racksToDelete, rackStateList) if err != nil { return reconcileError(err) } - ignorablePodNames := make([]string, 0, len(ignorablePods)) - for idx := range ignorablePods { - ignorablePodNames = append(ignorablePodNames, ignorablePods[idx].Name) - } - r.Log.Info( "Rack changes", "racksToDelete", rackIDsToDelete, "ignorablePods", - ignorablePodNames, + ignorablePodNames.UnsortedList(), ) // Handle failed racks @@ -85,12 +81,12 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { ) } - failedPods, _ := getFailedAndActivePods(podList) + failedPods, _ := getFailedAndActivePods(podList, ignorablePodNames) if len(failedPods) != 0 { r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) if res = r.reconcileRack( - found, state, ignorablePods, failedPods, + found, state, ignorablePodNames, failedPods, ); !res.isSuccess { return res } @@ -111,11 +107,11 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { ) } - failedPods, _ = getFailedAndActivePods(podList) + failedPods, _ = getFailedAndActivePods(podList, ignorablePodNames) if len(failedPods) != 0 { r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) - if _, res = r.rollingRestartRack(found, state, ignorablePods, nil, + if _, res = r.rollingRestartRack(found, state, ignorablePodNames, nil, failedPods); !res.isSuccess { return res } @@ -149,7 +145,7 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { } else { // Reconcile other statefulset if res = r.reconcileRack( - found, state, ignorablePods, nil, + found, state, ignorablePodNames, nil, ); !res.isSuccess { return res } @@ -161,14 +157,14 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { state := scaledDownRackList[idx].rackState sts := scaledDownRackList[idx].rackSTS - if res = r.reconcileRack(sts, state, ignorablePods, nil); !res.isSuccess { + if res = r.reconcileRack(sts, state, ignorablePodNames, nil); !res.isSuccess { return res } } if len(r.aeroCluster.Status.RackConfig.Racks) != 0 { // Remove removed racks - if res = r.deleteRacks(racksToDelete, ignorablePods); !res.isSuccess { + if res = r.deleteRacks(racksToDelete, ignorablePodNames); !res.isSuccess { if res.err != nil { r.Log.Error( err, "Failed to remove statefulset for removed racks", @@ -204,7 +200,7 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { } // Wait for pods to be ready. - if err := r.waitForSTSToBeReady(found); err != nil { + if err := r.waitForSTSToBeReady(found, ignorablePodNames); err != nil { // If the wait times out try again. // The wait is required in cases where scale up waits for a pod to // terminate times out and event is re-queued. @@ -291,7 +287,7 @@ func (r *SingleClusterReconciler) getRacksToDelete(rackStateList []RackState) ( } func (r *SingleClusterReconciler) deleteRacks( - racksToDelete []asdbv1.Rack, ignorablePods []corev1.Pod, + racksToDelete []asdbv1.Rack, ignorablePodNames sets.Set[string], ) reconcileResult { for idx := range racksToDelete { rack := &racksToDelete[idx] @@ -311,7 +307,7 @@ func (r *SingleClusterReconciler) deleteRacks( // TODO: Add option for quick delete of rack. DefaultRackID should always be removed gracefully rackState := &RackState{Size: 0, Rack: rack} - found, res := r.scaleDownRack(found, rackState, ignorablePods) + found, res := r.scaleDownRack(found, rackState, ignorablePodNames) if !res.isSuccess { return res } @@ -349,12 +345,12 @@ func (r *SingleClusterReconciler) deleteRacks( } func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.StatefulSet, rackState *RackState, - ignorablePods []corev1.Pod, failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { + ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { var res reconcileResult // Always update configMap. We won't be able to find if a rack's config, and it's pod config is in sync or not // Checking rack.spec, rack.status will not work. // We may change config, let some pods restart with new config and then change config back to original value. - // Now rack.spec, rack.status will be same but few pods will have changed config. + // Now rack.spec, rack.status will be same, but few pods will have changed config. // So a check based on spec and status will skip configMap update. // Hence, a rolling restart of pod will never bring pod to desired config if err := r.updateSTSConfigMap( @@ -371,13 +367,13 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat } // Upgrade - upgradeNeeded, err := r.isRackUpgradeNeeded(rackState.Rack.ID) + upgradeNeeded, err := r.isRackUpgradeNeeded(rackState.Rack.ID, ignorablePodNames) if err != nil { return found, reconcileError(err) } if upgradeNeeded { - found, res = r.upgradeRack(found, rackState, ignorablePods, failedPods) + found, res = r.upgradeRack(found, rackState, ignorablePodNames, failedPods) if !res.isSuccess { if res.err != nil { r.Log.Error( @@ -396,13 +392,13 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat return found, res } } else { - var needRollingRestartRack, restartTypeMap, nErr = r.needRollingRestartRack(rackState) + var needRollingRestartRack, restartTypeMap, nErr = r.needRollingRestartRack(rackState, ignorablePodNames) if nErr != nil { return found, reconcileError(nErr) } if needRollingRestartRack { - found, res = r.rollingRestartRack(found, rackState, ignorablePods, restartTypeMap, failedPods) + found, res = r.rollingRestartRack(found, rackState, ignorablePodNames, restartTypeMap, failedPods) if !res.isSuccess { if res.err != nil { r.Log.Error( @@ -423,11 +419,49 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat } } + if r.aeroCluster.Spec.RackConfig.MaxIgnorablePods != nil { + if res := r.handleNSOrDeviceRemovalForIgnorablePods(rackState, ignorablePodNames); !res.isSuccess { + return found, res + } + } + return found, reconcileSuccess() } +func (r *SingleClusterReconciler) handleNSOrDeviceRemovalForIgnorablePods( + rackState *RackState, ignorablePodNames sets.Set[string], +) reconcileResult { + podList, err := r.getOrderedRackPodList(rackState.Rack.ID) + if err != nil { + return reconcileError(fmt.Errorf("failed to list pods: %v", err)) + } + // Filter ignoredPods to update their dirtyVolumes in the status. + // IgnoredPods are skipped from upgrade/rolling restart, and as a result in case of device removal, dirtyVolumes + // are not updated in their pod status. This makes devices un-reusable as they cannot be cleaned up during init phase. + // So, explicitly add dirtyVolumes for ignoredPods, so that they can be cleaned in the init phase. + var ignoredPod []*corev1.Pod + + for idx := range podList { + pod := podList[idx] + // Pods, that are not in status are not even initialized, so no need to update dirtyVolumes. + if _, ok := r.aeroCluster.Status.Pods[pod.Name]; ok { + if ignorablePodNames.Has(pod.Name) { + ignoredPod = append(ignoredPod, pod) + } + } + } + + if len(ignoredPod) > 0 { + if err := r.handleNSOrDeviceRemoval(rackState, ignoredPod); err != nil { + return reconcileError(err) + } + } + + return reconcileSuccess() +} + func (r *SingleClusterReconciler) reconcileRack( - found *appsv1.StatefulSet, rackState *RackState, ignorablePods []corev1.Pod, failedPods []*corev1.Pod, + found *appsv1.StatefulSet, rackState *RackState, ignorablePodNames sets.Set[string], failedPods []*corev1.Pod, ) reconcileResult { r.Log.Info( "Reconcile existing Aerospike cluster statefulset", "stsName", @@ -446,7 +480,7 @@ func (r *SingleClusterReconciler) reconcileRack( // Scale down if currentSize > desiredSize { - found, res = r.scaleDownRack(found, rackState, ignorablePods) + found, res = r.scaleDownRack(found, rackState, ignorablePodNames) if !res.isSuccess { if res.err != nil { r.Log.Error( @@ -491,7 +525,7 @@ func (r *SingleClusterReconciler) reconcileRack( return reconcileError(err) } - found, res = r.upgradeOrRollingRestartRack(found, rackState, ignorablePods, failedPods) + found, res = r.upgradeOrRollingRestartRack(found, rackState, ignorablePodNames, failedPods) if !res.isSuccess { return res } @@ -499,7 +533,7 @@ func (r *SingleClusterReconciler) reconcileRack( // Scale up after upgrading, so that new pods come up with new image currentSize = *found.Spec.Replicas if currentSize < desiredSize { - found, res = r.scaleUpRack(found, rackState) + found, res = r.scaleUpRack(found, rackState, ignorablePodNames) if !res.isSuccess { r.Log.Error( res.err, "Failed to scaleUp StatefulSet pods", "stsName", @@ -517,7 +551,7 @@ func (r *SingleClusterReconciler) reconcileRack( } } - // All regular operation are complete. Take time and cleanup dangling nodes that have not been cleaned up + // All regular operations are complete. Take time and cleanup dangling nodes that have not been cleaned up // previously due to errors. if err := r.cleanupDanglingPodsRack(found, rackState); err != nil { return reconcileError(err) @@ -535,7 +569,9 @@ func (r *SingleClusterReconciler) reconcileRack( return reconcileSuccess() } -func (r *SingleClusterReconciler) scaleUpRack(found *appsv1.StatefulSet, rackState *RackState) ( +func (r *SingleClusterReconciler) scaleUpRack( + found *appsv1.StatefulSet, rackState *RackState, ignorablePodNames sets.Set[string], +) ( *appsv1.StatefulSet, reconcileResult, ) { desiredSize := int32(rackState.Size) @@ -556,7 +592,7 @@ func (r *SingleClusterReconciler) scaleUpRack(found *appsv1.StatefulSet, rackSta return found, reconcileError(fmt.Errorf("failed to list pods: %v", err)) } - if r.isAnyPodInImageFailedState(podList.Items) { + if r.isAnyPodInImageFailedState(podList.Items, ignorablePodNames) { return found, reconcileError(fmt.Errorf("cannot scale up AerospikeCluster. A pod is already in failed state")) } @@ -621,7 +657,7 @@ func (r *SingleClusterReconciler) scaleUpRack(found *appsv1.StatefulSet, rackSta } func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, rackState *RackState, - ignorablePods []corev1.Pod, failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { + ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { var ( err error podList []*corev1.Pod @@ -656,13 +692,18 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r ) } - // Find pods which needs to be updated + // Find pods which need to be updated podsToUpgrade := make([]*corev1.Pod, 0, len(podList)) for idx := range podList { pod := podList[idx] r.Log.Info("Check if pod needs upgrade or not", "podName", pod.Name) + if ignorablePodNames.Has(pod.Name) { + r.Log.Info("Pod found in ignore pod list, skipping", "podName", pod.Name) + continue + } + if r.isPodUpgraded(pod) { r.Log.Info("Pod doesn't need upgrade", "podName", pod.Name) continue @@ -706,7 +747,7 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r "[rack-%d] Updating Containers on Pods %v", rackState.Rack.ID, podNames, ) - res := r.safelyDeletePodsAndEnsureImageUpdated(rackState, podsBatch, ignorablePods) + res := r.safelyDeletePodsAndEnsureImageUpdated(rackState, podsBatch, ignorablePodNames) if !res.isSuccess { return statefulSet, res } @@ -737,7 +778,7 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r } func (r *SingleClusterReconciler) scaleDownRack( - found *appsv1.StatefulSet, rackState *RackState, ignorablePods []corev1.Pod, + found *appsv1.StatefulSet, rackState *RackState, ignorablePodNames sets.Set[string], ) (*appsv1.StatefulSet, reconcileResult) { desiredSize := int32(rackState.Size) @@ -762,7 +803,7 @@ func (r *SingleClusterReconciler) scaleDownRack( return found, reconcileError(fmt.Errorf("failed to list pods: %v", err)) } - if r.isAnyPodInImageFailedState(oldPodList.Items) { + if r.isAnyPodInImageFailedState(oldPodList.Items, ignorablePodNames) { return found, reconcileError(fmt.Errorf("cannot scale down AerospikeCluster. A pod is already in failed state")) } @@ -782,7 +823,7 @@ func (r *SingleClusterReconciler) scaleDownRack( // Ignore safe stop check if pod is not running. // Ignore migrate-fill-delay if pod is not running. Deleting this pod will not lead to any migration. if isPodRunningAndReady { - if res := r.waitForMultipleNodesSafeStopReady([]*corev1.Pod{pod}, ignorablePods); !res.isSuccess { + if res := r.waitForMultipleNodesSafeStopReady([]*corev1.Pod{pod}, ignorablePodNames); !res.isSuccess { // The pod is running and is unsafe to terminate. return found, res } @@ -793,7 +834,7 @@ func (r *SingleClusterReconciler) scaleDownRack( // setting migrate-fill-delay will fail if there are any failed pod if res := r.setMigrateFillDelay( policy, &rackState.Rack.AerospikeConfig, true, - append(ignorablePods, *pod), + ignorablePodNames.Insert(pod.Name), ); !res.isSuccess { return found, res } @@ -818,7 +859,7 @@ func (r *SingleClusterReconciler) scaleDownRack( // These checks will fail if there is any other pod in failed state. if isPodRunningAndReady { // Wait for pods to get terminated - if err = r.waitForSTSToBeReady(found); err != nil { + if err = r.waitForSTSToBeReady(found, ignorablePodNames); err != nil { r.Log.Error(err, "Failed to wait for statefulset to be ready") return found, reconcileRequeueAfter(1) } @@ -828,7 +869,7 @@ func (r *SingleClusterReconciler) scaleDownRack( // This can be left to the user but if we would do it here on our own then we can reuse // objects like pvc and service. These objects would have been removed if scaleup is left for the user. // In case of rolling restart, no pod cleanup happens, therefore rolling config back is left to the user. - if err = r.validateSCClusterState(policy, ignorablePods); err != nil { + if err = r.validateSCClusterState(policy, ignorablePodNames); err != nil { // reset cluster size newSize := *found.Spec.Replicas + 1 found.Spec.Replicas = &newSize @@ -890,7 +931,7 @@ func (r *SingleClusterReconciler) scaleDownRack( } func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, rackState *RackState, - ignorablePods []corev1.Pod, restartTypeMap map[string]RestartType, + ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType, failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { r.Log.Info("Rolling restart AerospikeCluster statefulset pods") @@ -924,7 +965,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, pods = append(pods, *podList[idx]) } - if len(failedPods) != 0 && r.isAnyPodInImageFailedState(pods) { + if len(failedPods) != 0 && r.isAnyPodInImageFailedState(pods, ignorablePodNames) { return found, reconcileError( fmt.Errorf( "cannot Rolling restart AerospikeCluster. " + @@ -944,12 +985,17 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, "Statefulset spec updated - doing rolling restart", ) - // Find pods which needs restart + // Find pods which need restart podsToRestart := make([]*corev1.Pod, 0, len(podList)) for idx := range podList { pod := podList[idx] + if ignorablePodNames.Has(pod.Name) { + r.Log.Info("Pod found in ignore pod list, skipping", "podName", pod.Name) + continue + } + restartType := restartTypeMap[pod.Name] if restartType == noRestart { r.Log.Info("This Pod doesn't need rolling restart, Skip this", "pod", pod.Name) @@ -990,7 +1036,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, return nil, reconcileError(err) } - if res := r.rollingRestartPods(rackState, podsBatch, ignorablePods, restartTypeMap); !res.isSuccess { + if res := r.rollingRestartPods(rackState, podsBatch, ignorablePodNames, restartTypeMap); !res.isSuccess { return found, res } @@ -1015,7 +1061,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, return found, reconcileSuccess() } -func (r *SingleClusterReconciler) needRollingRestartRack(rackState *RackState) ( +func (r *SingleClusterReconciler) needRollingRestartRack(rackState *RackState, ignorablePodNames sets.Set[string]) ( needRestart bool, restartTypeMap map[string]RestartType, err error, ) { podList, err := r.getOrderedRackPodList(rackState.Rack.ID) @@ -1023,7 +1069,7 @@ func (r *SingleClusterReconciler) needRollingRestartRack(rackState *RackState) ( return false, nil, fmt.Errorf("failed to list pods: %v", err) } - restartTypeMap, err = r.getRollingRestartTypeMap(rackState, podList) + restartTypeMap, err = r.getRollingRestartTypeMap(rackState, podList, ignorablePodNames) if err != nil { return false, nil, err } @@ -1037,7 +1083,7 @@ func (r *SingleClusterReconciler) needRollingRestartRack(rackState *RackState) ( return false, nil, nil } -func (r *SingleClusterReconciler) isRackUpgradeNeeded(rackID int) ( +func (r *SingleClusterReconciler) isRackUpgradeNeeded(rackID int, ignorablePodNames sets.Set[string]) ( bool, error, ) { podList, err := r.getRackPodList(rackID) @@ -1047,6 +1093,11 @@ func (r *SingleClusterReconciler) isRackUpgradeNeeded(rackID int) ( for idx := range podList.Items { pod := &podList.Items[idx] + + if ignorablePodNames.Has(pod.Name) { + continue + } + if !r.isPodOnDesiredImage(pod, true) { r.Log.Info("Pod needs upgrade/downgrade", "podName", pod.Name) return true, nil diff --git a/controllers/reconciler.go b/controllers/reconciler.go index 050c0123b..58b4417bb 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -124,9 +124,16 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { return reconcile.Result{}, err } + ignorablePodNames, err := r.getIgnorablePods(nil, getConfiguredRackStateList(r.aeroCluster)) + if err != nil { + r.Log.Error(err, "Failed to determine pods to be ignored") + + return reconcile.Result{}, err + } + // Check if there is any node with quiesce status. We need to undo that // It may have been left from previous steps - allHostConns, err := r.newAllHostConn() + allHostConns, err := r.newAllHostConnWithOption(ignorablePodNames) if err != nil { e := fmt.Errorf( "failed to get hostConn for aerospike cluster nodes: %v", err, @@ -146,7 +153,7 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { } // Setup access control. - if err := r.validateAndReconcileAccessControl(); err != nil { + if err := r.validateAndReconcileAccessControl(ignorablePodNames); err != nil { r.Log.Error(err, "Failed to Reconcile access control") r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeWarning, "ACLUpdateFailed", @@ -172,12 +179,12 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { // Use policy from spec after setting up access control policy := r.getClientPolicy() - // revert migrate-fill-delay to original value if it was set to 0 during scale down - // Passing first rack from the list as all the racks will have same migrate-fill-delay + // Revert migrate-fill-delay to original value if it was set to 0 during scale down. + // Passing the first rack from the list as all the racks will have the same migrate-fill-delay // Redundant safe check to revert migrate-fill-delay if previous revert operation missed/skipped somehow if res := r.setMigrateFillDelay( policy, &r.aeroCluster.Spec.RackConfig.Racks[0].AerospikeConfig, - false, nil, + false, ignorablePodNames, ); !res.isSuccess { r.Log.Error(res.err, "Failed to revert migrate-fill-delay") return reconcile.Result{}, res.err @@ -191,7 +198,7 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { } // Setup roster - if err := r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, nil); err != nil { + if err := r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, ignorablePodNames); err != nil { r.Log.Error(err, "Failed to set roster for cluster") return reconcile.Result{}, err } @@ -209,10 +216,55 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { return reconcile.Result{}, err } + // Try to recover pods only when MaxIgnorablePods is set + if r.aeroCluster.Spec.RackConfig.MaxIgnorablePods != nil { + if res := r.recoverIgnorablePods(); !res.isSuccess { + return res.getResult() + } + } + + r.Log.Info("Reconcile completed successfully") + return reconcile.Result{}, nil } -func (r *SingleClusterReconciler) validateAndReconcileAccessControl() error { +func (r *SingleClusterReconciler) recoverIgnorablePods() reconcileResult { + podList, gErr := r.getClusterPodList() + if gErr != nil { + r.Log.Error(gErr, "Failed to get cluster pod list") + return reconcileError(gErr) + } + + r.Log.Info("Try to recover failed/pending pods if any") + + var anyPodFailed bool + // Try to recover failed/pending pods by deleting them + for idx := range podList.Items { + if cErr := utils.CheckPodFailed(&podList.Items[idx]); cErr != nil { + anyPodFailed = true + + if err := r.createOrUpdatePodServiceIfNeeded([]string{podList.Items[idx].Name}); err != nil { + return reconcileError(err) + } + + if err := r.Client.Delete(context.TODO(), &podList.Items[idx]); err != nil { + r.Log.Error(err, "Failed to delete pod", "pod", podList.Items[idx].Name) + return reconcileError(err) + } + + r.Log.Info("Deleted pod", "pod", podList.Items[idx].Name) + } + } + + if anyPodFailed { + r.Log.Info("Found failed/pending pod(s), requeuing") + return reconcileRequeueAfter(0) + } + + return reconcileSuccess() +} + +func (r *SingleClusterReconciler) validateAndReconcileAccessControl(ignorablePodNames sets.Set[string]) error { version, err := asdbv1.GetImageVersion(r.aeroCluster.Spec.Image) if err != nil { return err @@ -231,7 +283,7 @@ func (r *SingleClusterReconciler) validateAndReconcileAccessControl() error { } // Create client - conns, err := r.newAllHostConn() + conns, err := r.newAllHostConnWithOption(ignorablePodNames) if err != nil { return fmt.Errorf("failed to get host info: %v", err) } diff --git a/controllers/statefulset.go b/controllers/statefulset.go index b4247c9b8..1d36c7596 100644 --- a/controllers/statefulset.go +++ b/controllers/statefulset.go @@ -192,7 +192,7 @@ func (r *SingleClusterReconciler) createSTS( "StatefulSet.Name", st.Name, ) - if err := r.waitForSTSToBeReady(st); err != nil { + if err := r.waitForSTSToBeReady(st, nil); err != nil { return st, fmt.Errorf( "failed to wait for statefulset to be ready: %v", err, ) @@ -209,7 +209,9 @@ func (r *SingleClusterReconciler) deleteSTS(st *appsv1.StatefulSet) error { return r.Client.Delete(context.TODO(), st) } -func (r *SingleClusterReconciler) waitForSTSToBeReady(st *appsv1.StatefulSet) error { +func (r *SingleClusterReconciler) waitForSTSToBeReady( + st *appsv1.StatefulSet, ignorablePodNames sets.Set[string], +) error { const ( podStatusMaxRetry = 18 podStatusRetryInterval = time.Second * 10 @@ -223,6 +225,9 @@ func (r *SingleClusterReconciler) waitForSTSToBeReady(st *appsv1.StatefulSet) er var podIndex int32 for podIndex = 0; podIndex < *st.Spec.Replicas; podIndex++ { podName := getSTSPodName(st.Name, podIndex) + if ignorablePodNames.Has(podName) { + continue + } var isReady bool @@ -1003,7 +1008,7 @@ func updateSTSContainers( return stsContainers[:idx] } -func (r *SingleClusterReconciler) waitForAllSTSToBeReady() error { +func (r *SingleClusterReconciler) waitForAllSTSToBeReady(ignorablePodNames sets.Set[string]) error { r.Log.Info("Waiting for cluster to be ready") allRackIDs := sets.NewInt() @@ -1032,7 +1037,7 @@ func (r *SingleClusterReconciler) waitForAllSTSToBeReady() error { continue } - if err := r.waitForSTSToBeReady(st); err != nil { + if err := r.waitForSTSToBeReady(st, ignorablePodNames); err != nil { return err } } diff --git a/controllers/strong_consistency.go b/controllers/strong_consistency.go index 3ca48582d..7ca99ac66 100644 --- a/controllers/strong_consistency.go +++ b/controllers/strong_consistency.go @@ -1,8 +1,8 @@ package controllers import ( - sets "github.com/deckarep/golang-set/v2" - corev1 "k8s.io/api/core/v1" + gosets "github.com/deckarep/golang-set/v2" + "k8s.io/apimachinery/pkg/util/sets" as "github.com/aerospike/aerospike-client-go/v6" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" @@ -11,9 +11,9 @@ import ( func (r *SingleClusterReconciler) getAndSetRoster( policy *as.ClientPolicy, rosterNodeBlockList []string, - ignorablePods []corev1.Pod, + ignorablePodNames sets.Set[string], ) error { - allHostConns, err := r.newAllHostConnWithOption(ignorablePods) + allHostConns, err := r.newAllHostConnWithOption(ignorablePodNames) if err != nil { return err } @@ -26,8 +26,9 @@ func (r *SingleClusterReconciler) getAndSetRoster( return deployment.GetAndSetRoster(r.Log, allHostConns, policy, rosterNodeBlockList, ignorableNamespaces) } -func (r *SingleClusterReconciler) validateSCClusterState(policy *as.ClientPolicy, ignorablePods []corev1.Pod) error { - allHostConns, err := r.newAllHostConnWithOption(ignorablePods) +func (r *SingleClusterReconciler) validateSCClusterState(policy *as.ClientPolicy, ignorablePodNames sets.Set[string], +) error { + allHostConns, err := r.newAllHostConnWithOption(ignorablePodNames) if err != nil { return err } @@ -42,8 +43,8 @@ func (r *SingleClusterReconciler) validateSCClusterState(policy *as.ClientPolicy func (r *SingleClusterReconciler) addedSCNamespaces(nodesNamespaces map[string][]string) []string { var ( - specSCNamespaces = sets.NewSet[string]() - newAddedSCNamespaces = sets.NewSet[string]() + specSCNamespaces = gosets.NewSet[string]() + newAddedSCNamespaces = gosets.NewSet[string]() ) // Look inside only 1st rack. SC namespaces should be same across all the racks @@ -58,7 +59,7 @@ func (r *SingleClusterReconciler) addedSCNamespaces(nodesNamespaces map[string][ // Check if SC namespaces are present in all node's namespaces, if not then it's a new SC namespace for _, namespaces := range nodesNamespaces { - nodeNamespaces := sets.NewSet[string](namespaces...) + nodeNamespaces := gosets.NewSet[string](namespaces...) newAddedSCNamespaces.Append(specSCNamespaces.Difference(nodeNamespaces).ToSlice()...) } @@ -66,7 +67,7 @@ func (r *SingleClusterReconciler) addedSCNamespaces(nodesNamespaces map[string][ } func (r *SingleClusterReconciler) getIgnorableNamespaces(allHostConns []*deployment.HostConn) ( - sets.Set[string], error) { + gosets.Set[string], error) { nodesNamespaces, err := deployment.GetClusterNamespaces(r.Log, r.getClientPolicy(), allHostConns) if err != nil { return nil, err @@ -75,7 +76,7 @@ func (r *SingleClusterReconciler) getIgnorableNamespaces(allHostConns []*deploym removedNamespaces := r.removedNamespaces(nodesNamespaces) addSCNamespaces := r.addedSCNamespaces(nodesNamespaces) - ignorableNamespaces := sets.NewSet[string](removedNamespaces...) + ignorableNamespaces := gosets.NewSet[string](removedNamespaces...) ignorableNamespaces.Append(addSCNamespaces...) return ignorableNamespaces, nil diff --git a/go.mod b/go.mod index c6ee1681c..87eff97ac 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/aerospike/aerospike-client-go/v6 v6.14.0 - github.com/aerospike/aerospike-management-lib v0.0.0-20231107182540-fef71e1f5946 + github.com/aerospike/aerospike-management-lib v0.0.0-20231129055344-b6aff63f1dbb github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d github.com/evanphx/json-patch v4.12.0+incompatible github.com/go-logr/logr v1.2.4 diff --git a/go.sum b/go.sum index 7fb23f6a6..abd92dab8 100644 --- a/go.sum +++ b/go.sum @@ -598,8 +598,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/aerospike/aerospike-client-go/v6 v6.14.0 h1:Z3FcGWJda1sagzdc6Akz4EJ13Pq55Uyn6qtFLrVUDd0= github.com/aerospike/aerospike-client-go/v6 v6.14.0/go.mod h1:/0Wm81GhMqem+9flWcpazPKoRfjFeG6WrQdXGiMNi0A= -github.com/aerospike/aerospike-management-lib v0.0.0-20231107182540-fef71e1f5946 h1:wwCzPj4qk4EfdISK6tzNoEwSLg9vbeqBloNmhfB8mNo= -github.com/aerospike/aerospike-management-lib v0.0.0-20231107182540-fef71e1f5946/go.mod h1:LPOsGG8okRSH4hN9Y8VXFzsfIpBDj2WKEsI/f6wxwaw= +github.com/aerospike/aerospike-management-lib v0.0.0-20231129055344-b6aff63f1dbb h1:ykX3ElBNT/VOUhw/+5+jiFnWw3LSbPfl6eRrhQzBBFk= +github.com/aerospike/aerospike-management-lib v0.0.0-20231129055344-b6aff63f1dbb/go.mod h1:LPOsGG8okRSH4hN9Y8VXFzsfIpBDj2WKEsI/f6wxwaw= github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY= github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index a60371e1c..266989bf2 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -4593,6 +4593,24 @@ spec: Aerospike cluster. Pods will be deployed in given racks based on given configuration properties: + maxIgnorablePods: + anyOf: + - type: integer + - type: string + description: MaxIgnorablePods is the maximum number/percentage + of pending/failed pods in a rack that are ignored while assessing + cluster stability. Pods identified using this value are not + considered part of the cluster. Additionally, in SC mode clusters, + these pods are removed from the roster. This is particularly + useful when some pods are stuck in pending/failed state due + to any scheduling issues and cannot be fixed by simply updating + the CR. It enables the operator to perform specific operations + on the cluster, like changing Aerospike configurations, without + being hindered by these problematic pods. Remember to set MaxIgnorablePods + back to 0 once the required operation is done. This makes sure + that later on, all pods are properly counted when evaluating + the cluster stability. + x-kubernetes-int-or-string: true namespaces: description: List of Aerospike namespaces for which rack feature will be enabled @@ -13330,6 +13348,24 @@ spec: given configuration nullable: true properties: + maxIgnorablePods: + anyOf: + - type: integer + - type: string + description: MaxIgnorablePods is the maximum number/percentage + of pending/failed pods in a rack that are ignored while assessing + cluster stability. Pods identified using this value are not + considered part of the cluster. Additionally, in SC mode clusters, + these pods are removed from the roster. This is particularly + useful when some pods are stuck in pending/failed state due + to any scheduling issues and cannot be fixed by simply updating + the CR. It enables the operator to perform specific operations + on the cluster, like changing Aerospike configurations, without + being hindered by these problematic pods. Remember to set MaxIgnorablePods + back to 0 once the required operation is done. This makes sure + that later on, all pods are properly counted when evaluating + the cluster stability. + x-kubernetes-int-or-string: true namespaces: description: List of Aerospike namespaces for which rack feature will be enabled diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index 75399f02b..b7c687dd8 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -32,7 +32,7 @@ func CheckPodFailed(pod *corev1.Pod) error { return fmt.Errorf("pod %s has failed status", pod.Name) } - if pod.Status.Phase == corev1.PodPending && isPodReasonUnschedulable(pod) { + if pod.Status.Phase == corev1.PodPending && IsPodReasonUnschedulable(pod) { return fmt.Errorf("pod %s is in unschedulable state", pod.Name) } @@ -210,7 +210,7 @@ func isPodError(reason string) bool { return strings.HasSuffix(reason, "Error") } -func isPodReasonUnschedulable(pod *corev1.Pod) bool { +func IsPodReasonUnschedulable(pod *corev1.Pod) bool { for _, condition := range pod.Status.Conditions { if condition.Type == corev1.PodScheduled && condition.Reason == corev1.PodReasonUnschedulable { return true diff --git a/test/cluster_helper.go b/test/cluster_helper.go index 599b5428a..b1263a8c7 100644 --- a/test/cluster_helper.go +++ b/test/cluster_helper.go @@ -4,6 +4,7 @@ import ( goctx "context" "errors" "fmt" + "reflect" "strconv" "time" @@ -541,6 +542,27 @@ func validateMigrateFillDelay( return err } +func validateDirtyVolumes( + ctx goctx.Context, k8sClient client.Client, + clusterNamespacedName types.NamespacedName, expectedVolumes []string, +) error { + aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName) + if err != nil { + return err + } + + for podName := range aeroCluster.Status.Pods { + if !reflect.DeepEqual(aeroCluster.Status.Pods[podName].DirtyVolumes, expectedVolumes) { + return fmt.Errorf( + "dirtyVolumes mismatch, expected: %v, found %v", expectedVolumes, + aeroCluster.Status.Pods[podName].DirtyVolumes, + ) + } + } + + return nil +} + func upgradeClusterTest( k8sClient client.Client, ctx goctx.Context, clusterNamespacedName types.NamespacedName, image string, diff --git a/test/cluster_test.go b/test/cluster_test.go index d5af68e05..697256638 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -3,14 +3,17 @@ package test import ( goctx "context" "fmt" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" + "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" ) var _ = Describe( @@ -45,6 +48,11 @@ var _ = Describe( // DeployClusterWithSyslog(ctx) // }, // ) + Context( + "DeployClusterWithMaxIgnorablePod", func() { + clusterWithMaxIgnorablePod(ctx) + }, + ) Context( "CommonNegativeClusterValidationTest", func() { NegativeClusterValidationTest(ctx) @@ -124,6 +132,222 @@ func ScaleDownWithMigrateFillDelay(ctx goctx.Context) { ) } +func clusterWithMaxIgnorablePod(ctx goctx.Context) { + var ( + aeroCluster *asdbv1.AerospikeCluster + err error + nodeList = &v1.NodeList{} + podList = &v1.PodList{} + ) + + clusterNamespacedName := getNamespacedName( + "ignore-pod-cluster", namespace, + ) + + AfterEach( + func() { + err = deleteCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + Context( + "UpdateClusterWithMaxIgnorablePodAndPendingPod", func() { + BeforeEach( + func() { + nodeList, err = getNodeList(ctx, k8sClient) + Expect(err).ToNot(HaveOccurred()) + size := len(nodeList.Items) + + deployClusterForMaxIgnorablePods(ctx, clusterNamespacedName, size) + + By("Scale up 1 pod to make that pod pending due to lack of k8s nodes") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.Size++ + err = k8sClient.Update(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should allow cluster operations with pending pod", func() { + By("Set MaxIgnorablePod and Rolling restart cluster") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + val := intstr.FromInt(1) + aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = int64(18000) + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By("Upgrade version") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + newImage := baseImage + ":7.0.0.0_2" + aeroCluster.Spec.Image = newImage + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By("Verify pending pod") + podList, err = getPodList(aeroCluster, k8sClient) + + var counter int + + for idx := range podList.Items { + if podList.Items[idx].Status.Phase == v1.PodPending { + counter++ + } + } + // There should be only one pending pod + Expect(counter).To(Equal(1)) + + By("Scale down 1 pod") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.Size-- + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By("Verify if all pods are running") + podList, err = getPodList(aeroCluster, k8sClient) + Expect(err).ToNot(HaveOccurred()) + + for idx := range podList.Items { + Expect(utils.IsPodRunningAndReady(&podList.Items[idx])).To(BeTrue()) + } + }, + ) + }, + ) + + Context( + "UpdateClusterWithMaxIgnorablePodAndFailedPod", func() { + clusterNamespacedName := getNamespacedName( + "ignore-pod-cluster", namespace, + ) + + BeforeEach( + func() { + deployClusterForMaxIgnorablePods(ctx, clusterNamespacedName, 4) + }, + ) + + It( + "Should allow rack deletion with failed pods in different rack", func() { + By("Fail 1-1 aerospike pod") + ignorePodName := clusterNamespacedName.Name + "-1-1" + pod := &v1.Pod{} + + err := k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName, + Namespace: clusterNamespacedName.Namespace}, pod) + Expect(err).ToNot(HaveOccurred()) + + pod.Spec.Containers[0].Image = "wrong-image" + err = k8sClient.Update(ctx, pod) + Expect(err).ToNot(HaveOccurred()) + + By("Delete rack with id 2") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + val := intstr.FromInt(1) + aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val + aeroCluster.Spec.RackConfig.Racks = getDummyRackConf(1) + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By(fmt.Sprintf("Verify if failed pod %s is automatically recovered", ignorePodName)) + Eventually(func() bool { + err = k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName, + Namespace: clusterNamespacedName.Namespace}, pod) + + return *pod.Status.ContainerStatuses[0].Started && pod.Status.ContainerStatuses[0].Ready + }, 1*time.Minute).Should(BeTrue()) + + Eventually(func() error { + return InterceptGomegaFailure(func() { + validateRoster(k8sClient, ctx, clusterNamespacedName, scNamespace) + }) + }, 4*time.Minute).Should(BeNil()) + }, + ) + + It( + "Should allow namespace addition and removal with failed pod", func() { + By("Fail 1-1 aerospike pod") + ignorePodName := clusterNamespacedName.Name + "-1-1" + pod := &v1.Pod{} + + err := k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName, + Namespace: clusterNamespacedName.Namespace}, pod) + Expect(err).ToNot(HaveOccurred()) + + pod.Spec.Containers[0].Image = "wrong-image" + err = k8sClient.Update(ctx, pod) + Expect(err).ToNot(HaveOccurred()) + + By("Set MaxIgnorablePod and Rolling restart by removing namespace") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + val := intstr.FromInt(1) + aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val + nsList := aeroCluster.Spec.AerospikeConfig.Value["namespaces"].([]interface{}) + nsList = nsList[:len(nsList)-1] + aeroCluster.Spec.AerospikeConfig.Value["namespaces"] = nsList + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + err = validateDirtyVolumes(ctx, k8sClient, clusterNamespacedName, []string{"bar"}) + Expect(err).ToNot(HaveOccurred()) + + By("RollingRestart by re-using previously removed namespace storage") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + nsList = aeroCluster.Spec.AerospikeConfig.Value["namespaces"].([]interface{}) + nsList = append(nsList, getNonSCNamespaceConfig("barnew", "/test/dev/xvdf1")) + aeroCluster.Spec.AerospikeConfig.Value["namespaces"] = nsList + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + }, + ) +} + +func deployClusterForMaxIgnorablePods(ctx goctx.Context, clusterNamespacedName types.NamespacedName, size int) { + By("Deploying cluster") + + aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, int32(size)) + + // Add a nonsc namespace. This will be used to test dirty volumes + nsList := aeroCluster.Spec.AerospikeConfig.Value["namespaces"].([]interface{}) + nsList = append(nsList, getNonSCNamespaceConfig("bar", "/test/dev/xvdf1")) + aeroCluster.Spec.AerospikeConfig.Value["namespaces"] = nsList + + aeroCluster.Spec.Storage.Volumes = append(aeroCluster.Spec.Storage.Volumes, + asdbv1.VolumeSpec{ + Name: "bar", + Source: asdbv1.VolumeSource{ + PersistentVolume: &asdbv1.PersistentVolumeSpec{ + Size: resource.MustParse("1Gi"), + StorageClass: storageClass, + VolumeMode: v1.PersistentVolumeBlock, + }, + }, + Aerospike: &asdbv1.AerospikeServerVolumeAttachment{ + Path: "/test/dev/xvdf1", + }, + }, + ) + racks := getDummyRackConf(1, 2) + aeroCluster.Spec.RackConfig = asdbv1.RackConfig{ + Namespaces: []string{scNamespace}, Racks: racks} + aeroCluster.Spec.PodSpec.MultiPodPerHost = false + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) +} + // Test cluster deployment with all image post 4.9.0 func DeployClusterForAllImagesPost490(ctx goctx.Context) { // post 4.9.0, need feature-key file diff --git a/test/rack_management_test.go b/test/rack_management_test.go index 68b0d0bfb..7edd0b93b 100644 --- a/test/rack_management_test.go +++ b/test/rack_management_test.go @@ -804,13 +804,16 @@ var _ = Describe( ) BeforeEach( func() { - aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, 2) + nodes, err := getNodeList(ctx, k8sClient) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, int32(len(nodes.Items))) racks := getDummyRackConf(1, 2) aeroCluster.Spec.RackConfig = asdbv1.RackConfig{Racks: racks} aeroCluster.Spec.PodSpec.MultiPodPerHost = false By("Deploying cluster") - err := deployCluster(k8sClient, ctx, aeroCluster) + err = deployCluster(k8sClient, ctx, aeroCluster) Expect(err).ToNot(HaveOccurred()) }, ) @@ -831,22 +834,19 @@ var _ = Describe( aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName) Expect(err).ToNot(HaveOccurred()) - nodes, err := getNodeList(ctx, k8sClient) - Expect(err).ToNot(HaveOccurred()) - - aeroCluster.Spec.Size = int32(len(nodes.Items) + 1) + aeroCluster.Spec.Size++ // scaleup, no need to wait for long - err = updateClusterWithTO(k8sClient, ctx, aeroCluster, time.Minute*2) + err = updateClusterWithTO(k8sClient, ctx, aeroCluster, time.Minute*1) Expect(err).To(HaveOccurred()) By("Scaling down the cluster size, failed pods should recover") aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) Expect(err).ToNot(HaveOccurred()) - aeroCluster.Spec.Size = int32(len(nodes.Items) - 1) + aeroCluster.Spec.Size-- - err = updateClusterWithTO(k8sClient, ctx, aeroCluster, time.Minute*10) + err = updateClusterWithTO(k8sClient, ctx, aeroCluster, time.Minute*2) Expect(err).ToNot(HaveOccurred()) }) }) diff --git a/test/utils.go b/test/utils.go index 6046917f2..07b885ac0 100644 --- a/test/utils.go +++ b/test/utils.go @@ -296,11 +296,13 @@ func isClusterStateValid( return false } - // Validate pods - if len(newCluster.Status.Pods) != replicas { - pkgLog.Info("Cluster status doesn't have pod status for all nodes. Cluster status may not have fully updated") - return false - } + // TODO: This is not valid for tests where maxUnavailablePods flag is used. + // We can take the param in func to skip this check + // // Validate pods + // if len(newCluster.Status.Pods) != replicas { + // pkgLog.Info("Cluster status doesn't have pod status for all nodes. Cluster status may not have fully updated") + // return false + // } for podName := range newCluster.Status.Pods { if newCluster.Status.Pods[podName].Aerospike.NodeID == "" {