diff --git a/controller/runnable.go b/controller/runnable.go index 46d37f3..fb9ef29 100644 --- a/controller/runnable.go +++ b/controller/runnable.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strings" + "sync" "time" "github.com/samber/lo" @@ -35,6 +36,7 @@ type RunnableBuilder func(controller *Controller) Runnable type RunnableBuilderOptions[T Object] struct { LabelSelector string FieldSelector string + Predicates []ctrlruntimepredicate.TypedPredicate[T] Builder func(obj T, resource schema.GroupVersionResource, namespace string, options ...RunnableBuilderOption[T]) RunnableBuilder } @@ -52,6 +54,12 @@ func FilterResourcesByField[T Object](selector string) RunnableBuilderOption[T] } } +func WithPredicates[T Object](predicates ...ctrlruntimepredicate.TypedPredicate[T]) RunnableBuilderOption[T] { + return func(o *RunnableBuilderOptions[T]) { + o.Predicates = append(o.Predicates, predicates...) + } +} + func Builder[T Object](builder func(obj T, resource schema.GroupVersionResource, namespace string, options ...RunnableBuilderOption[T]) RunnableBuilder) RunnableBuilderOption[T] { return func(o *RunnableBuilderOptions[T]) { o.Builder = builder @@ -156,9 +164,7 @@ func StateReconciler[T Object](obj T, resource schema.GroupVersionResource, name }) }, watchFunc: func(manager ctrlruntime.Manager) ctrlruntimesrc.Source { - predicates := []ctrlruntimepredicate.TypedPredicate[T]{ - &ctrlruntimepredicate.TypedGenerationChangedPredicate[T]{}, - } + var predicates []ctrlruntimepredicate.TypedPredicate[T] if o.LabelSelector != "" { predicates = append(predicates, ctrlruntimepredicate.NewTypedPredicateFuncs(func(obj T) bool { return ToLabelSelector(o.LabelSelector).Matches(labels.Set(obj.GetLabels())) @@ -172,6 +178,12 @@ func StateReconciler[T Object](obj T, resource schema.GroupVersionResource, name })))) })) } + + // Add custom predicates passed via options + if len(o.Predicates) > 0 { + predicates = append(predicates, o.Predicates...) + } + return ctrlruntimesrc.Kind(manager.GetCache(), obj, ctrlruntimehandler.TypedEnqueueRequestsFromMapFunc(TypedEnqueueRequestsMapFunc[T]), predicates...) }, } @@ -187,14 +199,20 @@ type stateReconciler struct { listFunc ListFunc watchFunc WatchFunc synced bool + sync.RWMutex } func (r *stateReconciler) Run(_ <-chan struct{}) { + r.Lock() + defer r.Unlock() r.controller.listAndWatch(r.listFunc, r.watchFunc) r.synced = true } func (r *stateReconciler) HasSynced() bool { + r.RLock() + defer r.RUnlock() + return r.synced } diff --git a/examples/kuadrant/main.go b/examples/kuadrant/main.go index f374d76..440e4bd 100644 --- a/examples/kuadrant/main.go +++ b/examples/kuadrant/main.go @@ -19,6 +19,7 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" "k8s.io/utils/ptr" + ctrlruntimepredicate "sigs.k8s.io/controller-runtime/pkg/predicate" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" ctrlruntime "sigs.k8s.io/controller-runtime" @@ -120,12 +121,41 @@ func main() { controllerOpts := []controller.ControllerOption{ controller.WithLogger(logger), controller.WithClient(client), - controller.WithRunnable("gateway watcher", buildWatcher(&gwapiv1.Gateway{}, controller.GatewaysResource, metav1.NamespaceAll)), - controller.WithRunnable("httproute watcher", buildWatcher(&gwapiv1.HTTPRoute{}, controller.HTTPRoutesResource, metav1.NamespaceAll)), - controller.WithRunnable("dnspolicy watcher", buildWatcher(&kuadrantv1alpha2.DNSPolicy{}, kuadrantv1alpha2.DNSPoliciesResource, metav1.NamespaceAll)), - controller.WithRunnable("tlspolicy watcher", buildWatcher(&kuadrantv1alpha2.TLSPolicy{}, kuadrantv1alpha2.TLSPoliciesResource, metav1.NamespaceAll)), - controller.WithRunnable("authpolicy watcher", buildWatcher(&kuadrantv1beta3.AuthPolicy{}, kuadrantv1beta3.AuthPoliciesResource, metav1.NamespaceAll)), - controller.WithRunnable("ratelimitpolicy watcher", buildWatcher(&kuadrantv1beta3.RateLimitPolicy{}, kuadrantv1beta3.RateLimitPoliciesResource, metav1.NamespaceAll)), + controller.WithRunnable("gateway watcher", buildWatcher( + &gwapiv1.Gateway{}, + controller.GatewaysResource, + metav1.NamespaceAll, + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*gwapiv1.Gateway]{})), + ), + controller.WithRunnable("httproute watcher", buildWatcher( + &gwapiv1.HTTPRoute{}, + controller.HTTPRoutesResource, + metav1.NamespaceAll, + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*gwapiv1.HTTPRoute]{})), + ), + controller.WithRunnable("dnspolicy watcher", buildWatcher( + &kuadrantv1alpha2.DNSPolicy{}, + kuadrantv1alpha2.DNSPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1alpha2.DNSPolicy]{})), + ), + controller.WithRunnable("tlspolicy watcher", buildWatcher( + &kuadrantv1alpha2.TLSPolicy{}, + kuadrantv1alpha2.TLSPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1alpha2.TLSPolicy]{})), + ), + controller.WithRunnable("authpolicy watcher", buildWatcher( + &kuadrantv1beta3.AuthPolicy{}, + kuadrantv1beta3.AuthPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1beta3.AuthPolicy]{})), + ), + controller.WithRunnable("ratelimitpolicy watcher", buildWatcher( + &kuadrantv1beta3.RateLimitPolicy{}, + kuadrantv1beta3.RateLimitPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*kuadrantv1beta3.RateLimitPolicy]{}))), controller.WithPolicyKinds( kuadrantv1alpha2.DNSPolicyKind, kuadrantv1alpha2.TLSPolicyKind, @@ -182,11 +212,21 @@ func controllerOptionsFor(gatewayProviders []string) []controller.ControllerOpti for _, gatewayProvider := range gatewayProviders { switch gatewayProvider { case reconcilers.EnvoyGatewayProviderName: - opts = append(opts, controller.WithRunnable("envoygateway/securitypolicy watcher", buildWatcher(&egv1alpha1.SecurityPolicy{}, reconcilers.EnvoyGatewaySecurityPoliciesResource, metav1.NamespaceAll))) + opts = append(opts, controller.WithRunnable("envoygateway/securitypolicy watcher", buildWatcher( + &egv1alpha1.SecurityPolicy{}, + reconcilers.EnvoyGatewaySecurityPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*egv1alpha1.SecurityPolicy]{}))), + ) opts = append(opts, controller.WithObjectKinds(reconcilers.EnvoyGatewaySecurityPolicyKind)) opts = append(opts, controller.WithObjectLinks(reconcilers.LinkGatewayToEnvoyGatewaySecurityPolicyFunc)) case reconcilers.IstioGatewayProviderName: - opts = append(opts, controller.WithRunnable("istio/authorizationpolicy watcher", buildWatcher(&istiov1.AuthorizationPolicy{}, reconcilers.IstioAuthorizationPoliciesResource, metav1.NamespaceAll))) + opts = append(opts, controller.WithRunnable("istio/authorizationpolicy watcher", buildWatcher( + &istiov1.AuthorizationPolicy{}, + reconcilers.IstioAuthorizationPoliciesResource, + metav1.NamespaceAll, + controller.WithPredicates(&ctrlruntimepredicate.TypedGenerationChangedPredicate[*istiov1.AuthorizationPolicy]{}))), + ) opts = append(opts, controller.WithObjectKinds(reconcilers.IstioAuthorizationPolicyKind)) opts = append(opts, controller.WithObjectLinks(reconcilers.LinkGatewayToIstioAuthorizationPolicyFunc)) }