Skip to content

Commit

Permalink
Allowing only single operation at a time.
Browse files Browse the repository at this point in the history
  • Loading branch information
tanmayja committed Jun 10, 2024
1 parent d92b6f2 commit 96327d6
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 251 deletions.
21 changes: 21 additions & 0 deletions api/v1/aerospikecluster_mutating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error {
return err
}

// Set defaults for user specified operations
// Populate operation id if not present
if err := setDefaultOperation(&c.Spec.Operations); err != nil {
return err
}

// Update racks configuration using global values where required.
if err := c.updateRacks(asLog); err != nil {
return err
Expand Down Expand Up @@ -849,3 +855,18 @@ func setNamespaceDefault(networks []string, namespace string) {
networks[idx] = netName
}
}

func setDefaultOperation(operations *[]OperationSpec) error {
for i := range *operations {
if (*operations)[i].OperationID == "" {
id, err := randomString(5)
if err != nil {
return err
}

(*operations)[i].OperationID = id
}
}

return nil
}
12 changes: 7 additions & 5 deletions api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
// Operations is a list of on demand operation to be performed on the Aerospike cluster.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Operations"
Operations []OperationSpec `json:"operation,omitempty"`
Operations []OperationSpec `json:"operations,omitempty"`
}

type OperationType string
Expand All @@ -143,9 +143,11 @@ type OperationSpec struct {
// OperationType is the type of operation to be performed on the Aerospike cluster.
// +kubebuilder:validation:Enum=quickRestart;podRestart
OperationType OperationType `json:"operationType"`
PodList []string `json:"podList,omitempty"`
// +kubebuilder:validation:Minimum=1
OperationID int `json:"operationID"`
// +kubebuilder:validation:MaxLength=5
// +kubebuilder:validation:MinLength=5
// +optional
OperationID string `json:"operationID"`
PodList []string `json:"podList,omitempty"`
}

type SeedsFinderServices struct {
Expand Down Expand Up @@ -722,7 +724,7 @@ type AerospikeClusterStatusSpec struct { //nolint:govet // for readability
// K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods.
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
// Operations is a list of on demand operation to be performed on the Aerospike cluster.
Operations []OperationSpec `json:"operation,omitempty"`
Operations []OperationSpec `json:"operations,omitempty"`
}

// AerospikeClusterStatus defines the observed state of AerospikeCluster
Expand Down
54 changes: 19 additions & 35 deletions api/v1/aerospikecluster_validating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,43 +274,31 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error {
}

func (c *AerospikeCluster) validateOperation() error {
if c.Status.AerospikeConfig == nil && len(c.Spec.Operations) != 0 {
return fmt.Errorf("operation cannot be set on create")
// Nothing to validate if no operation
if len(c.Spec.Operations) == 0 {
return nil
}

allPodNames := GetAllPodNames(c.Name, c.Spec.Size, c.Spec.RackConfig.Racks)

for idx := range c.Spec.Operations {
if ContainsString(c.Spec.Operations[idx].PodList, AllPods) {
if len(c.Spec.Operations[idx].PodList) > 1 {
return fmt.Errorf("cannot specify ALL and other pods in operation")
}
} else {
podSet := sets.NewString(c.Spec.Operations[idx].PodList...)
if !allPodNames.IsSuperset(sets.Set[string](podSet)) {
return fmt.Errorf("invalid pod name in operation")
}
}
if c.Status.AerospikeConfig == nil {
return fmt.Errorf("operation cannot be set on create")
}

quickRestarts, podRestarts, err := PodsToRestart(c.Spec.Operations, c.Status.Operations, allPodNames)
if err != nil {
return err
if len(c.Spec.Operations) > 1 {
return fmt.Errorf("only one operation can be set at a time")
}

invalidPods := quickRestarts.Intersection(podRestarts)
if invalidPods.Len() > 0 {
return fmt.Errorf(
"pods %v cannot be part of both quick restart and pod restart operations",
invalidPods,
)
allPodNames := GetAllPodNames(c.Name, c.Spec.Size, c.Spec.RackConfig.Racks)

podSet := sets.NewString(c.Spec.Operations[0].PodList...)
if !allPodNames.IsSuperset(sets.Set[string](podSet)) {
return fmt.Errorf("invalid pod name in operation")
}

// Don't allow any operation along with cluster scale up or racks added or removed
// New pods won't be available for operation
if len(quickRestarts)+len(podRestarts) > 0 &&
if !reflect.DeepEqual(c.Spec.Operations, c.Status.Operations) &&
(c.Spec.Size > c.Status.Size || len(c.Spec.RackConfig.Racks) != len(c.Status.RackConfig.Racks)) {
return fmt.Errorf("cannot perform operation along with cluster scale up")
return fmt.Errorf("cannot perform any on demand operation along with cluster scale up or racks added or removed")
}

return nil
Expand Down Expand Up @@ -2415,7 +2403,7 @@ func (c *AerospikeCluster) validateEnableDynamicConfigUpdate() error {

func validateOperationUpdate(oldOp, newOp *[]OperationSpec) error {
// Define a key extractor function
keyExtractor := func(op OperationSpec) int {
keyExtractor := func(op OperationSpec) string {
return op.OperationID
}

Expand All @@ -2425,15 +2413,11 @@ func validateOperationUpdate(oldOp, newOp *[]OperationSpec) error {
return err
}

newOpMap, err := ConvertToMap(*newOp, keyExtractor)
if err != nil {
return err
}

for key, value := range newOpMap {
for idx := range *newOp {
key := (*newOp)[idx].OperationID
if _, ok := oldOpMap[key]; ok {
if oldOpMap[key].OperationType != value.OperationType {
return fmt.Errorf("operation type of existing operation %d cannot be updated", key)
if oldOpMap[key].OperationType != (*newOp)[idx].OperationType {
return fmt.Errorf("operation type of existing operation %s cannot be updated", key)
}
}
}
Expand Down
96 changes: 61 additions & 35 deletions api/v1/utils.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package v1

import (
"crypto/rand"
"errors"
"fmt"
"math/big"
"os"
"reflect"
"regexp"
Expand Down Expand Up @@ -34,8 +36,6 @@ const (
FabricPortName = "fabric"

InfoPortName = "info"

AllPods = "ALL"
)

const (
Expand Down Expand Up @@ -592,13 +592,13 @@ func DistributeItems(totalItems, totalGroups int) []int {
return topology
}

func ConvertToMap[T any](items []T, keyExtractor func(T) int) (map[int]T, error) {
itemMap := make(map[int]T)
func ConvertToMap[T any](items []T, keyExtractor func(T) string) (map[string]T, error) {
itemMap := make(map[string]T)

for _, item := range items {
key := keyExtractor(item)
if _, ok := itemMap[key]; ok {
return nil, fmt.Errorf("duplicate key %d", key)
return nil, fmt.Errorf("duplicate key %s", key)
}

itemMap[key] = item
Expand Down Expand Up @@ -627,13 +627,14 @@ func PodsToRestart(specOps, statusOps []OperationSpec, allPodNames sets.Set[stri
quickRestarts = make(sets.Set[string])
podRestarts = make(sets.Set[string])

// If no spec operations, no pods to restart
// If the Spec.Operations and Status.Operations are equal, no pods to restart.
if reflect.DeepEqual(specOps, statusOps) {
if len(specOps) == 0 || reflect.DeepEqual(specOps, statusOps) {
return quickRestarts, podRestarts, nil
}

// Define a key extractor function
keyExtractor := func(op OperationSpec) int {
keyExtractor := func(op OperationSpec) string {
return op.OperationID
}

Expand All @@ -642,41 +643,66 @@ func PodsToRestart(specOps, statusOps []OperationSpec, allPodNames sets.Set[stri
return quickRestarts, podRestarts, err
}

for _, specOp := range specOps {
// If no pod list is provided, it indicates that no pods need to be restarted.
if len(specOp.PodList) == 0 {
continue
}

var (
podsToRestart, specPods sets.Set[string]
)
// Assuming only one operation is present in the spec.
specOp := specOps[0]
// If the operation is not a quick restart or pod restart, no pods need to be restarted.
if specOp.OperationType != OperationQuickRestart && specOp.OperationType != OperationPodRestart {
return quickRestarts, podRestarts, nil
}

if specOp.PodList[0] == AllPods {
specPods = allPodNames
var (
podsToRestart, specPods sets.Set[string]
)
// If no pod list is provided, it indicates that all pods need to be restarted.
if len(specOp.PodList) == 0 {
specPods = allPodNames
} else {
specPods = sets.New[string](specOp.PodList...)
}

// If the operation is not present in the status, all pods need to be restarted.
// If the operation is present in the status, only the pods that are not present in the status need to be restarted.
// If the operation is present in the status and podList is empty, no pods need to be restarted.
if statusOp, exists := statusOpsMap[specOp.OperationID]; !exists {
podsToRestart = specPods
} else {
var statusPods sets.Set[string]
if len(statusOp.PodList) == 0 {
statusPods = allPodNames
} else {
specPods = sets.New[string](specOp.PodList...)
statusPods = sets.New[string](statusOp.PodList...)
}

// If the operation is not present in the status, all pods need to be restarted.
// If the operation is present in the status, only the pods that are not present in the status need to be restarted.
// If the operation is present in the status and all podList has ALL pods, no pods need to be restarted.
if _, exists := statusOpsMap[specOp.OperationID]; !exists || len(statusOpsMap[specOp.OperationID].PodList) == 0 {
podsToRestart = specPods
} else if statusOpsMap[specOp.OperationID].PodList[0] != AllPods {
podsToRestart = specPods.Difference(sets.New[string](statusOpsMap[specOp.OperationID].PodList...))
}
podsToRestart = specPods.Difference(statusPods)
}

// Separate pods to be restarted based on operation type
if podsToRestart != nil && podsToRestart.Len() > 0 {
switch specOp.OperationType {
case OperationQuickRestart:
quickRestarts.Insert(podsToRestart.UnsortedList()...)
case OperationPodRestart:
podRestarts.Insert(podsToRestart.UnsortedList()...)
}
// Separate pods to be restarted based on operation type
if podsToRestart != nil && podsToRestart.Len() > 0 {
switch specOp.OperationType {
case OperationQuickRestart:
quickRestarts.Insert(podsToRestart.UnsortedList()...)
case OperationPodRestart:
podRestarts.Insert(podsToRestart.UnsortedList()...)
}
}

return quickRestarts, podRestarts, nil
}

// charset contains the characters to use for the random string
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

// randomString generates a random string of length n
func randomString(n int) (string, error) {
b := make([]byte, n)
for i := range b {
num, err := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
if err != nil {
return "", err
}

b[i] = charset[num.Int64()]
}

return string(b), nil
}
16 changes: 8 additions & 8 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,15 @@ spec:
This value is used to create PodDisruptionBudget. Defaults to 1.
Refer Aerospike documentation for more details.
x-kubernetes-int-or-string: true
operation:
operations:
description: Operations is a list of on demand operation to be performed
on the Aerospike cluster.
items:
properties:
operationID:
minimum: 1
type: integer
maxLength: 5
minLength: 5
type: string
operationType:
description: OperationType is the type of operation to be performed
on the Aerospike cluster.
Expand All @@ -332,7 +333,6 @@ spec:
type: string
type: array
required:
- operationID
- operationType
type: object
type: array
Expand Down Expand Up @@ -9675,14 +9675,15 @@ spec:
is the port requested by the user. Deprecated: MultiPodPerHost is
now part of podSpec"
type: boolean
operation:
operations:
description: Operations is a list of on demand operation to be performed
on the Aerospike cluster.
items:
properties:
operationID:
minimum: 1
type: integer
maxLength: 5
minLength: 5
type: string
operationType:
description: OperationType is the type of operation to be performed
on the Aerospike cluster.
Expand All @@ -9695,7 +9696,6 @@ spec:
type: string
type: array
required:
- operationID
- operationType
type: object
type: array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ spec:
- description: Operations is a list of on demand operation to be performed on
the Aerospike cluster.
displayName: Operations
path: operation
path: operations
- description: Certificates to connect to Aerospike.
displayName: Operator Client Cert
path: operatorClientCert
Expand Down
Loading

0 comments on commit 96327d6

Please sign in to comment.