Skip to content

Commit

Permalink
feat: support volume populator for restore (#5342)
Browse files Browse the repository at this point in the history
(cherry picked from commit b9867a3)
  • Loading branch information
wangyelei committed Oct 11, 2023
1 parent fe6bed8 commit 8667d7e
Show file tree
Hide file tree
Showing 35 changed files with 1,030 additions and 111 deletions.
8 changes: 3 additions & 5 deletions apis/dataprotection/v1alpha1/backuppolicy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,17 @@ const (
// target database cluster.
type ConnectionCredential struct {
// secretName refers to the Secret object that contains the connection credential.
// +kube:validation:Required
// +kubebuilder:validation:Required
// +kubebuilder:validation:Pattern:=`^[a-z0-9]([a-z0-9\.\-]*[a-z0-9])?$`
SecretName string `json:"secretName"`

// usernameKey specifies the map key of the user in the connection credential secret.
// +kubebuilder:validation:Required
// +kubebuilder:default=username
UsernameKey string `json:"usernameKey"`
UsernameKey string `json:"usernameKey,omitempty"`

// passwordKey specifies the map key of the password in the connection credential secret.
// +kubebuilder:validation:Required
// +kubebuilder:default=password
PasswordKey string `json:"passwordKey"`
PasswordKey string `json:"passwordKey,omitempty"`

// hostKey specifies the map key of the host in the connection credential secret.
// +kubebuilder:default=host
Expand Down
12 changes: 6 additions & 6 deletions apis/dataprotection/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,16 @@ func (r RetentionPeriod) nextNumber(input string) (num int, rest string, err err
return num, rest, nil
}

// RestorePhase The current phase. Valid values are Running, Completed, Failed, Deleting.
// RestorePhase The current phase. Valid values are Running, Completed, Failed, AsDataSource.
// +enum
// +kubebuilder:validation:Enum={Running,Completed,Failed,Deleting}
// +kubebuilder:validation:Enum={Running,Completed,Failed,AsDataSource}
type RestorePhase string

const (
RestorePhaseRunning RestorePhase = "Running"
RestorePhaseCompleted RestorePhase = "Completed"
RestorePhaseFailed RestorePhase = "Failed"
RestorePhaseDeleting RestorePhase = "Deleting"
RestorePhaseRunning RestorePhase = "Running"
RestorePhaseCompleted RestorePhase = "Completed"
RestorePhaseFailed RestorePhase = "Failed"
RestorePhaseAsDataSource RestorePhase = "AsDataSource"
)

// RestoreActionStatus the status of restore action.
Expand Down
9 changes: 9 additions & 0 deletions cmd/dataprotection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,15 @@ func main() {
os.Exit(1)
}

if err = (&dpcontrollers.VolumePopulatorReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("volume-populator-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "VolumePopulator")
os.Exit(1)
}

if err = (&dpcontrollers.BackupPolicyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,7 @@ spec:
in the connection credential secret.
type: string
required:
- passwordKey
- secretName
- usernameKey
type: object
podSelector:
description: podSelector is used to find the target pod. The volumes
Expand Down
2 changes: 0 additions & 2 deletions config/crd/bases/dataprotection.kubeblocks.io_backups.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,7 @@ spec:
in the connection credential secret.
type: string
required:
- passwordKey
- secretName
- usernameKey
type: object
podSelector:
description: podSelector is used to find the target pod. The volumes
Expand Down
6 changes: 2 additions & 4 deletions config/crd/bases/dataprotection.kubeblocks.io_restores.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2046,9 +2046,7 @@ spec:
in the connection credential secret.
type: string
required:
- passwordKey
- secretName
- usernameKey
type: object
execAction:
description: configuration for exec action.
Expand Down Expand Up @@ -2504,12 +2502,12 @@ spec:
type: string
phase:
description: RestorePhase The current phase. Valid values are Running,
Completed, Failed, Deleting.
Completed, Failed, AsDataSource.
enum:
- Running
- Completed
- Failed
- Deleting
- AsDataSource
type: string
startTimestamp:
description: Date/time when the restore started being processed.
Expand Down
2 changes: 2 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ rules:
- persistentvolumeclaims/status
verbs:
- get
- patch
- update
- apiGroups:
- ""
resources:
Expand Down
4 changes: 2 additions & 2 deletions controllers/apps/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ var _ = Describe("Cluster Controller", func() {
Expect(testapps.GetAndChangeObjStatus(&testCtx, backupKey, func(backup *dpv1alpha1.Backup) {
backup.Status.Phase = dpv1alpha1.BackupPhaseCompleted
backup.Status.PersistentVolumeClaimName = "backup-data"
testdp.MockBackupStatusMethod(backup, testapps.DataVolumeName)
testdp.MockBackupStatusMethod(backup, testdp.BackupMethodName, testapps.DataVolumeName, testdp.ActionSetName)
})()).Should(Succeed())

if testk8s.IsMockVolumeSnapshotEnabled(&testCtx, storageClassName) {
Expand Down Expand Up @@ -2324,7 +2324,7 @@ var _ = Describe("Cluster Controller", func() {
Eventually(testapps.GetAndChangeObjStatus(&testCtx, client.ObjectKeyFromObject(backup), func(backup *dpv1alpha1.Backup) {
backup.Status.PersistentVolumeClaimName = "backup-pvc"
backup.Status.Phase = dpv1alpha1.BackupPhaseCompleted
testdp.MockBackupStatusMethod(backup, testapps.DataVolumeName)
testdp.MockBackupStatusMethod(backup, testdp.BackupMethodName, testapps.DataVolumeName, testdp.ActionSetName)
})).Should(Succeed())

By("creating cluster with backup")
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/opsrequest_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ var _ = Describe("OpsRequest Controller", func() {
backup := &dpv1alpha1.Backup{}
Expect(k8sClient.Get(testCtx.Ctx, backupKey, backup)).Should(Succeed())
backup.Status.Phase = dpv1alpha1.BackupPhaseCompleted
testdp.MockBackupStatusMethod(backup, testapps.DataVolumeName)
testdp.MockBackupStatusMethod(backup, testdp.BackupMethodName, testapps.DataVolumeName, testdp.ActionSetName)
Expect(k8sClient.Status().Update(testCtx.Ctx, backup)).Should(Succeed())
Eventually(testapps.CheckObj(&testCtx, clusterKey, func(g Gomega, cluster *appsv1alpha1.Cluster) {
g.Expect(cluster.Status.Components[mysqlCompName].Phase).Should(Equal(appsv1alpha1.UpdatingClusterCompPhase))
Expand Down
49 changes: 10 additions & 39 deletions controllers/dataprotection/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
"github.com/apecloud/kubeblocks/internal/constant"
intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
dperrors "github.com/apecloud/kubeblocks/internal/dataprotection/errors"
dprestore "github.com/apecloud/kubeblocks/internal/dataprotection/restore"
dptypes "github.com/apecloud/kubeblocks/internal/dataprotection/types"
)
Expand Down Expand Up @@ -135,10 +134,14 @@ func (r *RestoreReconciler) newAction(reqCtx intctrlutil.RequestCtx, restore *dp
}
return intctrlutil.Reconciled()
}
// patch status
restore.Status.StartTimestamp = &metav1.Time{Time: time.Now()}
restore.Status.Phase = dpv1alpha1.RestorePhaseRunning
r.Recorder.Event(restore, corev1.EventTypeNormal, dprestore.ReasonRestoreStarting, "start to restore")
if restore.Spec.PrepareDataConfig != nil && restore.Spec.PrepareDataConfig.DataSourceRef != nil {
restore.Status.Phase = dpv1alpha1.RestorePhaseAsDataSource
} else {
// patch status
restore.Status.StartTimestamp = &metav1.Time{Time: time.Now()}
restore.Status.Phase = dpv1alpha1.RestorePhaseRunning
r.Recorder.Event(restore, corev1.EventTypeNormal, dprestore.ReasonRestoreStarting, "start to restore")
}
if err := r.Client.Status().Patch(reqCtx.Ctx, restore, patch); err != nil {
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}
Expand Down Expand Up @@ -207,39 +210,7 @@ func (r *RestoreReconciler) validateAndBuildMGR(reqCtx intctrlutil.RequestCtx, r
}
}()

// get backupActionSet based on the specified backup name.
backupName := restoreMgr.Restore.Spec.Backup.Name
backupSet, err := restoreMgr.GetBackupActionSetByNamespaced(reqCtx, r.Client, backupName, restoreMgr.Restore.Spec.Backup.Namespace)
if err != nil {
return err
}

// check if the backup is completed exclude continuous backup.
var backupType dpv1alpha1.BackupType
if backupSet.ActionSet != nil {
backupType = backupSet.ActionSet.Spec.BackupType
} else if backupSet.UseVolumeSnapshot {
backupType = dpv1alpha1.BackupTypeFull
}
if backupType != dpv1alpha1.BackupTypeContinuous && backupSet.Backup.Status.Phase != dpv1alpha1.BackupPhaseCompleted {
err = intctrlutil.NewFatalError(fmt.Sprintf(`phase of backup "%s" is not completed`, backupName))
return err
}

// build backupActionSets of prepareData and postReady stage based on the specified backup's type.
switch backupType {
case dpv1alpha1.BackupTypeFull:
restoreMgr.SetBackupSets(*backupSet)
case dpv1alpha1.BackupTypeIncremental:
err = restoreMgr.BuildIncrementalBackupActionSets(reqCtx, r.Client, *backupSet)
case dpv1alpha1.BackupTypeDifferential:
err = restoreMgr.BuildDifferentialBackupActionSets(reqCtx, r.Client, *backupSet)
case dpv1alpha1.BackupTypeContinuous:
err = intctrlutil.NewErrorf(dperrors.ErrorTypeWaitForExternalHandler, "wait for external handler to do handle the Point-In-Time recovery.")
r.Recorder.Event(restoreMgr.Restore, corev1.EventTypeWarning, string(dperrors.ErrorTypeWaitForExternalHandler), err.Error())
default:
err = intctrlutil.NewFatalError(fmt.Sprintf("backup type of %s is empty", backupName))
}
err = dprestore.ValidateAndInitRestoreMGR(reqCtx, r.Client, r.Recorder, restoreMgr)
return err
}

Expand Down Expand Up @@ -365,7 +336,7 @@ func (r *RestoreReconciler) handleBackupActionSet(reqCtx intctrlutil.RequestCtx,
return true, nil
}
// 3. create jobs
jobs, err = restoreMgr.CreateJobsIfNotExist(reqCtx, r.Client, jobs)
jobs, err = restoreMgr.CreateJobsIfNotExist(reqCtx, r.Client, restoreMgr.Restore, jobs)
if err != nil {
return false, err
}
Expand Down
7 changes: 7 additions & 0 deletions controllers/dataprotection/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ var _ = BeforeSuite(func() {
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

err = (&VolumePopulatorReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("volume-populate-controller"),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

testCtx = testutil.NewDefaultTestContext(ctx, k8sClient, testEnv)

go func() {
Expand Down
23 changes: 23 additions & 0 deletions controllers/dataprotection/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"runtime"
"time"

corev1 "k8s.io/api/core/v1"

viper "github.com/apecloud/kubeblocks/internal/viperx"
)

Expand Down Expand Up @@ -85,6 +87,27 @@ const (
ReasonUnknownError = "UnknownError"
)

// constant for volume populator
const (
populatePodPrefix = "kb-populate"

// annotation keys
annSelectedNode = "volume.kubernetes.io/selected-node"
annPopulateFrom = "dataprotections.kubeblocks.io/populate-from"

// event reason
reasonStartToVolumePopulate = "StartToVolumePopulate"
reasonVolumePopulateSucceed = "VolumePopulateSucceed"
reasonVolumePopulateFailed = "VolumePopulateFailed"

// pvc condition type and reason
reasonPopulatingFailed = "Failed"
reasonPopulatingProcessing = "Processing"
reasonPopulatingSucceed = "Succeed"

PersistentVolumeClaimPopulating corev1.PersistentVolumeClaimConditionType = "Populating"
)

var reconcileInterval = time.Second

func init() {
Expand Down
6 changes: 6 additions & 0 deletions controllers/dataprotection/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,9 @@ func fromFlattenName(flatten string) (name string, namespace string) {
}
return
}

// restore functions

func getPopulatePVCName(pvcUID types.UID) string {
return fmt.Sprintf("%s-%s", populatePodPrefix, pvcUID)
}
Loading

0 comments on commit 8667d7e

Please sign in to comment.