Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jian Qiu <[email protected]>
  • Loading branch information
qiujian16 committed Dec 10, 2024
1 parent 9ee219c commit 68369c8
Show file tree
Hide file tree
Showing 7 changed files with 529 additions and 48 deletions.
66 changes: 27 additions & 39 deletions pkg/registration/hub/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"github.com/openshift/library-go/pkg/operator/resource/resourcehelper"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
appsv1 "k8s.io/api/apps/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -20,7 +20,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

Expand All @@ -37,6 +36,12 @@ import (
cloudproviders "open-cluster-management.io/ocm/pkg/registration/hub/importer/providers"
)

const (
operatorNamesapce = "open-cluster-management"
bootstrapSA = "cluster-bootstrap"
conditionTypeImported = "Imported"
)

var (
genericScheme = runtime.NewScheme()
genericCodecs = serializer.NewCodecFactory(genericScheme)
Expand All @@ -49,8 +54,6 @@ func init() {
utilruntime.Must(operatorv1.Install(genericScheme))
}

const operatorNamesapce = "open-cluster-management"

// KlusterletConfigRenderer renders the config for klusterlet chart.
type KlusterletConfigRenderer func(
ctx context.Context, config *chart.KlusterletChartConfig) (*chart.KlusterletChartConfig, error)
Expand Down Expand Up @@ -104,7 +107,7 @@ func (i *Importer) sync(ctx context.Context, syncCtx factory.SyncContext) error
}

// If the cluster is imported, skip the reconcile
if meta.IsStatusConditionTrue(cluster.Status.Conditions, "Imported") {
if meta.IsStatusConditionTrue(cluster.Status.Conditions, conditionTypeImported) {
return nil
}

Expand Down Expand Up @@ -143,10 +146,10 @@ func (i *Importer) reconcile(
recorder events.Recorder,
provider cloudproviders.Interface,
cluster *v1.ManagedCluster) (*v1.ManagedCluster, error) {
config, err := provider.KubeConfig(cluster)
clients, err := provider.Clients(cluster)
if err != nil {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: "Imported",
Type: conditionTypeImported,
Status: metav1.ConditionFalse,
Reason: "KubeConfigGetFailed",
Message: fmt.Sprintf("failed to get kubeconfig. See errors:\n%s",
Expand All @@ -155,9 +158,9 @@ func (i *Importer) reconcile(
return cluster, err
}

if config == nil {
if clients == nil {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: "Imported",
Type: conditionTypeImported,
Status: metav1.ConditionFalse,
Reason: "KubeConfigNotFound",
Message: "Secret for kubeconfig is not found.",
Expand All @@ -176,7 +179,7 @@ func (i *Importer) reconcile(
klusterletChartConfig, err = renderer(ctx, klusterletChartConfig)
if err != nil {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: "Imported",
Type: conditionTypeImported,
Status: metav1.ConditionFalse,
Reason: "ConfigRendererFailed",
Message: fmt.Sprintf("failed to render config. See errors:\n%s",
Expand All @@ -190,29 +193,8 @@ func (i *Importer) reconcile(
return cluster, err
}

// build related client
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: "Imported",
Status: metav1.ConditionFalse,
Reason: "ClientBuildErr",
Message: fmt.Sprintf("failed to import the klusterlet. See errors: %s",
err.Error()),
})
return cluster, err
}
hubApiExtensionClient, err := apiextensionsclient.NewForConfig(config)
if err != nil {
return cluster, err
}
operatorClient, err := operatorclient.NewForConfig(config)
if err != nil {
return cluster, err
}

clientHolder := resourceapply.NewKubeClientHolder(kubeClient).
WithAPIExtensionsClient(hubApiExtensionClient)
clientHolder := resourceapply.NewKubeClientHolder(clients.KubeClient).
WithAPIExtensionsClient(clients.APIExtClient).WithDynamicClient(clients.DynamicClient)
cache := resourceapply.NewResourceCache()
var results []resourceapply.ApplyResult
for _, manifest := range rawManifests {
Expand All @@ -225,11 +207,11 @@ func (i *Importer) reconcile(
switch t := requiredObj.(type) {
case *appsv1.Deployment:
result.Result, result.Changed, result.Error = resourceapply.ApplyDeployment(
ctx, kubeClient.AppsV1(), recorder, t, 0)
ctx, clients.KubeClient.AppsV1(), recorder, t, 0)
results = append(results, result)
case *operatorv1.Klusterlet:
result.Result, result.Changed, result.Error = ApplyKlusterlet(
ctx, operatorClient, t)
ctx, clients.OperatorClient, recorder, t)
results = append(results, result)
default:
tempResults := resourceapply.ApplyDirectly(ctx, clientHolder, recorder, cache,
Expand All @@ -249,15 +231,15 @@ func (i *Importer) reconcile(
}
if len(errs) > 0 {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: "Imported",
Type: conditionTypeImported,
Status: metav1.ConditionFalse,
Reason: "ImportFailed",
Message: fmt.Sprintf("failed to import the klusterlet. See errors:\n%s",
err.Error()),
utilerrors.NewAggregate(errs).Error()),
})
} else {
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: "Imported",
Type: conditionTypeImported,
Status: metav1.ConditionTrue,
Reason: "ImportSucceed",
})
Expand All @@ -266,11 +248,16 @@ func (i *Importer) reconcile(
return cluster, utilerrors.NewAggregate(errs)
}

func ApplyKlusterlet(ctx context.Context, client operatorclient.Interface, required *operatorv1.Klusterlet) (*operatorv1.Klusterlet, bool, error) {
func ApplyKlusterlet(
ctx context.Context,
client operatorclient.Interface,
recorder events.Recorder,
required *operatorv1.Klusterlet) (*operatorv1.Klusterlet, bool, error) {
existing, err := client.OperatorV1().Klusterlets().Get(ctx, required.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
requiredCopy := required.DeepCopy()
actual, err := client.OperatorV1().Klusterlets().Create(ctx, requiredCopy, metav1.CreateOptions{})
resourcehelper.ReportCreateEvent(recorder, required, err)
return actual, true, err
}
if err != nil {
Expand All @@ -287,5 +274,6 @@ func ApplyKlusterlet(ctx context.Context, client operatorclient.Interface, requi

existingCopy.Spec = required.Spec
actual, err := client.OperatorV1().Klusterlets().Update(ctx, existingCopy, metav1.UpdateOptions{})
resourcehelper.ReportUpdateEvent(recorder, required, err)
return actual, true, err
}
152 changes: 152 additions & 0 deletions pkg/registration/hub/importer/importer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package importer

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/openshift/library-go/pkg/controller/factory"
fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
fakedynamic "k8s.io/client-go/dynamic/fake"
kubefake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"

fakeclusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
fakeoperatorclient "open-cluster-management.io/api/client/operator/clientset/versioned/fake"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/sdk-go/pkg/patcher"

testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/registration/hub/importer/providers"
cloudproviders "open-cluster-management.io/ocm/pkg/registration/hub/importer/providers"
)

// TestSync verifies Importer.sync against a table of scenarios: a successful
// import, a queue key with no matching cluster, a cluster not owned by the
// provider, and a provider that cannot produce clients for the remote cluster.
// Each case inspects the actions recorded by the fake cluster client.
func TestSync(t *testing.T) {
	cases := []struct {
		name     string
		provider *fakeProvider
		key      string
		cluster  *clusterv1.ManagedCluster
		validate func(t *testing.T, actions []clienttesting.Action)
	}{
		{
			name:     "import succeed",
			provider: &fakeProvider{isOwned: true},
			key:      "cluster1",
			cluster:  &clusterv1.ManagedCluster{ObjectMeta: metav1.ObjectMeta{Name: "cluster1"}},
			validate: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertActions(t, actions, "patch")
				patch := actions[0].(clienttesting.PatchAction).GetPatch()
				managedCluster := &clusterv1.ManagedCluster{}
				err := json.Unmarshal(patch, managedCluster)
				if err != nil {
					t.Fatal(err)
				}
				if !meta.IsStatusConditionTrue(managedCluster.Status.Conditions, conditionTypeImported) {
					t.Errorf("expected the %s condition to be true", conditionTypeImported)
				}
			},
		},
		{
			name:     "no cluster",
			provider: &fakeProvider{isOwned: true},
			key:      "cluster1",
			cluster:  &clusterv1.ManagedCluster{ObjectMeta: metav1.ObjectMeta{Name: "cluster2"}},
			validate: func(t *testing.T, actions []clienttesting.Action) {
				// The keyed cluster is absent from the store, so sync is a no-op.
				testingcommon.AssertNoActions(t, actions)
			},
		},
		{
			name:     "not owned by the provider",
			provider: &fakeProvider{isOwned: false},
			key:      "cluster1",
			cluster:  &clusterv1.ManagedCluster{ObjectMeta: metav1.ObjectMeta{Name: "cluster1"}},
			validate: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertNoActions(t, actions)
			},
		},
		{
			name:     "clients for remote cluster is not generated",
			provider: &fakeProvider{isOwned: true, noClients: true},
			key:      "cluster1",
			cluster:  &clusterv1.ManagedCluster{ObjectMeta: metav1.ObjectMeta{Name: "cluster1"}},
			validate: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertActions(t, actions, "patch")
				patch := actions[0].(clienttesting.PatchAction).GetPatch()
				managedCluster := &clusterv1.ManagedCluster{}
				err := json.Unmarshal(patch, managedCluster)
				if err != nil {
					t.Fatal(err)
				}
				// Without clients the import cannot proceed, so the condition
				// must be set to False.
				if !meta.IsStatusConditionFalse(managedCluster.Status.Conditions, conditionTypeImported) {
					t.Errorf("expected the %s condition to be false", conditionTypeImported)
				}
			},
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			clusterClient := fakeclusterclient.NewSimpleClientset(c.cluster)
			clusterInformer := clusterinformers.NewSharedInformerFactory(
				clusterClient, 10*time.Minute).Cluster().V1().ManagedClusters()
			clusterStore := clusterInformer.Informer().GetStore()
			if err := clusterStore.Add(c.cluster); err != nil {
				t.Fatal(err)
			}
			importer := &Importer{
				providers:     []cloudproviders.Interface{c.provider},
				clusterClient: clusterClient,
				clusterLister: clusterInformer.Lister(),
				patcher: patcher.NewPatcher[
					*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
					clusterClient.ClusterV1().ManagedClusters()),
			}
			err := importer.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, c.key))
			if err != nil {
				t.Fatal(err)
			}
			c.validate(t, clusterClient.Actions())
		})
	}
}

// fakeProvider is a test double for the importer provider interface,
// with knobs to simulate ownership and client-construction failures.
type fakeProvider struct {
	// isOwned is the canned answer returned by IsManagedClusterOwner.
	isOwned bool
	// noClients makes Clients return (nil, nil), simulating a cluster
	// whose kubeconfig secret does not exist yet.
	noClients bool
	// kubeConfigErr, when non-nil, is returned as the error from Clients.
	kubeConfigErr error
}

// Clients returns the set of clients used to access the target cluster,
// honoring the fake's configured failure modes: a forced error
// (kubeConfigErr) takes precedence, then an absent kubeconfig (noClients);
// otherwise a fully fake client set is returned.
func (f *fakeProvider) Clients(_ *clusterv1.ManagedCluster) (*providers.Clients, error) {
	if f.kubeConfigErr != nil {
		return nil, f.kubeConfigErr
	}
	if f.noClients {
		return nil, nil
	}
	return &providers.Clients{
		KubeClient: kubefake.NewClientset(),
		// due to https://github.com/kubernetes/kubernetes/issues/126850, still need to use NewSimpleClientset
		APIExtClient:   fakeapiextensions.NewSimpleClientset(),
		OperatorClient: fakeoperatorclient.NewSimpleClientset(),
		DynamicClient:  fakedynamic.NewSimpleDynamicClient(runtime.NewScheme()),
	}, nil
}

// IsManagedClusterOwner reports whether this provider manages the given
// cluster; the fake simply returns the preset isOwned flag.
func (f *fakeProvider) IsManagedClusterOwner(_ *clusterv1.ManagedCluster) bool {
	return f.isOwned
}

// Register registers the provider to the importer. The provider should enqueue the resource
// into the queue with the name of the managed cluster.
// The fake performs no registration.
func (f *fakeProvider) Register(_ factory.SyncContext) {}

// Run starts the provider; the fake has no background work to run.
func (f *fakeProvider) Run(_ context.Context) {}
14 changes: 9 additions & 5 deletions pkg/registration/hub/importer/providers/capi/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewCAPIProvider(
}
}

func (c *CAPIProvider) KubeConfig(cluster *clusterv1.ManagedCluster) (*rest.Config, error) {
func (c *CAPIProvider) Clients(cluster *clusterv1.ManagedCluster) (*providers.Clients, error) {
clusterKey := capiNameFromManagedCluster(cluster)
namespace, name, err := cache.SplitMetaNamespaceKey(clusterKey)
if err != nil {
Expand Down Expand Up @@ -93,7 +93,12 @@ func (c *CAPIProvider) KubeConfig(cluster *clusterv1.ManagedCluster) (*rest.Conf
if err != nil {
return nil, err
}
return configOverride.ClientConfig()

config, err := configOverride.ClientConfig()
if err != nil {
return nil, err
}
return providers.NewClient(config)
}

func (c *CAPIProvider) Register(syncCtx factory.SyncContext) {
Expand Down Expand Up @@ -128,7 +133,7 @@ func (c *CAPIProvider) enqueueManagedClusterByCAPI(obj interface{}, syncCtx fact
}
for _, obj := range objs {
accessor, _ := meta.Accessor(obj)
syncCtx.Queue().Add(fmt.Sprintf("%s/%s", accessor.GetNamespace(), accessor.GetName()))
syncCtx.Queue().Add(accessor.GetName())
}
}

Expand All @@ -141,11 +146,10 @@ func indexByCAPIResource(obj interface{}) ([]string, error) {
}

func capiNameFromManagedCluster(cluster *clusterv1.ManagedCluster) string {
name := fmt.Sprintf("%s/%s", cluster.Name, cluster.Namespace)
if len(cluster.Annotations) > 0 {
if key, ok := cluster.Annotations[CAPIAnnotationKey]; ok {
return key
}
}
return name
return fmt.Sprintf("%s/%s", cluster.Name, cluster.Name)
}
Loading

0 comments on commit 68369c8

Please sign in to comment.