Skip to content

Commit

Permalink
Added support for k8sNodeBlockList
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekdwivedi3060 committed Dec 12, 2023
1 parent 9e089ef commit 65f3792
Show file tree
Hide file tree
Showing 13 changed files with 310 additions and 32 deletions.
30 changes: 29 additions & 1 deletion api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Roster Node BlockList"
RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"`
// K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList"
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
}

type SeedsFinderServices struct {
Expand Down Expand Up @@ -568,9 +571,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability
// BlockVolumePolicy contains default policies for block volumes.
BlockVolumePolicy AerospikePersistentVolumePolicySpec `json:"blockVolumePolicy,omitempty"`

// CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container.
// CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container.
CleanupThreads int `json:"cleanupThreads,omitempty"`

// LocalStorageClasses contains a list of storage classes which provision local volumes.
LocalStorageClasses []string `json:"localStorageClasses,omitempty"`

// Volumes list to attach to created pods.
// +patchMergeKey=name
// +patchStrategy=merge
Expand Down Expand Up @@ -633,6 +639,8 @@ type AerospikeClusterStatusSpec struct { //nolint:govet // for readability
SeedsFinderServices SeedsFinderServices `json:"seedsFinderServices,omitempty"`
// RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup
RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"`
// K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods.
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
}

// AerospikeClusterStatus defines the observed state of AerospikeCluster
Expand Down Expand Up @@ -956,6 +964,16 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec,
status.RosterNodeBlockList = rosterNodeBlockList
}

if len(spec.K8sNodeBlockList) != 0 {
var k8sNodeBlockList []string

lib.DeepCopy(
&k8sNodeBlockList, &spec.K8sNodeBlockList,
)

status.K8sNodeBlockList = k8sNodeBlockList
}

return &status, nil
}

Expand Down Expand Up @@ -1047,5 +1065,15 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec
spec.RosterNodeBlockList = rosterNodeBlockList
}

if len(status.K8sNodeBlockList) != 0 {
var k8sNodeBlockList []string

lib.DeepCopy(
&k8sNodeBlockList, &status.K8sNodeBlockList,
)

spec.K8sNodeBlockList = k8sNodeBlockList
}

return &spec, nil
}
15 changes: 15 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 54 additions & 6 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ spec:
image:
description: Aerospike server image
type: string
k8sNodeBlockList:
description: K8sNodeBlockList is a list of Kubernetes nodes which
are not used for Aerospike pods.
items:
type: string
type: array
maxUnavailable:
anyOf:
- type: integer
Expand Down Expand Up @@ -5739,7 +5745,7 @@ spec:
type: string
type: object
cleanupThreads:
description: CleanupThreads contains maximum number
description: CleanupThreads contains the maximum number
of cleanup threads(dd or blkdiscard) per init container.
type: integer
filesystemVolumePolicy:
Expand Down Expand Up @@ -5794,6 +5800,12 @@ spec:
- deleteFiles
type: string
type: object
localStorageClasses:
description: LocalStorageClasses contains a list of
storage classes which provision local volumes.
items:
type: string
type: array
volumes:
description: Volumes list to attach to created pods.
items:
Expand Down Expand Up @@ -7427,7 +7439,7 @@ spec:
type: string
type: object
cleanupThreads:
description: CleanupThreads contains maximum number
description: CleanupThreads contains the maximum number
of cleanup threads(dd or blkdiscard) per init container.
type: integer
filesystemVolumePolicy:
Expand Down Expand Up @@ -7482,6 +7494,12 @@ spec:
- deleteFiles
type: string
type: object
localStorageClasses:
description: LocalStorageClasses contains a list of
storage classes which provision local volumes.
items:
type: string
type: array
volumes:
description: Volumes list to attach to created pods.
items:
Expand Down Expand Up @@ -8131,7 +8149,7 @@ spec:
type: string
type: object
cleanupThreads:
description: CleanupThreads contains maximum number of cleanup
description: CleanupThreads contains the maximum number of cleanup
threads(dd or blkdiscard) per init container.
type: integer
filesystemVolumePolicy:
Expand Down Expand Up @@ -8186,6 +8204,12 @@ spec:
- deleteFiles
type: string
type: object
localStorageClasses:
description: LocalStorageClasses contains a list of storage classes
which provision local volumes.
items:
type: string
type: array
volumes:
description: Volumes list to attach to created pods.
items:
Expand Down Expand Up @@ -8915,6 +8939,12 @@ spec:
image:
description: Aerospike server image
type: string
k8sNodeBlockList:
description: K8sNodeBlockList is a list of Kubernetes nodes which
are not used for Aerospike pods.
items:
type: string
type: array
maxUnavailable:
anyOf:
- type: integer
Expand Down Expand Up @@ -14502,7 +14532,7 @@ spec:
type: string
type: object
cleanupThreads:
description: CleanupThreads contains maximum number
description: CleanupThreads contains the maximum number
of cleanup threads(dd or blkdiscard) per init container.
type: integer
filesystemVolumePolicy:
Expand Down Expand Up @@ -14557,6 +14587,12 @@ spec:
- deleteFiles
type: string
type: object
localStorageClasses:
description: LocalStorageClasses contains a list of
storage classes which provision local volumes.
items:
type: string
type: array
volumes:
description: Volumes list to attach to created pods.
items:
Expand Down Expand Up @@ -16190,7 +16226,7 @@ spec:
type: string
type: object
cleanupThreads:
description: CleanupThreads contains maximum number
description: CleanupThreads contains the maximum number
of cleanup threads(dd or blkdiscard) per init container.
type: integer
filesystemVolumePolicy:
Expand Down Expand Up @@ -16245,6 +16281,12 @@ spec:
- deleteFiles
type: string
type: object
localStorageClasses:
description: LocalStorageClasses contains a list of
storage classes which provision local volumes.
items:
type: string
type: array
volumes:
description: Volumes list to attach to created pods.
items:
Expand Down Expand Up @@ -16945,7 +16987,7 @@ spec:
type: string
type: object
cleanupThreads:
description: CleanupThreads contains maximum number of cleanup
description: CleanupThreads contains the maximum number of cleanup
threads(dd or blkdiscard) per init container.
type: integer
filesystemVolumePolicy:
Expand Down Expand Up @@ -17000,6 +17042,12 @@ spec:
- deleteFiles
type: string
type: object
localStorageClasses:
description: LocalStorageClasses contains a list of storage classes
which provision local volumes.
items:
type: string
type: array
volumes:
description: Volumes list to attach to created pods.
items:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ spec:
- description: Aerospike server image
displayName: Server Image
path: image
- description: K8sNodeBlockList is a list of Kubernetes nodes which are not
used for Aerospike pods.
displayName: Kubernetes Node BlockList
path: k8sNodeBlockList
- description: MaxUnavailable is the percentage/number of pods that can be allowed
to go down or unavailable before application disruption. This value is used
to create PodDisruptionBudget. Defaults to 1.
Expand Down
2 changes: 1 addition & 1 deletion controllers/aero_info_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady(
return reconcileSuccess()
}

// Remove a node only if cluster is stable
// Remove a node only if the cluster is stable
if err := r.waitForAllSTSToBeReady(ignorablePodNames); err != nil {
return reconcileError(fmt.Errorf("failed to wait for cluster to be ready: %v", err))
}
Expand Down
41 changes: 36 additions & 5 deletions controllers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,23 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap(
return nil, err
}

blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...)
requiredConfHash := confMap.Data[aerospikeConfHashFileName]

for idx := range pods {
if ignorablePodNames.Has(pods[idx].Name) {
continue
}

if blockedK8sNodes.Has(pods[idx].Spec.NodeName) {
r.Log.Info("Pod found in blocked nodes list, will be migrated to a different node",
"podName", pods[idx].Name)

restartTypeMap[pods[idx].Name] = podRestart

continue
}

podStatus := r.aeroCluster.Status.Pods[pods[idx].Name]
if addedNSDevices == nil && podStatus.AerospikeConfigHash != requiredConfHash {
// Fetching all block devices that have been added in namespaces.
Expand Down Expand Up @@ -260,19 +270,29 @@ func (r *SingleClusterReconciler) restartPods(
}

restartedPods := make([]*corev1.Pod, 0, len(podsToRestart))
blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...)

for idx := range podsToRestart {
pod := podsToRestart[idx]
// Check if this pod needs restart
restartType := restartTypeMap[pod.Name]

if restartType == quickRestart {
// If ASD restart fails then go ahead and restart the pod
// If ASD restart fails, then go ahead and restart the pod
if err := r.restartASDInPod(rackState, pod); err == nil {
continue
}
}

if blockedK8sNodes.Has(pod.Spec.NodeName) {
r.Log.Info("Pod found in blocked nodes list, deleting corresponding local PVCs if any",
"podName", pod.Name)

if err := r.deleteLocalPVCs(rackState, pod); err != nil {
return reconcileError(err)
}
}

if err := r.Client.Delete(context.TODO(), pod); err != nil {
r.Log.Error(err, "Failed to delete pod")
return reconcileError(err)
Expand Down Expand Up @@ -414,16 +434,27 @@ func (r *SingleClusterReconciler) deletePodAndEnsureImageUpdated(
return reconcileError(err)
}

blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...)

// Delete pods
for _, p := range podsToUpdate {
if err := r.Client.Delete(context.TODO(), p); err != nil {
for _, pod := range podsToUpdate {
if blockedK8sNodes.Has(pod.Spec.NodeName) {
r.Log.Info("Pod found in blocked nodes list, deleting corresponding local PVCs if any",
"podName", pod.Name)

if err := r.deleteLocalPVCs(rackState, pod); err != nil {
return reconcileError(err)
}
}

if err := r.Client.Delete(context.TODO(), pod); err != nil {
return reconcileError(err)
}

r.Log.V(1).Info("Pod deleted", "podName", p.Name)
r.Log.V(1).Info("Pod deleted", "podName", pod.Name)
r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeNormal, "PodWaitUpdate",
"[rack-%d] Waiting to update Pod %s", rackState.Rack.ID, p.Name,
"[rack-%d] Waiting to update Pod %s", rackState.Rack.ID, pod.Name,
)
}

Expand Down
26 changes: 26 additions & 0 deletions controllers/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,32 @@ func (r *SingleClusterReconciler) removePVCsAsync(
return deletedPVCs, nil
}

// deleteLocalPVCs deletes the PVCs of the given pod whose storage class is
// listed in the rack's Storage.LocalStorageClasses.
//
// The candidate PVCs are the ones returned by getPodsPVCList for the pod's
// name and the rack's ID. A PVC without a storage class set is skipped (and
// logged), as is any PVC whose storage class is not in LocalStorageClasses.
//
// It returns an error if the PVC list cannot be fetched or if any delete call
// fails; processing stops at the first failure. The underlying error is
// wrapped with %w so callers can inspect it with errors.Is / errors.As.
func (r *SingleClusterReconciler) deleteLocalPVCs(rackState *RackState, pod *corev1.Pod) error {
	pvcItems, err := r.getPodsPVCList([]string{pod.Name}, rackState.Rack.ID)
	if err != nil {
		return fmt.Errorf("could not find pvc for pod %v: %w", pod.Name, err)
	}

	localStorageClasses := rackState.Rack.Storage.LocalStorageClasses

	for idx := range pvcItems {
		// Take the address of the slice element rather than copying the PVC.
		pvc := &pvcItems[idx]

		if pvc.Spec.StorageClassName == nil {
			r.Log.Info("PVC does not have storageClass set, no need to delete PVC", "pvcName", pvc.Name)

			continue
		}

		// Only PVCs backed by a configured local storage class are deleted.
		if !utils.ContainsString(localStorageClasses, *pvc.Spec.StorageClassName) {
			continue
		}

		if delErr := r.Client.Delete(context.TODO(), pvc); delErr != nil {
			return fmt.Errorf(
				"could not delete pvc %s: %w", pvc.Name, delErr,
			)
		}
	}

	return nil
}

func (r *SingleClusterReconciler) waitForPVCTermination(deletedPVCs []corev1.PersistentVolumeClaim) error {
if len(deletedPVCs) == 0 {
return nil
Expand Down
Loading

0 comments on commit 65f3792

Please sign in to comment.