Skip to content

Commit

Permalink
Disallow DC and Namespace addition/removal dynamically from xdr.
Browse files Browse the repository at this point in the history
  • Loading branch information
tanmayja committed Jan 9, 2024
1 parent 8422b3b commit 0eac6fa
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 94 deletions.
2 changes: 1 addition & 1 deletion api/v1/aerospikecluster_validating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -1578,7 +1578,7 @@ func validateNsConfUpdate(newConfSpec, oldConfSpec, currentStatus *AerospikeConf

if singleConf["name"] == oldSingleConf["name"] {
// replication-factor update not allowed
val, err := asconfig.CompareVersions(incomingVersion, Version6)
val, err := lib.CompareVersions(incomingVersion, Version6)
if err != nil {
return fmt.Errorf("failed to check image version: %v", err)
}
Expand Down
21 changes: 18 additions & 3 deletions controllers/aero_info_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ package controllers

import (
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

as "github.com/aerospike/aerospike-client-go/v6"
asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1"
Expand Down Expand Up @@ -312,8 +314,10 @@ func (r *SingleClusterReconciler) setDynamicConfig(
}

podList := make([]corev1.Pod, 0, len(pods))
podIPNameMap := make(map[string]string, len(pods))

for idx := range pods {
podIPNameMap[pods[idx].Status.PodIP] = pods[idx].Name
podList = append(podList, *pods[idx])
}

Expand All @@ -332,15 +336,26 @@ func (r *SingleClusterReconciler) setDynamicConfig(
return reconcileSuccess()
}

asConfCmds, err := deployment.CreateConfigSetCmdList(diffs, selectedHostConns[0].ASConn, r.getClientPolicy())
asConfCmds, err := deployment.CreateConfigSetCmdList(r.Log, diffs, selectedHostConns[0].ASConn, r.getClientPolicy())
if err != nil {
// Assuming error returned here will not be a server error.
return reconcileError(err)
}

r.Log.Info("printing commands", "asConfCmds", fmt.Sprintf("%v", asConfCmds))

if err := deployment.SetConfigCommandsOnHosts(r.Log, policy, allHostConns, selectedHostConns, asConfCmds); err != nil {
return reconcileError(err)
for _, host := range selectedHostConns {
if err := deployment.SetConfigCommandsOnHost(r.Log, policy, allHostConns, host, asConfCmds); err != nil {
if strings.HasPrefix(err.Error(), "ServerError:") {
return reconcileError(reconcile.TerminalError(err))
}

return reconcileError(err)
}

if err := r.updatePod(podIPNameMap[host.ASConn.AerospikeHostName]); err != nil {
return reconcileError(err)
}
}

return reconcileSuccess()
Expand Down
140 changes: 74 additions & 66 deletions controllers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,21 @@ func (r *SingleClusterReconciler) rollingRestartPods(
return reconcileSuccess()
}

func (r *SingleClusterReconciler) restartOrUpdateAerospikeServer(rackState *RackState,
pod *corev1.Pod, operation RestartType) error {
cmName := getNamespacedNameForSTSConfigMap(r.aeroCluster, rackState.Rack.ID)
func (r *SingleClusterReconciler) restartOrUpdateAerospikeServer(podName string,
operation RestartType) error {
rackID, err := utils.GetRackIDFromPodName(podName)
if err != nil {
return fmt.Errorf(
"failed to get rackID for the pod %s", podName,
)
}

podNamespacedName := types.NamespacedName{
Name: podName,
Namespace: r.aeroCluster.Namespace,
}

cmName := getNamespacedNameForSTSConfigMap(r.aeroCluster, *rackID)
initBinary := "/etc/aerospike/akoinit"

var subCommand string
Expand All @@ -243,7 +255,7 @@ func (r *SingleClusterReconciler) restartOrUpdateAerospikeServer(rackState *Rack
// Quick restart attempt should not take significant time.
// Therefore, it's ok to block the operator on the quick restart attempt.
stdout, stderr, err := utils.Exec(
pod, asdbv1.AerospikeServerContainerName, cmd, r.KubeClient,
podNamespacedName, asdbv1.AerospikeServerContainerName, cmd, r.KubeClient,
r.KubeConfig,
)
if err != nil {
Expand All @@ -258,21 +270,21 @@ func (r *SingleClusterReconciler) restartOrUpdateAerospikeServer(rackState *Rack
// Quick restart attempt should not take significant time.
// Therefore, it's ok to block the operator on the quick restart attempt.
stdout, stderr, err = utils.Exec(
pod, asdbv1.AerospikeServerContainerName, cmd, r.KubeClient,
podNamespacedName, asdbv1.AerospikeServerContainerName, cmd, r.KubeClient,
r.KubeConfig,
)

if err != nil {
r.Log.V(1).Info(
"Failed warm restart", "err", err, "podName", pod.Name, "stdout",
"Failed warm restart", "err", err, "podName", podNamespacedName.Name, "stdout",
stdout, "stderr", stderr,
)

return err
}
} else {
r.Log.V(1).Info(
"Failed to perform", "operation", subCommand, "err", err, "podName", pod.Name, "stdout",
"Failed to perform", "operation", subCommand, "err", err, "podName", podNamespacedName.Name, "stdout",
stdout, "stderr", stderr,
)

Expand All @@ -283,15 +295,15 @@ func (r *SingleClusterReconciler) restartOrUpdateAerospikeServer(rackState *Rack
if subCommand == "quick-restart" {
r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeNormal, "PodWarmRestarted",
"[rack-%d] Restarted Pod %s", rackState.Rack.ID, pod.Name,
"[rack-%d] Restarted Pod %s", *rackID, podNamespacedName.Name,
)
r.Log.V(1).Info("Pod warm restarted", "podName", pod.Name)
r.Log.V(1).Info("Pod warm restarted", "podName", podNamespacedName.Name)
} else {
r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeNormal, "PodConfUpdated",
"[rack-%d] Updated Pod %s", rackState.Rack.ID, pod.Name,
"[rack-%d] Updated Pod %s", *rackID, podNamespacedName.Name,
)
r.Log.V(1).Info("Pod conf updated", "podName", pod.Name)
r.Log.V(1).Info("Pod conf updated", "podName", podNamespacedName.Name)
}

return nil
Expand All @@ -315,7 +327,7 @@ func (r *SingleClusterReconciler) restartPods(

if restartType == quickRestart {
// If ASD restart fails then go ahead and restart the pod
if err := r.restartOrUpdateAerospikeServer(rackState, pod, quickRestart); err == nil {
if err := r.restartOrUpdateAerospikeServer(pod.Name, quickRestart); err == nil {
continue
}
}
Expand All @@ -337,42 +349,16 @@ func (r *SingleClusterReconciler) restartPods(
return reconcileSuccess()
}

func (r *SingleClusterReconciler) updatePods(
rackState *RackState, podsToUpdate []*corev1.Pod,
) reconcileResult {
restartedPods := make([]*corev1.Pod, 0, len(podsToUpdate))

for idx := range podsToUpdate {
pod := podsToUpdate[idx]

r.Log.Info("updating pod", "pod", pod.Name)
// If update Conf fails then go ahead and restart the asd
var err error
if err = r.restartOrUpdateAerospikeServer(rackState, pod, noRestartUpdateConf); err == nil {
continue
}

r.Log.Info("Restarting pod as updating failed", "pod", pod.Name, "err", err)
// If ASD restart fails then go ahead and restart the pod
if err := r.restartOrUpdateAerospikeServer(rackState, pod, quickRestart); err == nil {
continue
}

if err := r.Client.Delete(context.TODO(), pod); err != nil {
r.Log.Error(err, "Failed to delete pod")
return reconcileError(err)
}

restartedPods = append(restartedPods, pod)
func (r *SingleClusterReconciler) updatePod(podName string) error {
r.Log.Info("updating pod", "pod", podName)

r.Log.V(1).Info("Pod deleted", "podName", pod.Name)
if err := r.restartOrUpdateAerospikeServer(podName, noRestartUpdateConf); err != nil {
return err
}

if len(restartedPods) > 0 {
return r.ensurePodsRunningAndReady(restartedPods)
}
r.Log.V(1).Info("Pod Updated", "podName", podName)

return reconcileSuccess()
return nil
}

func (r *SingleClusterReconciler) ensurePodsRunningAndReady(podsToCheck []*corev1.Pod) reconcileResult {
Expand Down Expand Up @@ -1096,7 +1082,7 @@ func (r *SingleClusterReconciler) handleNSOrDeviceRemoval(rackState *RackState,
}

for _, pod := range podsToRestart {
err := r.handleNSOrDeviceRemovalPerPod(removedDevices, removedFiles, pod)
err := r.handleNSOrDeviceRemovalPerPod(removedDevices, removedFiles, pod.Name)
if err != nil {
return err
}
Expand All @@ -1106,12 +1092,12 @@ func (r *SingleClusterReconciler) handleNSOrDeviceRemoval(rackState *RackState,
}

func (r *SingleClusterReconciler) handleNSOrDeviceRemovalPerPod(
removedDevices, removedFiles []string, pod *corev1.Pod,
removedDevices, removedFiles []string, podName string,
) error {
podStatus := r.aeroCluster.Status.Pods[pod.Name]
podStatus := r.aeroCluster.Status.Pods[podName]

for _, file := range removedFiles {
err := r.deleteFileStorage(pod, file)
err := r.deleteFileStorage(podName, file)
if err != nil {
return err
}
Expand All @@ -1126,7 +1112,7 @@ func (r *SingleClusterReconciler) handleNSOrDeviceRemovalPerPod(

patch1 := jsonpatch.PatchOperation{
Operation: "replace",
Path: "/status/pods/" + pod.Name + "/dirtyVolumes",
Path: "/status/pods/" + podName + "/dirtyVolumes",
Value: sets.List(dirtyVolumes),
}
patches = append(patches, patch1)
Expand Down Expand Up @@ -1271,22 +1257,25 @@ func getVolumeNameFromDevicePath(volumes []asdbv1.VolumeSpec, s string) string {
return ""
}

func (r *SingleClusterReconciler) deleteFileStorage(pod *corev1.Pod, fileName string) error {
func (r *SingleClusterReconciler) deleteFileStorage(podName, fileName string) error {
cmd := []string{
"bash", "-c", fmt.Sprintf(
"rm -rf %s",
fileName,
),
}
r.Log.Info(
"Deleting file", "file", fileName, "cmd", cmd, "podname", pod.Name,
"Deleting file", "file", fileName, "cmd", cmd, "podname", podName,
)

stdout, stderr, err := utils.Exec(pod, asdbv1.AerospikeServerContainerName, cmd, r.KubeClient, r.KubeConfig)
podNamespacedName := types.NamespacedName{Name: podName, Namespace: r.aeroCluster.Namespace}

stdout, stderr, err := utils.Exec(podNamespacedName, asdbv1.AerospikeServerContainerName,
cmd, r.KubeClient, r.KubeConfig)

if err != nil {
r.Log.V(1).Info(
"File deletion failed", "err", err, "podName", pod.Name, "stdout",
"File deletion failed", "err", err, "podName", podName, "stdout",
stdout, "stderr", stderr,
)

Expand Down Expand Up @@ -1329,24 +1318,43 @@ func (r *SingleClusterReconciler) handleDynamicConfigChange(rackState *RackState
return nil, fmt.Errorf("failed to load config map by lib: %v", err)
}

flatStatusConf := *asConfStatus.GetFlatMap()
flatSpecConf := *asConfSpec.GetFlatMap()
specToStatusDiffs, err := asconfig.ConfDiff(r.Log, *asConfSpec.GetFlatMap(), *asConfStatus.GetFlatMap(),
true, "7.0.0")
if err != nil {
r.Log.Info("failed to get config diff, fallback to rolling restart: %v", err)
return nil, nil
}

specToStatusDiffs := asconfig.ConfDiff(r.Log, flatSpecConf, flatStatusConf, true, "7.0.0")
r.Log.Info("print diff outside", "difference", fmt.Sprintf("%v", specToStatusDiffs))
if len(specToStatusDiffs) > 0 {
r.Log.Info("print diff outside", "difference", fmt.Sprintf("%v", specToStatusDiffs))

if len(specToStatusDiffs) > 1 {
r.Log.V(1).Info("Multiple config change in single go is not supported,"+
"falling back to rolling restart if needed", "diff", fmt.Sprintf("%v", specToStatusDiffs))
if checkXDRDCOrNamespaceAdded(specToStatusDiffs) {
r.Log.Info("XDR DC or Namespace added, not supported dynamically")
return nil, nil
}

return nil, nil
}
isDynamic, err := asconfig.IsAllDynamicConfig(r.Log, specToStatusDiffs, "7.0.0")
if err != nil {
r.Log.Info("failed to check if all config is dynamic, fallback to rolling restart: %v", err)
return nil, nil
}

isDynamic := asconfig.IsAllDynamicConfig(specToStatusDiffs, "7.0.0")
if !isDynamic {
r.Log.Info("Static field has been modified, cannot change config dynamically")
return nil, nil
if !isDynamic {
r.Log.Info("Static field has been modified, cannot change config dynamically")
return nil, nil
}
}

return specToStatusDiffs, nil
}

func checkXDRDCOrNamespaceAdded(diffs map[string]map[string]interface{}) bool {
for k := range diffs {
tokens := strings.Split(k, ".")
if tokens[0] == "xdr" && tokens[len(tokens)-1] == "name" {
return true
}
}

return false
}
4 changes: 0 additions & 4 deletions controllers/rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,10 +501,6 @@ func (r *SingleClusterReconciler) updateDynamicConfig(rackState *RackState,
return res
}

if res := r.updatePods(rackState, podsToUpdate); !res.isSuccess {
return res
}

r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeNormal, "RackDynamicUpdate",
"[rack-%d] Finished Dynamic update", rackState.Rack.ID,
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/aerospike/aerospike-client-go/v6 v6.14.0
github.com/aerospike/aerospike-management-lib v0.0.0-20240103175849-7fd218327d01
github.com/aerospike/aerospike-management-lib v0.0.0-20240109070650-a9a5815dd8e7
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/go-logr/logr v1.2.4
Expand Down Expand Up @@ -68,7 +68,6 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
Expand Down
Loading

0 comments on commit 0eac6fa

Please sign in to comment.