Skip to content

Commit

Permalink
Added a paused flag to pause the reconciliation of the AerospikeClust…
Browse files Browse the repository at this point in the history
…er resource
  • Loading branch information
sud82 committed Jul 8, 2024
1 parent 03662f3 commit 73b2d9b
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 12 deletions.
3 changes: 3 additions & 0 deletions api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList"
// +kubebuilder:validation:MinItems:=1
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
// Paused flag is used to pause the reconciliation for the AerospikeCluster.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Pause Reconcile"
Paused *bool `json:"paused,omitempty"`
}

type SeedsFinderServices struct {
Expand Down
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ spec:
list by the operator
type: string
type: object
paused:
description: Paused flag is used to pause the reconciliation for the
AerospikeCluster.
type: boolean
podSpec:
description: Specify additional configuration for the Aerospike pods
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ spec:
- description: Certificates to connect to Aerospike.
displayName: Operator Client Cert
path: operatorClientCert
- description: Paused flag is used to pause the reconciliation for the AerospikeCluster.
displayName: Pause Reconcile
path: paused
- description: Specify additional configuration for the Aerospike pods
displayName: Pod Configuration
path: podSpec
Expand Down
7 changes: 7 additions & 0 deletions controllers/rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,13 @@ func (r *SingleClusterReconciler) handleNSOrDeviceRemovalForIgnorablePods(
func (r *SingleClusterReconciler) reconcileRack(
found *appsv1.StatefulSet, rackState *RackState, ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) reconcileResult {
if asdbv1.GetBool(r.aeroCluster.Spec.Paused) {
// This check is not strictly necessary here. It is already checked in the parent reconcile function.
// But, it is added here to avoid unnecessary reconciliation of rack when reconcileRack is called in a loop.
r.Log.Info("Reconciliation is paused for this AerospikeCluster")
return reconcileRequeueAfter(1)
}

r.Log.Info(
"Reconcile existing Aerospike cluster statefulset", "stsName",
found.Name,
Expand Down
12 changes: 9 additions & 3 deletions controllers/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -16,9 +13,11 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"reflect"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"strings"

as "github.com/aerospike/aerospike-client-go/v7"
asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1"
Expand Down Expand Up @@ -81,6 +80,13 @@ func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error)
return reconcile.Result{}, nil
}

// Pause the reconciliation for the AerospikeCluster if the paused field is set to true.
// Deletion of the AerospikeCluster will not be paused.
if asdbv1.GetBool(r.aeroCluster.Spec.Paused) {
r.Log.Info("Reconciliation is paused for this AerospikeCluster")
return reconcile.Result{}, nil
}

// Set the status to AerospikeClusterInProgress before starting any operations
if err := r.setStatusPhase(asdbv1.AerospikeClusterInProgress); err != nil {
return reconcile.Result{}, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ spec:
list by the operator
type: string
type: object
paused:
description: Paused flag is used to pause the reconciliation for the
AerospikeCluster.
type: boolean
podSpec:
description: Specify additional configuration for the Aerospike pods
properties:
Expand Down
62 changes: 62 additions & 0 deletions test/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,71 @@ var _ = Describe(
UpdateClusterPre600(ctx)
},
)
Context("PauseReconcile", func() {
PauseReconcileTest(ctx)
})
},
)

// PauseReconcileTest registers the Ginkgo specs that exercise the `paused`
// flag of the AerospikeCluster spec. Each spec runs against a freshly deployed
// two-node dummy cluster named "pause-reconcile", which is deleted afterwards.
func PauseReconcileTest(ctx goctx.Context) {
	clusterNamespacedName := getNamespacedName("pause-reconcile", namespace)

	// Deploy a fresh dummy cluster before every spec.
	BeforeEach(func() {
		err := deployCluster(
			k8sClient, ctx,
			createDummyAerospikeCluster(clusterNamespacedName, 2),
		)
		Expect(err).ToNot(HaveOccurred())
	})

	// Look the cluster up again and remove it after every spec.
	AfterEach(func() {
		deployed, err := getCluster(k8sClient, ctx, clusterNamespacedName)
		Expect(err).ToNot(HaveOccurred())

		// Cleanup is best-effort: the delete error is intentionally dropped.
		_ = deleteCluster(k8sClient, ctx, deployed)
	})

	It("Should pause reconcile", func() {
		// The flag is set first and an operation is applied afterwards.
		// An upgrade is used because it is a long-running operation.
		By("Pause reconcile")
		pauseErr := setPauseFlag(ctx, clusterNamespacedName, ptr.To(true))
		Expect(pauseErr).ToNot(HaveOccurred())

		// While paused, the upgrade helper is expected to return an error.
		By("Start upgrade, it should fail")
		upgradeErr := upgradeClusterTest(
			k8sClient, ctx, clusterNamespacedName, nextImage,
		)
		Expect(upgradeErr).To(HaveOccurred())

		// A nil flag clears the field and resumes reconciliation.
		By("Resume reconcile")
		resumeErr := setPauseFlag(ctx, clusterNamespacedName, nil)
		Expect(resumeErr).ToNot(HaveOccurred())

		By("Upgrade should succeed")
		current, err := getCluster(k8sClient, ctx, clusterNamespacedName)
		Expect(err).ToNot(HaveOccurred())

		// Wait until the cluster reports the Completed phase.
		wantPhases := []asdbv1.AerospikeClusterPhase{
			asdbv1.AerospikeClusterCompleted,
		}
		err = waitForAerospikeCluster(
			k8sClient, ctx, current, int(current.Spec.Size),
			retryInterval, getTimeout(2), wantPhases,
		)
		Expect(err).ToNot(HaveOccurred())
	})
}

// setPauseFlag reads the AerospikeCluster identified by clusterNamespacedName,
// sets its Spec.Paused field to pause (nil clears the field) and submits the
// modified object through updateCluster. The error from the lookup or from the
// update is returned unchanged.
func setPauseFlag(ctx goctx.Context, clusterNamespacedName types.NamespacedName, pause *bool) error {
	cluster, getErr := getCluster(k8sClient, ctx, clusterNamespacedName)
	if getErr != nil {
		return getErr
	}

	cluster.Spec.Paused = pause
	updateErr := updateCluster(k8sClient, ctx, cluster)

	return updateErr
}

func UpdateClusterPre600(ctx goctx.Context) {
Context(
"UpdateClusterPre600", func() {
Expand Down
24 changes: 15 additions & 9 deletions test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,20 @@ func isClusterStateValid(
return false
}

// Validate status
statusToSpec, err := asdbv1.CopyStatusToSpec(&newCluster.Status.AerospikeClusterStatusSpec)
if err != nil {
pkgLog.Error(err, "Failed to copy spec in status", "err", err)
return false
}
// Do not compare status with spec if cluster reconciliation is paused
// `paused` flag only exists in the spec and not in the status.
if !asdbv1.GetBool(aeroCluster.Spec.Paused) {
// Validate status
statusToSpec, err := asdbv1.CopyStatusToSpec(&newCluster.Status.AerospikeClusterStatusSpec)
if err != nil {
pkgLog.Error(err, "Failed to copy spec in status", "err", err)
return false
}

if !reflect.DeepEqual(statusToSpec, &newCluster.Spec) {
pkgLog.Info("Cluster status is not matching the spec")
return false
if !reflect.DeepEqual(statusToSpec, &newCluster.Spec) {
pkgLog.Info("Cluster status is not matching the spec")
return false
}
}

// TODO: This is not valid for tests where maxUnavailablePods flag is used.
Expand All @@ -322,6 +326,8 @@ func isClusterStateValid(
aeroCluster.Spec.Image,
),
)

return false
}

if newCluster.Labels[asdbv1.AerospikeAPIVersionLabel] != asdbv1.AerospikeAPIVersion {
Expand Down

0 comments on commit 73b2d9b

Please sign in to comment.