Fix scaling when using ingress-addressed nodes
HoustonPutman committed Mar 7, 2024
1 parent 15afb17 commit 5242e94
Showing 8 changed files with 184 additions and 8 deletions.
7 changes: 7 additions & 0 deletions api/v1beta1/solrcloud_types.go
@@ -1227,6 +1227,13 @@ func (sc *SolrCloud) GetAllSolrPodNames() []string {
if sc.Spec.Replicas != nil {
replicas = int(*sc.Spec.Replicas)
}
if int(sc.Status.Replicas) > replicas {
replicas = int(sc.Status.Replicas)
}
return sc.GetSolrPodNames(replicas)
}
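// Worked example (hypothetical values) of the guard above during a scale-down:
//   sc.Spec.Replicas   = 2   (desired)
//   sc.Status.Replicas = 5   (pods still running)
// replicas becomes 5, so names are generated for every pod that still exists
// and per-pod resources (such as individual node services) survive the drain.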

func (sc *SolrCloud) GetSolrPodNames(replicas int) []string {
podNames := make([]string, replicas)
statefulSetName := sc.StatefulSetName()
for i := range podNames {
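The loop body above is cut off by the diff view. A minimal sketch of how GetSolrPodNames presumably completes, assuming the standard StatefulSet naming convention <statefulSetName>-<ordinal> (fmt import assumed):

func (sc *SolrCloud) GetSolrPodNames(replicas int) []string {
	podNames := make([]string, replicas)
	statefulSetName := sc.StatefulSetName()
	for i := range podNames {
		// StatefulSet pods are named <statefulSetName>-<ordinal>
		podNames[i] = fmt.Sprintf("%s-%d", statefulSetName, i)
	}
	return podNames
}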
19 changes: 17 additions & 2 deletions controllers/solr_cluster_ops_util.go
@@ -151,7 +151,7 @@ func retryNextQueuedClusterOpWithQueue(statefulSet *appsv1.StatefulSet, clusterO
return hasOp, err
}

func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, blockReconciliationOfStatefulSet bool, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
desiredPods := int(*instance.Spec.Replicas)
configuredPods := int(*statefulSet.Spec.Replicas)
if desiredPods != configuredPods {
@@ -170,11 +170,26 @@ func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudRec
Metadata: strconv.Itoa(configuredPods - 1),
}
} else if desiredPods > configuredPods && (instance.Spec.Scaling.PopulatePodsOnScaleUp == nil || *instance.Spec.Scaling.PopulatePodsOnScaleUp) {
// We need to wait for all pods to become healthy to scale up in a managed fashion; otherwise
// the balancing will skip some pods.
if len(podList) < configuredPods {
// There are not enough pods; the statefulSet controller has yet to create the previously desired pods.
// Do not start the scale up until these missing pods are created.
return nil, time.Second * 5, nil
}
// If Solr nodes are advertised by their individual node services (through an ingress),
// then make sure that the host aliases are set for all desired pods before starting a scale-up.
// If the host aliases do not already include the soon-to-be created pods, then Solr might not be able to balance
// replicas onto the new hosts.
// We need to make sure that the StatefulSet is updated with these new hostAliases before the scale up occurs.
if instance.UsesIndividualNodeServices() && instance.Spec.SolrAddressability.External.UseExternalAddress {
for _, pod := range podList {
if len(pod.Spec.HostAliases) < desiredPods {
return nil, time.Second * 5, nil
}
}
}

clusterOp = &SolrClusterOp{
Operation: ScaleUpLock,
Metadata: strconv.Itoa(desiredPods),
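The visible hunks never show where the new blockReconciliationOfStatefulSet parameter is consumed. Presumably the scale-up is deferred while the StatefulSet cannot be safely updated; a hedged sketch of such a check, offered as an assumption rather than part of this diff:

// Hypothetical guard (assumption): do not begin a scale-up while the
// StatefulSet update is blocked, e.g. because node-service IPs are missing.
if blockReconciliationOfStatefulSet {
	return nil, time.Second * 5, nil
}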
@@ -371,7 +386,7 @@ func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler
// We won't kill pods that we need the cluster state for, but we can kill the pods that are already not running.
// This is important for scenarios where there is a bad pod config and nothing is running, but we need to do
// a restart to get a working pod config.
state, retryLater, apiError := util.GetNodeReplicaState(ctx, instance, hasReadyPod, logger)
state, retryLater, apiError := util.GetNodeReplicaState(ctx, instance, statefulSet, hasReadyPod, logger)
if apiError != nil {
return false, true, 0, nil, apiError
} else if !retryLater {
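The scale-up gate above hinges on UsesIndividualNodeServices(), which this diff does not show. A plausible sketch of that helper on the SolrCloud type, assuming it mirrors the addressability options used elsewhere in the commit (Method and HideNodes are assumed field names):

// Plausible sketch (assumption): per-node services are used when nodes are
// exposed externally through an Ingress and are not hidden.
func (sc *SolrCloud) UsesIndividualNodeServices() bool {
	ext := sc.Spec.SolrAddressability.External
	return ext != nil && ext.Method == Ingress && !ext.HideNodes
}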
8 changes: 7 additions & 1 deletion controllers/solrcloud_controller.go
@@ -152,6 +152,11 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
hostNameIpMap := make(map[string]string)
// Generate a service for every Node
if instance.UsesIndividualNodeServices() {
// When updating the statefulSet below, the hostNameIpMap is just used to add new IPs or modify existing ones.
// When scaling down, the hostAliases that are no longer found here will not be removed from the hostAliases in the statefulSet pod spec.
// Therefore, it should be ok that we are not reconciling the node services that will be scaled down in the future.
// This is unfortunately the reality, since we don't yet have the statefulSet to determine how many Solr pods are still running;
// we just have Spec.Replicas, which is the requested pod count.
for _, nodeName := range solrNodeNames {
err, ip := r.reconcileNodeService(ctx, logger, instance, nodeName)
if err != nil {
@@ -161,6 +166,7 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if instance.Spec.SolrAddressability.External.UseExternalAddress {
if ip == "" {
// If we are using this IP in the hostAliases of the statefulSet, it needs to be set for every service before trying to update the statefulSet
// TODO: Make an event here
blockReconciliationOfStatefulSet = true
} else {
hostNameIpMap[instance.AdvertisedNodeHost(nodeName)] = ip
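For context, the hostNameIpMap built here is later folded into the pod template's hostAliases. An illustrative (not verbatim) version of that conversion:

// Illustrative only: turn the hostname->IP map into HostAlias entries for the
// pod template, sorted so StatefulSet updates stay deterministic.
hostAliases := make([]corev1.HostAlias, 0, len(hostNameIpMap))
for hostname, ip := range hostNameIpMap {
	hostAliases = append(hostAliases, corev1.HostAlias{
		IP:        ip,
		Hostnames: []string{hostname},
	})
}
sort.Slice(hostAliases, func(i, j int) bool {
	return hostAliases[i].Hostnames[0] < hostAliases[j].Hostnames[0]
})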
@@ -545,7 +551,7 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// a "locked" cluster operation
if clusterOp == nil {
_, scaleDownOpIsQueued := queuedRetryOps[ScaleDownLock]
clusterOp, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, scaleDownOpIsQueued, podList, logger)
clusterOp, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, scaleDownOpIsQueued, podList, blockReconciliationOfStatefulSet, logger)

// If the new clusterOperation is an update to a queued clusterOp, just change the operation that is already queued
if clusterOp != nil {
24 changes: 23 additions & 1 deletion controllers/util/common.go
@@ -432,7 +432,29 @@ func CopyPodTemplates(from, to *corev1.PodTemplateSpec, basePath string, logger

if !DeepEqualWithNils(to.Spec.HostAliases, from.Spec.HostAliases) {
requireUpdate = true
to.Spec.HostAliases = from.Spec.HostAliases
if to.Spec.HostAliases == nil {
to.Spec.HostAliases = from.Spec.HostAliases
} else {
// Do not remove aliases that are no longer used.
// This is in case Solr is scaling down and we want to keep the old addresses for future use.
for _, fromAlias := range from.Spec.HostAliases {
found := false
for i, toAlias := range to.Spec.HostAliases {
if fromAlias.Hostnames[0] == toAlias.Hostnames[0] {
found = true
if !DeepEqualWithNils(toAlias, fromAlias) {
requireUpdate = true
to.Spec.HostAliases[i] = fromAlias
break
}
}
}
if !found {
requireUpdate = true
to.Spec.HostAliases = append(to.Spec.HostAliases, fromAlias)
}
}
}
logger.Info("Update required because field changed", "field", basePath+"Spec.HostAliases", "from", to.Spec.HostAliases, "to", from.Spec.HostAliases)
}

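The effect of the merge above, traced on hypothetical values:

// Hypothetical trace of the hostAliases merge:
//   to   (current):  node-0 -> 10.0.0.1, node-1 -> 10.0.0.2, node-2 -> 10.0.0.3
//   from (desired):  node-0 -> 10.0.0.1, node-1 -> 10.0.0.9
// Result in to.Spec.HostAliases:
//   node-0 -> 10.0.0.1   (unchanged)
//   node-1 -> 10.0.0.9   (updated in place)
//   node-2 -> 10.0.0.3   (kept although absent from the desired spec, so a pod
//                         that is scaling down can still resolve its address)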
14 changes: 12 additions & 2 deletions controllers/util/solr_update_util.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/solr-operator/controllers/util/solr_api"
"github.com/go-logr/logr"
"github.com/robfig/cron/v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"net/url"
@@ -119,7 +120,7 @@ func (state NodeReplicaState) PodHasReplicas(cloud *solr.SolrCloud, podName stri
return isInClusterState && contents.replicas > 0
}

func GetNodeReplicaState(ctx context.Context, cloud *solr.SolrCloud, hasReadyPod bool, logger logr.Logger) (state NodeReplicaState, retryLater bool, err error) {
func GetNodeReplicaState(ctx context.Context, cloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, hasReadyPod bool, logger logr.Logger) (state NodeReplicaState, retryLater bool, err error) {
clusterResp := &solr_api.SolrClusterStatusResponse{}
overseerResp := &solr_api.SolrOverseerStatusResponse{}

@@ -138,7 +139,7 @@ func GetNodeReplicaState(ctx context.Context, cloud *solr.SolrCloud, hasReadyPod
}
}
if err == nil {
state = findSolrNodeContents(clusterResp.ClusterStatus, overseerResp.Leader, GetAllManagedSolrNodeNames(cloud))
state = findSolrNodeContents(clusterResp.ClusterStatus, overseerResp.Leader, GetManagedSolrNodeNames(cloud, int(*statefulSet.Spec.Replicas)))
} else {
logger.Error(err, "Could not fetch cluster state information for cloud")
}
@@ -535,6 +536,15 @@ func GetAllManagedSolrNodeNames(solrCloud *solr.SolrCloud) map[string]bool {
return allNodeNames
}

func GetManagedSolrNodeNames(solrCloud *solr.SolrCloud, currentlyConfiguredPodCount int) map[string]bool {
podNames := solrCloud.GetSolrPodNames(currentlyConfiguredPodCount)
allNodeNames := make(map[string]bool, len(podNames))
for _, podName := range podNames {
allNodeNames[SolrNodeName(solrCloud, podName)] = true
}
return allNodeNames
}

// EvictReplicasForPodIfNecessary takes a solr Pod and migrates all replicas off of that Pod.
// For updates this will only be called for pods using ephemeral data.
// For scale-down operations, this can be called for pods using ephemeral or persistent data.
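GetManagedSolrNodeNames above depends on SolrNodeName, which is not part of this diff. A rough sketch, assuming Solr's usual "<host>:<port>_solr" node-name format and a hypothetical NodePort accessor:

// Rough sketch (assumption): Solr registers nodes as "<host>:<port>_solr".
func SolrNodeName(solrCloud *solr.SolrCloud, podName string) string {
	return fmt.Sprintf("%s:%d_solr", solrCloud.AdvertisedNodeHost(podName), solrCloud.NodePort())
}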
70 changes: 70 additions & 0 deletions tests/e2e/solrcloud_scaling_test.go
@@ -243,6 +243,76 @@ var _ = FDescribe("E2E - SolrCloud - Scale Up", func() {
})
})

FContext("with replica migration using an ingress for node addresses", func() {

BeforeEach(func() {
solrCloud.Spec.Replicas = pointer.Int32(2)
solrCloud.Spec.SolrAddressability = solrv1beta1.SolrAddressabilityOptions{
External: &solrv1beta1.ExternalAddressability{
Method: solrv1beta1.Ingress,
UseExternalAddress: true,
DomainName: "test.solr.org",
},
}
})
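// With UseExternalAddress, each node advertises its per-node external
// hostname (illustrative naming; the exact convention may differ):
//   foo-solrcloud-0 -> foo-solrcloud-0.test.solr.org
//   foo-solrcloud-2 -> foo-solrcloud-2.test.solr.org  (new pod on scale-up)
// Every pod's hostAliases must resolve all desired hostnames to the ingress
// IP before the scale-up starts, or replica balancing cannot reach new nodes.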

FIt("Scales Up", func(ctx context.Context) {
originalSolrCloud := solrCloud.DeepCopy()
solrCloud.Spec.Replicas = pointer.Int32(int32(3))
By("triggering a scale down via solrCloud replicas")
Expect(k8sClient.Patch(ctx, solrCloud, client.MergeFrom(originalSolrCloud))).To(Succeed(), "Could not patch SolrCloud replicas to initiate scale up")

By("waiting for the rolling restart to begin with hostAliases")
expectStatefulSetWithChecksAndTimeout(ctx, solrCloud, solrCloud.StatefulSetName(), time.Second*5, time.Millisecond*5, func(g Gomega, found *appsv1.StatefulSet) {
clusterOp, err := controllers.GetCurrentClusterOp(found)
g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have an update lock to add hostAliases.")
g.Expect(clusterOp.Operation).To(Equal(controllers.UpdateLock), "StatefulSet does not have an update lock to add hostAliases.")
g.Expect(found.Spec.Template.Spec.HostAliases).To(HaveLen(3), "Host aliases in the pod template do not have length 3, which is the number of pods to scale up to")
})

By("waiting for the scaleUp to begin")
statefulSet := expectStatefulSetWithChecksAndTimeout(ctx, solrCloud, solrCloud.StatefulSetName(), time.Second*30, time.Millisecond*5, func(g Gomega, found *appsv1.StatefulSet) {
clusterOp, err := controllers.GetCurrentClusterOp(found)
g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
g.Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleUp lock.")
g.Expect(clusterOp.Operation).To(Equal(controllers.ScaleUpLock), "StatefulSet does not have a scaleUp lock.")
g.Expect(clusterOp.Metadata).To(Equal("3"), "StatefulSet scaling lock operation has the wrong metadata.")
})

// The first step is to increase the number of pods
// Check very often, as the new pods will be created quickly, which will cause the cluster op to change.
statefulSet = expectStatefulSetWithChecksAndTimeout(ctx, solrCloud, solrCloud.StatefulSetName(), time.Second*5, time.Millisecond*5, func(g Gomega, found *appsv1.StatefulSet) {
g.Expect(found.Spec.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should now have 3 pods, because the scale up should first increase the configured pod count")
})
clusterOp, err := controllers.GetCurrentClusterOp(statefulSet)
Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a scaleUp lock.")
Expect(clusterOp.Operation).To(Equal(controllers.ScaleUpLock), "StatefulSet does not have a scaleUp lock.")
Expect(clusterOp.Metadata).To(Equal("3"), "StatefulSet scaling lock operation has the wrong metadata.")

// Wait for new pods to come up, and when they do we should be doing a balanceReplicas clusterOp
statefulSet = expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
g.Expect(found.Status.Replicas).To(HaveValue(BeEquivalentTo(3)), "StatefulSet should have 3 running pods before the replicas are balanced")
})
clusterOp, err = controllers.GetCurrentClusterOp(statefulSet)
Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
Expect(clusterOp).ToNot(BeNil(), "StatefulSet does not have a balanceReplicas lock after new pods are created.")
Expect(clusterOp.Operation).To(Equal(controllers.BalanceReplicasLock), "StatefulSet does not have a balanceReplicas lock after new pods are created.")
Expect(clusterOp.Metadata).To(Equal("ScaleUp"), "StatefulSet balanceReplicas lock operation has the wrong metadata.")

By("waiting for the scaleUp to finish")
statefulSet = expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
clusterOp, err := controllers.GetCurrentClusterOp(found)
g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a balanceReplicas lock after balancing is complete.")
})

queryCollection(ctx, solrCloud, solrCollection1, 0)
queryCollection(ctx, solrCloud, solrCollection2, 0)
})
})

FContext("without replica migration", func() {

BeforeEach(func() {
46 changes: 46 additions & 0 deletions tests/e2e/suite_test.go
@@ -311,6 +311,26 @@ func writeAllSolrInfoToFiles(ctx context.Context, directory string, namespace st
&statefulSet,
)
}

// Unfortunately the services don't have the technology label
req, err = labels.NewRequirement("solr-cloud", selection.Exists, make([]string, 0))
Expect(err).ToNot(HaveOccurred())

labelSelector = labels.Everything().Add(*req)
listOps = &client.ListOptions{
Namespace: namespace,
LabelSelector: labelSelector,
}

foundServices := &corev1.ServiceList{}
Expect(k8sClient.List(ctx, foundServices, listOps)).To(Succeed(), "Could not fetch Solr services")
Expect(foundServices).ToNot(BeNil(), "No Solr services could be found")
for _, service := range foundServices.Items {
writeAllServiceInfoToFiles(
directory+service.Name+".service",
&service,
)
}
}

// writeAllStatefulSetInfoToFiles writes the following each to a separate file with the given base name & directory.
@@ -339,6 +359,32 @@ func writeAllStatefulSetInfoToFiles(baseFilename string, statefulSet *appsv1.Sta
Expect(writeErr).ToNot(HaveOccurred(), "Could not write statefulSet events json to file")
}

// writeAllServiceInfoToFiles writes the following each to a separate file with the given base name & directory.
// - Service Spec/Status
// - Service Events
func writeAllServiceInfoToFiles(baseFilename string, service *corev1.Service) {
// Write service to a file
statusFile, err := os.Create(baseFilename + ".status.json")
defer statusFile.Close()
Expect(err).ToNot(HaveOccurred(), "Could not open file to save service status: %s", baseFilename+".status.json")
jsonBytes, marshErr := json.MarshalIndent(service, "", "\t")
Expect(marshErr).ToNot(HaveOccurred(), "Could not serialize service json")
_, writeErr := statusFile.Write(jsonBytes)
Expect(writeErr).ToNot(HaveOccurred(), "Could not write service json to file")

// Write events for service to a file
eventsFile, err := os.Create(baseFilename + ".events.json")
defer eventsFile.Close()
Expect(err).ToNot(HaveOccurred(), "Could not open file to save service events: %s", baseFilename+".events.yaml")

eventList, err := rawK8sClient.CoreV1().Events(service.Namespace).Search(scheme.Scheme, service)
Expect(err).ToNot(HaveOccurred(), "Could not find events for service: %s", service.Name)
jsonBytes, marshErr = json.MarshalIndent(eventList, "", "\t")
Expect(marshErr).ToNot(HaveOccurred(), "Could not serialize service events json")
_, writeErr = eventsFile.Write(jsonBytes)
Expect(writeErr).ToNot(HaveOccurred(), "Could not write service events json to file")
}

// writeAllPodInfoToFile writes the following each to a separate file with the given base name & directory.
// - Pod Spec/Status
// - Pod Events
4 changes: 2 additions & 2 deletions tests/e2e/test_utils_test.go
@@ -475,10 +475,10 @@ func callSolrApiInPod(ctx context.Context, solrCloud *solrv1beta1.SolrCloud, htt
"-verbose",
"-" + strings.ToLower(httpMethod),
fmt.Sprintf(
"\"%s://%s:%d%s%s\"",
"\"%s://%s%s%s%s\"",
solrCloud.UrlScheme(false),
hostname,
solrCloud.Spec.SolrAddressability.PodPort,
solrCloud.NodePortSuffix(false),
apiPath,
queryParamsString),
}
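The fix replaces the hard-coded ":PodPort" with NodePortSuffix, since ingress-addressed nodes are reached on the ingress port rather than the pod port. A plausible sketch of that helper; the suffix logic is an assumption and NodePort is a hypothetical accessor:

func (sc *SolrCloud) NodePortSuffix(internal bool) string {
	// The real helper presumably selects the internal or external port based
	// on the flag; this sketch only illustrates the suffix behavior.
	port := sc.NodePort() // hypothetical accessor for the advertised port
	if port == 80 {
		return "" // default HTTP port: no suffix needed in the URL
	}
	return fmt.Sprintf(":%d", port)
}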
