Support scaling the entire database (#1051)
Until now, scaling was driven by a single service name. The entire
database can now be considered for scaling: if serviceName is empty,
all the pods are selected, and when scaling is needed all the
subclusters are considered.
roypaulin authored Feb 7, 2025
1 parent 11ec273 commit 142e6c1
Showing 89 changed files with 1,732 additions and 67 deletions.
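To make the new selection behavior concrete, here is a minimal sketch in the style of the repo's tests (subcluster names and sizes are made up): with an empty service name, FindSubclusterForServiceName matches every subcluster, so the autoscaler effectively targets the whole database.

package main

import (
	"fmt"

	vapi "github.com/vertica/vertica-kubernetes/api/v1"
)

func main() {
	vdb := vapi.MakeVDB() // builder helper, as used throughout the repo's test suites
	vdb.Spec.Subclusters = []vapi.Subcluster{
		{Name: "sc1", Size: 3, ServiceName: "conn"},
		{Name: "sc2", Size: 5, ServiceName: "other"},
	}
	// Empty service name: every subcluster matches.
	scs, total := vdb.FindSubclusterForServiceName("")
	fmt.Printf("selected %d subclusters, %d pods total\n", len(scs), total) // 2 and 8
}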
18 changes: 16 additions & 2 deletions api/v1/helpers.go
@@ -497,12 +497,14 @@ func (v *VerticaDB) GetVProxyDeploymentName(scName string) string {
return fmt.Sprintf("%s-%s-proxy", v.Name, GenCompatibleFQDNHelper(scName))
}

// FindSubclusterForServiceName will find any subclusters that match the given service name
// FindSubclusterForServiceName will find any subclusters that match the given service name.
// If the service name is empty, it will return all the subclusters in the vdb.
func (v *VerticaDB) FindSubclusterForServiceName(svcName string) (scs []*Subcluster, totalSize int32) {
totalSize = int32(0)
scs = []*Subcluster{}
for i := range v.Spec.Subclusters {
if v.Spec.Subclusters[i].GetServiceName() == svcName {
sc := &v.Spec.Subclusters[i]
if svcName == "" || sc.GetServiceName() == svcName {
scs = append(scs, &v.Spec.Subclusters[i])
totalSize += v.Spec.Subclusters[i].Size
}
@@ -657,6 +659,18 @@ func (v *VerticaDB) GetFirstPrimarySubcluster() *Subcluster {
return nil
}

// GetPrimaryCount returns the number of primary nodes in the cluster.
func (v *VerticaDB) GetPrimaryCount() int {
sizeSum := 0
for i := range v.Spec.Subclusters {
sc := &v.Spec.Subclusters[i]
if sc.IsPrimary() && !sc.IsSandboxPrimary() {
sizeSum += int(sc.Size)
}
}
return sizeSum
}

// HasSecondarySubclusters returns true if at least 1 secondary subcluster
// exists in the database.
func (v *VerticaDB) HasSecondarySubclusters() bool {
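A short sketch of the new GetPrimaryCount helper (same imports as the sketch above; the subcluster type constants come from the api/v1 package, sizes are illustrative). Sandboxed primaries are deliberately excluded, because the k-safety guards added below only reason about the main cluster.

func sketchPrimaryCount() {
	vdb := vapi.MakeVDB()
	vdb.Spec.Subclusters = []vapi.Subcluster{
		{Name: "pri", Size: 3, Type: vapi.PrimarySubcluster},
		{Name: "sec", Size: 5, Type: vapi.SecondarySubcluster},
	}
	// Only main-cluster primary pods count towards k-safety.
	fmt.Println(vdb.GetPrimaryCount()) // prints 3
}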
16 changes: 12 additions & 4 deletions api/v1beta1/verticaautoscaler_types.go
@@ -48,22 +48,30 @@ type VerticaAutoscalerSpec struct {
// the last subcluster only.
ScalingGranularity ScalingGranularityType `json:"scalingGranularity"`

// +kubebuilder:validation:Required
// +kubebuilder:validation:Optional
// +operator-sdk:csv:customresourcedefinitions:type=spec
// +operator-sdk:csv:customresourcedefinitions:type=spec,xDescriptors="urn:alm:descriptor:com.tectonic.ui:text"
// This acts as a selector for the subclusters that are being scaled together.
// Each subcluster has a service name field, which if omitted is the same
// name as the subcluster name. Multiple subclusters that have the same
// service name use the same service object.
ServiceName string `json:"serviceName"`
// If this field is empty, all the subclusters will be selected for scaling.
ServiceName string `json:"serviceName,omitempty"`

// +operator-sdk:csv:customresourcedefinitions:type=spec
// +kubebuilder:validation:Optional
// When the scaling granularity is Subcluster, this field defines a template
// to use for when a new subcluster needs to be created. If size is 0, then
// the operator will use an existing subcluster to use as the template. If
// size is > 0, the service name must match the serviceName parameter. The
// name of the new subcluster is always auto generated. If the name is set
// size is > 0, the service name must match the serviceName parameter (if non-empty).
//
// If the serviceName parameter is empty, the template's service name may refer
// to an existing service, in which case the new subclusters will share it with
// the existing ones; it may also name a service that does not exist yet, which
// all subclusters created from the template will then share. If the template's
// service name is itself empty, each new subcluster will get its own service.
//
// The name of the new subcluster is always auto generated. If the name is set
// here it will be used as a prefix for the new subcluster. Otherwise, we
// use the name of this VerticaAutoscaler object as a prefix for all
// subclusters.
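In practice the relaxed spec allows a shape like this sketch (all field values hypothetical): serviceName is left empty so every subcluster is selected, and the template's service name may point at any existing or new service.

vas := &v1beta1.VerticaAutoscaler{
	Spec: v1beta1.VerticaAutoscalerSpec{
		ScalingGranularity: v1beta1.SubclusterScalingGranularity,
		// ServiceName left empty: scale across the whole database.
		Template: v1beta1.Subcluster{
			Name:        "auto",       // used only as a prefix; final names are generated
			ServiceName: "shared-svc", // unconstrained because spec.serviceName is empty
			Size:        4,
		},
	},
}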
2 changes: 1 addition & 1 deletion api/v1beta1/verticaautoscaler_webhook.go
@@ -100,7 +100,7 @@ func (v *VerticaAutoscaler) validateScalingGranularity(allErrs field.ErrorList)
// validateSubclusterTemplate will validate the subcluster template
func (v *VerticaAutoscaler) validateSubclusterTemplate(allErrs field.ErrorList) field.ErrorList {
pathPrefix := field.NewPath("spec").Child("template")
if v.CanUseTemplate() && v.Spec.Template.ServiceName != v.Spec.ServiceName {
if v.CanUseTemplate() && v.Spec.ServiceName != "" && v.Spec.Template.ServiceName != v.Spec.ServiceName {
err := field.Invalid(pathPrefix.Child("serviceName"),
v.Spec.Template.ServiceName,
"The serviceName in the subcluster template must match spec.serviceName")
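The webhook now enforces the template/serviceName match only when spec.serviceName is set. A sketch of the two shapes that pass validation after this change (service names hypothetical):

vas := v1beta1.MakeVAS()
vas.Spec.ServiceName = "conn"
vas.Spec.Template.ServiceName = "conn" // must still match while spec.serviceName is non-empty

vas2 := v1beta1.MakeVAS()
vas2.Spec.ServiceName = "" // whole-database scaling
vas2.Spec.Template.ServiceName = "anything-goes" // no longer checked against spec.serviceName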
4 changes: 4 additions & 0 deletions api/v1beta1/verticaautoscaler_webhook_test.go
@@ -49,6 +49,10 @@ var _ = Describe("verticaautoscaler_webhook", func() {
vas.Spec.Template.ServiceName = vas.Spec.ServiceName
_, err3 := vas.ValidateUpdate(MakeVAS())
Expect(err3).Should(Succeed())
vas.Spec.ServiceName = ""
vas.Spec.Template.ServiceName = "SomethingElse"
_, err4 := vas.ValidateUpdate(MakeVAS())
Expect(err4).Should(Succeed())
})

It("should fail if you try to use the template with pod scalingGranularity", func() {
5 changes: 5 additions & 0 deletions changes/unreleased/Fixed-20250203-220428.yaml
@@ -0,0 +1,5 @@
kind: Fixed
body: Routing traffic to a sandbox pod after restart
time: 2025-02-03T22:04:28.253476322+01:00
custom:
Issue: "1045"
2 changes: 1 addition & 1 deletion pkg/builder/builder.go
@@ -914,7 +914,7 @@ func BuildVProxyDeployment(nm types.NamespacedName, vdb *vapi.VerticaDB, sc *vap
}
}

// buildPodSpec creates a PodSpec for the deployment
// buildVProxyPodSpec creates a PodSpec for the vproxy deployment
func buildVProxyPodSpec(vdb *vapi.VerticaDB, sc *vapi.Subcluster) corev1.PodSpec {
termGracePeriod := int64(0)
return corev1.PodSpec{
2 changes: 1 addition & 1 deletion pkg/controllers/vas/scaledown_reconciler.go
@@ -78,7 +78,7 @@ func (s *ScaledownReconciler) Reconcile(ctx context.Context, req *ctrl.Request)
s.Log.Info("Metric's value is lower than the scale-down threshold.", "metric", mStatus.name)
newMinReplicas = *s.Vas.Spec.CustomAutoscaler.MinReplicas
} else {
newMinReplicas = s.Vas.Spec.TargetSize
newMinReplicas = s.Vas.Status.CurrentSize
break
}
}
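The one-line fix above changes which size pins the minimum replicas when no metric is below its scale-down threshold. A sketch of the intent (condition name hypothetical): the floor should track what the cluster is observed to run, not the requested target.

var newMinReplicas int32
if metricBelowScaleDownThreshold {
	newMinReplicas = *vas.Spec.CustomAutoscaler.MinReplicas
} else {
	newMinReplicas = vas.Status.CurrentSize // was vas.Spec.TargetSize before this fix
}
_ = newMinReplicas // sketch only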
13 changes: 13 additions & 0 deletions pkg/controllers/vas/subclusterresize_reconciler.go
@@ -75,16 +75,29 @@ func (s *SubclusterResizeReconciler) resizeSubcluster(ctx context.Context, req *
return nil
}

minHosts := vapi.KSafety0MinHosts
if !s.Vdb.IsKSafety0() {
minHosts = vapi.KSafety1MinHosts
}
for i := len(subclusters) - 1; i >= 0; i-- {
targetSc := subclusters[i]
if delta > 0 { // Growing subclusters
targetSc.Size += delta
delta = 0
} else { // Shrinking subclusters
primaryCount := s.Vdb.GetPrimaryCount()
if -1*delta > targetSc.Size {
if primaryCount-int(targetSc.Size) < minHosts {
s.VRec.Log.Info("Shrinking subcluster will violate ksafety rule", "subcluster", targetSc.Name)
continue
}
delta += targetSc.Size
targetSc.Size = 0
} else {
if primaryCount+int(delta) < minHosts {
s.VRec.Log.Info("Shrinking subcluster will violate ksafety rule", "subcluster", targetSc.Name)
continue
}
targetSc.Size += delta
delta = 0
}
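A worked sketch of the new shrink guard. The minimum-host constants come from the api/v1 package; their concrete values are an assumption here (1 for k-safety 0, 3 for k-safety 1).

primaryCount := 5        // what vdb.GetPrimaryCount() might return
targetScSize := int32(3) // primary subcluster the shrink would empty
minHosts := 3            // assumed value of vapi.KSafety1MinHosts
if primaryCount-int(targetScSize) < minHosts {
	// 5 - 3 = 2 < 3: emptying the subcluster would violate k-safety,
	// so the reconciler logs a message and skips to the next subcluster.
}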
31 changes: 30 additions & 1 deletion pkg/controllers/vas/subclusterresize_reconciler_test.go
@@ -152,7 +152,7 @@ var _ = Describe("subclusterresize_reconcile", func() {
const TargetSvcName = "conn"
vdb := vapi.MakeVDB()
vdb.Spec.Subclusters = []vapi.Subcluster{
{Name: "sc1", Size: 5, ServiceName: TargetSvcName},
{Name: "sc1", Size: 5, ServiceName: TargetSvcName, Type: vapi.PrimarySubcluster},
{Name: "sc2", Size: 10, ServiceName: "other"},
{Name: "sc3", Size: 1, ServiceName: TargetSvcName},
}
@@ -176,4 +176,33 @@
Expect(fetchVdb.Spec.Subclusters[1].Size).Should(Equal(vdb.Spec.Subclusters[1].Size))
Expect(fetchVdb.Spec.Subclusters[2].Size).Should(Equal(int32(0)))
})

It("should not shrink the subcluster size", func() {
const TargetSvcName = "conn"
vdb := vapi.MakeVDB()
vdb.Spec.Subclusters = []vapi.Subcluster{
{Name: "sc1", Size: 5, ServiceName: TargetSvcName, Type: vapi.PrimarySubcluster},
{Name: "sc2", Size: 10, ServiceName: "other"},
{Name: "sc3", Size: 1, ServiceName: TargetSvcName},
}
test.CreateVDB(ctx, k8sClient, vdb)
defer test.DeleteVDB(ctx, k8sClient, vdb)

vas := v1beta1.MakeVAS()
const NumPodsToRemove = 4
vas.Spec.TargetSize = vdb.Spec.Subclusters[0].Size + vdb.Spec.Subclusters[2].Size - NumPodsToRemove
vas.Spec.ServiceName = TargetSvcName
v1beta1_test.CreateVAS(ctx, k8sClient, vas)
defer v1beta1_test.DeleteVAS(ctx, k8sClient, vas)

req := ctrl.Request{NamespacedName: v1beta1.MakeVASName()}
Expect(vasRec.Reconcile(ctx, req)).Should(Equal(ctrl.Result{}))

fetchVdb := &v1beta1.VerticaDB{}
nm := v1beta1.MakeVDBName()
Expect(k8sClient.Get(ctx, nm, fetchVdb)).Should(Succeed())
Expect(fetchVdb.Spec.Subclusters[0].Size).Should(Equal(vdb.Spec.Subclusters[0].Size))
Expect(fetchVdb.Spec.Subclusters[1].Size).Should(Equal(vdb.Spec.Subclusters[1].Size))
Expect(fetchVdb.Spec.Subclusters[2].Size).Should(Equal(int32(0)))
})
})
73 changes: 41 additions & 32 deletions pkg/controllers/vas/subclusterscale_reconciler.go
@@ -100,16 +100,31 @@ func (s *SubclusterScaleReconciler) scaleSubcluster(ctx context.Context, req *ct
// picking the last one first. Changes are made in-place in s.Vdb
func (s *SubclusterScaleReconciler) considerRemovingSubclusters(podsToRemove int32) bool {
origNumSubclusters := len(s.Vdb.Spec.Subclusters)
minHosts := vapi.KSafety0MinHosts
if !s.Vdb.IsKSafety0() {
minHosts = vapi.KSafety1MinHosts
}
for j := len(s.Vdb.Spec.Subclusters) - 1; j >= 0; j-- {
sc := &s.Vdb.Spec.Subclusters[j]
if sc.GetServiceName() == s.Vas.Spec.ServiceName {
if podsToRemove > 0 && sc.Size <= podsToRemove {
if s.Vas.Spec.ServiceName == "" || sc.GetServiceName() == s.Vas.Spec.ServiceName {
if podsToRemove == 0 {
break
}
if sc.Size <= podsToRemove {
if sc.IsPrimary() {
primaryCount := s.Vdb.GetPrimaryCount()
primaryCountAfterScaling := primaryCount - int(sc.Size)
// We will prevent removing a primary if it would lead to a k-safety
// rule violation.
if primaryCountAfterScaling < minHosts {
s.VRec.Log.Info("Removing subcluster will violate ksafety. Skipping to the next one", "Subcluster", sc.Name)
continue
}
}
podsToRemove -= sc.Size
s.VRec.Log.Info("Removing subcluster in VerticaDB", "VerticaDB", s.Vdb.Name, "Subcluster", sc.Name)
s.Vdb.Spec.Subclusters = append(s.Vdb.Spec.Subclusters[:j], s.Vdb.Spec.Subclusters[j+1:]...)
continue
}
return origNumSubclusters != len(s.Vdb.Spec.Subclusters)
}
}
return origNumSubclusters != len(s.Vdb.Spec.Subclusters)
@@ -120,12 +135,13 @@
func (s *SubclusterScaleReconciler) considerAddingSubclusters(newPodsNeeded int32) bool {
origNumSubclusters := len(s.Vdb.Spec.Subclusters)
scMap := s.Vdb.GenSubclusterMap()
newScSize, ok := s.calcNextSubclusterSize(scMap)
if !ok {
baseSc := s.calcNextSubcluster(newPodsNeeded)
if baseSc == nil {
return false
}
for newPodsNeeded >= newScSize {
newSc, _ := s.calcNextSubcluster(scMap)
for newPodsNeeded >= baseSc.Size {
newSc := baseSc.DeepCopy()
newSc.Name = s.genNextSubclusterName(scMap)
s.Vdb.Spec.Subclusters = append(s.Vdb.Spec.Subclusters, *newSc)
scMap[newSc.Name] = &s.Vdb.Spec.Subclusters[len(s.Vdb.Spec.Subclusters)-1]
newPodsNeeded -= newSc.Size
@@ -151,36 +167,29 @@ func (s *SubclusterScaleReconciler) genNextSubclusterName(scMap map[string]*vapi
}
}

// calcNextSubclusterSize returns the size of the next subcluster
func (s *SubclusterScaleReconciler) calcNextSubclusterSize(scMap map[string]*vapi.Subcluster) (int32, bool) {
newSc, ok := s.calcNextSubcluster(scMap)
if !ok {
return 0, false
}
return newSc.Size, true
}

// calcNextSubcluster builds the next subcluster that we want to add to the vdb.
// Returns nil if unable to construct one. An event will be logged if this
// happens.
func (s *SubclusterScaleReconciler) calcNextSubcluster(scMap map[string]*vapi.Subcluster) (*vapi.Subcluster, bool) {
func (s *SubclusterScaleReconciler) calcNextSubcluster(newPodsNeeded int32) *vapi.Subcluster {
// If the template is set, we will use that. Otherwise, we try to use an
// existing subcluster (last one added) as a base.
if s.Vas.CanUseTemplate() {
sc := v1beta1.GetV1SubclusterFromV1beta1(s.Vas.Spec.Template.DeepCopy())
sc.Name = s.genNextSubclusterName(scMap)
return &sc, true
if newPodsNeeded >= sc.Size {
return &sc
}
return nil
}
scs, _ := s.Vdb.FindSubclusterForServiceName(s.Vas.Spec.ServiceName)
if len(scs) == 0 {
msg := "Could not determine size of the next subcluster. Template in VerticaAutoscaler "
msg += "is empty and no existing subcluster can be used as a base"
s.VRec.Log.Info(msg)
s.VRec.EVRec.Event(s.Vas, corev1.EventTypeWarning, events.NoSubclusterTemplate, msg)
return nil, false
for j := len(scs) - 1; j >= 0; j-- {
// We pick the first subcluster (scanning from the last) whose size
// does not exceed the number of pods to add.
if newPodsNeeded >= scs[j].Size {
scs[j].ServiceName = s.Vas.Spec.ServiceName
return scs[j]
}
}
newSc := scs[len(scs)-1].DeepCopy()
newSc.ServiceName = s.Vas.Spec.ServiceName
newSc.Name = s.genNextSubclusterName(scMap)
return newSc, true
msg := "Could not determine size of the next subcluster. Template in VerticaAutoscaler "
msg += "is empty and no existing subcluster can be used as a base"
s.VRec.Log.Info(msg)
s.VRec.EVRec.Event(s.Vas, corev1.EventTypeWarning, events.NoSubclusterTemplate, msg)
return nil
}
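The reworked calcNextSubcluster only returns a base subcluster whose size fits within the pods still needed; considerAddingSubclusters then clones it until the demand is met. A standalone sketch of that selection rule (sizes hypothetical):

// pickBase mirrors the new base-selection rule for illustration only.
func pickBase(sizes []int32, newPodsNeeded int32) (int, bool) {
	for j := len(sizes) - 1; j >= 0; j-- {
		if newPodsNeeded >= sizes[j] {
			return j, true // the last subcluster small enough to fit becomes the base
		}
	}
	return -1, false // nothing fits: an event is logged and no subcluster is added
}

With sizes [5, 10, 1] and 4 new pods needed, pickBase returns index 2; the size-1 subcluster is then cloned four times to satisfy the demand.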
28 changes: 17 additions & 11 deletions pkg/controllers/vas/subclusterscale_reconciler_test.go
@@ -33,17 +33,20 @@ var _ = Describe("subclusterscale_reconcile", func() {

It("should grow by adding new subclusters", func() {
vdb := vapi.MakeVDB()
const serviceName = "as"
vdb.Spec.Subclusters[0].ServiceName = serviceName
test.CreateVDB(ctx, k8sClient, vdb)
defer test.DeleteVDB(ctx, k8sClient, vdb)

vas := v1beta1.MakeVAS()
vas.Spec.ScalingGranularity = v1beta1.SubclusterScalingGranularity
vas.Spec.ServiceName = serviceName
vas.Spec.Template = v1beta1.Subcluster{
Name: "blah",
ServiceName: "my-ut",
ServiceName: serviceName,
Size: 8,
}
vas.Spec.TargetSize = vas.Spec.Template.Size * 2
vas.Spec.TargetSize = vas.Spec.Template.Size*2 + 3
v1beta1_test.CreateVAS(ctx, k8sClient, vas)
defer v1beta1_test.DeleteVAS(ctx, k8sClient, vas)

@@ -93,12 +96,10 @@ var _ = Describe("subclusterscale_reconcile", func() {
fetchVdb := &vapi.VerticaDB{}
vdbName := vdb.ExtractNamespacedName()
Expect(k8sClient.Get(ctx, vdbName, fetchVdb)).Should(Succeed())
Expect(len(fetchVdb.Spec.Subclusters)).Should(Equal(5))
Expect(fetchVdb.Spec.Subclusters[0].Size).Should(Equal(vdb.Spec.Subclusters[0].Size))
Expect(fetchVdb.Spec.Subclusters[1].Size).Should(Equal(vdb.Spec.Subclusters[1].Size))
Expect(fetchVdb.Spec.Subclusters[2].Size).Should(Equal(vdb.Spec.Subclusters[2].Size))
Expect(fetchVdb.Spec.Subclusters[3].Size).Should(Equal(vdb.Spec.Subclusters[3].Size))
Expect(fetchVdb.Spec.Subclusters[4].Size).Should(Equal(vdb.Spec.Subclusters[4].Size))
Expect(len(fetchVdb.Spec.Subclusters)).Should(Equal(3))
Expect(fetchVdb.Spec.Subclusters[0].Size).Should(Equal(vdb.Spec.Subclusters[1].Size))
Expect(fetchVdb.Spec.Subclusters[1].Size).Should(Equal(vdb.Spec.Subclusters[3].Size))
Expect(fetchVdb.Spec.Subclusters[2].Size).Should(Equal(vdb.Spec.Subclusters[4].Size))

vasName := v1beta1.MakeVASName()
Expect(k8sClient.Get(ctx, vasName, vas)).Should(Succeed())
@@ -108,9 +109,14 @@

Expect(k8sClient.Get(ctx, vdbName, fetchVdb)).Should(Succeed())
Expect(len(fetchVdb.Spec.Subclusters)).Should(Equal(3))
Expect(fetchVdb.Spec.Subclusters[0].Size).Should(Equal(vdb.Spec.Subclusters[0].Size))
Expect(fetchVdb.Spec.Subclusters[1].Size).Should(Equal(vdb.Spec.Subclusters[1].Size))
Expect(fetchVdb.Spec.Subclusters[2].Size).Should(Equal(vdb.Spec.Subclusters[4].Size))

vas.Spec.TargetSize = 0
Expect(k8sClient.Update(ctx, vas)).Should(Succeed())
Expect(vasRec.Reconcile(ctx, req)).Should(Equal(ctrl.Result{}))
Expect(k8sClient.Get(ctx, vdbName, fetchVdb)).Should(Succeed())
Expect(len(fetchVdb.Spec.Subclusters)).Should(Equal(2))
Expect(fetchVdb.Spec.Subclusters[0].Size).Should(Equal(vdb.Spec.Subclusters[1].Size))
Expect(fetchVdb.Spec.Subclusters[1].Size).Should(Equal(vdb.Spec.Subclusters[4].Size))
})

It("should get rid of all subclusters if shrinking to zero is allowed", func() {
14 changes: 12 additions & 2 deletions pkg/controllers/vas/vdbverify_reconciler.go
@@ -17,6 +17,7 @@ package vas

import (
"context"
"errors"

vapi "github.com/vertica/vertica-kubernetes/api/v1"
v1beta1 "github.com/vertica/vertica-kubernetes/api/v1beta1"
@@ -49,12 +50,21 @@ func (s *VDBVerifyReconciler) Reconcile(ctx context.Context, _ *ctrl.Request) (c
}
vinf, vErr := s.Vdb.MakeVersionInfoCheck()
if vErr != nil {
return ctrl.Result{}, err
return ctrl.Result{}, vErr
}
if !vinf.IsEqualOrNewer(vapi.PrometheusMetricsMinVersion) {
ver, _ := s.Vdb.GetVerticaVersionStr()
s.VRec.Eventf(s.Vas, corev1.EventTypeWarning, events.PrometheusMetricsNotSupported,
"The server version %s does not support prometheus metrics", ver)
return ctrl.Result{}, errors.New("the server version does not support prometheus metrics")
}
return res, err
scSbMap := s.Vdb.GenSubclusterSandboxMap()
for i := range s.Vdb.Spec.Subclusters {
sc := &s.Vdb.Spec.Subclusters[i]
sbName := scSbMap[sc.Name]
if sbName != vapi.MainCluster {
return ctrl.Result{}, errors.New("cannot do autoscaling if there is a sandbox")
}
}
return res, nil
}
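Finally, a sketch of the new sandbox guard (map contents illustrative; vapi.MainCluster is assumed to be the sentinel for an unsandboxed subcluster): reconciliation is rejected as soon as any subcluster is assigned to a sandbox.

scSbMap := map[string]string{"sc1": vapi.MainCluster, "sc2": "sand-a"}
for name, sb := range scSbMap {
	if sb != vapi.MainCluster {
		fmt.Printf("subcluster %q is in sandbox %q: autoscaling is rejected\n", name, sb)
	}
}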
