From 2b168f4d03ba72b4b579cae8dabcf34f52c9e59f Mon Sep 17 00:00:00 2001 From: KevFan Date: Fri, 27 Sep 2024 17:29:59 +0100 Subject: [PATCH 1/3] refactor: allow custom predicates as option Signed-off-by: KevFan --- controller/runnable.go | 17 +++++++++--- examples/kuadrant/main.go | 56 +++++++++++++++++++++++++++++++++------ 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/controller/runnable.go b/controller/runnable.go index 46d37f3..a1600a9 100644 --- a/controller/runnable.go +++ b/controller/runnable.go @@ -35,6 +35,7 @@ type RunnableBuilder func(controller *Controller) Runnable type RunnableBuilderOptions[T Object] struct { LabelSelector string FieldSelector string + Predicates []ctrlruntimepredicate.TypedPredicate[T] Builder func(obj T, resource schema.GroupVersionResource, namespace string, options ...RunnableBuilderOption[T]) RunnableBuilder } @@ -52,6 +53,12 @@ func FilterResourcesByField[T Object](selector string) RunnableBuilderOption[T] } } +func WithPredicates[T Object](predicates ...ctrlruntimepredicate.TypedPredicate[T]) RunnableBuilderOption[T] { + return func(o *RunnableBuilderOptions[T]) { + o.Predicates = append(o.Predicates, predicates...) + } +} + func Builder[T Object](builder func(obj T, resource schema.GroupVersionResource, namespace string, options ...RunnableBuilderOption[T]) RunnableBuilder) RunnableBuilderOption[T] { return func(o *RunnableBuilderOptions[T]) { o.Builder = builder @@ -156,9 +163,7 @@ func StateReconciler[T Object](obj T, resource schema.GroupVersionResource, name }) }, watchFunc: func(manager ctrlruntime.Manager) ctrlruntimesrc.Source { - predicates := []ctrlruntimepredicate.TypedPredicate[T]{ - &ctrlruntimepredicate.TypedGenerationChangedPredicate[T]{}, - } + var predicates []ctrlruntimepredicate.TypedPredicate[T] if o.LabelSelector != "" { predicates = append(predicates, ctrlruntimepredicate.NewTypedPredicateFuncs(func(obj T) bool { return ToLabelSelector(o.LabelSelector).Matches(labels.Set(obj.GetLabels())) @@ -172,6 +177,12 @@ func StateReconciler[T Object](obj T, resource schema.GroupVersionResource, name })))) })) } + + // Add custom predicates passed via options + if len(o.Predicates) > 0 { + predicates = append(predicates, o.Predicates...) + } + return ctrlruntimesrc.Kind(manager.GetCache(), obj, ctrlruntimehandler.TypedEnqueueRequestsFromMapFunc(TypedEnqueueRequestsMapFunc[T]), predicates...) }, } diff --git a/examples/kuadrant/main.go b/examples/kuadrant/main.go index f374d76..8ddb5bf 100644 --- a/examples/kuadrant/main.go +++ b/examples/kuadrant/main.go @@ -19,6 +19,7 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" "k8s.io/utils/ptr" + ctrlruntimepredicate "sigs.k8s.io/controller-runtime/pkg/predicate" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" ctrlruntime "sigs.k8s.io/controller-runtime" @@ -120,12 +121,41 @@ func main() { controllerOpts := []controller.ControllerOption{ controller.WithLogger(logger), controller.WithClient(client), - controller.WithRunnable("gateway watcher", buildWatcher(&gwapiv1.Gateway{}, controller.GatewaysResource, metav1.NamespaceAll)), - controller.WithRunnable("httproute watcher", buildWatcher(&gwapiv1.HTTPRoute{}, controller.HTTPRoutesResource, metav1.NamespaceAll)), - controller.WithRunnable("dnspolicy watcher", buildWatcher(&kuadrantv1alpha2.DNSPolicy{}, kuadrantv1alpha2.DNSPoliciesResource, metav1.NamespaceAll)), - controller.WithRunnable("tlspolicy watcher", buildWatcher(&kuadrantv1alpha2.TLSPolicy{}, kuadrantv1alpha2.TLSPoliciesResource, metav1.NamespaceAll)), - controller.WithRunnable("authpolicy watcher", buildWatcher(&kuadrantv1beta3.AuthPolicy{}, kuadrantv1beta3.AuthPoliciesResource, metav1.NamespaceAll)), - controller.WithRunnable("ratelimitpolicy watcher", buildWatcher(&kuadrantv1beta3.RateLimitPolicy{}, kuadrantv1beta3.RateLimitPoliciesResource, metav1.NamespaceAll)), + controller.WithRunnable("gateway watcher", buildWatcher( + &gwapiv1.Gateway{}, + controller.GatewaysResource, + metav1.NamespaceAll, + controller.WithPredicates[*gwapiv1.Gateway](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*gwapiv1.Gateway]{})), + ), + controller.WithRunnable("httproute watcher", buildWatcher( + &gwapiv1.HTTPRoute{}, + controller.HTTPRoutesResource, + metav1.NamespaceAll, + controller.WithPredicates[*gwapiv1.HTTPRoute](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*gwapiv1.HTTPRoute]{})), + ), + controller.WithRunnable("dnspolicy watcher", buildWatcher( + &kuadrantv1alpha2.DNSPolicy{}, + kuadrantv1alpha2.DNSPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates[*kuadrantv1alpha2.DNSPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1alpha2.DNSPolicy]{})), + ), + controller.WithRunnable("tlspolicy watcher", buildWatcher( + &kuadrantv1alpha2.TLSPolicy{}, + kuadrantv1alpha2.TLSPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates[*kuadrantv1alpha2.TLSPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1alpha2.TLSPolicy]{})), + ), + controller.WithRunnable("authpolicy watcher", buildWatcher( + &kuadrantv1beta3.AuthPolicy{}, + kuadrantv1beta3.AuthPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates[*kuadrantv1beta3.AuthPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1beta3.AuthPolicy]{})), + ), + controller.WithRunnable("ratelimitpolicy watcher", buildWatcher( + &kuadrantv1beta3.RateLimitPolicy{}, + kuadrantv1beta3.RateLimitPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates[*kuadrantv1beta3.RateLimitPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1beta3.RateLimitPolicy]{}))), controller.WithPolicyKinds( kuadrantv1alpha2.DNSPolicyKind, kuadrantv1alpha2.TLSPolicyKind, @@ -182,11 +212,21 @@ func controllerOptionsFor(gatewayProviders []string) []controller.ControllerOpti for _, gatewayProvider := range gatewayProviders { switch gatewayProvider { case reconcilers.EnvoyGatewayProviderName: - opts = append(opts, controller.WithRunnable("envoygateway/securitypolicy watcher", buildWatcher(&egv1alpha1.SecurityPolicy{}, reconcilers.EnvoyGatewaySecurityPoliciesResource, metav1.NamespaceAll))) + opts = append(opts, controller.WithRunnable("envoygateway/securitypolicy watcher", buildWatcher( + &egv1alpha1.SecurityPolicy{}, + reconcilers.EnvoyGatewaySecurityPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates[*egv1alpha1.SecurityPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*egv1alpha1.SecurityPolicy]{}))), + ) opts = append(opts, controller.WithObjectKinds(reconcilers.EnvoyGatewaySecurityPolicyKind)) opts = append(opts, controller.WithObjectLinks(reconcilers.LinkGatewayToEnvoyGatewaySecurityPolicyFunc)) case reconcilers.IstioGatewayProviderName: - opts = append(opts, controller.WithRunnable("istio/authorizationpolicy watcher", buildWatcher(&istiov1.AuthorizationPolicy{}, reconcilers.IstioAuthorizationPoliciesResource, metav1.NamespaceAll))) + opts = append(opts, controller.WithRunnable("istio/authorizationpolicy watcher", buildWatcher( + &istiov1.AuthorizationPolicy{}, + reconcilers.IstioAuthorizationPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates[*istiov1.AuthorizationPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*istiov1.AuthorizationPolicy]{}))), + ) opts = append(opts, controller.WithObjectKinds(reconcilers.IstioAuthorizationPolicyKind)) opts = append(opts, controller.WithObjectLinks(reconcilers.LinkGatewayToIstioAuthorizationPolicyFunc)) } From 7335fc9f50e2f94cd029bd0d5a9732f7450797ff Mon Sep 17 00:00:00 2001 From: KevFan Date: Fri, 27 Sep 2024 17:31:23 +0100 Subject: [PATCH 2/3] refactor: add RWmutex to stateReconciler struct Signed-off-by: KevFan --- controller/runnable.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/controller/runnable.go b/controller/runnable.go index a1600a9..fb9ef29 100644 --- a/controller/runnable.go +++ b/controller/runnable.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strings" + "sync" "time" "github.com/samber/lo" @@ -198,14 +199,20 @@ type stateReconciler struct { listFunc ListFunc watchFunc WatchFunc synced bool + sync.RWMutex } func (r *stateReconciler) Run(_ <-chan struct{}) { + r.Lock() + defer r.Unlock() r.controller.listAndWatch(r.listFunc, r.watchFunc) r.synced = true } func (r *stateReconciler) HasSynced() bool { + r.RLock() + defer r.RUnlock() + return r.synced } From c25fb320ae8283f783a3e71e4a89e3e74d89b83e Mon Sep 17 00:00:00 2001 From: KevFan Date: Mon, 30 Sep 2024 09:59:32 +0100 Subject: [PATCH 3/3] fixup: infer type from arg for typed predicate Signed-off-by: KevFan --- examples/kuadrant/main.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/kuadrant/main.go b/examples/kuadrant/main.go index 8ddb5bf..440e4bd 100644 --- a/examples/kuadrant/main.go +++ b/examples/kuadrant/main.go @@ -125,37 +125,37 @@ func main() { &gwapiv1.Gateway{}, controller.GatewaysResource, metav1.NamespaceAll, - controller.WithPredicates[*gwapiv1.Gateway](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*gwapiv1.Gateway]{})), + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*gwapiv1.Gateway]{})), ), controller.WithRunnable("httproute watcher", buildWatcher( &gwapiv1.HTTPRoute{}, controller.HTTPRoutesResource, metav1.NamespaceAll, - controller.WithPredicates[*gwapiv1.HTTPRoute](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*gwapiv1.HTTPRoute]{})), + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*gwapiv1.HTTPRoute]{})), ), controller.WithRunnable("dnspolicy watcher", buildWatcher( &kuadrantv1alpha2.DNSPolicy{}, kuadrantv1alpha2.DNSPoliciesResource, metav1.NamespaceAll, - controller.WithPredicates[*kuadrantv1alpha2.DNSPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1alpha2.DNSPolicy]{})), + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1alpha2.DNSPolicy]{})), ), controller.WithRunnable("tlspolicy watcher", buildWatcher( &kuadrantv1alpha2.TLSPolicy{}, kuadrantv1alpha2.TLSPoliciesResource, metav1.NamespaceAll, - controller.WithPredicates[*kuadrantv1alpha2.TLSPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1alpha2.TLSPolicy]{})), + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1alpha2.TLSPolicy]{})), ), controller.WithRunnable("authpolicy watcher", buildWatcher( &kuadrantv1beta3.AuthPolicy{}, kuadrantv1beta3.AuthPoliciesResource, metav1.NamespaceAll, - controller.WithPredicates[*kuadrantv1beta3.AuthPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1beta3.AuthPolicy]{})), + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1beta3.AuthPolicy]{})), ), controller.WithRunnable("ratelimitpolicy watcher", buildWatcher( &kuadrantv1beta3.RateLimitPolicy{}, kuadrantv1beta3.RateLimitPoliciesResource, metav1.NamespaceAll, - controller.WithPredicates[*kuadrantv1beta3.RateLimitPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1beta3.RateLimitPolicy]{}))), + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1beta3.RateLimitPolicy]{}))), controller.WithPolicyKinds( kuadrantv1alpha2.DNSPolicyKind, kuadrantv1alpha2.TLSPolicyKind, @@ -216,7 +216,7 @@ func controllerOptionsFor(gatewayProviders []string) []controller.ControllerOpti &egv1alpha1.SecurityPolicy{}, reconcilers.EnvoyGatewaySecurityPoliciesResource, metav1.NamespaceAll, - controller.WithPredicates[*egv1alpha1.SecurityPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*egv1alpha1.SecurityPolicy]{}))), + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*egv1alpha1.SecurityPolicy]{}))), ) opts = append(opts, controller.WithObjectKinds(reconcilers.EnvoyGatewaySecurityPolicyKind)) opts = append(opts, controller.WithObjectLinks(reconcilers.LinkGatewayToEnvoyGatewaySecurityPolicyFunc)) @@ -225,7 +225,7 @@ func controllerOptionsFor(gatewayProviders []string) []controller.ControllerOpti &istiov1.AuthorizationPolicy{}, reconcilers.IstioAuthorizationPoliciesResource, metav1.NamespaceAll, - controller.WithPredicates[*istiov1.AuthorizationPolicy](&ctrlruntimepredicate.TypedGenerationChangedPredicate[*istiov1.AuthorizationPolicy]{}))), + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*istiov1.AuthorizationPolicy]{}))), ) opts = append(opts, controller.WithObjectKinds(reconcilers.IstioAuthorizationPolicyKind)) opts = append(opts, controller.WithObjectLinks(reconcilers.LinkGatewayToIstioAuthorizationPolicyFunc))