Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
free6om committed Oct 11, 2023
1 parent 385b383 commit efb6853
Show file tree
Hide file tree
Showing 25 changed files with 269 additions and 450 deletions.
71 changes: 31 additions & 40 deletions controllers/apps/cluster_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package apps
import (
"context"
"fmt"
"github.com/apecloud/kubeblocks/internal/controller/model"
"reflect"

"github.com/go-logr/logr"
Expand All @@ -40,7 +41,6 @@ import (
"github.com/apecloud/kubeblocks/internal/constant"
roclient "github.com/apecloud/kubeblocks/internal/controller/client"
"github.com/apecloud/kubeblocks/internal/controller/graph"
ictrltypes "github.com/apecloud/kubeblocks/internal/controller/types"
intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
)

Expand All @@ -52,8 +52,8 @@ const (

// TODO: cluster plan builder can be abstracted as a common flow

// ClusterTransformContext a graph.TransformContext implementation for Cluster reconciliation
type ClusterTransformContext struct {
// clusterTransformContext a graph.TransformContext implementation for Cluster reconciliation
type clusterTransformContext struct {
context.Context
Client roclient.ReadonlyClient
record.EventRecorder
Expand All @@ -68,7 +68,7 @@ type ClusterTransformContext struct {
type clusterPlanBuilder struct {
req ctrl.Request
cli client.Client
transCtx *ClusterTransformContext
transCtx *clusterTransformContext
transformers graph.TransformerChain
}

Expand All @@ -77,28 +77,28 @@ type clusterPlan struct {
dag *graph.DAG
walkFunc graph.WalkFunc
cli client.Client
transCtx *ClusterTransformContext
transCtx *clusterTransformContext
}

var _ graph.TransformContext = &ClusterTransformContext{}
var _ graph.TransformContext = &clusterTransformContext{}
var _ graph.PlanBuilder = &clusterPlanBuilder{}
var _ graph.Plan = &clusterPlan{}

// TransformContext implementation

func (c *ClusterTransformContext) GetContext() context.Context {
func (c *clusterTransformContext) GetContext() context.Context {
return c.Context
}

func (c *ClusterTransformContext) GetClient() roclient.ReadonlyClient {
func (c *clusterTransformContext) GetClient() roclient.ReadonlyClient {
return c.Client
}

func (c *ClusterTransformContext) GetRecorder() record.EventRecorder {
func (c *clusterTransformContext) GetRecorder() record.EventRecorder {
return c.EventRecorder
}

func (c *ClusterTransformContext) GetLogger() logr.Logger {
func (c *clusterTransformContext) GetLogger() logr.Logger {
return c.Logger
}

Expand All @@ -109,13 +109,7 @@ func (c *clusterPlanBuilder) Init() error {
if err := c.cli.Get(c.transCtx.Context, c.req.NamespacedName, cluster); err != nil {
return err
}

c.transCtx.Cluster = cluster
c.transCtx.OrigCluster = cluster.DeepCopy()
c.transformers = append(c.transformers, &initTransformer{
cluster: c.transCtx.Cluster,
originCluster: c.transCtx.OrigCluster,
})
c.AddTransformer(&initTransformer{cluster: cluster})
return nil
}

Expand Down Expand Up @@ -172,7 +166,7 @@ func (c *clusterPlanBuilder) Build() (graph.Plan, error) {
func (p *clusterPlan) Execute() error {
less := func(v1, v2 graph.Vertex) bool {
getWeight := func(v graph.Vertex) int {
lifecycleVertex, ok := v.(*ictrltypes.LifecycleVertex)
lifecycleVertex, ok := v.(*model.ObjectVertex)
if !ok {
return defaultWeight
}
Expand Down Expand Up @@ -211,17 +205,17 @@ func NewClusterPlanBuilder(ctx intctrlutil.RequestCtx, cli client.Client, req ct
return &clusterPlanBuilder{
req: req,
cli: cli,
transCtx: &ClusterTransformContext{
transCtx: &clusterTransformContext{
Context: ctx.Ctx,
Client: cli,
Client: model.NewGraphClient(cli),
EventRecorder: ctx.Recorder,
Logger: ctx.Log,
},
}
}

func (c *clusterPlanBuilder) defaultWalkFuncWithLogging(vertex graph.Vertex) error {
node, ok := vertex.(*ictrltypes.LifecycleVertex)
node, ok := vertex.(*model.ObjectVertex)
err := c.defaultWalkFunc(vertex)
if err != nil {
if !ok {
Expand All @@ -239,7 +233,7 @@ func (c *clusterPlanBuilder) defaultWalkFuncWithLogging(vertex graph.Vertex) err

// TODO: retry strategy on error
func (c *clusterPlanBuilder) defaultWalkFunc(vertex graph.Vertex) error {
node, ok := vertex.(*ictrltypes.LifecycleVertex)
node, ok := vertex.(*model.ObjectVertex)
if !ok {
return fmt.Errorf("wrong vertex type %v", vertex)
}
Expand All @@ -256,28 +250,25 @@ func (c *clusterPlanBuilder) defaultWalkFunc(vertex graph.Vertex) error {
return c.reconcileObject(node)
}

func (c *clusterPlanBuilder) reconcileObject(node *ictrltypes.LifecycleVertex) error {
func (c *clusterPlanBuilder) reconcileObject(node *model.ObjectVertex) error {
switch *node.Action {
case ictrltypes.CREATE:
case model.CREATE:
err := c.cli.Create(c.transCtx.Context, node.Obj)
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
case ictrltypes.UPDATE:
if node.Immutable {
return nil
}
case model.UPDATE:
err := c.cli.Update(c.transCtx.Context, node.Obj)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
case ictrltypes.PATCH:
patch := client.MergeFrom(node.ObjCopy)
case model.PATCH:
patch := client.MergeFrom(node.OriObj)
if err := c.cli.Patch(c.transCtx.Context, node.Obj, patch); err != nil && !apierrors.IsNotFound(err) {
c.transCtx.Logger.Error(err, fmt.Sprintf("patch %T error", node.ObjCopy))
c.transCtx.Logger.Error(err, fmt.Sprintf("patch %T error", node.OriObj))
return err
}
case ictrltypes.DELETE:
case model.DELETE:
if controllerutil.RemoveFinalizer(node.Obj, constant.DBClusterFinalizerName) {
err := c.cli.Update(c.transCtx.Context, node.Obj)
if err != nil && !apierrors.IsNotFound(err) {
Expand All @@ -292,29 +283,29 @@ func (c *clusterPlanBuilder) reconcileObject(node *ictrltypes.LifecycleVertex) e
return err
}
}
case ictrltypes.STATUS:
patch := client.MergeFrom(node.ObjCopy)
case model.STATUS:
patch := client.MergeFrom(node.OriObj)
if err := c.cli.Status().Patch(c.transCtx.Context, node.Obj, patch); err != nil {
return err
}
// handle condition and phase changing triggered events
if newCluster, ok := node.Obj.(*appsv1alpha1.Cluster); ok {
oldCluster, _ := node.ObjCopy.(*appsv1alpha1.Cluster)
oldCluster, _ := node.OriObj.(*appsv1alpha1.Cluster)
c.emitConditionUpdatingEvent(oldCluster.Status.Conditions, newCluster.Status.Conditions)
c.emitStatusUpdatingEvent(oldCluster.Status, newCluster.Status)
}
case ictrltypes.NOOP:
case model.NOOP:
// nothing
}
return nil
}

func (c *clusterPlanBuilder) reconcileCluster(node *ictrltypes.LifecycleVertex) error {
func (c *clusterPlanBuilder) reconcileCluster(node *model.ObjectVertex) error {
cluster := node.Obj.(*appsv1alpha1.Cluster).DeepCopy()
origCluster := node.ObjCopy.(*appsv1alpha1.Cluster)
origCluster := node.OriObj.(*appsv1alpha1.Cluster)
switch *node.Action {
// cluster.meta and cluster.spec might change
case ictrltypes.STATUS:
case model.STATUS:
if !reflect.DeepEqual(cluster.ObjectMeta, origCluster.ObjectMeta) || !reflect.DeepEqual(cluster.Spec, origCluster.Spec) {
// TODO: we should Update instead of Patch cluster object,
// TODO: but Update failure happens too frequently as other controllers are updating cluster object too.
Expand All @@ -333,7 +324,7 @@ func (c *clusterPlanBuilder) reconcileCluster(node *ictrltypes.LifecycleVertex)
return err
}
}
case ictrltypes.CREATE, ictrltypes.UPDATE:
case model.CREATE, model.UPDATE:
return fmt.Errorf("cluster can't be created or updated: %s", cluster.Name)
}
return nil
Expand Down
Loading

0 comments on commit efb6853

Please sign in to comment.