Add PVC volume expansion
HoustonPutman committed Jun 14, 2024
1 parent 0af2d73 commit 49a0c84
Showing 3 changed files with 143 additions and 11 deletions.
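The feature is driven entirely by the storage request in the SolrCloud spec: raising it past the previously recorded minimum starts a PVCExpansion cluster operation, while lowering it is ignored, since PVCs cannot shrink. A minimal sketch of triggering it from a Go client, assuming a SolrCloud named `example` with persistent storage already configured (the namespace, name, and size are illustrative, not from the commit):

```go
package solrops

import (
	"context"

	solrv1beta1 "github.com/apache/solr-operator/api/v1beta1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// growStorage bumps the PVC storage request on an existing SolrCloud.
// The operator notices the change and runs the PVCExpansion operation.
func growStorage(ctx context.Context, c client.Client) error {
	var cloud solrv1beta1.SolrCloud
	if err := c.Get(ctx, client.ObjectKey{Namespace: "default", Name: "example"}, &cloud); err != nil {
		return err
	}
	// Only an increase over the recorded minimum triggers an expansion.
	cloud.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.
		Spec.Resources.Requests[corev1.ResourceStorage] = resource.MustParse("10Gi")
	return c.Update(ctx, &cloud)
}
```

Editing the same field with kubectl would work equally well; the operator only looks at the resulting spec.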
59 changes: 58 additions & 1 deletion controllers/solr_cluster_ops_util.go
@@ -27,6 +27,7 @@ import (
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"net/url"
@@ -53,6 +54,7 @@ const (
ScaleUpLock SolrClusterOperationType = "ScalingUp"
UpdateLock SolrClusterOperationType = "RollingUpdate"
BalanceReplicasLock SolrClusterOperationType = "BalanceReplicas"
PvcExpansionLock SolrClusterOperationType = "PVCExpansion"
)

// RollingUpdateMetadata contains metadata for rolling update cluster operations.
@@ -150,6 +152,60 @@ func retryNextQueuedClusterOpWithQueue(statefulSet *appsv1.StatefulSet, clusterO
return hasOp, err
}

func determinePvcExpansionClusterOpLockIfNecessary(instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
if instance.Spec.StorageOptions.PersistentStorage != nil &&
instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage() != nil &&
instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage().String() != statefulSet.Annotations[util.StorageMinimumSizeAnnotation] {
// First make sure that the new Storage request is greater than what is already set.
// PVCs cannot be shrunk
newSize := instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage()
// If there is no old size to update, the StatefulSet can just be set to use the new PVC size without any issue.
// Only do a cluster operation if we are expanding from an existing size to a new size
if oldSizeStr, hasOldSize := statefulSet.Annotations[util.StorageMinimumSizeAnnotation]; hasOldSize {
if oldSize, e := resource.ParseQuantity(oldSizeStr); e != nil {
err = e
// TODO: add an event
} else {
// Only update to the new size if it is bigger; PVCs cannot be shrunk
if newSize.Cmp(oldSize) > 0 {
clusterOp = &SolrClusterOp{
Operation: PvcExpansionLock,
Metadata: newSize.String(),
}
}
// TODO: add an event saying that we cannot shrink PVCs
}
}
}
return
}
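The gate above compares the rendered strings to detect any change cheaply, then uses resource.Quantity arithmetic to decide whether the change is actually an increase. A small standalone illustration of those Cmp semantics (the quantities are made up; the commit uses ParseQuantity, which returns an error, where this sketch uses the panicking MustParse):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	oldSize := resource.MustParse("1Gi")

	// Cmp is numeric: 1Gi and 1024Mi render differently but compare as equal,
	// so no expansion operation would be started for this change.
	fmt.Println(resource.MustParse("1024Mi").Cmp(oldSize)) // 0

	// Only a strictly larger request (Cmp > 0) queues a PVCExpansion op.
	fmt.Println(resource.MustParse("2Gi").Cmp(oldSize) > 0) // true
}
```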

// handlePvcExpansion does the logic of a managed and "locked" PVC expansion operation.
// This will likely take many reconcile loops to complete, as it waits for every PVC to be resized.
func handlePvcExpansion(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, logger logr.Logger) (operationComplete bool, retryLaterDuration time.Duration, err error) {
var newSize resource.Quantity
newSize, err = resource.ParseQuantity(clusterOp.Metadata)
if err != nil {
logger.Error(err, "Could not convert PvcExpansion metadata to a resource.Quantity, as it represents the new size of PVCs", "metadata", clusterOp.Metadata)
return
}
operationComplete, err = r.expandPVCs(ctx, instance, statefulSet.Spec.Selector.MatchLabels, newSize, logger)
if err == nil && operationComplete {
originalStatefulSet := statefulSet.DeepCopy()
statefulSet.Annotations[util.StorageMinimumSizeAnnotation] = newSize.String()
statefulSet.Spec.Template.Annotations[util.StorageMinimumSizeAnnotation] = newSize.String()
if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
logger.Error(err, "Error while patching StatefulSet to set the new minimum PVC size after PVCs the completion of PVC resizing", "newSize", newSize)
operationComplete = false
}
// Return and wait for the StatefulSet to be updated, which will trigger a reconcile that starts the rolling restart
retryLaterDuration = 0
} else if err == nil {
retryLaterDuration = time.Second * 5
}
return
}
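The patch call above follows controller-runtime's snapshot-then-mutate pattern: deep-copy the object, change it, then send only the computed delta. A hedged sketch of that pattern in isolation (the helper name is invented; the annotation key matches the one this commit adds):

```go
package solrops

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const storageMinimumSizeAnnotation = "solr.apache.org/storageMinimumSize"

// recordMinimumSize stamps the new minimum PVC size on the StatefulSet and its
// pod template. Changing a pod-template annotation is what makes the
// StatefulSet controller roll every pod once the volumes have been resized.
func recordMinimumSize(ctx context.Context, c client.Client, sts *appsv1.StatefulSet, size string) error {
	original := sts.DeepCopy()
	if sts.Annotations == nil {
		sts.Annotations = map[string]string{}
	}
	sts.Annotations[storageMinimumSizeAnnotation] = size
	if sts.Spec.Template.Annotations == nil {
		sts.Spec.Template.Annotations = map[string]string{}
	}
	sts.Spec.Template.Annotations[storageMinimumSizeAnnotation] = size
	return c.Patch(ctx, sts, client.StrategicMergeFrom(original))
}
```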

func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, blockReconciliationOfStatefulSet bool, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
desiredPods := int(*instance.Spec.Replicas)
configuredPods := int(*statefulSet.Spec.Replicas)
@@ -291,7 +347,8 @@ func cleanupManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, p
// handleManagedCloudScaleUp does the logic of a managed and "locked" cloud scale up operation.
// This will likely take many reconcile loops to complete, as it is moving replicas to the pods that have recently been scaled up.
func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, nextClusterOperation *SolrClusterOp, err error) {
desiredPods, err := strconv.Atoi(clusterOp.Metadata)
desiredPods := 0
desiredPods, err = strconv.Atoi(clusterOp.Metadata)
if err != nil {
logger.Error(err, "Could not convert ScaleUp metadata to int, as it represents the number of nodes to scale to", "metadata", clusterOp.Metadata)
return
71 changes: 61 additions & 10 deletions controllers/solrcloud_controller.go
@@ -22,6 +22,7 @@ import (
"crypto/md5"
"fmt"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"reflect"
"sort"
@@ -483,6 +484,8 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
operationComplete, nextClusterOperation, err = handleManagedCloudScaleUp(ctx, r, instance, statefulSet, clusterOp, podList, logger)
case BalanceReplicasLock:
operationComplete, requestInProgress, retryLaterDuration, err = util.BalanceReplicasForCluster(ctx, instance, statefulSet, clusterOp.Metadata, clusterOp.Metadata, logger)
case PvcExpansionLock:
operationComplete, retryLaterDuration, err = handlePvcExpansion(ctx, r, instance, statefulSet, clusterOp, logger)
default:
operationFound = false
// This shouldn't happen, but we don't want to be stuck if it does.
@@ -550,6 +553,12 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
clusterOpQueue[queueIdx] = *clusterOp
clusterOp = nil
}
clusterOp, retryLaterDuration, err = determinePvcExpansionClusterOpLockIfNecessary(instance, statefulSet)
// If the new clusterOperation is an update to a queued clusterOp, just change the operation that is already queued
if queueIdx, opIsQueued := queuedRetryOps[PvcExpansionLock]; clusterOp != nil && opIsQueued {
clusterOpQueue[queueIdx] = *clusterOp
clusterOp = nil
}

// If a non-managed scale needs to take place, this method will update the StatefulSet without starting
// a "locked" cluster operation
@@ -932,6 +941,46 @@ func (r *SolrCloudReconciler) reconcileZk(ctx context.Context, logger logr.Logge
return nil
}

func (r *SolrCloudReconciler) expandPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string, newSize resource.Quantity, logger logr.Logger) (expansionComplete bool, err error) {
var pvcList corev1.PersistentVolumeClaimList
pvcList, err = r.getPVCList(ctx, cloud, pvcLabelSelector)
if err != nil {
return
}
expansionCompleteCount := 0
for _, pvcItem := range pvcList.Items {
if pvcExpansionComplete, e := r.expandPVC(ctx, &pvcItem, newSize, logger); e != nil {
err = e
} else if pvcExpansionComplete {
expansionCompleteCount += 1
}
}
// If all PVCs have been expanded, then we are done
expansionComplete = err == nil && expansionCompleteCount == len(pvcList.Items)
return
}

func (r *SolrCloudReconciler) expandPVC(ctx context.Context, pvc *corev1.PersistentVolumeClaim, newSize resource.Quantity, logger logr.Logger) (expansionComplete bool, err error) {
// If the current capacity is >= the new size, then there is nothing to do; expansion is complete
if pvc.Status.Capacity.Storage().Cmp(newSize) >= 0 {
// TODO: Eventually use the pvc.Status.AllocatedResources and pvc.Status.AllocatedResourceStatuses to determine the status of PVC Expansion and react to failures
expansionComplete = true
} else if !pvc.Spec.Resources.Requests.Storage().Equal(newSize) {
// Update the PVC if the storage request is different.
// The newSize might be smaller than the currently requested size; this is supported, as the last size might have been
// too big for the storage quota, so it was lowered.
// As long as the PVC's current capacity is lower than the new size, we are still good to update the PVC.
originalPvc := pvc.DeepCopy()
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = newSize
if err = r.Patch(ctx, pvc, client.StrategicMergeFrom(originalPvc)); err != nil {
logger.Error(err, "Error while expanding PersistentVolumeClaim size", "persistentVolumeClaim", pvc.Name, "size", newSize)
} else {
logger.Info("Expanded PersistentVolumeClaim size", "persistentVolumeClaim", pvc.Name, "size", newSize)
}
}
return
}
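Taken together, the function above distinguishes three states per claim: already large enough, request out of date, or resize in flight. An illustrative, self-contained walk through that decision (the quantities are made up; on older vendored APIs the requests struct is corev1.ResourceRequirements rather than VolumeResourceRequirements):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	pvc := corev1.PersistentVolumeClaim{
		Spec: corev1.PersistentVolumeClaimSpec{
			Resources: corev1.VolumeResourceRequirements{
				Requests: corev1.ResourceList{corev1.ResourceStorage: resource.MustParse("5Gi")},
			},
		},
		Status: corev1.PersistentVolumeClaimStatus{
			Capacity: corev1.ResourceList{corev1.ResourceStorage: resource.MustParse("5Gi")},
		},
	}
	newSize := resource.MustParse("10Gi")

	switch {
	case pvc.Status.Capacity.Storage().Cmp(newSize) >= 0:
		fmt.Println("capacity already sufficient: expansion complete")
	case !pvc.Spec.Resources.Requests.Storage().Equal(newSize):
		fmt.Println("request out of date: patch spec.resources.requests.storage")
	default:
		fmt.Println("request updated: waiting for the volume to be resized")
	}
}
```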

// Logic derived from:
// - https://book.kubebuilder.io/reference/using-finalizers.html
// - https://github.com/pravega/zookeeper-operator/blob/v0.2.9/pkg/controller/zookeepercluster/zookeepercluster_controller.go#L629
@@ -978,16 +1027,15 @@ func (r *SolrCloudReconciler) reconcileStorageFinalizer(ctx context.Context, clo
return nil
}

func (r *SolrCloudReconciler) getPVCCount(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string) (pvcCount int, err error) {
func (r *SolrCloudReconciler) getPVCCount(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string) (int, error) {
pvcList, err := r.getPVCList(ctx, cloud, pvcLabelSelector)
if err != nil {
return -1, err
}
pvcCount = len(pvcList.Items)
return pvcCount, nil
return len(pvcList.Items), nil
}

func (r *SolrCloudReconciler) cleanupOrphanPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, pvcLabelSelector map[string]string, logger logr.Logger) (err error) {
func (r *SolrCloudReconciler) cleanupOrphanPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, pvcLabelSelector map[string]string, logger logr.Logger) error {
// this check should make sure we do not delete the PVCs before the STS has scaled down
if cloud.Status.ReadyReplicas == cloud.Status.Replicas {
pvcList, err := r.getPVCList(ctx, cloud, pvcLabelSelector)
@@ -1003,36 +1051,39 @@ func (r *SolrCloudReconciler) cleanupOrphanPVCs(ctx context.Context, cloud *solr
// Don't use the Spec replicas here, because we might be rolling down 1-by-1 and the PVCs for
// soon-to-be-deleted pods should not be deleted until the pod is deleted.
if util.IsPVCOrphan(pvcItem.Name, *statefulSet.Spec.Replicas) {
r.deletePVC(ctx, pvcItem, logger)
if e := r.deletePVC(ctx, pvcItem, logger); e != nil {
[CI check failure on line 1054 in controllers/solrcloud_controller.go, GitHub Actions / Build & Check (Lint & Unit Test) (1.22): r.deletePVC(ctx, pvcItem, logger) (no value) used as value]
err = e
}
}
}
}
return err
}
return nil
}
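The refactored cleanup keeps iterating when a delete fails and surfaces the last error instead of silently dropping it (the CI annotation above shows deletePVC does not yet return an error, so this is the intended shape rather than what compiles in this commit). A minimal sketch of that pattern, with an invented deleteOne callback:

```go
// deleteAll deletes every PVC name it is given, remembering the most recent
// failure instead of stopping at the first one.
func deleteAll(pvcs []string, deleteOne func(string) error) error {
	var lastErr error
	for _, name := range pvcs {
		if err := deleteOne(name); err != nil {
			lastErr = err // keep going; report the last failure at the end
		}
	}
	return lastErr
}
```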

func (r *SolrCloudReconciler) getPVCList(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string) (pvList corev1.PersistentVolumeClaimList, err error) {
func (r *SolrCloudReconciler) getPVCList(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string) (corev1.PersistentVolumeClaimList, error) {
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: pvcLabelSelector,
})
pvclistOps := &client.ListOptions{
pvcListOps := &client.ListOptions{
Namespace: cloud.Namespace,
LabelSelector: selector,
}
pvcList := &corev1.PersistentVolumeClaimList{}
err = r.Client.List(ctx, pvcList, pvclistOps)
err = r.Client.List(ctx, pvcList, pvcListOps)
return *pvcList, err
}

func (r *SolrCloudReconciler) cleanUpAllPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string, logger logr.Logger) (err error) {
func (r *SolrCloudReconciler) cleanUpAllPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string, logger logr.Logger) error {
pvcList, err := r.getPVCList(ctx, cloud, pvcLabelSelector)
if err != nil {
return err
}
for _, pvcItem := range pvcList.Items {
r.deletePVC(ctx, pvcItem, logger)
}
return nil
return err
}

func (r *SolrCloudReconciler) deletePVC(ctx context.Context, pvcItem corev1.PersistentVolumeClaim, logger logr.Logger) {
24 changes: 24 additions & 0 deletions controllers/util/solr_util.go
@@ -57,6 +57,7 @@ const (
// These are to be saved on a statefulSet update
ClusterOpsLockAnnotation = "solr.apache.org/clusterOpsLock"
ClusterOpsRetryQueueAnnotation = "solr.apache.org/clusterOpsRetryQueue"
StorageMinimumSizeAnnotation = "solr.apache.org/storageMinimumSize"

SolrIsNotStoppedReadinessCondition = "solr.apache.org/isNotStopped"
SolrReplicasNotEvictedReadinessCondition = "solr.apache.org/replicasNotEvicted"
@@ -200,6 +201,13 @@ func GenerateStatefulSet(solrCloud *solr.SolrCloud, solrCloudStatus *solr.SolrCl
Spec: pvc.Spec,
},
}
if pvc.Spec.Resources.Requests.Storage() != nil {
annotations[StorageMinimumSizeAnnotation] = pvc.Spec.Resources.Requests.Storage().String()
if podAnnotations == nil {
podAnnotations = make(map[string]string, 1)
}
podAnnotations[StorageMinimumSizeAnnotation] = pvc.Spec.Resources.Requests.Storage().String()
}
} else {
ephemeralVolume := corev1.Volume{
Name: solrDataVolumeName,
@@ -680,6 +688,22 @@ func MaintainPreservedStatefulSetFields(expected, found *appsv1.StatefulSet) {
}
expected.Annotations[ClusterOpsRetryQueueAnnotation] = queue
}
if storage, hasStorage := found.Annotations[StorageMinimumSizeAnnotation]; hasStorage {
if expected.Annotations == nil {
expected.Annotations = make(map[string]string, 1)
}
expected.Annotations[StorageMinimumSizeAnnotation] = storage
}
}
if found.Spec.Template.Annotations != nil {
// Note: the Pod template storage annotation is used to start a rolling restart;
// it should always match the StatefulSet's storage annotation
if storage, hasStorage := found.Spec.Template.Annotations[StorageMinimumSizeAnnotation]; hasStorage {
if expected.Spec.Template.Annotations == nil {
expected.Spec.Template.Annotations = make(map[string]string, 1)
}
expected.Spec.Template.Annotations[StorageMinimumSizeAnnotation] = storage
}
}

// Scaling (i.e. changing) the number of replicas in the SolrCloud statefulSet is handled during the clusterOps
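The preservation logic above follows a simple rule: values derived at runtime live on the found (live) object and must be copied onto the freshly generated one, or every reconcile would try to strip them. A hedged generalization of that rule as a helper (the function is invented for illustration):

```go
// preserveAnnotation copies a runtime-managed annotation from the live
// object's metadata onto the freshly generated one, allocating the
// destination map if needed.
func preserveAnnotation(expected, found map[string]string, key string) map[string]string {
	if value, ok := found[key]; ok {
		if expected == nil {
			expected = make(map[string]string, 1)
		}
		expected[key] = value
	}
	return expected
}
```

Both the StatefulSet annotation and the pod-template annotation are preserved this way, since the template copy is what drives the rolling restart.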
