Added a paused flag to pause the reconciliation of the AerospikeCluster (#302)

* Added a paused flag to pause the reconciliation of the AerospikeCluster resource

Co-authored-by: Abhisek Dwivedi <[email protected]>
sud82 and abhishekdwivedi3060 authored Aug 14, 2024
1 parent aeb0ec9 commit a97e3e1
Showing 12 changed files with 386 additions and 186 deletions.
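For orientation before the per-file diffs, here is a minimal usage sketch of the new flag. It is illustrative only: the metadata name, namespace, size, and image are placeholders, and a complete AerospikeCluster spec also needs at least `aerospikeConfig` and `storage` sections, omitted here.

```yaml
# Sketch only — not a complete spec. Placeholders: name, namespace, size, image.
apiVersion: asdb.aerospike.com/v1
kind: AerospikeCluster
metadata:
  name: aerocluster        # placeholder
  namespace: aerospike     # placeholder
spec:
  size: 3
  image: aerospike/aerospike-server-enterprise:7.1.0.0
  # New field from this commit: while true, the operator skips reconciliation
  # of this resource; deletion is still processed (see controllers/reconciler.go below).
  paused: true
```

Since the Go field is a `*bool` with `omitempty`, leaving it unset presumably behaves the same as `false`; setting it back to `false` (or removing it) lets reconciliation resume.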
3 changes: 3 additions & 0 deletions api/v1/aerospikecluster_types.go
@@ -124,6 +124,9 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList"
// +kubebuilder:validation:MinItems:=1
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
// Paused flag is used to pause the reconciliation for the AerospikeCluster.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Pause Reconcile"
Paused *bool `json:"paused,omitempty"`
// Operations is a list of on-demand operations to be performed on the Aerospike cluster.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Operations"
// +kubebuilder:validation:MaxItems:=1
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
@@ -382,6 +382,10 @@ spec:
list by the operator
type: string
type: object
paused:
description: Paused flag is used to pause the reconciliation for the
AerospikeCluster.
type: boolean
podSpec:
description: Specify additional configuration for the Aerospike pods
properties:
Changes to an additional file (file path not shown in this view):
@@ -78,6 +78,9 @@ spec:
- description: Certificates to connect to Aerospike.
displayName: Operator Client Cert
path: operatorClientCert
- description: Paused flag is used to pause the reconciliation for the AerospikeCluster.
displayName: Pause Reconcile
path: paused
- description: Specify additional configuration for the Aerospike pods
displayName: Pod Configuration
path: podSpec
84 changes: 58 additions & 26 deletions controllers/rack.go
@@ -86,15 +86,19 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult {
// remove ignorable pods from failedPods
failedPods = getNonIgnorablePods(failedPods, ignorablePodNames)
if len(failedPods) != 0 {
r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)

if res = r.reconcileRack(
found, state, ignorablePodNames, failedPods,
); !res.isSuccess {
return res
}

r.Log.Info("Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)
}

// 2. Again, fetch the pods for the rack and if there are failed pods then restart them.
@@ -114,14 +118,20 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult {
// remove ignorable pods from failedPods
failedPods = getNonIgnorablePods(failedPods, ignorablePodNames)
if len(failedPods) != 0 {
r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)

if _, res = r.rollingRestartRack(found, state, ignorablePodNames, nil,
failedPods); !res.isSuccess {
if _, res = r.rollingRestartRack(
found, state, ignorablePodNames, nil,
failedPods,
); !res.isSuccess {
return res
}

r.Log.Info("Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)
// Requeue after 1 second to fetch latest CR object with updated pod status
return reconcileRequeueAfter(1)
}
@@ -351,8 +361,10 @@ func (r *SingleClusterReconciler) deleteRacks(
return reconcileSuccess()
}

func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) {
func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(
found *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
var res reconcileResult
// Always update configMap. We won't be able to find if a rack's config, and it's pod config is in sync or not
// Checking rack.spec, rack.status will not work.
@@ -413,7 +425,9 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
}

if rollingRestartInfo.needRestart {
found, res = r.rollingRestartRack(found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods)
found, res = r.rollingRestartRack(
found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods,
)
if !res.isSuccess {
if res.err != nil {
r.Log.Error(
@@ -434,8 +448,10 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
}

if len(failedPods) == 0 && rollingRestartInfo.needUpdateConf {
res = r.updateDynamicConfig(rackState, ignorablePodNames,
rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod)
res = r.updateDynamicConfig(
rackState, ignorablePodNames,
rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod,
)
if !res.isSuccess {
if res.err != nil {
r.Log.Error(
@@ -473,9 +489,11 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
return found, reconcileSuccess()
}

func (r *SingleClusterReconciler) updateDynamicConfig(rackState *RackState,
func (r *SingleClusterReconciler) updateDynamicConfig(
rackState *RackState,
ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType,
dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap) reconcileResult {
dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap,
) reconcileResult {
r.Log.Info("Update dynamic config in Aerospike pods")

r.Recorder.Eventf(
@@ -601,8 +619,10 @@ func (r *SingleClusterReconciler) reconcileRack(
// before the scale down could complete.
if (r.aeroCluster.Status.Size > r.aeroCluster.Spec.Size) ||
(!r.IsStatusEmpty() && len(r.aeroCluster.Status.RackConfig.Racks) != len(r.aeroCluster.Spec.RackConfig.Racks)) {
if res = r.setMigrateFillDelay(r.getClientPolicy(), &rackState.Rack.AerospikeConfig, false,
nil); !res.isSuccess {
if res = r.setMigrateFillDelay(
r.getClientPolicy(), &rackState.Rack.AerospikeConfig, false,
nil,
); !res.isSuccess {
r.Log.Error(res.err, "Failed to revert migrate-fill-delay after scale down")
return res
}
@@ -749,8 +769,10 @@ func (r *SingleClusterReconciler) scaleUpRack(
return found, reconcileSuccess()
}

func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) {
func (r *SingleClusterReconciler) upgradeRack(
statefulSet *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
var (
err error
podList []*corev1.Pod
@@ -815,7 +837,9 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r
podsBatchList[0] = podsToUpgrade
} else {
// Create batch of pods
podsBatchList = getPodsBatchList(r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToUpgrade, len(podList))
podsBatchList = getPodsBatchList(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToUpgrade, len(podList),
)
}

if len(podsBatchList) > 0 {
@@ -907,7 +931,8 @@ func (r *SingleClusterReconciler) scaleDownRack(
diffPods := *found.Spec.Replicas - desiredSize

podsBatchList := getPodsBatchList(
r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, oldPodList[:diffPods], len(oldPodList))
r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, oldPodList[:diffPods], len(oldPodList),
)

// Handle one batch
podsBatch := podsBatchList[0]
@@ -1051,9 +1076,11 @@ func (r *SingleClusterReconciler) scaleDownRack(
return found, reconcileRequeueAfter(1)
}

func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, rackState *RackState,
func (r *SingleClusterReconciler) rollingRestartRack(
found *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType,
failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) {
failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
r.Log.Info("Rolling restart AerospikeCluster statefulset pods")

r.Recorder.Eventf(
@@ -1132,7 +1159,8 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
} else {
// Create batch of pods
podsBatchList = getPodsBatchList(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList))
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList),
)
}

// Restart batch of pods
@@ -1178,7 +1206,8 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
return found, reconcileSuccess()
}

func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1.StatefulSet, rackState *RackState,
func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(
statefulSet *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
if err := r.updateSTS(statefulSet, rackState); err != nil {
@@ -1212,8 +1241,10 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1
pod := podList[idx]

if blockedK8sNodes.Has(pod.Spec.NodeName) {
r.Log.Info("Pod found in blocked nodes list, migrating to a different node",
"podName", pod.Name)
r.Log.Info(
"Pod found in blocked nodes list, migrating to a different node",
"podName", pod.Name,
)

podsToRestart = append(podsToRestart, pod)

@@ -1222,7 +1253,8 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1
}

podsBatchList := getPodsBatchList(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList))
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList),
)

// Restart batch of pods
if len(podsBatchList) > 0 {
17 changes: 14 additions & 3 deletions controllers/reconciler.go
@@ -81,6 +81,13 @@ func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error)
return reconcile.Result{}, nil
}

// Pause the reconciliation for the AerospikeCluster if the paused field is set to true.
// Deletion of the AerospikeCluster will not be paused.
if asdbv1.GetBool(r.aeroCluster.Spec.Paused) {
r.Log.Info("Reconciliation is paused for this AerospikeCluster")
return reconcile.Result{}, nil
}

// Set the status to AerospikeClusterInProgress before starting any operations
if err := r.setStatusPhase(asdbv1.AerospikeClusterInProgress); err != nil {
return reconcile.Result{}, err
@@ -301,8 +308,10 @@ func (r *SingleClusterReconciler) recoverIgnorablePods() reconcileResult {
return reconcileSuccess()
}

func (r *SingleClusterReconciler) validateAndReconcileAccessControl(selectedPods []corev1.Pod,
ignorablePodNames sets.Set[string]) error {
func (r *SingleClusterReconciler) validateAndReconcileAccessControl(
selectedPods []corev1.Pod,
ignorablePodNames sets.Set[string],
) error {
version, err := asdbv1.GetImageVersion(r.aeroCluster.Spec.Image)
if err != nil {
return err
@@ -959,7 +968,9 @@ func (r *SingleClusterReconciler) migrateInitialisedVolumeNames(ctx context.Cont
}

// Appending volume name as <vol_name>@<pvcUID> in initializedVolumes list
initializedVolumes = append(initializedVolumes, fmt.Sprintf("%s@%s", oldFormatInitVolNames[oldVolIdx], pvcUID))
initializedVolumes = append(
initializedVolumes, fmt.Sprintf("%s@%s", oldFormatInitVolNames[oldVolIdx], pvcUID),
)
}
}

42 changes: 24 additions & 18 deletions helm-charts/aerospike-cluster/README.md
@@ -44,24 +44,30 @@ helm install aerospike ./aerospike-cluster/ \

## Configurations

| Name | Description | Default |
| ---------- | ----------- | --------- |
| `replicas` | Aerospike cluster size | `3` |
| `image.repository` | Aerospike server container image repository | `aerospike/aerospike-server-enterprise` |
| `image.tag` | Aerospike server container image tag | `7.1.0.0` |
| `imagePullSecrets` | Secrets containing credentials to pull Aerospike container image from a private registry | `{}` (nil) |
| `customLabels` | Custom labels to add on the aerospikecluster resource | `{}` (nil) |
| `aerospikeAccessControl` | Aerospike access control configuration. Define users and roles to be created on the cluster. | `{}` (nil) |
| `aerospikeConfig` | Aerospike configuration | `{}` (nil) |
| `aerospikeNetworkPolicy` | Network policy (client access configuration) | `{}` (nil) |
| `commonName` | Base string for naming pods, services, stateful sets, etc. | Release name truncated to 63 characters (without hyphens) |
| `podSpec` | Aerospike pod spec configuration | `{}` (nil) |
| `rackConfig` | Aerospike rack configuration | `{}` (nil) |
| `storage` | Aerospike pod storage configuration | `{}` (nil) |
| `validationPolicy` | Validation policy | `{}` (nil) |
| `operatorClientCert` | Client certificates to connect to Aerospike | `{}` (nil) |
| `seedsFinderServices` | Service (e.g. loadbalancer) for Aerospike cluster discovery | `{}` (nil) |
| `devMode` | Deploy Aerospike cluster in dev mode | `false` |
| Name | Description | Default |
| ---------- |---------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------|
| `replicas` | Aerospike cluster size | `3` |
| `image.repository` | Aerospike server container image repository | `aerospike/aerospike-server-enterprise` |
| `image.tag` | Aerospike server container image tag | `7.1.0.0` |
| `imagePullSecrets` | Secrets containing credentials to pull Aerospike container image from a private registry | `{}` (nil) |
| `customLabels` | Custom labels to add on the aerospikecluster resource | `{}` (nil) |
| `aerospikeAccessControl` | Aerospike access control configuration. Define users and roles to be created on the cluster. | `{}` (nil) |
| `aerospikeConfig` | Aerospike configuration | `{}` (nil) |
| `aerospikeNetworkPolicy` | Network policy (client access configuration) | `{}` (nil) |
| `commonName` | Base string for naming pods, services, stateful sets, etc. | Release name truncated to 63 characters (without hyphens) |
| `podSpec` | Aerospike pod spec configuration | `{}` (nil) |
| `rackConfig` | Aerospike rack configuration | `{}` (nil) |
| `storage` | Aerospike pod storage configuration | `{}` (nil) |
| `validationPolicy` | Validation policy | `{}` (nil) |
| `operatorClientCert` | Client certificates to connect to Aerospike | `{}` (nil) |
| `seedsFinderServices` | Service (e.g. loadbalancer) for Aerospike cluster discovery | `{}` (nil) |
| `maxUnavailable` | maxUnavailable defines percentage/number of pods that can be allowed to go down or unavailable before application disruption | `1` |
| `disablePDB` | Disable the PodDisruptionBudget creation for the Aerospike cluster | `false` |
| `enableDynamicConfigUpdate` | enableDynamicConfigUpdate enables dynamic config update flow of the operator | `false` |
| `rosterNodeBlockList` | rosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup | `[]` |
| `k8sNodeBlockList` | k8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods | `[]` |
| `paused` | Pause reconciliation of the cluster | `false` |
| `devMode` | Deploy Aerospike cluster in dev mode | `false` |
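
To pause reconciliation through the chart, the new `paused` value can be set in an override file; a minimal sketch (the file name is arbitrary):

```yaml
# custom-values.yaml — pause reconciliation of the cluster managed by this release
paused: true
```

Applied with something like `helm upgrade aerospike ./aerospike-cluster/ -f custom-values.yaml`, this should render `paused: true` into the AerospikeCluster resource, and the operator will leave the cluster untouched until the value is set back to `false`.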

### Default values in "dev" mode (`devMode=true`):

Changes to an additional file (file path not shown in this view):
@@ -125,4 +125,7 @@ spec:
## k8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods.
{{- with .Values.k8sNodeBlockList }}
k8sNodeBlockList: {{- toYaml . | nindent 4 }}
{{- end }}
{{- end }}

## Pause reconciliation of the cluster
paused: {{ .Values.paused }}
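
Unlike `k8sNodeBlockList` above, which is wrapped in `{{- with }}` and omitted when unset, `paused` is rendered unconditionally, so every AerospikeCluster produced by the chart now carries the field explicitly. Assuming the chart default of `false` from the README table, the rendered fragment would look roughly like this:

```yaml
# Rendered CR fragment with chart defaults (sketch; all other spec fields omitted)
spec:
  paused: false
```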