Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a paused flag to pause the reconciliation of the AerospikeCluster #302

Merged
merged 7 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList"
// +kubebuilder:validation:MinItems:=1
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
// Paused flag is used to pause the reconciliation for the AerospikeCluster.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Pause Reconcile"
Paused *bool `json:"paused,omitempty"`
abhishekdwivedi3060 marked this conversation as resolved.
Show resolved Hide resolved
// Operations is a list of on-demand operations to be performed on the Aerospike cluster.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Operations"
// +kubebuilder:validation:MaxItems:=1
Expand Down
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ spec:
list by the operator
type: string
type: object
paused:
description: Paused flag is used to pause the reconciliation for the
AerospikeCluster.
type: boolean
podSpec:
description: Specify additional configuration for the Aerospike pods
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ spec:
- description: Certificates to connect to Aerospike.
displayName: Operator Client Cert
path: operatorClientCert
- description: Paused flag is used to pause the reconciliation for the AerospikeCluster.
displayName: Pause Reconcile
path: paused
- description: Specify additional configuration for the Aerospike pods
displayName: Pod Configuration
path: podSpec
Expand Down
84 changes: 58 additions & 26 deletions controllers/rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,19 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult {
// remove ignorable pods from failedPods
failedPods = getNonIgnorablePods(failedPods, ignorablePodNames)
if len(failedPods) != 0 {
r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)

if res = r.reconcileRack(
found, state, ignorablePodNames, failedPods,
); !res.isSuccess {
return res
}

r.Log.Info("Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)
}

// 2. Again, fetch the pods for the rack and if there are failed pods then restart them.
Expand All @@ -114,14 +118,20 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult {
// remove ignorable pods from failedPods
failedPods = getNonIgnorablePods(failedPods, ignorablePodNames)
if len(failedPods) != 0 {
r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)

if _, res = r.rollingRestartRack(found, state, ignorablePodNames, nil,
failedPods); !res.isSuccess {
if _, res = r.rollingRestartRack(
found, state, ignorablePodNames, nil,
failedPods,
); !res.isSuccess {
return res
}

r.Log.Info("Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)
// Requeue after 1 second to fetch latest CR object with updated pod status
return reconcileRequeueAfter(1)
}
Expand Down Expand Up @@ -351,8 +361,10 @@ func (r *SingleClusterReconciler) deleteRacks(
return reconcileSuccess()
}

func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) {
func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(
found *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
var res reconcileResult
// Always update configMap. We won't be able to find if a rack's config, and it's pod config is in sync or not
// Checking rack.spec, rack.status will not work.
Expand Down Expand Up @@ -413,7 +425,9 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
}

if rollingRestartInfo.needRestart {
found, res = r.rollingRestartRack(found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods)
found, res = r.rollingRestartRack(
found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods,
)
if !res.isSuccess {
if res.err != nil {
r.Log.Error(
Expand All @@ -434,8 +448,10 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
}

if len(failedPods) == 0 && rollingRestartInfo.needUpdateConf {
res = r.updateDynamicConfig(rackState, ignorablePodNames,
rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod)
res = r.updateDynamicConfig(
rackState, ignorablePodNames,
rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod,
)
if !res.isSuccess {
if res.err != nil {
r.Log.Error(
Expand Down Expand Up @@ -473,9 +489,11 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
return found, reconcileSuccess()
}

func (r *SingleClusterReconciler) updateDynamicConfig(rackState *RackState,
func (r *SingleClusterReconciler) updateDynamicConfig(
rackState *RackState,
ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType,
dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap) reconcileResult {
dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap,
) reconcileResult {
r.Log.Info("Update dynamic config in Aerospike pods")

r.Recorder.Eventf(
Expand Down Expand Up @@ -601,8 +619,10 @@ func (r *SingleClusterReconciler) reconcileRack(
// before the scale down could complete.
if (r.aeroCluster.Status.Size > r.aeroCluster.Spec.Size) ||
(!r.IsStatusEmpty() && len(r.aeroCluster.Status.RackConfig.Racks) != len(r.aeroCluster.Spec.RackConfig.Racks)) {
if res = r.setMigrateFillDelay(r.getClientPolicy(), &rackState.Rack.AerospikeConfig, false,
nil); !res.isSuccess {
if res = r.setMigrateFillDelay(
r.getClientPolicy(), &rackState.Rack.AerospikeConfig, false,
nil,
); !res.isSuccess {
r.Log.Error(res.err, "Failed to revert migrate-fill-delay after scale down")
return res
}
Expand Down Expand Up @@ -749,8 +769,10 @@ func (r *SingleClusterReconciler) scaleUpRack(
return found, reconcileSuccess()
}

func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) {
func (r *SingleClusterReconciler) upgradeRack(
statefulSet *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
var (
err error
podList []*corev1.Pod
Expand Down Expand Up @@ -815,7 +837,9 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r
podsBatchList[0] = podsToUpgrade
} else {
// Create batch of pods
podsBatchList = getPodsBatchList(r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToUpgrade, len(podList))
podsBatchList = getPodsBatchList(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToUpgrade, len(podList),
)
}

if len(podsBatchList) > 0 {
Expand Down Expand Up @@ -907,7 +931,8 @@ func (r *SingleClusterReconciler) scaleDownRack(
diffPods := *found.Spec.Replicas - desiredSize

podsBatchList := getPodsBatchList(
r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, oldPodList[:diffPods], len(oldPodList))
r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, oldPodList[:diffPods], len(oldPodList),
)

// Handle one batch
podsBatch := podsBatchList[0]
Expand Down Expand Up @@ -1051,9 +1076,11 @@ func (r *SingleClusterReconciler) scaleDownRack(
return found, reconcileRequeueAfter(1)
}

func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, rackState *RackState,
func (r *SingleClusterReconciler) rollingRestartRack(
found *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType,
failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) {
failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
r.Log.Info("Rolling restart AerospikeCluster statefulset pods")

r.Recorder.Eventf(
Expand Down Expand Up @@ -1132,7 +1159,8 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
} else {
// Create batch of pods
podsBatchList = getPodsBatchList(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList))
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList),
)
}

// Restart batch of pods
Expand Down Expand Up @@ -1178,7 +1206,8 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
return found, reconcileSuccess()
}

func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1.StatefulSet, rackState *RackState,
func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(
statefulSet *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
if err := r.updateSTS(statefulSet, rackState); err != nil {
Expand Down Expand Up @@ -1212,8 +1241,10 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1
pod := podList[idx]

if blockedK8sNodes.Has(pod.Spec.NodeName) {
r.Log.Info("Pod found in blocked nodes list, migrating to a different node",
"podName", pod.Name)
r.Log.Info(
"Pod found in blocked nodes list, migrating to a different node",
"podName", pod.Name,
)

podsToRestart = append(podsToRestart, pod)

Expand All @@ -1222,7 +1253,8 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1
}

podsBatchList := getPodsBatchList(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList))
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList),
)

// Restart batch of pods
if len(podsBatchList) > 0 {
Expand Down
17 changes: 14 additions & 3 deletions controllers/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error)
return reconcile.Result{}, nil
}

// Pause the reconciliation for the AerospikeCluster if the paused field is set to true.
// Deletion of the AerospikeCluster will not be paused.
if asdbv1.GetBool(r.aeroCluster.Spec.Paused) {
r.Log.Info("Reconciliation is paused for this AerospikeCluster")
return reconcile.Result{}, nil
}

// Set the status to AerospikeClusterInProgress before starting any operations
if err := r.setStatusPhase(asdbv1.AerospikeClusterInProgress); err != nil {
return reconcile.Result{}, err
Expand Down Expand Up @@ -301,8 +308,10 @@ func (r *SingleClusterReconciler) recoverIgnorablePods() reconcileResult {
return reconcileSuccess()
}

func (r *SingleClusterReconciler) validateAndReconcileAccessControl(selectedPods []corev1.Pod,
ignorablePodNames sets.Set[string]) error {
func (r *SingleClusterReconciler) validateAndReconcileAccessControl(
selectedPods []corev1.Pod,
ignorablePodNames sets.Set[string],
) error {
version, err := asdbv1.GetImageVersion(r.aeroCluster.Spec.Image)
if err != nil {
return err
Expand Down Expand Up @@ -959,7 +968,9 @@ func (r *SingleClusterReconciler) migrateInitialisedVolumeNames(ctx context.Cont
}

// Appending volume name as <vol_name>@<pvcUID> in initializedVolumes list
initializedVolumes = append(initializedVolumes, fmt.Sprintf("%s@%s", oldFormatInitVolNames[oldVolIdx], pvcUID))
initializedVolumes = append(
initializedVolumes, fmt.Sprintf("%s@%s", oldFormatInitVolNames[oldVolIdx], pvcUID),
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ spec:
list by the operator
type: string
type: object
paused:
description: Paused flag is used to pause the reconciliation for the
AerospikeCluster.
type: boolean
podSpec:
description: Specify additional configuration for the Aerospike pods
properties:
Expand Down
Loading
Loading