From a97e3e195f3bc7feafe0b5e734ae270266e7e586 Mon Sep 17 00:00:00 2001 From: Sudhanshu Ranjan Date: Wed, 14 Aug 2024 20:50:42 +0530 Subject: [PATCH] Added a paused flag to pause the reconciliation of the AerospikeCluster (#302) * Added a paused flag to pause the reconciliation of the AerospikeCluster resource Co-authored-by: Abhisek Dwivedi --- api/v1/aerospikecluster_types.go | 3 + api/v1/zz_generated.deepcopy.go | 5 + .../asdb.aerospike.com_aerospikeclusters.yaml | 4 + ...rnetes-operator.clusterserviceversion.yaml | 3 + controllers/rack.go | 84 ++++-- controllers/reconciler.go | 17 +- helm-charts/aerospike-cluster/README.md | 42 +-- .../templates/aerospike-cluster-cr.yaml | 5 +- helm-charts/aerospike-cluster/values.yaml | 127 ++++----- ..._aerospikeclusters.asdb.aerospike.com.yaml | 4 + test/cluster_test.go | 254 +++++++++++++----- test/utils.go | 24 +- 12 files changed, 386 insertions(+), 186 deletions(-) diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index 18d235822..26a727faa 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -124,6 +124,9 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList" // +kubebuilder:validation:MinItems:=1 K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"` + // Paused flag is used to pause the reconciliation for the AerospikeCluster. + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Pause Reconcile" + Paused *bool `json:"paused,omitempty"` // Operations is a list of on-demand operations to be performed on the Aerospike cluster. // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Operations" // +kubebuilder:validation:MaxItems:=1 diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 03adc6db2..8a1a8f646 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -203,6 +203,11 @@ func (in *AerospikeClusterSpec) DeepCopyInto(out *AerospikeClusterSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.Paused != nil { + in, out := &in.Paused, &out.Paused + *out = new(bool) + **out = **in + } if in.Operations != nil { in, out := &in.Operations, &out.Operations *out = make([]OperationSpec, len(*in)) diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index 7b5ad432e..e36acf1f4 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -382,6 +382,10 @@ spec: list by the operator type: string type: object + paused: + description: Paused flag is used to pause the reconciliation for the + AerospikeCluster. + type: boolean podSpec: description: Specify additional configuration for the Aerospike pods properties: diff --git a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml index 964726d13..e88283472 100644 --- a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml @@ -78,6 +78,9 @@ spec: - description: Certificates to connect to Aerospike. displayName: Operator Client Cert path: operatorClientCert + - description: Paused flag is used to pause the reconciliation for the AerospikeCluster. 
+ displayName: Pause Reconcile + path: paused - description: Specify additional configuration for the Aerospike pods displayName: Pod Configuration path: podSpec diff --git a/controllers/rack.go b/controllers/rack.go index 061e25d5e..d15ae8917 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -86,7 +86,9 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { // remove ignorable pods from failedPods failedPods = getNonIgnorablePods(failedPods, ignorablePodNames) if len(failedPods) != 0 { - r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods)) + r.Log.Info( + "Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods), + ) if res = r.reconcileRack( found, state, ignorablePodNames, failedPods, @@ -94,7 +96,9 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { return res } - r.Log.Info("Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods)) + r.Log.Info( + "Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods), + ) } // 2. Again, fetch the pods for the rack and if there are failed pods then restart them. @@ -114,14 +118,20 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { // remove ignorable pods from failedPods failedPods = getNonIgnorablePods(failedPods, ignorablePodNames) if len(failedPods) != 0 { - r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods)) + r.Log.Info( + "Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods), + ) - if _, res = r.rollingRestartRack(found, state, ignorablePodNames, nil, - failedPods); !res.isSuccess { + if _, res = r.rollingRestartRack( + found, state, ignorablePodNames, nil, + failedPods, + ); !res.isSuccess { return res } - r.Log.Info("Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods)) + r.Log.Info( + "Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods), + ) // Requeue after 1 second to fetch latest CR object with updated pod status return reconcileRequeueAfter(1) } @@ -351,8 +361,10 @@ func (r *SingleClusterReconciler) deleteRacks( return reconcileSuccess() } -func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.StatefulSet, rackState *RackState, - ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { +func (r *SingleClusterReconciler) upgradeOrRollingRestartRack( + found *appsv1.StatefulSet, rackState *RackState, + ignorablePodNames sets.Set[string], failedPods []*corev1.Pod, +) (*appsv1.StatefulSet, reconcileResult) { var res reconcileResult // Always update configMap. We won't be able to find if a rack's config, and it's pod config is in sync or not // Checking rack.spec, rack.status will not work. 
@@ -413,7 +425,9 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat } if rollingRestartInfo.needRestart { - found, res = r.rollingRestartRack(found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods) + found, res = r.rollingRestartRack( + found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods, + ) if !res.isSuccess { if res.err != nil { r.Log.Error( @@ -434,8 +448,10 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat } if len(failedPods) == 0 && rollingRestartInfo.needUpdateConf { - res = r.updateDynamicConfig(rackState, ignorablePodNames, - rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod) + res = r.updateDynamicConfig( + rackState, ignorablePodNames, + rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod, + ) if !res.isSuccess { if res.err != nil { r.Log.Error( @@ -473,9 +489,11 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat return found, reconcileSuccess() } -func (r *SingleClusterReconciler) updateDynamicConfig(rackState *RackState, +func (r *SingleClusterReconciler) updateDynamicConfig( + rackState *RackState, ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType, - dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap) reconcileResult { + dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap, +) reconcileResult { r.Log.Info("Update dynamic config in Aerospike pods") r.Recorder.Eventf( @@ -601,8 +619,10 @@ func (r *SingleClusterReconciler) reconcileRack( // before the scale down could complete. if (r.aeroCluster.Status.Size > r.aeroCluster.Spec.Size) || (!r.IsStatusEmpty() && len(r.aeroCluster.Status.RackConfig.Racks) != len(r.aeroCluster.Spec.RackConfig.Racks)) { - if res = r.setMigrateFillDelay(r.getClientPolicy(), &rackState.Rack.AerospikeConfig, false, - nil); !res.isSuccess { + if res = r.setMigrateFillDelay( + r.getClientPolicy(), &rackState.Rack.AerospikeConfig, false, + nil, + ); !res.isSuccess { r.Log.Error(res.err, "Failed to revert migrate-fill-delay after scale down") return res } @@ -749,8 +769,10 @@ func (r *SingleClusterReconciler) scaleUpRack( return found, reconcileSuccess() } -func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, rackState *RackState, - ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { +func (r *SingleClusterReconciler) upgradeRack( + statefulSet *appsv1.StatefulSet, rackState *RackState, + ignorablePodNames sets.Set[string], failedPods []*corev1.Pod, +) (*appsv1.StatefulSet, reconcileResult) { var ( err error podList []*corev1.Pod @@ -815,7 +837,9 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r podsBatchList[0] = podsToUpgrade } else { // Create batch of pods - podsBatchList = getPodsBatchList(r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToUpgrade, len(podList)) + podsBatchList = getPodsBatchList( + r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToUpgrade, len(podList), + ) } if len(podsBatchList) > 0 { @@ -907,7 +931,8 @@ func (r *SingleClusterReconciler) scaleDownRack( diffPods := *found.Spec.Replicas - desiredSize podsBatchList := getPodsBatchList( - r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, oldPodList[:diffPods], len(oldPodList)) + r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, oldPodList[:diffPods], len(oldPodList), + ) // Handle one batch podsBatch := 
podsBatchList[0] @@ -1051,9 +1076,11 @@ func (r *SingleClusterReconciler) scaleDownRack( return found, reconcileRequeueAfter(1) } -func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, rackState *RackState, +func (r *SingleClusterReconciler) rollingRestartRack( + found *appsv1.StatefulSet, rackState *RackState, ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType, - failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { + failedPods []*corev1.Pod, +) (*appsv1.StatefulSet, reconcileResult) { r.Log.Info("Rolling restart AerospikeCluster statefulset pods") r.Recorder.Eventf( @@ -1132,7 +1159,8 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, } else { // Create batch of pods podsBatchList = getPodsBatchList( - r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList)) + r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList), + ) } // Restart batch of pods @@ -1178,7 +1206,8 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, return found, reconcileSuccess() } -func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1.StatefulSet, rackState *RackState, +func (r *SingleClusterReconciler) handleK8sNodeBlockListPods( + statefulSet *appsv1.StatefulSet, rackState *RackState, ignorablePodNames sets.Set[string], failedPods []*corev1.Pod, ) (*appsv1.StatefulSet, reconcileResult) { if err := r.updateSTS(statefulSet, rackState); err != nil { @@ -1212,8 +1241,10 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1 pod := podList[idx] if blockedK8sNodes.Has(pod.Spec.NodeName) { - r.Log.Info("Pod found in blocked nodes list, migrating to a different node", - "podName", pod.Name) + r.Log.Info( + "Pod found in blocked nodes list, migrating to a different node", + "podName", pod.Name, + ) podsToRestart = append(podsToRestart, pod) @@ -1222,7 +1253,8 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1 } podsBatchList := getPodsBatchList( - r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList)) + r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList), + ) // Restart batch of pods if len(podsBatchList) > 0 { diff --git a/controllers/reconciler.go b/controllers/reconciler.go index 74d3cbf92..41e91dc8a 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -81,6 +81,13 @@ func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error) return reconcile.Result{}, nil } + // Pause the reconciliation for the AerospikeCluster if the paused field is set to true. + // Deletion of the AerospikeCluster will not be paused. 
+ if asdbv1.GetBool(r.aeroCluster.Spec.Paused) { + r.Log.Info("Reconciliation is paused for this AerospikeCluster") + return reconcile.Result{}, nil + } + // Set the status to AerospikeClusterInProgress before starting any operations if err := r.setStatusPhase(asdbv1.AerospikeClusterInProgress); err != nil { return reconcile.Result{}, err @@ -301,8 +308,10 @@ func (r *SingleClusterReconciler) recoverIgnorablePods() reconcileResult { return reconcileSuccess() } -func (r *SingleClusterReconciler) validateAndReconcileAccessControl(selectedPods []corev1.Pod, - ignorablePodNames sets.Set[string]) error { +func (r *SingleClusterReconciler) validateAndReconcileAccessControl( + selectedPods []corev1.Pod, + ignorablePodNames sets.Set[string], +) error { version, err := asdbv1.GetImageVersion(r.aeroCluster.Spec.Image) if err != nil { return err @@ -959,7 +968,9 @@ func (r *SingleClusterReconciler) migrateInitialisedVolumeNames(ctx context.Cont } // Appending volume name as @ in initializedVolumes list - initializedVolumes = append(initializedVolumes, fmt.Sprintf("%s@%s", oldFormatInitVolNames[oldVolIdx], pvcUID)) + initializedVolumes = append( + initializedVolumes, fmt.Sprintf("%s@%s", oldFormatInitVolNames[oldVolIdx], pvcUID), + ) } } diff --git a/helm-charts/aerospike-cluster/README.md b/helm-charts/aerospike-cluster/README.md index 52d051aa7..7c640dd24 100644 --- a/helm-charts/aerospike-cluster/README.md +++ b/helm-charts/aerospike-cluster/README.md @@ -44,24 +44,30 @@ helm install aerospike ./aerospike-cluster/ \ ## Configurations -| Name | Description | Default | -| ---------- | ----------- | --------- | -| `replicas` | Aerospike cluster size | `3` | -| `image.repository` | Aerospike server container image repository | `aerospike/aerospike-server-enterprise` | -| `image.tag` | Aerospike server container image tag | `7.1.0.0` | -| `imagePullSecrets` | Secrets containing credentials to pull Aerospike container image from a private registry | `{}` (nil) | -| `customLabels` | Custom labels to add on the aerospikecluster resource | `{}` (nil) | -| `aerospikeAccessControl` | Aerospike access control configuration. Define users and roles to be created on the cluster. | `{}` (nil) | -| `aerospikeConfig` | Aerospike configuration | `{}` (nil) | -| `aerospikeNetworkPolicy` | Network policy (client access configuration) | `{}` (nil) | -| `commonName` | Base string for naming pods, services, stateful sets, etc. | Release name truncated to 63 characters (without hyphens) | -| `podSpec` | Aerospike pod spec configuration | `{}` (nil) | -| `rackConfig` | Aerospike rack configuration | `{}` (nil) | -| `storage` | Aerospike pod storage configuration | `{}` (nil) | -| `validationPolicy` | Validation policy | `{}` (nil) | -| `operatorClientCert` | Client certificates to connect to Aerospike | `{}` (nil) | -| `seedsFinderServices` | Service (e.g. 
loadbalancer) for Aerospike cluster discovery | `{}` (nil) | -| `devMode` | Deploy Aerospike cluster in dev mode | `false` | +| Name | Description | Default | +| ---------- |---------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------| +| `replicas` | Aerospike cluster size | `3` | +| `image.repository` | Aerospike server container image repository | `aerospike/aerospike-server-enterprise` | +| `image.tag` | Aerospike server container image tag | `7.1.0.0` | +| `imagePullSecrets` | Secrets containing credentials to pull Aerospike container image from a private registry | `{}` (nil) | +| `customLabels` | Custom labels to add on the aerospikecluster resource | `{}` (nil) | +| `aerospikeAccessControl` | Aerospike access control configuration. Define users and roles to be created on the cluster. | `{}` (nil) | +| `aerospikeConfig` | Aerospike configuration | `{}` (nil) | +| `aerospikeNetworkPolicy` | Network policy (client access configuration) | `{}` (nil) | +| `commonName` | Base string for naming pods, services, stateful sets, etc. | Release name truncated to 63 characters (without hyphens) | +| `podSpec` | Aerospike pod spec configuration | `{}` (nil) | +| `rackConfig` | Aerospike rack configuration | `{}` (nil) | +| `storage` | Aerospike pod storage configuration | `{}` (nil) | +| `validationPolicy` | Validation policy | `{}` (nil) | +| `operatorClientCert` | Client certificates to connect to Aerospike | `{}` (nil) | +| `seedsFinderServices` | Service (e.g. loadbalancer) for Aerospike cluster discovery | `{}` (nil) | +| `maxUnavailable` | maxUnavailable defines percentage/number of pods that can be allowed to go down or unavailable before application disruption | `1` | +| `disablePDB` | Disable the PodDisruptionBudget creation for the Aerospike cluster | `false` | +| `enableDynamicConfigUpdate` | enableDynamicConfigUpdate enables dynamic config update flow of the operator | `false` | +| `rosterNodeBlockList` | rosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup | `[]` | +| `k8sNodeBlockList` | k8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods | `[]` | +| `paused` | Pause reconciliation of the cluster | `false` | +| `devMode` | Deploy Aerospike cluster in dev mode | `false` | ### Default values in "dev" mode (`devMode=true`): diff --git a/helm-charts/aerospike-cluster/templates/aerospike-cluster-cr.yaml b/helm-charts/aerospike-cluster/templates/aerospike-cluster-cr.yaml index 103e13f6e..738ec9300 100644 --- a/helm-charts/aerospike-cluster/templates/aerospike-cluster-cr.yaml +++ b/helm-charts/aerospike-cluster/templates/aerospike-cluster-cr.yaml @@ -125,4 +125,7 @@ spec: ## k8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. {{- with .Values.k8sNodeBlockList }} k8sNodeBlockList: {{- toYaml . 
| nindent 4 }} - {{- end }} \ No newline at end of file + {{- end }} + + ## Pause reconciliation of the cluster + paused: {{ .Values.paused }} diff --git a/helm-charts/aerospike-cluster/values.yaml b/helm-charts/aerospike-cluster/values.yaml index 2b1ad06f1..e661a40d6 100644 --- a/helm-charts/aerospike-cluster/values.yaml +++ b/helm-charts/aerospike-cluster/values.yaml @@ -21,89 +21,75 @@ customLabels: {} ## Aerospike access control configuration aerospikeAccessControl: {} - # users: - # - name: admin - # secretName: auth-secret - # roles: - # - sys-admin - # - user-admin - # adminPolicy: - # # timeout in milliseconds - # timeout: 1000 - # roles: - # - name: - # privileges: [] - # whitelist: [] +# users: +# - name: admin +# secretName: auth-secret +# roles: +# - sys-admin +# - user-admin +# adminPolicy: +# # timeout in milliseconds +# timeout: 1000 +# roles: +# - name: +# privileges: [] +# whitelist: [] ## Aerospike Configuration aerospikeConfig: - # service: - # feature-key-file: /etc/aerospike/secrets/features.conf - - # security: - # enable-security: false - - # network: - # service: - # port: 3000 - # fabric: - # port: 3001 - # heartbeat: - # port: 3002 - - # namespaces: - # - name: test - # replication-factor: 2 - # storage-engine: - # type: memory - # data-size: 1073741824 # 1GiB - - -## Aerospike secrets -## To add feature key file, tls certificates etc. -## We may be able to add feature key file, certificates and other secrets dynamically during helm install -## when, -## 1. operator supports adding multiple secret sources, or -## 2. https://github.com/helm/helm/pull/8841 feature is added. - -# aerospikeSecretName: aerospike-secrets -# aerospikeSecretMountPath: /etc/aerospike/secrets/ +# service: +# feature-key-file: /etc/aerospike/secrets/features.conf +# +# network: +# service: +# port: 3000 +# fabric: +# port: 3001 +# heartbeat: +# port: 3002 +# +# namespaces: +# - name: test +# replication-factor: 2 +# storage-engine: +# type: memory +# data-size: 1073741824 # 1GiB ## Network policy aerospikeNetworkPolicy: {} - # access: pod - # alternateAccess: hostExternal - # tlsAccess: pod - # tlsAlternateAccess: hostExternal +# access: pod +# alternateAccess: hostExternal +# tlsAccess: pod +# tlsAlternateAccess: hostExternal ## Pod spec podSpec: {} - # Multi pod per host - # multiPodPerHost: true - # sidecars: - # - name: aerospike-prometheus-exporter - # image: "aerospike/aerospike-prometheus-exporter:1.1.6" - # ports: - # - containerPort: 9145 - # name: exporter +## Multi pod per host +# multiPodPerHost: true +# sidecars: +# - name: aerospike-prometheus-exporter +# image: aerospike/aerospike-prometheus-exporter:v1.18.0 +# ports: +# - containerPort: 9145 +# name: exporter ## Rack configuration rackConfig: {} ## Storage configuration storage: {} - # volumes: - # - name: aerospike-config-secret - # source: - # secret: - # secretName: aerospike-secret - # aerospike: - # path: /etc/aerospike/secrets +# volumes: +# - name: aerospike-config-secret +# source: +# secret: +# secretName: aerospike-secret +# aerospike: +# path: /etc/aerospike/secrets ## Validation policy validationPolicy: {} - # skipWorkDirValidate: true - # skipXdrDlogFileValidate: true +# skipWorkDirValidate: true +# skipXdrDlogFileValidate: true ## seedsFinderServices defines service (e.g. 
loadbalancer) to connect to Aerospike seedsFinderServices: {} @@ -111,9 +97,6 @@ seedsFinderServices: {} ## operatorClientCert defines certificates to connect to Aerospike operatorClientCert: {} -## Dev Mode -devMode: false - ## maxUnavailable defines percentage/number of pods that can be allowed to go down or unavailable ## before application disruption. maxUnavailable: 1 @@ -132,4 +115,10 @@ rosterNodeBlockList: [] ## k8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. ## Replace the value with kubernetes cluster node name which needs to be blocked. k8sNodeBlockList: [] -# - \ No newline at end of file +# - + +## Pause reconciliation of the cluster +paused: false + +## Dev Mode +devMode: false diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index 7b5ad432e..e36acf1f4 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -382,6 +382,10 @@ spec: list by the operator type: string type: object + paused: + description: Paused flag is used to pause the reconciliation for the + AerospikeCluster. + type: boolean podSpec: description: Specify additional configuration for the Aerospike pods properties: diff --git a/test/cluster_test.go b/test/cluster_test.go index df802367c..fb88bd186 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -79,9 +79,114 @@ var _ = Describe( UpdateClusterPre600(ctx) }, ) + Context( + "PauseReconcile", func() { + PauseReconcileTest(ctx) + }, + ) }, ) +func PauseReconcileTest(ctx goctx.Context) { + clusterNamespacedName := getNamespacedName( + "pause-reconcile", namespace, + ) + + BeforeEach( + func() { + aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, 2) + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + AfterEach( + func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + _ = deleteCluster(k8sClient, ctx, aeroCluster) + }, + ) + + It( + "Should pause reconcile", func() { + // Testing over upgrade as it is a long-running operation + By("1. Start upgrade and pause at partial upgrade") + aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + err = UpdateClusterImage(aeroCluster, nextImage) + Expect(err).ToNot(HaveOccurred()) + + err = k8sClient.Update(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + Eventually( + func() bool { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + // Check if at least one pod is upgraded + podUpgraded := false + for podName := range aeroCluster.Status.Pods { + podStatus := aeroCluster.Status.Pods[podName] + if podStatus.Image == nextImage { + pkgLog.Info("One Pod upgraded", "pod", podName, "image", podStatus.Image) + podUpgraded = true + break + } + } + + return podUpgraded + }, 2*time.Minute, 1*time.Second, + ).Should(BeTrue()) + + By("Pause reconcile") + err = setPauseFlag(ctx, clusterNamespacedName, ptr.To(true)) + Expect(err).ToNot(HaveOccurred()) + + By("2. 
Upgrade should fail") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + err = waitForAerospikeCluster( + k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval, + getTimeout(1), []asdbv1.AerospikeClusterPhase{asdbv1.AerospikeClusterCompleted}, + ) + Expect(err).To(HaveOccurred()) + + // Resume reconcile and Wait for all pods to be upgraded + By("3. Resume reconcile and upgrade should succeed") + err = setPauseFlag(ctx, clusterNamespacedName, nil) + Expect(err).ToNot(HaveOccurred()) + + By("Upgrade should succeed") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + err = waitForAerospikeCluster( + k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval, + getTimeout(2), []asdbv1.AerospikeClusterPhase{asdbv1.AerospikeClusterCompleted}, + ) + Expect(err).ToNot(HaveOccurred()) + }, + ) +} + +func setPauseFlag(ctx goctx.Context, clusterNamespacedName types.NamespacedName, pause *bool) error { + aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName) + if err != nil { + return err + } + + aeroCluster.Spec.Paused = pause + + return k8sClient.Update(ctx, aeroCluster) +} + func UpdateClusterPre600(ctx goctx.Context) { Context( "UpdateClusterPre600", func() { @@ -199,7 +304,8 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { nodeList = &v1.NodeList{} podList = &v1.PodList{} expectedPhases = []asdbv1.AerospikeClusterPhase{ - asdbv1.AerospikeClusterInProgress, asdbv1.AerospikeClusterCompleted} + asdbv1.AerospikeClusterInProgress, asdbv1.AerospikeClusterCompleted, + } ) clusterNamespacedName := getNamespacedName( @@ -238,27 +344,31 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { // As pod is in pending state, CR object will be updated continuously // This is put in eventually to retry Object Conflict error - Eventually(func() error { - aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) - Expect(err).ToNot(HaveOccurred()) - val := intstr.FromInt32(1) - aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val - aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["enable-quotas"] = true - - // As pod is in pending state, CR object won't reach the final phase. - // So expectedPhases can be InProgress or Completed - return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) - }, 1*time.Minute).ShouldNot(HaveOccurred()) + Eventually( + func() error { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + val := intstr.FromInt32(1) + aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val + aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["enable-quotas"] = true + + // As pod is in pending state, CR object won't reach the final phase. + // So expectedPhases can be InProgress or Completed + return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) + }, 1*time.Minute, + ).ShouldNot(HaveOccurred()) By("Upgrade version") - Eventually(func() error { - aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) - Expect(err).ToNot(HaveOccurred()) - aeroCluster.Spec.Image = nextImage - // As pod is in pending state, CR object won't reach the final phase. 
- // So expectedPhases can be InProgress or Completed - return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) - }, 1*time.Minute).ShouldNot(HaveOccurred()) + Eventually( + func() error { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.Image = nextImage + // As pod is in pending state, CR object won't reach the final phase. + // So expectedPhases can be InProgress or Completed + return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) + }, 1*time.Minute, + ).ShouldNot(HaveOccurred()) By("Verify pending pod") podList, err = getPodList(aeroCluster, k8sClient) @@ -274,21 +384,23 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { Expect(counter).To(Equal(1)) By("Executing on-demand operation") - Eventually(func() error { - aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) - Expect(err).ToNot(HaveOccurred()) - - operations := []asdbv1.OperationSpec{ - { - Kind: asdbv1.OperationWarmRestart, - ID: "1", - }, - } - aeroCluster.Spec.Operations = operations - // As pod is in pending state, CR object won't reach the final phase. - // So expectedPhases can be InProgress or Completed - return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) - }, 1*time.Minute).ShouldNot(HaveOccurred()) + Eventually( + func() error { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + } + aeroCluster.Spec.Operations = operations + // As pod is in pending state, CR object won't reach the final phase. + // So expectedPhases can be InProgress or Completed + return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) + }, 1*time.Minute, + ).ShouldNot(HaveOccurred()) By("Verify pending pod") podList, err = getPodList(aeroCluster, k8sClient) @@ -342,8 +454,12 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { ignorePodName := clusterNamespacedName.Name + "-1-1" pod := &v1.Pod{} - err := k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName, - Namespace: clusterNamespacedName.Namespace}, pod) + err := k8sClient.Get( + ctx, types.NamespacedName{ + Name: ignorePodName, + Namespace: clusterNamespacedName.Namespace, + }, pod, + ) Expect(err).ToNot(HaveOccurred()) pod.Spec.Containers[0].Image = wrongImage @@ -361,19 +477,29 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { Expect(err).ToNot(HaveOccurred()) By(fmt.Sprintf("Verify if failed pod %s is automatically recovered", ignorePodName)) - Eventually(func() bool { - err = k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName, - Namespace: clusterNamespacedName.Namespace}, pod) - - return len(pod.Status.ContainerStatuses) != 0 && *pod.Status.ContainerStatuses[0].Started && - pod.Status.ContainerStatuses[0].Ready - }, 1*time.Minute).Should(BeTrue()) - - Eventually(func() error { - return InterceptGomegaFailure(func() { - validateRoster(k8sClient, ctx, clusterNamespacedName, scNamespace) - }) - }, 4*time.Minute).Should(BeNil()) + Eventually( + func() bool { + err = k8sClient.Get( + ctx, types.NamespacedName{ + Name: ignorePodName, + Namespace: clusterNamespacedName.Namespace, + }, pod, + ) + + return len(pod.Status.ContainerStatuses) != 0 && *pod.Status.ContainerStatuses[0].Started && + pod.Status.ContainerStatuses[0].Ready + }, 1*time.Minute, + ).Should(BeTrue()) + + Eventually( + func() error { + 
return InterceptGomegaFailure( + func() { + validateRoster(k8sClient, ctx, clusterNamespacedName, scNamespace) + }, + ) + }, 4*time.Minute, + ).Should(BeNil()) }, ) @@ -383,8 +509,12 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { ignorePodName := clusterNamespacedName.Name + "-1-1" pod := &v1.Pod{} - err := k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName, - Namespace: clusterNamespacedName.Namespace}, pod) + err := k8sClient.Get( + ctx, types.NamespacedName{ + Name: ignorePodName, + Namespace: clusterNamespacedName.Namespace, + }, pod, + ) Expect(err).ToNot(HaveOccurred()) pod.Spec.Containers[0].Image = wrongImage @@ -430,7 +560,8 @@ func deployClusterForMaxIgnorablePods(ctx goctx.Context, clusterNamespacedName t nsList = append(nsList, getNonSCNamespaceConfig("bar", "/test/dev/xvdf1")) aeroCluster.Spec.AerospikeConfig.Value["namespaces"] = nsList - aeroCluster.Spec.Storage.Volumes = append(aeroCluster.Spec.Storage.Volumes, + aeroCluster.Spec.Storage.Volumes = append( + aeroCluster.Spec.Storage.Volumes, asdbv1.VolumeSpec{ Name: "bar", Source: asdbv1.VolumeSource{ @@ -447,7 +578,8 @@ func deployClusterForMaxIgnorablePods(ctx goctx.Context, clusterNamespacedName t ) racks := getDummyRackConf(1, 2) aeroCluster.Spec.RackConfig = asdbv1.RackConfig{ - Namespaces: []string{scNamespace}, Racks: racks} + Namespaces: []string{scNamespace}, Racks: racks, + } aeroCluster.Spec.PodSpec.MultiPodPerHost = ptr.To(false) err := deployCluster(k8sClient, ctx, aeroCluster) Expect(err).ToNot(HaveOccurred()) @@ -702,12 +834,14 @@ func UpdateTLSClusterTest(ctx goctx.Context) { network := aeroCluster.Spec.AerospikeConfig.Value["network"].(map[string]interface{}) tlsList := network["tls"].([]interface{}) - tlsList = append(tlsList, map[string]interface{}{ - "name": "aerospike-a-0.test-runner1", - "cert-file": "/etc/aerospike/secret/svc_cluster_chain.pem", - "key-file": "/etc/aerospike/secret/svc_key.pem", - "ca-file": "/etc/aerospike/secret/cacert.pem", - }) + tlsList = append( + tlsList, map[string]interface{}{ + "name": "aerospike-a-0.test-runner1", + "cert-file": "/etc/aerospike/secret/svc_cluster_chain.pem", + "key-file": "/etc/aerospike/secret/svc_key.pem", + "ca-file": "/etc/aerospike/secret/cacert.pem", + }, + ) network["tls"] = tlsList aeroCluster.Spec.AerospikeConfig.Value["network"] = network err = updateCluster(k8sClient, ctx, aeroCluster) diff --git a/test/utils.go b/test/utils.go index cf6fb89cf..302cbc049 100644 --- a/test/utils.go +++ b/test/utils.go @@ -288,16 +288,20 @@ func isClusterStateValid( return false } - // Validate status - statusToSpec, err := asdbv1.CopyStatusToSpec(&newCluster.Status.AerospikeClusterStatusSpec) - if err != nil { - pkgLog.Error(err, "Failed to copy spec in status", "err", err) - return false - } + // Do not compare status with spec if cluster reconciliation is paused + // `paused` flag only exists in the spec and not in the status. + if !asdbv1.GetBool(aeroCluster.Spec.Paused) { + // Validate status + statusToSpec, err := asdbv1.CopyStatusToSpec(&newCluster.Status.AerospikeClusterStatusSpec) + if err != nil { + pkgLog.Error(err, "Failed to copy spec in status", "err", err) + return false + } - if !reflect.DeepEqual(statusToSpec, &newCluster.Spec) { - pkgLog.Info("Cluster status is not matching the spec") - return false + if !reflect.DeepEqual(statusToSpec, &newCluster.Spec) { + pkgLog.Info("Cluster status is not matching the spec") + return false + } } // TODO: This is not valid for tests where maxUnavailablePods flag is used. 
@@ -323,6 +327,8 @@ func isClusterStateValid( aeroCluster.Spec.Image, ), ) + + return false } if newCluster.Labels[asdbv1.AerospikeAPIVersionLabel] != asdbv1.AerospikeAPIVersion {
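For reference, a minimal sketch of an AerospikeCluster manifest using the new field. Only spec.paused comes from this patch; the metadata names and the image tag are illustrative, and required fields such as storage and aerospikeConfig are omitted for brevity:

apiVersion: asdb.aerospike.com/v1
kind: AerospikeCluster
metadata:
  name: aerocluster        # illustrative name
  namespace: aerospike     # illustrative namespace
spec:
  size: 2
  image: aerospike/aerospike-server-enterprise:7.1.0.0
  # Pause reconciliation of this CR. While true, the operator skips all
  # reconcile work for the cluster; deletion of the CR is still processed.
  paused: true

Removing the field (or setting it back to false) resumes reconciliation from the cluster's current state, which is what PauseReconcileTest exercises above by pausing a partially completed upgrade and then clearing the flag.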
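The same switch is exposed through the aerospike-cluster Helm chart. A minimal values override might look like the following sketch, assuming the chart defaults documented in the README above; the template renders the value into spec.paused of the generated CR:

# values override for the aerospike-cluster chart (sketch)
replicas: 3
image:
  repository: aerospike/aerospike-server-enterprise
  tag: 7.1.0.0
## Pause reconciliation of the cluster
paused: true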