From 5c0af06ba8bc31243cdc18a710f7931b18b87301 Mon Sep 17 00:00:00 2001 From: Tanmay Jain <103629776+tanmayja@users.noreply.github.com> Date: Sat, 18 May 2024 21:01:13 +0530 Subject: [PATCH] Fixing bugs (#287) * Checking aerospikeConf annotation availability in the pod * Deleting rack-id from global config namespaces * Changing 0 rack-id to correct rack id in rackConfig * Fixing the rack-id issue in the operator upgrade * Remove the isSecurityEnabled flag. Determine enabled security by comparing the config hash of the configMap and the pod. * LDAP dynamic config fix and remove rack-id fixes * Added isClusterReadinessEnabled field in status * check dynamic config enable flag based on the init image tag * Update init image --------- Co-authored-by: Sudhanshu Ranjan Co-authored-by: Abhisek Dwivedi --- api/v1/aerospikecluster_mutating_webhook.go | 14 +- api/v1/aerospikecluster_types.go | 11 +- api/v1/aerospikecluster_validating_webhook.go | 151 ++++++++++++++---- api/v1/utils.go | 8 +- .../asdb.aerospike.com_aerospikeclusters.yaml | 14 +- controllers/configmap.go | 19 +++ controllers/pod.go | 12 +- controllers/poddistruptionbudget.go | 30 ++-- controllers/rack.go | 83 +++++++++- controllers/reconciler.go | 97 ++++------- go.mod | 2 +- go.sum | 4 +- ..._aerospikeclusters.asdb.aerospike.com.yaml | 14 +- test/batch_scaledown_pods_test.go | 32 +++- test/dynamic_config_test.go | 33 +++- test/ldap_auth_test.go | 42 +---- test/poddisruptionbudget_test.go | 14 +- 17 files changed, 389 insertions(+), 191 deletions(-) diff --git a/api/v1/aerospikecluster_mutating_webhook.go b/api/v1/aerospikecluster_mutating_webhook.go index 3f3f65d87..fbcea4ed6 100644 --- a/api/v1/aerospikecluster_mutating_webhook.go +++ b/api/v1/aerospikecluster_mutating_webhook.go @@ -73,7 +73,7 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response { func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error { // Set maxUnavailable default to 1 if !GetBool(c.Spec.DisablePDB) && 
c.Spec.MaxUnavailable == nil { - maxUnavailable := intstr.FromInt(1) + maxUnavailable := intstr.FromInt32(1) c.Spec.MaxUnavailable = &maxUnavailable } @@ -403,6 +403,14 @@ func setDefaultNsConf(asLog logr.Logger, configSpec AerospikeConfigSpec, if rackID != nil { // Add rack-id only in rack specific config, not in global config defaultConfs := map[string]interface{}{"rack-id": *rackID} + + // rack-id was historically set to 0 for all namespaces, but since the AKO 3.3.0, it reflects actual values. + // During the AKO 3.3.0 upgrade rack-id for namespaces in rack specific config is set to 0. + // Hence, deleting this 0 rack-id so that correct rack-id will be added. + if id, ok := nsMap["rack-id"]; ok && id == float64(0) && *rackID != 0 { + delete(nsMap, "rack-id") + } + if err := setDefaultsInConfigMap( asLog, nsMap, defaultConfs, ); err != nil { @@ -411,6 +419,10 @@ func setDefaultNsConf(asLog logr.Logger, configSpec AerospikeConfigSpec, err, ) } + } else { + // Deleting rack-id for namespaces in global config. + // Correct rack-id will be added in rack specific config. + delete(nsMap, "rack-id") } } else { // User may have added this key or may have patched object with new smaller rackEnabledNamespace list diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index d1eec5a0f..72e942cf3 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -669,6 +669,11 @@ type AerospikeClusterStatusSpec struct { //nolint:govet // for readability // In case of inconsistent state during dynamic config update, operator falls back to rolling restart. // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Enable Dynamic Config Update" EnableDynamicConfigUpdate *bool `json:"enableDynamicConfigUpdate,omitempty"` + + // IsReadinessProbeEnabled tells whether the readiness probe is present in all pods or not. 
+ // Moreover, PodDisruptionBudget should be created for the Aerospike cluster only when this field is enabled. + // +optional + IsReadinessProbeEnabled bool `json:"isClusterReadinessEnabled"` // Define resources requests and limits for Aerospike Server Container. // Please contact aerospike for proper sizing exercise // Only Memory and Cpu resources can be given @@ -867,6 +872,9 @@ type AerospikeInstanceSummary struct { //nolint:govet // for readability type AerospikePodStatus struct { //nolint:govet // for readability // Image is the Aerospike image this pod is running. Image string `json:"image"` + // InitImage is the Aerospike init image this pod's init container is running. + // +optional + InitImage string `json:"initImage,omitempty"` // PodIP in the K8s network. PodIP string `json:"podIP"` // HostInternalIP of the K8s host this pod is scheduled on. @@ -902,9 +910,6 @@ type AerospikePodStatus struct { //nolint:govet // for readability // DynamicConfigUpdateStatus is the status of dynamic config update operation. // Empty "" status means successful update. 
DynamicConfigUpdateStatus DynamicConfigUpdateStatus `json:"dynamicConfigUpdateStatus,omitempty"` - - // IsSecurityEnabled is true if security is enabled in the pod - IsSecurityEnabled bool `json:"isSecurityEnabled"` } // +kubebuilder:object:root=true diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index 7b063bdb1..fd8dcb34e 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -33,7 +33,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/utils/ptr" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -82,6 +81,10 @@ func (c *AerospikeCluster) ValidateUpdate(oldObj runtime.Object) (admission.Warn return nil, err } + if err := c.validateEnableDynamicConfigUpdate(); err != nil { + return nil, err + } + outgoingVersion, err := GetImageVersion(old.Spec.Image) if err != nil { return nil, err @@ -104,7 +107,7 @@ func (c *AerospikeCluster) ValidateUpdate(oldObj runtime.Object) (admission.Warn } // MultiPodPerHost cannot be updated - if !ptr.Equal(c.Spec.PodSpec.MultiPodPerHost, old.Spec.PodSpec.MultiPodPerHost) { + if GetBool(c.Spec.PodSpec.MultiPodPerHost) != GetBool(old.Spec.PodSpec.MultiPodPerHost) { return nil, fmt.Errorf("cannot update MultiPodPerHost setting") } @@ -118,7 +121,7 @@ func (c *AerospikeCluster) ValidateUpdate(oldObj runtime.Object) (admission.Warn if err := validateAerospikeConfigUpdate( aslog, incomingVersion, outgoingVersion, c.Spec.AerospikeConfig, old.Spec.AerospikeConfig, - c.Status.AerospikeConfig, c.Status.Pods, + c.Status.AerospikeConfig, ); err != nil { return nil, err } @@ -442,7 +445,7 @@ func (c *AerospikeCluster) validateRackUpdate( if err := validateAerospikeConfigUpdate( aslog, incomingVersion, outgoingVersion, &newRack.AerospikeConfig, 
&oldRack.AerospikeConfig, - rackStatusConfig, c.Status.Pods, + rackStatusConfig, ); err != nil { return fmt.Errorf( "invalid update in Rack(ID: %d) aerospikeConfig: %v", @@ -602,14 +605,12 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error { } // Validate batch upgrade/restart param - if err := c.validateBatchSize(c.Spec.RackConfig.RollingUpdateBatchSize, - "spec.rackConfig.rollingUpdateBatchSize"); err != nil { + if err := c.validateBatchSize(c.Spec.RackConfig.RollingUpdateBatchSize, true); err != nil { return err } // Validate batch scaleDown param - if err := c.validateBatchSize(c.Spec.RackConfig.ScaleDownBatchSize, - "spec.rackConfig.scaleDownBatchSize"); err != nil { + if err := c.validateBatchSize(c.Spec.RackConfig.ScaleDownBatchSize, false); err != nil { return err } @@ -627,6 +628,7 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error { type nsConf struct { noOfRacksForNamespaces int replicationFactor int + scEnabled bool } func getNsConfForNamespaces(rackConfig RackConfig) map[string]nsConf { @@ -647,9 +649,13 @@ func getNsConfForNamespaces(rackConfig RackConfig) map[string]nsConf { } rf, _ := getNamespaceReplicationFactor(nsInterface.(map[string]interface{})) + + ns := nsInterface.(map[string]interface{}) + scEnabled := IsNSSCEnabled(ns) nsConfs[nsName] = nsConf{ noOfRacksForNamespaces: noOfRacksForNamespaces, replicationFactor: rf, + scEnabled: scEnabled, } } } @@ -945,7 +951,7 @@ func readNamesFromLocalCertificate(clientCertSpec *AerospikeOperatorClientCertSp return result, err } - if len(cert.Subject.CommonName) > 0 { + if cert.Subject.CommonName != "" { result[cert.Subject.CommonName] = struct{}{} } @@ -1249,8 +1255,23 @@ func getNamespaceReplicationFactor(nsConf map[string]interface{}) (int, error) { } func validateSecurityConfigUpdate( - newVersion, oldVersion string, newSpec, oldSpec *AerospikeConfigSpec, podStatus map[string]AerospikePodStatus, -) error { + newVersion, oldVersion string, newSpec, oldSpec, 
currentStatus *AerospikeConfigSpec) error { + if currentStatus != nil { + currentSecurityConfig, err := IsSecurityEnabled(oldVersion, currentStatus) + if err != nil { + return err + } + + desiredSecurityConfig, err := IsSecurityEnabled(newVersion, newSpec) + if err != nil { + return err + } + + if currentSecurityConfig && !desiredSecurityConfig { + return fmt.Errorf("cannot disable cluster security in running cluster") + } + } + nv, err := lib.CompareVersions(newVersion, "5.7.0") if err != nil { return err @@ -1261,23 +1282,14 @@ func validateSecurityConfigUpdate( return err } - isSecurityEnabledPodExist := false - - for pod := range podStatus { - if podStatus[pod].IsSecurityEnabled { - isSecurityEnabledPodExist = true - break - } - } - if nv >= 0 || ov >= 0 { - return validateSecurityContext(newVersion, oldVersion, newSpec, oldSpec, isSecurityEnabledPodExist) + return validateSecurityContext(newVersion, oldVersion, newSpec, oldSpec) } - return validateEnableSecurityConfig(newSpec, oldSpec, isSecurityEnabledPodExist) + return validateEnableSecurityConfig(newSpec, oldSpec) } -func validateEnableSecurityConfig(newConfSpec, oldConfSpec *AerospikeConfigSpec, isSecurityEnabledPodExist bool) error { +func validateEnableSecurityConfig(newConfSpec, oldConfSpec *AerospikeConfigSpec) error { newConf := newConfSpec.Value oldConf := oldConfSpec.Value oldSec, oldSecConfFound := oldConf["security"] @@ -1291,8 +1303,7 @@ func validateEnableSecurityConfig(newConfSpec, oldConfSpec *AerospikeConfigSpec, oldSecFlag, oldEnableSecurityFlagFound := oldSec.(map[string]interface{})["enable-security"] newSecFlag, newEnableSecurityFlagFound := newSec.(map[string]interface{})["enable-security"] - if oldEnableSecurityFlagFound && oldSecFlag.(bool) && (!newEnableSecurityFlagFound || !newSecFlag.(bool)) && - isSecurityEnabledPodExist { + if oldEnableSecurityFlagFound && oldSecFlag.(bool) && (!newEnableSecurityFlagFound || !newSecFlag.(bool)) { return fmt.Errorf("cannot disable cluster security 
in running cluster") } } @@ -1301,8 +1312,7 @@ func validateEnableSecurityConfig(newConfSpec, oldConfSpec *AerospikeConfigSpec, } func validateSecurityContext( - newVersion, oldVersion string, newSpec, oldSpec *AerospikeConfigSpec, isSecurityEnabledPodExist bool, -) error { + newVersion, oldVersion string, newSpec, oldSpec *AerospikeConfigSpec) error { ovflag, err := IsSecurityEnabled(oldVersion, oldSpec) if err != nil { if !errors.Is(err, internalerrors.ErrNotFound) { @@ -1322,7 +1332,7 @@ func validateSecurityContext( } } - if !ivflag && ovflag && isSecurityEnabledPodExist { + if !ivflag && ovflag { return fmt.Errorf("cannot disable cluster security in running cluster") } @@ -1332,14 +1342,12 @@ func validateSecurityContext( func validateAerospikeConfigUpdate( aslog logr.Logger, incomingVersion, outgoingVersion string, incomingSpec, outgoingSpec, currentStatus *AerospikeConfigSpec, - podStatus map[string]AerospikePodStatus, ) error { aslog.Info("Validate AerospikeConfig update") if err := validateSecurityConfigUpdate( incomingVersion, outgoingVersion, incomingSpec, outgoingSpec, - podStatus, - ); err != nil { + currentStatus); err != nil { return err } @@ -2145,11 +2153,22 @@ func (c *AerospikeCluster) validateNetworkPolicy(namespace string) error { return nil } -func (c *AerospikeCluster) validateBatchSize(batchSize *intstr.IntOrString, fieldPath string) error { +// validateBatchSize validates the batch size for the following types: +// - rollingUpdateBatchSize: Rolling update batch size +// - scaleDownBatchSize: Scale down batch size +func (c *AerospikeCluster) validateBatchSize(batchSize *intstr.IntOrString, rollingUpdateBatch bool) error { + var fieldPath string + if batchSize == nil { return nil } + if rollingUpdateBatch { + fieldPath = "spec.rackConfig.rollingUpdateBatchSize" + } else { + fieldPath = "spec.rackConfig.scaleDownBatchSize" + } + if err := validateIntOrStringField(batchSize, fieldPath); err != nil { return err } @@ -2179,6 +2198,14 @@ func (c 
*AerospikeCluster) validateBatchSize(batchSize *intstr.IntOrString, fiel ns, ) } + + // If Strong Consistency is enabled, then scaleDownBatchSize can't be used + if !rollingUpdateBatch && nsConf.scEnabled { + return fmt.Errorf( + "can not use %s when namespace `%s` is configured with Strong Consistency", fieldPath, + ns, + ) + } } return nil @@ -2282,3 +2309,63 @@ func (c *AerospikeCluster) validateMaxUnavailable() error { return nil } + +func (c *AerospikeCluster) validateEnableDynamicConfigUpdate() error { + if !GetBool(c.Spec.EnableDynamicConfigUpdate) { + return nil + } + + if len(c.Status.Pods) == 0 { + return nil + } + + minInitVersion, err := getMinRunningInitVersion(c.Status.Pods) + if err != nil { + return err + } + + val, err := lib.CompareVersions(minInitVersion, minInitVersionForDynamicConf) + if err != nil { + return fmt.Errorf("failed to check image version: %v", err) + } + + if val < 0 { + return fmt.Errorf("cannot enable enableDynamicConfigUpdate flag, some init containers are running version less"+ + " than %s. 
Please visit https://aerospike.com/docs/cloud/kubernetes/operator/Cluster-configuration-settings#spec"+ + " for more details about enableDynamicConfigUpdate flag", + minInitVersionForDynamicConf) + } + + return nil +} + +func getMinRunningInitVersion(pods map[string]AerospikePodStatus) (string, error) { + minVersion := "" + + for idx := range pods { + if pods[idx].InitImage != "" { + version, err := GetImageVersion(pods[idx].InitImage) + if err != nil { + return "", err + } + + if minVersion == "" { + minVersion = version + continue + } + + val, err := lib.CompareVersions(version, minVersion) + if err != nil { + return "", fmt.Errorf("failed to check image version: %v", err) + } + + if val < 0 { + minVersion = version + } + } else { + return baseInitVersion, nil + } + } + + return minVersion, nil +} diff --git a/api/v1/utils.go b/api/v1/utils.go index 653145710..d17cc9f58 100644 --- a/api/v1/utils.go +++ b/api/v1/utils.go @@ -32,7 +32,11 @@ const ( InfoPortName = "info" ) -const baseVersion = "4.9.0.3" +const ( + baseVersion = "4.9.0.3" + baseInitVersion = "1.0.0" + minInitVersionForDynamicConf = "2.2.0" +) const ( // Namespace keys. 
@@ -63,7 +67,7 @@ const ( AerospikeInitContainerRegistryEnvVar = "AEROSPIKE_KUBERNETES_INIT_REGISTRY" AerospikeInitContainerDefaultRegistry = "docker.io" AerospikeInitContainerDefaultRegistryNamespace = "aerospike" - AerospikeInitContainerDefaultRepoAndTag = "aerospike-kubernetes-init:2.2.0-dev3" + AerospikeInitContainerDefaultRepoAndTag = "aerospike-kubernetes-init:2.2.0-dev4" AerospikeAppLabel = "app" AerospikeAppLabelValue = "aerospike-cluster" AerospikeCustomResourceLabel = "aerospike.com/cr" diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index f1fdbf581..3fc9d4050 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -9618,6 +9618,11 @@ spec: image: description: Aerospike server image type: string + isClusterReadinessEnabled: + description: IsReadinessProbeEnabled tells whether the readiness probe + is present in all pods or not. Moreover, PodDisruptionBudget should + be created for the Aerospike cluster only when this field is enabled. + type: boolean k8sNodeBlockList: description: K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. @@ -14285,16 +14290,16 @@ spec: image: description: Image is the Aerospike image this pod is running. type: string + initImage: + description: InitImage is the Aerospike init image this pod's + init container is running. + type: string initializedVolumes: description: InitializedVolumes is the list of volume names that have already been initialized. 
items: type: string type: array - isSecurityEnabled: - description: IsSecurityEnabled is true if security is enabled - in the pod - type: boolean networkPolicyHash: description: NetworkPolicyHash is ripemd160 hash of NetworkPolicy used by this pod @@ -14318,7 +14323,6 @@ spec: required: - aerospikeConfigHash - image - - isSecurityEnabled - networkPolicyHash - podIP - podPort diff --git a/controllers/configmap.go b/controllers/configmap.go index 1514a63ba..c9271b98d 100644 --- a/controllers/configmap.go +++ b/controllers/configmap.go @@ -8,6 +8,7 @@ import ( "fmt" "io/fs" "path/filepath" + "regexp" "strings" "text/template" @@ -106,6 +107,15 @@ func (r *SingleClusterReconciler) createConfigMapData(rack *asdbv1.Rack) ( confData[aerospikeTemplateConfFileName] = confTemp + // [Backward compatibility fix for AKO 3.3.0 upgrade] + // rack-id was historically set to 0 for all namespaces, but since the AKO 3.3.0, it reflects actual values. + // This change led to hash mismatches during the AKO 3.3.0 upgrade, triggering unnecessary warm restarts. + // Solution: Replace real rack-id with 0 in hash calculations to avoid this issue. + re := regexp.MustCompile(`rack-id.*\d+`) + if rackStr := re.FindString(confTemp); rackStr != "" { + confTemp = strings.ReplaceAll(confTemp, rackStr, "rack-id 0") + } + // Add conf hash confHash, err := utils.GetHash(confTemp) if err != nil { @@ -137,6 +147,7 @@ func (r *SingleClusterReconciler) createConfigMapData(rack *asdbv1.Rack) ( return nil, err } + // [Backward compatibility fix for AKO 2.1.0 upgrade] // This is a newly introduced field in 2.1.0. // Ignore empty value from hash computation so that on upgrade clusters are // not rolling restarted. 
@@ -144,6 +155,14 @@ func (r *SingleClusterReconciler) createConfigMapData(rack *asdbv1.Rack) ( string(podSpecStr), "\"aerospikeInitContainer\":{},", "", )) + // [Backward compatibility fix for AKO 3.3.0 upgrade] + // This field is changed from bool type to *bool type in 3.3.0 + // Ignore false value from hash computation so that on upgrade clusters are + // not rolling restarted. + podSpecStr = []byte(strings.ReplaceAll( + string(podSpecStr), "\"multiPodPerHost\":false,", "", + )) + podSpecHash, err := utils.GetHash(string(podSpecStr)) if err != nil { return nil, err diff --git a/controllers/pod.go b/controllers/pod.go index 29d9d27e2..1eee1214f 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -1356,7 +1356,12 @@ func (r *SingleClusterReconciler) handleDynamicConfigChange(rackState *RackState return nil, fmt.Errorf("failed to load config map by lib: %v", err) } - asConfSpec, err := getFlatConfig(r.Log, asConf.ToConfFile()) + // special handling for DNE in ldap configurations + specConfFile := asConf.ToConfFile() + specConfFile = strings.ReplaceAll(specConfFile, "$${_DNE}{un}", "${un}") + specConfFile = strings.ReplaceAll(specConfFile, "$${_DNE}{dn}", "${dn}") + + asConfSpec, err := getFlatConfig(r.Log, specConfFile) if err != nil { return nil, fmt.Errorf("failed to load config map by lib: %v", err) } @@ -1364,14 +1369,15 @@ func (r *SingleClusterReconciler) handleDynamicConfigChange(rackState *RackState specToStatusDiffs, err := asconfig.ConfDiff(r.Log, *asConfSpec, *asConfStatus, true, version) if err != nil { - r.Log.Info("Failed to get config diff to change config dynamically, fallback to rolling restart: %v", err) + r.Log.Info("Failed to get config diff to change config dynamically, fallback to rolling restart", + "error", err.Error()) return nil, nil } if len(specToStatusDiffs) > 0 { isDynamic, err := asconfig.IsAllDynamicConfig(r.Log, specToStatusDiffs, version) if err != nil { - r.Log.Info("Failed to check if all config is dynamic, fallback to 
rolling restart: %v", err) + r.Log.Info("Failed to check if all config is dynamic, fallback to rolling restart", "error", err.Error()) return nil, nil } diff --git a/controllers/poddistruptionbudget.go b/controllers/poddistruptionbudget.go index 20eafbd3c..f0d70a662 100644 --- a/controllers/poddistruptionbudget.go +++ b/controllers/poddistruptionbudget.go @@ -16,7 +16,7 @@ import ( func (r *SingleClusterReconciler) reconcilePDB() error { // If spec.DisablePDB is set to true, then we don't need to create PDB - // If it exist then delete it + // If it exists then delete it if asdbv1.GetBool(r.aeroCluster.Spec.DisablePDB) { if !asdbv1.GetBool(r.aeroCluster.Status.DisablePDB) { r.Log.Info("PodDisruptionBudget is disabled. Deleting old PodDisruptionBudget") @@ -63,24 +63,18 @@ func (r *SingleClusterReconciler) deletePDB() error { } func (r *SingleClusterReconciler) createOrUpdatePDB() error { - podList, err := r.getClusterPodList() - if err != nil { - return err - } - - for podIdx := range podList.Items { - pod := &podList.Items[podIdx] - - for containerIdx := range pod.Spec.Containers { - if pod.Spec.Containers[containerIdx].Name != asdbv1.AerospikeServerContainerName { - continue - } + // Check for cluster readiness status only when it's false. + // Once enabled it won't be disabled. + if !r.IsStatusEmpty() && !r.aeroCluster.Status.IsReadinessProbeEnabled { + clusterReadinessEnabled, err := r.getClusterReadinessStatus() + if err != nil { + return fmt.Errorf("failed to get cluster readiness status: %v", err) + } - if pod.Spec.Containers[containerIdx].ReadinessProbe == nil { - r.Log.Info("Pod found without ReadinessProbe, skipping PodDisruptionBudget. Refer Aerospike "+ - "documentation for more details.", "name", pod.Name) - return nil - } + if !clusterReadinessEnabled { + r.Log.Info("Pod Readiness is not enabled throughout cluster. Skipping PodDisruptionBudget." 
+ + " Refer Aerospike documentation for more details.") + return nil } } diff --git a/controllers/rack.go b/controllers/rack.go index f4600741c..c8306759f 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -373,6 +373,14 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat return found, reconcileError(err) } + // Handle enable security just after updating configMap. + // This code will run only when security is being enabled in an existing cluster + // Update for security is verified by checking the config hash of the pod with the + // config hash present in config map + if err := r.handleEnableSecurity(rackState, ignorablePodNames); err != nil { + return found, reconcileError(err) + } + // Upgrade upgradeNeeded, err := r.isRackUpgradeNeeded(rackState.Rack.ID, ignorablePodNames) if err != nil { @@ -978,7 +986,7 @@ func (r *SingleClusterReconciler) scaleDownRack( // In case of rolling restart, no pod cleanup happens, therefore rolling config back is left to the user. 
if err = r.validateSCClusterState(policy, ignorablePodNames); err != nil { // reset cluster size - newSize := *found.Spec.Replicas + 1 + newSize := *found.Spec.Replicas + int32(len(podsBatch)) found.Spec.Replicas = &newSize r.Log.Error( @@ -997,6 +1005,10 @@ func (r *SingleClusterReconciler) scaleDownRack( ) } + if err = r.waitForSTSToBeReady(found, ignorablePodNames); err != nil { + r.Log.Error(err, "Failed to wait for statefulset to be ready") + } + return found, reconcileRequeueAfter(1) } } @@ -1731,6 +1743,75 @@ func (r *SingleClusterReconciler) getCurrentRackList() ( return rackList, nil } +func (r *SingleClusterReconciler) handleEnableSecurity(rackState *RackState, ignorablePodNames sets.Set[string]) error { + if !r.enablingSecurity() { + // No need to proceed if security is not being enabled + return nil + } + + // Get pods where security-enabled config is applied + securityEnabledPods, err := r.getPodsWithUpdatedConfigForRack(rackState) + if err != nil { + return err + } + + if len(securityEnabledPods) == 0 { + // No security-enabled pods found + return nil + } + + // Setup access control. 
+ if err := r.validateAndReconcileAccessControl(securityEnabledPods, ignorablePodNames); err != nil { + r.Log.Error(err, "Failed to Reconcile access control") + r.Recorder.Eventf( + r.aeroCluster, corev1.EventTypeWarning, "ACLUpdateFailed", + "Failed to setup Access Control %s/%s", r.aeroCluster.Namespace, + r.aeroCluster.Name, + ) + + return err + } + + return nil +} + +func (r *SingleClusterReconciler) enablingSecurity() bool { + return r.aeroCluster.Spec.AerospikeAccessControl != nil && r.aeroCluster.Status.AerospikeAccessControl == nil +} + +func (r *SingleClusterReconciler) getPodsWithUpdatedConfigForRack(rackState *RackState) ([]corev1.Pod, error) { + pods, err := r.getOrderedRackPodList(rackState.Rack.ID) + if err != nil { + return nil, fmt.Errorf("failed to list pods: %v", err) + } + + if len(pods) == 0 { + // No pod found for the rack + return nil, nil + } + + confMap, err := r.getConfigMap(rackState.Rack.ID) + if err != nil { + return nil, err + } + + requiredConfHash := confMap.Data[aerospikeConfHashFileName] + + updatedPods := make([]corev1.Pod, 0, len(pods)) + + for idx := range pods { + podName := pods[idx].Name + podStatus := r.aeroCluster.Status.Pods[podName] + + if podStatus.AerospikeConfigHash == requiredConfHash { + // Config hash is matching, it means config has been applied + updatedPods = append(updatedPods, *pods[idx]) + } + } + + return updatedPods, nil +} + func isContainerNameInStorageVolumeAttachments( containerName string, mounts []asdbv1.VolumeAttachment, ) bool { diff --git a/controllers/reconciler.go b/controllers/reconciler.go index e7329b8ec..c39fcd6fa 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -119,10 +119,6 @@ func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error) return reconcile.Result{}, recErr } - if err := r.handleEnableSecurity(); err != nil { - return reconcile.Result{}, err - } - // Reconcile all racks if res := r.reconcileRacks(); !res.isSuccess { if res.err != nil 
{ @@ -421,6 +417,17 @@ func (r *SingleClusterReconciler) updateStatus() error { newAeroCluster.Status.AerospikeClusterStatusSpec = *specToStatus newAeroCluster.Status.Phase = asdbv1.AerospikeClusterCompleted + // Check cluster readiness only while IsReadinessProbeEnabled is still false. + // This avoids re-checking on every reconcile: once the field is enabled, it is never disabled. + if !newAeroCluster.Status.IsReadinessProbeEnabled { + clusterReadinessEnable, gErr := r.getClusterReadinessStatus() + if gErr != nil { + return fmt.Errorf("failed to get cluster readiness status: %v", gErr) + } + + newAeroCluster.Status.IsReadinessProbeEnabled = clusterReadinessEnable + } + err = r.patchStatus(newAeroCluster) if err != nil { return fmt.Errorf("error updating status: %w", err) @@ -446,6 +453,29 @@ func (r *SingleClusterReconciler) setStatusPhase(phase asdbv1.AerospikeClusterPh return nil } +func (r *SingleClusterReconciler) getClusterReadinessStatus() (bool, error) { + podList, err := r.getClusterPodList() + if err != nil { + return false, err + } + + for podIdx := range podList.Items { + pod := &podList.Items[podIdx] + + for containerIdx := range pod.Spec.Containers { + if pod.Spec.Containers[containerIdx].Name != asdbv1.AerospikeServerContainerName { + continue + } + + if pod.Spec.Containers[containerIdx].ReadinessProbe == nil { + return false, nil + } + } + } + + return true, nil +} + func (r *SingleClusterReconciler) updateAccessControlStatus() error { if r.aeroCluster.Spec.AerospikeAccessControl == nil { return nil @@ -983,62 +1013,3 @@ func (r *SingleClusterReconciler) AddAPIVersionLabel(ctx context.Context) error return r.Client.Update(ctx, aeroCluster, updateOption) } - -func (r *SingleClusterReconciler) getSecurityEnabledPods() ([]corev1.Pod, error) { - securityEnabledPods := make([]corev1.Pod, 0, len(r.aeroCluster.Status.Pods)) - - for podName := range r.aeroCluster.Status.Pods { - if 
r.aeroCluster.Status.Pods[podName].IsSecurityEnabled { - pod := &corev1.Pod{} - podName := types.NamespacedName{Name: podName, Namespace: r.aeroCluster.Namespace} - - if err := r.Client.Get(context.TODO(), podName, pod); err != nil { - return securityEnabledPods, err - } - - securityEnabledPods = append(securityEnabledPods, *pod) - } - } - - return securityEnabledPods, nil -} - -func (r *SingleClusterReconciler) enablingSecurity() bool { - return r.aeroCluster.Spec.AerospikeAccessControl != nil && r.aeroCluster.Status.AerospikeAccessControl == nil -} - -func (r *SingleClusterReconciler) handleEnableSecurity() error { - if !r.enablingSecurity() { - return nil // No need to proceed if security is not to be enabling - } - - securityEnabledPods, err := r.getSecurityEnabledPods() - if err != nil { - return err - } - - if len(securityEnabledPods) == 0 { - return nil // No security-enabled pods found - } - - ignorablePodNames, err := r.getIgnorablePods(nil, getConfiguredRackStateList(r.aeroCluster)) - if err != nil { - r.Log.Error(err, "Failed to determine pods to be ignored") - - return err - } - - // Setup access control. 
- if err := r.validateAndReconcileAccessControl(securityEnabledPods, ignorablePodNames); err != nil { - r.Log.Error(err, "Failed to Reconcile access control") - r.Recorder.Eventf( - r.aeroCluster, corev1.EventTypeWarning, "ACLUpdateFailed", - "Failed to setup Access Control %s/%s", r.aeroCluster.Namespace, - r.aeroCluster.Name, - ) - - return err - } - - return nil -} diff --git a/go.mod b/go.mod index cccac1c6b..f2a4abb7a 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 toolchain go1.21.8 require ( - github.com/aerospike/aerospike-management-lib v1.3.1-0.20240423071640-92c4d186a795 + github.com/aerospike/aerospike-management-lib v1.3.1-0.20240506094830-f70b1a5ea4e7 github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d github.com/evanphx/json-patch v4.12.0+incompatible github.com/go-logr/logr v1.4.1 diff --git a/go.sum b/go.sum index 3c738906d..b68faee27 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/aerospike/aerospike-client-go/v7 v7.1.0 h1:yvCTKdbpqZxHvv7sWsFHV1j49jZcC8yXRooWsDFqKtA= github.com/aerospike/aerospike-client-go/v7 v7.1.0/go.mod h1:AkHiKvCbqa1c16gCNGju3c5X/yzwLVvblNczqjxNwNk= -github.com/aerospike/aerospike-management-lib v1.3.1-0.20240423071640-92c4d186a795 h1:6YxT+4mYhnGtUu42RSVZaCT8DW9Npx0DUJB68tnXm8w= -github.com/aerospike/aerospike-management-lib v1.3.1-0.20240423071640-92c4d186a795/go.mod h1:3JKrmC/mLSV8SygbrPQPNV8T7bFaTMjB8wfnX25gB+4= +github.com/aerospike/aerospike-management-lib v1.3.1-0.20240506094830-f70b1a5ea4e7 h1:aeK9T07k3l9Ea/LKnCXKRbVWu+o6rZVpUpGadxEokxA= +github.com/aerospike/aerospike-management-lib v1.3.1-0.20240506094830-f70b1a5ea4e7/go.mod h1:3JKrmC/mLSV8SygbrPQPNV8T7bFaTMjB8wfnX25gB+4= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d 
h1:Byv0BzEl3/e6D5CLfI0j/7hiIEtvGVFPCZ7Ei2oq8iQ= diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index f1fdbf581..3fc9d4050 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -9618,6 +9618,11 @@ spec: image: description: Aerospike server image type: string + isClusterReadinessEnabled: + description: IsReadinessProbeEnabled tells whether the readiness probe + is present in all pods or not. Moreover, PodDisruptionBudget should + be created for the Aerospike cluster only when this field is enabled. + type: boolean k8sNodeBlockList: description: K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. @@ -14285,16 +14290,16 @@ spec: image: description: Image is the Aerospike image this pod is running. type: string + initImage: + description: InitImage is the Aerospike init image this pod's + init container is running. + type: string initializedVolumes: description: InitializedVolumes is the list of volume names that have already been initialized. 
items: type: string type: array - isSecurityEnabled: - description: IsSecurityEnabled is true if security is enabled - in the pod - type: boolean networkPolicyHash: description: NetworkPolicyHash is ripemd160 hash of NetworkPolicy used by this pod @@ -14318,7 +14323,6 @@ spec: required: - aerospikeConfigHash - image - - isSecurityEnabled - networkPolicyHash - podIP - podPort diff --git a/test/batch_scaledown_pods_test.go b/test/batch_scaledown_pods_test.go index 7eee7dcd2..c216ef48a 100644 --- a/test/batch_scaledown_pods_test.go +++ b/test/batch_scaledown_pods_test.go @@ -28,7 +28,7 @@ var _ = Describe("BatchScaleDown", func() { BeforeEach( func() { - aeroCluster = createDummyAerospikeCluster(clusterNamespacedName, 8) + aeroCluster = createNonSCDummyAerospikeCluster(clusterNamespacedName, 8) racks := getDummyRackConf(1, 2) aeroCluster.Spec.RackConfig.Racks = racks aeroCluster.Spec.RackConfig.Namespaces = []string{"test"} @@ -53,7 +53,7 @@ var _ = Describe("BatchScaleDown", func() { Expect(err).ToNot(HaveOccurred()) }) - It("Should do ScaleDownBatch when ScaleDownBatchSize is greater than the actual numbers of pods per rack"+ + It("Should do ScaleDownBatch when ScaleDownBatchSize is greater than the actual numbers of pods per rack "+ "to be scaled down", func() { err := batchScaleDownTest(k8sClient, ctx, clusterNamespacedName, count(3), 4) Expect(err).ToNot(HaveOccurred()) @@ -88,6 +88,34 @@ var _ = Describe("BatchScaleDown", func() { // TODO: Do we need to add all the invalid operation test-cases here? 
// Skipped for now as they are exactly same as RollingUpdateBatchSize invalid operation test-cases + Context("When doing invalid operations", func() { + clusterNamespacedName := getNamespacedName( + batchScaleDownClusterName, namespace, + ) + aeroCluster := &asdbv1.AerospikeCluster{} + + BeforeEach( + func() { + aeroCluster = createDummyAerospikeCluster(clusterNamespacedName, 8) + racks := getDummyRackConf(1, 2) + aeroCluster.Spec.RackConfig.Racks = racks + aeroCluster.Spec.RackConfig.Namespaces = []string{"test"} + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + AfterEach( + func() { + Expect(deleteCluster(k8sClient, ctx, aeroCluster)).ToNot(HaveOccurred()) + }, + ) + + It("Should fail batch scale-down if SC namespace is present", func() { + err := batchScaleDownTest(k8sClient, ctx, clusterNamespacedName, count(3), 2) + Expect(err).Should(HaveOccurred()) + }) + }) }) func batchScaleDownTest( diff --git a/test/dynamic_config_test.go b/test/dynamic_config_test.go index 1ef4913f5..9e5adc727 100644 --- a/test/dynamic_config_test.go +++ b/test/dynamic_config_test.go @@ -11,6 +11,8 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "golang.org/x/net/context" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" @@ -435,8 +437,23 @@ var _ = Describe( BeforeEach( func() { // Create a 2 node cluster - aeroCluster := createDummyAerospikeCluster( - clusterNamespacedName, 2, + aeroCluster := getAerospikeClusterSpecWithLDAP( + clusterNamespacedName, + ) + aeroCluster.Spec.Storage.Volumes = append(aeroCluster.Spec.Storage.Volumes, + asdbv1.VolumeSpec{ + Name: "bar", + Source: asdbv1.VolumeSource{ + PersistentVolume: &asdbv1.PersistentVolumeSpec{ + Size: resource.MustParse("1Gi"), + StorageClass: storageClass, + VolumeMode: v1.PersistentVolumeBlock, + }, + }, + Aerospike: &asdbv1.AerospikeServerVolumeAttachment{ + Path: "/test/dev/xvdf1", + }, + }, ) aeroCluster.Spec.AerospikeConfig.Value["xdr"] = map[string]interface{}{ "dcs": []map[string]interface{}{ @@ -459,13 +476,12 @@ var _ = Describe( aeroCluster.Spec.AerospikeConfig.Value["namespaces"] = []interface{}{ getSCNamespaceConfigWithSet("test", "/test/dev/xvdf"), + getNonSCNamespaceConfig("bar", "/test/dev/xvdf1"), } - aeroCluster.Spec.AerospikeConfig.Value["security"] = map[string]interface{}{ - "log": map[string]interface{}{ - "report-data-op-role": []string{"read"}, - "report-data-op-user": []string{"admin2"}, - }, + aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["log"] = map[string]interface{}{ + "report-data-op-role": []string{"read"}, + "report-data-op-user": []string{"admin2"}, } aeroCluster.Spec.EnableDynamicConfigUpdate = ptr.To(true) @@ -768,7 +784,8 @@ func validateNamespaceContextDynamically( ) error { newSpec := *flatSpec ignoredConf := mapset.NewSet("rack-id", "default-ttl", "disable-write-dup-res", - "disallow-expunge", "conflict-resolution-policy") + "disallow-expunge", "conflict-resolution-policy", "compression-acceleration", "compression-level", + "strong-consistency-allow-expunge") for confKey, val := range 
*flatServer { if asconfig.ContextKey(confKey) != "namespaces" { diff --git a/test/ldap_auth_test.go b/test/ldap_auth_test.go index 6c697533e..fbeacc410 100644 --- a/test/ldap_auth_test.go +++ b/test/ldap_auth_test.go @@ -10,8 +10,6 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" @@ -97,7 +95,6 @@ func validateTransactions( func getAerospikeClusterSpecWithLDAP( clusterNamespacedName types.NamespacedName, ) *asdbv1.AerospikeCluster { - cascadeDelete := true networkConf := getNetworkTLSConfig() operatorClientCertSpec := getOperatorCert() @@ -107,32 +104,9 @@ func getAerospikeClusterSpecWithLDAP( Namespace: clusterNamespacedName.Namespace, }, Spec: asdbv1.AerospikeClusterSpec{ - Size: 2, - Image: latestImage, - Storage: asdbv1.AerospikeStorageSpec{ - FileSystemVolumePolicy: asdbv1.AerospikePersistentVolumePolicySpec{ - InputCascadeDelete: &cascadeDelete, - }, - BlockVolumePolicy: asdbv1.AerospikePersistentVolumePolicySpec{ - InputCascadeDelete: &cascadeDelete, - }, - Volumes: []asdbv1.VolumeSpec{ - { - Name: "workdir", - Source: asdbv1.VolumeSource{ - PersistentVolume: &asdbv1.PersistentVolumeSpec{ - Size: resource.MustParse("1Gi"), - StorageClass: storageClass, - VolumeMode: corev1.PersistentVolumeFilesystem, - }, - }, - Aerospike: &asdbv1.AerospikeServerVolumeAttachment{ - Path: "/opt/aerospike", - }, - }, - getStorageVolumeForSecret(), - }, - }, + Size: 2, + Image: latestImage, + Storage: getBasicStorageSpecObject(), AerospikeAccessControl: &asdbv1.AerospikeAccessControlSpec{ Users: []asdbv1.AerospikeUserSpec{ @@ -177,15 +151,7 @@ func getAerospikeClusterSpecWithLDAP( }, }, "namespaces": []interface{}{ - map[string]interface{}{ - "name": "test", - "replication-factor": 2, - "migrate-sleep": 0, - "storage-engine": map[string]interface{}{ - "type": "memory", - "data-size": 
1073741824, - }, - }, + getSCNamespaceConfig("test", "/test/dev/xvdf"), }, }, }, diff --git a/test/poddisruptionbudget_test.go b/test/poddisruptionbudget_test.go index 385b652d1..c6c16e009 100644 --- a/test/poddisruptionbudget_test.go +++ b/test/poddisruptionbudget_test.go @@ -19,8 +19,8 @@ var _ = Describe( "PodDisruptionBudget", func() { ctx := context.TODO() aeroCluster := &asdbv1.AerospikeCluster{} - maxUnavailable := intstr.FromInt(0) - defaultMaxUnavailable := intstr.FromInt(1) + maxUnavailable := intstr.FromInt32(0) + defaultMaxUnavailable := intstr.FromInt32(1) clusterNamespacedName := getNamespacedName("pdb-test-cluster", namespace) BeforeEach(func() { @@ -114,7 +114,7 @@ var _ = Describe( By("Create cluster. It should fail as PDB is already created") // Create cluster should fail as PDB is not created by operator - err = deployCluster(k8sClient, ctx, aeroCluster) + err = deployClusterWithTO(k8sClient, ctx, aeroCluster, retryInterval, shortRetry) Expect(err).To(HaveOccurred()) By("Delete PDB") @@ -144,7 +144,7 @@ var _ = Describe( }) Context("Invalid Operations", func() { - value := intstr.FromInt(3) + value := intstr.FromInt32(3) It("Should fail if maxUnavailable is greater than size", func() { // Cluster size is 2 @@ -154,9 +154,9 @@ var _ = Describe( }) It("Should fail if maxUnavailable is greater than RF", func() { - // PDB should be < (least rf)). rf is 2 in this test + // PDB should be < (least rf). 
rf is 2 in this test aeroCluster.Spec.Size = 4 - value := intstr.FromInt(2) + value := intstr.FromInt32(2) aeroCluster.Spec.MaxUnavailable = &value err := deployCluster(k8sClient, ctx, aeroCluster) Expect(err).To(HaveOccurred()) @@ -164,7 +164,7 @@ var _ = Describe( It("Should fail if maxUnavailable is given but disablePDB is true", func() { aeroCluster.Spec.DisablePDB = ptr.To(true) - value := intstr.FromInt(1) + value := intstr.FromInt32(1) aeroCluster.Spec.MaxUnavailable = &value err := deployCluster(k8sClient, ctx, aeroCluster) Expect(err).To(HaveOccurred())