diff --git a/api/v1/aerospikecluster_mutating_webhook.go b/api/v1/aerospikecluster_mutating_webhook.go index 7e6203ab9..10c7222c4 100644 --- a/api/v1/aerospikecluster_mutating_webhook.go +++ b/api/v1/aerospikecluster_mutating_webhook.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" "github.com/aerospike/aerospike-kubernetes-operator/pkg/merge" + lib "github.com/aerospike/aerospike-management-lib" ) //nolint:lll // for readability @@ -248,21 +249,19 @@ func (c *AerospikeCluster) updateRacksAerospikeConfigFromGlobal(asLog logr.Logge for idx := range c.Spec.RackConfig.Racks { rack := &c.Spec.RackConfig.Racks[idx] - var ( - m map[string]interface{} - err error - ) + var m map[string]interface{} if rack.InputAerospikeConfig != nil { // Merge this rack's and global config. - m, err = merge.Merge( + merged, err := merge.Merge( c.Spec.AerospikeConfig.Value, rack.InputAerospikeConfig.Value, ) - if err != nil { return err } + m = lib.DeepCopy(merged).(map[string]interface{}) + asLog.V(1).Info( "Merged rack config from global aerospikeConfig", "rack id", rack.ID, "rackAerospikeConfig", m, "globalAerospikeConfig", diff --git a/controllers/aero_info_calls.go b/controllers/aero_info_calls.go index df016e31a..7f51fb22a 100644 --- a/controllers/aero_info_calls.go +++ b/controllers/aero_info_calls.go @@ -338,7 +338,7 @@ func (r *SingleClusterReconciler) setDynamicConfig( for _, host := range selectedHostConns { podName := podIPNameMap[host.ASConn.AerospikeHostName] - asConfCmds, err := asconfig.CreateSetConfigCmdList(dynamicConfDiffPerPod[podName], + asConfCmds, err := asconfig.CreateSetConfigCmdList(r.Log, dynamicConfDiffPerPod[podName], host.ASConn, r.getClientPolicy()) if err != nil { diff --git a/go.mod b/go.mod index df7b4e00c..370c45345 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 toolchain go1.21.8 require ( - github.com/aerospike/aerospike-management-lib v1.3.1-0.20240404063536-2adfbedf9687 + github.com/aerospike/aerospike-management-lib v1.3.1-0.20240414093735-a11e7fe7c3ec github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d github.com/evanphx/json-patch v4.12.0+incompatible github.com/go-logr/logr v1.3.0 diff --git a/go.sum b/go.sum index 947f1be32..8dbbc6e5d 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/aerospike/aerospike-management-lib v1.2.1-0.20240325134810-f8046fe987 github.com/aerospike/aerospike-management-lib v1.2.1-0.20240325134810-f8046fe9872e/go.mod h1:E4dk798IikCp9a8fugpYoeQVIXuvdxogHvt6sKhaORQ= github.com/aerospike/aerospike-management-lib v1.3.1-0.20240404063536-2adfbedf9687 h1:d7oDvHmiKhq4rzcD/w3z9tP3wH0+iaDvxKDk3IYuqeU= github.com/aerospike/aerospike-management-lib v1.3.1-0.20240404063536-2adfbedf9687/go.mod h1:E4dk798IikCp9a8fugpYoeQVIXuvdxogHvt6sKhaORQ= +github.com/aerospike/aerospike-management-lib v1.3.1-0.20240414093735-a11e7fe7c3ec h1:o0G+fsUiUfiKU/tnG20/rhHsKd579HkWPMvzz7OwtSU= +github.com/aerospike/aerospike-management-lib v1.3.1-0.20240414093735-a11e7fe7c3ec/go.mod h1:E4dk798IikCp9a8fugpYoeQVIXuvdxogHvt6sKhaORQ= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl3/e6D5CLfI0j/7hiIEtvGVFPCZ7Ei2oq8iQ= diff --git a/test/cluster_helper.go b/test/cluster_helper.go index 241eae04e..d1af3fe5f 100644 --- a/test/cluster_helper.go +++ b/test/cluster_helper.go @@ -1006,6 +1006,7 @@ func createDummyAerospikeClusterWithoutStorage( return createDummyAerospikeClusterWithRFAndStorage(clusterNamespacedName, size, 1, nil) } + func createDummyAerospikeClusterWithRF( clusterNamespacedName types.NamespacedName, size int32, rf int, ) *asdbv1.AerospikeCluster { @@ -1554,6 +1555,23 @@ func getSCNamespaceConfig(name, path string) map[string]interface{} { } } +func getSCNamespaceConfigWithSet(name, path string) map[string]interface{} { + return map[string]interface{}{ + "name": name, + "replication-factor": 2, + "strong-consistency": true, + "storage-engine": map[string]interface{}{ + "type": "device", + "devices": []interface{}{path}, + }, + "sets": []map[string]interface{}{ + { + "name": "testset", + }, + }, + } +} + func getNonSCInMemoryNamespaceConfig(name string) map[string]interface{} { return map[string]interface{}{ "name": name, diff --git a/test/dynamic_config_test.go b/test/dynamic_config_test.go index d6565aee0..e52707742 100644 --- a/test/dynamic_config_test.go +++ b/test/dynamic_config_test.go @@ -3,14 +3,22 @@ package test import ( goctx "context" "fmt" + "os" + "strings" "time" + mapset "github.com/deckarep/golang-set/v2" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "golang.org/x/net/context" + ctrl "sigs.k8s.io/controller-runtime" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" + "github.com/aerospike/aerospike-kubernetes-operator/pkg/configschema" "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" + lib "github.com/aerospike/aerospike-management-lib" + "github.com/aerospike/aerospike-management-lib/asconfig" + "github.com/aerospike/aerospike-management-lib/info" ) type podID struct { @@ -18,6 +26,8 @@ type podID struct { asdPID string } +var configWithMaxDefaultVal = mapset.NewSet("info-max-ms", "flush-max-ms") + var _ = Describe( "DynamicConfig", func() { @@ -293,6 +303,164 @@ var _ = Describe( ) }, ) + + Context( + "When doing complete dynamic config change", func() { + + clusterName := "aerocluster" + clusterNamespacedName := getNamespacedName( + clusterName, "test", + ) + + BeforeEach( + func() { + // Create a 2 node cluster + aeroCluster := createDummyAerospikeCluster( + clusterNamespacedName, 2, + ) + aeroCluster.Spec.AerospikeConfig.Value["xdr"] = map[string]interface{}{ + "dcs": []map[string]interface{}{ + { + "name": "dc1", + "auth-mode": "internal", + "auth-user": "admin", + "node-address-ports": []string{ + "aeroclusterdst-0-0 3000", + }, + "auth-password-file": "/etc/aerospike/secret/password_DC1.txt", + "namespaces": []map[string]interface{}{ + { + "name": "test", + }, + }, + }, + }, + } + + aeroCluster.Spec.AerospikeConfig.Value["namespaces"] = []interface{}{ + getSCNamespaceConfigWithSet("test", "/test/dev/xvdf"), + } + + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + AfterEach( + func() { + aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + _ = deleteCluster(k8sClient, ctx, aeroCluster) + }, + ) + + It( + "Should update config dynamically", func() { + + By("Modify all dynamic config fields") + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + schemaMap, err := configschema.NewSchemaMap() + if err != nil { + os.Exit(1) + } + + schemaMapLogger := ctrl.Log.WithName("schema-map") + asconfig.InitFromMap(schemaMapLogger, schemaMap) + + pods, err := getPodList(aeroCluster, k8sClient) + Expect(err).ToNot(HaveOccurred()) + + podPIDMap, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + pod := aeroCluster.Status.Pods[pods.Items[0].Name] + + host, err := createHost(&pod) + Expect(err).ToNot(HaveOccurred()) + + asinfo := info.NewAsInfo( + logger, host, getClientPolicy(aeroCluster, k8sClient), + ) + + dynamic, err := asconfig.GetDynamic("7.0.0") + Expect(err).ToNot(HaveOccurred()) + + serverConf, err := asconfig.GenerateConf(logger, asinfo, false) + Expect(err).ToNot(HaveOccurred()) + + server, err := asconfig.NewMapAsConfig(logger, serverConf.Conf) + Expect(err).ToNot(HaveOccurred()) + + flatServer := server.GetFlatMap() + + spec, err := asconfig.NewMapAsConfig(logger, aeroCluster.Spec.AerospikeConfig.Value) + Expect(err).ToNot(HaveOccurred()) + + flatSpec := spec.GetFlatMap() + + By("Verify Service Context configs dynamically") + err = validateServiceContextDynamically(ctx, flatServer, flatSpec, aeroCluster, dynamic) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + By("Verify no warm/cold restarts in Pods") + validateServerRestart(ctx, aeroCluster, podPIDMap, false) + + By("Verify Network Context configs dynamically") + err = validateNetworkContextDynamically(ctx, flatServer, flatSpec, aeroCluster, dynamic) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + By("Verify no warm/cold restarts in Pods") + validateServerRestart(ctx, aeroCluster, podPIDMap, false) + + By("Verify Namespace Context configs dynamically") + err = validateNamespaceContextDynamically(ctx, flatServer, flatSpec, aeroCluster, dynamic) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + By("Verify no warm/cold restarts in Pods") + validateServerRestart(ctx, aeroCluster, podPIDMap, false) + + By("Verify Security Context configs dynamically") + err = validateSecurityContextDynamically(ctx, flatServer, flatSpec, aeroCluster, dynamic) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + By("Verify no warm/cold restarts in Pods") + validateServerRestart(ctx, aeroCluster, podPIDMap, false) + + By("Verify XDR Context configs dynamically") + err = validateXDRContextDynamically(ctx, flatServer, flatSpec, aeroCluster, dynamic) + Expect(err).ToNot(HaveOccurred()) + + By("Verify no warm/cold restarts in Pods") + validateServerRestart(ctx, aeroCluster, podPIDMap, false) + }, + ) + }, + ) }, ) @@ -348,3 +516,193 @@ func getPodIDs(ctx context.Context, aeroCluster *asdbv1.AerospikeCluster) (map[s return pidMap, nil } + +func updateValue(val interface{}) interface{} { + switch val2 := val.(type) { + case []string: + + case string: + + case bool: + + case int: + return val2 + 1 + case uint64: + return val2 + 1 + case int64: + return val2 + 1 + case float64: + return val2 + 1 + + case lib.Stats: + + default: + return nil + } + + return nil +} + +func validateServiceContextDynamically( + ctx goctx.Context, flatServer, flatSpec *asconfig.Conf, + aeroCluster *asdbv1.AerospikeCluster, dynamic mapset.Set[string], +) error { + newSpec := *flatSpec + ignoredConf := mapset.NewSet("cluster-name") + + for confKey, val := range *flatServer { + if asconfig.ContextKey(confKey) != "service" { + continue + } + + tokens := strings.Split(confKey, ".") + + if dyn := dynamic.Contains(asconfig.GetFlatKey(tokens)); dyn && !ignoredConf.Contains(asconfig.BaseKey(confKey)) { + v := updateValue(val) + if v != nil { + if configWithMaxDefaultVal.Contains(asconfig.BaseKey(confKey)) { + v = v.(uint64) - 1 + } + + if asconfig.BaseKey(confKey) == "proto-fd-idle-ms" { + v = 70000 + } + + newSpec[confKey] = v + } + } + } + + newConf := asconfig.New(logger, &newSpec) + newMap := *newConf.ToMap() + + aeroCluster.Spec.AerospikeConfig.Value["service"] = lib.DeepCopy(newMap["service"]) + + return updateCluster(k8sClient, ctx, aeroCluster) +} + +func validateNetworkContextDynamically( + ctx goctx.Context, flatServer, flatSpec *asconfig.Conf, + aeroCluster *asdbv1.AerospikeCluster, dynamic mapset.Set[string], +) error { + newSpec := *flatSpec + ignoredConf := mapset.NewSet("connect-timeout-ms") + + for confKey, val := range *flatServer { + if asconfig.ContextKey(confKey) != "network" { + continue + } + + tokens := strings.Split(confKey, ".") + + if dyn := dynamic.Contains(asconfig.GetFlatKey(tokens)); dyn && !ignoredConf.Contains(asconfig.BaseKey(confKey)) { + v := updateValue(val) + if v != nil { + newSpec[confKey] = v + } + } + } + + newConf := asconfig.New(logger, &newSpec) + newMap := *newConf.ToMap() + + aeroCluster.Spec.AerospikeConfig.Value["network"] = lib.DeepCopy(newMap["network"]) + + return updateCluster(k8sClient, ctx, aeroCluster) +} + +func validateNamespaceContextDynamically( + ctx goctx.Context, flatServer, flatSpec *asconfig.Conf, + aeroCluster *asdbv1.AerospikeCluster, dynamic mapset.Set[string], +) error { + newSpec := *flatSpec + ignoredConf := mapset.NewSet("rack-id", "default-ttl") + + for confKey, val := range *flatServer { + if asconfig.ContextKey(confKey) != "namespaces" { + continue + } + + tokens := strings.Split(confKey, ".") + if dyn := dynamic.Contains(asconfig.GetFlatKey(tokens)); dyn && !ignoredConf.Contains(asconfig.BaseKey(confKey)) { + v := updateValue(val) + if v != nil { + if configWithMaxDefaultVal.Contains(asconfig.BaseKey(confKey)) { + v = v.(int64) - 1 + } + + newSpec[confKey] = v + } + } + } + + newConf := asconfig.New(logger, &newSpec) + newMap := *newConf.ToMap() + + aeroCluster.Spec.AerospikeConfig.Value["namespaces"] = lib.DeepCopy(newMap["namespaces"]) + + return updateCluster(k8sClient, ctx, aeroCluster) +} + +func validateSecurityContextDynamically( + ctx goctx.Context, flatServer, flatSpec *asconfig.Conf, + aeroCluster *asdbv1.AerospikeCluster, dynamic mapset.Set[string], +) error { + newSpec := *flatSpec + + for confKey, val := range *flatServer { + if asconfig.ContextKey(confKey) != "security" { + continue + } + + tokens := strings.Split(confKey, ".") + if dyn := dynamic.Contains(asconfig.GetFlatKey(tokens)); dyn { + v := updateValue(val) + if v != nil { + newSpec[confKey] = v + } + } + } + + newConf := asconfig.New(logger, &newSpec) + newMap := *newConf.ToMap() + + aeroCluster.Spec.AerospikeConfig.Value["security"] = lib.DeepCopy(newMap["security"]) + + return updateCluster(k8sClient, ctx, aeroCluster) +} + +func validateXDRContextDynamically( + ctx goctx.Context, flatServer, flatSpec *asconfig.Conf, + aeroCluster *asdbv1.AerospikeCluster, dynamic mapset.Set[string], +) error { + newSpec := *flatSpec + + for confKey, val := range *flatServer { + if asconfig.ContextKey(confKey) != "xdr" { + continue + } + + tokens := strings.Split(confKey, ".") + if dyn := dynamic.Contains(asconfig.GetFlatKey(tokens)); dyn { + v := updateValue(val) + if v != nil { + switch asconfig.BaseKey(confKey) { + case "max-throughput": + v = v.(int64) + 99 + case "transaction-queue-limit": + v = (v.(int64) - 1) * 2 + } + + newSpec[confKey] = v + } + } + } + + newConf := asconfig.New(logger, &newSpec) + newMap := *newConf.ToMap() + + aeroCluster.Spec.AerospikeConfig.Value["xdr"] = lib.DeepCopy(newMap["xdr"]) + + return updateCluster(k8sClient, ctx, aeroCluster) +}