Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Use separate cache for partial metadata watches on secrets to include all secrets #10633

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions exp/addons/controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

Expand All @@ -36,12 +37,12 @@ type ClusterResourceSetReconciler struct {
WatchFilterValue string
}

func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options, partialSecretCache cache.Cache) error {
return (&clusterresourcesets.ClusterResourceSetReconciler{
Client: r.Client,
Tracker: r.Tracker,
WatchFilterValue: r.WatchFilterValue,
}).SetupWithManager(ctx, mgr, options)
}).SetupWithManager(ctx, mgr, options, partialSecretCache)
}

// ClusterResourceSetBindingReconciler reconciles a ClusterResourceSetBinding object.
Expand Down
98 changes: 55 additions & 43 deletions exp/addons/internal/controllers/clusterresourceset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import (
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
Expand Down Expand Up @@ -65,7 +67,7 @@ type ClusterResourceSetReconciler struct {
WatchFilterValue string
}

func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options, partialSecretCache cache.Cache) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&addonsv1.ClusterResourceSet{}).
Watches(
Expand All @@ -74,18 +76,26 @@ func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr
).
WatchesMetadata(
&corev1.ConfigMap{},
handler.EnqueueRequestsFromMapFunc(r.resourceToClusterResourceSet),
builder.WithPredicates(
resourcepredicates.ResourceCreateOrUpdate(ctrl.LoggerFrom(ctx)),
handler.EnqueueRequestsFromMapFunc(
resourceToClusterResourceSetFunc[client.Object](r.Client),
),
).
WatchesMetadata(
&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(r.resourceToClusterResourceSet),
builder.WithPredicates(
resourcepredicates.ResourceCreateOrUpdate(ctrl.LoggerFrom(ctx)),
resourcepredicates.TypedResourceCreateOrUpdate[client.Object](ctrl.LoggerFrom(ctx)),
),
).
WatchesRawSource(source.Kind(
partialSecretCache,
&metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Kind: "Secret",
APIVersion: "v1",
},
},
handler.TypedEnqueueRequestsFromMapFunc(
resourceToClusterResourceSetFunc[*metav1.PartialObjectMetadata](r.Client),
),
resourcepredicates.TypedResourceCreateOrUpdate[*metav1.PartialObjectMetadata](ctrl.LoggerFrom(ctx)),
)).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
Complete(r)
Expand Down Expand Up @@ -471,46 +481,48 @@ func (r *ClusterResourceSetReconciler) clusterToClusterResourceSet(ctx context.C
return result
}

// resourceToClusterResourceSet is mapper function that maps resources to ClusterResourceSet.
func (r *ClusterResourceSetReconciler) resourceToClusterResourceSet(ctx context.Context, o client.Object) []ctrl.Request {
result := []ctrl.Request{}
// resourceToClusterResourceSetFunc returns a typed mapper function that maps resources to ClusterResourceSet.
func resourceToClusterResourceSetFunc[T client.Object](ctrlClient client.Client) handler.TypedMapFunc[T] {
return func(ctx context.Context, o T) []ctrl.Request {
result := []ctrl.Request{}

// Add all ClusterResourceSet owners.
for _, owner := range o.GetOwnerReferences() {
if owner.Kind == "ClusterResourceSet" {
name := client.ObjectKey{Namespace: o.GetNamespace(), Name: owner.Name}
result = append(result, ctrl.Request{NamespacedName: name})
// Add all ClusterResourceSet owners.
for _, owner := range o.GetOwnerReferences() {
if owner.Kind == "ClusterResourceSet" {
name := client.ObjectKey{Namespace: o.GetNamespace(), Name: owner.Name}
result = append(result, ctrl.Request{NamespacedName: name})
}
}
}

// If there is any ClusterResourceSet owner, that means the resource is reconciled before,
// and existing owners are the only matching ClusterResourceSets to this resource, so no need to return all ClusterResourceSets.
if len(result) > 0 {
return result
}
// If there is any ClusterResourceSet owner, that means the resource is reconciled before,
// and existing owners are the only matching ClusterResourceSets to this resource, so no need to return all ClusterResourceSets.
if len(result) > 0 {
return result
}

// Only core group is accepted as resources group
if o.GetObjectKind().GroupVersionKind().Group != "" {
return result
}
// Only core group is accepted as resources group
if o.GetObjectKind().GroupVersionKind().Group != "" {
return result
}

crsList := &addonsv1.ClusterResourceSetList{}
if err := r.Client.List(ctx, crsList, client.InNamespace(o.GetNamespace())); err != nil {
return nil
}
objKind, err := apiutil.GVKForObject(o, r.Client.Scheme())
if err != nil {
return nil
}
for _, crs := range crsList.Items {
for _, resource := range crs.Spec.Resources {
if resource.Kind == objKind.Kind && resource.Name == o.GetName() {
name := client.ObjectKey{Namespace: o.GetNamespace(), Name: crs.Name}
result = append(result, ctrl.Request{NamespacedName: name})
break
crsList := &addonsv1.ClusterResourceSetList{}
if err := ctrlClient.List(ctx, crsList, client.InNamespace(o.GetNamespace())); err != nil {
return nil
}
objKind, err := apiutil.GVKForObject(o, ctrlClient.Scheme())
if err != nil {
return nil
}
for _, crs := range crsList.Items {
for _, resource := range crs.Spec.Resources {
if resource.Kind == objKind.Kind && resource.Name == o.GetName() {
name := client.ObjectKey{Namespace: o.GetNamespace(), Name: crs.Name}
result = append(result, ctrl.Request{NamespacedName: name})
break
}
}
}
}

return result
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ package predicates

import (
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// ResourceCreateOrUpdate returns a predicate that returns true for create and update events.
func ResourceCreateOrUpdate(_ logr.Logger) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(event.CreateEvent) bool { return true },
UpdateFunc: func(event.UpdateEvent) bool { return true },
DeleteFunc: func(event.DeleteEvent) bool { return false },
GenericFunc: func(event.GenericEvent) bool { return false },
// TypedResourceCreateOrUpdate returns a predicate that returns true for create and update events.
func TypedResourceCreateOrUpdate[T client.Object](_ logr.Logger) predicate.TypedFuncs[T] {
return predicate.TypedFuncs[T]{
CreateFunc: func(event.TypedCreateEvent[T]) bool { return true },
UpdateFunc: func(event.TypedUpdateEvent[T]) bool { return true },
DeleteFunc: func(event.TypedDeleteEvent[T]) bool { return false },
GenericFunc: func(event.TypedGenericEvent[T]) bool { return false },
}
}
19 changes: 18 additions & 1 deletion exp/addons/internal/controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"fmt"
"os"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

Expand All @@ -46,6 +49,20 @@ func TestMain(m *testing.M) {
}

setupReconcilers := func(ctx context.Context, mgr ctrl.Manager) {
// Create partial cache analog to main.go.
partialSecretCache, err := cache.New(mgr.GetConfig(), cache.Options{
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
HTTPClient: mgr.GetHTTPClient(),
SyncPeriod: ptr.To(time.Minute * 10),
})
if err != nil {
panic(fmt.Sprintf("Failed to create cache for metadata only Secret watches: %v", err))
}
if err := mgr.Add(partialSecretCache); err != nil {
panic(fmt.Sprintf("Failed to start cache for metadata only Secret watches: %v", err))
}

tracker, err := remote.NewClusterCacheTracker(mgr, remote.ClusterCacheTrackerOptions{})
if err != nil {
panic(fmt.Sprintf("Failed to create new cluster cache tracker: %v", err))
Expand All @@ -55,7 +72,7 @@ func TestMain(m *testing.M) {
Client: mgr.GetClient(),
Tracker: tracker,
}
if err = reconciler.SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
if err = reconciler.SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}, partialSecretCache); err != nil {
panic(fmt.Sprintf("Failed to set up cluster resource set reconciler: %v", err))
}
bindingReconciler := ClusterResourceSetBindingReconciler{
Expand Down
5 changes: 3 additions & 2 deletions exp/runtime/controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

Expand All @@ -37,11 +38,11 @@ type ExtensionConfigReconciler struct {
WatchFilterValue string
}

func (r *ExtensionConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
func (r *ExtensionConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options, partialSecretCache cache.Cache) error {
return (&runtimecontrollers.Reconciler{
Client: r.Client,
APIReader: r.APIReader,
RuntimeClient: r.RuntimeClient,
WatchFilterValue: r.WatchFilterValue,
}).SetupWithManager(ctx, mgr, options)
}).SetupWithManager(ctx, mgr, options, partialSecretCache)
}
23 changes: 17 additions & 6 deletions exp/runtime/internal/controllers/extensionconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
runtimev1 "sigs.k8s.io/cluster-api/exp/runtime/api/v1alpha1"
Expand Down Expand Up @@ -59,13 +62,21 @@ type Reconciler struct {
WatchFilterValue string
}

func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options, partialSecretCache cache.Cache) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&runtimev1.ExtensionConfig{}).
WatchesMetadata(
&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(r.secretToExtensionConfig),
).
WatchesRawSource(source.Kind(
partialSecretCache,
&metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Kind: "Secret",
APIVersion: "v1",
},
},
handler.TypedEnqueueRequestsFromMapFunc(
r.secretToExtensionConfig,
),
)).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
Complete(r)
Expand Down Expand Up @@ -181,7 +192,7 @@ func (r *Reconciler) reconcileDelete(ctx context.Context, extensionConfig *runti

// secretToExtensionConfig maps a secret to ExtensionConfigs with the corresponding InjectCAFromSecretAnnotation
// to reconcile them on updates of the secrets.
func (r *Reconciler) secretToExtensionConfig(ctx context.Context, secret client.Object) []reconcile.Request {
func (r *Reconciler) secretToExtensionConfig(ctx context.Context, secret *metav1.PartialObjectMetadata) []reconcile.Request {
result := []ctrl.Request{}

extensionConfigs := runtimev1.ExtensionConfigList{}
Expand Down
Loading
Loading