Skip to content

Commit

Permalink
implementing ALL in podlist
Browse files Browse the repository at this point in the history
  • Loading branch information
tanmayja committed Jun 8, 2024
1 parent 0062ae6 commit d92b6f2
Show file tree
Hide file tree
Showing 14 changed files with 706 additions and 188 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ go-lint: golanci-lint ## Run golangci-lint against code.
.PHONY: test
test: manifests generate fmt vet envtest ## Run tests.
# KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" cd $(shell pwd)/test; go run github.com/onsi/ginkgo/v2/ginkgo -coverprofile cover.out -progress -v -timeout=12h0m0s -focus=${FOCUS} --junit-report="junit.xml" -- ${ARGS}
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" cd $(shell pwd)/test; go run github.com/onsi/ginkgo/v2/ginkgo -coverprofile cover.out -show-node-events -v -timeout=12h0m0s -focus=${FOCUS} --junit-report="junit.xml" -- ${ARGS}

##@ Build

Expand Down
4 changes: 0 additions & 4 deletions api/v1/aerospikecluster_mutating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error {
return err
}

// Set defaults for user specified operations
// Populate all possible pod names in pod list if given empty
c.setDefaultOperation()

// Update racks configuration using global values where required.
if err := c.updateRacks(asLog); err != nil {
return err
Expand Down
49 changes: 11 additions & 38 deletions api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ limitations under the License.
package v1

import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -127,9 +124,9 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList"
// +kubebuilder:validation:MinItems:=1
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
// Operation is a list of on demand operation to be performed on the Aerospike cluster.
// Operations is a list of on demand operation to be performed on the Aerospike cluster.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Operations"
Operation []OperationSpec `json:"operation,omitempty"`
Operations []OperationSpec `json:"operation,omitempty"`
}

type OperationType string
Expand All @@ -146,7 +143,7 @@ type OperationSpec struct {
// OperationType is the type of operation to be performed on the Aerospike cluster.
// +kubebuilder:validation:Enum=quickRestart;podRestart
OperationType OperationType `json:"operationType"`
PodList []string `json:"podList"`
PodList []string `json:"podList,omitempty"`
// +kubebuilder:validation:Minimum=1
OperationID int `json:"operationID"`
}
Expand Down Expand Up @@ -724,8 +721,8 @@ type AerospikeClusterStatusSpec struct { //nolint:govet // for readability
RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"`
// K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods.
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
// Operation is a list of on demand operation to be performed on the Aerospike cluster.
Operation []OperationSpec `json:"operation,omitempty"`
// Operations is a list of on demand operation to be performed on the Aerospike cluster.
Operations []OperationSpec `json:"operation,omitempty"`
}

// AerospikeClusterStatus defines the observed state of AerospikeCluster
Expand Down Expand Up @@ -962,30 +959,6 @@ type AerospikeCluster struct { //nolint:govet // for readability
Status AerospikeClusterStatus `json:"status,omitempty"`
}

func (c *AerospikeCluster) setDefaultOperation() {
for i := range c.Spec.Operation {
if len(c.Spec.Operation[i].PodList) == 0 {
c.Spec.Operation[i].PodList = c.getAllPodNames()
}
}
}

func (c *AerospikeCluster) getAllPodNames() []string {
podNames := make([]string, 0, c.Spec.Size)
topology := DistributeItems(
int(c.Spec.Size), len(c.Spec.RackConfig.Racks),
)

racks := c.Spec.RackConfig.Racks
for idx := range racks {
for i := 0; i < topology[idx]; i++ {
podNames = append(podNames, fmt.Sprintf("%s-%s-%d", c.Name, strconv.Itoa(racks[idx].ID), i))
}
}

return podNames
}

// +kubebuilder:object:root=true

// AerospikeClusterList contains a list of AerospikeCluster
Expand Down Expand Up @@ -1095,10 +1068,10 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec,
status.K8sNodeBlockList = *k8sNodeBlockList
}

if len(spec.Operation) != 0 {
operation := lib.DeepCopy(&spec.Operation).(*[]OperationSpec)
if len(spec.Operations) != 0 {
operation := lib.DeepCopy(&spec.Operations).(*[]OperationSpec)

status.Operation = *operation
status.Operations = *operation
}

return &status, nil
Expand Down Expand Up @@ -1200,9 +1173,9 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec
spec.K8sNodeBlockList = *k8sNodeBlockList
}

if len(status.Operation) != 0 {
operation := lib.DeepCopy(&status.Operation).(*[]OperationSpec)
spec.Operation = *operation
if len(status.Operations) != 0 {
operation := lib.DeepCopy(&status.Operations).(*[]OperationSpec)
spec.Operations = *operation
}

return &spec, nil
Expand Down
66 changes: 28 additions & 38 deletions api/v1/aerospikecluster_validating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (c *AerospikeCluster) ValidateUpdate(oldObj runtime.Object) (admission.Warn
return nil, err
}

if err := validateOperatonUpdate(
&old.Spec.Operation, &c.Spec.Operation,
if err := validateOperationUpdate(
&old.Spec.Operations, &c.Spec.Operations,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error {
return err
}

if err := validateOperation(&c.Spec.Operation, &c.Status.Operation); err != nil {
if err := c.validateOperation(); err != nil {
return err
}

Expand Down Expand Up @@ -273,24 +273,31 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error {
return c.validateSCNamespaces()
}

func validateOperation(specOps, statusOps *[]OperationSpec) error {
// Define a key extractor function
keyExtractor := func(op OperationSpec) int {
return op.OperationID
func (c *AerospikeCluster) validateOperation() error {
if c.Status.AerospikeConfig == nil && len(c.Spec.Operations) != 0 {
return fmt.Errorf("operation cannot be set on create")
}

specOpsMap, err := ConvertToMap(*specOps, keyExtractor)
if err != nil {
return err
allPodNames := GetAllPodNames(c.Name, c.Spec.Size, c.Spec.RackConfig.Racks)

for idx := range c.Spec.Operations {
if ContainsString(c.Spec.Operations[idx].PodList, AllPods) {
if len(c.Spec.Operations[idx].PodList) > 1 {
return fmt.Errorf("cannot specify ALL and other pods in operation")
}
} else {
podSet := sets.NewString(c.Spec.Operations[idx].PodList...)
if !allPodNames.IsSuperset(sets.Set[string](podSet)) {
return fmt.Errorf("invalid pod name in operation")
}
}
}

statusOpsMap, err := ConvertToMap(*statusOps, keyExtractor)
quickRestarts, podRestarts, err := PodsToRestart(c.Spec.Operations, c.Status.Operations, allPodNames)
if err != nil {
return err
}

quickRestarts, podRestarts := podsToRestart(specOpsMap, statusOpsMap)

invalidPods := quickRestarts.Intersection(podRestarts)
if invalidPods.Len() > 0 {
return fmt.Errorf(
Expand All @@ -299,31 +306,14 @@ func validateOperation(specOps, statusOps *[]OperationSpec) error {
)
}

return nil
}

func podsToRestart(specOps, statusOps map[int]OperationSpec) (quickRestarts, podRestarts sets.Set[string]) {
quickRestarts = make(sets.Set[string])
podRestarts = make(sets.Set[string])

for id := range specOps {
pods := sets.NewString()
if _, ok := statusOps[id]; !ok {
pods.Insert(specOps[id].PodList...)
} else {
pods.Union(sets.NewString(specOps[id].PodList...).Difference(sets.NewString(statusOps[id].PodList...)))
}

if pods.Len() > 0 {
if specOps[id].OperationType == OperationQuickRestart {
quickRestarts.Insert(pods.List()...)
} else if specOps[id].OperationType == OperationPodRestart {
podRestarts.Insert(pods.List()...)
}
}
// Don't allow any operation along with cluster scale up or racks added or removed
// New pods won't be available for operation
if len(quickRestarts)+len(podRestarts) > 0 &&
(c.Spec.Size > c.Status.Size || len(c.Spec.RackConfig.Racks) != len(c.Status.RackConfig.Racks)) {
return fmt.Errorf("cannot perform operation along with cluster scale up")
}

return quickRestarts, podRestarts
return nil
}

func (c *AerospikeCluster) validateSCNamespaces() error {
Expand Down Expand Up @@ -1362,7 +1352,7 @@ func validateEnableSecurityConfig(newConfSpec, oldConfSpec *AerospikeConfigSpec)
return fmt.Errorf("cannot remove cluster security config")
}

if oldSecConfFound && newSecConfFound {
if oldSecConfFound {
oldSecFlag, oldEnableSecurityFlagFound := oldSec.(map[string]interface{})["enable-security"]
newSecFlag, newEnableSecurityFlagFound := newSec.(map[string]interface{})["enable-security"]

Expand Down Expand Up @@ -2423,7 +2413,7 @@ func (c *AerospikeCluster) validateEnableDynamicConfigUpdate() error {
return nil
}

func validateOperatonUpdate(oldOp, newOp *[]OperationSpec) error {
func validateOperationUpdate(oldOp, newOp *[]OperationSpec) error {
// Define a key extractor function
keyExtractor := func(op OperationSpec) int {
return op.OperationID
Expand Down
80 changes: 80 additions & 0 deletions api/v1/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import (
"errors"
"fmt"
"os"
"reflect"
"regexp"
"strconv"
"strings"

"k8s.io/apimachinery/pkg/util/sets"

v1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"

Expand All @@ -30,6 +34,8 @@ const (
FabricPortName = "fabric"

InfoPortName = "info"

AllPods = "ALL"
)

const (
Expand Down Expand Up @@ -600,3 +606,77 @@ func ConvertToMap[T any](items []T, keyExtractor func(T) int) (map[int]T, error)

return itemMap, nil
}

func GetAllPodNames(clusterName string, clusterSize int32, racks []Rack) sets.Set[string] {
podNames := make(sets.Set[string])
topology := DistributeItems(
int(clusterSize), len(racks),
)

for idx := range racks {
for i := 0; i < topology[idx]; i++ {
podNames.Insert(fmt.Sprintf("%s-%s-%d", clusterName, strconv.Itoa(racks[idx].ID), i))
}
}

return podNames
}

func PodsToRestart(specOps, statusOps []OperationSpec, allPodNames sets.Set[string]) (quickRestarts,
podRestarts sets.Set[string], err error) {
quickRestarts = make(sets.Set[string])
podRestarts = make(sets.Set[string])

// If the Spec.Operations and Status.Operations are equal, no pods to restart.
if reflect.DeepEqual(specOps, statusOps) {
return quickRestarts, podRestarts, nil
}

// Define a key extractor function
keyExtractor := func(op OperationSpec) int {
return op.OperationID
}

statusOpsMap, err := ConvertToMap(statusOps, keyExtractor)
if err != nil {
return quickRestarts, podRestarts, err
}

for _, specOp := range specOps {
// If no pod list is provided, it indicates that no pods need to be restarted.
if len(specOp.PodList) == 0 {
continue
}

var (
podsToRestart, specPods sets.Set[string]
)

if specOp.PodList[0] == AllPods {
specPods = allPodNames
} else {
specPods = sets.New[string](specOp.PodList...)
}

// If the operation is not present in the status, all pods need to be restarted.
// If the operation is present in the status, only the pods that are not present in the status need to be restarted.
// If the operation is present in the status and all podList has ALL pods, no pods need to be restarted.
if _, exists := statusOpsMap[specOp.OperationID]; !exists || len(statusOpsMap[specOp.OperationID].PodList) == 0 {
podsToRestart = specPods
} else if statusOpsMap[specOp.OperationID].PodList[0] != AllPods {
podsToRestart = specPods.Difference(sets.New[string](statusOpsMap[specOp.OperationID].PodList...))
}

// Separate pods to be restarted based on operation type
if podsToRestart != nil && podsToRestart.Len() > 0 {
switch specOp.OperationType {
case OperationQuickRestart:
quickRestarts.Insert(podsToRestart.UnsortedList()...)
case OperationPodRestart:
podRestarts.Insert(podsToRestart.UnsortedList()...)
}
}
}

return quickRestarts, podRestarts, nil
}
8 changes: 4 additions & 4 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ spec:
Refer Aerospike documentation for more details.
x-kubernetes-int-or-string: true
operation:
description: Operation is a list of on demand operation to be performed
description: Operations is a list of on demand operation to be performed
on the Aerospike cluster.
items:
properties:
Expand All @@ -334,7 +334,6 @@ spec:
required:
- operationID
- operationType
- podList
type: object
type: array
operatorClientCert:
Expand Down Expand Up @@ -9677,7 +9676,7 @@ spec:
now part of podSpec"
type: boolean
operation:
description: Operation is a list of on demand operation to be performed
description: Operations is a list of on demand operation to be performed
on the Aerospike cluster.
items:
properties:
Expand All @@ -9698,7 +9697,6 @@ spec:
required:
- operationID
- operationType
- podList
type: object
type: array
operatorClientCertSpec:
Expand Down
Loading

0 comments on commit d92b6f2

Please sign in to comment.