Skip to content

Commit

Permalink
feat: support rebuilding an instance on a specified node (#6908)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyelei authored Mar 28, 2024
1 parent ea1cfd2 commit 34c2190
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 50 deletions.
15 changes: 13 additions & 2 deletions apis/apps/v1alpha1/opsrequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ type ComponentOps struct {
type RebuildInstance struct {
ComponentOps `json:",inline"`

// Defines the names of the instances that need to be rebuilt. These are essentially the names of the pods.
// Defines the instances that need to be rebuilt.
// +kubebuilder:validation:Required
InstanceNames []string `json:"instanceNames"`
Instances []Instance `json:"instances"`

// Indicates the name of the backup from which to recover. Currently, only a full physical backup is supported
// unless your component only has one replica. Such as 'xtrabackup' is full physical backup for mysql and 'mysqldump' is not.
Expand All @@ -195,6 +195,17 @@ type RebuildInstance struct {
EnvForRestore []corev1.EnvVar `json:"envForRestore,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
}

type Instance struct {
// Pod name of the instance.
// +kubebuilder:validation:Required
Name string `json:"name"`

// The instance will rebuild on the specified node when the instance uses local PersistentVolume as the storage disk.
// If not set, it will rebuild on a random node.
// +optional
TargetNodeName string `json:"targetNodeName,omitempty"`
}

type Switchover struct {
ComponentOps `json:",inline"`

Expand Down
21 changes: 18 additions & 3 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 16 additions & 5 deletions config/crd/bases/apps.kubeblocks.io_opsrequests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -506,15 +506,26 @@ spec:
type: object
type: array
x-kubernetes-preserve-unknown-fields: true
instanceNames:
description: Defines the names of the instances that need to
be rebuilt. These are essentially the names of the pods.
instances:
description: Defines the instances that need to be rebuilt.
items:
type: string
properties:
name:
description: Pod name of the instance.
type: string
targetNodeName:
description: The instance will rebuild on the specified
node when the instance uses local PersistentVolume as
the storage disk. If not set, it will rebuild on a random
node.
type: string
required:
- name
type: object
type: array
required:
- componentName
- instanceNames
- instances
type: object
type: array
x-kubernetes-list-map-keys:
Expand Down
69 changes: 43 additions & 26 deletions controllers/apps/operations/rebuild_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type instanceHelper struct {
comp *appsv1alpha1.ClusterComponentSpec
targetPod *corev1.Pod
backup *dpv1alpha1.Backup
instance appsv1alpha1.Instance
actionSet *dpv1alpha1.ActionSet
// key: source pvc name, value: the tmp pvc which using to rebuild
pvcMap map[string]*corev1.PersistentVolumeClaim
Expand Down Expand Up @@ -106,17 +107,17 @@ func (r rebuildInstanceOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli cli
if err != nil {
return err
}
for _, podName := range v.InstanceNames {
for _, ins := range v.Instances {
targetPod := &corev1.Pod{}
if err = cli.Get(reqCtx.Ctx, client.ObjectKey{Name: podName, Namespace: opsRes.Cluster.Namespace}, targetPod); err != nil {
if err = cli.Get(reqCtx.Ctx, client.ObjectKey{Name: ins.Name, Namespace: opsRes.Cluster.Namespace}, targetPod); err != nil {
return err
}
isAvailable, err := r.instanceIsAvailable(synthesizedComp, targetPod)
if err != nil {
return err
}
if isAvailable {
return intctrlutil.NewFatalError(fmt.Sprintf(`instance "%s" is availabled, can not rebuild it`, podName))
return intctrlutil.NewFatalError(fmt.Sprintf(`instance "%s" is availabled, can not rebuild it`, ins.Name))
}
}
}
Expand All @@ -127,13 +128,13 @@ func (r rebuildInstanceOpsHandler) SaveLastConfiguration(reqCtx intctrlutil.Requ
return nil
}

func (r rebuildInstanceOpsHandler) getInstanceProgressDetail(compStatus appsv1alpha1.OpsRequestComponentStatus, instance string) *appsv1alpha1.ProgressStatusDetail {
func (r rebuildInstanceOpsHandler) getInstanceProgressDetail(compStatus appsv1alpha1.OpsRequestComponentStatus, instance string) appsv1alpha1.ProgressStatusDetail {
objectKey := getProgressObjectKey(constant.PodKind, instance)
progressDetail := findStatusProgressDetail(compStatus.ProgressDetails, objectKey)
if progressDetail != nil {
return progressDetail
return *progressDetail
}
return &appsv1alpha1.ProgressStatusDetail{
return appsv1alpha1.ProgressStatusDetail{
ObjectKey: objectKey,
Status: appsv1alpha1.ProcessingProgressStatus,
Message: fmt.Sprintf("Start to rebuild pod %s", instance),
Expand All @@ -156,9 +157,9 @@ func (r rebuildInstanceOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx
for _, v := range opsRes.OpsRequest.Spec.RebuildFrom {
compStatus := opsRes.OpsRequest.Status.Components[v.ComponentName]
comp := opsRes.Cluster.Spec.GetComponentByName(v.ComponentName)
for i, instance := range v.InstanceNames {
for i, instance := range v.Instances {
expectCount += 1
progressDetail := r.getInstanceProgressDetail(compStatus, instance)
progressDetail := r.getInstanceProgressDetail(compStatus, instance.Name)
if isCompletedProgressStatus(progressDetail.Status) {
completedCount += 1
if progressDetail.Status == appsv1alpha1.FailedProgressStatus {
Expand All @@ -167,11 +168,11 @@ func (r rebuildInstanceOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx
continue
}
// rebuild instance
completed, err := r.rebuildInstance(reqCtx, cli, opsRes, comp, v.EnvForRestore, progressDetail, instance, v.BackupName, i)
completed, err := r.rebuildInstance(reqCtx, cli, opsRes, comp, v.EnvForRestore, &progressDetail, instance, v.BackupName, i)
if intctrlutil.IsTargetError(err, intctrlutil.ErrorTypeFatal) {
// If a fatal error occurs, this instance rebuilds failed.
progressDetail.SetStatusAndMessage(appsv1alpha1.FailedProgressStatus, err.Error())
setComponentStatusProgressDetail(opsRes.Recorder, opsRes.OpsRequest, &compStatus.ProgressDetails, *progressDetail)
setComponentStatusProgressDetail(opsRes.Recorder, opsRes.OpsRequest, &compStatus.ProgressDetails, progressDetail)
continue
}
if err != nil {
Expand All @@ -180,9 +181,9 @@ func (r rebuildInstanceOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx
if completed {
// if the pod has been rebuilt, set progressDetail phase to Succeed.
progressDetail.SetStatusAndMessage(appsv1alpha1.SucceedProgressStatus,
fmt.Sprintf("Rebuild pod %s successfully", instance))
fmt.Sprintf("Rebuild pod %s successfully", instance.Name))
}
setComponentStatusProgressDetail(opsRes.Recorder, opsRes.OpsRequest, &compStatus.ProgressDetails, *progressDetail)
setComponentStatusProgressDetail(opsRes.Recorder, opsRes.OpsRequest, &compStatus.ProgressDetails, progressDetail)
}
opsRes.OpsRequest.Status.Components[v.ComponentName] = compStatus
}
Expand All @@ -206,10 +207,10 @@ func (r rebuildInstanceOpsHandler) rebuildInstance(reqCtx intctrlutil.RequestCtx
comp *appsv1alpha1.ClusterComponentSpec,
envForRestore []corev1.EnvVar,
progressDetail *appsv1alpha1.ProgressStatusDetail,
targetPodName,
instance appsv1alpha1.Instance,
backupName string,
index int) (bool, error) {
insHelper, err := r.prepareInstanceHelper(reqCtx, cli, opsRes, comp, envForRestore, targetPodName, backupName, index)
insHelper, err := r.prepareInstanceHelper(reqCtx, cli, opsRes, comp, envForRestore, instance, backupName, index)
if err != nil {
return false, err
}
Expand All @@ -224,7 +225,7 @@ func (r rebuildInstanceOpsHandler) prepareInstanceHelper(reqCtx intctrlutil.Requ
opsRes *OpsResource,
comp *appsv1alpha1.ClusterComponentSpec,
envForRestore []corev1.EnvVar,
targetPodName,
instance appsv1alpha1.Instance,
backupName string,
index int) (*instanceHelper, error) {
var (
Expand Down Expand Up @@ -253,7 +254,7 @@ func (r rebuildInstanceOpsHandler) prepareInstanceHelper(reqCtx intctrlutil.Requ
}
}
targetPod := &corev1.Pod{}
if err = cli.Get(reqCtx.Ctx, client.ObjectKey{Name: targetPodName, Namespace: opsRes.Cluster.Namespace}, targetPod); err != nil {
if err = cli.Get(reqCtx.Ctx, client.ObjectKey{Name: instance.Name, Namespace: opsRes.Cluster.Namespace}, targetPod); err != nil {
return nil, err
}
synthesizedComp, err := component.BuildSynthesizedComponentWrapper(reqCtx, cli, opsRes.Cluster, comp)
Expand All @@ -269,6 +270,7 @@ func (r rebuildInstanceOpsHandler) prepareInstanceHelper(reqCtx intctrlutil.Requ
index: index,
comp: comp,
backup: backup,
instance: instance,
actionSet: actionSet,
synthesizedComp: synthesizedComp,
pvcMap: pvcMap,
Expand Down Expand Up @@ -330,15 +332,18 @@ func (r rebuildInstanceOpsHandler) rebuildInstanceWithBackup(reqCtx intctrlutil.
if err != nil || !available {
return false, err
}
progressDetail.Message = fmt.Sprintf(`Waiting for Restore "%s" to be completed`, restoreName)
return false, r.createPostReadyRestore(reqCtx, cli, opsRes.OpsRequest, insHelper, restoreName)
}
return false, r.createPrepareDataRestore(reqCtx, cli, opsRes.OpsRequest, insHelper, restoreName)
}
if restore.Status.Phase == dpv1alpha1.RestorePhaseFailed {
return false, intctrlutil.NewFatalError(fmt.Sprintf(`pod "%s" rebuild failed, due to the Restore "%s" is Failed`, insHelper.targetPod.Name, restoreName))
}
return restore.Status.Phase == dpv1alpha1.RestorePhaseCompleted, nil
if restore.Status.Phase != dpv1alpha1.RestorePhaseCompleted {
progressDetail.Message = fmt.Sprintf(`Waiting for Restore "%s" to be completed`, restoreName)
return false, nil
}
return true, nil
}

var (
Expand Down Expand Up @@ -488,6 +493,16 @@ func (r rebuildInstanceOpsHandler) createPrepareDataRestore(reqCtx intctrlutil.R
}
volumeClaims = append(volumeClaims, volumeClaim)
}
schedulePolicy := dpv1alpha1.SchedulingSpec{
Tolerations: insHelper.targetPod.Spec.Tolerations,
Affinity: insHelper.targetPod.Spec.Affinity,
TopologySpreadConstraints: insHelper.targetPod.Spec.TopologySpreadConstraints,
}
if insHelper.instance.TargetNodeName != "" {
schedulePolicy.NodeSelector = map[string]string{
corev1.LabelHostname: insHelper.instance.TargetNodeName,
}
}
restore := &dpv1alpha1.Restore{
ObjectMeta: r.buildRestoreMetaObject(opsRequest, restoreName),
Spec: dpv1alpha1.RestoreSpec{
Expand All @@ -497,11 +512,7 @@ func (r rebuildInstanceOpsHandler) createPrepareDataRestore(reqCtx intctrlutil.R
},
Env: insHelper.envForRestore,
PrepareDataConfig: &dpv1alpha1.PrepareDataConfig{
SchedulingSpec: dpv1alpha1.SchedulingSpec{
Tolerations: insHelper.targetPod.Spec.Tolerations,
Affinity: insHelper.targetPod.Spec.Affinity,
TopologySpreadConstraints: insHelper.targetPod.Spec.TopologySpreadConstraints,
},
SchedulingSpec: schedulePolicy,
VolumeClaimRestorePolicy: dpv1alpha1.VolumeClaimRestorePolicySerial,
RestoreVolumeClaims: volumeClaims,
},
Expand Down Expand Up @@ -571,14 +582,20 @@ func (r rebuildInstanceOpsHandler) createTmpPVCsAndPod(reqCtx intctrlutil.Reques
VolumeMounts: insHelper.volumeMounts,
}
intctrlutil.InjectZeroResourcesLimitsIfEmpty(container)
rebuildPod := builder.NewPodBuilder(insHelper.targetPod.Namespace, tmpPodName).AddTolerations(insHelper.targetPod.Spec.Tolerations...).
rebuildPodBuilder := builder.NewPodBuilder(insHelper.targetPod.Namespace, tmpPodName).AddTolerations(insHelper.targetPod.Spec.Tolerations...).
AddContainer(*container).
AddVolumes(insHelper.volumes...).
SetRestartPolicy(corev1.RestartPolicyNever).
AddLabels(constant.OpsRequestNameLabelKey, opsRequest.Name).
AddLabels(constant.OpsRequestNamespaceLabelKey, opsRequest.Namespace).
SetTopologySpreadConstraints(insHelper.targetPod.Spec.TopologySpreadConstraints).
SetAffinity(insHelper.targetPod.Spec.Affinity).GetObject()
SetAffinity(insHelper.targetPod.Spec.Affinity)
if insHelper.instance.TargetNodeName != "" {
rebuildPodBuilder.SetNodeSelector(map[string]string{
corev1.LabelHostname: insHelper.instance.TargetNodeName,
})
}
rebuildPod := rebuildPodBuilder.GetObject()
_ = intctrlutil.SetControllerReference(opsRequest, rebuildPod)
return client.IgnoreAlreadyExists(cli.Create(reqCtx.Ctx, rebuildPod))
}
Expand Down Expand Up @@ -752,7 +769,7 @@ func (r rebuildInstanceOpsHandler) instanceIsAvailable(
}
isFailed, isTimeout, _ := intctrlutil.IsPodFailedAndTimedOut(targetPod)
if isFailed && isTimeout {
return false, intctrlutil.NewFatalError(fmt.Sprintf(`create pod "%s" failed`, targetPod.Name))
return false, intctrlutil.NewFatalError(fmt.Sprintf(`the new instance "%s" is failed, please check it`, targetPod.Name))
}
if !podutils.IsPodAvailable(targetPod, synthesizedComp.MinReadySeconds, metav1.Now()) {
return false, nil
Expand Down
16 changes: 11 additions & 5 deletions controllers/apps/operations/rebuild_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,17 @@ var _ = Describe("OpsUtil functions", func() {
opsName := "rebuild-instance-" + testCtx.GetRandomStr()
ops := testapps.NewOpsRequestObj(opsName, testCtx.DefaultNamespace,
clusterName, appsv1alpha1.RebuildInstanceType)
var instances []appsv1alpha1.Instance
for _, insName := range instanceNames {
instances = append(instances, appsv1alpha1.Instance{
Name: insName,
})
}
ops.Spec.RebuildFrom = []appsv1alpha1.RebuildInstance{
{
ComponentOps: appsv1alpha1.ComponentOps{ComponentName: consensusComp},
InstanceNames: instanceNames,
BackupName: backupName,
ComponentOps: appsv1alpha1.ComponentOps{ComponentName: consensusComp},
Instances: instances,
BackupName: backupName,
},
}
opsRequest := testapps.CreateOpsRequest(ctx, testCtx, ops)
Expand Down Expand Up @@ -170,8 +176,8 @@ var _ = Describe("OpsUtil functions", func() {

By("fake pod is unavailable")
opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsCreatingPhase
for _, podName := range opsRes.OpsRequest.Spec.RebuildFrom[0].InstanceNames {
Expect(testapps.GetAndChangeObjStatus(&testCtx, client.ObjectKey{Name: podName, Namespace: opsRes.OpsRequest.Namespace}, func(pod *corev1.Pod) {
for _, ins := range opsRes.OpsRequest.Spec.RebuildFrom[0].Instances {
Expect(testapps.GetAndChangeObjStatus(&testCtx, client.ObjectKey{Name: ins.Name, Namespace: opsRes.OpsRequest.Namespace}, func(pod *corev1.Pod) {
pod.Status.Conditions = nil
})()).Should(Succeed())
}
Expand Down
21 changes: 16 additions & 5 deletions deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -506,15 +506,26 @@ spec:
type: object
type: array
x-kubernetes-preserve-unknown-fields: true
instanceNames:
description: Defines the names of the instances that need to
be rebuilt. These are essentially the names of the pods.
instances:
description: Defines the instances that need to be rebuilt.
items:
type: string
properties:
name:
description: Pod name of the instance.
type: string
targetNodeName:
description: The instance will rebuild on the specified
node when the instance uses local PersistentVolume as
the storage disk. If not set, it will rebuild on a random
node.
type: string
required:
- name
type: object
type: array
required:
- componentName
- instanceNames
- instances
type: object
type: array
x-kubernetes-list-map-keys:
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile-tools
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ RUN GRPC_HEALTH_PROBE_VERSION=v0.4.13 GOOS=${TARGETOS} GOARCH=${TARGETARCH} &&


# Pin alpine to a fixed release (3.19.1) for reproducible builds; previously this tracked the moving "edge" tag.
FROM docker.io/alpine:edge as dist
FROM docker.io/alpine:3.19.1 as dist
ARG APK_MIRROR

# install tools via apk
Expand Down
Loading

0 comments on commit 34c2190

Please sign in to comment.