From 4cfa964bc89081f457a01569377056c690527503 Mon Sep 17 00:00:00 2001 From: Alper Rifat Ulucinar Date: Fri, 18 Aug 2023 11:29:48 +0300 Subject: [PATCH] Fixes the requeueing bug which requeues a reconcile request in a wrong workqueue Signed-off-by: Alper Rifat Ulucinar --- Makefile | 4 ++- pkg/controller/external.go | 2 +- pkg/controller/handler/eventhandler.go | 30 ++++++++++++++++++++--- pkg/controller/options.go | 5 ---- pkg/pipeline/templates/controller.go.tmpl | 8 +++--- pkg/pipeline/templates/setup.go.tmpl | 6 ----- 6 files changed, 36 insertions(+), 19 deletions(-) diff --git a/Makefile b/Makefile index d9bd521f..66d4d5ad 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,9 @@ PROJECT_NAME := upjet PROJECT_REPO := github.com/upbound/$(PROJECT_NAME) -GOLANGCILINT_VERSION ?= 1.53.3 +# GOLANGCILINT_VERSION is inherited from build submodule by default. +# Uncomment below if you need to override the version. +# GOLANGCILINT_VERSION ?= 1.54.0 GO_REQUIRED_VERSION ?= 1.20 PLATFORMS ?= linux_amd64 linux_arm64 diff --git a/pkg/controller/external.go b/pkg/controller/external.go index 48db6153..bd317340 100644 --- a/pkg/controller/external.go +++ b/pkg/controller/external.go @@ -125,7 +125,7 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed providerHandle: ws.ProviderHandle, eventHandler: c.eventHandler, kube: c.kube, - logger: c.logger.WithValues("uid", mg.GetUID()), + logger: c.logger.WithValues("uid", mg.GetUID(), "name", mg.GetName(), "gvk", mg.GetObjectKind().GroupVersionKind().String()), }, nil } diff --git a/pkg/controller/handler/eventhandler.go b/pkg/controller/handler/eventhandler.go index 027d1798..d5f14a01 100644 --- a/pkg/controller/handler/eventhandler.go +++ b/pkg/controller/handler/eventhandler.go @@ -18,6 +18,7 @@ import ( "context" "sync" + "github.com/crossplane/crossplane-runtime/pkg/logging" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/event" @@ -31,16 +32,31 @@ type EventHandler struct { innerHandler handler.EventHandler queue workqueue.RateLimitingInterface rateLimiterMap map[string]workqueue.RateLimiter + logger logging.Logger mu *sync.RWMutex } +// Option configures an option for the EventHandler. +type Option func(eventHandler *EventHandler) + +// WithLogger configures the logger for the EventHandler. +func WithLogger(logger logging.Logger) Option { + return func(eventHandler *EventHandler) { + eventHandler.logger = logger + } +} + // NewEventHandler initializes a new EventHandler instance. -func NewEventHandler() *EventHandler { - return &EventHandler{ +func NewEventHandler(opts ...Option) *EventHandler { + eh := &EventHandler{ innerHandler: &handler.EnqueueRequestForObject{}, mu: &sync.RWMutex{}, rateLimiterMap: make(map[string]workqueue.RateLimiter), } + for _, o := range opts { + o(eh) + } + return eh } // RequestReconcile requeues a reconciliation request for the specified name. @@ -51,6 +67,7 @@ func (e *EventHandler) RequestReconcile(rateLimiterName, name string, failureLim if e.queue == nil { return false } + logger := e.logger.WithValues("name", name) item := reconcile.Request{ NamespacedName: types.NamespacedName{ Name: name, @@ -62,9 +79,12 @@ func (e *EventHandler) RequestReconcile(rateLimiterName, name string, failureLim e.rateLimiterMap[rateLimiterName] = rateLimiter } if failureLimit != nil && rateLimiter.NumRequeues(item) > *failureLimit { + logger.Info("Failure limit has been exceeded.", "failureLimit", *failureLimit, "numRequeues", rateLimiter.NumRequeues(item)) return false } - e.queue.AddAfter(item, rateLimiter.When(item)) + when := rateLimiter.When(item) + e.queue.AddAfter(item, when) + logger.Debug("Reconcile request has been requeued.", "rateLimiterName", rateLimiterName, "when", when) return true } @@ -94,20 +114,24 @@ func (e *EventHandler) setQueue(limitingInterface workqueue.RateLimitingInterfac func (e *EventHandler) Create(ctx context.Context, ev event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) { e.setQueue(limitingInterface) + e.logger.Debug("Calling the inner handler for Create event.", "name", ev.Object.GetName(), "queueLength", limitingInterface.Len()) e.innerHandler.Create(ctx, ev, limitingInterface) } func (e *EventHandler) Update(ctx context.Context, ev event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) { e.setQueue(limitingInterface) + e.logger.Debug("Calling the inner handler for Update event.", "name", ev.ObjectOld.GetName(), "queueLength", limitingInterface.Len()) e.innerHandler.Update(ctx, ev, limitingInterface) } func (e *EventHandler) Delete(ctx context.Context, ev event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) { e.setQueue(limitingInterface) + e.logger.Debug("Calling the inner handler for Delete event.", "name", ev.Object.GetName(), "queueLength", limitingInterface.Len()) e.innerHandler.Delete(ctx, ev, limitingInterface) } func (e *EventHandler) Generic(ctx context.Context, ev event.GenericEvent, limitingInterface workqueue.RateLimitingInterface) { e.setQueue(limitingInterface) + e.logger.Debug("Calling the inner handler for Generic event.", "name", ev.Object.GetName(), "queueLength", limitingInterface.Len()) e.innerHandler.Generic(ctx, ev, limitingInterface) } diff --git a/pkg/controller/options.go b/pkg/controller/options.go index eef8e9bf..ae265f4c 100644 --- a/pkg/controller/options.go +++ b/pkg/controller/options.go @@ -11,7 +11,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "github.com/upbound/upjet/pkg/config" - "github.com/upbound/upjet/pkg/controller/handler" "github.com/upbound/upjet/pkg/terraform" ) @@ -39,10 +38,6 @@ type Options struct { // ESSOptions for External Secret Stores. ESSOptions *ESSOptions - - // EventHandler to handle the Kubernetes events and - // to queue reconcile requests. - EventHandler *handler.EventHandler } // ESSOptions for External Secret Stores. diff --git a/pkg/pipeline/templates/controller.go.tmpl b/pkg/pipeline/templates/controller.go.tmpl index 6042e8ba..18d39219 100644 --- a/pkg/pipeline/templates/controller.go.tmpl +++ b/pkg/pipeline/templates/controller.go.tmpl @@ -12,6 +12,7 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" xpresource "github.com/crossplane/crossplane-runtime/pkg/resource" + "github.com/upbound/upjet/pkg/controller/handler" tjcontroller "github.com/upbound/upjet/pkg/controller" "github.com/upbound/upjet/pkg/terraform" ctrl "sigs.k8s.io/controller-runtime" @@ -35,11 +36,12 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error { if o.SecretStoreConfigGVK != nil { cps = append(cps, connection.NewDetailsManager(mgr.GetClient(), *o.SecretStoreConfigGVK, connection.WithTLSConfig(o.ESSOptions.TLSConfig))) } + eventHandler := handler.NewEventHandler(handler.WithLogger(o.Logger.WithValues("gvk", {{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind))) {{- if .UseAsync }} - ac := tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), tjcontroller.WithEventHandler(o.EventHandler)) + ac := tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), tjcontroller.WithEventHandler(eventHandler)) {{- end}} opts := []managed.ReconcilerOption{ - managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), tjcontroller.WithConnectorEventHandler(o.EventHandler), + managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), tjcontroller.WithConnectorEventHandler(eventHandler), {{- if .UseAsync }} tjcontroller.WithCallbackProvider(ac), {{- end}} @@ -63,6 +65,6 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error { Named(name). WithOptions(o.ForControllerRuntime()). WithEventFilter(xpresource.DesiredStateChanged()). - Watches(&{{ .TypePackageAlias }}{{ .CRD.Kind }}{}, o.EventHandler). + Watches(&{{ .TypePackageAlias }}{{ .CRD.Kind }}{}, eventHandler). Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter)) } diff --git a/pkg/pipeline/templates/setup.go.tmpl b/pkg/pipeline/templates/setup.go.tmpl index dd7754b4..72b9f5a4 100644 --- a/pkg/pipeline/templates/setup.go.tmpl +++ b/pkg/pipeline/templates/setup.go.tmpl @@ -8,7 +8,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "github.com/upbound/upjet/pkg/controller" - "github.com/upbound/upjet/pkg/controller/handler" {{ .Imports }} ) @@ -16,11 +15,6 @@ import ( // Setup{{ .Group }} creates all controllers with the supplied logger and adds them to // the supplied manager. func Setup{{ .Group }}(mgr ctrl.Manager, o controller.Options) error { - // set the default event handler if the provider's main module did not - // set one. - if o.EventHandler == nil { - o.EventHandler = handler.NewEventHandler() - } for _, setup := range []func(ctrl.Manager, controller.Options) error{ {{- range $alias := .Aliases }} {{ $alias }}Setup,