diff --git a/api/v1/aerospikecluster_mutating_webhook.go b/api/v1/aerospikecluster_mutating_webhook.go
index ed6a44019..41e79ebf4 100644
--- a/api/v1/aerospikecluster_mutating_webhook.go
+++ b/api/v1/aerospikecluster_mutating_webhook.go
@@ -70,9 +70,8 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response {
 }
 
 func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error {
-	// Set maxUnavailable default
+	// Set maxUnavailable default to 1
 	if c.Spec.MaxUnavailable == nil {
-		// Set default maxUnavailable to 1
 		maxUnavailable := intstr.FromInt(1)
 		c.Spec.MaxUnavailable = &maxUnavailable
 	}
diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go
index f83e61fee..9f4dc1788 100644
--- a/api/v1/aerospikecluster_validating_webhook.go
+++ b/api/v1/aerospikecluster_validating_webhook.go
@@ -2198,9 +2198,43 @@ func (c *AerospikeCluster) validateMaxUnavailable() error {
 		return err
 	}
 
-	// TODO: Do we need such types of check? Maybe > size/2 etc
-	if c.Spec.MaxUnavailable.IntValue() > int(c.Spec.Size) {
-		return fmt.Errorf("maxUnavailable %s cannot be greater than size", c.Spec.MaxUnavailable.String())
+	maxUnavailable := int(c.Spec.Size)
+
+	// If Size is 1, skip the check: disrupting the lone pod loses data regardless of maxUnavailable
+	if maxUnavailable == 1 {
+		return nil
+	}
+
+	for idx := range c.Spec.RackConfig.Racks {
+		rack := &c.Spec.RackConfig.Racks[idx]
+		nsList := rack.AerospikeConfig.Value["namespaces"].([]interface{})
+
+		for _, nsInterface := range nsList {
+			rfInterface, exists := nsInterface.(map[string]interface{})["replication-factor"]
+			if !exists {
+				// Default RF is 2 if not given
+				maxUnavailable = 2
+				continue
+			}
+
+			rf, err := GetIntType(rfInterface)
+			if err != nil {
+				return fmt.Errorf("namespace replication-factor %v", err)
+			}
+
+			// Skip namespaces with RF 1: disrupting a pod there loses data regardless of maxUnavailable
+			if rf == 1 {
+				continue
+			}
+
+			if rf < maxUnavailable {
+				maxUnavailable = rf
+			}
+		}
+	}
+
+	if c.Spec.MaxUnavailable.IntValue() >= maxUnavailable {
+		return fmt.Errorf("maxUnavailable %s cannot be greater than or equal to %v", c.Spec.MaxUnavailable.String(), maxUnavailable)
 	}
 
 	return nil
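Note on validateMaxUnavailable above: the loop starts from the cluster size and lowers the bound to the smallest replication-factor of at least 2 found across all rack namespaces (a missing replication-factor counts as the default of 2, and RF-1 namespaces are skipped), then requires spec.maxUnavailable to stay strictly below that bound. A minimal standalone sketch of the arithmetic, with illustrative names, not part of the patch:

package main

import "fmt"

// allowedBound mirrors the webhook's bound: cluster size, lowered to the
// smallest replication-factor (>= 2) seen across namespaces. The value of
// spec.maxUnavailable must stay strictly below the returned bound.
func allowedBound(size int, rfs []int) int {
	bound := size

	for _, rf := range rfs {
		if rf == 1 {
			// Data loss happens regardless, so RF 1 does not constrain the PDB.
			continue
		}

		if rf < bound {
			bound = rf
		}
	}

	return bound
}

func main() {
	// A 5-node cluster with namespaces at RF 3 and RF 2:
	// maxUnavailable >= 2 is rejected, so only 1 is accepted.
	fmt.Println(allowedBound(5, []int{3, 2})) // 2
}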
diff --git a/controllers/aerospikecluster_controller.go b/controllers/aerospikecluster_controller.go
index 701e9b9a4..d23cc9956 100644
--- a/controllers/aerospikecluster_controller.go
+++ b/controllers/aerospikecluster_controller.go
@@ -6,8 +6,11 @@ import (
 
 	"github.com/go-logr/logr"
 	appsv1 "k8s.io/api/apps/v1"
+	policyv1 "k8s.io/api/policy/v1"
+	policyv1beta1 "k8s.io/api/policy/v1beta1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	k8sRuntime "k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/record"
@@ -37,6 +40,8 @@ var (
 	}
 )
 
+var PDBbGvk = policyv1beta1.SchemeGroupVersion.WithKind("PodDisruptionBudget")
+
 // AerospikeClusterReconciler reconciles AerospikeClusters
 type AerospikeClusterReconciler struct {
 	client.Client
@@ -49,6 +54,10 @@ type AerospikeClusterReconciler struct {
 
 // SetupWithManager sets up the controller with the Manager
 func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	if err := r.setupPdbAPI(r.KubeConfig); err != nil {
+		return err
+	}
+
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&asdbv1.AerospikeCluster{}).
 		Owns(
@@ -72,6 +81,27 @@ func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
 		Complete(r)
 }
 
+// setupPdbAPI sets the PodDisruptionBudget API version to use based on the Kubernetes version.
+// TODO: Move to v1 when minimum supported k8s version is 1.21
+func (r *AerospikeClusterReconciler) setupPdbAPI(config *rest.Config) error {
+	discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(config)
+
+	resources, err := discoveryClient.ServerResourcesForGroupVersion("policy/v1")
+	if err != nil {
+		r.Log.Info("Could not get ServerResourcesForGroupVersion for policy/v1, falling back to policy/v1beta1")
+		return nil
+	}
+
+	for i := range resources.APIResources {
+		if resources.APIResources[i].Kind == "PodDisruptionBudget" {
+			PDBbGvk = policyv1.SchemeGroupVersion.WithKind("PodDisruptionBudget")
+			return nil
+		}
+	}
+
+	return nil
+}
+
 // RackState contains the rack configuration and rack size.
 type RackState struct {
 	Rack *asdbv1.Rack
diff --git a/controllers/poddistruptionbudget.go b/controllers/poddistruptionbudget.go
index 1b78e0b2a..580a390a0 100644
--- a/controllers/poddistruptionbudget.go
+++ b/controllers/poddistruptionbudget.go
@@ -3,54 +3,66 @@ package controllers
 import (
 	"context"
 	"fmt"
+	"strconv"
 
-	v1 "k8s.io/api/policy/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/intstr"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
+	asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1"
 	"github.com/aerospike/aerospike-kubernetes-operator/pkg/utils"
 )
 
 func (r *SingleClusterReconciler) createOrUpdatePDB() error {
+	podList, err := r.getClusterPodList()
+	if err != nil {
+		return err
+	}
+
+	for podIdx := range podList.Items {
+		pod := &podList.Items[podIdx]
+
+		for containerIdx := range pod.Spec.Containers {
+			if pod.Spec.Containers[containerIdx].Name != asdbv1.AerospikeServerContainerName {
+				continue
+			}
+
+			if pod.Spec.Containers[containerIdx].ReadinessProbe == nil {
+				r.Log.Info("Pod found without ReadinessProbe, skipping PodDisruptionBudget",
+					"name", pod.Name)
+				return nil
+			}
+		}
+	}
+
 	ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name)
-	pdb := &v1.PodDisruptionBudget{}
+	// TODO: Move to concrete object when minimum supported k8s version is 1.21
+	pdb := &unstructured.Unstructured{}
+	pdb.SetGroupVersionKind(PDBbGvk)
 
-	err := r.Client.Get(
+	if err := r.Client.Get(
 		context.TODO(), types.NamespacedName{
 			Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace,
 		}, pdb,
-	)
-	if err != nil {
+	); err != nil {
 		if !errors.IsNotFound(err) {
 			return err
 		}
 
 		r.Log.Info("Create PodDisruptionBudget")
 
-		pdb = &v1.PodDisruptionBudget{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace,
-				Labels: ls,
-			},
-			Spec: v1.PodDisruptionBudgetSpec{
-				MaxUnavailable: r.aeroCluster.Spec.MaxUnavailable,
-				Selector: &metav1.LabelSelector{
-					MatchLabels: ls,
-				},
+		pdb.SetName(r.aeroCluster.Name)
+		pdb.SetNamespace(r.aeroCluster.Namespace)
+		pdb.SetLabels(ls)
+		pdb.Object["spec"] = map[string]interface{}{
+			"maxUnavailable": r.aeroCluster.Spec.MaxUnavailable,
+			"selector": &metav1.LabelSelector{
+				MatchLabels: ls,
 			},
 		}
 
-		// This will be true only for old existing CRs. For new operator versions, this field will be
-		// set by default to 1 by mutating webhook.
-		if pdb.Spec.MaxUnavailable == nil {
-			maxUnavailable := intstr.FromInt(1)
-			pdb.Spec.MaxUnavailable = &maxUnavailable
-		}
-
 		// Set AerospikeCluster instance as the owner and controller
 		err = controllerutil.SetControllerReference(
 			r.aeroCluster, pdb, r.Scheme,
@@ -78,9 +90,23 @@ func (r *SingleClusterReconciler) createOrUpdatePDB() error {
 		utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name),
 	)
 
-	if pdb.Spec.MaxUnavailable.String() != r.aeroCluster.Spec.MaxUnavailable.String() {
-		pdb.Spec.MaxUnavailable = r.aeroCluster.Spec.MaxUnavailable
-		if err = r.Client.Update(
+	var value string
+
+	maxUnavailable := pdb.Object["spec"].(map[string]interface{})["maxUnavailable"]
+
+	// Type assertion is needed because the unstructured object may hold maxUnavailable as a string or an int64
+	if val, ok := maxUnavailable.(string); ok {
+		value = val
+	} else {
+		value = strconv.Itoa(int(maxUnavailable.(int64)))
+	}
+
+	if value != r.aeroCluster.Spec.MaxUnavailable.String() {
+		spec := pdb.Object["spec"].(map[string]interface{})
+		spec["maxUnavailable"] = r.aeroCluster.Spec.MaxUnavailable
+		pdb.Object["spec"] = spec
+
+		if err := r.Client.Update(
 			context.TODO(), pdb, updateOption,
 		); err != nil {
 			return fmt.Errorf(
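One wrinkle in the update path above: when spec.maxUnavailable is read back from an unstructured object, JSON deserialization yields a string for percentage values ("10%") and an int64 for whole numbers, so both shapes have to be normalized before comparing against the intstr value in the CR spec. A standalone sketch of that normalization, with an illustrative helper name, not part of the patch:

package main

import (
	"fmt"
	"strconv"
)

// asString normalizes a maxUnavailable value pulled out of an unstructured
// PDB spec. Unlike the patch, which asserts int64 directly in the else
// branch, this sketch keeps a default case for unexpected types.
func asString(v interface{}) (string, error) {
	switch val := v.(type) {
	case string:
		return val, nil
	case int64:
		return strconv.FormatInt(val, 10), nil
	default:
		return "", fmt.Errorf("unexpected maxUnavailable type %T", v)
	}
}

func main() {
	s, _ := asString(int64(1))
	fmt.Println(s) // 1

	s, _ = asString("10%")
	fmt.Println(s) // 10%
}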
diff --git a/test/cluster_helper.go b/test/cluster_helper.go
index c18b1a2a9..67a1fe849 100644
--- a/test/cluster_helper.go
+++ b/test/cluster_helper.go
@@ -112,6 +112,11 @@ func rollingRestartClusterByEnablingTLS(
 		return err
 	}
 
+	aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+	if err != nil {
+		return err
+	}
+
 	return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort)
 }
 
@@ -155,8 +160,13 @@ func rollingRestartClusterByDisablingTLS(
 		return err
 	}
 
+	aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+	if err != nil {
+		return err
+	}
+
 	// Port should be updated to service non-tls port
-	return validateReadinessProbe(ctx, k8sClient, aeroCluster, servicePort)
+	return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceNonTLSPort)
 }
 
 func scaleUpClusterTestWithNSDeviceHandling(
diff --git a/test/cluster_test.go b/test/cluster_test.go
index 6b0a1293a..d0c1b3663 100644
--- a/test/cluster_test.go
+++ b/test/cluster_test.go
@@ -150,6 +150,13 @@ func DeployClusterForAllImagesPost490(ctx goctx.Context) {
 			err = deployCluster(k8sClient, ctx, aeroCluster)
 			Expect(err).ToNot(HaveOccurred())
 
+			aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+			Expect(err).ToNot(HaveOccurred())
+
+			By("Validating Readiness probe")
+			err = validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort)
+			Expect(err).ToNot(HaveOccurred())
+
 			_ = deleteCluster(k8sClient, ctx, aeroCluster)
 		},
 	)
diff --git a/test/utils.go b/test/utils.go
index 1c8223535..955b4d723 100644
--- a/test/utils.go
+++ b/test/utils.go
@@ -35,11 +35,6 @@ var (
 	pkgLog = ctrl.Log.WithName("test")
 )
 
-const (
-	servicePort    = 3000
-	serviceTLSPort = 4333
-)
-
 var secrets map[string][]byte
 var cacertSecrets map[string][]byte
 
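For reference, the discovery-based detection that setupPdbAPI (controllers/aerospikecluster_controller.go) performs can be exercised in isolation. A sketch under the same assumption as the patch, namely that policy/v1 serves PodDisruptionBudget on Kubernetes 1.21+; the package and function names are illustrative:

package sketch

import (
	policyv1 "k8s.io/api/policy/v1"
	policyv1beta1 "k8s.io/api/policy/v1beta1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/rest"
)

// pdbGVK asks the discovery API whether policy/v1 serves PodDisruptionBudget
// and falls back to policy/v1beta1 otherwise, mirroring the patch's
// conservative default for older clusters.
func pdbGVK(config *rest.Config) (schema.GroupVersionKind, error) {
	dc, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		return schema.GroupVersionKind{}, err
	}

	resources, err := dc.ServerResourcesForGroupVersion("policy/v1")
	if err == nil {
		for i := range resources.APIResources {
			if resources.APIResources[i].Kind == "PodDisruptionBudget" {
				return policyv1.SchemeGroupVersion.WithKind("PodDisruptionBudget"), nil
			}
		}
	}

	return policyv1beta1.SchemeGroupVersion.WithKind("PodDisruptionBudget"), nil
}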