Skip to content

Commit

Permalink
Adding support for active-rack (#312)
Browse files Browse the repository at this point in the history
* Adding support for active-rack

* updating submodule to fetch 7.2 schema.

* changing order of recluster and set roster.

* Adding SC testcase

* updating schema
  • Loading branch information
tanmayja authored Oct 4, 2024
1 parent 512d802 commit 3997de8
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/aerospike/aerospike-kubernetes-operator
go 1.22

require (
github.com/aerospike/aerospike-management-lib v1.4.0
github.com/aerospike/aerospike-management-lib v1.4.1-0.20240905074352-532f7cf2e66b
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/go-logr/logr v1.4.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/aerospike/aerospike-backup-service v0.0.0-20240822110128-dc2b4811b9d3
github.com/aerospike/aerospike-backup-service v0.0.0-20240822110128-dc2b4811b9d3/go.mod h1:PFWhqxcMsEEyoOZtQ70b+X8xWbbemDYuitT24EPBizk=
github.com/aerospike/aerospike-client-go/v7 v7.6.1 h1:VZK6S9YKq2w6ptTk3kXXjTxG2U9M9Y7Oi3YQ+3T7wQQ=
github.com/aerospike/aerospike-client-go/v7 v7.6.1/go.mod h1:uCbSYMpjlRcH/9f26VSF/luzDDXrcDaV8c6/WIcKtT4=
github.com/aerospike/aerospike-management-lib v1.4.0 h1:wT0l3kwzXv5DV5Cd+hD0BQq3hjSIyaPX1HaUb1304TI=
github.com/aerospike/aerospike-management-lib v1.4.0/go.mod h1:3JKrmC/mLSV8SygbrPQPNV8T7bFaTMjB8wfnX25gB+4=
github.com/aerospike/aerospike-management-lib v1.4.1-0.20240905074352-532f7cf2e66b h1:x7zrG9VzAdGU4MUOVBhB8OZ8VWUgko1qqfA5ikyocWw=
github.com/aerospike/aerospike-management-lib v1.4.1-0.20240905074352-532f7cf2e66b/go.mod h1:3JKrmC/mLSV8SygbrPQPNV8T7bFaTMjB8wfnX25gB+4=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl3/e6D5CLfI0j/7hiIEtvGVFPCZ7Ei2oq8iQ=
Expand Down
56 changes: 56 additions & 0 deletions internal/controller/cluster/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,17 @@ func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error)
return reconcile.Result{}, recErr
}

// Doing recluster before setting up roster to get the latest observed node list from server.
if r.IsReclusterNeeded() {
if err = deployment.InfoRecluster(
r.Log,
r.getClientPolicy(), allHostConns,
); err != nil {
r.Log.Error(err, "Failed to do recluster")
return reconcile.Result{}, err
}
}

if asdbv1.IsClusterSCEnabled(r.aeroCluster) {
if !r.IsStatusEmpty() {
if res := r.waitForClusterStability(policy, allHostConns); !res.IsSuccess {
Expand Down Expand Up @@ -1023,3 +1034,48 @@ func (r *SingleClusterReconciler) AddAPIVersionLabel(ctx context.Context) error

return r.Client.Update(ctx, aeroCluster, common.UpdateOption)
}

func (r *SingleClusterReconciler) IsReclusterNeeded() bool {
// Return false if dynamic configuration updates are disabled
if !asdbv1.GetBool(r.aeroCluster.Spec.EnableDynamicConfigUpdate) {
return false
}

for specIdx := range r.aeroCluster.Spec.RackConfig.Racks {
for statusIdx := range r.aeroCluster.Status.RackConfig.Racks {
if r.aeroCluster.Spec.RackConfig.Racks[specIdx].ID == r.aeroCluster.Status.RackConfig.Racks[statusIdx].ID &&
r.IsReclusterNeededForRack(&r.aeroCluster.Spec.RackConfig.Racks[specIdx],
&r.aeroCluster.Status.RackConfig.Racks[statusIdx]) {
return true
}
}
}

return false
}

func (r *SingleClusterReconciler) IsReclusterNeededForRack(specRack, statusRack *asdbv1.Rack) bool {
specNamespaces, ok := specRack.AerospikeConfig.Value["namespaces"].([]interface{})
if !ok {
return false
}

statusNamespaces, ok := statusRack.AerospikeConfig.Value["namespaces"].([]interface{})
if !ok {
return false
}

for _, specNamespace := range specNamespaces {
for _, statusNamespace := range statusNamespaces {
if specNamespace.(map[string]interface{})["name"] != statusNamespace.(map[string]interface{})["name"] {
continue
}

if specNamespace.(map[string]interface{})["active-rack"] != statusNamespace.(map[string]interface{})["active-rack"] {
return true
}
}
}

return false
}
2 changes: 1 addition & 1 deletion pkg/configschema/schemas
113 changes: 113 additions & 0 deletions test/cluster/dynamic_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,119 @@ var _ = Describe(
)
},
)

Context(
"When changing fields those need recluster", func() {
BeforeEach(
func() {
// Create a 4 node cluster
aeroCluster := createNonSCDummyAerospikeCluster(
clusterNamespacedName, 4,
)
aeroCluster.Spec.Storage.Volumes = append(
aeroCluster.Spec.Storage.Volumes,
asdbv1.VolumeSpec{
Name: "ns1",
Source: asdbv1.VolumeSource{
PersistentVolume: &asdbv1.PersistentVolumeSpec{
Size: resource.MustParse("1Gi"),
StorageClass: storageClass,
VolumeMode: v1.PersistentVolumeBlock,
},
},
Aerospike: &asdbv1.AerospikeServerVolumeAttachment{
Path: "/test/dev/xvdf1",
},
},
)
aeroCluster.Spec.EnableDynamicConfigUpdate = ptr.To(true)
aeroCluster.Spec.Image = "aerospike.jfrog.io/docker/aerospike/aerospike-server-enterprise-rc:7.2.0.0-rc2"
aeroCluster.Spec.PodSpec.ImagePullSecrets = []v1.LocalObjectReference{
{
Name: "regcred",
},
}
aeroCluster.Spec.RackConfig.Racks = append(aeroCluster.Spec.RackConfig.Racks,
asdbv1.Rack{
ID: 1,
},
asdbv1.Rack{
ID: 2,
})
aeroCluster.Spec.RackConfig.Namespaces = []string{
"test",
"test1",
}
nsList := aeroCluster.Spec.AerospikeConfig.Value["namespaces"].([]interface{})
nsList = append(nsList, getSCNamespaceConfig("test1", "/test/dev/xvdf1"))
aeroCluster.Spec.AerospikeConfig.Value["namespaces"] = nsList
err := deployCluster(k8sClient, ctx, aeroCluster)
Expect(err).ToNot(HaveOccurred())
},
)

AfterEach(
func() {
aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())

_ = deleteCluster(k8sClient, ctx, aeroCluster)
},
)

It(
"Should update active-rack dynamically", func() {

By("Modify dynamic config by adding fields")

aeroCluster, err := getCluster(
k8sClient, ctx, clusterNamespacedName,
)
Expect(err).ToNot(HaveOccurred())

podPIDMap, err := getPodIDs(ctx, aeroCluster)
Expect(err).ToNot(HaveOccurred())

nsList := aeroCluster.Spec.AerospikeConfig.Value["namespaces"].([]interface{})
nsList[0].(map[string]interface{})["active-rack"] = 1
nsList[1].(map[string]interface{})["active-rack"] = 2

err = updateCluster(k8sClient, ctx, aeroCluster)
Expect(err).ToNot(HaveOccurred())

By("Fetch and verify dynamic configs")

pod := aeroCluster.Status.Pods["dynamic-config-test-1-0"]

info, err := requestInfoFromNode(logger, k8sClient, ctx, clusterNamespacedName, "namespace/test", &pod)
Expect(err).ToNot(HaveOccurred())

confs := strings.Split(info["namespace/test"], ";")
for _, conf := range confs {
if strings.Contains(conf, "effective_active_rack") {
keyValue := strings.Split(conf, "=")
Expect(keyValue[1]).To(Equal("1"))
}
}

info, err = requestInfoFromNode(logger, k8sClient, ctx, clusterNamespacedName, "namespace/test1", &pod)
Expect(err).ToNot(HaveOccurred())

confs = strings.Split(info["namespace/test1"], ";")
for _, conf := range confs {
if strings.Contains(conf, "effective_active_rack") {
keyValue := strings.Split(conf, "=")
Expect(keyValue[1]).To(Equal("2"))
}
}

By("Verify no warm/cold restarts in Pods")

validateServerRestart(ctx, aeroCluster, podPIDMap, false)
},
)
},
)
},
)

Expand Down
24 changes: 24 additions & 0 deletions test/cluster/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,30 @@ func getAerospikeConfigFromNode(log logr.Logger, k8sClient client.Client, ctx go
return confs[configContext].(lib.Stats), nil
}

func requestInfoFromNode(log logr.Logger, k8sClient client.Client, ctx goctx.Context,
clusterNamespacedName types.NamespacedName, cmd string, pod *asdbv1.AerospikePodStatus) (map[string]string, error) {
aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName)
if err != nil {
return nil, err
}

host, err := createHost(pod)
if err != nil {
return nil, err
}

asinfo := info.NewAsInfo(
log, host, getClientPolicy(aeroCluster, k8sClient),
)

confs, err := asinfo.RequestInfo(cmd)
if err != nil {
return nil, err
}

return confs, nil
}

func getPasswordFromSecret(k8sClient client.Client,
secretNamespcedName types.NamespacedName, passFileName string,
) (string, error) {
Expand Down

0 comments on commit 3997de8

Please sign in to comment.