Skip to content

Commit

Permalink
Merge branch 'feature/component-definition' of github.com:apecloud/ku…
Browse files Browse the repository at this point in the history
…beblocks into feature/component-definition
  • Loading branch information
Y-Rookie committed Oct 25, 2023
2 parents 1ca7e8a + 14748b1 commit 3e5dc5a
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 62 deletions.
2 changes: 2 additions & 0 deletions controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
&AssureMetaTransformer{},
// validate cd & cv's existence and availability
&ValidateAndLoadRefResourcesTransformer{},
// normalize the cluster and component API
&ClusterAPINormalizationTransformer{},
// handle cluster services
&ClusterServiceTransformer{},
// TODO(component): create default cluster connection credential secret object
Expand Down
11 changes: 6 additions & 5 deletions controllers/apps/cluster_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ type clusterTransformContext struct {
Client roclient.ReadonlyClient
record.EventRecorder
logr.Logger
Cluster *appsv1alpha1.Cluster
OrigCluster *appsv1alpha1.Cluster
ClusterDef *appsv1alpha1.ClusterDefinition
ClusterVer *appsv1alpha1.ClusterVersion
ComponentDefs map[string]*appsv1alpha1.ComponentDefinition // TODO(component)
Cluster *appsv1alpha1.Cluster
OrigCluster *appsv1alpha1.Cluster
ClusterDef *appsv1alpha1.ClusterDefinition
ClusterVer *appsv1alpha1.ClusterVersion
ComponentSpecs []*appsv1alpha1.ClusterComponentSpec
ComponentDefs map[string]*appsv1alpha1.ComponentDefinition
}

// clusterPlanBuilder a graph.PlanBuilder implementation for Cluster reconciliation
Expand Down
88 changes: 88 additions & 0 deletions controllers/apps/transformer_cluster_api_normalization.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright (C) 2022-2023 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package apps

import (
"fmt"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/controller/apiconversion"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
)

// ClusterAPINormalizationTransformer handles cluster and component API conversion.
type ClusterAPINormalizationTransformer struct{}

var _ graph.Transformer = &ClusterAPINormalizationTransformer{}

func (t *ClusterAPINormalizationTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error {
transCtx, _ := ctx.(*clusterTransformContext)
if model.IsObjectDeleting(transCtx.OrigCluster) {
return nil
}

// build all component specs
transCtx.ComponentSpecs = make([]*appsv1alpha1.ClusterComponentSpec, 0)
cluster := transCtx.Cluster
for i := range cluster.Spec.ComponentSpecs {
transCtx.ComponentSpecs = append(transCtx.ComponentSpecs, &cluster.Spec.ComponentSpecs[i])
}
if compSpec := apiconversion.HandleSimplifiedClusterAPI(cluster, transCtx.ClusterDef); compSpec != nil {
transCtx.ComponentSpecs = append(transCtx.ComponentSpecs, compSpec)
}

// build all component definitions referenced
if transCtx.ComponentDefs == nil {
transCtx.ComponentDefs = make(map[string]*appsv1alpha1.ComponentDefinition)
}
for i, compSpec := range transCtx.ComponentSpecs {
if len(compSpec.ComponentDef) == 0 {
compDef, err := t.buildComponentDefinition(transCtx, compSpec)
if err != nil {
return err
}
transCtx.ComponentDefs[compDef.Name] = compDef
transCtx.ComponentSpecs[i].ComponentDef = compDef.Name
} else {
if _, ok := transCtx.ComponentDefs[compSpec.ComponentDef]; !ok {
panic(fmt.Sprintf("runtime error - expected component definition object not found: %s", compSpec.ComponentDef))
}
}
}
return nil
}

func (t *ClusterAPINormalizationTransformer) buildComponentDefinition(transCtx *clusterTransformContext,
clusterCompSpec *appsv1alpha1.ClusterComponentSpec) (*appsv1alpha1.ComponentDefinition, error) {
if clusterCompSpec.ComponentDefRef == "" {
return nil, fmt.Errorf("expected cluster component def ref is empty: %s-%s", transCtx.Cluster.Name, clusterCompSpec.Name)
}
clusterCompDef := transCtx.ClusterDef.GetComponentDefByName(clusterCompSpec.ComponentDefRef)
if clusterCompDef == nil {
return nil, fmt.Errorf("referenced cluster component def is not defined: %s-%s", transCtx.Cluster.Name, clusterCompSpec.Name)
}
var clusterCompVer *appsv1alpha1.ClusterComponentVersion
if transCtx.ClusterVer != nil {
clusterCompVer = transCtx.ClusterVer.Spec.GetDefNameMappingComponents()[clusterCompSpec.ComponentDefRef]
}
return component.BuildComponentDefinitionFrom(clusterCompDef, clusterCompVer, transCtx.Cluster.Name)
}
46 changes: 22 additions & 24 deletions controllers/apps/transformer_cluster_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ func (c *ClusterComponentTransformer) Transform(ctx graph.TransformContext, dag
return nil
}

if cluster.Spec.ComponentSpecs == nil {
// has no components defined
if len(transCtx.ComponentSpecs) == 0 {
return nil
}

protoCompSpecMap := make(map[string]*appsv1alpha1.ClusterComponentSpec)
for _, spec := range cluster.Spec.ComponentSpecs {
protoCompSpecMap[spec.Name] = &spec
for _, spec := range transCtx.ComponentSpecs {
protoCompSpecMap[spec.Name] = spec
}

protoCompSet := sets.KeySet(protoCompSpecMap)
Expand All @@ -73,35 +75,31 @@ func (c *ClusterComponentTransformer) Transform(ctx graph.TransformContext, dag

createCompObjects := func() error {
for compName := range createCompSet {
protoComp, err := component.BuildProtoComponent(reqCtx, c.Client, cluster, protoCompSpecMap[compName])
comp, err := component.BuildProtoComponent2(cluster, protoCompSpecMap[compName])
if err != nil {
return err
}
graphCli.Create(dag, protoComp)
graphCli.Create(dag, comp)
}
return nil
}

updateCompObjects := func() error {
for compName := range updateCompSet {
runningComp, err := getCacheSnapshotComp(reqCtx, c.Client, compName, cluster.Namespace)
if err != nil && apierrors.IsNotFound(err) {
// to be backwards compatible with old API versions, for components that are already running but don't have a component CR, component CR needs to be generated.
protoComp, err := component.BuildProtoComponent(reqCtx, c.Client, cluster, protoCompSpecMap[compName])
if err != nil {
return err
}
graphCli.Create(dag, protoComp)
continue
} else if err != nil {
return err
runningComp, err1 := getCacheSnapshotComp(reqCtx, c.Client, compName, cluster.Namespace)
if err1 != nil && !apierrors.IsNotFound(err1) {
return err1
}
protoComp, err := component.BuildProtoComponent(reqCtx, c.Client, cluster, protoCompSpecMap[compName])
if err != nil {
return err
comp, err2 := component.BuildProtoComponent2(cluster, protoCompSpecMap[compName])
if err2 != nil {
return err2
}
if err1 != nil { // non-exist
// to be backwards compatible with old API versions, for components that are already running but don't have a component CR, component CR needs to be generated.
graphCli.Create(dag, comp)
} else {
graphCli.Update(dag, runningComp, copyAndMergeComponent(runningComp, comp, cluster))
}
newObj := copyAndMergeComponent(runningComp, protoComp, cluster)
graphCli.Update(dag, runningComp, newObj)
}
return nil
}
Expand Down Expand Up @@ -167,9 +165,9 @@ func copyAndMergeComponent(oldCompObj, newCompObj *appsv1alpha1.Component, clust

// getCacheSnapshotComp gets the component object from cache snapshot
func getCacheSnapshotComp(reqCtx ictrlutil.RequestCtx, cli client.Client, compName, namespace string) (*appsv1alpha1.Component, error) {
runningComp := &appsv1alpha1.Component{}
if err := ictrlutil.ValidateExistence(reqCtx.Ctx, cli, types.NamespacedName{Name: compName, Namespace: namespace}, runningComp, false); err != nil {
comp := &appsv1alpha1.Component{}
if err := ictrlutil.ValidateExistence(reqCtx.Ctx, cli, types.NamespacedName{Name: compName, Namespace: namespace}, comp, false); err != nil {
return nil, err
}
return runningComp, nil
return comp, nil
}
26 changes: 25 additions & 1 deletion controllers/apps/transformer_validate_and_load_ref_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
// ValidateAndLoadRefResourcesTransformer handles referenced resources'(cd & cv) validation and load them into context
type ValidateAndLoadRefResourcesTransformer struct{}

var _ graph.Transformer = &ValidateAndLoadRefResourcesTransformer{}

func (t *ValidateAndLoadRefResourcesTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error {
transCtx, _ := ctx.(*clusterTransformContext)
cluster := transCtx.Cluster
Expand Down Expand Up @@ -82,7 +84,29 @@ func (t *ValidateAndLoadRefResourcesTransformer) Transform(ctx graph.TransformCo
transCtx.ClusterVer = &appsv1alpha1.ClusterVersion{}
}

if err = t.checkComponentDefinitions(transCtx, cluster); err != nil {
return newRequeueError(requeueDuration, err.Error())
}

return nil
}

var _ graph.Transformer = &ValidateAndLoadRefResourcesTransformer{}
func (t *ValidateAndLoadRefResourcesTransformer) checkComponentDefinitions(ctx *clusterTransformContext, cluster *appsv1alpha1.Cluster) error {
for _, comp := range cluster.Spec.ComponentSpecs {
if len(comp.ComponentDef) == 0 {
continue
}
compDef := &appsv1alpha1.ComponentDefinition{}
if err := ctx.Client.Get(ctx.Context, types.NamespacedName{Name: comp.ComponentDef}, compDef); err != nil {
return err
}
if compDef.Status.Phase != appsv1alpha1.AvailablePhase {
return fmt.Errorf("the componetn definition referenced is unavailable: %s", comp.ComponentDef)
}
if ctx.ComponentDefs == nil {
ctx.ComponentDefs = make(map[string]*appsv1alpha1.ComponentDefinition)
}
ctx.ComponentDefs[compDef.Name] = compDef
}
return nil
}
Loading

0 comments on commit 3e5dc5a

Please sign in to comment.