Skip to content

Commit

Permalink
Merge branch 'main' into support/move-IsOwnedByInstanceSet-into-apps
Browse files Browse the repository at this point in the history
  • Loading branch information
free6om committed Apr 23, 2024
2 parents c022f22 + 1595150 commit 1f329cd
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 91 deletions.
43 changes: 21 additions & 22 deletions controllers/apps/component_hscale_volume_populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import (
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/factory"
Expand Down Expand Up @@ -68,8 +68,7 @@ func newDataClone(reqCtx intctrlutil.RequestCtx,
cli client.Client,
cluster *appsv1alpha1.Cluster,
component *component.SynthesizedComponent,
stsObj *appsv1.StatefulSet,
stsProto *appsv1.StatefulSet,
itsObj, itsProto *workloads.InstanceSet,
backupKey types.NamespacedName) (dataClone, error) {
if component == nil {
return nil, nil
Expand All @@ -81,8 +80,8 @@ func newDataClone(reqCtx intctrlutil.RequestCtx,
cli: cli,
cluster: cluster,
component: component,
stsObj: stsObj,
stsProto: stsProto,
itsObj: itsObj,
itsProto: itsProto,
backupKey: backupKey,
},
}, nil
Expand All @@ -94,8 +93,8 @@ func newDataClone(reqCtx intctrlutil.RequestCtx,
cli: cli,
cluster: cluster,
component: component,
stsObj: stsObj,
stsProto: stsProto,
itsObj: itsObj,
itsProto: itsProto,
backupKey: backupKey,
},
}, nil
Expand All @@ -109,8 +108,8 @@ type baseDataClone struct {
cli client.Client
cluster *appsv1alpha1.Cluster
component *component.SynthesizedComponent
stsObj *appsv1.StatefulSet
stsProto *appsv1.StatefulSet
itsObj *workloads.InstanceSet
itsProto *workloads.InstanceSet
backupKey types.NamespacedName
}

Expand Down Expand Up @@ -141,7 +140,7 @@ func (d *baseDataClone) CloneData(realDataClone dataClone) ([]client.Object, []c
status, d.cluster.Name, d.component.Name))
}
// backup's ready, then start to check restore
for i := *d.stsObj.Spec.Replicas; i < d.component.Replicas; i++ {
for i := *d.itsObj.Spec.Replicas; i < d.component.Replicas; i++ {
restoreStatus, err := realDataClone.CheckRestoreStatus(i)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -174,11 +173,11 @@ func (d *baseDataClone) isPVCExists(pvcKey types.NamespacedName) (bool, error) {
}

func (d *baseDataClone) checkAllPVCsExist() (bool, error) {
for i := *d.stsObj.Spec.Replicas; i < d.component.Replicas; i++ {
for i := *d.itsObj.Spec.Replicas; i < d.component.Replicas; i++ {
for _, vct := range d.component.VolumeClaimTemplates {
pvcKey := types.NamespacedName{
Namespace: d.stsObj.Namespace,
Name: fmt.Sprintf("%s-%s-%d", vct.Name, d.stsObj.Name, i),
Namespace: d.itsObj.Namespace,
Name: fmt.Sprintf("%s-%s-%d", vct.Name, d.itsObj.Name, i),
}
// check pvc existence
pvcExists, err := d.isPVCExists(pvcKey)
Expand Down Expand Up @@ -219,11 +218,11 @@ func (d *baseDataClone) excludeBackupVCTs() []*corev1.PersistentVolumeClaimTempl

func (d *baseDataClone) createPVCs(vcts []*corev1.PersistentVolumeClaimTemplate) ([]client.Object, error) {
objs := make([]client.Object, 0)
for i := *d.stsObj.Spec.Replicas; i < d.component.Replicas; i++ {
for i := *d.itsObj.Spec.Replicas; i < d.component.Replicas; i++ {
for _, vct := range vcts {
pvcKey := types.NamespacedName{
Namespace: d.stsObj.Namespace,
Name: fmt.Sprintf("%s-%s-%d", vct.Name, d.stsObj.Name, i),
Namespace: d.itsObj.Namespace,
Name: fmt.Sprintf("%s-%s-%d", vct.Name, d.itsObj.Name, i),
}
if exist, err := d.isPVCExists(pvcKey); err != nil {
return nil, err
Expand Down Expand Up @@ -298,7 +297,7 @@ func (d *backupDataClone) Succeed() (bool, error) {
if err != nil || !allPVCsExist {
return allPVCsExist, err
}
for i := *d.stsObj.Spec.Replicas; i < d.component.Replicas; i++ {
for i := *d.itsObj.Spec.Replicas; i < d.component.Replicas; i++ {
restoreStatus, err := d.CheckRestoreStatus(i)
if err != nil {
return false, err
Expand Down Expand Up @@ -347,7 +346,7 @@ func (d *backupDataClone) backup() ([]client.Object, error) {
if backupPolicy == nil {
return nil, intctrlutil.NewNotFound("not found any backup policy created by %s", backupPolicyTplName)
}
volumeSnapshotEnabled, err := isVolumeSnapshotEnabled(d.reqCtx.Ctx, d.cli, d.stsObj, backupVCT(d.component))
volumeSnapshotEnabled, err := isVolumeSnapshotEnabled(d.reqCtx.Ctx, d.cli, d.itsObj, backupVCT(d.component))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -446,13 +445,13 @@ func backupVCT(component *component.SynthesizedComponent) *corev1.PersistentVolu
}

func isVolumeSnapshotEnabled(ctx context.Context, cli client.Client,
sts *appsv1.StatefulSet, vct *corev1.PersistentVolumeClaimTemplate) (bool, error) {
if sts == nil || vct == nil {
its *workloads.InstanceSet, vct *corev1.PersistentVolumeClaimTemplate) (bool, error) {
if its == nil || vct == nil {
return false, nil
}
pvcKey := types.NamespacedName{
Namespace: sts.Namespace,
Name: fmt.Sprintf("%s-%s-%d", vct.Name, sts.Name, 0),
Namespace: its.Namespace,
Name: fmt.Sprintf("%s-%s-%d", vct.Name, its.Name, 0),
}
pvc := corev1.PersistentVolumeClaim{}
if err := cli.Get(ctx, pvcKey, &pvc, inDataContext4C()); err != nil {
Expand Down
9 changes: 3 additions & 6 deletions controllers/apps/transformer_component_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,11 @@ func (r *componentStatusHandler) isScaleOutFailed() (bool, error) {
return false, nil
}

// stsObj is the underlying workload which is already running in the component.
stsObj := instanceset.ConvertInstanceSetToSTS(r.runningITS)
stsProto := instanceset.ConvertInstanceSetToSTS(r.protoITS)
backupKey := types.NamespacedName{
Namespace: stsObj.Namespace,
Name: constant.GenerateResourceNameWithScalingSuffix(stsObj.Name),
Namespace: r.runningITS.Namespace,
Name: constant.GenerateResourceNameWithScalingSuffix(r.runningITS.Name),
}
d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, stsObj, stsProto, backupKey)
d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, r.runningITS, r.protoITS, backupKey)
if err != nil {
return false, err
}
Expand Down
7 changes: 3 additions & 4 deletions controllers/apps/transformer_component_tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
appsv1beta1 "github.com/apecloud/kubeblocks/apis/apps/v1beta1"
cfgcore "github.com/apecloud/kubeblocks/pkg/configuration/core"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
"github.com/apecloud/kubeblocks/pkg/controller/plan"
"github.com/apecloud/kubeblocks/pkg/generics"
testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
Expand Down Expand Up @@ -299,11 +298,11 @@ var _ = Describe("TLS self-signed cert function", func() {
Eventually(testapps.GetClusterPhase(&testCtx, clusterKey)).Should(Equal(appsv1alpha1.CreatingClusterPhase))

itsList := testk8s.ListAndCheckInstanceSet(&testCtx, clusterKey)
sts := *instanceset.ConvertInstanceSetToSTS(&itsList.Items[0])
its := itsList.Items[0]
cd := &appsv1alpha1.ClusterDefinition{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: clusterDefName, Namespace: testCtx.DefaultNamespace}, cd)).Should(Succeed())
cmName := cfgcore.GetInstanceCMName(&sts, &cd.Spec.ComponentDefs[0].ConfigSpecs[0].ComponentTemplateSpec)
cmKey := client.ObjectKey{Namespace: sts.Namespace, Name: cmName}
cmName := cfgcore.GetInstanceCMName(&its, &cd.Spec.ComponentDefs[0].ConfigSpecs[0].ComponentTemplateSpec)
cmKey := client.ObjectKey{Namespace: its.Namespace, Name: cmName}
hasTLSSettings := func() bool {
cm := &corev1.ConfigMap{}
Expect(k8sClient.Get(ctx, cmKey, cm)).Should(Succeed())
Expand Down
59 changes: 28 additions & 31 deletions controllers/apps/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/spf13/viper"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -43,7 +42,6 @@ import (
"github.com/apecloud/kubeblocks/pkg/controller/configuration"
"github.com/apecloud/kubeblocks/pkg/controller/factory"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/generics"
Expand Down Expand Up @@ -398,26 +396,26 @@ func (r *componentWorkloadOps) expandVolume() error {

// horizontalScale handles workload horizontal scale
func (r *componentWorkloadOps) horizontalScale() error {
sts := instanceset.ConvertInstanceSetToSTS(r.runningITS)
if sts.Status.ReadyReplicas == r.synthesizeComp.Replicas {
its := r.runningITS
if its.Status.ReadyReplicas == r.synthesizeComp.Replicas {
return nil
}
ret := r.horizontalScaling(r.synthesizeComp, sts)
ret := r.horizontalScaling(r.synthesizeComp, its)
if ret == 0 {
if err := r.postScaleIn(); err != nil {
return err
}
if err := r.postScaleOut(sts); err != nil {
if err := r.postScaleOut(its); err != nil {
return err
}
return nil
}
if ret < 0 {
if err := r.scaleIn(sts); err != nil {
if err := r.scaleIn(its); err != nil {
return err
}
} else {
if err := r.scaleOut(sts); err != nil {
if err := r.scaleOut(its); err != nil {
return err
}
}
Expand All @@ -436,23 +434,23 @@ func (r *componentWorkloadOps) horizontalScale() error {
}

// < 0 for scale in, > 0 for scale out, and == 0 for nothing
func (r *componentWorkloadOps) horizontalScaling(synthesizeComp *component.SynthesizedComponent, stsObj *apps.StatefulSet) int {
return int(synthesizeComp.Replicas - *stsObj.Spec.Replicas)
func (r *componentWorkloadOps) horizontalScaling(synthesizeComp *component.SynthesizedComponent, itsObj *workloads.InstanceSet) int {
return int(synthesizeComp.Replicas - *itsObj.Spec.Replicas)
}

func (r *componentWorkloadOps) postScaleIn() error {
return nil
}

func (r *componentWorkloadOps) postScaleOut(stsObj *apps.StatefulSet) error {
func (r *componentWorkloadOps) postScaleOut(itsObj *workloads.InstanceSet) error {
var (
snapshotKey = types.NamespacedName{
Namespace: stsObj.Namespace,
Name: constant.GenerateResourceNameWithScalingSuffix(stsObj.Name),
Namespace: itsObj.Namespace,
Name: constant.GenerateResourceNameWithScalingSuffix(itsObj.Name),
}
)

d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, stsObj, stsObj, snapshotKey)
d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, itsObj, itsObj, snapshotKey)
if err != nil {
return err
}
Expand All @@ -472,7 +470,7 @@ func (r *componentWorkloadOps) postScaleOut(stsObj *apps.StatefulSet) error {
return nil
}

func (r *componentWorkloadOps) scaleIn(stsObj *apps.StatefulSet) error {
func (r *componentWorkloadOps) scaleIn(itsObj *workloads.InstanceSet) error {
// if scale in to 0, do not delete pvcs
if r.synthesizeComp.Replicas == 0 {
r.reqCtx.Log.Info("scale in to 0, keep all PVCs")
Expand All @@ -484,25 +482,24 @@ func (r *componentWorkloadOps) scaleIn(stsObj *apps.StatefulSet) error {
r.reqCtx.Log.Info(fmt.Sprintf("leave member at scaling-in error, retry later: %s", err.Error()))
return err
}
return r.deletePVCs4ScaleIn(stsObj)
return r.deletePVCs4ScaleIn(itsObj)
}

func (r *componentWorkloadOps) scaleOut(stsObj *apps.StatefulSet) error {
func (r *componentWorkloadOps) scaleOut(itsObj *workloads.InstanceSet) error {
var (
backupKey = types.NamespacedName{
Namespace: stsObj.Namespace,
Name: constant.GenerateResourceNameWithScalingSuffix(stsObj.Name),
Namespace: itsObj.Namespace,
Name: constant.GenerateResourceNameWithScalingSuffix(itsObj.Name),
}
)

// sts's replicas=0 means it's starting not scaling, skip all the scaling work.
if *stsObj.Spec.Replicas == 0 {
// its's replicas=0 means it's starting not scaling, skip all the scaling work.
if *itsObj.Spec.Replicas == 0 {
return nil
}
graphCli := model.NewGraphClient(r.cli)
graphCli.Noop(r.dag, r.protoITS)
stsProto := instanceset.ConvertInstanceSetToSTS(r.protoITS)
d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, stsObj, stsProto, backupKey)
d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, itsObj, r.protoITS, backupKey)
if err != nil {
return err
}
Expand All @@ -518,7 +515,7 @@ func (r *componentWorkloadOps) scaleOut(stsObj *apps.StatefulSet) error {
if succeed {
// pvcs are ready, ITS.replicas should be updated
graphCli.Update(r.dag, nil, r.protoITS)
return r.postScaleOut(stsObj)
return r.postScaleOut(itsObj)
} else {
graphCli.Noop(r.dag, r.protoITS)
// update objs will trigger reconcile, no need to requeue error
Expand Down Expand Up @@ -642,20 +639,20 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
return err // TODO: use requeue-after
}

func (r *componentWorkloadOps) deletePVCs4ScaleIn(stsObj *apps.StatefulSet) error {
func (r *componentWorkloadOps) deletePVCs4ScaleIn(itsObj *workloads.InstanceSet) error {
graphCli := model.NewGraphClient(r.cli)
for i := r.synthesizeComp.Replicas; i < *stsObj.Spec.Replicas; i++ {
for _, vct := range stsObj.Spec.VolumeClaimTemplates {
for i := r.synthesizeComp.Replicas; i < *itsObj.Spec.Replicas; i++ {
for _, vct := range itsObj.Spec.VolumeClaimTemplates {
pvcKey := types.NamespacedName{
Namespace: stsObj.Namespace,
Name: fmt.Sprintf("%s-%s-%d", vct.Name, stsObj.Name, i),
Namespace: itsObj.Namespace,
Name: fmt.Sprintf("%s-%s-%d", vct.Name, itsObj.Name, i),
}
pvc := corev1.PersistentVolumeClaim{}
if err := r.cli.Get(r.reqCtx.Ctx, pvcKey, &pvc, inDataContext4C()); err != nil {
return err
}
// Since there are no order guarantee between updating STS and deleting PVCs, if there is any error occurred
// after updating STS and before deleting PVCs, the PVCs intended to scale-in will be leaked.
// Since there are no order guarantee between updating ITS and deleting PVCs, if there is any error occurred
// after updating ITS and before deleting PVCs, the PVCs intended to scale-in will be leaked.
// For simplicity, the updating dependency is added between them to guarantee that the PVCs to scale-in
// will be deleted or the scaling-in operation will be failed.
graphCli.Delete(r.dag, &pvc, inDataContext4G())
Expand Down
31 changes: 3 additions & 28 deletions pkg/controller/instanceset/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import (
"fmt"
"strings"

workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/builder"
"golang.org/x/exp/slices"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"

workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
)

const (
Expand Down Expand Up @@ -174,30 +173,6 @@ func ParseAnnotationsOfScope(scope AnnotationScope, scopedAnnotations map[string
return annotations
}

// ConvertInstanceSetToSTS converts a rsm to sts
// TODO(free6om): refactor this func out
func ConvertInstanceSetToSTS(rsm *workloads.InstanceSet) *appsv1.StatefulSet {
if rsm == nil {
return nil
}
sts := builder.NewStatefulSetBuilder(rsm.Namespace, rsm.Name).
SetUID(rsm.UID).
AddLabelsInMap(rsm.Labels).
AddAnnotationsInMap(rsm.Annotations).
SetReplicas(*rsm.Spec.Replicas).
SetSelector(rsm.Spec.Selector).
SetServiceName(rsm.Spec.ServiceName).
SetTemplate(rsm.Spec.Template).
SetVolumeClaimTemplates(rsm.Spec.VolumeClaimTemplates...).
SetPodManagementPolicy(rsm.Spec.PodManagementPolicy).
SetUpdateStrategy(rsm.Spec.UpdateStrategy).
GetObject()
sts.Generation = rsm.Generation
sts.Status = rsm.Status.StatefulSetStatus
sts.Status.ObservedGeneration = rsm.Status.ObservedGeneration
return sts
}

func GetEnvConfigMapName(rsmName string) string {
return fmt.Sprintf("%s-its-env", rsmName)
}
Expand Down

0 comments on commit 1f329cd

Please sign in to comment.