diff --git a/api/v1/aerospikecluster_mutating_webhook.go b/api/v1/aerospikecluster_mutating_webhook.go index 4dc2f371c..085c7baec 100644 --- a/api/v1/aerospikecluster_mutating_webhook.go +++ b/api/v1/aerospikecluster_mutating_webhook.go @@ -25,6 +25,7 @@ import ( "gomodules.xyz/jsonpatch/v2" v1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -69,6 +70,12 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response { } func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error { + // Set maxUnavailable default to 1 + if c.Spec.MaxUnavailable == nil { + maxUnavailable := intstr.FromInt(1) + c.Spec.MaxUnavailable = &maxUnavailable + } + // Set network defaults c.Spec.AerospikeNetworkPolicy.setDefaults(c.ObjectMeta.Namespace) @@ -504,7 +511,7 @@ func setDefaultNetworkConf( ) } // Override these sections - // TODO: These values lines will be replaces with runtime info by script in init-container + // TODO: These values lines will be replaced with runtime info by akoinit binary in init-container // See if we can get better way to make template serviceDefaults := map[string]interface{}{} srvPort := GetServicePort(configSpec) diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index 0d1150be6..90c23d689 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -25,6 +25,28 @@ import ( lib "github.com/aerospike/aerospike-management-lib" ) +// +kubebuilder:validation:Enum=InProgress;Completed;Error +type AerospikeClusterPhase string + +// These are the valid phases of Aerospike cluster. +const ( + // AerospikeClusterInProgress means the Aerospike cluster CR is being reconciled and operations are in-progress state. + // This phase denotes that changes are gradually rolling out to the cluster. + // For example, when the Aerospike server version is upgraded in CR, then InProgress phase is set until the upgrade + // is completed. + AerospikeClusterInProgress AerospikeClusterPhase = "InProgress" + // AerospikeClusterCompleted means the Aerospike cluster CR has been reconciled. This phase denotes that the cluster + // has been deployed/upgraded successfully and is ready to use. + // For example, when the Aerospike server version is upgraded in CR, then Completed phase is set after the upgrade is + // completed. + AerospikeClusterCompleted AerospikeClusterPhase = "Completed" + // AerospikeClusterError means the Aerospike cluster operation is in error state because of some reason like + // misconfiguration, infra issues, etc. + // For example, when the Aerospike server version is upgraded in CR, then Error phase is set if the upgrade fails + // due to the wrong image issue, etc. + AerospikeClusterError AerospikeClusterPhase = "Error" +) + // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // AerospikeClusterSpec defines the desired state of AerospikeCluster @@ -37,6 +59,11 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability // Aerospike cluster size // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Cluster Size" Size int32 `json:"size"` + // MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application + // disruption. This value is used to create PodDisruptionBudget. Defaults to 1. + // Refer Aerospike documentation for more details. + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Max Unavailable" + MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` // Aerospike server image // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Server Image" Image string `json:"image"` @@ -74,6 +101,12 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability // RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Roster Node BlockList" RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"` + // K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. Pods are not scheduled on + // these nodes. Pods are migrated from these nodes if already present. This is useful for the maintenance of + // Kubernetes nodes. + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList" + // +kubebuilder:validation:MinItems:=1 + K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"` } type SeedsFinderServices struct { @@ -564,9 +597,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability // BlockVolumePolicy contains default policies for block volumes. BlockVolumePolicy AerospikePersistentVolumePolicySpec `json:"blockVolumePolicy,omitempty"` - // CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container. + // CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. CleanupThreads int `json:"cleanupThreads,omitempty"` + // LocalStorageClasses contains a list of storage classes which provisions local volumes. + LocalStorageClasses []string `json:"localStorageClasses,omitempty"` + // Volumes list to attach to created pods. // +patchMergeKey=name // +patchStrategy=merge @@ -578,10 +614,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability // AerospikeClusterStatusSpec captures the current status of the cluster. type AerospikeClusterStatusSpec struct { //nolint:govet // for readability // Aerospike cluster size - // +operator-sdk:csv:customresourcedefinitions:type=status,displayName="Cluster Size" Size int32 `json:"size,omitempty"` // Aerospike server image Image string `json:"image,omitempty"` + // MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application + // disruption. This value is used to create PodDisruptionBudget. Defaults to 1. + MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` // If set true then multiple pods can be created per Kubernetes Node. // This will create a NodePort service for each Pod. // NodePort, as the name implies, opens a specific port on all the Kubernetes Nodes , @@ -627,6 +665,8 @@ type AerospikeClusterStatusSpec struct { //nolint:govet // for readability SeedsFinderServices SeedsFinderServices `json:"seedsFinderServices,omitempty"` // RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"` + // K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. + K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"` } // AerospikeClusterStatus defines the observed state of AerospikeCluster @@ -646,7 +686,11 @@ type AerospikeClusterStatus struct { //nolint:govet // for readability // This is map instead of the conventional map as list convention to allow each pod to patch update its own // status. The map key is the name of the pod. // +patchStrategy=strategic + // +optional Pods map[string]AerospikePodStatus `json:"pods" patchStrategy:"strategic"` + + // Phase denotes the current phase of Aerospike cluster operation. + Phase AerospikeClusterPhase `json:"phase,omitempty"` } // AerospikeNetworkType specifies the type of network address to use. @@ -837,9 +881,10 @@ type AerospikePodStatus struct { //nolint:govet // for readability // +kubebuilder:storageversion // +kubebuilder:printcolumn:name="Size",type=string,JSONPath=`.spec.size` // +kubebuilder:printcolumn:name="Image",type=string,JSONPath=`.spec.image` -// +kubebuilder:printcolumn:name="MultiPodPerHost",type=boolean,JSONPath=`.spec.podSpec.MultiPodPerHost` +// +kubebuilder:printcolumn:name="MultiPodPerHost",type=boolean,JSONPath=`.spec.podSpec.multiPodPerHost` // +kubebuilder:printcolumn:name="HostNetwork",type=boolean,JSONPath=`.spec.podSpec.hostNetwork` // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" +// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase" // AerospikeCluster is the schema for the AerospikeCluster API // +operator-sdk:csv:customresourcedefinitions:displayName="Aerospike Cluster",resources={{Service, v1},{Pod,v1},{StatefulSet,v1}} @@ -873,6 +918,7 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec, status.Size = spec.Size status.Image = spec.Image + status.MaxUnavailable = spec.MaxUnavailable // Storage statusStorage := AerospikeStorageSpec{} @@ -953,6 +999,16 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec, status.RosterNodeBlockList = rosterNodeBlockList } + if len(spec.K8sNodeBlockList) != 0 { + var k8sNodeBlockList []string + + lib.DeepCopy( + &k8sNodeBlockList, &spec.K8sNodeBlockList, + ) + + status.K8sNodeBlockList = k8sNodeBlockList + } + return &status, nil } @@ -962,6 +1018,7 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec spec.Size = status.Size spec.Image = status.Image + spec.MaxUnavailable = status.MaxUnavailable // Storage specStorage := AerospikeStorageSpec{} @@ -1043,5 +1100,15 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec spec.RosterNodeBlockList = rosterNodeBlockList } + if len(status.K8sNodeBlockList) != 0 { + var k8sNodeBlockList []string + + lib.DeepCopy( + &k8sNodeBlockList, &status.K8sNodeBlockList, + ) + + spec.K8sNodeBlockList = k8sNodeBlockList + } + return &spec, nil } diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index 8dec672f2..4c6676635 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -160,6 +160,11 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error { return fmt.Errorf("invalid cluster size 0") } + // Validate MaxUnavailable for PodDisruptionBudget + if err := c.validateMaxUnavailable(); err != nil { + return err + } + // Validate Image version version, err := GetImageVersion(c.Spec.Image) if err != nil { @@ -2170,3 +2175,57 @@ func validateIntOrStringField(value *intstr.IntOrString, fieldPath string) error return nil } + +func (c *AerospikeCluster) validateMaxUnavailable() error { + // safe check for corner cases when mutation webhook somehow didn't work + if c.Spec.MaxUnavailable == nil { + return fmt.Errorf("maxUnavailable cannot be nil. Mutation webhook didn't work") + } + + if err := validateIntOrStringField(c.Spec.MaxUnavailable, "spec.maxUnavailable"); err != nil { + return err + } + + safeMaxUnavailable := int(c.Spec.Size) + + // If Size is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss + if safeMaxUnavailable == 1 { + return nil + } + + for idx := range c.Spec.RackConfig.Racks { + rack := &c.Spec.RackConfig.Racks[idx] + nsList := rack.AerospikeConfig.Value["namespaces"].([]interface{}) + + for _, nsInterface := range nsList { + rfInterface, exists := nsInterface.(map[string]interface{})["replication-factor"] + if !exists { + // Default RF is 2 if not given + safeMaxUnavailable = 2 + continue + } + + rf, err := GetIntType(rfInterface) + if err != nil { + return fmt.Errorf("namespace replication-factor %v", err) + } + + // If RF is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss + if rf == 1 { + continue + } + + if rf < safeMaxUnavailable { + safeMaxUnavailable = rf + } + } + } + + if c.Spec.MaxUnavailable.IntValue() >= safeMaxUnavailable { + return fmt.Errorf("maxUnavailable %s cannot be greater than or equal to %v as it may result in "+ + "data loss. Set it to a lower value", + c.Spec.MaxUnavailable.String(), safeMaxUnavailable) + } + + return nil +} diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 6696bf022..489353971 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -154,6 +154,11 @@ func (in *AerospikeClusterList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AerospikeClusterSpec) DeepCopyInto(out *AerospikeClusterSpec) { *out = *in + if in.MaxUnavailable != nil { + in, out := &in.MaxUnavailable, &out.MaxUnavailable + *out = new(intstr.IntOrString) + **out = **in + } in.Storage.DeepCopyInto(&out.Storage) if in.AerospikeAccessControl != nil { in, out := &in.AerospikeAccessControl, &out.AerospikeAccessControl @@ -183,6 +188,11 @@ func (in *AerospikeClusterSpec) DeepCopyInto(out *AerospikeClusterSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.K8sNodeBlockList != nil { + in, out := &in.K8sNodeBlockList, &out.K8sNodeBlockList + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AerospikeClusterSpec. @@ -221,6 +231,11 @@ func (in *AerospikeClusterStatus) DeepCopy() *AerospikeClusterStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AerospikeClusterStatusSpec) DeepCopyInto(out *AerospikeClusterStatusSpec) { *out = *in + if in.MaxUnavailable != nil { + in, out := &in.MaxUnavailable, &out.MaxUnavailable + *out = new(intstr.IntOrString) + **out = **in + } in.Storage.DeepCopyInto(&out.Storage) if in.AerospikeAccessControl != nil { in, out := &in.AerospikeAccessControl, &out.AerospikeAccessControl @@ -255,6 +270,11 @@ func (in *AerospikeClusterStatusSpec) DeepCopyInto(out *AerospikeClusterStatusSp *out = make([]string, len(*in)) copy(*out, *in) } + if in.K8sNodeBlockList != nil { + in, out := &in.K8sNodeBlockList, &out.K8sNodeBlockList + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AerospikeClusterStatusSpec. @@ -652,6 +672,11 @@ func (in *AerospikeStorageSpec) DeepCopyInto(out *AerospikeStorageSpec) { *out = *in in.FileSystemVolumePolicy.DeepCopyInto(&out.FileSystemVolumePolicy) in.BlockVolumePolicy.DeepCopyInto(&out.BlockVolumePolicy) + if in.LocalStorageClasses != nil { + in, out := &in.LocalStorageClasses, &out.LocalStorageClasses + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.Volumes != nil { in, out := &in.Volumes, &out.Volumes *out = make([]VolumeSpec, len(*in)) diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index 070d4e753..757c0d37a 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -22,7 +22,7 @@ spec: - jsonPath: .spec.image name: Image type: string - - jsonPath: .spec.podSpec.MultiPodPerHost + - jsonPath: .spec.podSpec.multiPodPerHost name: MultiPodPerHost type: boolean - jsonPath: .spec.podSpec.hostNetwork @@ -31,6 +31,9 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.phase + name: Phase + type: string name: v1 schema: openAPIV3Schema: @@ -281,6 +284,24 @@ spec: image: description: Aerospike server image type: string + k8sNodeBlockList: + description: K8sNodeBlockList is a list of Kubernetes nodes which + are not used for Aerospike pods. Pods are not scheduled on these + nodes. Pods are migrated from these nodes if already present. This + is useful for the maintenance of Kubernetes nodes. + items: + type: string + minItems: 1 + type: array + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + Refer Aerospike documentation for more details. + x-kubernetes-int-or-string: true operatorClientCert: description: Certificates to connect to Aerospike. properties: @@ -6174,7 +6195,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -6229,6 +6250,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -8064,7 +8091,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -8119,6 +8146,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -8768,7 +8801,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number of cleanup + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -8823,6 +8856,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of storage classes + which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -9552,6 +9591,20 @@ spec: image: description: Aerospike server image type: string + k8sNodeBlockList: + description: K8sNodeBlockList is a list of Kubernetes nodes which + are not used for Aerospike pods. + items: + type: string + type: array + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + x-kubernetes-int-or-string: true multiPodPerHost: description: "If set true then multiple pods can be created per Kubernetes Node. This will create a NodePort service for each Pod. NodePort, @@ -9611,6 +9664,14 @@ spec: list by the operator type: string type: object + phase: + description: Phase denotes the current phase of Aerospike cluster + operation. + enum: + - InProgress + - Completed + - Error + type: string podSpec: description: Additional configuration for create Aerospike pods. properties: @@ -15578,7 +15639,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -15633,6 +15694,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -17468,7 +17535,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -17523,6 +17590,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -18223,7 +18296,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number of cleanup + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -18278,6 +18351,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of storage classes + which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -18767,8 +18846,6 @@ spec: - skipWorkDirValidate - skipXdrDlogFileValidate type: object - required: - - pods type: object type: object served: true diff --git a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml index ed26895a5..e9cec80a0 100644 --- a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml @@ -50,6 +50,15 @@ spec: - description: Aerospike server image displayName: Server Image path: image + - description: K8sNodeBlockList is a list of Kubernetes nodes which are not + used for Aerospike pods. + displayName: Kubernetes Node BlockList + path: k8sNodeBlockList + - description: MaxUnavailable is the percentage/number of pods that can be allowed + to go down or unavailable before application disruption. This value is used + to create PodDisruptionBudget. Defaults to 1. + displayName: Max Unavailable + path: maxUnavailable - description: Certificates to connect to Aerospike. displayName: Operator Client Cert path: operatorClientCert @@ -78,10 +87,6 @@ spec: resource. displayName: Validation Policy path: validationPolicy - statusDescriptors: - - description: Aerospike cluster size - displayName: Cluster Size - path: size version: v1 - description: AerospikeCluster is the schema for the AerospikeCluster API displayName: Aerospike Cluster diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 981820e2c..4cd176f8d 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -111,3 +111,12 @@ rules: - patch - update - watch +- apiGroups: + - policy + resources: + - poddisruptionbudgets + verbs: + - create + - get + - patch + - update diff --git a/controllers/aero_info_calls.go b/controllers/aero_info_calls.go index 093da495b..4290a5d06 100644 --- a/controllers/aero_info_calls.go +++ b/controllers/aero_info_calls.go @@ -42,7 +42,7 @@ func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady( return reconcileSuccess() } - // Remove a node only if cluster is stable + // Remove a node only if the cluster is stable if err := r.waitForAllSTSToBeReady(ignorablePodNames); err != nil { return reconcileError(fmt.Errorf("failed to wait for cluster to be ready: %v", err)) } diff --git a/controllers/aerospikecluster_controller.go b/controllers/aerospikecluster_controller.go index 2c2f8f441..701e9b9a4 100644 --- a/controllers/aerospikecluster_controller.go +++ b/controllers/aerospikecluster_controller.go @@ -86,6 +86,7 @@ type RackState struct { // +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=secrets,verbs=get // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;create;update;patch //nolint:lll // marker // +kubebuilder:rbac:groups=asdb.aerospike.com,resources=aerospikeclusters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=asdb.aerospike.com,resources=aerospikeclusters/status,verbs=get;update;patch diff --git a/controllers/pod.go b/controllers/pod.go index 1244e6f3e..859bb4989 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -63,6 +63,7 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap( return nil, err } + blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) requiredConfHash := confMap.Data[aerospikeConfHashFileName] for idx := range pods { @@ -70,6 +71,15 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap( continue } + if blockedK8sNodes.Has(pods[idx].Spec.NodeName) { + r.Log.Info("Pod found in blocked nodes list, will be migrated to a different node", + "podName", pods[idx].Name) + + restartTypeMap[pods[idx].Name] = podRestart + + continue + } + podStatus := r.aeroCluster.Status.Pods[pods[idx].Name] if addedNSDevices == nil && podStatus.AerospikeConfigHash != requiredConfHash { // Fetching all block devices that have been added in namespaces. @@ -147,6 +157,13 @@ func (r *SingleClusterReconciler) getRollingRestartTypePod( r.Log.Info("Aerospike rack storage changed. Need rolling restart") } + if r.isReadinessPortUpdated(pod) { + restartType = mergeRestartType(restartType, podRestart) + + r.Log.Info("Aerospike readiness tcp port changed. Need rolling restart", + "newTCPPort", r.getReadinessProbe().TCPSocket.String()) + } + return restartType } @@ -154,7 +171,7 @@ func (r *SingleClusterReconciler) rollingRestartPods( rackState *RackState, podsToRestart []*corev1.Pod, ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType, ) reconcileResult { - failedPods, activePods := getFailedAndActivePods(podsToRestart, ignorablePodNames) + failedPods, activePods := getFailedAndActivePods(podsToRestart) // If already dead node (failed pod) then no need to check node safety, migration if len(failedPods) != 0 { @@ -253,6 +270,7 @@ func (r *SingleClusterReconciler) restartPods( } restartedPods := make([]*corev1.Pod, 0, len(podsToRestart)) + blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) for idx := range podsToRestart { pod := podsToRestart[idx] @@ -260,12 +278,21 @@ func (r *SingleClusterReconciler) restartPods( restartType := restartTypeMap[pod.Name] if restartType == quickRestart { - // If ASD restart fails then go ahead and restart the pod + // If ASD restart fails, then go ahead and restart the pod if err := r.restartASDInPod(rackState, pod); err == nil { continue } } + if blockedK8sNodes.Has(pod.Spec.NodeName) { + r.Log.Info("Pod found in blocked nodes list, deleting corresponding local PVCs if any", + "podName", pod.Name) + + if err := r.deleteLocalPVCs(rackState, pod); err != nil { + return reconcileError(err) + } + } + if err := r.Client.Delete(context.TODO(), pod); err != nil { r.Log.Error(err, "Failed to delete pod") return reconcileError(err) @@ -350,13 +377,9 @@ func (r *SingleClusterReconciler) ensurePodsRunningAndReady(podsToCheck []*corev return reconcileRequeueAfter(10) } -func getFailedAndActivePods(pods []*corev1.Pod, ignorablePodNames sets.Set[string], -) (failedPods, activePods []*corev1.Pod) { +func getFailedAndActivePods(pods []*corev1.Pod) (failedPods, activePods []*corev1.Pod) { for idx := range pods { pod := pods[idx] - if ignorablePodNames.Has(pod.Name) { - continue - } if err := utils.CheckPodFailed(pod); err != nil { failedPods = append(failedPods, pod) @@ -369,10 +392,26 @@ func getFailedAndActivePods(pods []*corev1.Pod, ignorablePodNames sets.Set[strin return failedPods, activePods } +func getNonIgnorablePods(pods []*corev1.Pod, ignorablePodNames sets.Set[string], +) []*corev1.Pod { + nonIgnorablePods := make([]*corev1.Pod, 0, len(pods)) + + for idx := range pods { + pod := pods[idx] + if ignorablePodNames.Has(pod.Name) { + continue + } + + nonIgnorablePods = append(nonIgnorablePods, pod) + } + + return nonIgnorablePods +} + func (r *SingleClusterReconciler) safelyDeletePodsAndEnsureImageUpdated( rackState *RackState, podsToUpdate []*corev1.Pod, ignorablePodNames sets.Set[string], ) reconcileResult { - failedPods, activePods := getFailedAndActivePods(podsToUpdate, ignorablePodNames) + failedPods, activePods := getFailedAndActivePods(podsToUpdate) // If already dead node (failed pod) then no need to check node safety, migration if len(failedPods) != 0 { @@ -407,16 +446,27 @@ func (r *SingleClusterReconciler) deletePodAndEnsureImageUpdated( return reconcileError(err) } + blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) + // Delete pods - for _, p := range podsToUpdate { - if err := r.Client.Delete(context.TODO(), p); err != nil { + for _, pod := range podsToUpdate { + if blockedK8sNodes.Has(pod.Spec.NodeName) { + r.Log.Info("Pod found in blocked nodes list, deleting corresponding local PVCs if any", + "podName", pod.Name) + + if err := r.deleteLocalPVCs(rackState, pod); err != nil { + return reconcileError(err) + } + } + + if err := r.Client.Delete(context.TODO(), pod); err != nil { return reconcileError(err) } - r.Log.V(1).Info("Pod deleted", "podName", p.Name) + r.Log.V(1).Info("Pod deleted", "podName", pod.Name) r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeNormal, "PodWaitUpdate", - "[rack-%d] Waiting to update Pod %s", rackState.Rack.ID, p.Name, + "[rack-%d] Waiting to update Pod %s", rackState.Rack.ID, pod.Name, ) } diff --git a/controllers/poddistruptionbudget.go b/controllers/poddistruptionbudget.go new file mode 100644 index 000000000..4d0bb2065 --- /dev/null +++ b/controllers/poddistruptionbudget.go @@ -0,0 +1,106 @@ +package controllers + +import ( + "context" + "fmt" + + v1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" + "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" +) + +func (r *SingleClusterReconciler) createOrUpdatePDB() error { + podList, err := r.getClusterPodList() + if err != nil { + return err + } + + for podIdx := range podList.Items { + pod := &podList.Items[podIdx] + + for containerIdx := range pod.Spec.Containers { + if pod.Spec.Containers[containerIdx].Name != asdbv1.AerospikeServerContainerName { + continue + } + + if pod.Spec.Containers[containerIdx].ReadinessProbe == nil { + r.Log.Info("Pod found without ReadinessProbe, skipping PodDisruptionBudget. Refer Aerospike "+ + "documentation for more details.", "name", pod.Name) + return nil + } + } + } + + ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name) + pdb := &v1.PodDisruptionBudget{} + + if err := r.Client.Get( + context.TODO(), types.NamespacedName{ + Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace, + }, pdb, + ); err != nil { + if !errors.IsNotFound(err) { + return err + } + + r.Log.Info("Create PodDisruptionBudget") + + pdb.SetName(r.aeroCluster.Name) + pdb.SetNamespace(r.aeroCluster.Namespace) + pdb.SetLabels(ls) + pdb.Spec.MaxUnavailable = r.aeroCluster.Spec.MaxUnavailable + pdb.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: ls, + } + + // Set AerospikeCluster instance as the owner and controller + err = controllerutil.SetControllerReference( + r.aeroCluster, pdb, r.Scheme, + ) + if err != nil { + return err + } + + if err = r.Client.Create( + context.TODO(), pdb, createOption, + ); err != nil { + return fmt.Errorf( + "failed to create PodDisruptionBudget: %v", + err, + ) + } + + r.Log.Info("Created new PodDisruptionBudget", "name", + utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name)) + + return nil + } + + r.Log.Info( + "PodDisruptionBudget already exist. Updating existing PodDisruptionBudget if required", "name", + utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name), + ) + + if pdb.Spec.MaxUnavailable.String() != r.aeroCluster.Spec.MaxUnavailable.String() { + pdb.Spec.MaxUnavailable = r.aeroCluster.Spec.MaxUnavailable + + if err := r.Client.Update( + context.TODO(), pdb, updateOption, + ); err != nil { + return fmt.Errorf( + "failed to update PodDisruptionBudget: %v", + err, + ) + } + + r.Log.Info("Updated PodDisruptionBudget", "name", + utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name)) + } + + return nil +} diff --git a/controllers/pvc.go b/controllers/pvc.go index f354ce77e..7a316e617 100644 --- a/controllers/pvc.go +++ b/controllers/pvc.go @@ -6,6 +6,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "sigs.k8s.io/controller-runtime/pkg/client" @@ -103,6 +104,34 @@ func (r *SingleClusterReconciler) removePVCsAsync( return deletedPVCs, nil } +// deleteLocalPVCs deletes PVCs which are created using local storage classes +// It considers the user given LocalStorageClasses list from spec to determine if a PVC is local or not. +func (r *SingleClusterReconciler) deleteLocalPVCs(rackState *RackState, pod *corev1.Pod) error { + pvcItems, err := r.getPodsPVCList([]string{pod.Name}, rackState.Rack.ID) + if err != nil { + return fmt.Errorf("could not find pvc for pod %v: %v", pod.Name, err) + } + + for idx := range pvcItems { + pvcStorageClass := pvcItems[idx].Spec.StorageClassName + if pvcStorageClass == nil { + r.Log.Info("PVC does not have storageClass set, no need to delete PVC", "pvcName", pvcItems[idx].Name) + + continue + } + + if utils.ContainsString(rackState.Rack.Storage.LocalStorageClasses, *pvcStorageClass) { + if err := r.Client.Delete(context.TODO(), &pvcItems[idx]); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf( + "could not delete pvc %s: %v", pvcItems[idx].Name, err, + ) + } + } + } + + return nil +} + func (r *SingleClusterReconciler) waitForPVCTermination(deletedPVCs []corev1.PersistentVolumeClaim) error { if len(deletedPVCs) == 0 { return nil diff --git a/controllers/rack.go b/controllers/rack.go index 66e269349..fae60b782 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -81,7 +81,9 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { ) } - failedPods, _ := getFailedAndActivePods(podList, ignorablePodNames) + failedPods, _ := getFailedAndActivePods(podList) + // remove ignorable pods from failedPods + failedPods = getNonIgnorablePods(failedPods, ignorablePodNames) if len(failedPods) != 0 { r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) @@ -107,7 +109,9 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { ) } - failedPods, _ = getFailedAndActivePods(podList, ignorablePodNames) + failedPods, _ = getFailedAndActivePods(podList) + // remove ignorable pods from failedPods + failedPods = getNonIgnorablePods(failedPods, ignorablePodNames) if len(failedPods) != 0 { r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) @@ -350,7 +354,7 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat // Always update configMap. We won't be able to find if a rack's config, and it's pod config is in sync or not // Checking rack.spec, rack.status will not work. // We may change config, let some pods restart with new config and then change config back to original value. - // Now rack.spec, rack.status will be same, but few pods will have changed config. + // Now rack.spec, rack.status will be the same, but few pods will have changed config. // So a check based on spec and status will skip configMap update. // Hence, a rolling restart of pod will never bring pod to desired config if err := r.updateSTSConfigMap( @@ -420,7 +424,15 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat } if r.aeroCluster.Spec.RackConfig.MaxIgnorablePods != nil { - if res := r.handleNSOrDeviceRemovalForIgnorablePods(rackState, ignorablePodNames); !res.isSuccess { + if res = r.handleNSOrDeviceRemovalForIgnorablePods(rackState, ignorablePodNames); !res.isSuccess { + return found, res + } + } + + // handle k8sNodeBlockList pods only if it is changed + if !reflect.DeepEqual(r.aeroCluster.Spec.K8sNodeBlockList, r.aeroCluster.Status.K8sNodeBlockList) { + found, res = r.handleK8sNodeBlockListPods(found, rackState, ignorablePodNames, failedPods) + if !res.isSuccess { return found, res } } @@ -502,9 +514,9 @@ func (r *SingleClusterReconciler) reconcileRack( } if failedPods == nil { - // revert migrate-fill-delay to original value if it was set to 0 during scale down - // Reset will be done if there is Scale down or Rack redistribution - // This check won't cover scenario where scale down operation was done and then reverted to previous value + // Revert migrate-fill-delay to original value if it was set to 0 during scale down. + // Reset will be done if there is scale-down or Rack redistribution. + // This check won't cover a scenario where scale-down operation was done and then reverted to previous value // before the scale down could complete. if (r.aeroCluster.Status.Size > r.aeroCluster.Spec.Size) || (!r.IsStatusEmpty() && len(r.aeroCluster.Status.RackConfig.Racks) != len(r.aeroCluster.Spec.RackConfig.Racks)) { @@ -1061,6 +1073,77 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, return found, reconcileSuccess() } +func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1.StatefulSet, rackState *RackState, + ignorablePodNames sets.Set[string], failedPods []*corev1.Pod, +) (*appsv1.StatefulSet, reconcileResult) { + if err := r.updateSTS(statefulSet, rackState); err != nil { + return statefulSet, reconcileError( + fmt.Errorf("k8s node block list processing failed: %v", err), + ) + } + + var ( + podList []*corev1.Pod + err error + ) + + if len(failedPods) != 0 { + podList = failedPods + } else { + // List the pods for this aeroCluster's statefulset + podList, err = r.getOrderedRackPodList(rackState.Rack.ID) + if err != nil { + return statefulSet, reconcileError(fmt.Errorf("failed to list pods: %v", err)) + } + } + + blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) + + var podsToRestart []*corev1.Pod + + restartTypeMap := make(map[string]RestartType) + + for idx := range podList { + pod := podList[idx] + + if blockedK8sNodes.Has(pod.Spec.NodeName) { + r.Log.Info("Pod found in blocked nodes list, migrating to a different node", + "podName", pod.Name) + + podsToRestart = append(podsToRestart, pod) + + restartTypeMap[pod.Name] = podRestart + } + } + + podsBatchList := r.getPodsBatchToRestart(podsToRestart, len(podList)) + + // Restart batch of pods + if len(podsBatchList) > 0 { + // Handle one batch + podsBatch := podsBatchList[0] + + r.Log.Info( + "Calculated batch for Pod migration to different nodes", + "rackPodList", getPodNames(podList), + "rearrangedPods", getPodNames(podsToRestart), + "podsBatch", getPodNames(podsBatch), + "rollingUpdateBatchSize", r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, + ) + + if res := r.rollingRestartPods(rackState, podsBatch, ignorablePodNames, restartTypeMap); !res.isSuccess { + return statefulSet, res + } + + // Handle next batch in subsequent Reconcile. + if len(podsBatchList) > 1 { + return statefulSet, reconcileRequeueAfter(1) + } + } + + return statefulSet, reconcileSuccess() +} + func (r *SingleClusterReconciler) needRollingRestartRack(rackState *RackState, ignorablePodNames sets.Set[string]) ( needRestart bool, restartTypeMap map[string]RestartType, err error, ) { diff --git a/controllers/reconciler.go b/controllers/reconciler.go index bb9d24c51..8a0fa6184 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -39,13 +39,25 @@ type SingleClusterReconciler struct { Log logr.Logger } -func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { +func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error) { r.Log.V(1).Info( "AerospikeCluster", "Spec", r.aeroCluster.Spec, "Status", r.aeroCluster.Status, ) - // Check DeletionTimestamp to see if cluster is being deleted + // Set the status phase to Error if the recErr is not nil + // recErr is only set when reconcile failure should result in Error phase of the cluster + defer func() { + if recErr != nil { + r.Log.Error(recErr, "Reconcile failed") + + if err := r.setStatusPhase(asdbv1.AerospikeClusterError); err != nil { + recErr = err + } + } + }() + + // Check DeletionTimestamp to see if the cluster is being deleted if !r.aeroCluster.ObjectMeta.DeletionTimestamp.IsZero() { r.Log.V(1).Info("Deleting AerospikeCluster") // The cluster is being deleted @@ -69,7 +81,12 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { return reconcile.Result{}, nil } - // The cluster is not being deleted, add finalizer in not added already + // Set the status to AerospikeClusterInProgress before starting any operations + if err := r.setStatusPhase(asdbv1.AerospikeClusterInProgress); err != nil { + return reconcile.Result{}, err + } + + // The cluster is not being deleted, add finalizer if not added already if err := r.addFinalizer(finalizerName); err != nil { r.Log.Error(err, "Failed to add finalizer") return reconcile.Result{}, err @@ -97,7 +114,9 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { r.aeroCluster.Namespace, r.aeroCluster.Name, ) - return reconcile.Result{}, err + recErr = err + + return reconcile.Result{}, recErr } if r.aeroCluster.Status.Pods != nil && r.enablingSecurity() { @@ -136,9 +155,24 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { "Failed to reconcile Racks for cluster %s/%s", r.aeroCluster.Namespace, r.aeroCluster.Name, ) + + recErr = res.err } - return res.getResult() + return res.result, recErr + } + + if err := r.createOrUpdatePDB(); err != nil { + r.Log.Error(err, "Failed to create PodDisruptionBudget") + r.Recorder.Eventf( + r.aeroCluster, corev1.EventTypeWarning, "PodDisruptionBudgetCreateFailed", + "Failed to create PodDisruptionBudget %s/%s", + r.aeroCluster.Namespace, r.aeroCluster.Name, + ) + + recErr = err + + return reconcile.Result{}, recErr } if err := r.createOrUpdateSTSLoadBalancerSvc(); err != nil { @@ -149,7 +183,9 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { r.aeroCluster.Namespace, r.aeroCluster.Name, ) - return reconcile.Result{}, err + recErr = err + + return reconcile.Result{}, recErr } ignorablePodNames, err := r.getIgnorablePods(nil, getConfiguredRackStateList(r.aeroCluster)) @@ -177,12 +213,15 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { r.getClientPolicy(), allHostConns, ); err != nil { r.Log.Error(err, "Failed to check for Quiesced nodes") - return reconcile.Result{}, err + + recErr = err + + return reconcile.Result{}, recErr } // Setup access control. // Assuming all pods must be security enabled or disabled. - if err := r.validateAndReconcileAccessControl(nil, ignorablePodNames); err != nil { + if err = r.validateAndReconcileAccessControl(nil, ignorablePodNames); err != nil { r.Log.Error(err, "Failed to Reconcile access control") r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeWarning, "ACLUpdateFailed", @@ -190,7 +229,9 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { r.aeroCluster.Name, ) - return reconcile.Result{}, err + recErr = err + + return reconcile.Result{}, recErr } // Use policy from spec after setting up access control @@ -204,25 +245,32 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { false, ignorablePodNames, ); !res.isSuccess { r.Log.Error(res.err, "Failed to revert migrate-fill-delay") - return reconcile.Result{}, res.err + + recErr = res.err + + return reconcile.Result{}, recErr } if asdbv1.IsClusterSCEnabled(r.aeroCluster) { if !r.IsStatusEmpty() { if res := r.waitForClusterStability(policy, allHostConns); !res.isSuccess { - return res.result, res.err + recErr = res.err + + return res.result, recErr } } // Setup roster - if err := r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, ignorablePodNames); err != nil { + if err = r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, ignorablePodNames); err != nil { r.Log.Error(err, "Failed to set roster for cluster") - return reconcile.Result{}, err + recErr = err + + return reconcile.Result{}, recErr } } // Update the AerospikeCluster status. - if err := r.updateStatus(); err != nil { + if err = r.updateStatus(); err != nil { r.Log.Error(err, "Failed to update AerospikeCluster status") r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeWarning, "StatusUpdateFailed", @@ -329,7 +377,6 @@ func (r *SingleClusterReconciler) validateAndReconcileAccessControl(selectedPods // Create policy using status, status has current connection info clientPolicy := r.getClientPolicy() - aeroClient, err := as.NewClientWithPolicyAndHost(clientPolicy, hosts...) if err != nil { @@ -396,6 +443,7 @@ func (r *SingleClusterReconciler) updateStatus() error { } newAeroCluster.Status.AerospikeClusterStatusSpec = *specToStatus + newAeroCluster.Status.Phase = asdbv1.AerospikeClusterCompleted err = r.patchStatus(newAeroCluster) if err != nil { @@ -409,6 +457,19 @@ func (r *SingleClusterReconciler) updateStatus() error { return nil } +func (r *SingleClusterReconciler) setStatusPhase(phase asdbv1.AerospikeClusterPhase) error { + if r.aeroCluster.Status.Phase != phase { + r.aeroCluster.Status.Phase = phase + + if err := r.Client.Status().Update(context.Background(), r.aeroCluster); err != nil { + r.Log.Error(err, fmt.Sprintf("Failed to set cluster status to %s", phase)) + return err + } + } + + return nil +} + func (r *SingleClusterReconciler) updateAccessControlStatus() error { if r.aeroCluster.Spec.AerospikeAccessControl == nil { return nil diff --git a/controllers/statefulset.go b/controllers/statefulset.go index b8a21c7c8..f726fe49b 100644 --- a/controllers/statefulset.go +++ b/controllers/statefulset.go @@ -15,7 +15,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -162,9 +164,9 @@ func (r *SingleClusterReconciler) createSTS( Ports: ports, Env: envVarList, VolumeMounts: getDefaultAerospikeContainerVolumeMounts(), + ReadinessProbe: r.getReadinessProbe(), }, }, - Volumes: getDefaultSTSVolumes(r.aeroCluster, rackState), }, }, @@ -201,6 +203,51 @@ func (r *SingleClusterReconciler) createSTS( return r.getSTS(rackState) } +func (r *SingleClusterReconciler) getReadinessProbe() *corev1.Probe { + var readinessPort *int + + if _, tlsPort := asdbv1.GetTLSNameAndPort(r.aeroCluster.Spec.AerospikeConfig, asdbv1.ServicePortName); tlsPort != nil { + readinessPort = tlsPort + } else { + readinessPort = asdbv1.GetServicePort(r.aeroCluster.Spec.AerospikeConfig) + } + + return &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: int32(*readinessPort), + }, + }, + }, + InitialDelaySeconds: 2, + PeriodSeconds: 5, + } +} + +func (r *SingleClusterReconciler) isReadinessPortUpdated(pod *corev1.Pod) bool { + for idx := range pod.Spec.Containers { + container := &pod.Spec.Containers[idx] + + if container.Name != asdbv1.AerospikeServerContainerName { + continue + } + + // ignore if readiness probe is not set. Avoid rolling restart for old versions of operator + if container.ReadinessProbe == nil { + return false + } + + if container.ReadinessProbe.TCPSocket != nil && + container.ReadinessProbe.TCPSocket.String() != r.getReadinessProbe().TCPSocket.String() { + return true + } + } + + return false +} + func (r *SingleClusterReconciler) deleteSTS(st *appsv1.StatefulSet) error { r.Log.Info("Delete statefulset") // No need to do cleanup pods after deleting sts @@ -596,17 +643,25 @@ func (r *SingleClusterReconciler) updateSTS( // Container. r.updateContainerImages(statefulSet) + // Updates the readiness probe TCP Port if changed for the aerospike server container + r.updateReadinessProbe(statefulSet) + // This should be called before updating storage r.initializeSTSStorage(statefulSet, rackState) // TODO: Add validation. device, file, both should not exist in same storage class r.updateSTSStorage(statefulSet, rackState) - // Save the updated stateful set. - // Can we optimize this? Update stateful set only if there is any change - // in it. - err := r.Client.Update(context.TODO(), statefulSet, updateOption) - if err != nil { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + found, err := r.getSTS(rackState) + if err != nil { + return err + } + + // Save the updated stateful set. + found.Spec = statefulSet.Spec + return r.Client.Update(context.TODO(), found, updateOption) + }); err != nil { return fmt.Errorf( "failed to update StatefulSet %s: %v", statefulSet.Name, @@ -870,6 +925,16 @@ func (r *SingleClusterReconciler) updateSTSSchedulingPolicy( ) } + if len(r.aeroCluster.Spec.K8sNodeBlockList) > 0 { + matchExpressions = append( + matchExpressions, corev1.NodeSelectorRequirement{ + Key: "kubernetes.io/hostname", + Operator: corev1.NodeSelectorOpNotIn, + Values: r.aeroCluster.Spec.K8sNodeBlockList, + }, + ) + } + if len(matchExpressions) != 0 { if affinity.NodeAffinity == nil { affinity.NodeAffinity = &corev1.NodeAffinity{} @@ -1095,6 +1160,20 @@ func (r *SingleClusterReconciler) updateContainerImages(statefulset *appsv1.Stat updateImage(statefulset.Spec.Template.Spec.InitContainers) } +func (r *SingleClusterReconciler) updateReadinessProbe(statefulSet *appsv1.StatefulSet) { + for idx := range statefulSet.Spec.Template.Spec.Containers { + container := &statefulSet.Spec.Template.Spec.Containers[idx] + + if container.Name != asdbv1.AerospikeServerContainerName { + continue + } + + container.ReadinessProbe = r.getReadinessProbe() + + break + } +} + func (r *SingleClusterReconciler) updateAerospikeInitContainerImage(statefulSet *appsv1.StatefulSet) error { for idx := range statefulSet.Spec.Template.Spec.InitContainers { container := &statefulSet.Spec.Template.Spec.InitContainers[idx] @@ -1463,8 +1542,18 @@ func getSTSContainerPort( multiPodPerHost bool, aeroConf *asdbv1.AerospikeConfigSpec, ) []corev1.ContainerPort { ports := make([]corev1.ContainerPort, 0, len(defaultContainerPorts)) + portNames := make([]string, 0, len(defaultContainerPorts)) + + // Sorting defaultContainerPorts to fetch map in ordered manner. + // Helps reduce unnecessary sts object updates. + for portName := range defaultContainerPorts { + portNames = append(portNames, portName) + } + + sort.Strings(portNames) - for portName, portInfo := range defaultContainerPorts { + for _, portName := range portNames { + portInfo := defaultContainerPorts[portName] configPort := asdbv1.GetPortFromConfig( aeroConf, portInfo.connectionType, portInfo.configParam, ) diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index 070d4e753..757c0d37a 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -22,7 +22,7 @@ spec: - jsonPath: .spec.image name: Image type: string - - jsonPath: .spec.podSpec.MultiPodPerHost + - jsonPath: .spec.podSpec.multiPodPerHost name: MultiPodPerHost type: boolean - jsonPath: .spec.podSpec.hostNetwork @@ -31,6 +31,9 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.phase + name: Phase + type: string name: v1 schema: openAPIV3Schema: @@ -281,6 +284,24 @@ spec: image: description: Aerospike server image type: string + k8sNodeBlockList: + description: K8sNodeBlockList is a list of Kubernetes nodes which + are not used for Aerospike pods. Pods are not scheduled on these + nodes. Pods are migrated from these nodes if already present. This + is useful for the maintenance of Kubernetes nodes. + items: + type: string + minItems: 1 + type: array + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + Refer Aerospike documentation for more details. + x-kubernetes-int-or-string: true operatorClientCert: description: Certificates to connect to Aerospike. properties: @@ -6174,7 +6195,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -6229,6 +6250,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -8064,7 +8091,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -8119,6 +8146,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -8768,7 +8801,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number of cleanup + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -8823,6 +8856,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of storage classes + which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -9552,6 +9591,20 @@ spec: image: description: Aerospike server image type: string + k8sNodeBlockList: + description: K8sNodeBlockList is a list of Kubernetes nodes which + are not used for Aerospike pods. + items: + type: string + type: array + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + x-kubernetes-int-or-string: true multiPodPerHost: description: "If set true then multiple pods can be created per Kubernetes Node. This will create a NodePort service for each Pod. NodePort, @@ -9611,6 +9664,14 @@ spec: list by the operator type: string type: object + phase: + description: Phase denotes the current phase of Aerospike cluster + operation. + enum: + - InProgress + - Completed + - Error + type: string podSpec: description: Additional configuration for create Aerospike pods. properties: @@ -15578,7 +15639,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -15633,6 +15694,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -17468,7 +17535,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -17523,6 +17590,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -18223,7 +18296,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number of cleanup + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -18278,6 +18351,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of storage classes + which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -18767,8 +18846,6 @@ spec: - skipWorkDirValidate - skipXdrDlogFileValidate type: object - required: - - pods type: object type: object served: true diff --git a/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml b/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml index c379e7f0e..adf721767 100644 --- a/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml +++ b/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml @@ -116,4 +116,13 @@ rules: - patch - update - watch +- apiGroups: + - policy + resources: + - poddisruptionbudgets + verbs: + - create + - get + - patch + - update {{- end }} diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index b7c687dd8..629f6c9b2 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -212,7 +212,8 @@ func isPodError(reason string) bool { func IsPodReasonUnschedulable(pod *corev1.Pod) bool { for _, condition := range pod.Status.Conditions { - if condition.Type == corev1.PodScheduled && condition.Reason == corev1.PodReasonUnschedulable { + if condition.Type == corev1.PodScheduled && (condition.Reason == corev1.PodReasonUnschedulable || + condition.Reason == corev1.PodReasonSchedulerError) { return true } } diff --git a/test/cluster_helper.go b/test/cluster_helper.go index 5ea880937..c9199ae79 100644 --- a/test/cluster_helper.go +++ b/test/cluster_helper.go @@ -16,6 +16,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/client" @@ -85,6 +86,12 @@ func rollingRestartClusterByEnablingTLS( return err } + // Port should be changed to service tls-port + err = validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) + if err != nil { + return err + } + network := aeroCluster.Spec.AerospikeConfig.Value["network"].(map[string]interface{}) serviceNetwork := network[asdbv1.ServicePortName].(map[string]interface{}) fabricNetwork := network[asdbv1.FabricPortName].(map[string]interface{}) @@ -99,7 +106,17 @@ func rollingRestartClusterByEnablingTLS( network[asdbv1.HeartbeatPortName] = heartbeartNetwork aeroCluster.Spec.AerospikeConfig.Value["network"] = network - return updateCluster(k8sClient, ctx, aeroCluster) + err = updateCluster(k8sClient, ctx, aeroCluster) + if err != nil { + return err + } + + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + if err != nil { + return err + } + + return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) } func rollingRestartClusterByDisablingTLS( @@ -128,10 +145,27 @@ func rollingRestartClusterByDisablingTLS( return err } + // port should remain same i.e. service tls-port + err = validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) + if err != nil { + return err + } + aeroCluster.Spec.AerospikeConfig.Value["network"] = getNetworkConfig() aeroCluster.Spec.OperatorClientCertSpec = nil - return updateCluster(k8sClient, ctx, aeroCluster) + err = updateCluster(k8sClient, ctx, aeroCluster) + if err != nil { + return err + } + + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + if err != nil { + return err + } + + // Port should be updated to service non-tls port + return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceNonTLSPort) } func scaleUpClusterTestWithNSDeviceHandling( @@ -541,6 +575,35 @@ func validateMigrateFillDelay( return err } +// validate readiness port +func validateReadinessProbe(ctx goctx.Context, k8sClient client.Client, aeroCluster *asdbv1.AerospikeCluster, + requiredPort int) error { + for podName := range aeroCluster.Status.Pods { + pod := &corev1.Pod{} + if err := k8sClient.Get(ctx, types.NamespacedName{ + Namespace: aeroCluster.Namespace, + Name: podName, + }, pod); err != nil { + return err + } + + for idx := range pod.Spec.Containers { + container := &pod.Spec.Containers[idx] + + if container.Name != asdbv1.AerospikeServerContainerName { + continue + } + + if !reflect.DeepEqual(container.ReadinessProbe.TCPSocket.Port, intstr.FromInt(requiredPort)) { + return fmt.Errorf("readiness tcp port mismatch, expected: %v, found: %v", + intstr.FromInt(requiredPort), container.ReadinessProbe.TCPSocket.Port) + } + } + } + + return nil +} + func validateDirtyVolumes( ctx goctx.Context, k8sClient client.Client, clusterNamespacedName types.NamespacedName, expectedVolumes []string, @@ -695,7 +758,7 @@ func deployClusterWithTO( if err != nil { return err } - // Wait for aerocluster to reach desired cluster size. + // Wait for aerocluster to reach the desired cluster size. return waitForAerospikeCluster( k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval, timeout, diff --git a/test/cluster_test.go b/test/cluster_test.go index ff5997c95..f28ec7709 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -173,13 +173,19 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { It( "Should allow cluster operations with pending pod", func() { By("Set MaxIgnorablePod and Rolling restart cluster") - aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) - Expect(err).ToNot(HaveOccurred()) - val := intstr.FromInt(1) - aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val - aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = int64(18000) - err = updateCluster(k8sClient, ctx, aeroCluster) - Expect(err).ToNot(HaveOccurred()) + + // As pod is in pending state, CR object will be updated continuously + // This is put in eventually to retry Object Conflict error + Eventually(func() error { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + val := intstr.FromInt(1) + aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = + int64(18000) + + return updateCluster(k8sClient, ctx, aeroCluster) + }, 1*time.Minute).ShouldNot(HaveOccurred()) By("Upgrade version") aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) @@ -243,7 +249,7 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { Namespace: clusterNamespacedName.Namespace}, pod) Expect(err).ToNot(HaveOccurred()) - pod.Spec.Containers[0].Image = "wrong-image" + pod.Spec.Containers[0].Image = wrongImage err = k8sClient.Update(ctx, pod) Expect(err).ToNot(HaveOccurred()) @@ -283,7 +289,7 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { Namespace: clusterNamespacedName.Namespace}, pod) Expect(err).ToNot(HaveOccurred()) - pod.Spec.Containers[0].Image = "wrong-image" + pod.Spec.Containers[0].Image = wrongImage err = k8sClient.Update(ctx, pod) Expect(err).ToNot(HaveOccurred()) @@ -375,6 +381,13 @@ func DeployClusterForAllImagesPost490(ctx goctx.Context) { err = deployCluster(k8sClient, ctx, aeroCluster) Expect(err).ToNot(HaveOccurred()) + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + By("Validating Readiness probe") + err = validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) + Expect(err).ToNot(HaveOccurred()) + _ = deleteCluster(k8sClient, ctx, aeroCluster) }, ) diff --git a/test/k8snode_block_list_test.go b/test/k8snode_block_list_test.go new file mode 100644 index 000000000..9580576cb --- /dev/null +++ b/test/k8snode_block_list_test.go @@ -0,0 +1,233 @@ +package test + +import ( + "context" + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" +) + +const ( + wrongImage = "wrong-image" +) + +var _ = Describe( + "K8sNodeBlockList", func() { + ctx := context.TODO() + Context( + "Migrate pods from K8s blocked nodes", func() { + clusterName := "k8s-node-block-cluster" + clusterNamespacedName := getNamespacedName(clusterName, namespace) + podName := clusterName + "-2-0" + aeroCluster := &asdbv1.AerospikeCluster{} + oldK8sNode := "" + oldPvcInfo := make(map[string]types.UID) + + var ( + err error + zones []string + ) + + BeforeEach( + func() { + aeroCluster = createDummyAerospikeCluster( + clusterNamespacedName, 3, + ) + + // Zones are set to distribute the pods across different zone nodes. + zones, err = getZones(ctx, k8sClient) + Expect(err).ToNot(HaveOccurred()) + + zone1 := zones[0] + zone2 := zones[0] + if len(zones) > 1 { + zone2 = zones[1] + } + + batchSize := intstr.FromString("100%") + rackConf := asdbv1.RackConfig{ + Racks: []asdbv1.Rack{ + {ID: 1, Zone: zone1}, + {ID: 2, Zone: zone2}, + }, + RollingUpdateBatchSize: &batchSize, + Namespaces: []string{"test"}, + } + + aeroCluster.Spec.RackConfig = rackConf + + aeroCluster.Spec.PodSpec.MultiPodPerHost = false + err = deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + pod := &corev1.Pod{} + err = k8sClient.Get(ctx, getNamespacedName(podName, namespace), pod) + Expect(err).ToNot(HaveOccurred()) + oldK8sNode = pod.Spec.NodeName + oldPvcInfo, err = extractPodPVC(pod) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + AfterEach( + func() { + err = deleteCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should migrate the pods from blocked nodes to other nodes", func() { + By("Blocking the k8s node") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode} + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By("Verifying if the pod is migrated to other nodes and pod pvcs are not deleted") + validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true) + }, + ) + + It( + "Should migrate the pods from blocked nodes to other nodes along with rolling "+ + "restart", func() { + By("Blocking the k8s node and updating aerospike config") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode} + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = + defaultProtofdmax + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By("Verifying if the pod is migrated to other nodes and pod pvcs are not deleted") + validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true) + }, + ) + + It( + "Should migrate the pods from blocked nodes to other nodes along with upgrade", func() { + By("Blocking the k8s node and updating aerospike image") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode} + aeroCluster.Spec.Image = availableImage2 + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By("Verifying if the pod is migrated to other nodes and pod pvcs are not deleted") + validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true) + }, + ) + + It( + "Should migrate the pods from blocked nodes to other nodes and delete corresponding"+ + "local PVCs", func() { + By("Blocking the k8s node") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode} + aeroCluster.Spec.Storage.LocalStorageClasses = []string{storageClass} + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By("Verifying if the pod is migrated to other nodes and pod local pvcs are deleted") + validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, false) + }, + ) + + It( + "Should migrate the failed pods from blocked nodes to other nodes with maxIgnorablePod", func() { + By(fmt.Sprintf("Fail %s aerospike pod", podName)) + pod := &corev1.Pod{} + err := k8sClient.Get(ctx, types.NamespacedName{Name: podName, + Namespace: clusterNamespacedName.Namespace}, pod) + Expect(err).ToNot(HaveOccurred()) + + pod.Spec.Containers[0].Image = wrongImage + err = k8sClient.Update(ctx, pod) + Expect(err).ToNot(HaveOccurred()) + + By("Blocking the k8s node and setting maxIgnorablePod to 1") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + maxIgnorablePods := intstr.FromInt32(1) + aeroCluster.Spec.RackConfig.MaxIgnorablePods = &maxIgnorablePods + aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode} + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By("Verifying if the failed pod is migrated to other nodes and pod pvcs are not deleted") + validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true) + }, + ) + }, + ) + }, +) + +func extractPodPVC(pod *corev1.Pod) (map[string]types.UID, error) { + pvcUIDMap := make(map[string]types.UID) + + for idx := range pod.Spec.Volumes { + if pod.Spec.Volumes[idx].PersistentVolumeClaim != nil { + pvcUIDMap[pod.Spec.Volumes[idx].PersistentVolumeClaim.ClaimName] = "" + } + } + + for p := range pvcUIDMap { + pvc := &corev1.PersistentVolumeClaim{} + if err := k8sClient.Get(context.TODO(), getNamespacedName(p, pod.Namespace), pvc); err != nil { + return nil, err + } + + pvcUIDMap[p] = pvc.UID + } + + return pvcUIDMap, nil +} + +func validatePVCDeletion(ctx context.Context, pvcUIDMap map[string]types.UID, shouldDelete bool) error { + pvc := &corev1.PersistentVolumeClaim{} + + for pvcName, pvcUID := range pvcUIDMap { + pvcNamespacesName := getNamespacedName( + pvcName, namespace, + ) + + if err := k8sClient.Get(ctx, pvcNamespacesName, pvc); err != nil { + return err + } + + if shouldDelete && pvc.UID != pvcUID { + return fmt.Errorf("PVC %s is unintentionally deleted", pvcName) + } + + if !shouldDelete && pvc.UID == pvcUID { + return fmt.Errorf("PVC %s is not deleted", pvcName) + } + } + + return nil +} + +func validatePodAndPVCMigration(ctx context.Context, podName, oldK8sNode string, + oldPvcInfo map[string]types.UID, shouldDelete bool) { + pod := &corev1.Pod{} + err := k8sClient.Get(ctx, getNamespacedName(podName, namespace), pod) + Expect(err).ToNot(HaveOccurred()) + Expect(pod.Spec.NodeName).ToNot(Equal(oldK8sNode)) + + err = validatePVCDeletion(ctx, oldPvcInfo, shouldDelete) + Expect(err).ToNot(HaveOccurred()) +} diff --git a/test/large_reconcile_test.go b/test/large_reconcile_test.go index 2944e7f44..6e59732b0 100644 --- a/test/large_reconcile_test.go +++ b/test/large_reconcile_test.go @@ -73,15 +73,17 @@ var _ = Describe( err = k8sClient.Update(goctx.TODO(), aeroCluster) Expect(err).ToNot(HaveOccurred()) - // Change size to 4 immediately - aeroCluster, err = getCluster( - k8sClient, ctx, clusterNamespacedName, - ) - Expect(err).ToNot(HaveOccurred()) + // This is put in eventually to retry Object Conflict error and change size to 4 immediately + Eventually(func() error { + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) - aeroCluster.Spec.Size = 4 - err = k8sClient.Update(goctx.TODO(), aeroCluster) - Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.Size = 4 + + return k8sClient.Update(goctx.TODO(), aeroCluster) + }, 1*time.Minute).ShouldNot(HaveOccurred()) // Cluster size should never go below 4, // as only one node is removed at a time and before reducing 2nd node, we changed the size to 4 @@ -109,14 +111,16 @@ var _ = Describe( Expect(err).ToNot(HaveOccurred()) // Change config back to original value - aeroCluster, err = getCluster( - k8sClient, ctx, clusterNamespacedName, - ) - Expect(err).ToNot(HaveOccurred()) + Eventually(func() error { + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) - aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = defaultProtofdmax - err = k8sClient.Update(goctx.TODO(), aeroCluster) - Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = defaultProtofdmax + + return k8sClient.Update(goctx.TODO(), aeroCluster) + }, 1*time.Minute).ShouldNot(HaveOccurred()) // Cluster status should never get updated with old conf "tempConf" err = waitForClusterRollingRestart( @@ -142,14 +146,16 @@ var _ = Describe( Expect(err).ToNot(HaveOccurred()) // Change build back to original - aeroCluster, err = getCluster( - k8sClient, ctx, clusterNamespacedName, - ) - Expect(err).ToNot(HaveOccurred()) - err = UpdateClusterImage(aeroCluster, latestImage) - Expect(err).ToNot(HaveOccurred()) - err = k8sClient.Update(goctx.TODO(), aeroCluster) - Expect(err).ToNot(HaveOccurred()) + Eventually(func() error { + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + err = UpdateClusterImage(aeroCluster, latestImage) + Expect(err).ToNot(HaveOccurred()) + return k8sClient.Update(goctx.TODO(), aeroCluster) + }, 1*time.Minute).ShouldNot(HaveOccurred()) // Only 1 pod need upgrade err = waitForClusterUpgrade( diff --git a/test/poddisruptionbudget_test.go b/test/poddisruptionbudget_test.go new file mode 100644 index 000000000..a864fbade --- /dev/null +++ b/test/poddisruptionbudget_test.go @@ -0,0 +1,97 @@ +package test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" +) + +var _ = Describe( + "PodDisruptionBudget", func() { + ctx := context.TODO() + aeroCluster := &asdbv1.AerospikeCluster{} + maxAvailable := intstr.FromInt(0) + clusterNamespacedName := getNamespacedName("pdb-test-cluster", namespace) + + BeforeEach(func() { + aeroCluster = createDummyAerospikeCluster( + clusterNamespacedName, 2, + ) + }) + + AfterEach(func() { + Expect(deleteCluster(k8sClient, ctx, aeroCluster)).NotTo(HaveOccurred()) + }) + + Context("Valid Operations", func() { + It("Validate create PDB with default maxUnavailable", func() { + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 1) + }) + + It("Validate create PDB with specified maxUnavailable", func() { + aeroCluster.Spec.MaxUnavailable = &maxAvailable + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 0) + }) + + It("Validate update PDB", func() { + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 1) + + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + // Update maxUnavailable + By("Update maxUnavailable to 0") + aeroCluster.Spec.MaxUnavailable = &maxAvailable + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 0) + }) + }) + + Context("Invalid Operations", func() { + value := intstr.FromInt(3) + + It("Should fail if maxUnavailable is greater than size", func() { + aeroCluster.Spec.MaxUnavailable = &value + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }) + + It("Should fail if maxUnavailable is greater than RF", func() { + aeroCluster.Spec.Size = 3 + value := intstr.FromInt(3) + aeroCluster.Spec.MaxUnavailable = &value + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }) + }) + }) + +func validatePDB(ctx context.Context, aerocluster *asdbv1.AerospikeCluster, expectedMaxUnavailable int) { + pdb := policyv1.PodDisruptionBudget{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Namespace: aerocluster.Namespace, + Name: aerocluster.Name, + }, &pdb) + Expect(err).ToNot(HaveOccurred()) + + // Validate PDB + Expect(pdb.Spec.MaxUnavailable.IntValue()).To(Equal(expectedMaxUnavailable)) + Expect(pdb.Status.ExpectedPods).To(Equal(aerocluster.Spec.Size)) + Expect(pdb.Status.CurrentHealthy).To(Equal(aerocluster.Spec.Size)) + Expect(pdb.Status.DisruptionsAllowed).To(Equal(int32(expectedMaxUnavailable))) + Expect(pdb.Status.DesiredHealthy).To(Equal(aerocluster.Spec.Size - int32(expectedMaxUnavailable))) +} diff --git a/test/sample_files_test.go b/test/sample_files_test.go index f0cecea23..18f638fdd 100644 --- a/test/sample_files_test.go +++ b/test/sample_files_test.go @@ -139,7 +139,7 @@ func getSamplesFiles() ([]string, error) { } // Files/Dirs ignored are: - // 1.PMEM sample file as hardware is not available + // 1. PMEM sample file as hardware is not available // 2. XDR related files as they are separately tested // 3. All files which are not CR samples if strings.Contains(path, "pmem_cluster_cr.yaml") || strings.Contains(path, "xdr_") || diff --git a/test/utils.go b/test/utils.go index 94da43766..78207c235 100644 --- a/test/utils.go +++ b/test/utils.go @@ -325,6 +325,11 @@ func isClusterStateValid( return false } + if newCluster.Status.Phase != asdbv1.AerospikeClusterCompleted { + pkgLog.Info("Cluster phase is not set to Completed") + return false + } + return true }