Skip to content

Commit

Permalink
chore: support to abort last running opsRequest with same kind when u…
Browse files Browse the repository at this point in the history
…sing 'force' flag (#7165)
  • Loading branch information
wangyelei authored Apr 25, 2024
1 parent 6f6939d commit 5b2c44d
Show file tree
Hide file tree
Showing 20 changed files with 155 additions and 5,310 deletions.
12 changes: 12 additions & 0 deletions apis/apps/v1alpha1/opsrequest_conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
ConditionTypeValidated = "Validated"
ConditionTypeSucceed = "Succeed"
ConditionTypeFailed = "Failed"
ConditionTypeAborted = "Aborted"
ConditionTypeRestarting = "Restarting"
ConditionTypeVerticalScaling = "VerticalScaling"
ConditionTypeHorizontalScaling = "HorizontalScaling"
Expand Down Expand Up @@ -109,6 +110,17 @@ func NewCancelFailedCondition(ops *OpsRequest, err error) *metav1.Condition {
}
}

// NewAbortedCondition creates a condition for aborted phase.
func NewAbortedCondition(ops *OpsRequest) metav1.Condition {
return metav1.Condition{
Type: ConditionTypeAborted,
Status: metav1.ConditionTrue,
Reason: ConditionTypeAborted,
LastTransitionTime: metav1.Now(),
Message: fmt.Sprintf(`Aborted as a result of the latest opsRequest "%s"`, ops.Name),
}
}

// NewCancelSucceedCondition creates a condition for canceling successfully.
func NewCancelSucceedCondition(opsName string) *metav1.Condition {
return &metav1.Condition{
Expand Down
5 changes: 0 additions & 5 deletions apis/apps/v1alpha1/opsrequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1112,11 +1112,6 @@ type OpsRequestComponentStatus struct {
// +optional
WorkloadType WorkloadType `json:"workloadType,omitempty"`

// Describes the configuration covered by the latest OpsRequest of the same kind.
// when reconciling, this information will be used as a benchmark rather than the 'spec', such as 'Spec.HorizontalScaling'.
// +optional
OverrideBy *OverrideBy `json:"overrideBy,omitempty"`

// Provides an explanation for the Component being in its current state.
// +kubebuilder:validation:MaxLength=1024
// +optional
Expand Down
12 changes: 10 additions & 2 deletions apis/apps/v1alpha1/opsrequest_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,18 @@ func (r *OpsRequest) ValidateDelete() (admission.Warnings, error) {

// IsComplete checks if opsRequest has been completed.
func (r *OpsRequest) IsComplete(phases ...OpsPhase) bool {
completedPhase := func(phase OpsPhase) bool {
return slices.Contains([]OpsPhase{OpsCancelledPhase, OpsSucceedPhase, OpsAbortedPhase, OpsFailedPhase}, phase)
}
if len(phases) == 0 {
return slices.Contains([]OpsPhase{OpsCancelledPhase, OpsSucceedPhase, OpsFailedPhase}, r.Status.Phase)
return completedPhase(r.Status.Phase)
}
for i := range phases {
if !completedPhase(phases[i]) {
return false
}
}
return slices.Contains([]OpsPhase{OpsCancelledPhase, OpsSucceedPhase, OpsFailedPhase}, phases[0])
return true
}

// Force checks if the current opsRequest can be forcibly executed
Expand Down
3 changes: 2 additions & 1 deletion apis/apps/v1alpha1/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ const (

// OpsPhase defines opsRequest phase.
// +enum
// +kubebuilder:validation:Enum={Pending,Creating,Running,Cancelling,Cancelled,Failed,Succeed}
// +kubebuilder:validation:Enum={Pending,Creating,Running,Cancelling,Cancelled,Aborted,Failed,Succeed}
type OpsPhase string

const (
Expand All @@ -331,6 +331,7 @@ const (
OpsSucceedPhase OpsPhase = "Succeed"
OpsCancelledPhase OpsPhase = "Cancelled"
OpsFailedPhase OpsPhase = "Failed"
OpsAbortedPhase OpsPhase = "Aborted"
)

// PodSelectionPolicy pod selection strategy.
Expand Down
5 changes: 0 additions & 5 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2,526 changes: 1 addition & 2,525 deletions config/crd/bases/apps.kubeblocks.io_opsrequests.yaml

Large diffs are not rendered by default.

33 changes: 23 additions & 10 deletions controllers/apps/operations/horizontal_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package operations

import (
"slices"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -55,6 +56,10 @@ func (hs horizontalScalingOpsHandler) ActionStartedCondition(reqCtx intctrlutil.

// Action modifies Cluster.spec.components[*].replicas from the opsRequest
func (hs horizontalScalingOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
if slices.Contains([]appsv1alpha1.ClusterPhase{appsv1alpha1.StoppedClusterPhase,
appsv1alpha1.StoppingClusterPhase}, opsRes.Cluster.Status.Phase) {
return intctrlutil.NewFatalError("please start the cluster before scaling the cluster horizontally")
}
applyHorizontalScaling := func(compSpec *appsv1alpha1.ClusterComponentSpec, obj ComponentOpsInteface) {
horizontalScaling := obj.(appsv1alpha1.HorizontalScaling)
instances := buildInstances(*compSpec, horizontalScaling)
Expand All @@ -65,6 +70,23 @@ func (hs horizontalScalingOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli
compSpec.Replicas = horizontalScaling.Replicas
}
compOpsSet := newComponentOpsHelper(opsRes.OpsRequest.Spec.HorizontalScalingList)
// abort earlier running vertical scaling opsRequest.
if err := abortEarlierOpsRequestWithSameKind(reqCtx, cli, opsRes, []appsv1alpha1.OpsType{appsv1alpha1.HorizontalScalingType, appsv1alpha1.StartType},
func(earlierOps *appsv1alpha1.OpsRequest) bool {
if slices.Contains([]appsv1alpha1.OpsType{appsv1alpha1.StartType, appsv1alpha1.StopType}, earlierOps.Spec.Type) {
return true
}
for _, v := range earlierOps.Spec.HorizontalScalingList {
// abort the earlierOps if exists the same component.
if _, ok := compOpsSet.componentOpsSet[v.ComponentName]; ok {
return true
}
}
return false
}); err != nil {
return err
}

compOpsSet.updateClusterComponentsAndShardings(opsRes.Cluster, applyHorizontalScaling)
return cli.Update(reqCtx.Ctx, opsRes.Cluster)
}
Expand Down Expand Up @@ -102,7 +124,7 @@ func (hs horizontalScalingOpsHandler) ReconcileAction(reqCtx intctrlutil.Request
return handleComponentProgressForScalingReplicas(reqCtx, cli, opsRes, pgRes, compStatus, hs.getExpectReplicas)
}
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.HorizontalScalingList)
return compOpsHelper.reconcileActionWithComponentOps(reqCtx, cli, opsRes, "", syncOverrideByOpsForScaleReplicas, handleComponentProgress)
return compOpsHelper.reconcileActionWithComponentOps(reqCtx, cli, opsRes, "", handleComponentProgress)
}

// SaveLastConfiguration records last configuration to the OpsRequest.status.lastConfiguration
Expand Down Expand Up @@ -135,10 +157,6 @@ func (hs horizontalScalingOpsHandler) SaveLastConfiguration(reqCtx intctrlutil.R
}

func (hs horizontalScalingOpsHandler) getExpectReplicas(opsRequest *appsv1alpha1.OpsRequest, compOps ComponentOpsInteface) *int32 {
compStatus := opsRequest.Status.Components[compOps.GetComponentName()]
if compStatus.OverrideBy != nil {
return compStatus.OverrideBy.Replicas
}
for _, v := range opsRequest.Spec.HorizontalScalingList {
if v.ComponentName == compOps.GetComponentName() {
return &v.Replicas
Expand All @@ -149,11 +167,6 @@ func (hs horizontalScalingOpsHandler) getExpectReplicas(opsRequest *appsv1alpha1

// Cancel this function defines the cancel horizontalScaling action.
func (hs horizontalScalingOpsHandler) Cancel(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
for _, v := range opsRes.OpsRequest.Status.Components {
if v.OverrideBy != nil && v.OverrideBy.OpsName != "" {
return intctrlutil.NewErrorf(intctrlutil.ErrorIgnoreCancel, `can not cancel the opsRequest due to another opsRequest "%s" is running`, v.OverrideBy.OpsName)
}
}
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.VerticalScalingList)
return compOpsHelper.cancelComponentOps(reqCtx.Ctx, cli, opsRes, func(lastConfig *appsv1alpha1.LastComponentConfiguration, comp *appsv1alpha1.ClusterComponentSpec) {
if lastConfig.Replicas == nil {
Expand Down
14 changes: 3 additions & 11 deletions controllers/apps/operations/horizontal_scaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,22 +227,14 @@ var _ = Describe("HorizontalScaling OpsRequest", func() {
ops.Status.Phase = appsv1alpha1.OpsRunningPhase
})).Should(Succeed())

By("expect these opsRequest should not in the queue of the clusters")
By("the first operations request is expected to be aborted.")
Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(firstOps))).Should(Equal(appsv1alpha1.OpsAbortedPhase))
opsRequestSlice, _ := opsutil.GetOpsRequestSliceFromCluster(opsRes.Cluster)
Expect(len(opsRequestSlice)).Should(Equal(2))
Expect(len(opsRequestSlice)).Should(Equal(1))
for _, v := range opsRequestSlice {
Expect(v.InQueue).Should(BeFalse())
}

By("reconcile the firstOpsRequest again")
opsRes.OpsRequest = firstOps
_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())

By("expect the firstOpsRequest should be overwritten for the resources")
override := opsRes.OpsRequest.Status.Components[consensusComp].OverrideBy
Expect(override).ShouldNot(BeNil())
Expect(*override.Replicas).Should(Equal(ops.Spec.HorizontalScalingList[0].Replicas))
})

It("test scaling down replicas with specified pod", func() {
Expand Down
15 changes: 5 additions & 10 deletions controllers/apps/operations/ops_comp_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"fmt"
"reflect"
"slices"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -145,7 +146,6 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
cli client.Client,
opsRes *OpsResource,
opsMessageKey string,
syncOverrideBy syncOverrideByOps,
handleStatusProgress handleStatusProgressWithComponent,
) (appsv1alpha1.OpsPhase, time.Duration, error) {
if opsRes == nil {
Expand All @@ -172,11 +172,6 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
if opsRequest.Status.Components == nil {
opsRequest.Status.Components = map[string]appsv1alpha1.OpsRequestComponentStatus{}
}
if syncOverrideBy != nil {
if err = syncOverrideBy(reqCtx, cli, opsRes); err != nil {
return "", 0, nil
}
}
var progressResources []progressResource
setProgressResource := func(compSpec *appsv1alpha1.ClusterComponentSpec, compOps ComponentOpsInteface, fullComponentName string) error {
var componentDefinition *appsv1alpha1.ComponentDefinition
Expand Down Expand Up @@ -234,7 +229,7 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
}
}
}
var waitCompleted bool
var waitComponentCompleted bool
for _, pgResource := range progressResources {
opsCompStatus := c.getOpsComponentAndShardStatus(opsRequest, pgResource.compOps)
expectCount, completedCount, err := handleStatusProgress(reqCtx, cli, opsRes, pgResource, &opsCompStatus)
Expand Down Expand Up @@ -266,8 +261,8 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
opsCompStatus.LastFailedTime = lastFailedTime
}
// wait the component to complete
if !pgResource.noWaitComponentCompleted && !isComponentCompleted(componentPhase) {
waitCompleted = true
if !pgResource.noWaitComponentCompleted && !slices.Contains(appsv1alpha1.GetComponentTerminalPhases(), componentPhase) {
waitComponentCompleted = true
}
}
c.setOpsComponentAndShardStatus(opsRequest, opsCompStatus, pgResource.compOps)
Expand All @@ -279,7 +274,7 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
return opsRequestPhase, 0, err
}
}
if waitCompleted || completedProgressCount != expectProgressCount {
if waitComponentCompleted || completedProgressCount != expectProgressCount {
return opsRequestPhase, 0, nil
}
if isFailed {
Expand Down
Loading

0 comments on commit 5b2c44d

Please sign in to comment.