Skip to content

Commit

Permalink
Added support for PDB and readiness
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekdwivedi3060 committed Dec 6, 2023
1 parent f2cc4b4 commit 2346ff9
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 38 deletions.
3 changes: 1 addition & 2 deletions api/v1/aerospikecluster_mutating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response {
}

func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error {
// Set maxUnavailable default
// Set maxUnavailable default to 1
if c.Spec.MaxUnavailable == nil {
// Set default maxUnavailable to 1
maxUnavailable := intstr.FromInt(1)
c.Spec.MaxUnavailable = &maxUnavailable
}
Expand Down
41 changes: 38 additions & 3 deletions api/v1/aerospikecluster_validating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2206,9 +2206,44 @@ func (c *AerospikeCluster) validateMaxUnavailable() error {
return err
}

// TODO: Do we need such types of check? Maybe > size/2 etc
if c.Spec.MaxUnavailable.IntValue() > int(c.Spec.Size) {
return fmt.Errorf("maxUnavailable %s cannot be greater than size", c.Spec.MaxUnavailable.String())
maxUnavailable := int(c.Spec.Size)

// If Size is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss
if maxUnavailable == 1 {
return nil
}

for idx := range c.Spec.RackConfig.Racks {
rack := &c.Spec.RackConfig.Racks[idx]
nsList := rack.AerospikeConfig.Value["namespaces"].([]interface{})

for _, nsInterface := range nsList {
rfInterface, exists := nsInterface.(map[string]interface{})["replication-factor"]
if !exists {
// Default RF is 2 if not given
maxUnavailable = 2
continue
}

rf, err := GetIntType(rfInterface)
if err != nil {
return fmt.Errorf("namespace replication-factor %v", err)
}

// If RF is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss
if rf == 1 {
continue
}

if rf < maxUnavailable {
maxUnavailable = rf
}
}
}

if c.Spec.MaxUnavailable.IntValue() >= maxUnavailable {
return fmt.Errorf("maxUnavailable %s cannot be greater than or equal to %v",
c.Spec.MaxUnavailable.String(), maxUnavailable)
}

return nil
Expand Down
30 changes: 30 additions & 0 deletions controllers/aerospikecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
k8sRuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -37,6 +40,8 @@ var (
}
)

var PDBbGvk = policyv1beta1.SchemeGroupVersion.WithKind("PodDisruptionBudget")

// AerospikeClusterReconciler reconciles AerospikeClusters
type AerospikeClusterReconciler struct {
client.Client
Expand All @@ -49,6 +54,10 @@ type AerospikeClusterReconciler struct {

// SetupWithManager sets up the controller with the Manager
func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := r.setupPdbAPI(r.KubeConfig); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&asdbv1.AerospikeCluster{}).
Owns(
Expand All @@ -72,6 +81,27 @@ func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

// setupPdbAPI sets up the pdb api version to use as per the k8s version.
// TODO: Move to v1 when minimum supported k8s version is 1.21
func (r *AerospikeClusterReconciler) setupPdbAPI(config *rest.Config) error {
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(config)

resources, err := discoveryClient.ServerResourcesForGroupVersion("policy/v1")
if err != nil {
r.Log.Info("Could not get ServerResourcesForGroupVersion for policy/v1, falling back to policy/v1beta1")
return nil
}

for i := range resources.APIResources {
if resources.APIResources[i].Kind == "PodDisruptionBudget" {
PDBbGvk = policyv1.SchemeGroupVersion.WithKind("PodDisruptionBudget")
return nil
}
}

return nil
}

// RackState contains the rack configuration and rack size.
type RackState struct {
Rack *asdbv1.Rack
Expand Down
80 changes: 53 additions & 27 deletions controllers/poddistruptionbudget.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,66 @@ package controllers
import (
"context"
"fmt"
"strconv"

v1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1"
"github.com/aerospike/aerospike-kubernetes-operator/pkg/utils"
)

func (r *SingleClusterReconciler) createOrUpdatePDB() error {
podList, err := r.getClusterPodList()
if err != nil {
return err
}

for podIdx := range podList.Items {
pod := &podList.Items[podIdx]

for containerIdx := range pod.Spec.Containers {
if pod.Spec.Containers[containerIdx].Name != asdbv1.AerospikeServerContainerName {
continue
}

if pod.Spec.Containers[containerIdx].ReadinessProbe == nil {
r.Log.Info("Pod found without ReadinessProbe, skipping PodDisruptionBudget",
"name", pod.Name)
return nil
}
}
}

ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name)
pdb := &v1.PodDisruptionBudget{}
// TODO: Move to concrete object when minimum supported k8s version is 1.21
pdb := &unstructured.Unstructured{}
pdb.SetGroupVersionKind(PDBbGvk)

err := r.Client.Get(
if err := r.Client.Get(
context.TODO(), types.NamespacedName{
Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace,
}, pdb,
)
if err != nil {
); err != nil {
if !errors.IsNotFound(err) {
return err
}

r.Log.Info("Create PodDisruptionBudget")

pdb = &v1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: r.aeroCluster.Name,
Namespace: r.aeroCluster.Namespace,
Labels: ls,
},
Spec: v1.PodDisruptionBudgetSpec{
MaxUnavailable: r.aeroCluster.Spec.MaxUnavailable,
Selector: &metav1.LabelSelector{
MatchLabels: ls,
},
pdb.SetName(r.aeroCluster.Name)
pdb.SetNamespace(r.aeroCluster.Namespace)
pdb.SetLabels(ls)
pdb.Object["spec"] = map[string]interface{}{
"maxUnavailable": r.aeroCluster.Spec.MaxUnavailable,
"selector": &metav1.LabelSelector{
MatchLabels: ls,
},
}

// This will be true only for old existing CRs. For new operator versions, this field will be
// set by default to 1 by mutating webhook.
if pdb.Spec.MaxUnavailable == nil {
maxUnavailable := intstr.FromInt(1)
pdb.Spec.MaxUnavailable = &maxUnavailable
}

// Set AerospikeCluster instance as the owner and controller
err = controllerutil.SetControllerReference(
r.aeroCluster, pdb, r.Scheme,
Expand Down Expand Up @@ -78,9 +90,23 @@ func (r *SingleClusterReconciler) createOrUpdatePDB() error {
utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name),
)

if pdb.Spec.MaxUnavailable.String() != r.aeroCluster.Spec.MaxUnavailable.String() {
pdb.Spec.MaxUnavailable = r.aeroCluster.Spec.MaxUnavailable
if err = r.Client.Update(
var value string

maxUnavailable := pdb.Object["spec"].(map[string]interface{})["maxUnavailable"]

// Type casting is required because of unstructured object
if val, ok := maxUnavailable.(string); ok {
value = val
} else {
value = strconv.Itoa(int(maxUnavailable.(int64)))
}

if value != r.aeroCluster.Spec.MaxUnavailable.String() {
spec := pdb.Object["spec"].(map[string]interface{})
spec["maxUnavailable"] = r.aeroCluster.Spec.MaxUnavailable
pdb.Object["spec"] = spec

if err := r.Client.Update(
context.TODO(), pdb, updateOption,
); err != nil {
return fmt.Errorf(
Expand Down
12 changes: 11 additions & 1 deletion test/cluster_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func rollingRestartClusterByEnablingTLS(
return err
}

aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
if err != nil {
return err
}

return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort)
}

Expand Down Expand Up @@ -155,8 +160,13 @@ func rollingRestartClusterByDisablingTLS(
return err
}

aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
if err != nil {
return err
}

// Port should be updated to service non-tls port
return validateReadinessProbe(ctx, k8sClient, aeroCluster, servicePort)
return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceNonTLSPort)
}

func scaleUpClusterTestWithNSDeviceHandling(
Expand Down
7 changes: 7 additions & 0 deletions test/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,13 @@ func DeployClusterForAllImagesPost490(ctx goctx.Context) {
err = deployCluster(k8sClient, ctx, aeroCluster)
Expect(err).ToNot(HaveOccurred())

aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())

By("Validating Readiness probe")
err = validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort)
Expect(err).ToNot(HaveOccurred())

_ = deleteCluster(k8sClient, ctx, aeroCluster)
},
)
Expand Down
5 changes: 0 additions & 5 deletions test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ var (
pkgLog = ctrl.Log.WithName("test")
)

const (
servicePort = 3000
serviceTLSPort = 4333
)

var secrets map[string][]byte
var cacertSecrets map[string][]byte

Expand Down

0 comments on commit 2346ff9

Please sign in to comment.