diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go
index 028cb68ab..d6649870a 100644
--- a/api/v1/aerospikecluster_types.go
+++ b/api/v1/aerospikecluster_types.go
@@ -25,6 +25,21 @@ import (
 	lib "github.com/aerospike/aerospike-management-lib"
 )
 
+// +kubebuilder:validation:Enum=InProgress;Completed;Error
+type AerospikeClusterPhase string
+
+// These are the valid phases of an Aerospike cluster.
+const (
+	// AerospikeClusterInProgress means the Aerospike cluster operations are in progress.
+	// This phase denotes that changes are gradually rolling out to the cluster.
+	AerospikeClusterInProgress AerospikeClusterPhase = "InProgress"
+	// AerospikeClusterCompleted means the Aerospike cluster has been deployed/upgraded successfully and is ready to use.
+	AerospikeClusterCompleted AerospikeClusterPhase = "Completed"
+	// AerospikeClusterError means the Aerospike cluster is in an error state because of issues like misconfiguration,
+	// infra issues, etc.
+	AerospikeClusterError AerospikeClusterPhase = "Error"
+)
+
 // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
 
 // AerospikeClusterSpec defines the desired state of AerospikeCluster
@@ -660,7 +675,10 @@ type AerospikeClusterStatus struct { //nolint:govet // for readability
 	// This is map instead of the conventional map as list convention to allow each pod to patch update its own
 	// status. The map key is the name of the pod.
 	// +patchStrategy=strategic
-	Pods map[string]AerospikePodStatus `json:"pods" patchStrategy:"strategic"`
+	Pods map[string]AerospikePodStatus `json:"pods,omitempty" patchStrategy:"strategic"`
+
+	// Phase denotes the current phase of Aerospike cluster operation.
+	Phase AerospikeClusterPhase `json:"phase,omitempty"`
 }
 
 // AerospikeNetworkType specifies the type of network address to use.
@@ -848,9 +866,10 @@ type AerospikePodStatus struct { //nolint:govet // for readability
 //+kubebuilder:storageversion
 // +kubebuilder:printcolumn:name="Size",type=string,JSONPath=`.spec.size`
 // +kubebuilder:printcolumn:name="Image",type=string,JSONPath=`.spec.image`
-// +kubebuilder:printcolumn:name="MultiPodPerHost",type=boolean,JSONPath=`.spec.podSpec.MultiPodPerHost`
+// +kubebuilder:printcolumn:name="MultiPodPerHost",type=boolean,JSONPath=`.spec.podSpec.multiPodPerHost`
 // +kubebuilder:printcolumn:name="HostNetwork",type=boolean,JSONPath=`.spec.podSpec.hostNetwork`
 // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
+// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
 
 // AerospikeCluster is the schema for the AerospikeCluster API
 // +operator-sdk:csv:customresourcedefinitions:displayName="Aerospike Cluster",resources={{Service, v1},{Pod,v1},{StatefulSet,v1}}
diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
index fdd1e6820..d5b116add 100644
--- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
+++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
@@ -21,7 +21,7 @@ spec:
     - jsonPath: .spec.image
       name: Image
       type: string
-    - jsonPath: .spec.podSpec.MultiPodPerHost
+    - jsonPath: .spec.podSpec.multiPodPerHost
       name: MultiPodPerHost
       type: boolean
     - jsonPath: .spec.podSpec.hostNetwork
@@ -30,6 +30,9 @@ spec:
     - jsonPath: .metadata.creationTimestamp
       name: Age
       type: date
+    - jsonPath: .status.phase
+      name: Phase
+      type: string
     name: v1
     schema:
       openAPIV3Schema:
@@ -9012,6 +9015,14 @@ spec:
                     list by the operator
                   type: string
                 type: object
+              phase:
+                description: Phase denotes the current phase of Aerospike cluster
+                  operation.
+                enum:
+                - InProgress
+                - Completed
+                - Error
+                type: string
               podSpec:
                 description: Additional configuration for create Aerospike pods.
                properties:
@@ -17537,8 +17548,6 @@ spec:
                - skipWorkDirValidate
                - skipXdrDlogFileValidate
                type: object
-          required:
-          - pods
           type: object
       type: object
     served: true
diff --git a/controllers/pod.go b/controllers/pod.go
index 93ff94c55..859bb4989 100644
--- a/controllers/pod.go
+++ b/controllers/pod.go
@@ -171,7 +171,7 @@ func (r *SingleClusterReconciler) rollingRestartPods(
 	rackState *RackState, podsToRestart []*corev1.Pod, ignorablePodNames sets.Set[string],
 	restartTypeMap map[string]RestartType,
 ) reconcileResult {
-	failedPods, activePods := getFailedAndActivePods(podsToRestart, ignorablePodNames)
+	failedPods, activePods := getFailedAndActivePods(podsToRestart)
 
 	// If already dead node (failed pod) then no need to check node safety, migration
 	if len(failedPods) != 0 {
@@ -377,13 +377,9 @@ func (r *SingleClusterReconciler) ensurePodsRunningAndReady(podsToCheck []*corev
 	return reconcileRequeueAfter(10)
 }
 
-func getFailedAndActivePods(pods []*corev1.Pod, ignorablePodNames sets.Set[string],
-) (failedPods, activePods []*corev1.Pod) {
+func getFailedAndActivePods(pods []*corev1.Pod) (failedPods, activePods []*corev1.Pod) {
 	for idx := range pods {
 		pod := pods[idx]
-		if ignorablePodNames.Has(pod.Name) {
-			continue
-		}
 
 		if err := utils.CheckPodFailed(pod); err != nil {
 			failedPods = append(failedPods, pod)
@@ -396,10 +392,26 @@ func getFailedAndActivePods(pods []*corev1.Pod, ignorablePodNames sets.Set[strin
 	return failedPods, activePods
 }
 
+func getNonIgnorablePods(pods []*corev1.Pod, ignorablePodNames sets.Set[string],
+) []*corev1.Pod {
+	nonIgnorablePods := make([]*corev1.Pod, 0, len(pods))
+
+	for idx := range pods {
+		pod := pods[idx]
+		if ignorablePodNames.Has(pod.Name) {
+			continue
+		}
+
+		nonIgnorablePods = append(nonIgnorablePods, pod)
+	}
+
+	return nonIgnorablePods
+}
+
 func (r *SingleClusterReconciler) safelyDeletePodsAndEnsureImageUpdated(
 	rackState *RackState, podsToUpdate []*corev1.Pod, ignorablePodNames sets.Set[string],
 ) reconcileResult {
-	failedPods, activePods := getFailedAndActivePods(podsToUpdate, ignorablePodNames)
+	failedPods, activePods := getFailedAndActivePods(podsToUpdate)
 
 	// If already dead node (failed pod) then no need to check node safety, migration
 	if len(failedPods) != 0 {
diff --git a/controllers/rack.go b/controllers/rack.go
index 2748c59f9..d90085617 100644
--- a/controllers/rack.go
+++ b/controllers/rack.go
@@ -81,7 +81,9 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult {
 		)
 	}
 
-	failedPods, _ := getFailedAndActivePods(podList, ignorablePodNames)
+	failedPods, _ := getFailedAndActivePods(podList)
+	// remove ignorable pods from failedPods
+	failedPods = getNonIgnorablePods(failedPods, ignorablePodNames)
 	if len(failedPods) != 0 {
 		r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods)
 
@@ -107,7 +109,9 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult {
 		)
 	}
 
-	failedPods, _ = getFailedAndActivePods(podList, ignorablePodNames)
+	failedPods, _ = getFailedAndActivePods(podList)
+	// remove ignorable pods from failedPods
+	failedPods = getNonIgnorablePods(failedPods, ignorablePodNames)
 	if len(failedPods) != 0 {
 		r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods)
 
@@ -426,7 +430,7 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
 	}
 
 	if r.aeroCluster.Spec.K8sNodeBlockList != nil {
-		found, res = r.handleK8sNodeBlockListPods(found, rackState, ignorablePodNames)
+		found, res = r.handleK8sNodeBlockListPods(found, rackState, ignorablePodNames, failedPods)
 		if !res.isSuccess {
 			return found, res
 		}
@@ -1069,7 +1073,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
 }
 
 func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1.StatefulSet, rackState *RackState,
-	ignorablePodNames sets.Set[string],
+	ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
 ) (*appsv1.StatefulSet, reconcileResult) {
 	if err := r.updateSTS(statefulSet, rackState); err != nil {
 		return statefulSet, reconcileError(
@@ -1077,13 +1081,27 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1
 		)
 	}
 
-	podList, err := r.getOrderedRackPodList(rackState.Rack.ID)
-	if err != nil {
-		return statefulSet, reconcileError(fmt.Errorf("failed to list pods: %v", err))
+	var (
+		podList []*corev1.Pod
+		err     error
+	)
+
+	if len(failedPods) != 0 {
+		podList = failedPods
+	} else {
+		// List the pods for this aeroCluster's statefulset
+		podList, err = r.getOrderedRackPodList(rackState.Rack.ID)
+		if err != nil {
+			return statefulSet, reconcileError(fmt.Errorf("failed to list pods: %v", err))
+		}
 	}
 
 	blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...)
 
+	var podsToRestart []*corev1.Pod
+
+	restartTypeMap := make(map[string]RestartType)
+
 	for idx := range podList {
 		pod := podList[idx]
 
@@ -1091,19 +1109,35 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1
 			r.Log.Info("Pod found in blocked nodes list, migrating to a different node", "podName", pod.Name)
 
-			if res := r.waitForMultipleNodesSafeStopReady([]*corev1.Pod{pod}, ignorablePodNames); !res.isSuccess {
-				return statefulSet, res
-			}
+			podsToRestart = append(podsToRestart, pod)
 
-			restartTypeMap := map[string]RestartType{
+			restartTypeMap = map[string]RestartType{
 				pod.Name: podRestart,
 			}
+		}
+	}
 
-			if res := r.restartPods(rackState, []*corev1.Pod{pod}, restartTypeMap); !res.isSuccess {
-				return statefulSet, reconcileError(err)
-			}
+	podsBatchList := r.getPodsBatchToRestart(podsToRestart, len(podList))
+
+	// Restart batch of pods
+	if len(podsBatchList) > 0 {
+		// Handle one batch
+		podsBatch := podsBatchList[0]
 
-			// handle next pod on blocked node in subsequent Reconcile.
+		r.Log.Info(
+			"Calculated batch for Pod migration to different nodes",
+			"rackPodList", getPodNames(podList),
+			"rearrangedPods", getPodNames(podsToRestart),
+			"podsBatch", getPodNames(podsBatch),
+			"rollingUpdateBatchSize", r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize,
+		)
+
+		if res := r.rollingRestartPods(rackState, podsBatch, ignorablePodNames, restartTypeMap); !res.isSuccess {
+			return statefulSet, res
+		}
+
+		// Handle next batch in subsequent Reconcile.
+		if len(podsBatchList) > 1 {
 			return statefulSet, reconcileRequeueAfter(1)
 		}
 	}
 
diff --git a/controllers/reconciler.go b/controllers/reconciler.go
index 54e40f877..b9da43092 100644
--- a/controllers/reconciler.go
+++ b/controllers/reconciler.go
@@ -69,7 +69,17 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) {
 		return reconcile.Result{}, nil
 	}
 
-	// The cluster is not being deleted, add finalizer in not added already
+	// Set the status to AerospikeClusterInProgress before starting any operations
+	if r.aeroCluster.Status.Phase != asdbv1.AerospikeClusterInProgress {
+		r.aeroCluster.Status.Phase = asdbv1.AerospikeClusterInProgress
+
+		if err := r.Client.Status().Update(context.Background(), r.aeroCluster); err != nil {
+			r.Log.Error(err, "Failed to set cluster status to AerospikeClusterInProgress")
+			return reconcile.Result{}, err
+		}
+	}
+
+	// The cluster is not being deleted, add finalizer if not added already
 	if err := r.addFinalizer(finalizerName); err != nil {
 		r.Log.Error(err, "Failed to add finalizer")
 		return reconcile.Result{}, err
@@ -364,6 +374,7 @@ func (r *SingleClusterReconciler) updateStatus() error {
 	}
 
 	newAeroCluster.Status.AerospikeClusterStatusSpec = *specToStatus
+	newAeroCluster.Status.Phase = asdbv1.AerospikeClusterCompleted
 
 	err = r.patchStatus(newAeroCluster)
 	if err != nil {
diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml
index fdd1e6820..d5b116add 100644
--- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml
+++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml
@@ -21,7 +21,7 @@ spec:
     - jsonPath: .spec.image
       name: Image
      type: string
-    - jsonPath: .spec.podSpec.MultiPodPerHost
+    - jsonPath: .spec.podSpec.multiPodPerHost
      name: MultiPodPerHost
      type: boolean
    - jsonPath: .spec.podSpec.hostNetwork
@@ -30,6 +30,9 @@ spec:
    - jsonPath: .metadata.creationTimestamp
      name: Age
      type: date
+    - jsonPath: .status.phase
+      name: Phase
+      type: string
    name: v1
    schema:
      openAPIV3Schema:
@@ -9012,6 +9015,14 @@ spec:
                    list by the operator
                  type: string
                type: object
+              phase:
+                description: Phase denotes the current phase of Aerospike cluster
+                  operation.
+                enum:
+                - InProgress
+                - Completed
+                - Error
+                type: string
              podSpec:
                description: Additional configuration for create Aerospike pods.
                properties:
@@ -17537,8 +17548,6 @@ spec:
                - skipWorkDirValidate
                - skipXdrDlogFileValidate
                type: object
-          required:
-          - pods
           type: object
       type: object
     served: true
diff --git a/test/cluster_helper.go b/test/cluster_helper.go
index 58b9834fa..cc138f397 100644
--- a/test/cluster_helper.go
+++ b/test/cluster_helper.go
@@ -759,7 +759,7 @@ func deployClusterWithTO(
 	if err != nil {
 		return err
 	}
-	// Wait for aerocluster to reach desired cluster size.
+	// Wait for aerocluster to reach the desired cluster size.
 	return waitForAerospikeCluster(
 		k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval,
 		timeout,
diff --git a/test/cluster_test.go b/test/cluster_test.go
index d8b2cfe01..afd16d76e 100644
--- a/test/cluster_test.go
+++ b/test/cluster_test.go
@@ -243,7 +243,7 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) {
 			Namespace: clusterNamespacedName.Namespace}, pod)
 		Expect(err).ToNot(HaveOccurred())
 
-		pod.Spec.Containers[0].Image = "wrong-image"
+		pod.Spec.Containers[0].Image = wrongImage
 		err = k8sClient.Update(ctx, pod)
 		Expect(err).ToNot(HaveOccurred())
 
@@ -282,7 +282,7 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) {
 			Namespace: clusterNamespacedName.Namespace}, pod)
 		Expect(err).ToNot(HaveOccurred())
 
-		pod.Spec.Containers[0].Image = "wrong-image"
+		pod.Spec.Containers[0].Image = wrongImage
 		err = k8sClient.Update(ctx, pod)
 		Expect(err).ToNot(HaveOccurred())
 
diff --git a/test/k8snode_block_list_test.go b/test/k8snode_block_list_test.go
index 56e540407..635df6c6d 100644
--- a/test/k8snode_block_list_test.go
+++ b/test/k8snode_block_list_test.go
@@ -1 +1,207 @@
 package test
+
+import (
+	"context"
+	"fmt"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
+
+	asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1"
+)
+
+const (
+	wrongImage = "wrong-image"
+)
+
+var _ = Describe(
+	"K8sNodeBlockList", func() {
+		ctx := context.TODO()
+		Context(
+			"Migrate pods from K8s blocked nodes", func() {
+				clusterName := "k8s-node-block-cluster"
+				clusterNamespacedName := getNamespacedName(clusterName, namespace)
+				podName := clusterName + "-0-0"
+				aeroCluster := &asdbv1.AerospikeCluster{}
+				oldK8sNode := ""
+				oldPvcInfo := make(map[string]types.UID)
+
+				var err error
+
+				BeforeEach(
+					func() {
+						aeroCluster = createDummyAerospikeCluster(
+							clusterNamespacedName, 3,
+						)
+						aeroCluster.Spec.PodSpec.MultiPodPerHost = false
+						err = deployCluster(k8sClient, ctx, aeroCluster)
+						Expect(err).ToNot(HaveOccurred())
+
+						pod := &corev1.Pod{}
+						err = k8sClient.Get(ctx, getNamespacedName(podName, namespace), pod)
+						Expect(err).ToNot(HaveOccurred())
+						oldK8sNode = pod.Spec.NodeName
+						oldPvcInfo, err = extractPodPVC(pod)
+						Expect(err).ToNot(HaveOccurred())
+					},
+				)
+
+				AfterEach(
+					func() {
+						err = deleteCluster(k8sClient, ctx, aeroCluster)
+						Expect(err).ToNot(HaveOccurred())
+					},
+				)
+
+				It(
+					"Should migrate the pods from blocked nodes to other nodes", func() {
+						By("Blocking the k8s node")
+						aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+						Expect(err).ToNot(HaveOccurred())
+						aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode}
+						err = updateCluster(k8sClient, ctx, aeroCluster)
+						Expect(err).ToNot(HaveOccurred())
+
+						By("Verifying if the pod is migrated to other nodes and pod pvcs are not deleted")
+						validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true)
+					},
+				)
+
+				It(
+					"Should migrate the pods from blocked nodes to other nodes along with rolling "+
+						"restart", func() {
+						By("Blocking the k8s node and updating aerospike config")
+						aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+						Expect(err).ToNot(HaveOccurred())
+						aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode}
+						aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] =
+							defaultProtofdmax
+
+						err = updateCluster(k8sClient, ctx, aeroCluster)
+						Expect(err).ToNot(HaveOccurred())
+
+						By("Verifying if the pod is migrated to other nodes and pod pvcs are not deleted")
+						validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true)
+					},
+				)
+
+				It(
+					"Should migrate the pods from blocked nodes to other nodes along with upgrade", func() {
+						By("Blocking the k8s node and updating aerospike image")
+						aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+						Expect(err).ToNot(HaveOccurred())
+						aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode}
+						aeroCluster.Spec.Image = availableImage2
+
+						err = updateCluster(k8sClient, ctx, aeroCluster)
+						Expect(err).ToNot(HaveOccurred())
+
+						By("Verifying if the pod is migrated to other nodes and pod pvcs are not deleted")
+						validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true)
+					},
+				)
+
+				It(
+					"Should migrate the pods from blocked nodes to other nodes and delete corresponding "+
+						"local PVCs", func() {
+						By("Blocking the k8s node")
+						aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+						Expect(err).ToNot(HaveOccurred())
+						aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode}
+						aeroCluster.Spec.Storage.LocalStorageClasses = []string{storageClass}
+						err = updateCluster(k8sClient, ctx, aeroCluster)
+						Expect(err).ToNot(HaveOccurred())
+
+						By("Verifying if the pod is migrated to other nodes and pod local pvcs are deleted")
+						validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, false)
+					},
+				)
+
+				It(
+					"Should migrate the failed pods from blocked nodes to other nodes with maxIgnorablePod", func() {
+						By(fmt.Sprintf("Fail %s aerospike pod", podName))
+						pod := &corev1.Pod{}
+						err := k8sClient.Get(ctx, types.NamespacedName{Name: podName,
+							Namespace: clusterNamespacedName.Namespace}, pod)
+						Expect(err).ToNot(HaveOccurred())
+
+						pod.Spec.Containers[0].Image = wrongImage
+						err = k8sClient.Update(ctx, pod)
+						Expect(err).ToNot(HaveOccurred())
+
+						By("Blocking the k8s node and setting maxIgnorablePod to 1")
+						aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+						Expect(err).ToNot(HaveOccurred())
+						maxIgnorablePods := intstr.FromInt(1)
+						aeroCluster.Spec.RackConfig.MaxIgnorablePods = &maxIgnorablePods
+						aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode}
+						err = updateCluster(k8sClient, ctx, aeroCluster)
+						Expect(err).ToNot(HaveOccurred())
+
+						By("Verifying if the failed pod is migrated to other nodes and pod pvcs are not deleted")
+						validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true)
+					},
+				)
+			},
+		)
+	},
+)
+
+func extractPodPVC(pod *corev1.Pod) (map[string]types.UID, error) {
+	pvcUIDMap := make(map[string]types.UID)
+
+	for idx := range pod.Spec.Volumes {
+		if pod.Spec.Volumes[idx].PersistentVolumeClaim != nil {
+			pvcUIDMap[pod.Spec.Volumes[idx].PersistentVolumeClaim.ClaimName] = ""
+		}
+	}
+
+	for p := range pvcUIDMap {
+		pvc := &corev1.PersistentVolumeClaim{}
+		if err := k8sClient.Get(context.TODO(), getNamespacedName(p, pod.Namespace), pvc); err != nil {
+			return nil, err
+		}
+
+		pvcUIDMap[p] = pvc.UID
+	}
+
+	return pvcUIDMap, nil
+}
+
+func validatePVCDeletion(ctx context.Context, pvcUIDMap map[string]types.UID, shouldDelete bool) error {
+	pvc := &corev1.PersistentVolumeClaim{}
+
+	for pvcName, pvcUID := range pvcUIDMap {
+		pvcNamespacesName := getNamespacedName(
+			pvcName, namespace,
+		)
+
+		if err := k8sClient.Get(ctx, pvcNamespacesName, pvc); err != nil {
+			return err
+		}
+
+		if shouldDelete && pvc.UID != pvcUID {
+			return fmt.Errorf("PVC %s is unintentionally deleted", pvcName)
+		}
+
+		if !shouldDelete && pvc.UID == pvcUID {
+			return fmt.Errorf("PVC %s is not deleted", pvcName)
+		}
+	}
+
+	return nil
+}
+
+func validatePodAndPVCMigration(ctx context.Context, podName, oldK8sNode string,
+	oldPvcInfo map[string]types.UID, shouldDelete bool) {
+	pod := &corev1.Pod{}
+	err := k8sClient.Get(ctx, getNamespacedName(podName, namespace), pod)
+	Expect(err).ToNot(HaveOccurred())
+	Expect(pod.Spec.NodeName).ToNot(Equal(oldK8sNode))
+
+	err = validatePVCDeletion(ctx, oldPvcInfo, shouldDelete)
+	Expect(err).ToNot(HaveOccurred())
+}
diff --git a/test/utils.go b/test/utils.go
index 07b885ac0..feac18cbd 100644
--- a/test/utils.go
+++ b/test/utils.go
@@ -325,6 +325,11 @@ func isClusterStateValid(
 		return false
 	}
 
+	if newCluster.Status.Phase != asdbv1.AerospikeClusterCompleted {
+		pkgLog.Info("Cluster phase is not set to Completed")
+		return false
+	}
+
 	return true
 }