Skip to content

Commit

Permalink
feat: support running clusters in multiple k8s clusters (#6951)
Browse files Browse the repository at this point in the history
(cherry picked from commit 370ffdd)
  • Loading branch information
leon-inf committed Apr 16, 2024
1 parent 69219a8 commit f50e818
Show file tree
Hide file tree
Showing 65 changed files with 916 additions and 626 deletions.
17 changes: 9 additions & 8 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,10 @@ func main() {

if viper.GetBool(appsFlagKey.viperName()) {
if err = (&appscontrollers.ClusterReconciler{
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("cluster-controller"),
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("cluster-controller"),
MultiClusterMgr: multiClusterMgr,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Cluster")
os.Exit(1)
Expand Down Expand Up @@ -376,7 +377,7 @@ func main() {
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("component-controller"),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Component")
os.Exit(1)
}
Expand Down Expand Up @@ -430,7 +431,7 @@ func main() {
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("reconfigure-controller"),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ReconfigureRequest")
os.Exit(1)
}
Expand All @@ -439,7 +440,7 @@ func main() {
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("configuration-controller"),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Configuration")
os.Exit(1)
}
Expand All @@ -457,7 +458,7 @@ func main() {
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("event-controller"),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Event")
os.Exit(1)
}
Expand Down Expand Up @@ -507,7 +508,7 @@ func main() {
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("replicated-state-machine-controller"),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ReplicatedStateMachine")
os.Exit(1)
}
Expand Down
8 changes: 6 additions & 2 deletions controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)
Expand Down Expand Up @@ -72,8 +73,9 @@ import (
// ClusterReconciler reconciles a Cluster object
type ClusterReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
Scheme *runtime.Scheme
Recorder record.EventRecorder
MultiClusterMgr multicluster.Manager
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down Expand Up @@ -135,6 +137,8 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
&clusterLoadRefResourcesTransformer{},
// normalize the cluster and component API
&ClusterAPINormalizationTransformer{},
// placement replicas across data-plane k8s clusters
&clusterPlacementTransformer{multiClusterMgr: r.MultiClusterMgr},
// handle cluster services
&clusterServiceTransformer{},
// handle the restore for cluster
Expand Down
6 changes: 3 additions & 3 deletions controllers/apps/cluster_status_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ func getEventInvolvedObject(ctx context.Context, cli client.Client, event *corev
switch event.InvolvedObject.Kind {
case constant.PodKind:
pod := &corev1.Pod{}
err = cli.Get(ctx, objectKey, pod)
err = cli.Get(ctx, objectKey, pod, inDataContextUnspecified4C())
return pod, err
case constant.StatefulSetKind:
sts := &appsv1.StatefulSet{}
err = cli.Get(ctx, objectKey, sts)
err = cli.Get(ctx, objectKey, sts, inDataContextUnspecified4C())
return sts, err
case constant.DeploymentKind:
deployment := &appsv1.Deployment{}
err = cli.Get(ctx, objectKey, deployment)
err = cli.Get(ctx, objectKey, deployment, inDataContextUnspecified4C())
return deployment, err
}
return nil, err
Expand Down
37 changes: 35 additions & 2 deletions controllers/apps/component_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)
Expand Down Expand Up @@ -111,7 +112,7 @@ func (r *ComponentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

reqCtx.Log.V(1).Info("reconcile", "component", req.NamespacedName)

planBuilder := newComponentPlanBuilder(reqCtx, r.Client, req)
planBuilder := newComponentPlanBuilder(reqCtx, r.Client)
if err := planBuilder.Init(); err != nil {
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}
Expand Down Expand Up @@ -183,11 +184,18 @@ func (r *ComponentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

// SetupWithManager sets up the controller with the Manager.
func (r *ComponentReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ComponentReconciler) SetupWithManager(mgr ctrl.Manager, multiClusterMgr multicluster.Manager) error {
retryDurationMS := viper.GetInt(constant.CfgKeyCtrlrReconcileRetryDurationMS)
if retryDurationMS != 0 {
requeueDuration = time.Millisecond * time.Duration(retryDurationMS)
}
if multiClusterMgr == nil {
return r.setupWithManager(mgr)
}
return r.setupWithMultiClusterManager(mgr, multiClusterMgr)
}

func (r *ComponentReconciler) setupWithManager(mgr ctrl.Manager) error {
b := intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.Component{}).
WithOptions(controller.Options{
Expand Down Expand Up @@ -217,6 +225,31 @@ func (r *ComponentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return b.Complete(r)
}

func (r *ComponentReconciler) setupWithMultiClusterManager(mgr ctrl.Manager, multiClusterMgr multicluster.Manager) error {
b := intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.Component{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
}).
Watches(&workloads.ReplicatedStateMachine{}, handler.EnqueueRequestsFromMapFunc(r.filterComponentResources)).
Owns(&dpv1alpha1.Backup{}).
Owns(&dpv1alpha1.Restore{}).
Watches(&appsv1alpha1.Configuration{}, handler.EnqueueRequestsFromMapFunc(r.configurationEventHandler))

eventHandler := handler.EnqueueRequestsFromMapFunc(r.filterComponentResources)
multiClusterMgr.Watch(b, &corev1.Service{}, eventHandler).
Watch(b, &corev1.Secret{}, eventHandler).
Watch(b, &corev1.ConfigMap{}, eventHandler).
Watch(b, &corev1.PersistentVolumeClaim{}, eventHandler).
Watch(b, &batchv1.Job{}, eventHandler).
Watch(b, &corev1.ServiceAccount{}, eventHandler).
Watch(b, &rbacv1.RoleBinding{}, eventHandler).
Watch(b, &rbacv1.ClusterRoleBinding{}, eventHandler).
Watch(b, &corev1.Pod{}, eventHandler)

return b.Complete(r)
}

func (r *ComponentReconciler) filterComponentResources(ctx context.Context, obj client.Object) []reconcile.Request {
labels := obj.GetLabels()
if v, ok := labels[constant.AppManagedByLabelKey]; !ok || v != constant.AppName {
Expand Down
49 changes: 23 additions & 26 deletions controllers/apps/component_hscale_volume_populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type dataClone interface {
// Succeed check if data clone succeeded
Succeed() (bool, error)
// CloneData do clone data, return objects that need to be created
CloneData(dataClone) ([]client.Object, error)
CloneData(dataClone) ([]client.Object, []client.Object, error)
// ClearTmpResources clear all the temporary resources created during data clone, return objects that need to be deleted
ClearTmpResources() ([]client.Object, error)

Expand All @@ -70,7 +70,7 @@ func newDataClone(reqCtx intctrlutil.RequestCtx,
component *component.SynthesizedComponent,
stsObj *appsv1.StatefulSet,
stsProto *appsv1.StatefulSet,
key types.NamespacedName) (dataClone, error) {
backupKey types.NamespacedName) (dataClone, error) {
if component == nil {
return nil, nil
}
Expand All @@ -83,7 +83,7 @@ func newDataClone(reqCtx intctrlutil.RequestCtx,
component: component,
stsObj: stsObj,
stsProto: stsProto,
key: key,
backupKey: backupKey,
},
}, nil
}
Expand All @@ -96,7 +96,7 @@ func newDataClone(reqCtx intctrlutil.RequestCtx,
component: component,
stsObj: stsObj,
stsProto: stsProto,
key: key,
backupKey: backupKey,
},
}, nil
}
Expand All @@ -111,29 +111,29 @@ type baseDataClone struct {
component *component.SynthesizedComponent
stsObj *appsv1.StatefulSet
stsProto *appsv1.StatefulSet
key types.NamespacedName
backupKey types.NamespacedName
}

func (d *baseDataClone) CloneData(realDataClone dataClone) ([]client.Object, error) {
func (d *baseDataClone) CloneData(realDataClone dataClone) ([]client.Object, []client.Object, error) {
objs := make([]client.Object, 0)

// check backup ready
status, err := realDataClone.CheckBackupStatus()
if err != nil {
return nil, err
return nil, nil, err
}
switch status {
case backupStatusNotCreated:
// create backup
backupObjs, err := realDataClone.backup()
if err != nil {
return nil, err
return nil, nil, err
}
objs = append(objs, backupObjs...)
return objs, nil
return objs, nil, nil
case backupStatusProcessing, backupStatusFailed:
// requeue to waiting for backup ready
return objs, nil
return objs, nil, nil
case backupStatusReadyToUse:
break
default:
Expand All @@ -144,13 +144,13 @@ func (d *baseDataClone) CloneData(realDataClone dataClone) ([]client.Object, err
for i := *d.stsObj.Spec.Replicas; i < d.component.Replicas; i++ {
restoreStatus, err := realDataClone.CheckRestoreStatus(i)
if err != nil {
return nil, err
return nil, nil, err
}
switch restoreStatus {
case "":
restoreObjs, err := realDataClone.restore(i)
if err != nil {
return nil, err
return nil, nil, err
}
objs = append(objs, restoreObjs...)
case dpv1alpha1.RestorePhaseCompleted:
Expand All @@ -160,16 +160,14 @@ func (d *baseDataClone) CloneData(realDataClone dataClone) ([]client.Object, err
// create PVCs that do not need to restore
pvcObjs, err := d.createPVCs(d.excludeBackupVCTs())
if err != nil {
return nil, err
return nil, nil, err
}
objs = append(objs, pvcObjs...)

return objs, nil
return objs, pvcObjs, nil
}

func (d *baseDataClone) isPVCExists(pvcKey types.NamespacedName) (bool, error) {
pvc := corev1.PersistentVolumeClaim{}
if err := d.cli.Get(d.reqCtx.Ctx, pvcKey, &pvc); err != nil {
if err := d.cli.Get(d.reqCtx.Ctx, pvcKey, &pvc, inDataContext4C()); err != nil {
return false, client.IgnoreNotFound(err)
}
return true, nil
Expand Down Expand Up @@ -257,8 +255,9 @@ func (d *dummyDataClone) Succeed() (bool, error) {
return d.checkAllPVCsExist()
}

func (d *dummyDataClone) CloneData(dataClone) ([]client.Object, error) {
return d.createPVCs(d.allVCTs())
func (d *dummyDataClone) CloneData(dataClone) ([]client.Object, []client.Object, error) {
pvcObjs, err := d.createPVCs(d.allVCTs())
return nil, pvcObjs, err
}

func (d *dummyDataClone) ClearTmpResources() ([]client.Object, error) {
Expand Down Expand Up @@ -333,7 +332,6 @@ func (d *backupDataClone) ClearTmpResources() ([]client.Object, error) {
}

func (d *backupDataClone) backup() ([]client.Object, error) {
objs := make([]client.Object, 0)
componentDef := func() string {
name := d.component.CompDefName
if name == "" {
Expand All @@ -359,14 +357,13 @@ func (d *backupDataClone) backup() ([]client.Object, error) {
} else if len(backupMethods) > 1 {
return nil, fmt.Errorf("more than one backup methods found in backup policy %s", backupPolicy.Name)
}
backup := factory.BuildBackup(d.cluster, d.component, backupPolicy.Name, d.key, backupMethods[0])
objs = append(objs, backup)
return objs, nil
backup := factory.BuildBackup(d.cluster, d.component, backupPolicy.Name, d.backupKey, backupMethods[0])
return []client.Object{backup}, nil
}

func (d *backupDataClone) CheckBackupStatus() (backupStatus, error) {
backup := dpv1alpha1.Backup{}
if err := d.cli.Get(d.reqCtx.Ctx, d.key, &backup); err != nil {
if err := d.cli.Get(d.reqCtx.Ctx, d.backupKey, &backup); err != nil {
if errors.IsNotFound(err) {
return backupStatusNotCreated, nil
} else {
Expand All @@ -386,7 +383,7 @@ func (d *backupDataClone) CheckBackupStatus() (backupStatus, error) {

func (d *backupDataClone) restore(startingIndex int32) ([]client.Object, error) {
backup := &dpv1alpha1.Backup{}
if err := d.cli.Get(d.reqCtx.Ctx, d.key, backup); err != nil {
if err := d.cli.Get(d.reqCtx.Ctx, d.backupKey, backup); err != nil {
return nil, err
}
restoreMGR := plan.NewRestoreManager(d.reqCtx.Ctx, d.cli, d.cluster, nil, d.getBRLabels(), int32(1), startingIndex)
Expand Down Expand Up @@ -458,7 +455,7 @@ func isVolumeSnapshotEnabled(ctx context.Context, cli client.Client,
Name: fmt.Sprintf("%s-%s-%d", vct.Name, sts.Name, 0),
}
pvc := corev1.PersistentVolumeClaim{}
if err := cli.Get(ctx, pvcKey, &pvc); err != nil {
if err := cli.Get(ctx, pvcKey, &pvc, inDataContext4C()); err != nil {
return false, client.IgnoreNotFound(err)
}

Expand Down
11 changes: 3 additions & 8 deletions controllers/apps/component_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ type componentPlanBuilder struct {
type componentPlan struct {
dag *graph.DAG
walkFunc graph.WalkFunc
cli client.Client
transCtx *componentTransformContext
}

Expand All @@ -97,10 +96,7 @@ func (c *componentPlanBuilder) Init() error {

c.transCtx.Component = comp
c.transCtx.ComponentOrig = comp.DeepCopy()
c.transformers = append(c.transformers, &componentInitTransformer{
Component: c.transCtx.Component,
ComponentOrig: c.transCtx.ComponentOrig,
})
c.transformers = append(c.transformers, &componentInitTransformer{})
return nil
}

Expand All @@ -126,7 +122,6 @@ func (c *componentPlanBuilder) Build() (graph.Plan, error) {
plan := &componentPlan{
dag: dag,
walkFunc: c.defaultWalkFuncWithLogging,
cli: c.cli,
transCtx: c.transCtx,
}
return plan, err
Expand All @@ -141,9 +136,9 @@ func (p *componentPlan) Execute() error {
}

// newComponentPlanBuilder returns a componentPlanBuilder powered PlanBuilder
func newComponentPlanBuilder(ctx intctrlutil.RequestCtx, cli client.Client, req ctrl.Request) graph.PlanBuilder {
func newComponentPlanBuilder(ctx intctrlutil.RequestCtx, cli client.Client) graph.PlanBuilder {
return &componentPlanBuilder{
req: req,
req: ctx.Req,
cli: cli,
transCtx: &componentTransformContext{
Context: ctx.Ctx,
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/component_plan_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var _ = Describe("component plan builder test", func() {
Req: req,
Log: log.FromContext(ctx).WithValues("component", req.NamespacedName),
}
planBuilder := newComponentPlanBuilder(reqCtx, testCtx.Cli, req)
planBuilder := newComponentPlanBuilder(reqCtx, testCtx.Cli)
Expect(planBuilder.Init()).Should(Succeed())
})
})
Expand Down
Loading

0 comments on commit f50e818

Please sign in to comment.