From efa1ba448105f40e7afdbd731cac496ee92e2cf3 Mon Sep 17 00:00:00 2001 From: Tanmay Jain <103629776+tanmayja@users.noreply.github.com> Date: Thu, 11 Jul 2024 13:33:13 +0530 Subject: [PATCH 1/2] [KO-211] A way to trigger a warm and cold rolling restart on demand (#294) * Added a way to trigger a warm and cold rolling restart on demand using the `Operations` field in the CR * Allowing only a single operation at a time. * Disallow operation modification * Blocking image upgrade and rack scaleup along with on-demand operations --- Dockerfile | 2 +- Jenkinsfile | 4 +- Makefile | 2 +- api/v1/aerospikecluster_types.go | 38 ++ api/v1/aerospikecluster_validating_webhook.go | 98 ++-- api/v1/utils.go | 61 +++ api/v1/zz_generated.deepcopy.go | 34 ++ .../asdb.aerospike.com_aerospikeclusters.yaml | 51 ++ ...rnetes-operator.clusterserviceversion.yaml | 4 + controllers/pod.go | 237 ++++++++- controllers/rack.go | 20 +- controllers/reconciler.go | 2 - ..._aerospikeclusters.asdb.aerospike.com.yaml | 51 ++ pkg/utils/pod.go | 11 - test/access_control_test.go | 19 + test/batch_restart_pods_test.go | 18 +- test/cluster_test.go | 36 +- test/large_reconcile_test.go | 36 +- test/on_demand_operations_test.go | 481 ++++++++++++++++++ test/storage_wipe_test.go | 69 +-- test/suite_test.go | 2 +- test/test_client.go | 5 +- test/utils.go | 47 +- test/warm_restart_test.go | 19 +- 24 files changed, 1130 insertions(+), 217 deletions(-) create mode 100644 test/on_demand_operations_test.go diff --git a/Dockerfile b/Dockerfile index 46cff61df..66072d160 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM --platform=$BUILDPLATFORM golang:1.21 as builder +FROM --platform=$BUILDPLATFORM golang:1.21 AS builder # OS and Arch args ARG TARGETOS diff --git a/Jenkinsfile b/Jenkinsfile index 95b2201b8..a34b36e10 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -80,8 +80,8 @@ pipeline { sh "./snyk-linux test --severity-threshold=high --fail-on=all" // Scan the operator images - sh "./snyk-linux container test ${OPERATOR_CONTAINER_IMAGE_CANDIDATE_NAME} --severity-threshold=high --file=Dockerfile --policy-path=.snyk --fail-on=all" - sh "./snyk-linux container test ${OPERATOR_BUNDLE_IMAGE_CANDIDATE_NAME} --severity-threshold=high --file=Dockerfile --policy-path=.snyk --fail-on=all" + sh "./snyk-linux container test ${OPERATOR_CONTAINER_IMAGE_CANDIDATE_NAME} --severity-threshold=high --file=Dockerfile --policy-path=.snyk --fail-on=all" + sh "./snyk-linux container test ${OPERATOR_BUNDLE_IMAGE_CANDIDATE_NAME} --severity-threshold=high --file=Dockerfile --policy-path=.snyk --fail-on=all" } } } diff --git a/Makefile b/Makefile index d4181b9f5..132ef14bc 100644 --- a/Makefile +++ b/Makefile @@ -140,7 +140,7 @@ go-lint: golanci-lint ## Run golangci-lint against code. .PHONY: test test: manifests generate fmt vet envtest ## Run tests. # KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./... 
-coverprofile cover.out - KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" cd $(shell pwd)/test; go run github.com/onsi/ginkgo/v2/ginkgo -coverprofile cover.out -progress -v -timeout=12h0m0s -focus=${FOCUS} --junit-report="junit.xml" -- ${ARGS} + KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" cd $(shell pwd)/test; go run github.com/onsi/ginkgo/v2/ginkgo -coverprofile cover.out -show-node-events -v -timeout=12h0m0s -focus=${FOCUS} --junit-report="junit.xml" -- ${ARGS} ##@ Build diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index bcee242a2..18d235822 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -124,6 +124,31 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList" // +kubebuilder:validation:MinItems:=1 K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"` + // Operations is a list of on-demand operations to be performed on the Aerospike cluster. + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Operations" + // +kubebuilder:validation:MaxItems:=1 + Operations []OperationSpec `json:"operations,omitempty"` +} + +type OperationKind string + +const ( + // OperationWarmRestart is the on-demand operation that leads to the warm restart of the aerospike pods + // (Restarting ASD in the pods). https://aerospike.com/docs/cloud/kubernetes/operator/Warm-restart + OperationWarmRestart OperationKind = "WarmRestart" + + // OperationPodRestart is the on-demand operation that leads to the restart of aerospike pods. + OperationPodRestart OperationKind = "PodRestart" +) + +type OperationSpec struct { + // Kind is the type of operation to be performed on the Aerospike cluster. + // +kubebuilder:validation:Enum=WarmRestart;PodRestart + Kind OperationKind `json:"kind"` + // +kubebuilder:validation:MaxLength=20 + // +kubebuilder:validation:MinLength=1 + ID string `json:"id"` + PodList []string `json:"podList,omitempty"` } type SeedsFinderServices struct { @@ -699,6 +724,8 @@ type AerospikeClusterStatusSpec struct { //nolint:govet // for readability RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"` // K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"` + // Operations is a list of on-demand operation to be performed on the Aerospike cluster. 
+ Operations []OperationSpec `json:"operations,omitempty"` } // AerospikeClusterStatus defines the observed state of AerospikeCluster @@ -1044,6 +1071,12 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec, status.K8sNodeBlockList = *k8sNodeBlockList } + if len(spec.Operations) != 0 { + operations := lib.DeepCopy(&spec.Operations).(*[]OperationSpec) + + status.Operations = *operations + } + return &status, nil } @@ -1143,5 +1176,10 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec spec.K8sNodeBlockList = *k8sNodeBlockList } + if len(status.Operations) != 0 { + operations := lib.DeepCopy(&status.Operations).(*[]OperationSpec) + spec.Operations = *operations + } + return &spec, nil } diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index 1e060cd16..5c08d6af7 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -117,6 +117,12 @@ func (c *AerospikeCluster) ValidateUpdate(oldObj runtime.Object) (admission.Warn return nil, err } + if err := validateOperationUpdate( + &old.Spec, &c.Spec, &c.Status, + ); err != nil { + return nil, err + } + // Validate AerospikeConfig update if err := validateAerospikeConfigUpdate( aslog, incomingVersion, outgoingVersion, @@ -191,6 +197,10 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error { return err } + if err := c.validateOperation(); err != nil { + return err + } + // Storage should be validated before validating aerospikeConfig and fileStorage if err := validateStorage(&c.Spec.Storage, &c.Spec.PodSpec); err != nil { return err @@ -263,6 +273,19 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error { return c.validateSCNamespaces() } +func (c *AerospikeCluster) validateOperation() error { + // Nothing to validate if no operation + if len(c.Spec.Operations) == 0 { + return nil + } + + if c.Status.AerospikeConfig == nil { + return fmt.Errorf("operation cannot be added during aerospike cluster creation") + } + + return nil +} + func (c *AerospikeCluster) validateSCNamespaces() error { scNamespaceSet := sets.NewString() @@ -1292,20 +1315,22 @@ func validateSecurityConfigUpdate( func validateEnableSecurityConfig(newConfSpec, oldConfSpec *AerospikeConfigSpec) error { newConf := newConfSpec.Value oldConf := oldConfSpec.Value + oldSec, oldSecConfFound := oldConf["security"] - newSec, newSecConfFound := newConf["security"] + if !oldSecConfFound { + return nil + } - if oldSecConfFound && !newSecConfFound { + newSec, newSecConfFound := newConf["security"] + if !newSecConfFound { return fmt.Errorf("cannot remove cluster security config") } - if oldSecConfFound && newSecConfFound { - oldSecFlag, oldEnableSecurityFlagFound := oldSec.(map[string]interface{})["enable-security"] - newSecFlag, newEnableSecurityFlagFound := newSec.(map[string]interface{})["enable-security"] + oldSecFlag, oldEnableSecurityFlagFound := oldSec.(map[string]interface{})["enable-security"] + newSecFlag, newEnableSecurityFlagFound := newSec.(map[string]interface{})["enable-security"] - if oldEnableSecurityFlagFound && oldSecFlag.(bool) && (!newEnableSecurityFlagFound || !newSecFlag.(bool)) { - return fmt.Errorf("cannot disable cluster security in running cluster") - } + if oldEnableSecurityFlagFound && oldSecFlag.(bool) && (!newEnableSecurityFlagFound || !newSecFlag.(bool)) { + return fmt.Errorf("cannot disable cluster security in running cluster") } return nil @@ -2360,33 +2385,46 @@ func (c 
*AerospikeCluster) validateEnableDynamicConfigUpdate() error { return nil } -func getMinRunningInitVersion(pods map[string]AerospikePodStatus) (string, error) { - minVersion := "" +func validateOperationUpdate(oldSpec, newSpec *AerospikeClusterSpec, status *AerospikeClusterStatus) error { + if len(newSpec.Operations) == 0 { + return nil + } - for idx := range pods { - if pods[idx].InitImage != "" { - version, err := GetImageVersion(pods[idx].InitImage) - if err != nil { - return "", err - } + newOp := &newSpec.Operations[0] - if minVersion == "" { - minVersion = version - continue - } + var oldOp *OperationSpec - val, err := lib.CompareVersions(version, minVersion) - if err != nil { - return "", fmt.Errorf("failed to check image version: %v", err) - } + if len(oldSpec.Operations) != 0 { + oldOp = &oldSpec.Operations[0] + } - if val < 0 { - minVersion = version - } - } else { - return baseInitVersion, nil + if oldOp != nil && oldOp.ID == newOp.ID && !reflect.DeepEqual(oldOp, newOp) { + return fmt.Errorf("operation %s cannot be updated", newOp.ID) + } + + allPodNames := GetAllPodNames(status.Pods) + + podSet := sets.New(newSpec.Operations[0].PodList...) + if !allPodNames.IsSuperset(podSet) { + return fmt.Errorf("invalid pod names in operation %v", podSet.Difference(allPodNames).UnsortedList()) + } + + // Don't allow any on-demand operation along with these cluster change: + // 1- scale up + // 2- racks added or removed + // 3- image update + // New pods won't be available for operation + if !reflect.DeepEqual(newSpec.Operations, status.Operations) { + switch { + case newSpec.Size > status.Size: + return fmt.Errorf("cannot change Spec.Operations along with cluster scale-up") + case len(newSpec.RackConfig.Racks) != len(status.RackConfig.Racks) || + len(newSpec.RackConfig.Racks) != len(oldSpec.RackConfig.Racks): + return fmt.Errorf("cannot change Spec.Operations along with rack addition/removal") + case newSpec.Image != status.Image || newSpec.Image != oldSpec.Image: + return fmt.Errorf("cannot change Spec.Operations along with image update") } } - return minVersion, nil + return nil } diff --git a/api/v1/utils.go b/api/v1/utils.go index 833b67b0a..ce14db874 100644 --- a/api/v1/utils.go +++ b/api/v1/utils.go @@ -7,6 +7,8 @@ import ( "regexp" "strings" + "k8s.io/apimachinery/pkg/util/sets" + v1 "k8s.io/api/core/v1" "k8s.io/utils/ptr" @@ -536,3 +538,62 @@ func GetDefaultPasswordFilePath(aerospikeConfigSpec *AerospikeConfigSpec) *strin return &passFile } + +func getMinRunningInitVersion(pods map[string]AerospikePodStatus) (string, error) { + minVersion := "" + + for idx := range pods { + if pods[idx].InitImage != "" { + version, err := GetImageVersion(pods[idx].InitImage) + if err != nil { + return "", err + } + + if minVersion == "" { + minVersion = version + continue + } + + val, err := lib.CompareVersions(version, minVersion) + if err != nil { + return "", fmt.Errorf("failed to check image version: %v", err) + } + + if val < 0 { + minVersion = version + } + } else { + return baseInitVersion, nil + } + } + + return minVersion, nil +} + +func DistributeItems(totalItems, totalGroups int) []int { + itemsPerGroup, extraItems := totalItems/totalGroups, totalItems%totalGroups + + // Distributing nodes in given racks + var topology []int + + for groupIdx := 0; groupIdx < totalGroups; groupIdx++ { + itemsForThisGroup := itemsPerGroup + if groupIdx < extraItems { + itemsForThisGroup++ + } + + topology = append(topology, itemsForThisGroup) + } + + return topology +} + +func GetAllPodNames(pods 
map[string]AerospikePodStatus) sets.Set[string] { + podNames := make(sets.Set[string]) + + for podName := range pods { + podNames.Insert(podName) + } + + return podNames +} diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 41e05a81f..03adc6db2 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -203,6 +203,13 @@ func (in *AerospikeClusterSpec) DeepCopyInto(out *AerospikeClusterSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.Operations != nil { + in, out := &in.Operations, &out.Operations + *out = make([]OperationSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AerospikeClusterSpec. @@ -300,6 +307,13 @@ func (in *AerospikeClusterStatusSpec) DeepCopyInto(out *AerospikeClusterStatusSp *out = make([]string, len(*in)) copy(*out, *in) } + if in.Operations != nil { + in, out := &in.Operations, &out.Operations + *out = make([]OperationSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AerospikeClusterStatusSpec. @@ -824,6 +838,26 @@ func (in *MountOptions) DeepCopy() *MountOptions { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OperationSpec) DeepCopyInto(out *OperationSpec) { + *out = *in + if in.PodList != nil { + in, out := &in.PodList, &out.PodList + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OperationSpec. +func (in *OperationSpec) DeepCopy() *OperationSpec { + if in == nil { + return nil + } + out := new(OperationSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PersistentVolumeSpec) DeepCopyInto(out *PersistentVolumeSpec) { *out = *in diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index 1984ff2b2..7b5ad432e 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -312,6 +312,32 @@ spec: This value is used to create PodDisruptionBudget. Defaults to 1. Refer Aerospike documentation for more details. x-kubernetes-int-or-string: true + operations: + description: Operations is a list of on-demand operations to be performed + on the Aerospike cluster. + items: + properties: + id: + maxLength: 20 + minLength: 1 + type: string + kind: + description: Kind is the type of operation to be performed on + the Aerospike cluster. + enum: + - WarmRestart + - PodRestart + type: string + podList: + items: + type: string + type: array + required: + - id + - kind + type: object + maxItems: 1 + type: array operatorClientCert: description: Certificates to connect to Aerospike. properties: @@ -9651,6 +9677,31 @@ spec: is the port requested by the user. Deprecated: MultiPodPerHost is now part of podSpec" type: boolean + operations: + description: Operations is a list of on-demand operation to be performed + on the Aerospike cluster. + items: + properties: + id: + maxLength: 20 + minLength: 1 + type: string + kind: + description: Kind is the type of operation to be performed on + the Aerospike cluster. 
+ enum: + - WarmRestart + - PodRestart + type: string + podList: + items: + type: string + type: array + required: + - id + - kind + type: object + type: array operatorClientCertSpec: description: Certificates to connect to Aerospike. If omitted then certs are taken from the secret 'aerospike-secret'. diff --git a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml index 95b650fb0..964726d13 100644 --- a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml @@ -71,6 +71,10 @@ spec: for more details. displayName: Max Unavailable path: maxUnavailable + - description: Operations is a list of on-demand operations to be performed + on the Aerospike cluster. + displayName: Operations + path: operations - description: Certificates to connect to Aerospike. displayName: Operator Client Cert path: operatorClientCert diff --git a/controllers/pod.go b/controllers/pod.go index 1eee1214f..04708fc1d 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net" + "reflect" "strconv" "strings" "time" @@ -81,6 +82,9 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap(rackState *RackState, blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) requiredConfHash := confMap.Data[aerospikeConfHashFileName] + // Fetching all pods requested for on-demand operations. + onDemandQuickRestarts, onDemandPodRestarts := r.podsToRestart() + for idx := range pods { if ignorablePodNames.Has(pods[idx].Name) { continue @@ -136,7 +140,7 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap(rackState *RackState, } restartTypeMap[pods[idx].Name] = r.getRollingRestartTypePod(rackState, pods[idx], confMap, addedNSDevices, - len(dynamicConfDiffPerPod[pods[idx].Name]) > 0) + len(dynamicConfDiffPerPod[pods[idx].Name]) > 0, onDemandQuickRestarts, onDemandPodRestarts) // Fallback to rolling restart in case of partial failure to recover with the desired Aerospike config if podStatus.DynamicConfigUpdateStatus == asdbv1.PartiallyFailed { @@ -150,6 +154,7 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap(rackState *RackState, func (r *SingleClusterReconciler) getRollingRestartTypePod( rackState *RackState, pod *corev1.Pod, confMap *corev1.ConfigMap, addedNSDevices []string, onlyDynamicConfigChange bool, + onDemandQuickRestarts, onDemandPodRestarts sets.Set[string], ) RestartType { restartType := noRestart @@ -221,6 +226,13 @@ func (r *SingleClusterReconciler) getRollingRestartTypePod( "newTCPPort", r.getReadinessProbe().TCPSocket.String()) } + if opType := r.onDemandOperationType(pod.Name, onDemandQuickRestarts, onDemandPodRestarts); opType != noRestart { + restartType = mergeRestartType(restartType, opType) + + r.Log.Info("Pod warm/cold restart requested. Need rolling restart", + "pod name", pod.Name, "operation", opType, "restartType", restartType) + } + return restartType } @@ -358,6 +370,8 @@ func (r *SingleClusterReconciler) restartPods( } restartedPods := make([]*corev1.Pod, 0, len(podsToRestart)) + restartedPodNames := make([]string, 0, len(podsToRestart)) + restartedASDPodNames := make([]string, 0, len(podsToRestart)) blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) 
for idx := range podsToRestart { @@ -366,33 +380,43 @@ func (r *SingleClusterReconciler) restartPods( restartType := restartTypeMap[pod.Name] if restartType == quickRestart { - // If ASD restart fails, then go ahead and restart the pod - if err := r.restartASDOrUpdateAerospikeConf(pod.Name, quickRestart); err == nil { - continue + // We assume that the pod server image supports pod warm restart. + if err := r.restartASDOrUpdateAerospikeConf(pod.Name, quickRestart); err != nil { + r.Log.Error(err, "Failed to warm restart pod", "podName", pod.Name) + return reconcileError(err) } - } - if blockedK8sNodes.Has(pod.Spec.NodeName) { - r.Log.Info("Pod found in blocked nodes list, deleting corresponding local PVCs if any", - "podName", pod.Name) + restartedASDPodNames = append(restartedASDPodNames, pod.Name) + } else if restartType == podRestart { + if blockedK8sNodes.Has(pod.Spec.NodeName) { + r.Log.Info("Pod found in blocked nodes list, deleting corresponding local PVCs if any", + "podName", pod.Name) - if err := r.deleteLocalPVCs(rackState, pod); err != nil { + if err := r.deleteLocalPVCs(rackState, pod); err != nil { + return reconcileError(err) + } + } + + if err := r.Client.Delete(context.TODO(), pod); err != nil { + r.Log.Error(err, "Failed to delete pod") return reconcileError(err) } - } - if err := r.Client.Delete(context.TODO(), pod); err != nil { - r.Log.Error(err, "Failed to delete pod") - return reconcileError(err) - } + restartedPods = append(restartedPods, pod) + restartedPodNames = append(restartedPodNames, pod.Name) - restartedPods = append(restartedPods, pod) + r.Log.V(1).Info("Pod deleted", "podName", pod.Name) + } + } - r.Log.V(1).Info("Pod deleted", "podName", pod.Name) + if err := r.updateOperationStatus(restartedASDPodNames, restartedPodNames); err != nil { + return reconcileError(err) } if len(restartedPods) > 0 { - return r.ensurePodsRunningAndReady(restartedPods) + if result := r.ensurePodsRunningAndReady(restartedPods); !result.isSuccess { + return result + } } return reconcileSuccess() @@ -1466,3 +1490,182 @@ func (r *SingleClusterReconciler) patchPodStatus(ctx context.Context, patches [] return nil }) } + +func (r *SingleClusterReconciler) onDemandOperationType(podName string, onDemandQuickRestarts, + onDemandPodRestarts sets.Set[string]) RestartType { + switch { + case onDemandQuickRestarts.Has(podName): + return quickRestart + case onDemandPodRestarts.Has(podName): + return podRestart + } + + return noRestart +} + +func (r *SingleClusterReconciler) updateOperationStatus(restartedASDPodNames, restartedPodNames []string) error { + if len(restartedASDPodNames)+len(restartedPodNames) == 0 || len(r.aeroCluster.Spec.Operations) == 0 { + return nil + } + + statusOps := lib.DeepCopy(r.aeroCluster.Status.Operations).([]asdbv1.OperationSpec) + + allPodNames := asdbv1.GetAllPodNames(r.aeroCluster.Status.Pods) + + quickRestartsSet := sets.New(restartedASDPodNames...) + podRestartsSet := sets.New(restartedPodNames...) + + specOp := r.aeroCluster.Spec.Operations[0] + + var specPods sets.Set[string] + + // If no pod list is provided, it indicates that all pods need to be restarted. + if len(specOp.PodList) == 0 { + specPods = allPodNames + } else { + specPods = sets.New(specOp.PodList...) + } + + opFound := false + + for idx := range statusOps { + statusOp := &statusOps[idx] + if statusOp.ID == specOp.ID { + opFound = true + + if len(statusOp.PodList) != 0 { + statusPods := sets.New(statusOp.PodList...) 
+ + if statusOp.Kind == asdbv1.OperationWarmRestart { + if quickRestartsSet != nil { + statusOp.PodList = statusPods.Union(quickRestartsSet.Intersection(specPods)).UnsortedList() + } + + // If the operation is a warm restart and the pod undergoes a cold restart for any reason, + // we will still consider the warm restart operation as completed for that pod. + if podRestartsSet != nil { + statusOp.PodList = statusPods.Union(podRestartsSet.Intersection(specPods)).UnsortedList() + } + } + + if statusOp.Kind == asdbv1.OperationPodRestart && podRestartsSet != nil { + statusOp.PodList = statusPods.Union(podRestartsSet.Intersection(specPods)).UnsortedList() + } + } + + break + } + } + + if !opFound { + var podList []string + + if specOp.Kind == asdbv1.OperationWarmRestart { + if quickRestartsSet != nil { + podList = quickRestartsSet.Intersection(specPods).UnsortedList() + } + + // If the operation is a warm restart and the pod undergoes a cold restart for any reason, + // we will still consider the warm restart operation as completed for that pod. + if podRestartsSet != nil { + podList = append(podList, podRestartsSet.Intersection(specPods).UnsortedList()...) + } + } + + if specOp.Kind == asdbv1.OperationPodRestart && podRestartsSet != nil { + podList = podRestartsSet.Intersection(specPods).UnsortedList() + } + + statusOps = append(statusOps, asdbv1.OperationSpec{ + ID: specOp.ID, + Kind: specOp.Kind, + PodList: podList, + }) + } + + // Get the old object, it may have been updated in between. + newAeroCluster := &asdbv1.AerospikeCluster{} + if err := r.Client.Get( + context.TODO(), types.NamespacedName{ + Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace, + }, newAeroCluster, + ); err != nil { + return err + } + + newAeroCluster.Status.Operations = statusOps + + if err := r.patchStatus(newAeroCluster); err != nil { + return fmt.Errorf("error updating status: %w", err) + } + + return nil +} + +// podsToRestart returns the pods that need to be restarted(quick/pod restart) based on the on-demand operations. +func (r *SingleClusterReconciler) podsToRestart() (quickRestarts, podRestarts sets.Set[string]) { + quickRestarts = make(sets.Set[string]) + podRestarts = make(sets.Set[string]) + + specOps := r.aeroCluster.Spec.Operations + statusOps := r.aeroCluster.Status.Operations + allPodNames := asdbv1.GetAllPodNames(r.aeroCluster.Status.Pods) + + // If no spec operations, no pods to restart + // If the Spec.Operations and Status.Operations are equal, no pods to restart. + if len(specOps) == 0 || reflect.DeepEqual(specOps, statusOps) { + return quickRestarts, podRestarts + } + + // Assuming only one operation is present in the spec. + specOp := specOps[0] + + var ( + podsToRestart, specPods sets.Set[string] + ) + // If no pod list is provided, it indicates that all pods need to be restarted. + if len(specOp.PodList) == 0 { + specPods = allPodNames + } else { + specPods = sets.New(specOp.PodList...) + } + + opFound := false + + // If the operation is not present in the status, all pods need to be restarted. + // If the operation is present in the status, only the pods that are not present in the status need to be restarted. + // If the operation is present in the status and podList is empty, no pods need to be restarted. + for _, statusOp := range statusOps { + if statusOp.ID != specOp.ID { + continue + } + + var statusPods sets.Set[string] + if len(statusOp.PodList) == 0 { + statusPods = allPodNames + } else { + statusPods = sets.New(statusOp.PodList...) 
+ } + + podsToRestart = specPods.Difference(statusPods) + opFound = true + + break + } + + if !opFound { + podsToRestart = specPods + } + + // Separate pods to be restarted based on operation type + if podsToRestart != nil && podsToRestart.Len() > 0 { + switch specOp.Kind { + case asdbv1.OperationWarmRestart: + quickRestarts.Insert(podsToRestart.UnsortedList()...) + case asdbv1.OperationPodRestart: + podRestarts.Insert(podsToRestart.UnsortedList()...) + } + } + + return quickRestarts, podRestarts +} diff --git a/controllers/rack.go b/controllers/rack.go index c8306759f..c98ad2018 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -1824,26 +1824,8 @@ func isContainerNameInStorageVolumeAttachments( return false } -func splitRacks(nodes, racks int) []int { - nodesPerRack, extraNodes := nodes/racks, nodes%racks - - // Distributing nodes in given racks - var topology []int - - for rackIdx := 0; rackIdx < racks; rackIdx++ { - nodesForThisRack := nodesPerRack - if rackIdx < extraNodes { - nodesForThisRack++ - } - - topology = append(topology, nodesForThisRack) - } - - return topology -} - func getConfiguredRackStateList(aeroCluster *asdbv1.AerospikeCluster) []RackState { - topology := splitRacks( + topology := asdbv1.DistributeItems( int(aeroCluster.Spec.Size), len(aeroCluster.Spec.RackConfig.Racks), ) diff --git a/controllers/reconciler.go b/controllers/reconciler.go index c39fcd6fa..74d3cbf92 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -507,8 +507,6 @@ func (r *SingleClusterReconciler) updateAccessControlStatus() error { return fmt.Errorf("error updating status: %w", err) } - r.aeroCluster.Status.AerospikeClusterStatusSpec.AerospikeAccessControl = statusAerospikeAccessControl - r.Log.Info("Updated access control status", "status", newAeroCluster.Status) return nil diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index 1984ff2b2..7b5ad432e 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -312,6 +312,32 @@ spec: This value is used to create PodDisruptionBudget. Defaults to 1. Refer Aerospike documentation for more details. x-kubernetes-int-or-string: true + operations: + description: Operations is a list of on-demand operations to be performed + on the Aerospike cluster. + items: + properties: + id: + maxLength: 20 + minLength: 1 + type: string + kind: + description: Kind is the type of operation to be performed on + the Aerospike cluster. + enum: + - WarmRestart + - PodRestart + type: string + podList: + items: + type: string + type: array + required: + - id + - kind + type: object + maxItems: 1 + type: array operatorClientCert: description: Certificates to connect to Aerospike. properties: @@ -9651,6 +9677,31 @@ spec: is the port requested by the user. Deprecated: MultiPodPerHost is now part of podSpec" type: boolean + operations: + description: Operations is a list of on-demand operation to be performed + on the Aerospike cluster. + items: + properties: + id: + maxLength: 20 + minLength: 1 + type: string + kind: + description: Kind is the type of operation to be performed on + the Aerospike cluster. 
+ enum: + - WarmRestart + - PodRestart + type: string + podList: + items: + type: string + type: array + required: + - id + - kind + type: object + type: array operatorClientCertSpec: description: Certificates to connect to Aerospike. If omitted then certs are taken from the secret 'aerospike-secret'. diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index 8a4d6abfb..5addec1fc 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -119,17 +119,6 @@ func IsPodTerminating(pod *corev1.Pod) bool { return pod.DeletionTimestamp != nil } -// GetPod get pod from pod list by name -func GetPod(podName string, pods []corev1.Pod) *corev1.Pod { - for idx := range pods { - if podName == pods[idx].Name { - return &pods[idx] - } - } - - return nil -} - // GetRackIDFromPodName returns the rack id given a pod name. func GetRackIDFromPodName(podName string) (*int, error) { parts := strings.Split(podName, "-") diff --git a/test/access_control_test.go b/test/access_control_test.go index 78e2cf495..449c838f1 100644 --- a/test/access_control_test.go +++ b/test/access_control_test.go @@ -2191,6 +2191,8 @@ var _ = Describe( racks := getDummyRackConf(1, 2) aeroCluster.Spec.RackConfig.Racks = racks aeroCluster.Spec.RackConfig.Namespaces = []string{"test"} + // Setting incorrect secret name so that access control reconciler could not set the password for admin. + aeroCluster.Spec.AerospikeAccessControl.Users[0].SecretName = "incorrectSecretName" // This file is already added in the storage volume backed by the secret. aeroCluster.Spec.AerospikeConfig.Value["security"] = map[string]interface{}{ "default-password-file": "/etc/aerospike/secret/password.conf", @@ -2214,6 +2216,7 @@ var _ = Describe( Eventually(func() error { clientPolicy := getClientPolicy(aeroCluster, k8sClient) clientPolicy.Password = pass + clientPolicy.FailIfNotConnected = true client, cerr := getClientWithPolicy( pkgLog, aeroCluster, k8sClient, clientPolicy) @@ -2231,11 +2234,27 @@ var _ = Describe( return nil }, 5*time.Minute).ShouldNot(HaveOccurred()) + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + // Set correct secret name for admin user credentials. 
+ aeroCluster.Spec.AerospikeAccessControl.Users[0].SecretName = authSecretName + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + By("Try scaleup") err = scaleUpClusterTest( k8sClient, ctx, clusterNamespacedName, 1, ) Expect(err).ToNot(HaveOccurred()) + + if aeroCluster != nil { + err = deleteCluster( + k8sClient, ctx, aeroCluster, + ) + Expect(err).ToNot(HaveOccurred()) + } }) }) }, diff --git a/test/batch_restart_pods_test.go b/test/batch_restart_pods_test.go index 5a9342c79..6e05d9cab 100644 --- a/test/batch_restart_pods_test.go +++ b/test/batch_restart_pods_test.go @@ -30,7 +30,7 @@ func percent(val string) *intstr.IntOrString { } func count(val int) *intstr.IntOrString { - v := intstr.FromInt(val) + v := intstr.FromInt32(int32(val)) return &v } @@ -528,9 +528,7 @@ func batchRollingRestartTest( aeroCluster.Spec.RackConfig.RollingUpdateBatchSize = batchSize aeroCluster.Spec.PodSpec.AerospikeContainerSpec.Resources = unschedulableResource() - err = updateClusterForBatchRestart(k8sClient, ctx, aeroCluster) - - return err + return updateClusterForBatchRestart(k8sClient, ctx, aeroCluster) } func batchUpgradeTest( @@ -546,9 +544,7 @@ func batchUpgradeTest( aeroCluster.Spec.RackConfig.RollingUpdateBatchSize = batchSize aeroCluster.Spec.Image = unavailableImage - err = updateClusterForBatchRestart(k8sClient, ctx, aeroCluster) - - return err + return updateClusterForBatchRestart(k8sClient, ctx, aeroCluster) } func rollingRestartTest( @@ -569,9 +565,7 @@ func rollingRestartTest( aeroCluster.Spec.PodSpec.AerospikeContainerSpec.Resources = unschedulableResource() } - err = updateCluster(k8sClient, ctx, aeroCluster) - - return err + return updateCluster(k8sClient, ctx, aeroCluster) } func upgradeTest( @@ -587,7 +581,5 @@ func upgradeTest( aeroCluster.Spec.RackConfig.RollingUpdateBatchSize = batchSize aeroCluster.Spec.Image = image - err = updateCluster(k8sClient, ctx, aeroCluster) - - return err + return updateCluster(k8sClient, ctx, aeroCluster) } diff --git a/test/cluster_test.go b/test/cluster_test.go index 85a3d8146..1c19952c9 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -245,7 +245,7 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["enable-quotas"] = true - // As pod is in pending state, CR object will be won't reach the final phase. + // As pod is in pending state, CR object won't reach the final phase. // So expectedPhases can be InProgress or Completed return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) }, 1*time.Minute).ShouldNot(HaveOccurred()) @@ -255,7 +255,7 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) Expect(err).ToNot(HaveOccurred()) aeroCluster.Spec.Image = nextImage - // As pod is in pending state, CR object will be won't reach the final phase. + // As pod is in pending state, CR object won't reach the final phase. 
// So expectedPhases can be InProgress or Completed return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) }, 1*time.Minute).ShouldNot(HaveOccurred()) @@ -273,11 +273,41 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { // There should be only one pending pod Expect(counter).To(Equal(1)) + By("Executing on-demand operation") + Eventually(func() error { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + } + aeroCluster.Spec.Operations = operations + // As pod is in pending state, CR object won't reach the final phase. + // So expectedPhases can be InProgress or Completed + return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) + }, 1*time.Minute).ShouldNot(HaveOccurred()) + + By("Verify pending pod") + podList, err = getPodList(aeroCluster, k8sClient) + + counter = 0 + + for idx := range podList.Items { + if podList.Items[idx].Status.Phase == v1.PodPending { + counter++ + } + } + // There should be only one pending pod + Expect(counter).To(Equal(1)) + By("Scale down 1 pod") aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) Expect(err).ToNot(HaveOccurred()) aeroCluster.Spec.Size-- - // As pod is in pending state, CR object will be won't reach the final phase. + // As pod is in pending state, CR object won't reach the final phase. // So expectedPhases can be InProgress or Completed err = updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) Expect(err).ToNot(HaveOccurred()) diff --git a/test/large_reconcile_test.go b/test/large_reconcile_test.go index 1a5b9a8a2..470b41a55 100644 --- a/test/large_reconcile_test.go +++ b/test/large_reconcile_test.go @@ -198,41 +198,16 @@ var _ = Describe( func loadDataInCluster( k8sClient client.Client, aeroCluster *asdbv1.AerospikeCluster, ) error { - policy := getClientPolicy(aeroCluster, k8sClient) - policy.FailIfNotConnected = false - policy.Timeout = time.Minute * 2 - policy.UseServicesAlternate = true - policy.ConnectionQueueSize = 100 - policy.LimitConnectionsToQueueSize = true - - hostList := make([]*as.Host, 0, len(aeroCluster.Status.Pods)) - - for podName := range aeroCluster.Status.Pods { - pod := aeroCluster.Status.Pods[podName] - - host, err := createHost(&pod) - if err != nil { - return err - } - - hostList = append(hostList, host) - } - - clientP, err := as.NewClientWithPolicyAndHost(policy, hostList...) - if clientP == nil { - return fmt.Errorf( - "failed to create aerospike cluster asClient: %v", err, - ) + asClient, err := getAerospikeClient(aeroCluster, k8sClient) + if err != nil { + return err } - asClient := *clientP defer func() { fmt.Println("Closing Aerospike client") asClient.Close() }() - _, _ = asClient.WarmUp(-1) - keyPrefix := "testkey" size := 100 @@ -244,11 +219,6 @@ func loadDataInCluster( return readErr } - for !asClient.IsConnected() { - pkgLog.Info("Waiting for cluster to connect") - time.Sleep(2 * time.Second) - } - pkgLog.Info( "Loading record", "nodes", asClient.GetNodeNames(), ) diff --git a/test/on_demand_operations_test.go b/test/on_demand_operations_test.go new file mode 100644 index 000000000..ed32c6bf2 --- /dev/null +++ b/test/on_demand_operations_test.go @@ -0,0 +1,481 @@ +package test + +import ( + goctx "context" + "fmt" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "k8s.io/utils/ptr" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" +) + +var _ = Describe( + "OnDemandOperations", func() { + + ctx := goctx.Background() + var clusterNamespacedName = getNamespacedName( + "operations", namespace, + ) + + aeroCluster := &asdbv1.AerospikeCluster{} + + BeforeEach( + func() { + // Create a 2 node cluster + aeroCluster = createDummyRackAwareAerospikeCluster( + clusterNamespacedName, 2, + ) + + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + AfterEach( + func() { + _ = deleteCluster(k8sClient, ctx, aeroCluster) + }, + ) + + Context( + "When doing valid operations", func() { + + It( + "Should execute quickRestart operations on all pods", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationWarmRestart, + "operations-1-1": asdbv1.OperationWarmRestart, + } + + err = validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should execute podRestart operations on all pods", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationPodRestart, + ID: "1", + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationPodRestart, + "operations-1-1": asdbv1.OperationPodRestart, + } + + err = validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should be able to replace/remove the running operations", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + } + + aeroCluster.Spec.Operations = operations + + err = k8sClient.Update(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.Operations[0].Kind = asdbv1.OperationPodRestart + aeroCluster.Spec.Operations[0].ID = "2" + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationPodRestart, + "operations-1-1": asdbv1.OperationPodRestart, + } + + err = validateOperationTypes(ctx, 
aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + + // Remove operations + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.Operations = nil + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should execute operations on selected pods with dynamic config change", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationPodRestart, + ID: "1", + PodList: []string{"operations-1-0"}, + }, + } + + aeroCluster.Spec.EnableDynamicConfigUpdate = ptr.To(true) + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = 18000 + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationPodRestart, + "operations-1-1": "noRestart", + } + + err = validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should execute on-demand podRestart operations on all pods along with scale down", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.Size = 4 + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationPodRestart, + ID: "1", + }, + } + + aeroCluster.Spec.Operations = operations + aeroCluster.Spec.Size = 2 + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationPodRestart, + "operations-1-1": asdbv1.OperationPodRestart, + } + + err = validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should execute podRestart if podSpec is changed with on-demand warm restart", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.PodSpec.AerospikeContainerSpec.Resources = schedulableResource("200Mi") + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationPodRestart, + "operations-1-1": 
asdbv1.OperationPodRestart, + } + + err = validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + }, + ) + }, + ) + + Context( + "When doing invalid operations", func() { + It( + "Should fail if there are more than 1 operations", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + { + Kind: asdbv1.OperationPodRestart, + ID: "2", + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + + It( + "should fail if invalid pod name is mentioned in the pod list", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + PodList: []string{"operations-1-0", "invalid-pod"}, + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + + It( + "should fail if operationType is modified", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + PodList: []string{"operations-1-0"}, + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + // Modify operationType + operations[0].Kind = asdbv1.OperationPodRestart + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + + It( + "should fail if podList is modified", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + PodList: []string{"operations-1-0"}, + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + // Modify podList + operations[0].PodList = []string{"operations-1-1"} + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + + It( + "should fail any operation along with cluster scale-up", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + PodList: []string{"operations-1-0"}, + }, + } + + aeroCluster.Spec.Operations = operations + aeroCluster.Spec.Size++ + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + + It( + "should fail any operation along with cluster upgrade", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + PodList: []string{"operations-1-0"}, + }, + } + + aeroCluster.Spec.Operations = operations + aeroCluster.Spec.Image = nextImage + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + }, + ) + }, 
+) + +func validateOperationTypes(ctx goctx.Context, aeroCluster *asdbv1.AerospikeCluster, pid map[string]podID, + operationTypeMap map[string]asdbv1.OperationKind) error { + newPodPidMap, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + for podName, opType := range operationTypeMap { + switch opType { + case asdbv1.OperationWarmRestart: + if newPodPidMap[podName].podUID != pid[podName].podUID || newPodPidMap[podName].asdPID == pid[podName].asdPID { + return fmt.Errorf("failed to quick restart pod %s", podName) + } + case asdbv1.OperationPodRestart: + if newPodPidMap[podName].podUID == pid[podName].podUID { + return fmt.Errorf("failed to restart pod %s", podName) + } + case "noRestart": + if newPodPidMap[podName].podUID != pid[podName].podUID || newPodPidMap[podName].asdPID != pid[podName].asdPID { + return fmt.Errorf("unexpected restart pod %s", podName) + } + } + } + + return nil +} diff --git a/test/storage_wipe_test.go b/test/storage_wipe_test.go index c6133edfe..9067a904d 100644 --- a/test/storage_wipe_test.go +++ b/test/storage_wipe_test.go @@ -3,7 +3,6 @@ package test import ( goctx "context" "fmt" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -211,40 +210,12 @@ func writeDataToCluster( k8sClient client.Client, namespaces []string, ) error { - policy := getClientPolicy(aeroCluster, k8sClient) - policy.FailIfNotConnected = false - policy.Timeout = time.Minute * 2 - policy.UseServicesAlternate = true - policy.ConnectionQueueSize = 100 - policy.LimitConnectionsToQueueSize = true - hostList := make([]*as.Host, 0, len(aeroCluster.Status.Pods)) - - for podName := range aeroCluster.Status.Pods { - pod := aeroCluster.Status.Pods[podName] - - host, err := createHost(&pod) - if err != nil { - return err - } - - hostList = append(hostList, host) - } - - asClient, err := as.NewClientWithPolicyAndHost(policy, hostList...) - if asClient == nil { - return fmt.Errorf("aerospike client is nil %v", err) - } - - defer asClient.Close() - - if _, err = asClient.WarmUp(-1); err != nil { + asClient, err := getAerospikeClient(aeroCluster, k8sClient) + if err != nil { return err } - for !asClient.IsConnected() { - pkgLog.Info("Waiting for cluster to connect") - time.Sleep(2 * time.Second) - } + defer asClient.Close() pkgLog.Info( "Loading record", "nodes", asClient.GetNodeNames(), @@ -277,45 +248,17 @@ func checkDataInCluster( ) (map[string]bool, error) { data := make(map[string]bool) - policy := getClientPolicy(aeroCluster, k8sClient) - policy.FailIfNotConnected = false - policy.Timeout = time.Minute * 2 - policy.UseServicesAlternate = true - policy.ConnectionQueueSize = 100 - policy.LimitConnectionsToQueueSize = true - hostList := make([]*as.Host, 0, len(aeroCluster.Status.Pods)) - - for podName := range aeroCluster.Status.Pods { - pod := aeroCluster.Status.Pods[podName] - - host, err := createHost(&pod) - if err != nil { - return nil, err - } - - hostList = append(hostList, host) - } - - asClient, err := as.NewClientWithPolicyAndHost(policy, hostList...) 
+ asClient, err := getAerospikeClient(aeroCluster, k8sClient) if err != nil { return nil, err } defer asClient.Close() - for !asClient.IsConnected() { - pkgLog.Info("Waiting for cluster to connect") - time.Sleep(2 * time.Second) - } - pkgLog.Info( "Loading record", "nodes", asClient.GetNodeNames(), ) - if _, err = asClient.WarmUp(-1); err != nil { - return nil, err - } - for _, ns := range namespaces { newKey, err := as.NewKey(ns, setName, key) if err != nil { @@ -327,8 +270,8 @@ func checkDataInCluster( return nil, nil } - if bin, ok := record.Bins[binName]; ok { - value := bin.(string) + if bin, exists := record.Bins[binName]; exists { + value, ok := bin.(string) if !ok { return nil, fmt.Errorf( diff --git a/test/suite_test.go b/test/suite_test.go index 1cd633125..b3c3ff2d3 100644 --- a/test/suite_test.go +++ b/test/suite_test.go @@ -1,5 +1,5 @@ /* -Copyright 2021. +Copyright 2024. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/test_client.go b/test/test_client.go index 7ea76bdbd..295b07159 100644 --- a/test/test_client.go +++ b/test/test_client.go @@ -215,9 +215,8 @@ func getClientPolicy( RootCAs: getClusterServerPool( clientCertSpec, aeroCluster.Namespace, k8sClient, ), - Certificates: []tls.Certificate{}, - PreferServerCipherSuites: true, - MinVersion: tls.VersionTLS12, + Certificates: []tls.Certificate{}, + MinVersion: tls.VersionTLS12, // used only in testing // InsecureSkipVerify: true, } diff --git a/test/utils.go b/test/utils.go index e3c5bf5c5..cf6fb89cf 100644 --- a/test/utils.go +++ b/test/utils.go @@ -26,6 +26,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + as "github.com/aerospike/aerospike-client-go/v7" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" operatorUtils "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" lib "github.com/aerospike/aerospike-management-lib" @@ -212,7 +213,7 @@ func createAuthSecret( labels map[string]string, secretName, pass string, ) error { // Create authSecret - as := &corev1.Secret{ + secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: secretName, Namespace: namespace, @@ -224,7 +225,7 @@ func createAuthSecret( }, } // use test context's create helper to create the object and add a cleanup function for the new object - err := k8sClient.Create(ctx, as) + err := k8sClient.Create(ctx, secret) if !errors.IsAlreadyExists(err) { return err } @@ -816,3 +817,45 @@ func getPasswordFromSecret(k8sClient client.Client, return string(passBytes), nil } + +func getAerospikeClient(aeroCluster *asdbv1.AerospikeCluster, k8sClient client.Client) (*as.Client, error) { + policy := getClientPolicy(aeroCluster, k8sClient) + policy.FailIfNotConnected = false + policy.Timeout = time.Minute * 2 + policy.UseServicesAlternate = true + policy.ConnectionQueueSize = 100 + policy.LimitConnectionsToQueueSize = true + + hostList := make([]*as.Host, 0, len(aeroCluster.Status.Pods)) + + for podName := range aeroCluster.Status.Pods { + pod := aeroCluster.Status.Pods[podName] + + host, err := createHost(&pod) + if err != nil { + return nil, err + } + + hostList = append(hostList, host) + } + + asClient, err := as.NewClientWithPolicyAndHost(policy, hostList...) 
+ if asClient == nil { + return nil, fmt.Errorf( + "failed to create aerospike cluster asClient: %v", err, + ) + } + + _, _ = asClient.WarmUp(-1) + + // Wait for 5 minutes for cluster to connect + for j := 0; j < 150; j++ { + if isConnected := asClient.IsConnected(); isConnected { + break + } + + time.Sleep(time.Second * 2) + } + + return asClient, nil +} diff --git a/test/warm_restart_test.go b/test/warm_restart_test.go index f1fe64aa3..99ed226fc 100644 --- a/test/warm_restart_test.go +++ b/test/warm_restart_test.go @@ -28,29 +28,16 @@ var _ = Describe( WarmRestart(ctx) }, ) - It( - "Should cold start without tini", func() { - PodRestart(ctx) - }, - ) - }, ) }, ) func WarmRestart(ctx goCtx.Context) { - rollCluster(ctx, latestImage, true) -} - -func PodRestart(ctx goCtx.Context) { - image := fmt.Sprintf( - "aerospike/aerospike-server-enterprise:%s", "5.7.0.8", - ) - rollCluster(ctx, image, false) + rollCluster(ctx, latestImage) } -func rollCluster(ctx goCtx.Context, image string, expectWarmStart bool) { +func rollCluster(ctx goCtx.Context, image string) { clusterName := "warm-restart-cluster" clusterNamespacedName := getNamespacedName(clusterName, namespace) @@ -94,7 +81,7 @@ func rollCluster(ctx goCtx.Context, image string, expectWarmStart bool) { pkgLog.Info("Rolling restarted", "Markers", podToMarkerPresent) for _, marker := range podToMarkerPresent { - Expect(marker).To(Equal(expectWarmStart)) + Expect(marker).To(Equal(true)) } } From 13514b7f452da6157c6bd94bcc66427596f0d6d5 Mon Sep 17 00:00:00 2001 From: Tanmay Jain <103629776+tanmayja@users.noreply.github.com> Date: Fri, 12 Jul 2024 12:57:47 +0530 Subject: [PATCH 2/2] Fixing a bug in updating status after on-demand operation. (#301) --- controllers/pod.go | 8 ++++---- controllers/rack.go | 8 ++++---- test/cluster_test.go | 1 + 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/controllers/pod.go b/controllers/pod.go index 04708fc1d..05451e987 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -1537,13 +1537,13 @@ func (r *SingleClusterReconciler) updateOperationStatus(restartedASDPodNames, re statusPods := sets.New(statusOp.PodList...) if statusOp.Kind == asdbv1.OperationWarmRestart { - if quickRestartsSet != nil { + if quickRestartsSet.Len() > 0 { statusOp.PodList = statusPods.Union(quickRestartsSet.Intersection(specPods)).UnsortedList() } // If the operation is a warm restart and the pod undergoes a cold restart for any reason, // we will still consider the warm restart operation as completed for that pod. - if podRestartsSet != nil { + if podRestartsSet.Len() > 0 { statusOp.PodList = statusPods.Union(podRestartsSet.Intersection(specPods)).UnsortedList() } } @@ -1561,13 +1561,13 @@ func (r *SingleClusterReconciler) updateOperationStatus(restartedASDPodNames, re var podList []string if specOp.Kind == asdbv1.OperationWarmRestart { - if quickRestartsSet != nil { + if quickRestartsSet.Len() > 0 { podList = quickRestartsSet.Intersection(specPods).UnsortedList() } // If the operation is a warm restart and the pod undergoes a cold restart for any reason, // we will still consider the warm restart operation as completed for that pod. - if podRestartsSet != nil { + if podRestartsSet.Len() > 0 { podList = append(podList, podRestartsSet.Intersection(specPods).UnsortedList()...) 
} } diff --git a/controllers/rack.go b/controllers/rack.go index c98ad2018..061e25d5e 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -86,7 +86,7 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { // remove ignorable pods from failedPods failedPods = getNonIgnorablePods(failedPods, ignorablePodNames) if len(failedPods) != 0 { - r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) + r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods)) if res = r.reconcileRack( found, state, ignorablePodNames, failedPods, @@ -94,7 +94,7 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { return res } - r.Log.Info("Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) + r.Log.Info("Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods)) } // 2. Again, fetch the pods for the rack and if there are failed pods then restart them. @@ -114,14 +114,14 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { // remove ignorable pods from failedPods failedPods = getNonIgnorablePods(failedPods, ignorablePodNames) if len(failedPods) != 0 { - r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) + r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods)) if _, res = r.rollingRestartRack(found, state, ignorablePodNames, nil, failedPods); !res.isSuccess { return res } - r.Log.Info("Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) + r.Log.Info("Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods)) // Requeue after 1 second to fetch latest CR object with updated pod status return reconcileRequeueAfter(1) } diff --git a/test/cluster_test.go b/test/cluster_test.go index 1c19952c9..df802367c 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -350,6 +350,7 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { err = k8sClient.Update(ctx, pod) Expect(err).ToNot(HaveOccurred()) + // Underlying kubernetes cluster should have atleast 6 nodes to run this test successfully. By("Delete rack with id 2") aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) Expect(err).ToNot(HaveOccurred())
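
For reference, a minimal sketch (not part of the patch itself) of how the `operations` field introduced above might be used in an AerospikeCluster custom resource. Field names and constraints follow the CRD schema added in this change (kind is one of WarmRestart/PodRestart, id is 1-20 characters, at most one operation at a time, podList optional); the cluster name, namespace, operation id, and pod name below are illustrative placeholders:

    apiVersion: asdb.aerospike.com/v1
    kind: AerospikeCluster
    metadata:
      name: aerocluster          # placeholder cluster name
      namespace: aerospike       # placeholder namespace
    spec:
      # other required fields (size, image, storage, aerospikeConfig, ...) omitted for brevity
      operations:
        - kind: WarmRestart      # WarmRestart restarts only ASD inside the pods; PodRestart deletes and recreates the pods
          id: "warm-restart-1"   # 1-20 characters; a submitted operation cannot be modified, only replaced under a new id
          podList:               # optional; omit to target every pod in the cluster
            - aerocluster-1-0    # placeholder pod name (<cluster>-<rackID>-<index>)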