Skip to content

Commit

Permalink
chore: support to specify the continuous backup method for cluster.sp…
Browse files Browse the repository at this point in the history
…ec (#8863)
  • Loading branch information
wangyelei authored Jan 23, 2025
1 parent 1e49849 commit 8cf92c7
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 40 deletions.
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ type ClusterBackup struct {
// +optional
PITREnabled *bool `json:"pitrEnabled,omitempty"`

// Specifies the backup method to use, if not set, use the first continuous method.
//
// +optional
ContinuousMethod string `json:"continuousMethod,omitempty"`

// Specifies whether to enable incremental backup.
//
// +kubebuilder:default=false
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ spec:
backup:
description: Specifies the backup configuration of the Cluster.
properties:
continuousMethod:
description: Specifies the backup method to use, if not set, use
the first continuous method.
type: string
cronExpression:
description: The cron expression for the schedule. The timezone
is in UTC. See https://en.wikipedia.org/wiki/Cron.
Expand Down
64 changes: 53 additions & 11 deletions controllers/apps/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/controller/builder"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/scheduling"
"github.com/apecloud/kubeblocks/pkg/dataprotection/utils/boolptr"
"github.com/apecloud/kubeblocks/pkg/generics"
testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
testdp "github.com/apecloud/kubeblocks/pkg/testutil/dataprotection"
Expand Down Expand Up @@ -161,7 +162,7 @@ var _ = Describe("Cluster Controller", func() {
GetObject()

By("Create a bpt obj")
createBackupPolicyTpl(clusterDefObj, compDefObj.Name, clusterVersionName)
createBackupPolicyTpl(clusterDefObj, compDefObj.Name, false, clusterVersionName)

By("Create a componentVersion obj")
compVersionObj = testapps.NewComponentVersionFactory(compVersionName).
Expand Down Expand Up @@ -1368,6 +1369,19 @@ var _ = Describe("Cluster Controller", func() {
RepoName: backupRepoName,
},
},
{
desc: "backup with snapshot method and specified continuous method",
backup: &appsv1alpha1.ClusterBackup{
Enabled: &boolTrue,
RetentionPeriod: retention("1d"),
Method: vsBackupMethodName,
CronExpression: "*/1 * * * *",
StartingDeadlineMinutes: int64Ptr(int64(10)),
ContinuousMethod: continuousMethodName1,
PITREnabled: &boolTrue,
RepoName: backupRepoName,
},
},
{
desc: "disable backup",
backup: &appsv1alpha1.ClusterBackup{
Expand Down Expand Up @@ -1405,19 +1419,36 @@ var _ = Describe("Cluster Controller", func() {

checkSchedule := func(g Gomega, schedule *dpv1alpha1.BackupSchedule) {
var policy *dpv1alpha1.SchedulePolicy
enableOtherFullMethod := false
for i, s := range schedule.Spec.Schedules {
hasCheckPITRMethod := false
for i := range schedule.Spec.Schedules {
s := &schedule.Spec.Schedules[i]
if s.BackupMethod == backup.Method {
Expect(*s.Enabled).Should(BeEquivalentTo(*backup.Enabled))
policy = &schedule.Spec.Schedules[i]
if *backup.Enabled {
enableOtherFullMethod = true
policy = s
continue
}
if !slices.Contains([]string{continuousMethodName, continuousMethodName1}, s.BackupMethod) {
if boolptr.IsSetToTrue(backup.Enabled) {
// another full backup method should be disabled.
Expect(*s.Enabled).Should(BeFalse())
}
continue
}
if enableOtherFullMethod {
// another full backup method should be disabled.
Expect(*s.Enabled).Should(BeFalse())
if len(backup.ContinuousMethod) == 0 {
// first continuous backup method should be equal to "PITREnabled", another is disabled.
if !hasCheckPITRMethod {
Expect(*s.Enabled).Should(BeEquivalentTo(*backup.PITREnabled))
hasCheckPITRMethod = true
} else {
Expect(*s.Enabled).Should(BeFalse())
}
} else {
// specified continuous backup method should be equal to "PITREnabled", another is disabled.
if backup.ContinuousMethod == s.BackupMethod {
Expect(*s.Enabled).Should(BeEquivalentTo(*backup.PITREnabled))
} else {
Expect(*s.Enabled).Should(BeFalse())
}
}
}
if backup.Enabled != nil && *backup.Enabled {
Expand Down Expand Up @@ -1593,9 +1624,10 @@ var _ = Describe("Cluster Controller", func() {
})
})

func createBackupPolicyTpl(clusterDefObj *appsv1alpha1.ClusterDefinition, compDef string, mappingClusterVersions ...string) {
func createBackupPolicyTpl(clusterDefObj *appsv1alpha1.ClusterDefinition, compDef string, forHScale bool, mappingClusterVersions ...string) {
By("create actionSet")
fakeActionSet(clusterDefObj.Name)
fakeActionSet(actionSetName, clusterDefObj.Name, dpv1alpha1.BackupTypeFull)
fakeActionSet(continuousActionSetName, clusterDefObj.Name, dpv1alpha1.BackupTypeContinuous)

By("Creating a BackupPolicyTemplate")
bpt := testapps.NewBackupPolicyTemplateFactory(backupPolicyTPLName).
Expand All @@ -1618,6 +1650,16 @@ func createBackupPolicyTpl(clusterDefObj *appsv1alpha1.ClusterDefinition, compDe
case appsv1alpha1.Replication:
bpt.SetTargetRole("primary")
}
if !forHScale {
bpt.AddBackupMethod(continuousMethodName, false, continuousActionSetName).
SetComponentDef(compDef).
SetBackupMethodVolumeMounts("data", "/data").
AddBackupMethod(continuousMethodName1, false, continuousActionSetName).
SetComponentDef(compDef).
SetBackupMethodVolumeMounts("data", "/data").
AddSchedule(continuousMethodName, "0 0 * * *", ttl, false).
AddSchedule(continuousMethodName1, "0 0 * * *", ttl, false)
}
}
bpt.Create(&testCtx)
}
27 changes: 15 additions & 12 deletions controllers/apps/component_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ import (
)

const (
backupPolicyTPLName = "test-backup-policy-template-mysql"
backupMethodName = "test-backup-method"
vsBackupMethodName = "test-vs-backup-method"
actionSetName = "test-action-set"
backupPolicyTPLName = "test-backup-policy-template-mysql"
backupMethodName = "test-backup-method"
continuousMethodName = "continuous-backup-method"
continuousMethodName1 = "continuous-backup-method1"
vsBackupMethodName = "test-vs-backup-method"
actionSetName = "test-action-set"
continuousActionSetName = "test-continuous-action-set"
)

var (
Expand Down Expand Up @@ -700,7 +703,7 @@ var _ = Describe("Component Controller", func() {

if policyType == appsv1alpha1.HScaleDataClonePolicyCloneVolume {
By("creating actionSet if backup policy is backup")
fakeActionSet(clusterDef.Name)
fakeActionSet(actionSetName, clusterDef.Name, dpv1alpha1.BackupTypeFull)
}
}
})()).ShouldNot(HaveOccurred())
Expand Down Expand Up @@ -2052,7 +2055,7 @@ var _ = Describe("Component Controller", func() {
Context("provisioning", func() {
BeforeEach(func() {
createAllWorkloadTypesClusterDef()
createBackupPolicyTpl(clusterDefObj, compDefName)
createBackupPolicyTpl(clusterDefObj, compDefName, true)
})

AfterEach(func() {
Expand Down Expand Up @@ -2123,7 +2126,7 @@ var _ = Describe("Component Controller", func() {

BeforeEach(func() {
createAllWorkloadTypesClusterDef()
createBackupPolicyTpl(clusterDefObj, compDefName)
createBackupPolicyTpl(clusterDefObj, compDefName, true)
})

AfterEach(func() {
Expand All @@ -2143,7 +2146,7 @@ var _ = Describe("Component Controller", func() {
BeforeEach(func() {
cleanEnv()
createAllWorkloadTypesClusterDef()
createBackupPolicyTpl(clusterDefObj, compDefName)
createBackupPolicyTpl(clusterDefObj, compDefName, true)
})

createNWaitClusterObj := func(components map[string]string,
Expand Down Expand Up @@ -2220,7 +2223,7 @@ var _ = Describe("Component Controller", func() {

BeforeEach(func() {
createAllWorkloadTypesClusterDef()
createBackupPolicyTpl(clusterDefObj, compDefName)
createBackupPolicyTpl(clusterDefObj, compDefName, true)
mockStorageClass = testk8s.CreateMockStorageClass(&testCtx, testk8s.DefaultStorageClassName)
})

Expand Down Expand Up @@ -2289,7 +2292,7 @@ var _ = Describe("Component Controller", func() {
When("creating cluster with workloadType=consensus component", func() {
BeforeEach(func() {
createAllWorkloadTypesClusterDef()
createBackupPolicyTpl(clusterDefObj, compDefName)
createBackupPolicyTpl(clusterDefObj, compDefName, true)
})

AfterEach(func() {
Expand Down Expand Up @@ -2544,7 +2547,7 @@ func checkRestoreAndSetCompleted(clusterKey types.NamespacedName, compName strin
mockRestoreCompleted(ml)
}

func fakeActionSet(clusterDefName string) *dpv1alpha1.ActionSet {
func fakeActionSet(actionSetName, clusterDefName string, backupType dpv1alpha1.BackupType) *dpv1alpha1.ActionSet {
actionSet := &dpv1alpha1.ActionSet{
ObjectMeta: metav1.ObjectMeta{
Name: actionSetName,
Expand All @@ -2559,7 +2562,7 @@ func fakeActionSet(clusterDefName string) *dpv1alpha1.ActionSet {
Value: "test-value",
},
},
BackupType: dpv1alpha1.BackupTypeFull,
BackupType: backupType,
Backup: &dpv1alpha1.BackupActionSpec{
BackupData: &dpv1alpha1.BackupDataActionSpec{
JobActionSpec: dpv1alpha1.JobActionSpec{
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/opsrequest_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ var _ = Describe("OpsRequest Controller", func() {
}

createMysqlCluster := func(replicas int32) {
createBackupPolicyTpl(clusterDefObj, mysqlCompDefName)
createBackupPolicyTpl(clusterDefObj, mysqlCompDefName, true)

By("set component to horizontal with snapshot policy and create a cluster")
testk8s.MockEnableVolumeSnapshot(&testCtx, testk8s.DefaultStorageClassName)
Expand Down
32 changes: 21 additions & 11 deletions controllers/apps/transformer_cluster_backup_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ func (r *clusterBackupPolicyTransformer) mergeClusterBackup(
hasSyncPITRMethod := false
hasSyncIncMethod := false
enableAutoBackup := boolptr.IsSetToTrue(backup.Enabled)
for i, s := range backupSchedule.Spec.Schedules {
for i := range backupSchedule.Spec.Schedules {
s := &backupSchedule.Spec.Schedules[i]
if s.BackupMethod == backup.Method {
mergeSchedulePolicy(sp, &backupSchedule.Spec.Schedules[i])
exist = true
Expand All @@ -671,15 +672,23 @@ func (r *clusterBackupPolicyTransformer) mergeClusterBackup(
r.Error(err, "failed to get ActionSet for backup.", "ActionSet", as.Name)
continue
}
if as.Spec.BackupType == dpv1alpha1.BackupTypeContinuous && backup.PITREnabled != nil && !hasSyncPITRMethod {
// auto-sync the first continuous backup for the 'pirtEnable' option.
backupSchedule.Spec.Schedules[i].Enabled = backup.PITREnabled
switch as.Spec.BackupType {
case dpv1alpha1.BackupTypeContinuous:
if backup.PITREnabled == nil {
continue
}
if boolptr.IsSetToFalse(backup.PITREnabled) || hasSyncPITRMethod ||
(len(backup.ContinuousMethod) > 0 && backup.ContinuousMethod != s.BackupMethod) {
s.Enabled = boolptr.False()
continue
}
// auto-sync the first or specified continuous backup for the 'pirtEnable' option.
s.Enabled = backup.PITREnabled
if backup.RetentionPeriod.String() != "" {
backupSchedule.Spec.Schedules[i].RetentionPeriod = backup.RetentionPeriod
s.RetentionPeriod = backup.RetentionPeriod
}
hasSyncPITRMethod = true
}
if as.Spec.BackupType == dpv1alpha1.BackupTypeIncremental {
case dpv1alpha1.BackupTypeIncremental:
if len(backup.Method) == 0 || m.CompatibleMethod != backup.Method {
// disable other incremental backup schedules
backupSchedule.Spec.Schedules[i].Enabled = boolptr.False()
Expand All @@ -692,10 +701,11 @@ func (r *clusterBackupPolicyTransformer) mergeClusterBackup(
}, &backupSchedule.Spec.Schedules[i])
hasSyncIncMethod = true
}
}
if as.Spec.BackupType == dpv1alpha1.BackupTypeFull && enableAutoBackup {
// disable the automatic backup for other full backup method
backupSchedule.Spec.Schedules[i].Enabled = boolptr.False()
case dpv1alpha1.BackupTypeFull:
if enableAutoBackup {
// disable the automatic backup for other full backup method
s.Enabled = boolptr.False()
}
}
}
if !exist {
Expand Down
4 changes: 4 additions & 0 deletions deploy/helm/crds/apps.kubeblocks.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ spec:
backup:
description: Specifies the backup configuration of the Cluster.
properties:
continuousMethod:
description: Specifies the backup method to use, if not set, use
the first continuous method.
type: string
cronExpression:
description: The cron expression for the schedule. The timezone
is in UTC. See https://en.wikipedia.org/wiki/Cron.
Expand Down
12 changes: 12 additions & 0 deletions docs/developer_docs/api-reference/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -4548,6 +4548,18 @@ bool
</tr>
<tr>
<td>
<code>continuousMethod</code><br/>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>Specifies the backup method to use, if not set, use the first continuous method.</p>
</td>
</tr>
<tr>
<td>
<code>incrementalBackupEnabled</code><br/>
<em>
bool
Expand Down
37 changes: 34 additions & 3 deletions pkg/dataprotection/backup/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,18 +365,42 @@ type backupReconfigureRef struct {

type parameterPairs map[string][]appsv1alpha1.ParameterPair

func (s *Scheduler) convertLastAppliedConfigs(continuousMethod string) {
if _, ok := s.BackupSchedule.Annotations[dptypes.LastAppliedConfigsAnnotationKey]; ok {
return
}
lastAppliedConfig := s.BackupSchedule.Annotations[constant.LastAppliedConfigAnnotationKey]
if lastAppliedConfig == "" {
return
}
lastAppliedConfigMap := map[string]string{}
lastAppliedConfigMap[continuousMethod] = lastAppliedConfig
str, _ := json.Marshal(lastAppliedConfigMap)
s.BackupSchedule.Annotations[dptypes.LastAppliedConfigsAnnotationKey] = string(str)
}

func (s *Scheduler) getLastAppliedConfigsMap() (map[string]string, error) {
resMap := map[string]string{}
if err := json.Unmarshal([]byte(s.BackupSchedule.Annotations[dptypes.LastAppliedConfigsAnnotationKey]), &resMap); err != nil {
return nil, err
}
return resMap, nil
}

func (s *Scheduler) reconfigure(schedulePolicy *dpv1alpha1.SchedulePolicy) error {
reCfgRef := s.BackupSchedule.Annotations[dptypes.ReconfigureRefAnnotationKey]
if reCfgRef == "" {
return nil
}
// convert deprecated "lastAppliedConfig "to "lastAppliedConfigs"
s.convertLastAppliedConfigs(schedulePolicy.BackupMethod)
configRef := backupReconfigureRef{}
if err := json.Unmarshal([]byte(reCfgRef), &configRef); err != nil {
return err
}

enable := boolptr.IsSetToTrue(schedulePolicy.Enabled)
if s.BackupSchedule.Annotations[constant.LastAppliedConfigAnnotationKey] == "" && !enable {
if s.BackupSchedule.Annotations[dptypes.LastAppliedConfigsAnnotationKey] == "" && !enable {
// disable in the first policy created, no need reconfigure because default configs had been set.
return nil
}
Expand All @@ -392,9 +416,13 @@ func (s *Scheduler) reconfigure(schedulePolicy *dpv1alpha1.SchedulePolicy) error
// skip reconfigure if not found parameters.
return nil
}
lastAppliedConfigsMap, err := s.getLastAppliedConfigsMap()
if err != nil {
return err
}
updateParameterPairsBytes, _ := json.Marshal(parameters)
updateParameterPairs := string(updateParameterPairsBytes)
if updateParameterPairs == s.BackupSchedule.Annotations[constant.LastAppliedConfigAnnotationKey] {
if updateParameterPairs == lastAppliedConfigsMap[schedulePolicy.BackupMethod] {
// reconcile the config job if finished
return s.reconcileReconfigure(s.BackupSchedule)
}
Expand Down Expand Up @@ -450,7 +478,10 @@ func (s *Scheduler) reconfigure(schedulePolicy *dpv1alpha1.SchedulePolicy) error
if s.BackupSchedule.Annotations == nil {
s.BackupSchedule.Annotations = map[string]string{}
}
s.BackupSchedule.Annotations[constant.LastAppliedConfigAnnotationKey] = updateParameterPairs
lastAppliedConfigsMap[schedulePolicy.BackupMethod] = updateParameterPairs
updateParameterPairsBytes, _ = json.Marshal(lastAppliedConfigsMap)
s.BackupSchedule.Annotations[dptypes.LastAppliedConfigsAnnotationKey] = string(updateParameterPairsBytes)
delete(s.BackupSchedule.Annotations, constant.LastAppliedConfigAnnotationKey)
if err := s.Client.Patch(s.Ctx, s.BackupSchedule, patch); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 8cf92c7

Please sign in to comment.