Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KO-198: Added readiness probe and pod disruption budget support #254

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion api/v1/aerospikecluster_mutating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"gomodules.xyz/jsonpatch/v2"
v1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
Expand Down Expand Up @@ -69,6 +70,12 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response {
}

func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error {
// Set maxUnavailable default to 1
if c.Spec.MaxUnavailable == nil {
maxUnavailable := intstr.FromInt(1)
c.Spec.MaxUnavailable = &maxUnavailable
}

// Set network defaults
c.Spec.AerospikeNetworkPolicy.setDefaults(c.ObjectMeta.Namespace)

Expand Down Expand Up @@ -504,7 +511,7 @@ func setDefaultNetworkConf(
)
}
// Override these sections
// TODO: These values lines will be replaces with runtime info by script in init-container
// TODO: These values lines will be replaced with runtime info by script in init-container
// See if we can get better way to make template
serviceDefaults := map[string]interface{}{}
srvPort := GetServicePort(configSpec)
Expand Down
10 changes: 9 additions & 1 deletion api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// Aerospike cluster size
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Cluster Size"
Size int32 `json:"size"`
// MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application
// disruption. This value is used to create PodDisruptionBudget. Defaults to 1.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Max Unavailable"
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// Aerospike server image
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Server Image"
Image string `json:"image"`
Expand Down Expand Up @@ -566,10 +570,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability
// AerospikeClusterStatusSpec captures the current status of the cluster.
type AerospikeClusterStatusSpec struct { //nolint:govet // for readability
// Aerospike cluster size
// +operator-sdk:csv:customresourcedefinitions:type=status,displayName="Cluster Size"
Size int32 `json:"size,omitempty"`
// Aerospike server image
Image string `json:"image,omitempty"`
// MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application
// disruption. This value is used to create PodDisruptionBudget. Defaults to 1.
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// If set true then multiple pods can be created per Kubernetes Node.
// This will create a NodePort service for each Pod.
// NodePort, as the name implies, opens a specific port on all the Kubernetes Nodes ,
Expand Down Expand Up @@ -857,6 +863,7 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec,

status.Size = spec.Size
status.Image = spec.Image
status.MaxUnavailable = spec.MaxUnavailable

// Storage
statusStorage := AerospikeStorageSpec{}
Expand Down Expand Up @@ -946,6 +953,7 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec

spec.Size = status.Size
spec.Image = status.Image
spec.MaxUnavailable = status.MaxUnavailable

// Storage
specStorage := AerospikeStorageSpec{}
Expand Down
95 changes: 80 additions & 15 deletions api/v1/aerospikecluster_validating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error {
return fmt.Errorf("invalid cluster size 0")
}

// Validate MaxUnavailable for PodDisruptionBudget
if err := c.validateMaxUnavailable(); err != nil {
return err
}

// Validate Image version
version, err := GetImageVersion(c.Spec.Image)
if err != nil {
Expand Down Expand Up @@ -604,24 +609,11 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error {

// Validate batch upgrade/restart param
if c.Spec.RackConfig.RollingUpdateBatchSize != nil {
// Just validate if RollingUpdateBatchSize is valid number or string.
randomNumber := 100

count, err := intstr.GetScaledValueFromIntOrPercent(
c.Spec.RackConfig.RollingUpdateBatchSize, randomNumber, false,
)
if err != nil {
if err := validateIntOrStringField(c.Spec.RackConfig.RollingUpdateBatchSize,
"spec.rackConfig.rollingUpdateBatchSize"); err != nil {
return err
}

// Only negative is not allowed. Any big number can be given.
if count < 0 {
return fmt.Errorf(
"can not use negative rackConfig.RollingUpdateBatchSize %s",
c.Spec.RackConfig.RollingUpdateBatchSize.String(),
)
}

if len(c.Spec.RackConfig.Racks) < 2 {
return fmt.Errorf("can not use rackConfig.RollingUpdateBatchSize when number of racks is less than two")
}
Expand Down Expand Up @@ -2175,3 +2167,76 @@ func (c *AerospikeCluster) validateNetworkPolicy(namespace string) error {

return nil
}

func validateIntOrStringField(intOrStr *intstr.IntOrString, fieldPath string) error {
randomNumber := 100
// Just validate if MaxUnavailable is valid number or string.
count, err := intstr.GetScaledValueFromIntOrPercent(intOrStr, randomNumber, false)
if err != nil {
return err
}

// Only negative is not allowed. Any big number can be given.
if count < 0 {
return fmt.Errorf("can not use negative %s: %s", fieldPath, intOrStr.String())
}

if intOrStr.Type == intstr.String && count > 100 {
return fmt.Errorf("%s: %s must not be greater than 100 percent", fieldPath, intOrStr.String())
}

return nil
}

func (c *AerospikeCluster) validateMaxUnavailable() error {
// safe check for corner cases when mutation webhook somehow didn't work
if c.Spec.MaxUnavailable == nil {
return fmt.Errorf("maxUnavailable cannot be nil. Mutation webhook didn't work")
}

if err := validateIntOrStringField(c.Spec.MaxUnavailable, "spec.maxUnavailable"); err != nil {
return err
}

maxUnavailable := int(c.Spec.Size)

// If Size is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss
if maxUnavailable == 1 {
return nil
}

for idx := range c.Spec.RackConfig.Racks {
rack := &c.Spec.RackConfig.Racks[idx]
nsList := rack.AerospikeConfig.Value["namespaces"].([]interface{})

for _, nsInterface := range nsList {
rfInterface, exists := nsInterface.(map[string]interface{})["replication-factor"]
if !exists {
// Default RF is 2 if not given
maxUnavailable = 2
continue
}

rf, err := GetIntType(rfInterface)
if err != nil {
return fmt.Errorf("namespace replication-factor %v", err)
}

// If RF is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss
if rf == 1 {
continue
}

if rf < maxUnavailable {
maxUnavailable = rf
}
}
}

if c.Spec.MaxUnavailable.IntValue() >= maxUnavailable {
return fmt.Errorf("maxUnavailable %s cannot be greater than or equal to %v",
c.Spec.MaxUnavailable.String(), maxUnavailable)
}

return nil
}
10 changes: 10 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ spec:
image:
description: Aerospike server image
type: string
maxUnavailable:
anyOf:
- type: integer
- type: string
description: MaxUnavailable is the percentage/number of pods that
can be allowed to go down or unavailable before application disruption.
This value is used to create PodDisruptionBudget. Defaults to 1.
x-kubernetes-int-or-string: true
operatorClientCert:
description: Certificates to connect to Aerospike.
properties:
Expand Down Expand Up @@ -8830,6 +8838,14 @@ spec:
image:
description: Aerospike server image
type: string
maxUnavailable:
anyOf:
- type: integer
- type: string
description: MaxUnavailable is the percentage/number of pods that
can be allowed to go down or unavailable before application disruption.
This value is used to create PodDisruptionBudget. Defaults to 1.
x-kubernetes-int-or-string: true
multiPodPerHost:
description: "If set true then multiple pods can be created per Kubernetes
Node. This will create a NodePort service for each Pod. NodePort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ spec:
- description: Aerospike server image
displayName: Server Image
path: image
- description: MaxUnavailable is the percentage/number of pods that can be allowed
to go down or unavailable before application disruption. This value is used
to create PodDisruptionBudget. Defaults to 1.
displayName: Max Unavailable
path: maxUnavailable
- description: Certificates to connect to Aerospike.
displayName: Operator Client Cert
path: operatorClientCert
Expand All @@ -74,10 +79,6 @@ spec:
resource.
displayName: Validation Policy
path: validationPolicy
statusDescriptors:
- description: Aerospike cluster size
displayName: Cluster Size
path: size
version: v1
- description: AerospikeCluster is the schema for the AerospikeCluster API
displayName: Aerospike Cluster
Expand Down
9 changes: 9 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,12 @@ rules:
- patch
- update
- watch
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- create
- get
- patch
- update
31 changes: 31 additions & 0 deletions controllers/aerospikecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
k8sRuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -37,6 +40,8 @@ var (
}
)

var PDBbGvk = policyv1beta1.SchemeGroupVersion.WithKind("PodDisruptionBudget")

// AerospikeClusterReconciler reconciles AerospikeClusters
type AerospikeClusterReconciler struct {
client.Client
Expand All @@ -49,6 +54,10 @@ type AerospikeClusterReconciler struct {

// SetupWithManager sets up the controller with the Manager
func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := r.setupPdbAPI(r.KubeConfig); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&asdbv1.AerospikeCluster{}).
Owns(
Expand All @@ -72,6 +81,27 @@ func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

// setupPdbAPI sets up the pdb api version to use as per the k8s version.
// TODO: Move to v1 when minimum supported k8s version is 1.21
func (r *AerospikeClusterReconciler) setupPdbAPI(config *rest.Config) error {
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(config)

resources, err := discoveryClient.ServerResourcesForGroupVersion("policy/v1")
if err != nil {
r.Log.Info("Could not get ServerResourcesForGroupVersion for policy/v1, falling back to policy/v1beta1")
return nil
}

for i := range resources.APIResources {
if resources.APIResources[i].Kind == "PodDisruptionBudget" {
PDBbGvk = policyv1.SchemeGroupVersion.WithKind("PodDisruptionBudget")
return nil
}
}

return nil
}

// RackState contains the rack configuration and rack size.
type RackState struct {
Rack *asdbv1.Rack
Expand All @@ -86,6 +116,7 @@ type RackState struct {
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;create;update;patch
//nolint:lll // marker
// +kubebuilder:rbac:groups=asdb.aerospike.com,resources=aerospikeclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=asdb.aerospike.com,resources=aerospikeclusters/status,verbs=get;update;patch
Expand Down
7 changes: 7 additions & 0 deletions controllers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ func (r *SingleClusterReconciler) getRollingRestartTypePod(
r.Log.Info("Aerospike rack storage changed. Need rolling restart")
}

if r.isReadinessPortUpdated(pod) {
restartType = mergeRestartType(restartType, podRestart)

r.Log.Info("Aerospike readiness tcp port changed. Need rolling restart",
"newTCPPort", r.getReadinessProbe().TCPSocket.String())
}

return restartType
}

Expand Down
Loading