diff --git a/pkg/kube_events_manager/factory.go b/pkg/kube_events_manager/factory.go index f514709b..56147105 100644 --- a/pkg/kube_events_manager/factory.go +++ b/pkg/kube_events_manager/factory.go @@ -5,6 +5,7 @@ import ( "sync" "time" + log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" @@ -30,10 +31,10 @@ type FactoryIndex struct { } type Factory struct { - shared dynamicinformer.DynamicSharedInformerFactory - score uint64 - ctx context.Context - cancel context.CancelFunc + shared dynamicinformer.DynamicSharedInformerFactory + handlerRegistrations map[string]cache.ResourceEventHandlerRegistration + ctx context.Context + cancel context.CancelFunc } type FactoryStore struct { @@ -47,21 +48,21 @@ func NewFactoryStore() *FactoryStore { } } -func (c *FactoryStore) add(ctx context.Context, index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) { - cctx, cancel := context.WithCancel(ctx) +func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) { + ctx, cancel := context.WithCancel(context.Background()) c.data[index] = Factory{ - shared: f, - score: uint64(1), - ctx: cctx, - cancel: cancel, + shared: f, + handlerRegistrations: make(map[string]cache.ResourceEventHandlerRegistration, 0), + ctx: ctx, + cancel: cancel, } + log.Debugf("Factory store: added a new factory for %v index", index) } -func (c *FactoryStore) get(ctx context.Context, client dynamic.Interface, index FactoryIndex) Factory { +func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory { f, ok := c.data[index] if ok { - f.score++ - c.data[index] = f + log.Debugf("Factory store: the factory with %v index found", index) return f } @@ -81,21 +82,25 @@ func (c *FactoryStore) get(ctx context.Context, client dynamic.Interface, index client, resyncPeriod, index.Namespace, tweakListOptions) factory.ForResource(index.GVR) - c.add(ctx, index, factory) + c.add(index, factory) return c.data[index] } -func (c *FactoryStore) Start(ctx context.Context, client dynamic.Interface, index FactoryIndex, handler cache.ResourceEventHandler, errorHandler *WatchErrorHandler) error { +func (c *FactoryStore) Start(ctx context.Context, informerId string, client dynamic.Interface, index FactoryIndex, handler cache.ResourceEventHandler, errorHandler *WatchErrorHandler) error { c.mu.Lock() defer c.mu.Unlock() - factory := c.get(ctx, client, index) + factory := c.get(client, index) informer := factory.shared.ForResource(index.GVR).Informer() // Add error handler, ignore "already started" error. _ = informer.SetWatchErrorHandler(errorHandler.handler) - // TODO(nabokihms): think about what will happen if we stop and then start the monitor - informer.AddEventHandler(handler) + registration, err := informer.AddEventHandler(handler) + if err != nil { + log.Warnf("Factory store: couldn't add event handler to the %v factory's informer: %v", index, err) + } + factory.handlerRegistrations[informerId] = registration + log.Debugf("Factory store: increased usage counter to %d of the factory with %v index", len(factory.handlerRegistrations), index) if !informer.HasSynced() { go informer.Run(factory.ctx.Done()) @@ -106,10 +111,11 @@ func (c *FactoryStore) Start(ctx context.Context, client dynamic.Interface, inde return err } } + log.Debugf("Factory store: started informer for %v index", index) return nil } -func (c *FactoryStore) Stop(index FactoryIndex) { +func (c *FactoryStore) Stop(informerId string, index FactoryIndex) { c.mu.Lock() defer c.mu.Unlock() @@ -119,12 +125,17 @@ func (c *FactoryStore) Stop(index FactoryIndex) { return } - f.score-- - if f.score == 0 { - f.cancel() - delete(c.data, index) - return + if handlerRegistration, found := f.handlerRegistrations[informerId]; found { + err := f.shared.ForResource(index.GVR).Informer().RemoveEventHandler(handlerRegistration) + if err != nil { + log.Warnf("Factory store: couldn't remove event handler from the %v factory's informer: %v", index, err) + } + delete(f.handlerRegistrations, informerId) + log.Debugf("Factory store: decreased usage counter to %d of the factory with %v index", len(f.handlerRegistrations), index) + if len(f.handlerRegistrations) == 0 { + f.cancel() + delete(c.data, index) + log.Debugf("Factory store: deleted factory for %v index", index) + } } - - c.data[index] = f } diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index 705ca54b..4951b8bb 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/gofrs/uuid/v5" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -20,6 +21,7 @@ import ( ) type resourceInformer struct { + id string KubeClient *klient.Client Monitor *MonitorConfig // Filter by namespace @@ -70,6 +72,7 @@ type resourceInformerConfig struct { func newResourceInformer(ns, name string, cfg *resourceInformerConfig) *resourceInformer { informer := &resourceInformer{ + id: uuid.Must(uuid.NewV4()).String(), KubeClient: cfg.client, metricStorage: cfg.mstor, Namespace: ns, @@ -256,6 +259,11 @@ func (ei *resourceInformer) OnDelete(obj interface{}) { func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType WatchEventType) { // check if stop if ei.stopped { + log.Debugf("%s: received WATCH for a stopped %s/%s informer %s", + ei.Monitor.Metadata.DebugName, + ei.Namespace, + ei.Name, + eventType) return } @@ -427,13 +435,13 @@ func (ei *resourceInformer) start() { go func() { if ei.ctx != nil { <-ei.ctx.Done() - DefaultFactoryStore.Stop(ei.FactoryIndex) + DefaultFactoryStore.Stop(ei.id, ei.FactoryIndex) } }() // TODO: separate handler and informer errorHandler := newWatchErrorHandler(ei.Monitor.Metadata.DebugName, ei.Monitor.Kind, ei.Monitor.Metadata.LogLabels, ei.metricStorage) - err := DefaultFactoryStore.Start(ei.ctx, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler) + err := DefaultFactoryStore.Start(ei.ctx, ei.id, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler) if err != nil { ei.Monitor.LogEntry.Errorf("%s: cache is not synced for informer", ei.Monitor.Metadata.DebugName) return