KO-285: Allow Operator operations with failed pods (#257)
* Allow Operator operations with pending and failed pods

Co-authored-by: Sudhanshu Ranjan <[email protected]>
abhishekdwivedi3060 and sud82 authored Dec 4, 2023
1 parent cfdd748 commit a8fba60
Showing 19 changed files with 666 additions and 148 deletions.
12 changes: 12 additions & 0 deletions api/v1/aerospikecluster_types.go
@@ -72,6 +72,7 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Seeds Finder Services"
SeedsFinderServices SeedsFinderServices `json:"seedsFinderServices,omitempty"`
// RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Roster Node BlockList"
RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"`
}

@@ -276,6 +277,17 @@ type RackConfig struct { //nolint:govet // for readability
// RollingUpdateBatchSize is the percentage/number of rack pods that will be restarted simultaneously
// +optional
RollingUpdateBatchSize *intstr.IntOrString `json:"rollingUpdateBatchSize,omitempty"`
// MaxIgnorablePods is the maximum number/percentage of pending/failed pods in a rack that are ignored while
// assessing cluster stability. Pods identified using this value are not considered part of the cluster.
// Additionally, in SC mode clusters, these pods are removed from the roster.
// This is particularly useful when some pods are stuck in pending/failed state due to any scheduling issues and
// cannot be fixed by simply updating the CR.
// It enables the operator to perform specific operations on the cluster, like changing Aerospike configurations,
// without being hindered by these problematic pods.
// Remember to set MaxIgnorablePods back to 0 once the required operation is done.
// This makes sure that later on, all pods are properly counted when evaluating the cluster stability.
// +optional
MaxIgnorablePods *intstr.IntOrString `json:"maxIgnorablePods,omitempty"`
}

// Rack specifies single rack config
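A minimal sketch of how the new MaxIgnorablePods field could be populated from Go, assuming the asdbv1 import alias used elsewhere in this repository; both an absolute count and a percentage are accepted because the field is an intstr.IntOrString:

package main

import (
	"k8s.io/apimachinery/pkg/util/intstr"

	asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1"
)

func main() {
	// Ignore at most one pending/failed pod per rack while assessing cluster stability.
	one := intstr.FromInt(1)

	// Alternatively, ignore up to 20% of the pods in a rack.
	twentyPercent := intstr.FromString("20%")

	rackConfig := asdbv1.RackConfig{MaxIgnorablePods: &one}

	// Switch to the percentage form if preferred.
	rackConfig.MaxIgnorablePods = &twentyPercent

	_ = rackConfig
}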
44 changes: 29 additions & 15 deletions api/v1/aerospikecluster_validating_webhook.go
@@ -605,24 +605,11 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error {

// Validate batch upgrade/restart param
if c.Spec.RackConfig.RollingUpdateBatchSize != nil {
// Just validate if RollingUpdateBatchSize is valid number or string.
randomNumber := 100

count, err := intstr.GetScaledValueFromIntOrPercent(
c.Spec.RackConfig.RollingUpdateBatchSize, randomNumber, false,
)
if err != nil {
if err := validateIntOrStringField(c.Spec.RackConfig.RollingUpdateBatchSize,
"spec.rackConfig.rollingUpdateBatchSize"); err != nil {
return err
}

// Only negative is not allowed. Any big number can be given.
if count < 0 {
return fmt.Errorf(
"can not use negative rackConfig.RollingUpdateBatchSize %s",
c.Spec.RackConfig.RollingUpdateBatchSize.String(),
)
}

if len(c.Spec.RackConfig.Racks) < 2 {
return fmt.Errorf("can not use rackConfig.RollingUpdateBatchSize when number of racks is less than two")
}
@@ -651,6 +638,13 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error {
}
}

// Validate MaxIgnorablePods param
if c.Spec.RackConfig.MaxIgnorablePods != nil {
if err := validateIntOrStringField(c.Spec.RackConfig.MaxIgnorablePods,
"spec.rackConfig.maxIgnorablePods"); err != nil {
return err
}
}
// TODO: should not use batch if racks are less than replication-factor
return nil
}
@@ -2176,3 +2170,23 @@ func (c *AerospikeCluster) validateNetworkPolicy(namespace string) error {

return nil
}

func validateIntOrStringField(value *intstr.IntOrString, fieldPath string) error {
randomNumber := 100
// Validate that the value is a valid integer or percentage string.
count, err := intstr.GetScaledValueFromIntOrPercent(value, randomNumber, false)
if err != nil {
return err
}

// Negative values are not allowed; any large positive integer is accepted.
if count < 0 {
return fmt.Errorf("can not use negative %s: %s", fieldPath, value.String())
}

if value.Type == intstr.String && count > 100 {
return fmt.Errorf("%s: %s must not be greater than 100 percent", fieldPath, value.String())
}

return nil
}
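The helper delegates to intstr.GetScaledValueFromIntOrPercent from the upstream apimachinery package; a small standalone sketch of its behaviour, with values chosen only for illustration and the placeholder total of 100 mirroring the helper above:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	total := 100 // placeholder total, as in validateIntOrStringField

	percent := intstr.FromString("20%")
	count, err := intstr.GetScaledValueFromIntOrPercent(&percent, total, false)
	fmt.Println(count, err) // 20 <nil>

	absolute := intstr.FromInt(3)
	count, err = intstr.GetScaledValueFromIntOrPercent(&absolute, total, false)
	fmt.Println(count, err) // 3 <nil>

	// A string that is not a percentage fails to parse and returns an error.
	bad := intstr.FromString("lots")
	_, err = intstr.GetScaledValueFromIntOrPercent(&bad, total, false)
	fmt.Println(err != nil) // true
}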
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

36 changes: 36 additions & 0 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
@@ -4593,6 +4593,24 @@ spec:
Aerospike cluster. Pods will be deployed in given racks based on
given configuration
properties:
maxIgnorablePods:
anyOf:
- type: integer
- type: string
description: MaxIgnorablePods is the maximum number/percentage
of pending/failed pods in a rack that are ignored while assessing
cluster stability. Pods identified using this value are not
considered part of the cluster. Additionally, in SC mode clusters,
these pods are removed from the roster. This is particularly
useful when some pods are stuck in pending/failed state due
to any scheduling issues and cannot be fixed by simply updating
the CR. It enables the operator to perform specific operations
on the cluster, like changing Aerospike configurations, without
being hindered by these problematic pods. Remember to set MaxIgnorablePods
back to 0 once the required operation is done. This makes sure
that later on, all pods are properly counted when evaluating
the cluster stability.
x-kubernetes-int-or-string: true
namespaces:
description: List of Aerospike namespaces for which rack feature
will be enabled
@@ -13330,6 +13348,24 @@ spec:
given configuration
nullable: true
properties:
maxIgnorablePods:
anyOf:
- type: integer
- type: string
description: MaxIgnorablePods is the maximum number/percentage
of pending/failed pods in a rack that are ignored while assessing
cluster stability. Pods identified using this value are not
considered part of the cluster. Additionally, in SC mode clusters,
these pods are removed from the roster. This is particularly
useful when some pods are stuck in pending/failed state due
to any scheduling issues and cannot be fixed by simply updating
the CR. It enables the operator to perform specific operations
on the cluster, like changing Aerospike configurations, without
being hindered by these problematic pods. Remember to set MaxIgnorablePods
back to 0 once the required operation is done. This makes sure
that later on, all pods are properly counted when evaluating
the cluster stability.
x-kubernetes-int-or-string: true
namespaces:
description: List of Aerospike namespaces for which rack feature
will be enabled
@@ -60,6 +60,10 @@ spec:
cluster. Pods will be deployed in given racks based on given configuration
displayName: Rack Config
path: rackConfig
- description: RosterNodeBlockList is a list of blocked nodeIDs from roster
in a strong-consistency setup
displayName: Roster Node BlockList
path: rosterNodeBlockList
- description: SeedsFinderServices creates additional Kubernetes service that
allow clients to discover Aerospike cluster nodes.
displayName: Seeds Finder Services
43 changes: 19 additions & 24 deletions controllers/aero_info_calls.go
@@ -18,6 +18,7 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

as "github.com/aerospike/aerospike-client-go/v6"
asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1"
@@ -29,24 +30,25 @@
// Aerospike helper
// ------------------------------------------------------------------------------------

// waitForMultipleNodesSafeStopReady waits util the input pods is safe to stop,
// skipping pods that are not running and present in ignorablePods for stability check.
// The ignorablePods list should be a list of failed or pending pods that are going to be
// deleted eventually and are safe to ignore in stability checks.
// waitForMultipleNodesSafeStopReady waits until the input pods are safe to stop,
// skipping pods that are not running and present in ignorablePodNames for stability check.
// ignorablePodNames is the list of failed or pending pods that are either:
// 1. going to be deleted eventually and are safe to ignore in stability checks
// 2. given in ignorePodList by the user and are safe to ignore in stability checks
func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady(
pods []*corev1.Pod, ignorablePods []corev1.Pod,
pods []*corev1.Pod, ignorablePodNames sets.Set[string],
) reconcileResult {
if len(pods) == 0 {
return reconcileSuccess()
}

// Remove a node only if cluster is stable
if err := r.waitForAllSTSToBeReady(); err != nil {
if err := r.waitForAllSTSToBeReady(ignorablePodNames); err != nil {
return reconcileError(fmt.Errorf("failed to wait for cluster to be ready: %v", err))
}

// This doesn't make actual connection, only objects having connection info are created
allHostConns, err := r.newAllHostConnWithOption(ignorablePods)
allHostConns, err := r.newAllHostConnWithOption(ignorablePodNames)
if err != nil {
return reconcileError(fmt.Errorf("failed to get hostConn for aerospike cluster nodes: %v", err))
}
@@ -64,28 +66,28 @@ func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady(
}

// Setup roster after migration.
if err = r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, ignorablePods); err != nil {
if err = r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, ignorablePodNames); err != nil {
r.Log.Error(err, "Failed to set roster for cluster")
return reconcileRequeueAfter(1)
}

if err := r.quiescePods(policy, allHostConns, pods, ignorablePods); err != nil {
if err := r.quiescePods(policy, allHostConns, pods, ignorablePodNames); err != nil {
return reconcileError(err)
}

return reconcileSuccess()
}
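A minimal sketch of how such a set of pod names could be built with the generic sets package; the helper name, the pod names, and the plain phase check are illustrative assumptions, since the operator derives its actual ignorable list elsewhere in this change:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
)

// ignorablePodNames collects the names of pods that are not running and should
// therefore be skipped in stability checks. The phase check is a simplification.
func ignorablePodNames(pods []corev1.Pod) sets.Set[string] {
	names := sets.New[string]()

	for idx := range pods {
		phase := pods[idx].Status.Phase
		if phase == corev1.PodFailed || phase == corev1.PodPending {
			names.Insert(pods[idx].Name)
		}
	}

	return names
}

func main() {
	pods := []corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "aerocluster-0-0"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
		{ObjectMeta: metav1.ObjectMeta{Name: "aerocluster-0-1"}, Status: corev1.PodStatus{Phase: corev1.PodPending}},
	}

	names := ignorablePodNames(pods)
	fmt.Println(names.Has("aerocluster-0-1")) // true
}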

func (r *SingleClusterReconciler) quiescePods(
policy *as.ClientPolicy, allHostConns []*deployment.HostConn, pods []*corev1.Pod, ignorablePods []corev1.Pod,
policy *as.ClientPolicy, allHostConns []*deployment.HostConn, pods []*corev1.Pod, ignorablePodNames sets.Set[string],
) error {
podList := make([]corev1.Pod, 0, len(pods))

for idx := range pods {
podList = append(podList, *pods[idx])
}

selectedHostConns, err := r.newPodsHostConnWithOption(podList, ignorablePods)
selectedHostConns, err := r.newPodsHostConnWithOption(podList, ignorablePodNames)
if err != nil {
return err
}
@@ -173,15 +175,9 @@ func (r *SingleClusterReconciler) alumniReset(pod *corev1.Pod) error {
return asConn.AlumniReset(r.getClientPolicy())
}

func (r *SingleClusterReconciler) newAllHostConn() (
[]*deployment.HostConn, error,
) {
return r.newAllHostConnWithOption(nil)
}

// newAllHostConnWithOption returns connections to all pods in the cluster skipping pods that are not running and
// present in ignorablePods.
func (r *SingleClusterReconciler) newAllHostConnWithOption(ignorablePods []corev1.Pod) (
func (r *SingleClusterReconciler) newAllHostConnWithOption(ignorablePodNames sets.Set[string]) (
[]*deployment.HostConn, error,
) {
podList, err := r.getClusterPodList()
@@ -193,12 +189,12 @@ func (r *SingleClusterReconciler) newAllHostConnWithOption(ignorablePods []corev
return nil, fmt.Errorf("pod list empty")
}

return r.newPodsHostConnWithOption(podList.Items, ignorablePods)
return r.newPodsHostConnWithOption(podList.Items, ignorablePodNames)
}

// newPodsHostConnWithOption returns connections to all pods given skipping pods that are not running and
// present in ignorablePods.
func (r *SingleClusterReconciler) newPodsHostConnWithOption(pods, ignorablePods []corev1.Pod) (
func (r *SingleClusterReconciler) newPodsHostConnWithOption(pods []corev1.Pod, ignorablePodNames sets.Set[string]) (
[]*deployment.HostConn, error,
) {
hostConns := make([]*deployment.HostConn, 0, len(pods))
@@ -211,8 +207,7 @@

// Check whether all the containers in the pod are ready
if !utils.IsPodRunningAndReady(pod) {
ignorablePod := utils.GetPod(pod.Name, ignorablePods)
if ignorablePod != nil {
if ignorablePodNames.Has(pod.Name) {
// This pod is not running and ignorable.
r.Log.Info(
"Ignoring info call on non-running pod ", "pod", pod.Name,
@@ -259,7 +254,7 @@ func hostID(hostName string, hostPort int) string {

func (r *SingleClusterReconciler) setMigrateFillDelay(
policy *as.ClientPolicy,
asConfig *asdbv1.AerospikeConfigSpec, setToZero bool, ignorablePods []corev1.Pod,
asConfig *asdbv1.AerospikeConfigSpec, setToZero bool, ignorablePodNames sets.Set[string],
) reconcileResult {
migrateFillDelay, err := asdbv1.GetMigrateFillDelay(asConfig)
if err != nil {
@@ -286,7 +281,7 @@
}

// This doesn't make actual connection, only objects having connection info are created
allHostConns, err := r.newAllHostConnWithOption(ignorablePods)
allHostConns, err := r.newAllHostConnWithOption(ignorablePodNames)
if err != nil {
return reconcileError(
fmt.Errorf(
