From 15951509c7377eb346024986964344ad255208fa Mon Sep 17 00:00:00 2001 From: free6om Date: Tue, 23 Apr 2024 18:06:46 +0800 Subject: [PATCH] chore: refactor out ConvertInstanceSetToSTS (#7136) --- .../apps/component_hscale_volume_populator.go | 43 +++++++------- .../apps/transformer_component_status.go | 9 +-- .../apps/transformer_component_tls_test.go | 7 +-- .../apps/transformer_component_workload.go | 59 +++++++++---------- pkg/controller/instanceset/utils.go | 26 -------- 5 files changed, 55 insertions(+), 89 deletions(-) diff --git a/controllers/apps/component_hscale_volume_populator.go b/controllers/apps/component_hscale_volume_populator.go index 3ff8a829721..40f5a9af775 100644 --- a/controllers/apps/component_hscale_volume_populator.go +++ b/controllers/apps/component_hscale_volume_populator.go @@ -23,7 +23,6 @@ import ( "context" "fmt" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -31,6 +30,7 @@ import ( appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" "github.com/apecloud/kubeblocks/pkg/constant" "github.com/apecloud/kubeblocks/pkg/controller/component" "github.com/apecloud/kubeblocks/pkg/controller/factory" @@ -68,8 +68,7 @@ func newDataClone(reqCtx intctrlutil.RequestCtx, cli client.Client, cluster *appsv1alpha1.Cluster, component *component.SynthesizedComponent, - stsObj *appsv1.StatefulSet, - stsProto *appsv1.StatefulSet, + itsObj, itsProto *workloads.InstanceSet, backupKey types.NamespacedName) (dataClone, error) { if component == nil { return nil, nil @@ -81,8 +80,8 @@ func newDataClone(reqCtx intctrlutil.RequestCtx, cli: cli, cluster: cluster, component: component, - stsObj: stsObj, - stsProto: stsProto, + itsObj: itsObj, + itsProto: itsProto, backupKey: backupKey, }, }, nil @@ -94,8 +93,8 @@ func newDataClone(reqCtx intctrlutil.RequestCtx, cli: cli, cluster: cluster, component: component, - stsObj: stsObj, - stsProto: stsProto, + itsObj: itsObj, + itsProto: itsProto, backupKey: backupKey, }, }, nil @@ -109,8 +108,8 @@ type baseDataClone struct { cli client.Client cluster *appsv1alpha1.Cluster component *component.SynthesizedComponent - stsObj *appsv1.StatefulSet - stsProto *appsv1.StatefulSet + itsObj *workloads.InstanceSet + itsProto *workloads.InstanceSet backupKey types.NamespacedName } @@ -141,7 +140,7 @@ func (d *baseDataClone) CloneData(realDataClone dataClone) ([]client.Object, []c status, d.cluster.Name, d.component.Name)) } // backup's ready, then start to check restore - for i := *d.stsObj.Spec.Replicas; i < d.component.Replicas; i++ { + for i := *d.itsObj.Spec.Replicas; i < d.component.Replicas; i++ { restoreStatus, err := realDataClone.CheckRestoreStatus(i) if err != nil { return nil, nil, err @@ -174,11 +173,11 @@ func (d *baseDataClone) isPVCExists(pvcKey types.NamespacedName) (bool, error) { } func (d *baseDataClone) checkAllPVCsExist() (bool, error) { - for i := *d.stsObj.Spec.Replicas; i < d.component.Replicas; i++ { + for i := *d.itsObj.Spec.Replicas; i < d.component.Replicas; i++ { for _, vct := range d.component.VolumeClaimTemplates { pvcKey := types.NamespacedName{ - Namespace: d.stsObj.Namespace, - Name: fmt.Sprintf("%s-%s-%d", vct.Name, d.stsObj.Name, i), + Namespace: d.itsObj.Namespace, + Name: fmt.Sprintf("%s-%s-%d", vct.Name, d.itsObj.Name, i), } // check pvc existence pvcExists, err := d.isPVCExists(pvcKey) @@ -219,11 +218,11 @@ func (d *baseDataClone) excludeBackupVCTs() []*corev1.PersistentVolumeClaimTempl func (d *baseDataClone) createPVCs(vcts []*corev1.PersistentVolumeClaimTemplate) ([]client.Object, error) { objs := make([]client.Object, 0) - for i := *d.stsObj.Spec.Replicas; i < d.component.Replicas; i++ { + for i := *d.itsObj.Spec.Replicas; i < d.component.Replicas; i++ { for _, vct := range vcts { pvcKey := types.NamespacedName{ - Namespace: d.stsObj.Namespace, - Name: fmt.Sprintf("%s-%s-%d", vct.Name, d.stsObj.Name, i), + Namespace: d.itsObj.Namespace, + Name: fmt.Sprintf("%s-%s-%d", vct.Name, d.itsObj.Name, i), } if exist, err := d.isPVCExists(pvcKey); err != nil { return nil, err @@ -298,7 +297,7 @@ func (d *backupDataClone) Succeed() (bool, error) { if err != nil || !allPVCsExist { return allPVCsExist, err } - for i := *d.stsObj.Spec.Replicas; i < d.component.Replicas; i++ { + for i := *d.itsObj.Spec.Replicas; i < d.component.Replicas; i++ { restoreStatus, err := d.CheckRestoreStatus(i) if err != nil { return false, err @@ -347,7 +346,7 @@ func (d *backupDataClone) backup() ([]client.Object, error) { if backupPolicy == nil { return nil, intctrlutil.NewNotFound("not found any backup policy created by %s", backupPolicyTplName) } - volumeSnapshotEnabled, err := isVolumeSnapshotEnabled(d.reqCtx.Ctx, d.cli, d.stsObj, backupVCT(d.component)) + volumeSnapshotEnabled, err := isVolumeSnapshotEnabled(d.reqCtx.Ctx, d.cli, d.itsObj, backupVCT(d.component)) if err != nil { return nil, err } @@ -446,13 +445,13 @@ func backupVCT(component *component.SynthesizedComponent) *corev1.PersistentVolu } func isVolumeSnapshotEnabled(ctx context.Context, cli client.Client, - sts *appsv1.StatefulSet, vct *corev1.PersistentVolumeClaimTemplate) (bool, error) { - if sts == nil || vct == nil { + its *workloads.InstanceSet, vct *corev1.PersistentVolumeClaimTemplate) (bool, error) { + if its == nil || vct == nil { return false, nil } pvcKey := types.NamespacedName{ - Namespace: sts.Namespace, - Name: fmt.Sprintf("%s-%s-%d", vct.Name, sts.Name, 0), + Namespace: its.Namespace, + Name: fmt.Sprintf("%s-%s-%d", vct.Name, its.Name, 0), } pvc := corev1.PersistentVolumeClaim{} if err := cli.Get(ctx, pvcKey, &pvc, inDataContext4C()); err != nil { diff --git a/controllers/apps/transformer_component_status.go b/controllers/apps/transformer_component_status.go index 97a30f1c3aa..11b09aadbf0 100644 --- a/controllers/apps/transformer_component_status.go +++ b/controllers/apps/transformer_component_status.go @@ -338,14 +338,11 @@ func (r *componentStatusHandler) isScaleOutFailed() (bool, error) { return false, nil } - // stsObj is the underlying workload which is already running in the component. - stsObj := instanceset.ConvertInstanceSetToSTS(r.runningITS) - stsProto := instanceset.ConvertInstanceSetToSTS(r.protoITS) backupKey := types.NamespacedName{ - Namespace: stsObj.Namespace, - Name: constant.GenerateResourceNameWithScalingSuffix(stsObj.Name), + Namespace: r.runningITS.Namespace, + Name: constant.GenerateResourceNameWithScalingSuffix(r.runningITS.Name), } - d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, stsObj, stsProto, backupKey) + d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, r.runningITS, r.protoITS, backupKey) if err != nil { return false, err } diff --git a/controllers/apps/transformer_component_tls_test.go b/controllers/apps/transformer_component_tls_test.go index d0558469690..beace62b874 100644 --- a/controllers/apps/transformer_component_tls_test.go +++ b/controllers/apps/transformer_component_tls_test.go @@ -35,7 +35,6 @@ import ( appsv1beta1 "github.com/apecloud/kubeblocks/apis/apps/v1beta1" cfgcore "github.com/apecloud/kubeblocks/pkg/configuration/core" "github.com/apecloud/kubeblocks/pkg/constant" - "github.com/apecloud/kubeblocks/pkg/controller/instanceset" "github.com/apecloud/kubeblocks/pkg/controller/plan" "github.com/apecloud/kubeblocks/pkg/generics" testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps" @@ -299,11 +298,11 @@ var _ = Describe("TLS self-signed cert function", func() { Eventually(testapps.GetClusterPhase(&testCtx, clusterKey)).Should(Equal(appsv1alpha1.CreatingClusterPhase)) itsList := testk8s.ListAndCheckInstanceSet(&testCtx, clusterKey) - sts := *instanceset.ConvertInstanceSetToSTS(&itsList.Items[0]) + its := itsList.Items[0] cd := &appsv1alpha1.ClusterDefinition{} Expect(k8sClient.Get(ctx, types.NamespacedName{Name: clusterDefName, Namespace: testCtx.DefaultNamespace}, cd)).Should(Succeed()) - cmName := cfgcore.GetInstanceCMName(&sts, &cd.Spec.ComponentDefs[0].ConfigSpecs[0].ComponentTemplateSpec) - cmKey := client.ObjectKey{Namespace: sts.Namespace, Name: cmName} + cmName := cfgcore.GetInstanceCMName(&its, &cd.Spec.ComponentDefs[0].ConfigSpecs[0].ComponentTemplateSpec) + cmKey := client.ObjectKey{Namespace: its.Namespace, Name: cmName} hasTLSSettings := func() bool { cm := &corev1.ConfigMap{} Expect(k8sClient.Get(ctx, cmKey, cm)).Should(Succeed()) diff --git a/controllers/apps/transformer_component_workload.go b/controllers/apps/transformer_component_workload.go index c1228398ba9..95cd554af36 100644 --- a/controllers/apps/transformer_component_workload.go +++ b/controllers/apps/transformer_component_workload.go @@ -28,7 +28,6 @@ import ( "github.com/spf13/viper" "golang.org/x/exp/maps" "golang.org/x/exp/slices" - apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -43,7 +42,6 @@ import ( "github.com/apecloud/kubeblocks/pkg/controller/configuration" "github.com/apecloud/kubeblocks/pkg/controller/factory" "github.com/apecloud/kubeblocks/pkg/controller/graph" - "github.com/apecloud/kubeblocks/pkg/controller/instanceset" "github.com/apecloud/kubeblocks/pkg/controller/model" intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" "github.com/apecloud/kubeblocks/pkg/generics" @@ -398,26 +396,26 @@ func (r *componentWorkloadOps) expandVolume() error { // horizontalScale handles workload horizontal scale func (r *componentWorkloadOps) horizontalScale() error { - sts := instanceset.ConvertInstanceSetToSTS(r.runningITS) - if sts.Status.ReadyReplicas == r.synthesizeComp.Replicas { + its := r.runningITS + if its.Status.ReadyReplicas == r.synthesizeComp.Replicas { return nil } - ret := r.horizontalScaling(r.synthesizeComp, sts) + ret := r.horizontalScaling(r.synthesizeComp, its) if ret == 0 { if err := r.postScaleIn(); err != nil { return err } - if err := r.postScaleOut(sts); err != nil { + if err := r.postScaleOut(its); err != nil { return err } return nil } if ret < 0 { - if err := r.scaleIn(sts); err != nil { + if err := r.scaleIn(its); err != nil { return err } } else { - if err := r.scaleOut(sts); err != nil { + if err := r.scaleOut(its); err != nil { return err } } @@ -436,23 +434,23 @@ func (r *componentWorkloadOps) horizontalScale() error { } // < 0 for scale in, > 0 for scale out, and == 0 for nothing -func (r *componentWorkloadOps) horizontalScaling(synthesizeComp *component.SynthesizedComponent, stsObj *apps.StatefulSet) int { - return int(synthesizeComp.Replicas - *stsObj.Spec.Replicas) +func (r *componentWorkloadOps) horizontalScaling(synthesizeComp *component.SynthesizedComponent, itsObj *workloads.InstanceSet) int { + return int(synthesizeComp.Replicas - *itsObj.Spec.Replicas) } func (r *componentWorkloadOps) postScaleIn() error { return nil } -func (r *componentWorkloadOps) postScaleOut(stsObj *apps.StatefulSet) error { +func (r *componentWorkloadOps) postScaleOut(itsObj *workloads.InstanceSet) error { var ( snapshotKey = types.NamespacedName{ - Namespace: stsObj.Namespace, - Name: constant.GenerateResourceNameWithScalingSuffix(stsObj.Name), + Namespace: itsObj.Namespace, + Name: constant.GenerateResourceNameWithScalingSuffix(itsObj.Name), } ) - d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, stsObj, stsObj, snapshotKey) + d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, itsObj, itsObj, snapshotKey) if err != nil { return err } @@ -472,7 +470,7 @@ func (r *componentWorkloadOps) postScaleOut(stsObj *apps.StatefulSet) error { return nil } -func (r *componentWorkloadOps) scaleIn(stsObj *apps.StatefulSet) error { +func (r *componentWorkloadOps) scaleIn(itsObj *workloads.InstanceSet) error { // if scale in to 0, do not delete pvcs if r.synthesizeComp.Replicas == 0 { r.reqCtx.Log.Info("scale in to 0, keep all PVCs") @@ -484,25 +482,24 @@ func (r *componentWorkloadOps) scaleIn(stsObj *apps.StatefulSet) error { r.reqCtx.Log.Info(fmt.Sprintf("leave member at scaling-in error, retry later: %s", err.Error())) return err } - return r.deletePVCs4ScaleIn(stsObj) + return r.deletePVCs4ScaleIn(itsObj) } -func (r *componentWorkloadOps) scaleOut(stsObj *apps.StatefulSet) error { +func (r *componentWorkloadOps) scaleOut(itsObj *workloads.InstanceSet) error { var ( backupKey = types.NamespacedName{ - Namespace: stsObj.Namespace, - Name: constant.GenerateResourceNameWithScalingSuffix(stsObj.Name), + Namespace: itsObj.Namespace, + Name: constant.GenerateResourceNameWithScalingSuffix(itsObj.Name), } ) - // sts's replicas=0 means it's starting not scaling, skip all the scaling work. - if *stsObj.Spec.Replicas == 0 { + // its's replicas=0 means it's starting not scaling, skip all the scaling work. + if *itsObj.Spec.Replicas == 0 { return nil } graphCli := model.NewGraphClient(r.cli) graphCli.Noop(r.dag, r.protoITS) - stsProto := instanceset.ConvertInstanceSetToSTS(r.protoITS) - d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, stsObj, stsProto, backupKey) + d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, itsObj, r.protoITS, backupKey) if err != nil { return err } @@ -518,7 +515,7 @@ func (r *componentWorkloadOps) scaleOut(stsObj *apps.StatefulSet) error { if succeed { // pvcs are ready, ITS.replicas should be updated graphCli.Update(r.dag, nil, r.protoITS) - return r.postScaleOut(stsObj) + return r.postScaleOut(itsObj) } else { graphCli.Noop(r.dag, r.protoITS) // update objs will trigger reconcile, no need to requeue error @@ -642,20 +639,20 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error { return err // TODO: use requeue-after } -func (r *componentWorkloadOps) deletePVCs4ScaleIn(stsObj *apps.StatefulSet) error { +func (r *componentWorkloadOps) deletePVCs4ScaleIn(itsObj *workloads.InstanceSet) error { graphCli := model.NewGraphClient(r.cli) - for i := r.synthesizeComp.Replicas; i < *stsObj.Spec.Replicas; i++ { - for _, vct := range stsObj.Spec.VolumeClaimTemplates { + for i := r.synthesizeComp.Replicas; i < *itsObj.Spec.Replicas; i++ { + for _, vct := range itsObj.Spec.VolumeClaimTemplates { pvcKey := types.NamespacedName{ - Namespace: stsObj.Namespace, - Name: fmt.Sprintf("%s-%s-%d", vct.Name, stsObj.Name, i), + Namespace: itsObj.Namespace, + Name: fmt.Sprintf("%s-%s-%d", vct.Name, itsObj.Name, i), } pvc := corev1.PersistentVolumeClaim{} if err := r.cli.Get(r.reqCtx.Ctx, pvcKey, &pvc, inDataContext4C()); err != nil { return err } - // Since there are no order guarantee between updating STS and deleting PVCs, if there is any error occurred - // after updating STS and before deleting PVCs, the PVCs intended to scale-in will be leaked. + // Since there are no order guarantee between updating ITS and deleting PVCs, if there is any error occurred + // after updating ITS and before deleting PVCs, the PVCs intended to scale-in will be leaked. // For simplicity, the updating dependency is added between them to guarantee that the PVCs to scale-in // will be deleted or the scaling-in operation will be failed. graphCli.Delete(r.dag, &pvc, inDataContext4G()) diff --git a/pkg/controller/instanceset/utils.go b/pkg/controller/instanceset/utils.go index 47267b6d0fd..baca18a62cb 100644 --- a/pkg/controller/instanceset/utils.go +++ b/pkg/controller/instanceset/utils.go @@ -24,13 +24,11 @@ import ( "strings" "golang.org/x/exp/slices" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" "github.com/apecloud/kubeblocks/pkg/constant" - "github.com/apecloud/kubeblocks/pkg/controller/builder" ) const ( @@ -176,30 +174,6 @@ func ParseAnnotationsOfScope(scope AnnotationScope, scopedAnnotations map[string return annotations } -// ConvertInstanceSetToSTS converts a rsm to sts -// TODO(free6om): refactor this func out -func ConvertInstanceSetToSTS(rsm *workloads.InstanceSet) *appsv1.StatefulSet { - if rsm == nil { - return nil - } - sts := builder.NewStatefulSetBuilder(rsm.Namespace, rsm.Name). - SetUID(rsm.UID). - AddLabelsInMap(rsm.Labels). - AddAnnotationsInMap(rsm.Annotations). - SetReplicas(*rsm.Spec.Replicas). - SetSelector(rsm.Spec.Selector). - SetServiceName(rsm.Spec.ServiceName). - SetTemplate(rsm.Spec.Template). - SetVolumeClaimTemplates(rsm.Spec.VolumeClaimTemplates...). - SetPodManagementPolicy(rsm.Spec.PodManagementPolicy). - SetUpdateStrategy(rsm.Spec.UpdateStrategy). - GetObject() - sts.Generation = rsm.Generation - sts.Status = rsm.Status.StatefulSetStatus - sts.Status.ObservedGeneration = rsm.Status.ObservedGeneration - return sts -} - func GetEnvConfigMapName(rsmName string) string { return fmt.Sprintf("%s-its-env", rsmName) }