Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add importer into registration #753

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
k8s.io/kube-aggregator v0.31.3
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6
open-cluster-management.io/addon-framework v0.11.1-0.20241129080247-57b1d2859f50
open-cluster-management.io/api v0.15.1-0.20241209025232-b62746ae96d4
open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2
open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f
sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848
sigs.k8s.io/controller-runtime v0.19.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 h1:MDF6h2H/h4tbzmtIKTuctcwZmY0tY
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
open-cluster-management.io/addon-framework v0.11.1-0.20241129080247-57b1d2859f50 h1:TXRd6OdGjArh6cwlCYOqlIcyx21k81oUIYj4rmHlYx0=
open-cluster-management.io/addon-framework v0.11.1-0.20241129080247-57b1d2859f50/go.mod h1:tsBSNs9mGfVQQjXBnjgpiX6r0UM+G3iNfmzQgKhEfw4=
open-cluster-management.io/api v0.15.1-0.20241209025232-b62746ae96d4 h1:f6KU3t9s0PA6vXmAjB6A9sd52OqBqOFK2uAhk3UUBKs=
open-cluster-management.io/api v0.15.1-0.20241209025232-b62746ae96d4/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM=
open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2 h1:zkp3VJnvexYk5fMf9/yFt6P0fQmp1WFd6Q/Y2t2jF5Q=
open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM=
open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f h1:zeC7QrFNarfK2zY6jGtd+mX+yDrQQmnH/J8A7n5Nh38=
open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY=
Expand Down
285 changes: 285 additions & 0 deletions pkg/registration/hub/importer/importer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
package importer

import (
"context"
"fmt"

"github.com/openshift/api"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"github.com/openshift/library-go/pkg/operator/resource/resourcehelper"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
appsv1 "k8s.io/api/apps/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterlisterv1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
operatorclient "open-cluster-management.io/api/client/operator/clientset/versioned"
v1 "open-cluster-management.io/api/cluster/v1"
operatorv1 "open-cluster-management.io/api/operator/v1"
"open-cluster-management.io/sdk-go/pkg/patcher"

"open-cluster-management.io/ocm/pkg/common/queue"
"open-cluster-management.io/ocm/pkg/operator/helpers/chart"
cloudproviders "open-cluster-management.io/ocm/pkg/registration/hub/importer/providers"
)

const (
	// operatorNamesapce is the namespace the klusterlet chart is rendered into.
	// NOTE(review): identifier is misspelled ("Namesapce"); renaming would touch
	// its usage elsewhere in this file, so it is left as-is here.
	operatorNamesapce = "open-cluster-management"
	// bootstrapSA is the bootstrap service account name.
	// NOTE(review): no usage visible in this chunk — confirm it is used elsewhere.
	bootstrapSA = "cluster-bootstrap"
	// ManagedClusterConditionImported is the condition type set on a
	// ManagedCluster to record the result of the import.
	ManagedClusterConditionImported = "Imported"
)

var (
	// genericScheme holds the types registered in init() (kube, apiextensions,
	// and OCM operator APIs) so rendered chart manifests can be decoded.
	genericScheme = runtime.NewScheme()
	genericCodecs = serializer.NewCodecFactory(genericScheme)
	// genericCodec decodes raw manifests into typed objects from genericScheme.
	genericCodec = genericCodecs.UniversalDeserializer()
)

// init registers the kube, apiextensions, and OCM operator API types into
// genericScheme so genericCodec can decode the rendered chart manifests.
func init() {
	utilruntime.Must(api.InstallKube(genericScheme))
	utilruntime.Must(apiextensionsv1.AddToScheme(genericScheme))
	utilruntime.Must(operatorv1.Install(genericScheme))
}

// KlusterletConfigRenderer renders the config for the klusterlet chart.
// Renderers are applied in sequence during reconcile; each receives the
// config produced by the previous one and may return an error to abort
// the import.
type KlusterletConfigRenderer func(
	ctx context.Context, config *chart.KlusterletChartConfig) (*chart.KlusterletChartConfig, error)

// Importer reconciles ManagedClusters and installs the klusterlet chart onto
// clusters owned by one of the configured cloud providers.
type Importer struct {
	// providers are consulted in order; the first one claiming ownership of a
	// cluster performs the import.
	providers     []cloudproviders.Interface
	clusterClient clusterclientset.Interface
	clusterLister clusterlisterv1.ManagedClusterLister
	// renders customize the klusterlet chart config before rendering.
	renders []KlusterletConfigRenderer
	// patcher applies status updates to ManagedClusters.
	patcher patcher.Patcher[*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus]
}

// NewImporter creates an auto import controller. It builds an Importer from
// the given renderers, clients, and providers, registers each provider with
// the controller's sync context, and returns the assembled controller keyed
// by ManagedCluster name.
func NewImporter(
	renders []KlusterletConfigRenderer,
	clusterClient clusterclientset.Interface,
	clusterInformer clusterinformerv1.ManagedClusterInformer,
	providers []cloudproviders.Interface,
	recorder events.Recorder) factory.Controller {
	const controllerName = "managed-cluster-importer"
	syncCtx := factory.NewSyncContext(controllerName, recorder)

	importer := &Importer{
		providers:     providers,
		clusterClient: clusterClient,
		clusterLister: clusterInformer.Lister(),
		renders:       renders,
		patcher: patcher.NewPatcher[
			*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus](
			clusterClient.ClusterV1().ManagedClusters()),
	}

	// Register each provider with the shared sync context.
	for _, p := range providers {
		p.Register(syncCtx)
	}

	return factory.New().
		WithInformersQueueKeysFunc(queue.QueueKeyByMetaName, clusterInformer.Informer()).
		WithSyncContext(syncCtx).
		WithSync(importer.sync).
		ToController(controllerName, recorder)
}

func (i *Importer) sync(ctx context.Context, syncCtx factory.SyncContext) error {
clusterName := syncCtx.QueueKey()
logger := klog.FromContext(ctx)
logger.V(4).Info("Reconciling key", "clusterName", clusterName)

cluster, err := i.clusterLister.Get(clusterName)
switch {
case errors.IsNotFound(err):
return nil
case err != nil:
return err
}

// If the cluster is imported, skip the reconcile
if meta.IsStatusConditionTrue(cluster.Status.Conditions, ManagedClusterConditionImported) {
return nil
}

// get provider from the provider list
var provider cloudproviders.Interface
for _, p := range i.providers {
if p.IsManagedClusterOwner(cluster) {
provider = p
qiujian16 marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
if provider == nil {
logger.V(2).Info("provider not found for cluster", "cluster", cluster.Name)
return nil
}

newCluster := cluster.DeepCopy()
newCluster, err = i.reconcile(ctx, logger, syncCtx.Recorder(), provider, newCluster)
updated, updatedErr := i.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
if updatedErr != nil {
return updatedErr
}
if err != nil {
return err
}
if updated {
syncCtx.Recorder().Eventf(
"ManagedClusterImported", "managed cluster %s is imported", clusterName)
}

return nil
}

func (i *Importer) reconcile(
ctx context.Context,
logger klog.Logger,
recorder events.Recorder,
provider cloudproviders.Interface,
cluster *v1.ManagedCluster) (*v1.ManagedCluster, error) {
clients, err := provider.Clients(ctx, cluster)
if err != nil {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: ManagedClusterConditionImported,
Status: metav1.ConditionFalse,
Reason: "KubeConfigGetFailed",
Message: fmt.Sprintf("failed to get kubeconfig. See errors:\n%s",
err.Error()),
})
return cluster, err
}

if clients == nil {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: ManagedClusterConditionImported,
Status: metav1.ConditionFalse,
Reason: "KubeConfigNotFound",
Message: "Secret for kubeconfig is not found.",
})
return cluster, nil
}

// render the klsuterlet chart config
klusterletChartConfig := &chart.KlusterletChartConfig{
CreateNamespace: true,
Klusterlet: chart.KlusterletConfig{
Create: true,
ClusterName: cluster.Name,
ResourceRequirement: operatorv1.ResourceRequirement{
Type: operatorv1.ResourceQosClassDefault,
},
},
}
for _, renderer := range i.renders {
klusterletChartConfig, err = renderer(ctx, klusterletChartConfig)
if err != nil {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: ManagedClusterConditionImported,
Status: metav1.ConditionFalse,
Reason: "ConfigRendererFailed",
Message: fmt.Sprintf("failed to render config. See errors:\n%s",
err.Error()),
})
return cluster, err
}
}
rawManifests, err := chart.RenderKlusterletChart(klusterletChartConfig, operatorNamesapce)
if err != nil {
return cluster, err
}

clientHolder := resourceapply.NewKubeClientHolder(clients.KubeClient).
WithAPIExtensionsClient(clients.APIExtClient).WithDynamicClient(clients.DynamicClient)
cache := resourceapply.NewResourceCache()
var results []resourceapply.ApplyResult
for _, manifest := range rawManifests {
requiredObj, _, err := genericCodec.Decode(manifest, nil, nil)
if err != nil {
logger.Error(err, "failed to decode manifest", "manifest", manifest)
return cluster, err
}
result := resourceapply.ApplyResult{}
switch t := requiredObj.(type) {
case *appsv1.Deployment:
result.Result, result.Changed, result.Error = resourceapply.ApplyDeployment(
ctx, clients.KubeClient.AppsV1(), recorder, t, 0)
results = append(results, result)
case *operatorv1.Klusterlet:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I don't quite understand why we handle the Deployment and Klusterlet separately, instead of using the resourceapply.ApplyDirectly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it only support ApplyDirectly only can apply certain static resource, deployment/klusterlet can do be applied with this func.

result.Result, result.Changed, result.Error = ApplyKlusterlet(
ctx, clients.OperatorClient, recorder, t)
results = append(results, result)
default:
tempResults := resourceapply.ApplyDirectly(ctx, clientHolder, recorder, cache,
func(name string) ([]byte, error) {
return manifest, nil
},
"manifest")
results = append(results, tempResults...)
}
}

var errs []error
for _, result := range results {
if result.Error != nil {
errs = append(errs, result.Error)
}
}
if len(errs) > 0 {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: ManagedClusterConditionImported,
Status: metav1.ConditionFalse,
Reason: "ImportFailed",
Message: fmt.Sprintf("failed to import the klusterlet. See errors:\n%s",
utilerrors.NewAggregate(errs).Error()),
})
} else {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: ManagedClusterConditionImported,
Status: metav1.ConditionTrue,
Reason: "ImportSucceed",
})
}

return cluster, utilerrors.NewAggregate(errs)
}

// ApplyKlusterlet ensures the Klusterlet resource matches the required state.
// It creates the Klusterlet when absent, updates it when the desired
// ObjectMeta or Spec differs, and reports create/update events through the
// recorder. It returns the resulting object, whether a write occurred, and
// any error from the API calls.
func ApplyKlusterlet(
	ctx context.Context,
	client operatorclient.Interface,
	recorder events.Recorder,
	required *operatorv1.Klusterlet) (*operatorv1.Klusterlet, bool, error) {
	klusterlets := client.OperatorV1().Klusterlets()

	existing, err := klusterlets.Get(ctx, required.Name, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		created, createErr := klusterlets.Create(ctx, required.DeepCopy(), metav1.CreateOptions{})
		resourcehelper.ReportCreateEvent(recorder, required, createErr)
		return created, true, createErr
	}
	if err != nil {
		return nil, false, err
	}

	metaChanged := pointer.Bool(false)
	desired := existing.DeepCopy()
	resourcemerge.EnsureObjectMeta(metaChanged, &desired.ObjectMeta, required.ObjectMeta)

	// No write needed when both metadata and spec already match.
	if !*metaChanged && equality.Semantic.DeepEqual(desired.Spec, required.Spec) {
		return desired, false, nil
	}

	desired.Spec = required.Spec
	updated, updateErr := klusterlets.Update(ctx, desired, metav1.UpdateOptions{})
	resourcehelper.ReportUpdateEvent(recorder, required, updateErr)
	return updated, true, updateErr
}
Loading
Loading