Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support to specify the continuous backup method for cluster.spec #8863

Merged
merged 5 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ type ClusterBackup struct {
// +optional
PITREnabled *bool `json:"pitrEnabled,omitempty"`

// Specifies the backup method to use, if not set, use the first continuous method.
//
// +optional
ContinuousMethod string `json:"continuousMethod,omitempty"`

// Specifies whether to enable incremental backup.
//
// +kubebuilder:default=false
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ spec:
backup:
description: Specifies the backup configuration of the Cluster.
properties:
continuousMethod:
description: Specifies the backup method to use, if not set, use
the first continuous method.
type: string
cronExpression:
description: The cron expression for the schedule. The timezone
is in UTC. See https://en.wikipedia.org/wiki/Cron.
Expand Down
64 changes: 53 additions & 11 deletions controllers/apps/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/controller/builder"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/scheduling"
"github.com/apecloud/kubeblocks/pkg/dataprotection/utils/boolptr"
"github.com/apecloud/kubeblocks/pkg/generics"
testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
testdp "github.com/apecloud/kubeblocks/pkg/testutil/dataprotection"
Expand Down Expand Up @@ -161,7 +162,7 @@ var _ = Describe("Cluster Controller", func() {
GetObject()

By("Create a bpt obj")
createBackupPolicyTpl(clusterDefObj, compDefObj.Name, clusterVersionName)
createBackupPolicyTpl(clusterDefObj, compDefObj.Name, false, clusterVersionName)

By("Create a componentVersion obj")
compVersionObj = testapps.NewComponentVersionFactory(compVersionName).
Expand Down Expand Up @@ -1368,6 +1369,19 @@ var _ = Describe("Cluster Controller", func() {
RepoName: backupRepoName,
},
},
{
desc: "backup with snapshot method and specified continuous method",
backup: &appsv1alpha1.ClusterBackup{
Enabled: &boolTrue,
RetentionPeriod: retention("1d"),
Method: vsBackupMethodName,
CronExpression: "*/1 * * * *",
StartingDeadlineMinutes: int64Ptr(int64(10)),
ContinuousMethod: continuousMethodName1,
PITREnabled: &boolTrue,
RepoName: backupRepoName,
},
},
{
desc: "disable backup",
backup: &appsv1alpha1.ClusterBackup{
Expand Down Expand Up @@ -1405,19 +1419,36 @@ var _ = Describe("Cluster Controller", func() {

checkSchedule := func(g Gomega, schedule *dpv1alpha1.BackupSchedule) {
var policy *dpv1alpha1.SchedulePolicy
enableOtherFullMethod := false
for i, s := range schedule.Spec.Schedules {
hasCheckPITRMethod := false
for i := range schedule.Spec.Schedules {
s := &schedule.Spec.Schedules[i]
if s.BackupMethod == backup.Method {
Expect(*s.Enabled).Should(BeEquivalentTo(*backup.Enabled))
policy = &schedule.Spec.Schedules[i]
if *backup.Enabled {
enableOtherFullMethod = true
policy = s
continue
}
if !slices.Contains([]string{continuousMethodName, continuousMethodName1}, s.BackupMethod) {
if boolptr.IsSetToTrue(backup.Enabled) {
// another full backup method should be disabled.
Expect(*s.Enabled).Should(BeFalse())
}
continue
}
if enableOtherFullMethod {
// another full backup method should be disabled.
Expect(*s.Enabled).Should(BeFalse())
if len(backup.ContinuousMethod) == 0 {
// first continuous backup method should be equal to "PITREnabled", another is disabled.
if !hasCheckPITRMethod {
Expect(*s.Enabled).Should(BeEquivalentTo(*backup.PITREnabled))
hasCheckPITRMethod = true
} else {
Expect(*s.Enabled).Should(BeFalse())
}
} else {
// specified continuous backup method should be equal to "PITREnabled", another is disabled.
if backup.ContinuousMethod == s.BackupMethod {
Expect(*s.Enabled).Should(BeEquivalentTo(*backup.PITREnabled))
} else {
Expect(*s.Enabled).Should(BeFalse())
}
}
}
if backup.Enabled != nil && *backup.Enabled {
Expand Down Expand Up @@ -1593,9 +1624,10 @@ var _ = Describe("Cluster Controller", func() {
})
})

func createBackupPolicyTpl(clusterDefObj *appsv1alpha1.ClusterDefinition, compDef string, mappingClusterVersions ...string) {
func createBackupPolicyTpl(clusterDefObj *appsv1alpha1.ClusterDefinition, compDef string, forHScale bool, mappingClusterVersions ...string) {
By("create actionSet")
fakeActionSet(clusterDefObj.Name)
fakeActionSet(actionSetName, clusterDefObj.Name, dpv1alpha1.BackupTypeFull)
fakeActionSet(continuousActionSetName, clusterDefObj.Name, dpv1alpha1.BackupTypeContinuous)

By("Creating a BackupPolicyTemplate")
bpt := testapps.NewBackupPolicyTemplateFactory(backupPolicyTPLName).
Expand All @@ -1618,6 +1650,16 @@ func createBackupPolicyTpl(clusterDefObj *appsv1alpha1.ClusterDefinition, compDe
case appsv1alpha1.Replication:
bpt.SetTargetRole("primary")
}
if !forHScale {
bpt.AddBackupMethod(continuousMethodName, false, continuousActionSetName).
SetComponentDef(compDef).
SetBackupMethodVolumeMounts("data", "/data").
AddBackupMethod(continuousMethodName1, false, continuousActionSetName).
SetComponentDef(compDef).
SetBackupMethodVolumeMounts("data", "/data").
AddSchedule(continuousMethodName, "0 0 * * *", ttl, false).
AddSchedule(continuousMethodName1, "0 0 * * *", ttl, false)
}
}
bpt.Create(&testCtx)
}
27 changes: 15 additions & 12 deletions controllers/apps/component_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ import (
)

const (
backupPolicyTPLName = "test-backup-policy-template-mysql"
backupMethodName = "test-backup-method"
vsBackupMethodName = "test-vs-backup-method"
actionSetName = "test-action-set"
backupPolicyTPLName = "test-backup-policy-template-mysql"
backupMethodName = "test-backup-method"
continuousMethodName = "continuous-backup-method"
continuousMethodName1 = "continuous-backup-method1"
vsBackupMethodName = "test-vs-backup-method"
actionSetName = "test-action-set"
continuousActionSetName = "test-continuous-action-set"
)

var (
Expand Down Expand Up @@ -700,7 +703,7 @@ var _ = Describe("Component Controller", func() {

if policyType == appsv1alpha1.HScaleDataClonePolicyCloneVolume {
By("creating actionSet if backup policy is backup")
fakeActionSet(clusterDef.Name)
fakeActionSet(actionSetName, clusterDef.Name, dpv1alpha1.BackupTypeFull)
}
}
})()).ShouldNot(HaveOccurred())
Expand Down Expand Up @@ -2052,7 +2055,7 @@ var _ = Describe("Component Controller", func() {
Context("provisioning", func() {
BeforeEach(func() {
createAllWorkloadTypesClusterDef()
createBackupPolicyTpl(clusterDefObj, compDefName)
createBackupPolicyTpl(clusterDefObj, compDefName, true)
})

AfterEach(func() {
Expand Down Expand Up @@ -2123,7 +2126,7 @@ var _ = Describe("Component Controller", func() {

BeforeEach(func() {
createAllWorkloadTypesClusterDef()
createBackupPolicyTpl(clusterDefObj, compDefName)
createBackupPolicyTpl(clusterDefObj, compDefName, true)
})

AfterEach(func() {
Expand All @@ -2143,7 +2146,7 @@ var _ = Describe("Component Controller", func() {
BeforeEach(func() {
cleanEnv()
createAllWorkloadTypesClusterDef()
createBackupPolicyTpl(clusterDefObj, compDefName)
createBackupPolicyTpl(clusterDefObj, compDefName, true)
})

createNWaitClusterObj := func(components map[string]string,
Expand Down Expand Up @@ -2220,7 +2223,7 @@ var _ = Describe("Component Controller", func() {

BeforeEach(func() {
createAllWorkloadTypesClusterDef()
createBackupPolicyTpl(clusterDefObj, compDefName)
createBackupPolicyTpl(clusterDefObj, compDefName, true)
mockStorageClass = testk8s.CreateMockStorageClass(&testCtx, testk8s.DefaultStorageClassName)
})

Expand Down Expand Up @@ -2289,7 +2292,7 @@ var _ = Describe("Component Controller", func() {
When("creating cluster with workloadType=consensus component", func() {
BeforeEach(func() {
createAllWorkloadTypesClusterDef()
createBackupPolicyTpl(clusterDefObj, compDefName)
createBackupPolicyTpl(clusterDefObj, compDefName, true)
})

AfterEach(func() {
Expand Down Expand Up @@ -2544,7 +2547,7 @@ func checkRestoreAndSetCompleted(clusterKey types.NamespacedName, compName strin
mockRestoreCompleted(ml)
}

func fakeActionSet(clusterDefName string) *dpv1alpha1.ActionSet {
func fakeActionSet(actionSetName, clusterDefName string, backupType dpv1alpha1.BackupType) *dpv1alpha1.ActionSet {
actionSet := &dpv1alpha1.ActionSet{
ObjectMeta: metav1.ObjectMeta{
Name: actionSetName,
Expand All @@ -2559,7 +2562,7 @@ func fakeActionSet(clusterDefName string) *dpv1alpha1.ActionSet {
Value: "test-value",
},
},
BackupType: dpv1alpha1.BackupTypeFull,
BackupType: backupType,
Backup: &dpv1alpha1.BackupActionSpec{
BackupData: &dpv1alpha1.BackupDataActionSpec{
JobActionSpec: dpv1alpha1.JobActionSpec{
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/opsrequest_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ var _ = Describe("OpsRequest Controller", func() {
}

createMysqlCluster := func(replicas int32) {
createBackupPolicyTpl(clusterDefObj, mysqlCompDefName)
createBackupPolicyTpl(clusterDefObj, mysqlCompDefName, true)

By("set component to horizontal with snapshot policy and create a cluster")
testk8s.MockEnableVolumeSnapshot(&testCtx, testk8s.DefaultStorageClassName)
Expand Down
32 changes: 21 additions & 11 deletions controllers/apps/transformer_cluster_backup_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ func (r *clusterBackupPolicyTransformer) mergeClusterBackup(
hasSyncPITRMethod := false
hasSyncIncMethod := false
enableAutoBackup := boolptr.IsSetToTrue(backup.Enabled)
for i, s := range backupSchedule.Spec.Schedules {
for i := range backupSchedule.Spec.Schedules {
s := &backupSchedule.Spec.Schedules[i]
if s.BackupMethod == backup.Method {
mergeSchedulePolicy(sp, &backupSchedule.Spec.Schedules[i])
exist = true
Expand All @@ -671,15 +672,23 @@ func (r *clusterBackupPolicyTransformer) mergeClusterBackup(
r.Error(err, "failed to get ActionSet for backup.", "ActionSet", as.Name)
continue
}
if as.Spec.BackupType == dpv1alpha1.BackupTypeContinuous && backup.PITREnabled != nil && !hasSyncPITRMethod {
// auto-sync the first continuous backup for the 'pirtEnable' option.
backupSchedule.Spec.Schedules[i].Enabled = backup.PITREnabled
switch as.Spec.BackupType {
case dpv1alpha1.BackupTypeContinuous:
if backup.PITREnabled == nil {
continue
}
if boolptr.IsSetToFalse(backup.PITREnabled) || hasSyncPITRMethod ||
(len(backup.ContinuousMethod) > 0 && backup.ContinuousMethod != s.BackupMethod) {
s.Enabled = boolptr.False()
continue
}
// auto-sync the first or specified continuous backup for the 'pirtEnable' option.
s.Enabled = backup.PITREnabled
if backup.RetentionPeriod.String() != "" {
backupSchedule.Spec.Schedules[i].RetentionPeriod = backup.RetentionPeriod
s.RetentionPeriod = backup.RetentionPeriod
}
hasSyncPITRMethod = true
}
if as.Spec.BackupType == dpv1alpha1.BackupTypeIncremental {
case dpv1alpha1.BackupTypeIncremental:
if len(backup.Method) == 0 || m.CompatibleMethod != backup.Method {
// disable other incremental backup schedules
backupSchedule.Spec.Schedules[i].Enabled = boolptr.False()
Expand All @@ -692,10 +701,11 @@ func (r *clusterBackupPolicyTransformer) mergeClusterBackup(
}, &backupSchedule.Spec.Schedules[i])
hasSyncIncMethod = true
}
}
if as.Spec.BackupType == dpv1alpha1.BackupTypeFull && enableAutoBackup {
// disable the automatic backup for other full backup method
backupSchedule.Spec.Schedules[i].Enabled = boolptr.False()
case dpv1alpha1.BackupTypeFull:
if enableAutoBackup {
// disable the automatic backup for other full backup method
s.Enabled = boolptr.False()
}
}
}
if !exist {
Expand Down
4 changes: 4 additions & 0 deletions deploy/helm/crds/apps.kubeblocks.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ spec:
backup:
description: Specifies the backup configuration of the Cluster.
properties:
continuousMethod:
description: Specifies the backup method to use, if not set, use
the first continuous method.
type: string
cronExpression:
description: The cron expression for the schedule. The timezone
is in UTC. See https://en.wikipedia.org/wiki/Cron.
Expand Down
12 changes: 12 additions & 0 deletions docs/developer_docs/api-reference/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -4548,6 +4548,18 @@ bool
</tr>
<tr>
<td>
<code>continuousMethod</code><br/>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>Specifies the backup method to use, if not set, use the first continuous method.</p>
</td>
</tr>
<tr>
<td>
<code>incrementalBackupEnabled</code><br/>
<em>
bool
Expand Down
37 changes: 34 additions & 3 deletions pkg/dataprotection/backup/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,18 +365,42 @@ type backupReconfigureRef struct {

type parameterPairs map[string][]appsv1alpha1.ParameterPair

func (s *Scheduler) convertLastAppliedConfigs(continuousMethod string) {
if _, ok := s.BackupSchedule.Annotations[dptypes.LastAppliedConfigsAnnotationKey]; ok {
return
}
lastAppliedConfig := s.BackupSchedule.Annotations[constant.LastAppliedConfigAnnotationKey]
if lastAppliedConfig == "" {
return
}
lastAppliedConfigMap := map[string]string{}
lastAppliedConfigMap[continuousMethod] = lastAppliedConfig
str, _ := json.Marshal(lastAppliedConfigMap)
s.BackupSchedule.Annotations[dptypes.LastAppliedConfigsAnnotationKey] = string(str)
}

func (s *Scheduler) getLastAppliedConfigsMap() (map[string]string, error) {
resMap := map[string]string{}
if err := json.Unmarshal([]byte(s.BackupSchedule.Annotations[dptypes.LastAppliedConfigsAnnotationKey]), &resMap); err != nil {
return nil, err
}
return resMap, nil
}

func (s *Scheduler) reconfigure(schedulePolicy *dpv1alpha1.SchedulePolicy) error {
reCfgRef := s.BackupSchedule.Annotations[dptypes.ReconfigureRefAnnotationKey]
if reCfgRef == "" {
return nil
}
// convert deprecated "lastAppliedConfig "to "lastAppliedConfigs"
s.convertLastAppliedConfigs(schedulePolicy.BackupMethod)
configRef := backupReconfigureRef{}
if err := json.Unmarshal([]byte(reCfgRef), &configRef); err != nil {
return err
}

enable := boolptr.IsSetToTrue(schedulePolicy.Enabled)
if s.BackupSchedule.Annotations[constant.LastAppliedConfigAnnotationKey] == "" && !enable {
if s.BackupSchedule.Annotations[dptypes.LastAppliedConfigsAnnotationKey] == "" && !enable {
// disable in the first policy created, no need reconfigure because default configs had been set.
return nil
}
Expand All @@ -392,9 +416,13 @@ func (s *Scheduler) reconfigure(schedulePolicy *dpv1alpha1.SchedulePolicy) error
// skip reconfigure if not found parameters.
return nil
}
lastAppliedConfigsMap, err := s.getLastAppliedConfigsMap()
if err != nil {
return err
}
updateParameterPairsBytes, _ := json.Marshal(parameters)
updateParameterPairs := string(updateParameterPairsBytes)
if updateParameterPairs == s.BackupSchedule.Annotations[constant.LastAppliedConfigAnnotationKey] {
if updateParameterPairs == lastAppliedConfigsMap[schedulePolicy.BackupMethod] {
// reconcile the config job if finished
return s.reconcileReconfigure(s.BackupSchedule)
}
Expand Down Expand Up @@ -450,7 +478,10 @@ func (s *Scheduler) reconfigure(schedulePolicy *dpv1alpha1.SchedulePolicy) error
if s.BackupSchedule.Annotations == nil {
s.BackupSchedule.Annotations = map[string]string{}
}
s.BackupSchedule.Annotations[constant.LastAppliedConfigAnnotationKey] = updateParameterPairs
lastAppliedConfigsMap[schedulePolicy.BackupMethod] = updateParameterPairs
updateParameterPairsBytes, _ = json.Marshal(lastAppliedConfigsMap)
s.BackupSchedule.Annotations[dptypes.LastAppliedConfigsAnnotationKey] = string(updateParameterPairsBytes)
delete(s.BackupSchedule.Annotations, constant.LastAppliedConfigAnnotationKey)
if err := s.Client.Patch(s.Ctx, s.BackupSchedule, patch); err != nil {
return err
}
Expand Down
Loading
Loading