diff --git a/api/v1/access_control_validate.go b/api/v1/access_control_validate.go index b68183496..2fc11011a 100644 --- a/api/v1/access_control_validate.go +++ b/api/v1/access_control_validate.go @@ -33,6 +33,9 @@ const ( // DefaultAdminPassword si default admin user password. DefaultAdminPassword = "admin" + + // Version6 server version 6 tag + Version6 = "6.0.0.0" ) // roleNameForbiddenChars are characters forbidden in role name. @@ -319,7 +322,7 @@ func isPrivilegeValid( } // Check if new privileges are used in an older version. - cmp, err := lib.CompareVersions(version, "6.0.0.0") + cmp, err := lib.CompareVersions(version, Version6) if err != nil { return false, err } diff --git a/api/v1/aerospikecluster_mutating_webhook.go b/api/v1/aerospikecluster_mutating_webhook.go index 085c7baec..7e6203ab9 100644 --- a/api/v1/aerospikecluster_mutating_webhook.go +++ b/api/v1/aerospikecluster_mutating_webhook.go @@ -1,5 +1,5 @@ /* -Copyright 2021. +Copyright 2024. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -95,9 +95,7 @@ func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error { // Set common aerospikeConfig defaults // Update configMap - if err := c.setDefaultAerospikeConfigs( - asLog, *c.Spec.AerospikeConfig, - ); err != nil { + if err := c.setDefaultAerospikeConfigs(asLog, *c.Spec.AerospikeConfig, nil); err != nil { return err } @@ -261,18 +259,18 @@ func (c *AerospikeCluster) updateRacksAerospikeConfigFromGlobal(asLog logr.Logge c.Spec.AerospikeConfig.Value, rack.InputAerospikeConfig.Value, ) + if err != nil { + return err + } + asLog.V(1).Info( "Merged rack config from global aerospikeConfig", "rack id", rack.ID, "rackAerospikeConfig", m, "globalAerospikeConfig", c.Spec.AerospikeConfig, ) - - if err != nil { - return err - } } else { // Use the global config. - m = c.Spec.AerospikeConfig.Value + m = c.Spec.AerospikeConfig.DeepCopy().Value } asLog.V(1).Info( @@ -282,9 +280,7 @@ func (c *AerospikeCluster) updateRacksAerospikeConfigFromGlobal(asLog logr.Logge // Set defaults in updated rack config // Above merge function may have overwritten defaults. - if err := c.setDefaultAerospikeConfigs( - asLog, AerospikeConfigSpec{Value: m}, - ); err != nil { + if err := c.setDefaultAerospikeConfigs(asLog, AerospikeConfigSpec{Value: m}, &rack.ID); err != nil { return err } @@ -294,15 +290,12 @@ func (c *AerospikeCluster) updateRacksAerospikeConfigFromGlobal(asLog logr.Logge return nil } -func (c *AerospikeCluster) setDefaultAerospikeConfigs( - asLog logr.Logger, configSpec AerospikeConfigSpec, -) error { +func (c *AerospikeCluster) setDefaultAerospikeConfigs(asLog logr.Logger, + configSpec AerospikeConfigSpec, rackID *int) error { config := configSpec.Value // namespace conf - if err := setDefaultNsConf( - asLog, configSpec, c.Spec.RackConfig.Namespaces, - ); err != nil { + if err := setDefaultNsConf(asLog, configSpec, c.Spec.RackConfig.Namespaces, rackID); err != nil { return err } @@ -369,10 +362,8 @@ func (n *AerospikeNetworkPolicy) setNetworkNamespace(namespace string) { // Helper // ***************************************************************************** -func setDefaultNsConf( - asLog logr.Logger, configSpec AerospikeConfigSpec, - rackEnabledNsList []string, -) error { +func setDefaultNsConf(asLog logr.Logger, configSpec AerospikeConfigSpec, + rackEnabledNsList []string, rackID *int) error { config := configSpec.Value // namespace conf nsConf, ok := config["namespaces"] @@ -406,20 +397,21 @@ func setDefaultNsConf( ) } - // Add dummy rack-id only for rackEnabled namespaces - defaultConfs := map[string]interface{}{"rack-id": DefaultRackID} - if nsName, ok := nsMap["name"]; ok { - if _, ok := nsName.(string); ok { - if isNameExist(rackEnabledNsList, nsName.(string)) { - // Add dummy rack-id, should be replaced with actual rack-id by init-container script - if err := setDefaultsInConfigMap( - asLog, nsMap, defaultConfs, - ); err != nil { - return fmt.Errorf( - "failed to set default aerospikeConfig.namespaces rack config: %v", - err, - ) + if name, ok := nsName.(string); ok { + if isNameExist(rackEnabledNsList, name) { + // Add rack-id only for rackEnabled namespaces + if rackID != nil { + // Add rack-id only in rack specific config, not in global config + defaultConfs := map[string]interface{}{"rack-id": *rackID} + if err := setDefaultsInConfigMap( + asLog, nsMap, defaultConfs, + ); err != nil { + return fmt.Errorf( + "failed to set default aerospikeConfig.namespaces rack config: %v", + err, + ) + } } } else { // User may have added this key or may have patched object with new smaller rackEnabledNamespace list @@ -427,7 +419,7 @@ func setDefaultNsConf( // that some namespace is removed from rackEnabledNamespace list and cluster needs rolling restart asLog.Info( "Name aerospikeConfig.namespaces.name not found in rackEnabled namespace list. "+ - "Namespace will not have defaultRackID", + "Namespace will not have any rackID", "nsName", nsName, "rackEnabledNamespaces", rackEnabledNsList, ) diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index 04b8e4da6..5fa0f1356 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -1,5 +1,5 @@ /* -Copyright 2021. +Copyright 2024. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -872,6 +872,9 @@ type AerospikePodStatus struct { //nolint:govet // for readability // PodSpecHash is ripemd160 hash of PodSpec used by this pod PodSpecHash string `json:"podSpecHash"` + // DynamicConfigFailed is true if aerospike config change failed to apply dynamically. + DynamicConfigFailed bool `json:"dynamicConfigFailed,omitempty"` + // IsSecurityEnabled is true if security is enabled in the pod IsSecurityEnabled bool `json:"isSecurityEnabled"` } @@ -913,7 +916,9 @@ func init() { } // CopySpecToStatus copy spec in status. Spec to Status DeepCopy doesn't work. It fails in reflect lib. -func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec, error) { //nolint:dupl // not duplicate +// +//nolint:dupl // not duplicate +func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec, error) { status := AerospikeClusterStatusSpec{} status.Size = spec.Size @@ -921,99 +926,88 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec, status.MaxUnavailable = spec.MaxUnavailable // Storage - statusStorage := AerospikeStorageSpec{} - lib.DeepCopy(&statusStorage, &spec.Storage) + statusStorage := lib.DeepCopy(&spec.Storage).(*AerospikeStorageSpec) - status.Storage = statusStorage + status.Storage = *statusStorage if spec.AerospikeAccessControl != nil { // AerospikeAccessControl - statusAerospikeAccessControl := &AerospikeAccessControlSpec{} - lib.DeepCopy( - statusAerospikeAccessControl, spec.AerospikeAccessControl, - ) + statusAerospikeAccessControl := lib.DeepCopy( + spec.AerospikeAccessControl, + ).(*AerospikeAccessControlSpec) status.AerospikeAccessControl = statusAerospikeAccessControl } - // AerospikeConfig - statusAerospikeConfig := &AerospikeConfigSpec{} - lib.DeepCopy( - statusAerospikeConfig, spec.AerospikeConfig, - ) + if spec.AerospikeConfig != nil { + // AerospikeConfig + statusAerospikeConfig := lib.DeepCopy( + spec.AerospikeConfig, + ).(*AerospikeConfigSpec) - status.AerospikeConfig = statusAerospikeConfig + status.AerospikeConfig = statusAerospikeConfig + } if spec.ValidationPolicy != nil { // ValidationPolicy - statusValidationPolicy := &ValidationPolicySpec{} - lib.DeepCopy( - statusValidationPolicy, spec.ValidationPolicy, - ) + statusValidationPolicy := lib.DeepCopy( + spec.ValidationPolicy, + ).(*ValidationPolicySpec) status.ValidationPolicy = statusValidationPolicy } // RackConfig - statusRackConfig := RackConfig{} - lib.DeepCopy(&statusRackConfig, &spec.RackConfig) - status.RackConfig = statusRackConfig + statusRackConfig := lib.DeepCopy(&spec.RackConfig).(*RackConfig) + status.RackConfig = *statusRackConfig // AerospikeNetworkPolicy - statusAerospikeNetworkPolicy := AerospikeNetworkPolicy{} - lib.DeepCopy( - &statusAerospikeNetworkPolicy, &spec.AerospikeNetworkPolicy, - ) + statusAerospikeNetworkPolicy := lib.DeepCopy( + &spec.AerospikeNetworkPolicy, + ).(*AerospikeNetworkPolicy) - status.AerospikeNetworkPolicy = statusAerospikeNetworkPolicy + status.AerospikeNetworkPolicy = *statusAerospikeNetworkPolicy if spec.OperatorClientCertSpec != nil { - clientCertSpec := &AerospikeOperatorClientCertSpec{} - lib.DeepCopy( - clientCertSpec, spec.OperatorClientCertSpec, - ) + clientCertSpec := lib.DeepCopy( + spec.OperatorClientCertSpec, + ).(*AerospikeOperatorClientCertSpec) status.OperatorClientCertSpec = clientCertSpec } // Storage - statusPodSpec := AerospikePodSpec{} - lib.DeepCopy(&statusPodSpec, &spec.PodSpec) - status.PodSpec = statusPodSpec + statusPodSpec := lib.DeepCopy(&spec.PodSpec).(*AerospikePodSpec) + status.PodSpec = *statusPodSpec - seedsFinderServices := SeedsFinderServices{} - lib.DeepCopy( - &seedsFinderServices, &spec.SeedsFinderServices, - ) + seedsFinderServices := lib.DeepCopy( + &spec.SeedsFinderServices, + ).(*SeedsFinderServices) - status.SeedsFinderServices = seedsFinderServices + status.SeedsFinderServices = *seedsFinderServices // RosterNodeBlockList if len(spec.RosterNodeBlockList) != 0 { - var rosterNodeBlockList []string - - lib.DeepCopy( - &rosterNodeBlockList, &spec.RosterNodeBlockList, - ) + rosterNodeBlockList := lib.DeepCopy( + &spec.RosterNodeBlockList, + ).(*[]string) - status.RosterNodeBlockList = rosterNodeBlockList + status.RosterNodeBlockList = *rosterNodeBlockList } if len(spec.K8sNodeBlockList) != 0 { - var k8sNodeBlockList []string - - lib.DeepCopy( - &k8sNodeBlockList, &spec.K8sNodeBlockList, - ) + k8sNodeBlockList := lib.DeepCopy(&spec.K8sNodeBlockList).(*[]string) - status.K8sNodeBlockList = k8sNodeBlockList + status.K8sNodeBlockList = *k8sNodeBlockList } return &status, nil } // CopyStatusToSpec copy status in spec. Status to Spec DeepCopy doesn't work. It fails in reflect lib. -func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec, error) { //nolint:dupl // no need +// +//nolint:dupl // not duplicate +func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec, error) { spec := AerospikeClusterSpec{} spec.Size = status.Size @@ -1021,93 +1015,79 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec spec.MaxUnavailable = status.MaxUnavailable // Storage - specStorage := AerospikeStorageSpec{} - lib.DeepCopy(&specStorage, &status.Storage) - spec.Storage = specStorage + specStorage := lib.DeepCopy(&status.Storage).(*AerospikeStorageSpec) + spec.Storage = *specStorage if status.AerospikeAccessControl != nil { // AerospikeAccessControl - specAerospikeAccessControl := &AerospikeAccessControlSpec{} - lib.DeepCopy( - specAerospikeAccessControl, status.AerospikeAccessControl, - ) + specAerospikeAccessControl := lib.DeepCopy( + status.AerospikeAccessControl, + ).(*AerospikeAccessControlSpec) spec.AerospikeAccessControl = specAerospikeAccessControl } // AerospikeConfig - specAerospikeConfig := &AerospikeConfigSpec{} - lib.DeepCopy( - specAerospikeConfig, status.AerospikeConfig, - ) + if status.AerospikeConfig != nil { + specAerospikeConfig := lib.DeepCopy( + status.AerospikeConfig, + ).(*AerospikeConfigSpec) - spec.AerospikeConfig = specAerospikeConfig + spec.AerospikeConfig = specAerospikeConfig + } if status.ValidationPolicy != nil { // ValidationPolicy - specValidationPolicy := &ValidationPolicySpec{} - lib.DeepCopy( - specValidationPolicy, status.ValidationPolicy, - ) + specValidationPolicy := lib.DeepCopy( + status.ValidationPolicy, + ).(*ValidationPolicySpec) spec.ValidationPolicy = specValidationPolicy } // RackConfig - specRackConfig := RackConfig{} - lib.DeepCopy(&specRackConfig, &status.RackConfig) + specRackConfig := lib.DeepCopy(&status.RackConfig).(*RackConfig) - spec.RackConfig = specRackConfig + spec.RackConfig = *specRackConfig // AerospikeNetworkPolicy - specAerospikeNetworkPolicy := AerospikeNetworkPolicy{} - lib.DeepCopy( - &specAerospikeNetworkPolicy, &status.AerospikeNetworkPolicy, - ) + specAerospikeNetworkPolicy := lib.DeepCopy( + &status.AerospikeNetworkPolicy, + ).(*AerospikeNetworkPolicy) - spec.AerospikeNetworkPolicy = specAerospikeNetworkPolicy + spec.AerospikeNetworkPolicy = *specAerospikeNetworkPolicy if status.OperatorClientCertSpec != nil { - clientCertSpec := &AerospikeOperatorClientCertSpec{} - lib.DeepCopy( - clientCertSpec, status.OperatorClientCertSpec, - ) + clientCertSpec := lib.DeepCopy( + status.OperatorClientCertSpec, + ).(*AerospikeOperatorClientCertSpec) spec.OperatorClientCertSpec = clientCertSpec } // Storage - specPodSpec := AerospikePodSpec{} - lib.DeepCopy(&specPodSpec, &status.PodSpec) + specPodSpec := lib.DeepCopy(&status.PodSpec).(*AerospikePodSpec) - spec.PodSpec = specPodSpec + spec.PodSpec = *specPodSpec - seedsFinderServices := SeedsFinderServices{} - lib.DeepCopy( - &seedsFinderServices, &status.SeedsFinderServices, - ) + seedsFinderServices := lib.DeepCopy( + &status.SeedsFinderServices, + ).(*SeedsFinderServices) - spec.SeedsFinderServices = seedsFinderServices + spec.SeedsFinderServices = *seedsFinderServices // RosterNodeBlockList if len(status.RosterNodeBlockList) != 0 { - var rosterNodeBlockList []string - - lib.DeepCopy( - &rosterNodeBlockList, &status.RosterNodeBlockList, - ) + rosterNodeBlockList := lib.DeepCopy( + &status.RosterNodeBlockList, + ).(*[]string) - spec.RosterNodeBlockList = rosterNodeBlockList + spec.RosterNodeBlockList = *rosterNodeBlockList } if len(status.K8sNodeBlockList) != 0 { - var k8sNodeBlockList []string - - lib.DeepCopy( - &k8sNodeBlockList, &status.K8sNodeBlockList, - ) - - spec.K8sNodeBlockList = k8sNodeBlockList + k8sNodeBlockList := lib.DeepCopy(&status.K8sNodeBlockList).(*[]string) + spec.K8sNodeBlockList = *k8sNodeBlockList } return &spec, nil diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index c15d55aff..414040093 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -41,7 +41,6 @@ import ( internalerrors "github.com/aerospike/aerospike-kubernetes-operator/errors" lib "github.com/aerospike/aerospike-management-lib" "github.com/aerospike/aerospike-management-lib/asconfig" - "github.com/aerospike/aerospike-management-lib/deployment" ) var networkConnectionTypes = []string{"service", "heartbeat", "fabric"} @@ -93,7 +92,7 @@ func (c *AerospikeCluster) ValidateUpdate(oldObj runtime.Object) (admission.Warn return nil, err } - if err := deployment.IsValidUpgrade( + if err := asconfig.IsValidUpgrade( outgoingVersion, incomingVersion, ); err != nil { return nil, fmt.Errorf("failed to start upgrade: %v", err) @@ -1382,7 +1381,7 @@ func validateAerospikeConfigUpdate( } } - return validateNsConfUpdate(incomingSpec, outgoingSpec, currentStatus) + return validateNsConfUpdate(incomingSpec, outgoingSpec, currentStatus, incomingVersion) } func validateTLSUpdate(oldConf, newConf map[string]interface{}) error { @@ -1540,7 +1539,7 @@ func validateNetworkPolicyUpdate(oldPolicy, newPolicy *AerospikeNetworkPolicy) e return nil } -func validateNsConfUpdate(newConfSpec, oldConfSpec, currentStatus *AerospikeConfigSpec) error { +func validateNsConfUpdate(newConfSpec, oldConfSpec, currentStatus *AerospikeConfigSpec, incomingVersion string) error { newConf := newConfSpec.Value oldConf := oldConfSpec.Value @@ -1574,8 +1573,13 @@ func validateNsConfUpdate(newConfSpec, oldConfSpec, currentStatus *AerospikeConf } if singleConf["name"] == oldSingleConf["name"] { - // replication-factor update not allowed - if isValueUpdated( + val, err := lib.CompareVersions(incomingVersion, Version6) + if err != nil { + return fmt.Errorf("failed to check image version: %v", err) + } + + // For versions 6.0 and later, replication-factor is dynamic for AP namespaces (non strong-consistency). + if (IsNSSCEnabled(singleConf) || val < 0) && isValueUpdated( oldSingleConf, singleConf, "replication-factor", ) { return fmt.Errorf( diff --git a/api/v1/aerospikeconfig.go b/api/v1/aerospikeconfig.go index 824b75a40..d54137182 100644 --- a/api/v1/aerospikeconfig.go +++ b/api/v1/aerospikeconfig.go @@ -24,10 +24,5 @@ func (c *AerospikeConfigSpec) UnmarshalJSON(b []byte) error { } func (c *AerospikeConfigSpec) DeepCopy() *AerospikeConfigSpec { - dst := &AerospikeConfigSpec{ - Value: map[string]interface{}{}, - } - lib.DeepCopy(dst, c) - - return dst + return lib.DeepCopy(c).(*AerospikeConfigSpec) } diff --git a/api/v1/utils.go b/api/v1/utils.go index bea471b81..f4062b48f 100644 --- a/api/v1/utils.go +++ b/api/v1/utils.go @@ -63,7 +63,7 @@ const ( AerospikeInitContainerRegistryEnvVar = "AEROSPIKE_KUBERNETES_INIT_REGISTRY" AerospikeInitContainerDefaultRegistry = "docker.io" AerospikeInitContainerDefaultRegistryNamespace = "aerospike" - AerospikeInitContainerDefaultRepoAndTag = "aerospike-kubernetes-init:2.2.0-dev1" + AerospikeInitContainerDefaultRepoAndTag = "aerospike-kubernetes-init:2.2.0-dev2" AerospikeAppLabel = "app" AerospikeCustomResourceLabel = "aerospike.com/cr" AerospikeRackIDLabel = "aerospike.com/rack-id" diff --git a/api/v1beta1/aerospikeconfig.go b/api/v1beta1/aerospikeconfig.go index 7d7ae431c..5d4f83fa5 100644 --- a/api/v1beta1/aerospikeconfig.go +++ b/api/v1beta1/aerospikeconfig.go @@ -24,11 +24,5 @@ func (c *AerospikeConfigSpec) UnmarshalJSON(b []byte) error { } func (c *AerospikeConfigSpec) DeepCopy() *AerospikeConfigSpec { - src := *c - dst := AerospikeConfigSpec{ - Value: map[string]interface{}{}, - } - lib.DeepCopy(dst.Value, src.Value) - - return &dst + return lib.DeepCopy(c).(*AerospikeConfigSpec) } diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index 25ad29de4..c977eb9b7 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -14238,6 +14238,10 @@ spec: items: type: string type: array + dynamicConfigFailed: + description: DynamicConfigFailed is true if aerospike config + change failed to apply dynamically. + type: boolean hostExternalIP: description: HostExternalIP of the K8s host this pod is scheduled on. diff --git a/config/manifests/rbac/aerospikecluster_role.yaml b/config/manifests/rbac/aerospikecluster_role.yaml index 12283773a..b3c8f36bc 100644 --- a/config/manifests/rbac/aerospikecluster_role.yaml +++ b/config/manifests/rbac/aerospikecluster_role.yaml @@ -6,7 +6,6 @@ rules: - apiGroups: - "" resources: - - pods - nodes - services - configmaps @@ -19,4 +18,12 @@ rules: resources: - '*' verbs: - - '*' \ No newline at end of file + - '*' +- apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - update \ No newline at end of file diff --git a/controllers/aero_info_calls.go b/controllers/aero_info_calls.go index fc005aee0..df016e31a 100644 --- a/controllers/aero_info_calls.go +++ b/controllers/aero_info_calls.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The aerospike-operator Authors. +Copyright 2024 The aerospike-operator Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -14,6 +14,7 @@ limitations under the License. package controllers import ( + "context" "fmt" "time" @@ -22,7 +23,9 @@ import ( as "github.com/aerospike/aerospike-client-go/v7" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" + "github.com/aerospike/aerospike-kubernetes-operator/pkg/jsonpatch" "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" + "github.com/aerospike/aerospike-management-lib/asconfig" "github.com/aerospike/aerospike-management-lib/deployment" ) @@ -296,3 +299,79 @@ func (r *SingleClusterReconciler) setMigrateFillDelay( return reconcileSuccess() } + +func (r *SingleClusterReconciler) setDynamicConfig( + dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap, pods []*corev1.Pod, ignorablePodNames sets.Set[string], +) reconcileResult { + // This doesn't make actual connection, only objects having connection info are created + allHostConns, err := r.newAllHostConnWithOption(ignorablePodNames) + if err != nil { + return reconcileError( + fmt.Errorf( + "failed to get hostConn for aerospike cluster nodes: %v", err, + ), + ) + } + + podList := make([]corev1.Pod, 0, len(pods)) + podIPNameMap := make(map[string]string, len(pods)) + + for idx := range pods { + podIPNameMap[pods[idx].Status.PodIP] = pods[idx].Name + podList = append(podList, *pods[idx]) + } + + selectedHostConns, err := r.newPodsHostConnWithOption(podList, ignorablePodNames) + if err != nil { + return reconcileError( + fmt.Errorf( + "failed to get hostConn for aerospike cluster nodes: %v", err, + ), + ) + } + + if len(selectedHostConns) == 0 { + r.Log.Info("No pods selected for dynamic config change") + + return reconcileSuccess() + } + + for _, host := range selectedHostConns { + podName := podIPNameMap[host.ASConn.AerospikeHostName] + asConfCmds, err := asconfig.CreateSetConfigCmdList(dynamicConfDiffPerPod[podName], + host.ASConn, r.getClientPolicy()) + + if err != nil { + // Assuming error returned here will not be a server error. + return reconcileError(err) + } + + r.Log.Info("Generated dynamic config commands", "commands", fmt.Sprintf("%v", asConfCmds), "pod", podName) + + if err := deployment.SetConfigCommandsOnHosts(r.Log, r.getClientPolicy(), allHostConns, + []*deployment.HostConn{host}, asConfCmds); err != nil { + var patches []jsonpatch.PatchOperation + + patch := jsonpatch.PatchOperation{ + Operation: "replace", + Path: "/status/pods/" + podName + "/dynamicConfigFailed", + Value: true, + } + patches = append(patches, patch) + + if patchErr := r.patchPodStatus( + context.TODO(), patches, + ); patchErr != nil { + return reconcileError(fmt.Errorf("error updating status: %v, dynamic config command error: %v", patchErr, err)) + } + + return reconcileError(err) + } + + if err := r.updateAerospikeConfInPod(podName); err != nil { + return reconcileError(err) + } + } + + return reconcileSuccess() +} diff --git a/controllers/configmap.go b/controllers/configmap.go index c4a88f9b3..1514a63ba 100644 --- a/controllers/configmap.go +++ b/controllers/configmap.go @@ -8,7 +8,6 @@ import ( "fmt" "io/fs" "path/filepath" - "strconv" "strings" "text/template" @@ -89,15 +88,6 @@ func init() { } } -func getNamespacedNameForSTSConfigMap( - aeroCluster *asdbv1.AerospikeCluster, rackID int, -) types.NamespacedName { - return types.NamespacedName{ - Name: aeroCluster.Name + "-" + strconv.Itoa(rackID), - Namespace: aeroCluster.Namespace, - } -} - // createConfigMapData create configMap data func (r *SingleClusterReconciler) createConfigMapData(rack *asdbv1.Rack) ( map[string]string, error, @@ -167,16 +157,15 @@ func (r *SingleClusterReconciler) createConfigMapData(rack *asdbv1.Rack) ( func createPodSpecForRack( aeroCluster *asdbv1.AerospikeCluster, rack *asdbv1.Rack, ) *asdbv1.AerospikePodSpec { - rackFullPodSpec := asdbv1.AerospikePodSpec{} - lib.DeepCopy( - &rackFullPodSpec, &aeroCluster.Spec.PodSpec, - ) + rackFullPodSpec := lib.DeepCopy( + &aeroCluster.Spec.PodSpec, + ).(*asdbv1.AerospikePodSpec) rackFullPodSpec.Affinity = rack.PodSpec.Affinity rackFullPodSpec.Tolerations = rack.PodSpec.Tolerations rackFullPodSpec.NodeSelector = rack.PodSpec.NodeSelector - return &rackFullPodSpec + return rackFullPodSpec } func (r *SingleClusterReconciler) buildConfigTemplate(rack *asdbv1.Rack) ( @@ -311,7 +300,7 @@ func (r *SingleClusterReconciler) getFQDNsForCluster() ([]string, error) { for idx := range rackStateList { rackState := &rackStateList[idx] size := rackState.Size - stsName := getNamespacedNameForSTS(r.aeroCluster, rackState.Rack.ID) + stsName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, rackState.Rack.ID) for i := 0; i < size; i++ { fqdn := getFQDNForPod(r.aeroCluster, getSTSPodName(stsName.Name, int32(i))) diff --git a/controllers/pod.go b/controllers/pod.go index 9509986b9..c2e0f9e33 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -15,11 +15,14 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" "github.com/aerospike/aerospike-kubernetes-operator/pkg/jsonpatch" "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" + lib "github.com/aerospike/aerospike-management-lib" + "github.com/aerospike/aerospike-management-lib/asconfig" ) // RestartType is the type of pod restart to use. @@ -29,6 +32,9 @@ const ( // noRestart needed. noRestart RestartType = iota + // noRestartUpdateConf indicates that restart is not needed but conf file has to be updated. + noRestartUpdateConf + // podRestart indicates that restart requires a restart of the pod. podRestart @@ -37,7 +43,7 @@ const ( ) // mergeRestartType generates the updated restart type based on precedence. -// podRestart > quickRestart > noRestart +// podRestart > quickRestart > noRestartUpdateConf > noRestart func mergeRestartType(current, incoming RestartType) RestartType { if current == podRestart || incoming == podRestart { return podRestart @@ -47,20 +53,29 @@ func mergeRestartType(current, incoming RestartType) RestartType { return quickRestart } + if current == noRestartUpdateConf || incoming == noRestartUpdateConf { + return noRestartUpdateConf + } + return noRestart } // Fetching RestartType of all pods, based on the operation being performed. -func (r *SingleClusterReconciler) getRollingRestartTypeMap( - rackState *RackState, pods []*corev1.Pod, ignorablePodNames sets.Set[string], -) (map[string]RestartType, error) { +func (r *SingleClusterReconciler) getRollingRestartTypeMap(rackState *RackState, ignorablePodNames sets.Set[string]) ( + restartTypeMap map[string]RestartType, dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap, err error) { var addedNSDevices []string - restartTypeMap := make(map[string]RestartType) + restartTypeMap = make(map[string]RestartType) + dynamicConfDiffPerPod = make(map[string]asconfig.DynamicConfigMap) + + pods, err := r.getOrderedRackPodList(rackState.Rack.ID) + if err != nil { + return nil, nil, fmt.Errorf("failed to list pods: %v", err) + } confMap, err := r.getConfigMap(rackState.Rack.ID) if err != nil { - return nil, err + return nil, nil, err } blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) @@ -81,24 +96,56 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap( } podStatus := r.aeroCluster.Status.Pods[pods[idx].Name] - if addedNSDevices == nil && podStatus.AerospikeConfigHash != requiredConfHash { - // Fetching all block devices that have been added in namespaces. - addedNSDevices, err = r.getNSAddedDevices(rackState) + if podStatus.AerospikeConfigHash != requiredConfHash { + if addedNSDevices == nil { + // Fetching all block devices that have been added in namespaces. + addedNSDevices, err = r.getNSAddedDevices(rackState) + if err != nil { + return nil, nil, err + } + } + + serverContainer := getContainer(pods[idx].Spec.Containers, asdbv1.AerospikeServerContainerName) + + version, err := asdbv1.GetImageVersion(serverContainer.Image) + if err != nil { + return nil, nil, err + } + + v, err := lib.CompareVersions(version, "6.0.0") if err != nil { - return nil, err + return nil, nil, err + } + + // If version >= 6.0.0, then we can update config dynamically. + if v >= 0 { + // If dynamic commands have failed in previous retry, then we should not try to update config dynamically. + if !podStatus.DynamicConfigFailed { + // Fetching all dynamic config change. + dynamicConfDiffPerPod[pods[idx].Name], err = r.handleDynamicConfigChange(rackState, pods[idx], version) + if err != nil { + return nil, nil, err + } + } + } else { + r.Log.Info("Dynamic config change not supported for version < 6.0.0", "currentVersion", version) } } - restartType := r.getRollingRestartTypePod(rackState, pods[idx], confMap, addedNSDevices) + restartTypeMap[pods[idx].Name] = r.getRollingRestartTypePod(rackState, pods[idx], confMap, addedNSDevices, + len(dynamicConfDiffPerPod[pods[idx].Name]) > 0) - restartTypeMap[pods[idx].Name] = restartType + if podStatus.DynamicConfigFailed { + restartTypeMap[pods[idx].Name] = mergeRestartType(restartTypeMap[pods[idx].Name], quickRestart) + } } - return restartTypeMap, nil + return restartTypeMap, dynamicConfDiffPerPod, nil } func (r *SingleClusterReconciler) getRollingRestartTypePod( - rackState *RackState, pod *corev1.Pod, confMap *corev1.ConfigMap, addedNSDevices []string, + rackState *RackState, pod *corev1.Pod, confMap *corev1.ConfigMap, + addedNSDevices []string, onlyDynamicConfigChange bool, ) RestartType { restartType := noRestart @@ -115,14 +162,20 @@ func (r *SingleClusterReconciler) getRollingRestartTypePod( // Check if aerospikeConfig is updated if podStatus.AerospikeConfigHash != requiredConfHash { + podRestartType := quickRestart // checking if volumes added in namespace is part of dirtyVolumes. // if yes, then podRestart is needed. - podRestartType := r.handleNSOrDeviceAddition(addedNSDevices, pod.Name) + if len(addedNSDevices) > 0 { + podRestartType = r.handleNSOrDeviceAddition(addedNSDevices, pod.Name) + } else if onlyDynamicConfigChange { + // If only dynamic config change is there, then we can update config dynamically. + podRestartType = noRestartUpdateConf + } restartType = mergeRestartType(restartType, podRestartType) r.Log.Info( - "AerospikeConfig changed. Need rolling restart", + "AerospikeConfig changed. Need rolling restart or update config dynamically", "requiredHash", requiredConfHash, "currentHash", podStatus.AerospikeConfigHash, ) @@ -197,14 +250,37 @@ func (r *SingleClusterReconciler) rollingRestartPods( return reconcileSuccess() } -func (r *SingleClusterReconciler) restartASDInPod( - rackState *RackState, pod *corev1.Pod, -) error { - cmName := getNamespacedNameForSTSConfigMap(r.aeroCluster, rackState.Rack.ID) +func (r *SingleClusterReconciler) restartASDOrUpdateAerospikeConf(podName string, + operation RestartType) error { + rackID, err := utils.GetRackIDFromPodName(podName) + if err != nil { + return fmt.Errorf( + "failed to get rackID for the pod %s", podName, + ) + } + + podNamespacedName := types.NamespacedName{ + Name: podName, + Namespace: r.aeroCluster.Namespace, + } + + cmName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, *rackID) initBinary := "/etc/aerospike/akoinit" + + var subCommand string + + switch operation { + case noRestart, podRestart: + return fmt.Errorf("invalid operation for akoinit") + case quickRestart: + subCommand = "quick-restart" + case noRestartUpdateConf: + subCommand = "update-conf" + } + cmd := []string{ initBinary, - "quick-restart", + subCommand, "--cm-name", cmName.Name, "--cm-namespace", @@ -214,7 +290,7 @@ func (r *SingleClusterReconciler) restartASDInPod( // Quick restart attempt should not take significant time. // Therefore, it's ok to block the operator on the quick restart attempt. stdout, stderr, err := utils.Exec( - pod, asdbv1.AerospikeServerContainerName, cmd, r.KubeClient, + podNamespacedName, asdbv1.AerospikeServerContainerName, cmd, r.KubeClient, r.KubeConfig, ) if err != nil { @@ -229,13 +305,13 @@ func (r *SingleClusterReconciler) restartASDInPod( // Quick restart attempt should not take significant time. // Therefore, it's ok to block the operator on the quick restart attempt. stdout, stderr, err = utils.Exec( - pod, asdbv1.AerospikeServerContainerName, cmd, r.KubeClient, + podNamespacedName, asdbv1.AerospikeServerContainerName, cmd, r.KubeClient, r.KubeConfig, ) if err != nil { r.Log.V(1).Info( - "Failed warm restart", "err", err, "podName", pod.Name, "stdout", + "Failed warm restart", "err", err, "podName", podNamespacedName.Name, "stdout", stdout, "stderr", stderr, ) @@ -243,7 +319,7 @@ func (r *SingleClusterReconciler) restartASDInPod( } } else { r.Log.V(1).Info( - "Failed warm restart", "err", err, "podName", pod.Name, "stdout", + "Failed to perform", "operation", subCommand, "err", err, "podName", podNamespacedName.Name, "stdout", stdout, "stderr", stderr, ) @@ -251,11 +327,19 @@ func (r *SingleClusterReconciler) restartASDInPod( } } - r.Recorder.Eventf( - r.aeroCluster, corev1.EventTypeNormal, "PodWarmRestarted", - "[rack-%d] Restarted Pod %s", rackState.Rack.ID, pod.Name, - ) - r.Log.V(1).Info("Pod warm restarted", "podName", pod.Name) + if subCommand == "quick-restart" { + r.Recorder.Eventf( + r.aeroCluster, corev1.EventTypeNormal, "PodWarmRestarted", + "[rack-%d] Restarted Pod %s", *rackID, podNamespacedName.Name, + ) + r.Log.V(1).Info("Pod warm restarted", "podName", podNamespacedName.Name) + } else { + r.Recorder.Eventf( + r.aeroCluster, corev1.EventTypeNormal, "PodConfUpdated", + "[rack-%d] Updated Pod %s", *rackID, podNamespacedName.Name, + ) + r.Log.V(1).Info("Pod conf updated", "podName", podNamespacedName.Name) + } return nil } @@ -279,7 +363,7 @@ func (r *SingleClusterReconciler) restartPods( if restartType == quickRestart { // If ASD restart fails, then go ahead and restart the pod - if err := r.restartASDInPod(rackState, pod); err == nil { + if err := r.restartASDOrUpdateAerospikeConf(pod.Name, quickRestart); err == nil { continue } } @@ -310,6 +394,18 @@ func (r *SingleClusterReconciler) restartPods( return reconcileSuccess() } +func (r *SingleClusterReconciler) updateAerospikeConfInPod(podName string) error { + r.Log.Info("Updating aerospike config file in pod", "pod", podName) + + if err := r.restartASDOrUpdateAerospikeConf(podName, noRestartUpdateConf); err != nil { + return err + } + + r.Log.V(1).Info("Updated aerospike config file in pod", "podName", podName) + + return nil +} + func (r *SingleClusterReconciler) ensurePodsRunningAndReady(podsToCheck []*corev1.Pod) reconcileResult { podNames := getPodNames(podsToCheck) readyPods := map[string]bool{} @@ -643,22 +739,7 @@ func (r *SingleClusterReconciler) removePodStatus(podNames []string) error { patches = append(patches, patch) } - jsonPatchJSON, err := json.Marshal(patches) - if err != nil { - return fmt.Errorf("error creating json-patch : %v", err) - } - - constantPatch := client.RawPatch(types.JSONPatchType, jsonPatchJSON) - - // Since the pod status is updated from pod init container, - // set the field owner to "pod" for pod status updates. - if err = r.Client.Status().Patch( - context.TODO(), r.aeroCluster, constantPatch, client.FieldOwner("pod"), - ); err != nil { - return fmt.Errorf("error updating status: %v", err) - } - - return nil + return r.patchPodStatus(context.TODO(), patches) } func (r *SingleClusterReconciler) cleanupDanglingPodsRack(sts *appsv1.StatefulSet, rackState *RackState) error { @@ -691,8 +772,10 @@ func (r *SingleClusterReconciler) cleanupDanglingPodsRack(sts *appsv1.StatefulSe } } - if err := r.cleanupPods(danglingPods, rackState); err != nil { - return fmt.Errorf("failed dangling pod cleanup: %v", err) + if len(danglingPods) > 0 { + if err := r.cleanupPods(danglingPods, rackState); err != nil { + return fmt.Errorf("failed dangling pod cleanup: %v", err) + } } return nil @@ -1052,7 +1135,7 @@ func (r *SingleClusterReconciler) handleNSOrDeviceRemoval(rackState *RackState, } for _, pod := range podsToRestart { - err := r.handleNSOrDeviceRemovalPerPod(removedDevices, removedFiles, pod) + err := r.handleNSOrDeviceRemovalPerPod(removedDevices, removedFiles, pod.Name) if err != nil { return err } @@ -1062,12 +1145,12 @@ func (r *SingleClusterReconciler) handleNSOrDeviceRemoval(rackState *RackState, } func (r *SingleClusterReconciler) handleNSOrDeviceRemovalPerPod( - removedDevices []string, removedFiles []string, pod *corev1.Pod, + removedDevices, removedFiles []string, podName string, ) error { - podStatus := r.aeroCluster.Status.Pods[pod.Name] + podStatus := r.aeroCluster.Status.Pods[podName] for _, file := range removedFiles { - err := r.deleteFileStorage(pod, file) + err := r.deleteFileStorage(podName, file) if err != nil { return err } @@ -1082,25 +1165,14 @@ func (r *SingleClusterReconciler) handleNSOrDeviceRemovalPerPod( patch1 := jsonpatch.PatchOperation{ Operation: "replace", - Path: "/status/pods/" + pod.Name + "/dirtyVolumes", + Path: "/status/pods/" + podName + "/dirtyVolumes", Value: sets.List(dirtyVolumes), } patches = append(patches, patch1) - jsonPatchJSON, err := json.Marshal(patches) - if err != nil { + if err := r.patchPodStatus(context.TODO(), patches); err != nil { return err } - - constantPatch := client.RawPatch(types.JSONPatchType, jsonPatchJSON) - - // Since the pod status is updated from pod init container, - // set the field owner to "pod" for pod status updates. - if err = r.Client.Status().Patch( - context.TODO(), r.aeroCluster, constantPatch, client.FieldOwner("pod"), - ); err != nil { - return fmt.Errorf("error updating status: %v", err) - } } return nil @@ -1227,7 +1299,7 @@ func getVolumeNameFromDevicePath(volumes []asdbv1.VolumeSpec, s string) string { return "" } -func (r *SingleClusterReconciler) deleteFileStorage(pod *corev1.Pod, fileName string) error { +func (r *SingleClusterReconciler) deleteFileStorage(podName, fileName string) error { cmd := []string{ "bash", "-c", fmt.Sprintf( "rm -rf %s", @@ -1235,14 +1307,17 @@ func (r *SingleClusterReconciler) deleteFileStorage(pod *corev1.Pod, fileName st ), } r.Log.Info( - "Deleting file", "file", fileName, "cmd", cmd, "podname", pod.Name, + "Deleting file", "file", fileName, "cmd", cmd, "podname", podName, ) - stdout, stderr, err := utils.Exec(pod, asdbv1.AerospikeServerContainerName, cmd, r.KubeClient, r.KubeConfig) + podNamespacedName := types.NamespacedName{Name: podName, Namespace: r.aeroCluster.Namespace} + + stdout, stderr, err := utils.Exec(podNamespacedName, asdbv1.AerospikeServerContainerName, + cmd, r.KubeClient, r.KubeConfig) if err != nil { r.Log.V(1).Info( - "File deletion failed", "err", err, "podName", pod.Name, "stdout", + "File deletion failed", "err", err, "podName", podName, "stdout", stdout, "stderr", stderr, ) @@ -1253,7 +1328,7 @@ func (r *SingleClusterReconciler) deleteFileStorage(pod *corev1.Pod, fileName st } func (r *SingleClusterReconciler) getConfigMap(rackID int) (*corev1.ConfigMap, error) { - cmName := getNamespacedNameForSTSConfigMap(r.aeroCluster, rackID) + cmName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, rackID) confMap := &corev1.ConfigMap{} if err := r.Client.Get(context.TODO(), cmName, confMap); err != nil { @@ -1262,3 +1337,122 @@ func (r *SingleClusterReconciler) getConfigMap(rackID int) (*corev1.ConfigMap, e return confMap, nil } + +func (r *SingleClusterReconciler) handleDynamicConfigChange(rackState *RackState, pod *corev1.Pod, version string) ( + asconfig.DynamicConfigMap, error) { + statusFromAnnotation := pod.Annotations["aerospikeConf"] + + asConfStatus, err := getFlatConfig(r.Log, statusFromAnnotation) + if err != nil { + return nil, fmt.Errorf("failed to load config map by lib: %v", err) + } + + asConf, err := asconfig.NewMapAsConfig(r.Log, rackState.Rack.AerospikeConfig.Value) + if err != nil { + return nil, fmt.Errorf("failed to load config map by lib: %v", err) + } + + asConfSpec, err := getFlatConfig(r.Log, asConf.ToConfFile()) + if err != nil { + return nil, fmt.Errorf("failed to load config map by lib: %v", err) + } + + specToStatusDiffs, err := asconfig.ConfDiff(r.Log, *asConfSpec, *asConfStatus, + true, version) + if err != nil { + r.Log.Info("Failed to get config diff to change config dynamically, fallback to rolling restart: %v", err) + return nil, nil + } + + if len(specToStatusDiffs) > 0 { + isDynamic, err := asconfig.IsAllDynamicConfig(r.Log, specToStatusDiffs, version) + if err != nil { + r.Log.Info("Failed to check if all config is dynamic, fallback to rolling restart: %v", err) + return nil, nil + } + + if !isDynamic { + r.Log.Info("Static field has been modified, cannot change config dynamically") + return nil, nil + } + } + + return specToStatusDiffs, nil +} + +// mapping functions get mapped to each key value pair in a management lib Stats map +// m is the map that k and v came from +type mapping func(k string, v any, m asconfig.Conf) + +// mutateMap maps functions to each key value pair in the management lib's Stats map +// the functions are applied sequentially to each k,v pair. +func mutateMap(in asconfig.Conf, funcs []mapping) { + for k, v := range in { + switch v := v.(type) { + case asconfig.Conf: + mutateMap(v, funcs) + case []asconfig.Conf: + for _, lv := range v { + mutateMap(lv, funcs) + } + } + + for _, f := range funcs { + f(k, in[k], in) + } + } +} + +func getFlatConfig(log logger, confStr string) (*asconfig.Conf, error) { + asConf, err := asconfig.FromConfFile(log, strings.NewReader(confStr)) + if err != nil { + return nil, fmt.Errorf("failed to load config map by lib: %v", err) + } + + cmap := *asConf.ToMap() + + mutateMap(cmap, []mapping{ + asconfig.ToPlural, + }) + + asConf, err = asconfig.NewMapAsConfig( + log, + cmap, + ) + + if err != nil { + return nil, err + } + + return asConf.GetFlatMap(), nil +} + +func (r *SingleClusterReconciler) patchPodStatus(ctx context.Context, patches []jsonpatch.PatchOperation) error { + if len(patches) == 0 { + return nil + } + + jsonPatchJSON, err := json.Marshal(patches) + if err != nil { + return err + } + + constantPatch := client.RawPatch(types.JSONPatchType, jsonPatchJSON) + + return retry.OnError(retry.DefaultBackoff, func(err error) bool { + // Customize the error check for retrying, return true to retry, false to stop retrying + return true + }, func() error { + // Patch the resource + // Since the pod status is updated from pod init container, + // set the field owner to "pod" for pod status updates. + if err := r.Client.Status().Patch( + ctx, r.aeroCluster, constantPatch, client.FieldOwner("pod"), + ); err != nil { + return fmt.Errorf("error updating status: %v", err) + } + + r.Log.Info("Pod status patched successfully") + return nil + }) +} diff --git a/controllers/rack.go b/controllers/rack.go index 087b56fef..c442a9c3b 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -18,6 +18,7 @@ import ( asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" lib "github.com/aerospike/aerospike-management-lib" + "github.com/aerospike/aerospike-management-lib/asconfig" ) type scaledDownRack struct { @@ -61,7 +62,7 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { state := &rackStateList[idx] found := &appsv1.StatefulSet{} - stsName := getNamespacedNameForSTS(r.aeroCluster, state.Rack.ID) + stsName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, state.Rack.ID) if err = r.Client.Get(context.TODO(), stsName, found); err != nil { if !errors.IsNotFound(err) { @@ -127,7 +128,7 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { for idx := range rackStateList { state := &rackStateList[idx] found := &appsv1.StatefulSet{} - stsName := getNamespacedNameForSTS(r.aeroCluster, state.Rack.ID) + stsName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, state.Rack.ID) if err = r.Client.Get(context.TODO(), stsName, found); err != nil { if !errors.IsNotFound(err) { @@ -187,7 +188,7 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { for idx := range rackStateList { state := &rackStateList[idx] found := &appsv1.StatefulSet{} - stsName := getNamespacedNameForSTS(r.aeroCluster, state.Rack.ID) + stsName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, state.Rack.ID) if err := r.Client.Get(context.TODO(), stsName, found); err != nil { if !errors.IsNotFound(err) { @@ -233,13 +234,13 @@ func (r *SingleClusterReconciler) createEmptyRack(rackState *RackState) ( r.Log.Info("AerospikeCluster", "Spec", r.aeroCluster.Spec) // Bad config should not come here. It should be validated in validation hook - cmName := getNamespacedNameForSTSConfigMap(r.aeroCluster, rackState.Rack.ID) + cmName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, rackState.Rack.ID) if err := r.buildSTSConfigMap(cmName, rackState.Rack); err != nil { r.Log.Error(err, "Failed to create configMap from AerospikeConfig") return nil, reconcileError(err) } - stsName := getNamespacedNameForSTS(r.aeroCluster, rackState.Rack.ID) + stsName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, rackState.Rack.ID) found, err := r.createSTS(stsName, rackState) if err != nil { @@ -296,7 +297,7 @@ func (r *SingleClusterReconciler) deleteRacks( for idx := range racksToDelete { rack := &racksToDelete[idx] found := &appsv1.StatefulSet{} - stsName := getNamespacedNameForSTS(r.aeroCluster, rack.ID) + stsName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, rack.ID) err := r.Client.Get(context.TODO(), stsName, found) if err != nil { @@ -328,7 +329,7 @@ func (r *SingleClusterReconciler) deleteRacks( } // Delete configMap - cmName := getNamespacedNameForSTSConfigMap(r.aeroCluster, rack.ID) + cmName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, rack.ID) if err = r.deleteRackConfigMap(cmName); err != nil { return reconcileError(err) } @@ -358,7 +359,7 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat // So a check based on spec and status will skip configMap update. // Hence, a rolling restart of pod will never bring pod to desired config if err := r.updateSTSConfigMap( - getNamespacedNameForSTSConfigMap( + utils.GetNamespacedNameForSTSOrConfigMap( r.aeroCluster, rackState.Rack.ID, ), rackState.Rack, ); err != nil { @@ -396,13 +397,13 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat return found, res } } else { - var needRollingRestartRack, restartTypeMap, nErr = r.needRollingRestartRack(rackState, ignorablePodNames) + var rollingRestartInfo, nErr = r.getRollingRestartInfo(rackState, ignorablePodNames) if nErr != nil { return found, reconcileError(nErr) } - if needRollingRestartRack { - found, res = r.rollingRestartRack(found, rackState, ignorablePodNames, restartTypeMap, failedPods) + if rollingRestartInfo.needRestart { + found, res = r.rollingRestartRack(found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods) if !res.isSuccess { if res.err != nil { r.Log.Error( @@ -421,6 +422,28 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat return found, res } } + + if len(failedPods) == 0 && rollingRestartInfo.needUpdateConf { + res = r.updateDynamicConfig(rackState, ignorablePodNames, + rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod) + if !res.isSuccess { + if res.err != nil { + r.Log.Error( + res.err, "Failed to do dynamic update", "stsName", + found.Name, + ) + + r.Recorder.Eventf( + r.aeroCluster, corev1.EventTypeWarning, + "RackDynamicConfigUpdateFailed", + "[rack-%d] Failed to update aerospike config dynamically {STS: %s/%s}", + rackState.Rack.ID, found.Namespace, found.Name, + ) + } + + return found, res + } + } } if r.aeroCluster.Spec.RackConfig.MaxIgnorablePods != nil { @@ -440,6 +463,54 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat return found, reconcileSuccess() } +func (r *SingleClusterReconciler) updateDynamicConfig(rackState *RackState, + ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType, + dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap) reconcileResult { + r.Log.Info("Update dynamic config in Aerospike pods") + + r.Recorder.Eventf( + r.aeroCluster, corev1.EventTypeNormal, "DynamicConfigUpdate", + "[rack-%d] Started dynamic config update", rackState.Rack.ID, + ) + + var ( + err error + podList []*corev1.Pod + ) + + // List the pods for this aeroCluster's statefulset + podList, err = r.getOrderedRackPodList(rackState.Rack.ID) + if err != nil { + return reconcileError(fmt.Errorf("failed to list pods: %v", err)) + } + + // Find pods which needs restart + podsToUpdate := make([]*corev1.Pod, 0, len(podList)) + + for idx := range podList { + pod := podList[idx] + + restartType := restartTypeMap[pod.Name] + if restartType != noRestartUpdateConf { + r.Log.Info("This Pod doesn't need any update, Skip this", "pod", pod.Name) + continue + } + + podsToUpdate = append(podsToUpdate, pod) + } + + if res := r.setDynamicConfig(dynamicConfDiffPerPod, podsToUpdate, ignorablePodNames); !res.isSuccess { + return res + } + + r.Recorder.Eventf( + r.aeroCluster, corev1.EventTypeNormal, "DynamicConfigUpdate", + "[rack-%d] Finished Dynamic config update", rackState.Rack.ID, + ) + + return reconcileSuccess() +} + func (r *SingleClusterReconciler) handleNSOrDeviceRemovalForIgnorablePods( rackState *RackState, ignorablePodNames sets.Set[string], ) reconcileResult { @@ -1009,7 +1080,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, } restartType := restartTypeMap[pod.Name] - if restartType == noRestart { + if restartType == noRestart || restartType == noRestartUpdateConf { r.Log.Info("This Pod doesn't need rolling restart, Skip this", "pod", pod.Name) continue } @@ -1144,26 +1215,41 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1 return statefulSet, reconcileSuccess() } -func (r *SingleClusterReconciler) needRollingRestartRack(rackState *RackState, ignorablePodNames sets.Set[string]) ( - needRestart bool, restartTypeMap map[string]RestartType, err error, +type rollingRestartInfo struct { + restartTypeMap map[string]RestartType + dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap + needRestart, needUpdateConf bool +} + +func (r *SingleClusterReconciler) getRollingRestartInfo(rackState *RackState, ignorablePodNames sets.Set[string]) ( + info *rollingRestartInfo, err error, ) { - podList, err := r.getOrderedRackPodList(rackState.Rack.ID) + restartTypeMap, dynamicConfDiffPerPod, err := r.getRollingRestartTypeMap(rackState, ignorablePodNames) if err != nil { - return false, nil, fmt.Errorf("failed to list pods: %v", err) + return nil, err } - restartTypeMap, err = r.getRollingRestartTypeMap(rackState, podList, ignorablePodNames) - if err != nil { - return false, nil, err - } + needRestart, needUpdateConf := false, false for _, restartType := range restartTypeMap { - if restartType != noRestart { - return true, restartTypeMap, nil + switch restartType { + case noRestart: + // Do nothing + case noRestartUpdateConf: + needUpdateConf = true + case podRestart, quickRestart: + needRestart = true } } - return false, nil, nil + info = &rollingRestartInfo{ + needRestart: needRestart, + needUpdateConf: needUpdateConf, + restartTypeMap: restartTypeMap, + dynamicConfDiffPerPod: dynamicConfDiffPerPod, + } + + return info, nil } func (r *SingleClusterReconciler) isRackUpgradeNeeded(rackID int, ignorablePodNames sets.Set[string]) ( @@ -1296,9 +1382,7 @@ func (r *SingleClusterReconciler) isStorageVolumeSourceUpdated(volume *asdbv1.Vo return true } - var volumeCopy asdbv1.VolumeSpec - - lib.DeepCopy(&volumeCopy, volume) + volumeCopy := lib.DeepCopy(volume).(*asdbv1.VolumeSpec) if volumeCopy.Source.Secret != nil { setDefaultsSecretVolumeSource(volumeCopy.Source.Secret) diff --git a/controllers/reconciler.go b/controllers/reconciler.go index d4376ad71..0ae486729 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -463,11 +463,13 @@ func (r *SingleClusterReconciler) updateAccessControlStatus() error { return err } - // AerospikeAccessControl - statusAerospikeAccessControl := &asdbv1.AerospikeAccessControlSpec{} - lib.DeepCopy( - statusAerospikeAccessControl, r.aeroCluster.Spec.AerospikeAccessControl, - ) + var statusAerospikeAccessControl *asdbv1.AerospikeAccessControlSpec + if r.aeroCluster.Spec.AerospikeAccessControl != nil { + // AerospikeAccessControl + statusAerospikeAccessControl = lib.DeepCopy( + r.aeroCluster.Spec.AerospikeAccessControl, + ).(*asdbv1.AerospikeAccessControlSpec) + } newAeroCluster.Status.AerospikeClusterStatusSpec.AerospikeAccessControl = statusAerospikeAccessControl @@ -619,7 +621,8 @@ func (r *SingleClusterReconciler) patchStatus(newAeroCluster *asdbv1.AerospikeCl // Seems like a bug in encoding/json/Unmarshall. // // Workaround by force copying new object's status to old object's status. - lib.DeepCopy(&oldAeroCluster.Status, &newAeroCluster.Status) + aeroclusterStatus := lib.DeepCopy(&newAeroCluster.Status).(*asdbv1.AerospikeClusterStatus) + oldAeroCluster.Status = *aeroclusterStatus return nil } @@ -945,28 +948,9 @@ func (r *SingleClusterReconciler) migrateInitialisedVolumeNames(ctx context.Cont } } - if len(patches) == 0 { - return nil - } - - jsonPatchJSON, err := json.Marshal(patches) - if err != nil { - return err - } - - constantPatch := client.RawPatch(types.JSONPatchType, jsonPatchJSON) - - // Since the pod status is updated from pod init container, - // set the field owner to "pod" for pod status updates. r.Log.Info("Patching status with updated initialised volumes") - if err = r.Client.Status().Patch( - ctx, r.aeroCluster, constantPatch, client.FieldOwner("pod"), - ); err != nil { - return fmt.Errorf("error updating status: %v", err) - } - - return nil + return r.patchPodStatus(ctx, patches) } func (r *SingleClusterReconciler) getPVCUid(ctx context.Context, pod *corev1.Pod, volName string) (string, error) { diff --git a/controllers/statefulset.go b/controllers/statefulset.go index 58b958d70..1528f725b 100644 --- a/controllers/statefulset.go +++ b/controllers/statefulset.go @@ -147,7 +147,7 @@ func (r *SingleClusterReconciler) createSTS( // TODO: Do we need this var? { Name: "CONFIG_MAP_NAME", - Value: getNamespacedNameForSTSConfigMap( + Value: utils.GetNamespacedNameForSTSOrConfigMap( r.aeroCluster, rackState.Rack.ID, ).Name, }, @@ -380,7 +380,7 @@ func (r *SingleClusterReconciler) getSTS(rackState *RackState) (*appsv1.Stateful found := &appsv1.StatefulSet{} if err := r.Client.Get( context.TODO(), - getNamespacedNameForSTS(r.aeroCluster, rackState.Rack.ID), + utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, rackState.Rack.ID), found, ); err != nil { return nil, err @@ -849,9 +849,9 @@ func (r *SingleClusterReconciler) updateSTSSchedulingPolicy( // Use rack affinity, if given if rackState.Rack.PodSpec.Affinity != nil { - lib.DeepCopy(affinity, rackState.Rack.PodSpec.Affinity) + affinity = lib.DeepCopy(rackState.Rack.PodSpec.Affinity).(*corev1.Affinity) } else if r.aeroCluster.Spec.PodSpec.Affinity != nil { - lib.DeepCopy(affinity, r.aeroCluster.Spec.PodSpec.Affinity) + affinity = lib.DeepCopy(r.aeroCluster.Spec.PodSpec.Affinity).(*corev1.Affinity) } // Set our rules in PodAntiAffinity @@ -1026,8 +1026,7 @@ func updateSTSContainers( // Create a copy because updating stateful sets defaults // on the sidecar container object which mutates original aeroCluster object. - specContainerCopy := &corev1.Container{} - lib.DeepCopy(specContainerCopy, specContainer) + specContainerCopy := lib.DeepCopy(specContainer).(*corev1.Container) for stsIdx := range stsContainers { if specContainer.Name != stsContainers[stsIdx].Name { @@ -1091,7 +1090,7 @@ func (r *SingleClusterReconciler) waitForAllSTSToBeReady(ignorablePodNames sets. for rackID := range allRackIDs { st := &appsv1.StatefulSet{} - stsName := getNamespacedNameForSTS(r.aeroCluster, rackID) + stsName := utils.GetNamespacedNameForSTSOrConfigMap(r.aeroCluster, rackID) if err := r.Client.Get(context.TODO(), stsName, st); err != nil { if !errors.IsNotFound(err) { @@ -1286,7 +1285,7 @@ func getDefaultSTSVolumes( VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ - Name: getNamespacedNameForSTSConfigMap( + Name: utils.GetNamespacedNameForSTSOrConfigMap( aeroCluster, rackState.Rack.ID, ).Name, }, @@ -1581,15 +1580,6 @@ func getSTSContainerPort( return ports } -func getNamespacedNameForSTS( - aeroCluster *asdbv1.AerospikeCluster, rackID int, -) types.NamespacedName { - return types.NamespacedName{ - Name: aeroCluster.Name + "-" + strconv.Itoa(rackID), - Namespace: aeroCluster.Namespace, - } -} - func getSTSPodOrdinal(podName string) (*int32, error) { parts := strings.Split(podName, "-") ordinalStr := parts[len(parts)-1] diff --git a/go.mod b/go.mod index 204c7cf87..df7b4e00c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 toolchain go1.21.8 require ( - github.com/aerospike/aerospike-management-lib v1.2.1-0.20240319095728-1600222cec4c + github.com/aerospike/aerospike-management-lib v1.3.1-0.20240404063536-2adfbedf9687 github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d github.com/evanphx/json-patch v4.12.0+incompatible github.com/go-logr/logr v1.3.0 @@ -26,6 +26,7 @@ require ( github.com/deckarep/golang-set/v2 v2.3.1 github.com/sirupsen/logrus v1.9.0 golang.org/x/crypto v0.21.0 + golang.org/x/net v0.21.0 gomodules.xyz/jsonpatch/v2 v2.3.0 k8s.io/utils v0.0.0-20230726121419-3b25d923346b ) @@ -67,8 +68,6 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect - github.com/qdm12/reprint v0.0.0-20200326205758-722754a53494 // indirect - github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect @@ -77,7 +76,6 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/net v0.21.0 // indirect golang.org/x/sync v0.6.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index 26033d0ed..947f1be32 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,9 @@ github.com/aerospike/aerospike-client-go/v7 v7.1.0 h1:yvCTKdbpqZxHvv7sWsFHV1j49jZcC8yXRooWsDFqKtA= github.com/aerospike/aerospike-client-go/v7 v7.1.0/go.mod h1:AkHiKvCbqa1c16gCNGju3c5X/yzwLVvblNczqjxNwNk= -github.com/aerospike/aerospike-management-lib v1.2.1-0.20240319095728-1600222cec4c h1:wCscajyxCdQ9NeDxJdMbBascFym9MQV0aALTJ2dANOc= -github.com/aerospike/aerospike-management-lib v1.2.1-0.20240319095728-1600222cec4c/go.mod h1:o1TV3BTsAiuZ5HtZi9E4FgXqWRwjDzlkS4bfvfaAHLU= +github.com/aerospike/aerospike-management-lib v1.2.1-0.20240325134810-f8046fe9872e h1:Q/AfYe++0ouO5csLS8l99kCQqJJvDKlfHwhuWbECpaQ= +github.com/aerospike/aerospike-management-lib v1.2.1-0.20240325134810-f8046fe9872e/go.mod h1:E4dk798IikCp9a8fugpYoeQVIXuvdxogHvt6sKhaORQ= +github.com/aerospike/aerospike-management-lib v1.3.1-0.20240404063536-2adfbedf9687 h1:d7oDvHmiKhq4rzcD/w3z9tP3wH0+iaDvxKDk3IYuqeU= +github.com/aerospike/aerospike-management-lib v1.3.1-0.20240404063536-2adfbedf9687/go.mod h1:E4dk798IikCp9a8fugpYoeQVIXuvdxogHvt6sKhaORQ= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl3/e6D5CLfI0j/7hiIEtvGVFPCZ7Ei2oq8iQ= @@ -117,10 +119,8 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= -github.com/qdm12/reprint v0.0.0-20200326205758-722754a53494 h1:wSmWgpuccqS2IOfmYrbRiUgv+g37W5suLLLxwwniTSc= -github.com/qdm12/reprint v0.0.0-20200326205758-722754a53494/go.mod h1:yipyliwI08eQ6XwDm1fEwKPdF/xdbkiHtrU+1Hg+vc4= -github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -129,7 +129,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -241,7 +240,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index 25ad29de4..c977eb9b7 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -14238,6 +14238,10 @@ spec: items: type: string type: array + dynamicConfigFailed: + description: DynamicConfigFailed is true if aerospike config + change failed to apply dynamically. + type: boolean hostExternalIP: description: HostExternalIP of the K8s host this pod is scheduled on. diff --git a/helm-charts/aerospike-kubernetes-operator/templates/aerospikecluster-clusterrole.yaml b/helm-charts/aerospike-kubernetes-operator/templates/aerospikecluster-clusterrole.yaml index 262c13c03..1c5d7596d 100644 --- a/helm-charts/aerospike-kubernetes-operator/templates/aerospikecluster-clusterrole.yaml +++ b/helm-charts/aerospike-kubernetes-operator/templates/aerospikecluster-clusterrole.yaml @@ -10,7 +10,6 @@ rules: - apiGroups: - "" resources: - - pods - nodes - services - configmaps @@ -24,3 +23,11 @@ rules: - '*' verbs: - '*' +- apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - update diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index 629f6c9b2..8a4d6abfb 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -9,6 +9,7 @@ import ( "strings" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" @@ -148,17 +149,15 @@ func GetRackIDFromPodName(podName string) (*int, error) { } // Exec executes a non interactive command on a pod. -func Exec( - pod *corev1.Pod, container string, cmd []string, - kubeClient *kubernetes.Clientset, kubeConfig *rest.Config, -) (stdoutStr, stderrStr string, err error) { +func Exec(podNamespacedName types.NamespacedName, container string, cmd []string, kubeClient *kubernetes.Clientset, + kubeConfig *rest.Config) (stdoutStr, stderrStr string, err error) { request := kubeClient. CoreV1(). RESTClient(). Post(). Resource("pods"). - Namespace(pod.Namespace). - Name(pod.Name). + Namespace(podNamespacedName.Namespace). + Name(podNamespacedName.Name). SubResource("exec"). VersionedParams( &corev1.PodExecOptions{ diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index eb6a9c607..f87e44d5d 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -10,6 +10,8 @@ import ( //nolint:staticcheck // this ripemd160 legacy hash is only used for diff comparison not for security purpose "golang.org/x/crypto/ripemd160" corev1 "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" ) @@ -37,6 +39,22 @@ func NamespacedName(namespace, name string) string { return fmt.Sprintf("%s/%s", namespace, name) } +func GetNamespacedName(obj meta.Object) types.NamespacedName { + return types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } +} + +func GetNamespacedNameForSTSOrConfigMap( + aeroCluster *asdbv1.AerospikeCluster, rackID int, +) types.NamespacedName { + return types.NamespacedName{ + Name: aeroCluster.Name + "-" + strconv.Itoa(rackID), + Namespace: aeroCluster.Namespace, + } +} + // IsImageEqual returns true if image name image1 is equal to image name image2. func IsImageEqual(image1, image2 string) bool { desiredImageWithVersion := strings.TrimPrefix(image1, DockerHubImagePrefix) diff --git a/test/cluster_helper.go b/test/cluster_helper.go index 17f981c63..241eae04e 100644 --- a/test/cluster_helper.go +++ b/test/cluster_helper.go @@ -332,7 +332,7 @@ func rollingRestartClusterTest( aeroCluster.Spec.AerospikeConfig.Value["service"] = map[string]interface{}{} } - aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = defaultProtofdmax + 1 + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["indent-allocations"] = true err = updateCluster(k8sClient, ctx, aeroCluster) if err != nil { @@ -341,7 +341,7 @@ func rollingRestartClusterTest( // Verify that the change has been applied on the cluster. return validateAerospikeConfigServiceClusterUpdate( - log, k8sClient, ctx, clusterNamespacedName, []string{"proto-fd-max"}, + log, k8sClient, ctx, clusterNamespacedName, []string{"indent-allocations"}, ) } @@ -1121,6 +1121,7 @@ func createDummyAerospikeCluster( "service": map[string]interface{}{ "feature-key-file": "/etc/aerospike/secret/features.conf", "proto-fd-max": defaultProtofdmax, + "auto-pin": "none", }, "security": map[string]interface{}{}, "network": getNetworkConfig(), @@ -1442,15 +1443,12 @@ func aerospikeClusterCreateUpdateWithTO( // Apply the update. if desired.Spec.AerospikeAccessControl != nil { current.Spec.AerospikeAccessControl = &asdbv1.AerospikeAccessControlSpec{} - lib.DeepCopy(¤t.Spec, &desired.Spec) + current.Spec = *lib.DeepCopy(&desired.Spec).(*asdbv1.AerospikeClusterSpec) } else { current.Spec.AerospikeAccessControl = nil } - lib.DeepCopy( - ¤t.Spec.AerospikeConfig.Value, - &desired.Spec.AerospikeConfig.Value, - ) + current.Spec.AerospikeConfig.Value = desired.Spec.AerospikeConfig.DeepCopy().Value if err := k8sClient.Update(ctx, current); err != nil { return err diff --git a/test/cluster_test.go b/test/cluster_test.go index 6e29a8ee1..28cd1ef25 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -74,9 +74,68 @@ var _ = Describe( ScaleDownWithMigrateFillDelay(ctx) }, ) + Context( + "UpdateClusterPre600", func() { + UpdateClusterPre600(ctx) + }, + ) }, ) +func UpdateClusterPre600(ctx goctx.Context) { + Context( + "UpdateClusterPre600", func() { + clusterNamespacedName := getNamespacedName( + "deploy-cluster-pre6", namespace, + ) + + BeforeEach( + func() { + image := fmt.Sprintf( + "aerospike/aerospike-server-enterprise:%s", pre6Version, + ) + aeroCluster, err := getAeroClusterConfig( + clusterNamespacedName, image, + ) + Expect(err).ToNot(HaveOccurred()) + + err = deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + AfterEach( + func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + _ = deleteCluster(k8sClient, ctx, aeroCluster) + }, + ) + + It( + "UpdateReplicationFactor: should fail for updating namespace replication-factor on server"+ + "before 6.0.0. Cannot be updated", func() { + aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + namespaceConfig := + aeroCluster.Spec.AerospikeConfig.Value["namespaces"].([]interface{})[0].(map[string]interface{}) + namespaceConfig["replication-factor"] = 5 + aeroCluster.Spec.AerospikeConfig.Value["namespaces"].([]interface{})[0] = namespaceConfig + + err = k8sClient.Update( + ctx, aeroCluster, + ) + Expect(err).Should(HaveOccurred()) + }, + ) + }, + ) +} + func ScaleDownWithMigrateFillDelay(ctx goctx.Context) { Context( "ScaleDownWithMigrateFillDelay", func() { @@ -184,8 +243,7 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { Expect(err).ToNot(HaveOccurred()) val := intstr.FromInt32(1) aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val - aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = - int64(18000) + aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["enable-quotas"] = true // As pod is in pending state, CR object will be won't reach the final phase. // So expectedPhases can be InProgress or Completed @@ -193,14 +251,15 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { }, 1*time.Minute).ShouldNot(HaveOccurred()) By("Upgrade version") - aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) - Expect(err).ToNot(HaveOccurred()) - newImage := baseImage + ":7.0.0.0_2" - aeroCluster.Spec.Image = newImage - // As pod is in pending state, CR object will be won't reach the final phase. - // So expectedPhases can be InProgress or Completed - err = updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) - Expect(err).ToNot(HaveOccurred()) + Eventually(func() error { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + newImage := baseImage + ":7.0.0.0_2" + aeroCluster.Spec.Image = newImage + // As pod is in pending state, CR object will be won't reach the final phase. + // So expectedPhases can be InProgress or Completed + return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) + }, 1*time.Minute).ShouldNot(HaveOccurred()) By("Verify pending pod") podList, err = getPodList(aeroCluster, k8sClient) @@ -920,6 +979,22 @@ func UpdateClusterTest(ctx goctx.Context) { ) Expect(err).ToNot(HaveOccurred()) + By("UpdateReplicationFactor: should update namespace replication-factor on non-SC namespace") + + aeroCluster, err := getCluster( + k8sClient, ctx, + clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + nsList := aeroCluster.Spec.AerospikeConfig.Value["namespaces"].([]interface{}) + namespaceConfig := nsList[len(nsList)-1].(map[string]interface{}) + // aeroCluster.Spec.AerospikeConfig.Value["namespaces"].([]interface{})[0].(map[string]interface{}) + namespaceConfig["replication-factor"] = 3 + aeroCluster.Spec.AerospikeConfig.Value["namespaces"].([]interface{})[len(nsList)-1] = namespaceConfig + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + By("Scaling up along with modifying Namespace storage Dynamically") err = scaleUpClusterTestWithNSDeviceHandling( @@ -1067,7 +1142,8 @@ func UpdateClusterTest(ctx goctx.Context) { Context( "Namespace", func() { It( - "UpdateReplicationFactor: should fail for updating namespace replication-factor. Cannot be updated", + "UpdateReplicationFactor: should fail for updating namespace"+ + "replication-factor on SC namespace. Cannot be updated", func() { aeroCluster, err := getCluster( k8sClient, ctx, diff --git a/test/dynamic_config_test.go b/test/dynamic_config_test.go new file mode 100644 index 000000000..df87c5681 --- /dev/null +++ b/test/dynamic_config_test.go @@ -0,0 +1,265 @@ +package test + +import ( + goctx "context" + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "golang.org/x/net/context" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" + "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" +) + +type podID struct { + podUID string + asdPID string +} + +var _ = Describe( + "DynamicConfig", func() { + + ctx := goctx.Background() + + Context( + "When doing valid operations", func() { + + clusterName := "dynamic-config-test" + clusterNamespacedName := getNamespacedName( + clusterName, namespace, + ) + BeforeEach( + func() { + // Create a 2 node cluster + aeroCluster := createDummyAerospikeCluster( + clusterNamespacedName, 2, + ) + aeroCluster.Spec.AerospikeConfig.Value["xdr"] = map[string]interface{}{ + "dcs": []map[string]interface{}{ + { + "name": "dc1", + "auth-mode": "internal", + "auth-user": "admin", + "node-address-ports": []string{ + "aeroclusterdst-0-0 3000", + }, + "auth-password-file": "/etc/aerospike/secret/password_DC1.txt", + "namespaces": []map[string]interface{}{ + { + "name": "test", + }, + }, + }, + }, + } + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + AfterEach( + func() { + aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + _ = deleteCluster(k8sClient, ctx, aeroCluster) + }, + ) + + It( + "Should update config dynamically", func() { + + By("Modify dynamic config by adding fields") + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + podPIDMap, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + log := map[string]interface{}{ + "report-data-op": []string{"test"}, + } + + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = 18000 + + aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["log"] = log + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + pod := aeroCluster.Status.Pods["dynamic-config-test-0-0"] + + conf, err := getAerospikeConfigFromNode(logger, k8sClient, ctx, clusterNamespacedName, + "service", &pod) + Expect(err).ToNot(HaveOccurred()) + cv, ok := conf["proto-fd-max"] + Expect(ok).ToNot(BeFalse()) + + Expect(cv).To(Equal(int64(18000))) + + conf, err = getAerospikeConfigFromNode(logger, k8sClient, ctx, clusterNamespacedName, + "security", &pod) + Expect(err).ToNot(HaveOccurred()) + + reportDataOp, ok := conf["log.report-data-op[0]"].(string) + Expect(ok).ToNot(BeFalse()) + + Expect(reportDataOp).To(Equal("test")) + + By("Verify no warm/cold restarts in Pods") + validateServerRestart(ctx, aeroCluster, podPIDMap, false) + + By("Modify dynamic config by removing fields") + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + podPIDMap, err = getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + delete(aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{}), "proto-fd-max") + delete(aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{}), "log") + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + pod = aeroCluster.Status.Pods["dynamic-config-test-0-0"] + + conf, err = getAerospikeConfigFromNode(logger, k8sClient, ctx, clusterNamespacedName, + "service", &pod) + Expect(err).ToNot(HaveOccurred()) + cv, ok = conf["proto-fd-max"] + Expect(ok).ToNot(BeFalse()) + + Expect(cv).To(Equal(int64(15000))) + + conf, err = getAerospikeConfigFromNode(logger, k8sClient, ctx, clusterNamespacedName, + "security", &pod) + Expect(err).ToNot(HaveOccurred()) + + _, ok = conf["log.report-data-op[0]"].(string) + Expect(ok).ToNot(BeTrue()) + + By("Verify no warm/cold restarts in Pods") + validateServerRestart(ctx, aeroCluster, podPIDMap, false) + + }, + ) + + It( + "Should update config statically", func() { + + By("Modify static config") + + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + podPIDMap, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["enable-quotas"] = true + dc := map[string]interface{}{ + "name": "dc2", + "auth-mode": "internal", + "auth-user": "admin", + "node-address-ports": []string{ + "aeroclusterdst-0-0 3000", + }, + "auth-password-file": "/etc/aerospike/secret/password_DC1.txt", + "namespaces": []map[string]interface{}{ + { + "name": "test", + }, + }, + } + + aeroCluster.Spec.AerospikeConfig.Value["xdr"].(map[string]interface{})["dcs"] = append( + aeroCluster.Spec.AerospikeConfig.Value["xdr"].(map[string]interface{})["dcs"].([]interface{}), dc) + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + pod := aeroCluster.Status.Pods["dynamic-config-test-0-0"] + + conf, err := getAerospikeConfigFromNode(logger, k8sClient, ctx, clusterNamespacedName, + "security", &pod) + Expect(err).ToNot(HaveOccurred()) + + enableQuotas, ok := conf["enable-quotas"].(bool) + Expect(ok).ToNot(BeFalse()) + + Expect(enableQuotas).To(BeTrue()) + + conf, err = getAerospikeConfigFromNode(logger, k8sClient, ctx, clusterNamespacedName, + "xdr", &pod) + Expect(err).ToNot(HaveOccurred()) + + Expect(conf["dcs"]).To(HaveLen(2)) + + By("Verify warm restarts in Pods") + validateServerRestart(ctx, aeroCluster, podPIDMap, true) + }, + ) + }, + ) + }, +) + +func validateServerRestart(ctx goctx.Context, cluster *asdbv1.AerospikeCluster, pidMap map[string]podID, + shouldRestart bool) { + restarted := false + + newPodPidMap, err := getPodIDs(ctx, cluster) + Expect(err).ToNot(HaveOccurred()) + + for podName, pid := range pidMap { + if newPodPidMap[podName].podUID != pid.podUID || newPodPidMap[podName].asdPID != pid.asdPID { + restarted = true + break + } + } + + Expect(restarted).To(Equal(shouldRestart)) +} + +func getPodIDs(ctx context.Context, aeroCluster *asdbv1.AerospikeCluster) (map[string]podID, error) { + podList, err := getClusterPodList(k8sClient, ctx, aeroCluster) + if err != nil { + return nil, err + } + + pidMap := make(map[string]podID) + + for podIndex := range podList.Items { + pod := &podList.Items[podIndex] + cmd := []string{ + "bash", + "-c", + "ps -A -o pid,cmd|grep \"asd\" | grep -v grep | grep -v tini |head -n 1 | awk '{print $1}'", + } + + stdout, _, execErr := utils.Exec( + utils.GetNamespacedName(pod), asdbv1.AerospikeServerContainerName, cmd, k8sClientset, + cfg, + ) + + if execErr != nil { + return nil, fmt.Errorf( + "error reading ASD Pid from pod %s - %v", pod.Name, execErr, + ) + } + + pidMap[pod.Name] = podID{ + podUID: string(pod.UID), + asdPID: stdout, + } + } + + return pidMap, nil +} diff --git a/test/large_reconcile_test.go b/test/large_reconcile_test.go index 27556c281..244a880aa 100644 --- a/test/large_reconcile_test.go +++ b/test/large_reconcile_test.go @@ -104,9 +104,8 @@ var _ = Describe( ) Expect(err).ToNot(HaveOccurred()) - // oldService := aeroCluster.Spec.AerospikeConfig.Value["service"] - tempConf := 18000 - aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = tempConf + tempConf := "cpu" + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["auto-pin"] = tempConf err = k8sClient.Update(goctx.TODO(), aeroCluster) Expect(err).ToNot(HaveOccurred()) @@ -117,7 +116,7 @@ var _ = Describe( ) Expect(err).ToNot(HaveOccurred()) - aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = defaultProtofdmax + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["auto-pin"] = "none" return k8sClient.Update(goctx.TODO(), aeroCluster) }, 1*time.Minute).ShouldNot(HaveOccurred()) @@ -332,7 +331,7 @@ func waitForClusterScaleDown( func waitForClusterRollingRestart( k8sClient client.Client, aeroCluster *asdbv1.AerospikeCluster, - replicas int, tempConf int, retryInterval, timeout time.Duration, + replicas int, tempConf string, retryInterval, timeout time.Duration, ) error { err := wait.PollUntilContextTimeout(goctx.TODO(), retryInterval, timeout, true, func(ctx goctx.Context) (done bool, err error) { @@ -350,10 +349,10 @@ func waitForClusterRollingRestart( return false, err } - protofdmax := newCluster.Status.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"].(float64) - if int(protofdmax) == tempConf { + autoPin := newCluster.Status.AerospikeConfig.Value["service"].(map[string]interface{})["auto-pin"].(string) + if autoPin == tempConf { err := fmt.Errorf( - "cluster status can not be updated with intermediate conf value %d,"+ + "cluster status can not be updated with intermediate conf value %v,"+ " it should have only final value, as this is the new reconcile flow", tempConf, ) diff --git a/test/rack_utils.go b/test/rack_utils.go index d54bda2ab..098dde03f 100644 --- a/test/rack_utils.go +++ b/test/rack_utils.go @@ -16,6 +16,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" + "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" lib "github.com/aerospike/aerospike-management-lib" "github.com/aerospike/aerospike-management-lib/info" ) @@ -91,22 +92,11 @@ func validateAerospikeConfigServiceUpdate( // TODO: // We may need to check for all keys in aerospikeConfig in rack // but we know that we are changing for service only for now - host, err := createHost(&pod) + svcConfs, err := getAerospikeConfigFromNode(log, k8sClient, ctx, clusterNamespacedName, "service", &pod) if err != nil { return err } - asinfo := info.NewAsInfo( - log, host, getClientPolicy(aeroCluster, k8sClient), - ) - - confs, err := getAsConfig(asinfo, "service") - if err != nil { - return err - } - - svcConfs := confs["service"].(lib.Stats) - for k, v := range rack.InputAerospikeConfig.Value["service"].(map[string]interface{}) { if vint, ok := v.(int); ok { v = int64(vint) @@ -195,7 +185,7 @@ func getPodSpecAnnotations( for rackStateIndex := range rackStateList { found := &appsv1.StatefulSet{} - stsName := getNamespacedNameForStatefulSet(aeroCluster, rackStateList[rackStateIndex].Rack.ID) + stsName := utils.GetNamespacedNameForSTSOrConfigMap(aeroCluster, rackStateList[rackStateIndex].Rack.ID) err := k8sClient.Get(ctx, stsName, found) if errors.IsNotFound(err) { @@ -222,7 +212,7 @@ func getPodSpecLabels( rackStateList := getConfiguredRackStateList(aeroCluster) for rackStateIndex := range rackStateList { found := &appsv1.StatefulSet{} - stsName := getNamespacedNameForStatefulSet(aeroCluster, rackStateList[rackStateIndex].Rack.ID) + stsName := utils.GetNamespacedNameForSTSOrConfigMap(aeroCluster, rackStateList[rackStateIndex].Rack.ID) err := k8sClient.Get(ctx, stsName, found) if errors.IsNotFound(err) { @@ -248,7 +238,7 @@ func validateRackEnabledCluster( rackStateList := getConfiguredRackStateList(aeroCluster) for rackStateIndex := range rackStateList { found := &appsv1.StatefulSet{} - stsName := getNamespacedNameForStatefulSet( + stsName := utils.GetNamespacedNameForSTSOrConfigMap( aeroCluster, rackStateList[rackStateIndex].Rack.ID, ) @@ -456,15 +446,6 @@ func splitRacks(nodeCount, rackCount int) []int { return topology } -func getNamespacedNameForStatefulSet( - aeroCluster *asdbv1.AerospikeCluster, rackID int, -) types.NamespacedName { - return types.NamespacedName{ - Name: aeroCluster.Name + "-" + strconv.Itoa(rackID), - Namespace: aeroCluster.Namespace, - } -} - func getNamespacedName(name, namespace string) types.NamespacedName { return types.NamespacedName{ Name: name, diff --git a/test/services_test.go b/test/services_test.go index 10d79fbcb..8c80d1329 100644 --- a/test/services_test.go +++ b/test/services_test.go @@ -129,10 +129,7 @@ func createLoadBalancer() *asdbv1.LoadBalancerSpec { ), ) - result := &asdbv1.LoadBalancerSpec{} - lib.DeepCopy(result, lb) - - return result + return lib.DeepCopy(&lb).(*asdbv1.LoadBalancerSpec) } func loadBalancerName(aeroCluster *asdbv1.AerospikeCluster) types.NamespacedName { diff --git a/test/storage_init_test.go b/test/storage_init_test.go index f5a28bc4d..5cb21b8d0 100644 --- a/test/storage_init_test.go +++ b/test/storage_init_test.go @@ -709,7 +709,7 @@ func writeDataToVolumeBlock( magicBytes, path, ), } - _, _, err := utils.Exec(pod, cName, cmd, k8sClientset, cfg) + _, _, err := utils.Exec(utils.GetNamespacedName(pod), cName, cmd, k8sClientset, cfg) if err != nil { return fmt.Errorf("error creating file %v", err) @@ -726,7 +726,7 @@ func writeDataToVolumeFileSystem( cmd := []string{ "bash", "-c", fmt.Sprintf("echo %s > %s/magic.txt", magicBytes, path), } - _, _, err := utils.Exec(pod, cName, cmd, k8sClientset, cfg) + _, _, err := utils.Exec(utils.GetNamespacedName(pod), cName, cmd, k8sClientset, cfg) if err != nil { return fmt.Errorf("error creating file %v", err) @@ -741,7 +741,7 @@ func hasDataBlock(pod *corev1.Pod, volume *asdbv1.VolumeSpec) bool { cmd := []string{ "bash", "-c", fmt.Sprintf("dd if=%s count=1 status=none", path), } - stdout, _, _ := utils.Exec(pod, cName, cmd, k8sClientset, cfg) + stdout, _, _ := utils.Exec(utils.GetNamespacedName(pod), cName, cmd, k8sClientset, cfg) return strings.HasPrefix(stdout, magicBytes) } @@ -750,7 +750,7 @@ func hasDataFilesystem(pod *corev1.Pod, volume *asdbv1.VolumeSpec) bool { cName, path := getContainerNameAndPath(volume) cmd := []string{"bash", "-c", fmt.Sprintf("cat %s/magic.txt", path)} - stdout, _, _ := utils.Exec(pod, cName, cmd, k8sClientset, cfg) + stdout, _, _ := utils.Exec(utils.GetNamespacedName(pod), cName, cmd, k8sClientset, cfg) return strings.HasPrefix(stdout, magicBytes) } diff --git a/test/utils.go b/test/utils.go index c7ee3982b..7db639344 100644 --- a/test/utils.go +++ b/test/utils.go @@ -15,6 +15,7 @@ import ( "time" set "github.com/deckarep/golang-set/v2" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -28,6 +29,7 @@ import ( asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" operatorUtils "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" lib "github.com/aerospike/aerospike-management-lib" + "github.com/aerospike/aerospike-management-lib/info" ) var ( @@ -767,3 +769,27 @@ func getGitRepoRootPath() (string, error) { return strings.TrimSpace(string(path)), nil } + +func getAerospikeConfigFromNode(log logr.Logger, k8sClient client.Client, ctx goctx.Context, + clusterNamespacedName types.NamespacedName, configContext string, pod *asdbv1.AerospikePodStatus) (lib.Stats, error) { + aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName) + if err != nil { + return nil, err + } + + host, err := createHost(pod) + if err != nil { + return nil, err + } + + asinfo := info.NewAsInfo( + log, host, getClientPolicy(aeroCluster, k8sClient), + ) + + confs, err := getAsConfig(asinfo, configContext) + if err != nil { + return nil, err + } + + return confs[configContext].(lib.Stats), nil +} diff --git a/test/warm_restart_test.go b/test/warm_restart_test.go index 6f91a8268..f1fe64aa3 100644 --- a/test/warm_restart_test.go +++ b/test/warm_restart_test.go @@ -108,6 +108,7 @@ func createMarkerFile( } for podIndex := range podList.Items { + pod := &podList.Items[podIndex] cmd := []string{ "bash", "-c", @@ -115,13 +116,13 @@ func createMarkerFile( } _, _, err := utils.Exec( - &podList.Items[podIndex], asdbv1.AerospikeServerContainerName, cmd, k8sClientset, + utils.GetNamespacedName(pod), asdbv1.AerospikeServerContainerName, cmd, k8sClientset, cfg, ) if err != nil { return fmt.Errorf( - "error reading ASD Pid from pod %s - %v", podList.Items[podIndex].Name, err, + "error reading ASD Pid from pod %s - %v", pod.Name, err, ) } } @@ -141,6 +142,7 @@ func isMarkerPresent( podToMarkerPresent := make(map[string]bool) for podIndex := range podList.Items { + pod := &podList.Items[podIndex] cmd := []string{ "bash", "-c", @@ -148,7 +150,7 @@ func isMarkerPresent( } _, _, err := utils.Exec( - &podList.Items[podIndex], asdbv1.AerospikeServerContainerName, cmd, k8sClientset, + utils.GetNamespacedName(pod), asdbv1.AerospikeServerContainerName, cmd, k8sClientset, cfg, )