From 362c27e1823a63c9cc4a9a9cd7c739fd72d6e674 Mon Sep 17 00:00:00 2001
From: Jian Qiu
Date: Thu, 5 Dec 2024 10:00:01 +0800
Subject: [PATCH] Add importer into registration

Signed-off-by: Jian Qiu
---
 pkg/registration/hub/importer/importer.go     | 291 ++++++++++++++++++
 .../hub/importer/providers/capi/provider.go   | 151 +++++++++
 .../hub/importer/providers/interface.go       |  26 ++
 pkg/registration/hub/importer/renderers.go    |  93 ++++++
 4 files changed, 561 insertions(+)
 create mode 100644 pkg/registration/hub/importer/importer.go
 create mode 100644 pkg/registration/hub/importer/providers/capi/provider.go
 create mode 100644 pkg/registration/hub/importer/providers/interface.go
 create mode 100644 pkg/registration/hub/importer/renderers.go

diff --git a/pkg/registration/hub/importer/importer.go b/pkg/registration/hub/importer/importer.go
new file mode 100644
index 000000000..946eb6810
--- /dev/null
+++ b/pkg/registration/hub/importer/importer.go
@@ -0,0 +1,291 @@
+package importer
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/openshift/api"
+	"github.com/openshift/library-go/pkg/controller/factory"
+	"github.com/openshift/library-go/pkg/operator/events"
+	"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
+	"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
+	appsv1 "k8s.io/api/apps/v1"
+	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	"k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog/v2"
+	"k8s.io/utils/pointer"
+
+	clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
+	clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
+	clusterlisterv1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
+	operatorclient "open-cluster-management.io/api/client/operator/clientset/versioned"
+	v1 "open-cluster-management.io/api/cluster/v1"
+	operatorv1 "open-cluster-management.io/api/operator/v1"
+	"open-cluster-management.io/sdk-go/pkg/patcher"
+
+	"open-cluster-management.io/ocm/pkg/common/queue"
+	"open-cluster-management.io/ocm/pkg/operator/helpers/chart"
+	cloudproviders "open-cluster-management.io/ocm/pkg/registration/hub/importer/providers"
+)
+
+var (
+	genericScheme = runtime.NewScheme()
+	genericCodecs = serializer.NewCodecFactory(genericScheme)
+	genericCodec  = genericCodecs.UniversalDeserializer()
+)
+
+func init() {
+	utilruntime.Must(api.InstallKube(genericScheme))
+	utilruntime.Must(apiextensionsv1.AddToScheme(genericScheme))
+	utilruntime.Must(operatorv1.Install(genericScheme))
+}
+
+const operatorNamespace = "open-cluster-management"
+
+// KlusterletConfigRenderer renders the config for the klusterlet chart.
+type KlusterletConfigRenderer func(
+	ctx context.Context, config *chart.KlusterletChartConfig) (*chart.KlusterletChartConfig, error)
+
+type Importer struct {
+	providers     []cloudproviders.Interface
+	clusterClient clusterclientset.Interface
+	clusterLister clusterlisterv1.ManagedClusterLister
+	renders       []KlusterletConfigRenderer
+	patcher       patcher.Patcher[*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus]
+}
+
+func NewImporter(
+	renders []KlusterletConfigRenderer,
+	clusterClient clusterclientset.Interface,
+	clusterInformer clusterinformerv1.ManagedClusterInformer,
+	providers []cloudproviders.Interface,
+	recorder events.Recorder) factory.Controller {
+	controllerName := "managed-cluster-importer"
+	syncCtx := factory.NewSyncContext(controllerName, recorder)
+
+	i := &Importer{
+		providers:     providers,
+		clusterClient: clusterClient,
+		clusterLister: clusterInformer.Lister(),
+		renders:       renders,
+		patcher: patcher.NewPatcher[
+			*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus](
+			clusterClient.ClusterV1().ManagedClusters()),
+	}
+
+	for _, provider := range providers {
+		provider.Register(syncCtx)
+	}
+
+	return factory.New().WithInformersQueueKeysFunc(queue.QueueKeyByMetaName, clusterInformer.Informer()).
+		WithSyncContext(syncCtx).WithSync(i.sync).ToController(controllerName, recorder)
+}
+
+func (i *Importer) sync(ctx context.Context, syncCtx factory.SyncContext) error {
+	clusterName := syncCtx.QueueKey()
+	logger := klog.FromContext(ctx)
+	logger.V(4).Info("Reconciling key", "clusterName", clusterName)
+
+	cluster, err := i.clusterLister.Get(clusterName)
+	switch {
+	case errors.IsNotFound(err):
+		return nil
+	case err != nil:
+		return err
+	}
+
+	// If the cluster is imported, skip the reconcile
+	if meta.IsStatusConditionTrue(cluster.Status.Conditions, "Imported") {
+		return nil
+	}
+
+	// get provider from the provider list
+	var provider cloudproviders.Interface
+	for _, p := range i.providers {
+		if p.IsManagedClusterOwner(cluster) {
+			provider = p
+		}
+	}
+	if provider == nil {
+		logger.Info("provider not found for cluster", "cluster", cluster.Name)
+		return nil
+	}
+
+	newCluster := cluster.DeepCopy()
+	newCluster, err = i.reconcile(ctx, logger, syncCtx.Recorder(), provider, newCluster)
+	updated, updatedErr := i.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
+	if updatedErr != nil {
+		return updatedErr
+	}
+	if err != nil {
+		return err
+	}
+	if updated {
+		syncCtx.Recorder().Eventf(
+			"ManagedClusterImported", "managed cluster %s is imported", clusterName)
+	}
+
+	return nil
+}
+
+func (i *Importer) reconcile(
+	ctx context.Context,
+	logger klog.Logger,
+	recorder events.Recorder,
+	provider cloudproviders.Interface,
+	cluster *v1.ManagedCluster) (*v1.ManagedCluster, error) {
+	config, err := provider.KubeConfig(cluster)
+	if err != nil {
+		meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
+			Type:   "Imported",
+			Status: metav1.ConditionFalse,
+			Reason: "KubeConfigGetFailed",
+			Message: fmt.Sprintf("failed to get kubeconfig. See errors:\n%s",
+				err.Error()),
+		})
+		return cluster, err
+	}
+
+	if config == nil {
+		meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
+			Type:    "Imported",
+			Status:  metav1.ConditionFalse,
+			Reason:  "KubeConfigNotFound",
+			Message: "Secret for kubeconfig is not found.",
+		})
+		return cluster, nil
+	}
+
+	// render the klusterlet chart config
+	klusterletChartConfig := &chart.KlusterletChartConfig{
+		CreateNamespace: true,
+		Klusterlet: chart.KlusterletConfig{
+			ClusterName: cluster.Name,
+		},
+	}
+	for _, renderer := range i.renders {
+		klusterletChartConfig, err = renderer(ctx, klusterletChartConfig)
+		if err != nil {
+			meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
+				Type:   "Imported",
+				Status: metav1.ConditionFalse,
+				Reason: "ConfigRendererFailed",
+				Message: fmt.Sprintf("failed to render config. See errors:\n%s",
+					err.Error()),
+			})
+			return cluster, err
+		}
+	}
+	rawManifests, err := chart.RenderKlusterletChart(klusterletChartConfig, operatorNamespace)
+	if err != nil {
+		return cluster, err
+	}
+
+	// build related clients
+	kubeClient, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
+			Type:   "Imported",
+			Status: metav1.ConditionFalse,
+			Reason: "ClientBuildErr",
+			Message: fmt.Sprintf("failed to import the klusterlet. See errors: %s",
+				err.Error()),
+		})
+		return cluster, err
+	}
+	hubApiExtensionClient, err := apiextensionsclient.NewForConfig(config)
+	if err != nil {
+		return cluster, err
+	}
+	operatorClient, err := operatorclient.NewForConfig(config)
+	if err != nil {
+		return cluster, err
+	}
+
+	clientHolder := resourceapply.NewKubeClientHolder(kubeClient).
+		WithAPIExtensionsClient(hubApiExtensionClient)
+	cache := resourceapply.NewResourceCache()
+	var results []resourceapply.ApplyResult
+	for _, manifest := range rawManifests {
+		requiredObj, _, err := genericCodec.Decode(manifest, nil, nil)
+		if err != nil {
+			logger.Error(err, "failed to decode manifest", "manifest", manifest)
+			return cluster, err
+		}
+		result := resourceapply.ApplyResult{}
+		switch t := requiredObj.(type) {
+		case *appsv1.Deployment:
+			result.Result, result.Changed, result.Error = resourceapply.ApplyDeployment(
+				ctx, kubeClient.AppsV1(), recorder, t, 0)
+			results = append(results, result)
+		case *operatorv1.Klusterlet:
+			result.Result, result.Changed, result.Error = ApplyKlusterlet(
+				ctx, operatorClient, t)
+			results = append(results, result)
+		default:
+			tempResults := resourceapply.ApplyDirectly(ctx, clientHolder, recorder, cache,
+				func(name string) ([]byte, error) {
+					return manifest, nil
+				},
+				"manifest")
+			results = append(results, tempResults...)
+		}
+	}
+
+	var errs []error
+	for _, result := range results {
+		if result.Error != nil {
+			errs = append(errs, result.Error)
+		}
+	}
+	if len(errs) > 0 {
+		meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
+			Type:   "Imported",
+			Status: metav1.ConditionFalse,
+			Reason: "ImportFailed",
+			Message: fmt.Sprintf("failed to import the klusterlet. See errors:\n%s",
+				utilerrors.NewAggregate(errs).Error()),
+		})
+	} else {
+		meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
+			Type:   "Imported",
+			Status: metav1.ConditionTrue,
+			Reason: "ImportSucceed",
+		})
+	}
+
+	return cluster, utilerrors.NewAggregate(errs)
+}
+
+func ApplyKlusterlet(ctx context.Context, client operatorclient.Interface, required *operatorv1.Klusterlet) (*operatorv1.Klusterlet, bool, error) {
+	existing, err := client.OperatorV1().Klusterlets().Get(ctx, required.Name, metav1.GetOptions{})
+	if errors.IsNotFound(err) {
+		requiredCopy := required.DeepCopy()
+		actual, err := client.OperatorV1().Klusterlets().Create(ctx, requiredCopy, metav1.CreateOptions{})
+		return actual, true, err
+	}
+	if err != nil {
+		return nil, false, err
+	}
+
+	modified := pointer.Bool(false)
+	existingCopy := existing.DeepCopy()
+	resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, required.ObjectMeta)
+
+	if !*modified && equality.Semantic.DeepEqual(existingCopy.Spec, required.Spec) {
+		return existingCopy, false, nil
+	}
+
+	existingCopy.Spec = required.Spec
+	actual, err := client.OperatorV1().Klusterlets().Update(ctx, existingCopy, metav1.UpdateOptions{})
+	return actual, true, err
+}
diff --git a/pkg/registration/hub/importer/providers/capi/provider.go b/pkg/registration/hub/importer/providers/capi/provider.go
new file mode 100644
index 000000000..3cdf60e20
--- /dev/null
+++ b/pkg/registration/hub/importer/providers/capi/provider.go
@@ -0,0 +1,151 @@
+package capi
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/openshift/library-go/pkg/controller/factory"
+	"github.com/pkg/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/dynamic/dynamicinformer"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/clientcmd"
+
+	clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
+	clusterv1 "open-cluster-management.io/api/cluster/v1"
+
+	"open-cluster-management.io/ocm/pkg/registration/hub/importer/providers"
+)
+
+var gvr = schema.GroupVersionResource{
+	Group:    "cluster.x-k8s.io",
+	Version:  "v1beta1",
+	Resource: "clusters",
+}
+
+const (
+	ByCAPIResource    = "by-capi-resource"
+	CAPIAnnotationKey = "cluster.x-k8s.io/cluster"
+)
+
+type CAPIProvider struct {
+	informer              dynamicinformer.DynamicSharedInformerFactory
+	lister                cache.GenericLister
+	kubeClient            kubernetes.Interface
+	managedClusterIndexer cache.Indexer
+}
+
+func NewCAPIProvider(
+	kubeconfig *rest.Config, clusterInformer clusterinformerv1.ManagedClusterInformer) providers.Interface {
+	dynamicClient := dynamic.NewForConfigOrDie(kubeconfig)
+	kubeClient := kubernetes.NewForConfigOrDie(kubeconfig)
+
+	dynamicInformer := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 30*time.Minute)
+
+	utilruntime.Must(clusterInformer.Informer().AddIndexers(cache.Indexers{
+		ByCAPIResource: indexByCAPIResource,
+	}))
+
+	return &CAPIProvider{
+		informer:              dynamicInformer,
+		lister:                dynamicInformer.ForResource(gvr).Lister(),
+		kubeClient:            kubeClient,
+		managedClusterIndexer: clusterInformer.Informer().GetIndexer(),
+	}
+}
+
+func (c *CAPIProvider) KubeConfig(cluster *clusterv1.ManagedCluster) (*rest.Config, error) {
+	clusterKey := capiNameFromManagedCluster(cluster)
+	namespace, name, err := cache.SplitMetaNamespaceKey(clusterKey)
+	if err != nil {
+		return nil, err
+	}
+	_, err = c.lister.ByNamespace(namespace).Get(name)
+	switch {
+	case apierrors.IsNotFound(err):
+		return nil, nil
+	case err != nil:
+		return nil, err
+	}
+
+	secret, err := c.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name+"-kubeconfig", metav1.GetOptions{})
+	switch {
+	case apierrors.IsNotFound(err):
+		return nil, nil
+	case err != nil:
+		return nil, err
+	}
+
+	data, ok := secret.Data["value"]
+	if !ok {
+		return nil, errors.Errorf("missing key %q in data of secret %s/%s-kubeconfig", "value", namespace, name)
+	}
+
+	configOverride, err := clientcmd.NewClientConfigFromBytes(data)
+	if err != nil {
+		return nil, err
+	}
+	return configOverride.ClientConfig()
+}
+
+func (c *CAPIProvider) Register(syncCtx factory.SyncContext) {
+	_, err := c.informer.ForResource(gvr).Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			c.enqueueManagedClusterByCAPI(obj, syncCtx)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			c.enqueueManagedClusterByCAPI(newObj, syncCtx)
+		},
+	})
+	utilruntime.HandleError(err)
+}
+
+func (c *CAPIProvider) IsManagedClusterOwner(cluster *clusterv1.ManagedCluster) bool {
+	clusterKey := capiNameFromManagedCluster(cluster)
+	namespace, name, _ := cache.SplitMetaNamespaceKey(clusterKey)
+	_, err := c.lister.ByNamespace(namespace).Get(name)
+	return err == nil
+}
+
+func (c *CAPIProvider) Run(ctx context.Context) {
+	c.informer.Start(ctx.Done())
+}
+
+func (c *CAPIProvider) enqueueManagedClusterByCAPI(obj interface{}, syncCtx factory.SyncContext) {
+	accessor, _ := meta.Accessor(obj)
+	objs, err := c.managedClusterIndexer.ByIndex(ByCAPIResource, fmt.Sprintf(
+		"%s/%s", accessor.GetNamespace(), accessor.GetName()))
+	if err != nil {
+		return
+	}
+	for _, obj := range objs {
+		accessor, _ := meta.Accessor(obj)
+		syncCtx.Queue().Add(accessor.GetName())
+	}
+}
+
+func indexByCAPIResource(obj interface{}) ([]string, error) {
+	cluster, ok := obj.(*clusterv1.ManagedCluster)
+	if !ok {
+		return []string{}, nil
+	}
+	return []string{capiNameFromManagedCluster(cluster)}, nil
+}
+
+func capiNameFromManagedCluster(cluster *clusterv1.ManagedCluster) string {
+	name := fmt.Sprintf("%s/%s", cluster.Name, cluster.Namespace)
+	if len(cluster.Annotations) > 0 {
+		if key, ok := cluster.Annotations[CAPIAnnotationKey]; ok {
+			return key
+		}
+	}
+	return name
+}
diff --git a/pkg/registration/hub/importer/providers/interface.go b/pkg/registration/hub/importer/providers/interface.go
new file mode 100644
index 000000000..915beb1d8
--- /dev/null
+++ b/pkg/registration/hub/importer/providers/interface.go
@@ -0,0 +1,26 @@
+package providers
+
+import (
+	"context"
+
+	"github.com/openshift/library-go/pkg/controller/factory"
+	"k8s.io/client-go/rest"
+
+	clusterv1 "open-cluster-management.io/api/cluster/v1"
+)
+
+// Interface is the interface that a cluster provider should implement
+type Interface interface {
+	// KubeConfig returns the config to connect to the target cluster.
+	KubeConfig(cluster *clusterv1.ManagedCluster) (*rest.Config, error)
+
+	// IsManagedClusterOwner checks if the provider is used to manage this cluster
+	IsManagedClusterOwner(cluster *clusterv1.ManagedCluster) bool
+
+	// Register registers the provider to the importer. The provider should enqueue the resource
+	// into the queue with the name of the managed cluster
+	Register(syncCtx factory.SyncContext)
+
+	// Run starts the provider
+	Run(ctx context.Context)
+}
diff --git a/pkg/registration/hub/importer/renderers.go b/pkg/registration/hub/importer/renderers.go
new file mode 100644
index 000000000..9ea37e9ee
--- /dev/null
+++ b/pkg/registration/hub/importer/renderers.go
@@ -0,0 +1,93 @@
+package importer
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/ghodss/yaml"
+	authv1 "k8s.io/api/authentication/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	clientcmdapiv1 "k8s.io/client-go/tools/clientcmd/api/v1"
+	"k8s.io/utils/ptr"
+
+	sdkhelpers "open-cluster-management.io/sdk-go/pkg/helpers"
+
+	"open-cluster-management.io/ocm/pkg/operator/helpers/chart"
+)
+
+func RenderBootstrapHubKubeConfig(
+	kubeClient kubernetes.Interface, apiServerURL string) KlusterletConfigRenderer {
+	return func(ctx context.Context, config *chart.KlusterletChartConfig) (*chart.KlusterletChartConfig, error) {
+		// get bootstrap token
+		tr, err := kubeClient.CoreV1().
+			ServiceAccounts(operatorNamespace).
+			CreateToken(ctx, "cluster-bootstrap", &authv1.TokenRequest{
+				Spec: authv1.TokenRequestSpec{
+					// the token expires in 1 hour
+					ExpirationSeconds: ptr.To[int64](3600),
+				},
+			}, metav1.CreateOptions{})
+		if err != nil {
+			return config, fmt.Errorf(
+				"failed to get token from sa %s/cluster-bootstrap: %v", operatorNamespace, err)
+		}
+
+		// get apiserver url
+		url := apiServerURL
+		if len(url) == 0 {
+			url, err = sdkhelpers.GetAPIServer(kubeClient)
+			if err != nil {
+				return config, err
+			}
+		}
+
+		// get ca bundle
+		ca, err := sdkhelpers.GetCACert(kubeClient)
+		if err != nil {
+			return config, err
+		}
+
+		clientConfig := clientcmdapiv1.Config{
+			// Define a cluster stanza based on the bootstrap kubeconfig.
+			Clusters: []clientcmdapiv1.NamedCluster{
+				{
+					Name: "hub",
+					Cluster: clientcmdapiv1.Cluster{
+						Server:                   url,
+						CertificateAuthorityData: ca,
+					},
+				},
+			},
+			// Define auth based on the bootstrap token.
+			AuthInfos: []clientcmdapiv1.NamedAuthInfo{
+				{
+					Name: "bootstrap",
+					AuthInfo: clientcmdapiv1.AuthInfo{
+						Token: tr.Status.Token,
+					},
+				},
+			},
+			// Define a context that connects the auth info and cluster, and set it as the default
+			Contexts: []clientcmdapiv1.NamedContext{
+				{
+					Name: "bootstrap",
+					Context: clientcmdapiv1.Context{
+						Cluster:   "hub",
+						AuthInfo:  "bootstrap",
+						Namespace: "default",
+					},
+				},
+			},
+			CurrentContext: "bootstrap",
+		}
+
+		bootstrapConfigBytes, err := yaml.Marshal(clientConfig)
+		if err != nil {
+			return config, err
+		}
+
+		config.BootstrapHubKubeConfig = string(bootstrapConfigBytes)
+		return config, nil
+	}
+}
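
Illustrative wiring (not part of the patch): the diff adds the importer controller, the provider interface, a Cluster API provider, and a bootstrap-kubeconfig renderer, but does not show how they are assembled in the hub. The sketch below is one plausible wiring under stated assumptions; the setup package name, the runImporter helper, and the hubConfig/recorder arguments are hypothetical and only illustrate how NewImporter, NewCAPIProvider, and RenderBootstrapHubKubeConfig from this patch could fit together.

package setup // hypothetical; real wiring would live in the hub controller manager

import (
	"context"

	"github.com/openshift/library-go/pkg/operator/events"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
	clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"

	"open-cluster-management.io/ocm/pkg/registration/hub/importer"
	"open-cluster-management.io/ocm/pkg/registration/hub/importer/providers"
	"open-cluster-management.io/ocm/pkg/registration/hub/importer/providers/capi"
)

// runImporter is a hypothetical helper showing how the pieces added by this
// patch could be assembled: one CAPI provider, one bootstrap-kubeconfig
// renderer, and the importer controller driven by the ManagedCluster informer.
func runImporter(ctx context.Context, hubConfig *rest.Config, recorder events.Recorder) error {
	kubeClient, err := kubernetes.NewForConfig(hubConfig)
	if err != nil {
		return err
	}
	clusterClient, err := clusterclientset.NewForConfig(hubConfig)
	if err != nil {
		return err
	}

	informerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, 0)
	clusterInformer := informerFactory.Cluster().V1().ManagedClusters()

	// The CAPI provider watches cluster.x-k8s.io/v1beta1 Clusters and indexes
	// ManagedClusters so that CAPI events requeue the matching cluster name.
	capiProvider := capi.NewCAPIProvider(hubConfig, clusterInformer)

	// An empty apiServerURL makes the renderer discover the hub apiserver itself.
	renderers := []importer.KlusterletConfigRenderer{
		importer.RenderBootstrapHubKubeConfig(kubeClient, ""),
	}

	ctrl := importer.NewImporter(renderers, clusterClient, clusterInformer,
		[]providers.Interface{capiProvider}, recorder)

	informerFactory.Start(ctx.Done())
	go capiProvider.Run(ctx)
	go ctrl.Run(ctx, 1)
	<-ctx.Done()
	return nil
}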