Skip to content

Commit

Permalink
KO-198, KO-264: Migrate pods from the given list of k8s nodes (#265)
Browse files Browse the repository at this point in the history
* Added support for PDB and readiness

* Added support for k8sNodeBlockList

* Added Error phase in CR status
  • Loading branch information
abhishekdwivedi3060 authored Mar 4, 2024
1 parent 2dae26c commit be63a71
Show file tree
Hide file tree
Showing 25 changed files with 1,282 additions and 107 deletions.
9 changes: 8 additions & 1 deletion api/v1/aerospikecluster_mutating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"gomodules.xyz/jsonpatch/v2"
v1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
Expand Down Expand Up @@ -69,6 +70,12 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response {
}

func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error {
// Set maxUnavailable default to 1
if c.Spec.MaxUnavailable == nil {
maxUnavailable := intstr.FromInt(1)
c.Spec.MaxUnavailable = &maxUnavailable
}

// Set network defaults
c.Spec.AerospikeNetworkPolicy.setDefaults(c.ObjectMeta.Namespace)

Expand Down Expand Up @@ -504,7 +511,7 @@ func setDefaultNetworkConf(
)
}
// Override these sections
// TODO: These values lines will be replaces with runtime info by script in init-container
// TODO: These values lines will be replaced with runtime info by akoinit binary in init-container
// See if we can get better way to make template
serviceDefaults := map[string]interface{}{}
srvPort := GetServicePort(configSpec)
Expand Down
73 changes: 70 additions & 3 deletions api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,28 @@ import (
lib "github.com/aerospike/aerospike-management-lib"
)

// AerospikeClusterPhase is the string-typed phase reported for an Aerospike cluster operation.
// +kubebuilder:validation:Enum=InProgress;Completed;Error
type AerospikeClusterPhase string

// Valid values of AerospikeClusterPhase.
const (
	// AerospikeClusterInProgress is set while the Aerospike cluster CR is being reconciled and
	// changes are still rolling out to the cluster. For example, after the Aerospike server
	// version is changed in the CR, the phase stays InProgress until the upgrade has finished.
	AerospikeClusterInProgress AerospikeClusterPhase = "InProgress"

	// AerospikeClusterCompleted is set once the Aerospike cluster CR has been reconciled, i.e.
	// the cluster was deployed/upgraded successfully and is ready to use. For example, after
	// the Aerospike server version is changed in the CR, the phase becomes Completed when the
	// upgrade has finished.
	AerospikeClusterCompleted AerospikeClusterPhase = "Completed"

	// AerospikeClusterError is set when the Aerospike cluster operation is in an error state,
	// for reasons such as misconfiguration or infra issues. For example, after the Aerospike
	// server version is changed in the CR, the phase becomes Error if the upgrade fails because
	// of a wrong image.
	AerospikeClusterError AerospikeClusterPhase = "Error"
)

// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// AerospikeClusterSpec defines the desired state of AerospikeCluster
Expand All @@ -37,6 +59,11 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// Aerospike cluster size
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Cluster Size"
Size int32 `json:"size"`
// MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application
// disruption. This value is used to create PodDisruptionBudget. Defaults to 1.
// Refer to the Aerospike documentation for more details.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Max Unavailable"
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// Aerospike server image
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Server Image"
Image string `json:"image"`
Expand Down Expand Up @@ -74,6 +101,12 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Roster Node BlockList"
RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"`
// K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. Pods are not scheduled on
// these nodes. Pods are migrated from these nodes if already present. This is useful for the maintenance of
// Kubernetes nodes.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList"
// +kubebuilder:validation:MinItems:=1
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
}

type SeedsFinderServices struct {
Expand Down Expand Up @@ -564,9 +597,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability
// BlockVolumePolicy contains default policies for block volumes.
BlockVolumePolicy AerospikePersistentVolumePolicySpec `json:"blockVolumePolicy,omitempty"`

// CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container.
// CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container.
CleanupThreads int `json:"cleanupThreads,omitempty"`

// LocalStorageClasses contains a list of storage classes which provisions local volumes.
LocalStorageClasses []string `json:"localStorageClasses,omitempty"`

// Volumes list to attach to created pods.
// +patchMergeKey=name
// +patchStrategy=merge
Expand All @@ -578,10 +614,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability
// AerospikeClusterStatusSpec captures the current status of the cluster.
type AerospikeClusterStatusSpec struct { //nolint:govet // for readability
// Aerospike cluster size
// +operator-sdk:csv:customresourcedefinitions:type=status,displayName="Cluster Size"
Size int32 `json:"size,omitempty"`
// Aerospike server image
Image string `json:"image,omitempty"`
// MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application
// disruption. This value is used to create PodDisruptionBudget. Defaults to 1.
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// If set true then multiple pods can be created per Kubernetes Node.
// This will create a NodePort service for each Pod.
// NodePort, as the name implies, opens a specific port on all the Kubernetes Nodes,
Expand Down Expand Up @@ -627,6 +665,8 @@ type AerospikeClusterStatusSpec struct { //nolint:govet // for readability
SeedsFinderServices SeedsFinderServices `json:"seedsFinderServices,omitempty"`
// RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup
RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"`
// K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods.
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
}

// AerospikeClusterStatus defines the observed state of AerospikeCluster
Expand All @@ -646,7 +686,11 @@ type AerospikeClusterStatus struct { //nolint:govet // for readability
// This is a map instead of the conventional map-as-list convention, to allow each pod to patch-update its own
// status. The map key is the name of the pod.
// +patchStrategy=strategic
// +optional
Pods map[string]AerospikePodStatus `json:"pods" patchStrategy:"strategic"`

// Phase denotes the current phase of Aerospike cluster operation.
Phase AerospikeClusterPhase `json:"phase,omitempty"`
}

// AerospikeNetworkType specifies the type of network address to use.
Expand Down Expand Up @@ -834,9 +878,10 @@ type AerospikePodStatus struct { //nolint:govet // for readability
// +kubebuilder:storageversion
// +kubebuilder:printcolumn:name="Size",type=string,JSONPath=`.spec.size`
// +kubebuilder:printcolumn:name="Image",type=string,JSONPath=`.spec.image`
// +kubebuilder:printcolumn:name="MultiPodPerHost",type=boolean,JSONPath=`.spec.podSpec.MultiPodPerHost`
// +kubebuilder:printcolumn:name="MultiPodPerHost",type=boolean,JSONPath=`.spec.podSpec.multiPodPerHost`
// +kubebuilder:printcolumn:name="HostNetwork",type=boolean,JSONPath=`.spec.podSpec.hostNetwork`
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"

// AerospikeCluster is the schema for the AerospikeCluster API
// +operator-sdk:csv:customresourcedefinitions:displayName="Aerospike Cluster",resources={{Service, v1},{Pod,v1},{StatefulSet,v1}}
Expand Down Expand Up @@ -870,6 +915,7 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec,

status.Size = spec.Size
status.Image = spec.Image
status.MaxUnavailable = spec.MaxUnavailable

// Storage
statusStorage := AerospikeStorageSpec{}
Expand Down Expand Up @@ -950,6 +996,16 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec,
status.RosterNodeBlockList = rosterNodeBlockList
}

if len(spec.K8sNodeBlockList) != 0 {
var k8sNodeBlockList []string

lib.DeepCopy(
&k8sNodeBlockList, &spec.K8sNodeBlockList,
)

status.K8sNodeBlockList = k8sNodeBlockList
}

return &status, nil
}

Expand All @@ -959,6 +1015,7 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec

spec.Size = status.Size
spec.Image = status.Image
spec.MaxUnavailable = status.MaxUnavailable

// Storage
specStorage := AerospikeStorageSpec{}
Expand Down Expand Up @@ -1040,5 +1097,15 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec
spec.RosterNodeBlockList = rosterNodeBlockList
}

if len(status.K8sNodeBlockList) != 0 {
var k8sNodeBlockList []string

lib.DeepCopy(
&k8sNodeBlockList, &status.K8sNodeBlockList,
)

spec.K8sNodeBlockList = k8sNodeBlockList
}

return &spec, nil
}
59 changes: 59 additions & 0 deletions api/v1/aerospikecluster_validating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error {
return fmt.Errorf("invalid cluster size 0")
}

// Validate MaxUnavailable for PodDisruptionBudget
if err := c.validateMaxUnavailable(); err != nil {
return err
}

// Validate Image version
version, err := GetImageVersion(c.Spec.Image)
if err != nil {
Expand Down Expand Up @@ -2171,3 +2176,57 @@ func validateIntOrStringField(value *intstr.IntOrString, fieldPath string) error

return nil
}

// validateMaxUnavailable checks spec.maxUnavailable, the value used to create the
// PodDisruptionBudget.
//
// It requires the field to be set and to pass validateIntOrStringField, and then requires
// its integer value to be strictly lower than a "safe" bound. The bound starts at the
// cluster size and is lowered to the smallest namespace replication-factor found in any
// rack (a namespace without an explicit replication-factor counts as 2). Clusters of size 1
// and namespaces with replication-factor 1 are excluded from the bound.
//
// It returns nil when the value is acceptable and a descriptive error otherwise. A rack
// whose "namespaces" config is missing or malformed yields an error rather than a panic.
func (c *AerospikeCluster) validateMaxUnavailable() error {
	// Default replication-factor assumed when a namespace does not specify one.
	const defaultReplicationFactor = 2

	// safe check for corner cases when mutation webhook somehow didn't work
	if c.Spec.MaxUnavailable == nil {
		return fmt.Errorf("maxUnavailable cannot be nil. Mutation webhook didn't work")
	}

	if err := validateIntOrStringField(c.Spec.MaxUnavailable, "spec.maxUnavailable"); err != nil {
		return err
	}

	safeMaxUnavailable := int(c.Spec.Size)

	// If Size is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss
	if safeMaxUnavailable == 1 {
		return nil
	}

	for idx := range c.Spec.RackConfig.Racks {
		rack := &c.Spec.RackConfig.Racks[idx]

		// Use checked assertions: an unchecked assertion on a missing or mistyped value
		// would panic inside the admission webhook instead of rejecting the request.
		nsList, ok := rack.AerospikeConfig.Value["namespaces"].([]interface{})
		if !ok {
			return fmt.Errorf(
				"aerospikeConfig.namespaces is missing or is not a list in rack at index %d", idx,
			)
		}

		for nsIdx, nsInterface := range nsList {
			nsConf, ok := nsInterface.(map[string]interface{})
			if !ok {
				return fmt.Errorf(
					"aerospikeConfig.namespaces[%d] is not a map in rack at index %d", nsIdx, idx,
				)
			}

			rfInterface, exists := nsConf["replication-factor"]
			if !exists {
				// Default RF is 2 if not given. Only ever lower the bound, never raise it.
				if safeMaxUnavailable > defaultReplicationFactor {
					safeMaxUnavailable = defaultReplicationFactor
				}

				continue
			}

			rf, err := GetIntType(rfInterface)
			if err != nil {
				return fmt.Errorf("namespace replication-factor %w", err)
			}

			// If RF is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss
			if rf == 1 {
				continue
			}

			if rf < safeMaxUnavailable {
				safeMaxUnavailable = rf
			}
		}
	}

	// NOTE(review): IntValue() appears to yield 0 for percentage strings such as "50%", in
	// which case percentage values always pass this comparison — confirm whether they
	// should be scaled against Size before comparing.
	if c.Spec.MaxUnavailable.IntValue() >= safeMaxUnavailable {
		return fmt.Errorf("maxUnavailable %s cannot be greater than or equal to %v as it may result in "+
			"data loss. Set it to a lower value",
			c.Spec.MaxUnavailable.String(), safeMaxUnavailable)
	}

	return nil
}
25 changes: 25 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit be63a71

Please sign in to comment.