Commit

Fix test
sud82 committed Aug 2, 2024
1 parent 0013bfc commit cdde5d8
Showing 2 changed files with 101 additions and 40 deletions.
91 changes: 58 additions & 33 deletions controllers/rack.go
@@ -86,15 +86,19 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult {
// remove ignorable pods from failedPods
failedPods = getNonIgnorablePods(failedPods, ignorablePodNames)
if len(failedPods) != 0 {
r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)

if res = r.reconcileRack(
found, state, ignorablePodNames, failedPods,
); !res.isSuccess {
return res
}

r.Log.Info("Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)
}

// 2. Again, fetch the pods for the rack and if there are failed pods then restart them.
@@ -114,14 +118,20 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult {
// remove ignorable pods from failedPods
failedPods = getNonIgnorablePods(failedPods, ignorablePodNames)
if len(failedPods) != 0 {
r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)

if _, res = r.rollingRestartRack(found, state, ignorablePodNames, nil,
failedPods); !res.isSuccess {
if _, res = r.rollingRestartRack(
found, state, ignorablePodNames, nil,
failedPods,
); !res.isSuccess {
return res
}

r.Log.Info("Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods))
r.Log.Info(
"Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods),
)
// Requeue after 1 second to fetch latest CR object with updated pod status
return reconcileRequeueAfter(1)
}
@@ -351,8 +361,10 @@ func (r *SingleClusterReconciler) deleteRacks(
return reconcileSuccess()
}

func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) {
func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(
found *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
var res reconcileResult
// Always update configMap. We won't be able to find if a rack's config, and it's pod config is in sync or not
// Checking rack.spec, rack.status will not work.
@@ -413,7 +425,9 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
}

if rollingRestartInfo.needRestart {
found, res = r.rollingRestartRack(found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods)
found, res = r.rollingRestartRack(
found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods,
)
if !res.isSuccess {
if res.err != nil {
r.Log.Error(
@@ -434,8 +448,10 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
}

if len(failedPods) == 0 && rollingRestartInfo.needUpdateConf {
res = r.updateDynamicConfig(rackState, ignorablePodNames,
rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod)
res = r.updateDynamicConfig(
rackState, ignorablePodNames,
rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod,
)
if !res.isSuccess {
if res.err != nil {
r.Log.Error(
@@ -473,9 +489,11 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
return found, reconcileSuccess()
}

func (r *SingleClusterReconciler) updateDynamicConfig(rackState *RackState,
func (r *SingleClusterReconciler) updateDynamicConfig(
rackState *RackState,
ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType,
dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap) reconcileResult {
dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap,
) reconcileResult {
r.Log.Info("Update dynamic config in Aerospike pods")

r.Recorder.Eventf(
@@ -556,13 +574,6 @@ func (r *SingleClusterReconciler) handleNSOrDeviceRemovalForIgnorablePods(
func (r *SingleClusterReconciler) reconcileRack(
found *appsv1.StatefulSet, rackState *RackState, ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) reconcileResult {
if asdbv1.GetBool(r.aeroCluster.Spec.Paused) {
// This check is not strictly necessary here. It is already checked in the parent reconcile function.
// But, it is added here to avoid unnecessary reconciliation of rack when reconcileRack is called in a loop.
r.Log.Info("Reconciliation is paused for this AerospikeCluster")
return reconcileRequeueAfter(1)
}

r.Log.Info(
"Reconcile existing Aerospike cluster statefulset", "stsName",
found.Name,
@@ -608,8 +619,10 @@ func (r *SingleClusterReconciler) reconcileRack(
// before the scale down could complete.
if (r.aeroCluster.Status.Size > r.aeroCluster.Spec.Size) ||
(!r.IsStatusEmpty() && len(r.aeroCluster.Status.RackConfig.Racks) != len(r.aeroCluster.Spec.RackConfig.Racks)) {
if res = r.setMigrateFillDelay(r.getClientPolicy(), &rackState.Rack.AerospikeConfig, false,
nil); !res.isSuccess {
if res = r.setMigrateFillDelay(
r.getClientPolicy(), &rackState.Rack.AerospikeConfig, false,
nil,
); !res.isSuccess {
r.Log.Error(res.err, "Failed to revert migrate-fill-delay after scale down")
return res
}
@@ -756,8 +769,10 @@ func (r *SingleClusterReconciler) scaleUpRack(
return found, reconcileSuccess()
}

func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) {
func (r *SingleClusterReconciler) upgradeRack(
statefulSet *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
var (
err error
podList []*corev1.Pod
@@ -822,7 +837,9 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r
podsBatchList[0] = podsToUpgrade
} else {
// Create batch of pods
podsBatchList = getPodsBatchList(r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToUpgrade, len(podList))
podsBatchList = getPodsBatchList(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToUpgrade, len(podList),
)
}

if len(podsBatchList) > 0 {
@@ -914,7 +931,8 @@ func (r *SingleClusterReconciler) scaleDownRack(
diffPods := *found.Spec.Replicas - desiredSize

podsBatchList := getPodsBatchList(
r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, oldPodList[:diffPods], len(oldPodList))
r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, oldPodList[:diffPods], len(oldPodList),
)

// Handle one batch
podsBatch := podsBatchList[0]
@@ -1058,9 +1076,11 @@ func (r *SingleClusterReconciler) scaleDownRack(
return found, reconcileRequeueAfter(1)
}

func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, rackState *RackState,
func (r *SingleClusterReconciler) rollingRestartRack(
found *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType,
failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) {
failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
r.Log.Info("Rolling restart AerospikeCluster statefulset pods")

r.Recorder.Eventf(
@@ -1139,7 +1159,8 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
} else {
// Create batch of pods
podsBatchList = getPodsBatchList(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList))
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList),
)
}

// Restart batch of pods
@@ -1185,7 +1206,8 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
return found, reconcileSuccess()
}

func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1.StatefulSet, rackState *RackState,
func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(
statefulSet *appsv1.StatefulSet, rackState *RackState,
ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) (*appsv1.StatefulSet, reconcileResult) {
if err := r.updateSTS(statefulSet, rackState); err != nil {
@@ -1219,8 +1241,10 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1
pod := podList[idx]

if blockedK8sNodes.Has(pod.Spec.NodeName) {
r.Log.Info("Pod found in blocked nodes list, migrating to a different node",
"podName", pod.Name)
r.Log.Info(
"Pod found in blocked nodes list, migrating to a different node",
"podName", pod.Name,
)

podsToRestart = append(podsToRestart, pod)

@@ -1229,7 +1253,8 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1
}

podsBatchList := getPodsBatchList(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList))
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList),
)

// Restart batch of pods
if len(podsBatchList) > 0 {
50 changes: 43 additions & 7 deletions test/cluster_test.go
@@ -113,22 +113,58 @@ func PauseReconcileTest(ctx goctx.Context) {

It(
"Should pause reconcile", func() {
// Pause reconcile and then apply operation
// Testing over upgrade as it is a long-running operation
By("1. Start upgrade and pause at partial upgrade")
aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())

err = UpdateClusterImage(aeroCluster, nextImage)
Expect(err).ToNot(HaveOccurred())

err = k8sClient.Update(ctx, aeroCluster)
Expect(err).ToNot(HaveOccurred())

Eventually(
func() bool {
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())

// Check if at least one pod is upgraded
podUpgraded := false
for podName, podStatus := range aeroCluster.Status.Pods {

Check failure on line 134 in test/cluster_test.go (GitHub Actions / lint): rangeValCopy: each iteration copies 360 bytes (consider pointers or indexing) (gocritic)
if podStatus.Image == nextImage {
pkgLog.Info("One Pod upgraded", "pod", podName, "image", podStatus.Image)
podUpgraded = true
break
}
}

return podUpgraded

Check failure on line 143 in test/cluster_test.go (GitHub Actions / lint): unnecessary trailing newline (whitespace)
}, 2*time.Minute, 1*time.Second,
).Should(BeTrue())

By("Pause reconcile")
err := setPauseFlag(ctx, clusterNamespacedName, ptr.To(true))
err = setPauseFlag(ctx, clusterNamespacedName, ptr.To(true))
Expect(err).ToNot(HaveOccurred())

By("2. Upgrade should fail")
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())

By("Start upgrade, it should fail")
err = upgradeClusterTest(k8sClient, ctx, clusterNamespacedName, nextImage)
err = waitForAerospikeCluster(
k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval,
getTimeout(1), []asdbv1.AerospikeClusterPhase{asdbv1.AerospikeClusterCompleted},
)
Expect(err).To(HaveOccurred())

By("Resume reconcile")
// Resume reconcile and Wait for all pods to be upgraded
By("3. Resume reconcile and upgrade should succeed")
err = setPauseFlag(ctx, clusterNamespacedName, nil)
Expect(err).ToNot(HaveOccurred())

By("Upgrade should succeed")
aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName)
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())

err = waitForAerospikeCluster(
@@ -148,7 +184,7 @@ func setPauseFlag(ctx goctx.Context, clusterNamespacedName types.NamespacedName,

aeroCluster.Spec.Paused = pause

return updateCluster(k8sClient, ctx, aeroCluster)
return k8sClient.Update(ctx, aeroCluster)
}

func UpdateClusterPre600(ctx goctx.Context) {
