From 65c7556a91049bb2fa257d12ceab728bf6a78de5 Mon Sep 17 00:00:00 2001 From: Abhisek Dwivedi Date: Tue, 19 Sep 2023 12:22:16 +0530 Subject: [PATCH 1/3] Added support for PDB and readiness --- api/v1/aerospikecluster_mutating_webhook.go | 10 +- api/v1/aerospikecluster_types.go | 10 +- api/v1/aerospikecluster_validating_webhook.go | 60 +++++++++--- api/v1/zz_generated.deepcopy.go | 10 ++ .../asdb.aerospike.com_aerospikeclusters.yaml | 16 ++++ ...rnetes-operator.clusterserviceversion.yaml | 9 +- config/rbac/role.yaml | 9 ++ controllers/aerospikecluster_controller.go | 1 + controllers/pod.go | 7 ++ controllers/poddistruptionbudget.go | 96 +++++++++++++++++++ controllers/reconciler.go | 11 +++ controllers/statefulset.go | 65 ++++++++++++- ..._aerospikeclusters.asdb.aerospike.com.yaml | 16 ++++ ...erospike-operator-manager-clusterrole.yaml | 9 ++ test/cluster_helper.go | 58 ++++++++++- test/poddisruptionbudget_test.go | 77 +++++++++++++++ test/sample_files_test.go | 2 +- test/utils.go | 5 + 18 files changed, 446 insertions(+), 25 deletions(-) create mode 100644 controllers/poddistruptionbudget.go create mode 100644 test/poddisruptionbudget_test.go diff --git a/api/v1/aerospikecluster_mutating_webhook.go b/api/v1/aerospikecluster_mutating_webhook.go index 4dc2f371c..ed6a44019 100644 --- a/api/v1/aerospikecluster_mutating_webhook.go +++ b/api/v1/aerospikecluster_mutating_webhook.go @@ -25,6 +25,7 @@ import ( "gomodules.xyz/jsonpatch/v2" v1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -69,6 +70,13 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response { } func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error { + // Set maxUnavailable default + if c.Spec.MaxUnavailable == nil { + // Set default maxUnavailable to 1 + maxUnavailable := 
intstr.FromInt(1) + c.Spec.MaxUnavailable = &maxUnavailable + } + // Set network defaults c.Spec.AerospikeNetworkPolicy.setDefaults(c.ObjectMeta.Namespace) @@ -504,7 +512,7 @@ func setDefaultNetworkConf( ) } // Override these sections - // TODO: These values lines will be replaces with runtime info by script in init-container + // TODO: These values lines will be replaced with runtime info by script in init-container // See if we can get better way to make template serviceDefaults := map[string]interface{}{} srvPort := GetServicePort(configSpec) diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index 089bed614..09195635d 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -37,6 +37,10 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability // Aerospike cluster size // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Cluster Size" Size int32 `json:"size"` + // MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application + // disruption. This value is used to create PodDisruptionBudget. Defaults to 1. + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Max Unavailable" + MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` // Aerospike server image // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Server Image" Image string `json:"image"` @@ -566,10 +570,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability // AerospikeClusterStatusSpec captures the current status of the cluster. 
type AerospikeClusterStatusSpec struct { //nolint:govet // for readability // Aerospike cluster size - // +operator-sdk:csv:customresourcedefinitions:type=status,displayName="Cluster Size" Size int32 `json:"size,omitempty"` // Aerospike server image Image string `json:"image,omitempty"` + // MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application + // disruption. This value is used to create PodDisruptionBudget. Defaults to 1. + MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` // If set true then multiple pods can be created per Kubernetes Node. // This will create a NodePort service for each Pod. // NodePort, as the name implies, opens a specific port on all the Kubernetes Nodes , @@ -857,6 +863,7 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec, status.Size = spec.Size status.Image = spec.Image + status.MaxUnavailable = spec.MaxUnavailable // Storage statusStorage := AerospikeStorageSpec{} @@ -946,6 +953,7 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec spec.Size = status.Size spec.Image = status.Image + spec.MaxUnavailable = status.MaxUnavailable // Storage specStorage := AerospikeStorageSpec{} diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index 2cfab8d9e..f83e61fee 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -166,6 +166,11 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error { return fmt.Errorf("invalid cluster size 0") } + // Validate MaxUnavailable for PodDisruptionBudget + if err := c.validateMaxUnavailable(); err != nil { + return err + } + // Validate Image version version, err := GetImageVersion(c.Spec.Image) if err != nil { @@ -604,24 +609,11 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error { // Validate batch upgrade/restart param if 
c.Spec.RackConfig.RollingUpdateBatchSize != nil { - // Just validate if RollingUpdateBatchSize is valid number or string. - randomNumber := 100 - - count, err := intstr.GetScaledValueFromIntOrPercent( - c.Spec.RackConfig.RollingUpdateBatchSize, randomNumber, false, - ) - if err != nil { + if err := validateIntOrStringField(c.Spec.RackConfig.RollingUpdateBatchSize, + "spec.rackConfig.rollingUpdateBatchSize"); err != nil { return err } - // Only negative is not allowed. Any big number can be given. - if count < 0 { - return fmt.Errorf( - "can not use negative rackConfig.RollingUpdateBatchSize %s", - c.Spec.RackConfig.RollingUpdateBatchSize.String(), - ) - } - if len(c.Spec.RackConfig.Racks) < 2 { return fmt.Errorf("can not use rackConfig.RollingUpdateBatchSize when number of racks is less than two") } @@ -2175,3 +2167,41 @@ func (c *AerospikeCluster) validateNetworkPolicy(namespace string) error { return nil } + +func validateIntOrStringField(intOrStr *intstr.IntOrString, fieldPath string) error { + randomNumber := 100 + // Just validate that the given value is a valid number or percentage string. + count, err := intstr.GetScaledValueFromIntOrPercent(intOrStr, randomNumber, false) + if err != nil { + return err + } + + // Only negative is not allowed. Any big number can be given. + if count < 0 { + return fmt.Errorf("can not use negative %s: %s", fieldPath, intOrStr.String()) + } + + if intOrStr.Type == intstr.String && count > 100 { + return fmt.Errorf("%s: %s must not be greater than 100 percent", fieldPath, intOrStr.String()) + } + + return nil +} + +func (c *AerospikeCluster) validateMaxUnavailable() error { + // safe check for corner cases when mutation webhook somehow didn't work + if c.Spec.MaxUnavailable == nil { + return fmt.Errorf("maxUnavailable cannot be nil. Mutation webhook didn't work") + } + + if err := validateIntOrStringField(c.Spec.MaxUnavailable, "spec.maxUnavailable"); err != nil { + return err + } + + // TODO: Do we need such types of check?
Maybe > size/2 etc + if c.Spec.MaxUnavailable.IntValue() > int(c.Spec.Size) { + return fmt.Errorf("maxUnavailable %s cannot be greater than size", c.Spec.MaxUnavailable.String()) + } + + return nil +} diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index a68fa7d32..81959680c 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -154,6 +154,11 @@ func (in *AerospikeClusterList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AerospikeClusterSpec) DeepCopyInto(out *AerospikeClusterSpec) { *out = *in + if in.MaxUnavailable != nil { + in, out := &in.MaxUnavailable, &out.MaxUnavailable + *out = new(intstr.IntOrString) + **out = **in + } in.Storage.DeepCopyInto(&out.Storage) if in.AerospikeAccessControl != nil { in, out := &in.AerospikeAccessControl, &out.AerospikeAccessControl @@ -221,6 +226,11 @@ func (in *AerospikeClusterStatus) DeepCopy() *AerospikeClusterStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *AerospikeClusterStatusSpec) DeepCopyInto(out *AerospikeClusterStatusSpec) { *out = *in + if in.MaxUnavailable != nil { + in, out := &in.MaxUnavailable, &out.MaxUnavailable + *out = new(intstr.IntOrString) + **out = **in + } in.Storage.DeepCopyInto(&out.Storage) if in.AerospikeAccessControl != nil { in, out := &in.AerospikeAccessControl, &out.AerospikeAccessControl diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index 65a697cac..321f14bfd 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -281,6 +281,14 @@ spec: image: description: Aerospike server image type: string + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + x-kubernetes-int-or-string: true operatorClientCert: description: Certificates to connect to Aerospike. properties: @@ -8830,6 +8838,14 @@ spec: image: description: Aerospike server image type: string + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + x-kubernetes-int-or-string: true multiPodPerHost: description: "If set true then multiple pods can be created per Kubernetes Node. This will create a NodePort service for each Pod. 
NodePort, diff --git a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml index 53aa7c3f5..46d3ff67e 100644 --- a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml @@ -50,6 +50,11 @@ spec: - description: Aerospike server image displayName: Server Image path: image + - description: MaxUnavailable is the percentage/number of pods that can be allowed + to go down or unavailable before application disruption. This value is used + to create PodDisruptionBudget. Defaults to 1. + displayName: Max Unavailable + path: maxUnavailable - description: Certificates to connect to Aerospike. displayName: Operator Client Cert path: operatorClientCert @@ -74,10 +79,6 @@ spec: resource. displayName: Validation Policy path: validationPolicy - statusDescriptors: - - description: Aerospike cluster size - displayName: Cluster Size - path: size version: v1 - description: AerospikeCluster is the schema for the AerospikeCluster API displayName: Aerospike Cluster diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 9fcc1ea4f..841e8c24e 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -112,3 +112,12 @@ rules: - patch - update - watch +- apiGroups: + - policy + resources: + - poddisruptionbudgets + verbs: + - create + - get + - patch + - update diff --git a/controllers/aerospikecluster_controller.go b/controllers/aerospikecluster_controller.go index 2c2f8f441..701e9b9a4 100644 --- a/controllers/aerospikecluster_controller.go +++ b/controllers/aerospikecluster_controller.go @@ -86,6 +86,7 @@ type RackState struct { // +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=secrets,verbs=get // 
+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;create;update;patch //nolint:lll // marker // +kubebuilder:rbac:groups=asdb.aerospike.com,resources=aerospikeclusters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=asdb.aerospike.com,resources=aerospikeclusters/status,verbs=get;update;patch diff --git a/controllers/pod.go b/controllers/pod.go index 8574b3c1c..7946d1644 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -142,6 +142,13 @@ func (r *SingleClusterReconciler) getRollingRestartTypePod( r.Log.Info("Aerospike rack storage changed. Need rolling restart") } + if r.isReadinessPortUpdated(pod) { + restartType = mergeRestartType(restartType, podRestart) + + r.Log.Info("Aerospike readiness tcp port changed. Need rolling restart", + "newTCPPort", r.getReadinessProbe().TCPSocket.String()) + } + return restartType } diff --git a/controllers/poddistruptionbudget.go b/controllers/poddistruptionbudget.go new file mode 100644 index 000000000..1b78e0b2a --- /dev/null +++ b/controllers/poddistruptionbudget.go @@ -0,0 +1,96 @@ +package controllers + +import ( + "context" + "fmt" + + v1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" +) + +func (r *SingleClusterReconciler) createOrUpdatePDB() error { + ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name) + pdb := &v1.PodDisruptionBudget{} + + err := r.Client.Get( + context.TODO(), types.NamespacedName{ + Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace, + }, pdb, + ) + if err != nil { + if !errors.IsNotFound(err) { + return err + } + + r.Log.Info("Create PodDisruptionBudget") + + 
pdb = &v1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.aeroCluster.Name, + Namespace: r.aeroCluster.Namespace, + Labels: ls, + }, + Spec: v1.PodDisruptionBudgetSpec{ + MaxUnavailable: r.aeroCluster.Spec.MaxUnavailable, + Selector: &metav1.LabelSelector{ + MatchLabels: ls, + }, + }, + } + + // This will be true only for old existing CRs. For new operator versions, this field will be + // set by default to 1 by mutating webhook. + if pdb.Spec.MaxUnavailable == nil { + maxUnavailable := intstr.FromInt(1) + pdb.Spec.MaxUnavailable = &maxUnavailable + } + + // Set AerospikeCluster instance as the owner and controller + err = controllerutil.SetControllerReference( + r.aeroCluster, pdb, r.Scheme, + ) + if err != nil { + return err + } + + if err = r.Client.Create( + context.TODO(), pdb, createOption, + ); err != nil { + return fmt.Errorf( + "failed to create PodDisruptionBudget: %v", + err, + ) + } + + r.Log.Info("Created new PodDisruptionBudget") + + return nil + } + + r.Log.Info( + "PodDisruptionBudget already exist. 
Updating existing PodDisruptionBudget if required", "name", + utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name), + ) + + if pdb.Spec.MaxUnavailable.String() != r.aeroCluster.Spec.MaxUnavailable.String() { + pdb.Spec.MaxUnavailable = r.aeroCluster.Spec.MaxUnavailable + if err = r.Client.Update( + context.TODO(), pdb, updateOption, + ); err != nil { + return fmt.Errorf( + "failed to update PodDisruptionBudget: %v", + err, + ) + } + + r.Log.Info("Updated PodDisruptionBudget") + } + + return nil +} diff --git a/controllers/reconciler.go b/controllers/reconciler.go index 050c0123b..af344e4f3 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -113,6 +113,17 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { return res.getResult() } + if err := r.createOrUpdatePDB(); err != nil { + r.Log.Error(err, "Failed to create PodDisruptionBudget") + r.Recorder.Eventf( + r.aeroCluster, corev1.EventTypeWarning, "PodDisruptionBudgetCreateFailed", + "Failed to create PodDisruptionBudget %s/%s", + r.aeroCluster.Namespace, r.aeroCluster.Name, + ) + + return reconcile.Result{}, err + } + if err := r.createOrUpdateSTSLoadBalancerSvc(); err != nil { r.Log.Error(err, "Failed to create LoadBalancer service") r.Recorder.Eventf( diff --git a/controllers/statefulset.go b/controllers/statefulset.go index b4247c9b8..db74c5e77 100644 --- a/controllers/statefulset.go +++ b/controllers/statefulset.go @@ -15,6 +15,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -162,9 +163,9 @@ func (r *SingleClusterReconciler) createSTS( Ports: ports, Env: envVarList, VolumeMounts: getDefaultAerospikeContainerVolumeMounts(), + ReadinessProbe: r.getReadinessProbe(), }, }, - Volumes: 
getDefaultSTSVolumes(r.aeroCluster, rackState), }, }, @@ -201,6 +202,51 @@ func (r *SingleClusterReconciler) createSTS( return r.getSTS(rackState) } +func (r *SingleClusterReconciler) getReadinessProbe() *corev1.Probe { + var readinessPort *int + + if _, tlsPort := asdbv1.GetTLSNameAndPort(r.aeroCluster.Spec.AerospikeConfig, asdbv1.ServicePortName); tlsPort != nil { + readinessPort = tlsPort + } else { + readinessPort = asdbv1.GetServicePort(r.aeroCluster.Spec.AerospikeConfig) + } + + return &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: int32(*readinessPort), + }, + }, + }, + InitialDelaySeconds: 2, + PeriodSeconds: 5, + } +} + +func (r *SingleClusterReconciler) isReadinessPortUpdated(pod *corev1.Pod) bool { + for idx := range pod.Spec.Containers { + container := &pod.Spec.Containers[idx] + + if container.Name != asdbv1.AerospikeServerContainerName { + continue + } + + // ignore if readiness probe is not set. Avoid rolling restart for old versions of operator + if container.ReadinessProbe == nil { + return false + } + + if container.ReadinessProbe.TCPSocket != nil && + container.ReadinessProbe.TCPSocket.String() != r.getReadinessProbe().TCPSocket.String() { + return true + } + } + + return false +} + func (r *SingleClusterReconciler) deleteSTS(st *appsv1.StatefulSet) error { r.Log.Info("Delete statefulset") // No need to do cleanup pods after deleting sts @@ -591,6 +637,9 @@ func (r *SingleClusterReconciler) updateSTS( // Container. 
r.updateContainerImages(statefulSet) + // Updates the readiness probe TCP Port if changed for the aerospike server container + r.updateReadinessProbe(statefulSet) + // This should be called before updating storage r.initializeSTSStorage(statefulSet, rackState) @@ -1090,6 +1139,20 @@ func (r *SingleClusterReconciler) updateContainerImages(statefulset *appsv1.Stat updateImage(statefulset.Spec.Template.Spec.InitContainers) } +func (r *SingleClusterReconciler) updateReadinessProbe(statefulSet *appsv1.StatefulSet) { + for idx := range statefulSet.Spec.Template.Spec.Containers { + container := &statefulSet.Spec.Template.Spec.Containers[idx] + + if container.Name != asdbv1.AerospikeServerContainerName { + continue + } + + container.ReadinessProbe = r.getReadinessProbe() + + break + } +} + func (r *SingleClusterReconciler) updateAerospikeInitContainerImage(statefulSet *appsv1.StatefulSet) error { for idx := range statefulSet.Spec.Template.Spec.InitContainers { container := &statefulSet.Spec.Template.Spec.InitContainers[idx] diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index 65a697cac..321f14bfd 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -281,6 +281,14 @@ spec: image: description: Aerospike server image type: string + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. 
+ x-kubernetes-int-or-string: true operatorClientCert: description: Certificates to connect to Aerospike. properties: @@ -8830,6 +8838,14 @@ spec: image: description: Aerospike server image type: string + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + x-kubernetes-int-or-string: true multiPodPerHost: description: "If set true then multiple pods can be created per Kubernetes Node. This will create a NodePort service for each Pod. NodePort, diff --git a/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml b/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml index c379e7f0e..adf721767 100644 --- a/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml +++ b/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml @@ -116,4 +116,13 @@ rules: - patch - update - watch +- apiGroups: + - policy + resources: + - poddisruptionbudgets + verbs: + - create + - get + - patch + - update {{- end }} diff --git a/test/cluster_helper.go b/test/cluster_helper.go index 945b871a9..c18b1a2a9 100644 --- a/test/cluster_helper.go +++ b/test/cluster_helper.go @@ -4,6 +4,7 @@ import ( goctx "context" "errors" "fmt" + "reflect" "strconv" "time" @@ -15,6 +16,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/client" @@ -85,6 +87,12 @@ func rollingRestartClusterByEnablingTLS( return err } + // Port should be changed to service tls-port + err = validateReadinessProbe(ctx, k8sClient, 
aeroCluster, serviceTLSPort) + if err != nil { + return err + } + network := aeroCluster.Spec.AerospikeConfig.Value["network"].(map[string]interface{}) serviceNetwork := network[asdbv1.ServicePortName].(map[string]interface{}) fabricNetwork := network[asdbv1.FabricPortName].(map[string]interface{}) @@ -99,7 +107,12 @@ func rollingRestartClusterByEnablingTLS( network[asdbv1.HeartbeatPortName] = heartbeartNetwork aeroCluster.Spec.AerospikeConfig.Value["network"] = network - return updateCluster(k8sClient, ctx, aeroCluster) + err = updateCluster(k8sClient, ctx, aeroCluster) + if err != nil { + return err + } + + return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) } func rollingRestartClusterByDisablingTLS( @@ -128,10 +141,22 @@ func rollingRestartClusterByDisablingTLS( return err } + // port should remain same i.e. service tls-port + err = validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) + if err != nil { + return err + } + aeroCluster.Spec.AerospikeConfig.Value["network"] = getNetworkConfig() aeroCluster.Spec.OperatorClientCertSpec = nil - return updateCluster(k8sClient, ctx, aeroCluster) + err = updateCluster(k8sClient, ctx, aeroCluster) + if err != nil { + return err + } + + // Port should be updated to service non-tls port + return validateReadinessProbe(ctx, k8sClient, aeroCluster, servicePort) } func scaleUpClusterTestWithNSDeviceHandling( @@ -542,6 +567,35 @@ func validateMigrateFillDelay( return err } +// validate readiness port +func validateReadinessProbe(ctx goctx.Context, k8sClient client.Client, aeroCluster *asdbv1.AerospikeCluster, + requiredPort int) error { + for podName := range aeroCluster.Status.Pods { + pod := &corev1.Pod{} + if err := k8sClient.Get(ctx, types.NamespacedName{ + Namespace: aeroCluster.Namespace, + Name: podName, + }, pod); err != nil { + return err + } + + for idx := range pod.Spec.Containers { + container := &pod.Spec.Containers[idx] + + if container.Name != 
asdbv1.AerospikeServerContainerName { + continue + } + + if !reflect.DeepEqual(container.ReadinessProbe.TCPSocket.Port, intstr.FromInt(requiredPort)) { + return fmt.Errorf("readiness tcp port mismatch, expected: %v, found: %v", + intstr.FromInt(requiredPort), container.ReadinessProbe.TCPSocket.Port) + } + } + } + + return nil +} + func upgradeClusterTest( k8sClient client.Client, ctx goctx.Context, clusterNamespacedName types.NamespacedName, image string, diff --git a/test/poddisruptionbudget_test.go b/test/poddisruptionbudget_test.go new file mode 100644 index 000000000..dab2e1592 --- /dev/null +++ b/test/poddisruptionbudget_test.go @@ -0,0 +1,77 @@ +package test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" +) + +var _ = Describe( + "PodDisruptionBudget", func() { + ctx := context.TODO() + aeroCluster := &asdbv1.AerospikeCluster{} + maxAvailable := intstr.FromInt(2) + clusterNamespacedName := getNamespacedName("pdb-test-cluster", namespace) + + BeforeEach(func() { + aeroCluster = createDummyAerospikeCluster( + clusterNamespacedName, 2, + ) + }) + + AfterEach(func() { + Expect(deleteCluster(k8sClient, ctx, aeroCluster)).NotTo(HaveOccurred()) + }) + + It("Validate create PDB with default maxUnavailable", func() { + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 1) + }) + + It("Validate create PDB with specified maxUnavailable", func() { + aeroCluster.Spec.MaxUnavailable = &maxAvailable + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 2) + }) + + It("Validate update PDB", func() { + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 1) + + 
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + // Update maxUnavailable + By("Update maxUnavailable to 2") + aeroCluster.Spec.MaxUnavailable = &maxAvailable + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 2) + }) + }) + +func validatePDB(ctx context.Context, aerocluster *asdbv1.AerospikeCluster, expectedMaxUnavailable int) { + pdb := policyv1.PodDisruptionBudget{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Namespace: aerocluster.Namespace, + Name: aerocluster.Name, + }, &pdb) + Expect(err).ToNot(HaveOccurred()) + + // Validate PDB + Expect(pdb.Spec.MaxUnavailable.IntValue()).To(Equal(expectedMaxUnavailable)) + Expect(pdb.Status.ExpectedPods).To(Equal(aerocluster.Spec.Size)) + Expect(pdb.Status.CurrentHealthy).To(Equal(aerocluster.Spec.Size)) + Expect(pdb.Status.DisruptionsAllowed).To(Equal(int32(expectedMaxUnavailable))) + Expect(pdb.Status.DesiredHealthy).To(Equal(aerocluster.Spec.Size - int32(expectedMaxUnavailable))) +} diff --git a/test/sample_files_test.go b/test/sample_files_test.go index f0cecea23..18f638fdd 100644 --- a/test/sample_files_test.go +++ b/test/sample_files_test.go @@ -139,7 +139,7 @@ func getSamplesFiles() ([]string, error) { } // Files/Dirs ignored are: - // 1.PMEM sample file as hardware is not available + // 1. PMEM sample file as hardware is not available // 2. XDR related files as they are separately tested // 3. 
All files which are not CR samples if strings.Contains(path, "pmem_cluster_cr.yaml") || strings.Contains(path, "xdr_") || diff --git a/test/utils.go b/test/utils.go index 955b4d723..1c8223535 100644 --- a/test/utils.go +++ b/test/utils.go @@ -35,6 +35,11 @@ var ( pkgLog = ctrl.Log.WithName("test") ) +const ( + servicePort = 3000 + serviceTLSPort = 4333 +) + var secrets map[string][]byte var cacertSecrets map[string][]byte From 0597eb205dd044ca1df849b46048f9f7ea223284 Mon Sep 17 00:00:00 2001 From: Abhisek Dwivedi Date: Fri, 22 Sep 2023 13:11:58 +0530 Subject: [PATCH 2/3] Added support for PDB and readiness --- api/v1/aerospikecluster_mutating_webhook.go | 3 +- api/v1/aerospikecluster_validating_webhook.go | 41 +++++++++- controllers/aerospikecluster_controller.go | 30 +++++++ controllers/poddistruptionbudget.go | 80 ++++++++++++------- test/cluster_helper.go | 12 ++- test/cluster_test.go | 7 ++ test/utils.go | 5 -- 7 files changed, 140 insertions(+), 38 deletions(-) diff --git a/api/v1/aerospikecluster_mutating_webhook.go b/api/v1/aerospikecluster_mutating_webhook.go index ed6a44019..41e79ebf4 100644 --- a/api/v1/aerospikecluster_mutating_webhook.go +++ b/api/v1/aerospikecluster_mutating_webhook.go @@ -70,9 +70,8 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response { } func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error { - // Set maxUnavailable default + // Set maxUnavailable default to 1 if c.Spec.MaxUnavailable == nil { - // Set default maxUnavailable to 1 maxUnavailable := intstr.FromInt(1) c.Spec.MaxUnavailable = &maxUnavailable } diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index f83e61fee..8642549f1 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -2198,9 +2198,44 @@ func (c *AerospikeCluster) validateMaxUnavailable() error { return err } - // TODO: Do we need such types of check? 
Maybe > size/2 etc - if c.Spec.MaxUnavailable.IntValue() > int(c.Spec.Size) { - return fmt.Errorf("maxUnavailable %s cannot be greater than size", c.Spec.MaxUnavailable.String()) + maxUnavailable := int(c.Spec.Size) + + // If Size is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss + if maxUnavailable == 1 { + return nil + } + + for idx := range c.Spec.RackConfig.Racks { + rack := &c.Spec.RackConfig.Racks[idx] + nsList := rack.AerospikeConfig.Value["namespaces"].([]interface{}) + + for _, nsInterface := range nsList { + rfInterface, exists := nsInterface.(map[string]interface{})["replication-factor"] + if !exists { + // Default RF is 2 if not given + maxUnavailable = 2 + continue + } + + rf, err := GetIntType(rfInterface) + if err != nil { + return fmt.Errorf("namespace replication-factor %v", err) + } + + // If RF is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss + if rf == 1 { + continue + } + + if rf < maxUnavailable { + maxUnavailable = rf + } + } + } + + if c.Spec.MaxUnavailable.IntValue() >= maxUnavailable { + return fmt.Errorf("maxUnavailable %s cannot be greater than or equal to %v", + c.Spec.MaxUnavailable.String(), maxUnavailable) } return nil diff --git a/controllers/aerospikecluster_controller.go b/controllers/aerospikecluster_controller.go index 701e9b9a4..d23cc9956 100644 --- a/controllers/aerospikecluster_controller.go +++ b/controllers/aerospikecluster_controller.go @@ -6,8 +6,11 @@ import ( "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" + policyv1 "k8s.io/api/policy/v1" + policyv1beta1 "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/errors" k8sRuntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" @@ -37,6 +40,8 @@ var ( } ) +var PDBbGvk = policyv1beta1.SchemeGroupVersion.WithKind("PodDisruptionBudget") + // AerospikeClusterReconciler reconciles 
AerospikeClusters type AerospikeClusterReconciler struct { client.Client @@ -49,6 +54,10 @@ type AerospikeClusterReconciler struct { // SetupWithManager sets up the controller with the Manager func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { + if err := r.setupPdbAPI(r.KubeConfig); err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&asdbv1.AerospikeCluster{}). Owns( @@ -72,6 +81,27 @@ func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } +// setupPdbAPI sets up the pdb api version to use as per the k8s version. +// TODO: Move to v1 when minimum supported k8s version is 1.21 +func (r *AerospikeClusterReconciler) setupPdbAPI(config *rest.Config) error { + discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(config) + + resources, err := discoveryClient.ServerResourcesForGroupVersion("policy/v1") + if err != nil { + r.Log.Info("Could not get ServerResourcesForGroupVersion for policy/v1, falling back to policy/v1beta1") + return nil + } + + for i := range resources.APIResources { + if resources.APIResources[i].Kind == "PodDisruptionBudget" { + PDBbGvk = policyv1.SchemeGroupVersion.WithKind("PodDisruptionBudget") + return nil + } + } + + return nil +} + // RackState contains the rack configuration and rack size. 
type RackState struct { Rack *asdbv1.Rack diff --git a/controllers/poddistruptionbudget.go b/controllers/poddistruptionbudget.go index 1b78e0b2a..580a390a0 100644 --- a/controllers/poddistruptionbudget.go +++ b/controllers/poddistruptionbudget.go @@ -3,54 +3,66 @@ package controllers import ( "context" "fmt" + "strconv" - v1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" ) func (r *SingleClusterReconciler) createOrUpdatePDB() error { + podList, err := r.getClusterPodList() + if err != nil { + return err + } + + for podIdx := range podList.Items { + pod := &podList.Items[podIdx] + + for containerIdx := range pod.Spec.Containers { + if pod.Spec.Containers[containerIdx].Name != asdbv1.AerospikeServerContainerName { + continue + } + + if pod.Spec.Containers[containerIdx].ReadinessProbe == nil { + r.Log.Info("Pod found without ReadinessProbe, skipping PodDisruptionBudget", + "name", pod.Name) + return nil + } + } + } + ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name) - pdb := &v1.PodDisruptionBudget{} + // TODO: Move to concrete object when minimum supported k8s version is 1.21 + pdb := &unstructured.Unstructured{} + pdb.SetGroupVersionKind(PDBbGvk) - err := r.Client.Get( + if err := r.Client.Get( context.TODO(), types.NamespacedName{ Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace, }, pdb, - ) - if err != nil { + ); err != nil { if !errors.IsNotFound(err) { return err } r.Log.Info("Create PodDisruptionBudget") - pdb = &v1.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{ - Name: r.aeroCluster.Name, - Namespace: r.aeroCluster.Namespace, - Labels: ls, - }, - Spec: 
v1.PodDisruptionBudgetSpec{ - MaxUnavailable: r.aeroCluster.Spec.MaxUnavailable, - Selector: &metav1.LabelSelector{ - MatchLabels: ls, - }, + pdb.SetName(r.aeroCluster.Name) + pdb.SetNamespace(r.aeroCluster.Namespace) + pdb.SetLabels(ls) + pdb.Object["spec"] = map[string]interface{}{ + "maxUnavailable": r.aeroCluster.Spec.MaxUnavailable, + "selector": &metav1.LabelSelector{ + MatchLabels: ls, }, } - // This will be true only for old existing CRs. For new operator versions, this field will be - // set by default to 1 by mutating webhook. - if pdb.Spec.MaxUnavailable == nil { - maxUnavailable := intstr.FromInt(1) - pdb.Spec.MaxUnavailable = &maxUnavailable - } - // Set AerospikeCluster instance as the owner and controller err = controllerutil.SetControllerReference( r.aeroCluster, pdb, r.Scheme, @@ -78,9 +90,23 @@ func (r *SingleClusterReconciler) createOrUpdatePDB() error { utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name), ) - if pdb.Spec.MaxUnavailable.String() != r.aeroCluster.Spec.MaxUnavailable.String() { - pdb.Spec.MaxUnavailable = r.aeroCluster.Spec.MaxUnavailable - if err = r.Client.Update( + var value string + + maxUnavailable := pdb.Object["spec"].(map[string]interface{})["maxUnavailable"] + + // Type casting is required because of unstructured object + if val, ok := maxUnavailable.(string); ok { + value = val + } else { + value = strconv.Itoa(int(maxUnavailable.(int64))) + } + + if value != r.aeroCluster.Spec.MaxUnavailable.String() { + spec := pdb.Object["spec"].(map[string]interface{}) + spec["maxUnavailable"] = r.aeroCluster.Spec.MaxUnavailable + pdb.Object["spec"] = spec + + if err := r.Client.Update( context.TODO(), pdb, updateOption, ); err != nil { return fmt.Errorf( diff --git a/test/cluster_helper.go b/test/cluster_helper.go index c18b1a2a9..67a1fe849 100644 --- a/test/cluster_helper.go +++ b/test/cluster_helper.go @@ -112,6 +112,11 @@ func rollingRestartClusterByEnablingTLS( return err } + aeroCluster, err = getCluster(k8sClient, 
ctx, clusterNamespacedName) + if err != nil { + return err + } + return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) } @@ -155,8 +160,13 @@ func rollingRestartClusterByDisablingTLS( return err } + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + if err != nil { + return err + } + // Port should be updated to service non-tls port - return validateReadinessProbe(ctx, k8sClient, aeroCluster, servicePort) + return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceNonTLSPort) } func scaleUpClusterTestWithNSDeviceHandling( diff --git a/test/cluster_test.go b/test/cluster_test.go index 6b0a1293a..d0c1b3663 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -150,6 +150,13 @@ func DeployClusterForAllImagesPost490(ctx goctx.Context) { err = deployCluster(k8sClient, ctx, aeroCluster) Expect(err).ToNot(HaveOccurred()) + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + By("Validating Readiness probe") + err = validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) + Expect(err).ToNot(HaveOccurred()) + _ = deleteCluster(k8sClient, ctx, aeroCluster) }, ) diff --git a/test/utils.go b/test/utils.go index 1c8223535..955b4d723 100644 --- a/test/utils.go +++ b/test/utils.go @@ -35,11 +35,6 @@ var ( pkgLog = ctrl.Log.WithName("test") ) -const ( - servicePort = 3000 - serviceTLSPort = 4333 -) - var secrets map[string][]byte var cacertSecrets map[string][]byte From 24c1e81fda9a42c95d1874a5da25a8eae732fcd0 Mon Sep 17 00:00:00 2001 From: Abhisek Dwivedi Date: Thu, 19 Oct 2023 11:06:38 +0530 Subject: [PATCH 3/3] Added test-cases --- test/poddisruptionbudget_test.go | 68 +++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/test/poddisruptionbudget_test.go b/test/poddisruptionbudget_test.go index dab2e1592..a864fbade 100644 --- a/test/poddisruptionbudget_test.go +++ b/test/poddisruptionbudget_test.go @@ -16,7 
+16,7 @@ var _ = Describe( "PodDisruptionBudget", func() { ctx := context.TODO() aeroCluster := &asdbv1.AerospikeCluster{} - maxAvailable := intstr.FromInt(2) + maxAvailable := intstr.FromInt(0) clusterNamespacedName := getNamespacedName("pdb-test-cluster", namespace) BeforeEach(func() { @@ -29,34 +29,54 @@ var _ = Describe( Expect(deleteCluster(k8sClient, ctx, aeroCluster)).NotTo(HaveOccurred()) }) - It("Validate create PDB with default maxUnavailable", func() { - err := deployCluster(k8sClient, ctx, aeroCluster) - Expect(err).ToNot(HaveOccurred()) - validatePDB(ctx, aeroCluster, 1) - }) + Context("Valid Operations", func() { + It("Validate create PDB with default maxUnavailable", func() { + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 1) + }) - It("Validate create PDB with specified maxUnavailable", func() { - aeroCluster.Spec.MaxUnavailable = &maxAvailable - err := deployCluster(k8sClient, ctx, aeroCluster) - Expect(err).ToNot(HaveOccurred()) - validatePDB(ctx, aeroCluster, 2) - }) + It("Validate create PDB with specified maxUnavailable", func() { + aeroCluster.Spec.MaxUnavailable = &maxAvailable + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 0) + }) + + It("Validate update PDB", func() { + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 1) - It("Validate update PDB", func() { - err := deployCluster(k8sClient, ctx, aeroCluster) - Expect(err).ToNot(HaveOccurred()) - validatePDB(ctx, aeroCluster, 1) + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + // Update maxUnavailable + By("Update maxUnavailable to 0") + aeroCluster.Spec.MaxUnavailable = &maxAvailable + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 0) + }) + }) - 
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) - Expect(err).ToNot(HaveOccurred()) + Context("Invalid Operations", func() { + value := intstr.FromInt(3) - // Update maxUnavailable - By("Update maxUnavailable to 2") - aeroCluster.Spec.MaxUnavailable = &maxAvailable + It("Should fail if maxUnavailable is greater than size", func() { + aeroCluster.Spec.MaxUnavailable = &value + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }) - err = updateCluster(k8sClient, ctx, aeroCluster) - Expect(err).ToNot(HaveOccurred()) - validatePDB(ctx, aeroCluster, 2) + It("Should fail if maxUnavailable is greater than RF", func() { + aeroCluster.Spec.Size = 3 + value := intstr.FromInt(3) + aeroCluster.Spec.MaxUnavailable = &value + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }) }) })