From e72955a9f303516d6d3f2ecac2758700e1070bda Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Thu, 7 Dec 2023 16:25:40 +0200 Subject: [PATCH 01/31] first update for nodePodUpdater --- kubernetes/eventhandler.go | 64 +++++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index acbc51394d..374ab9b557 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -18,23 +18,27 @@ package kubernetes import ( + "reflect" "sync" + + "github.com/elastic/elastic-agent-libs/logp" ) // ResourceEventHandler can handle notifications for events that happen to a // resource. The events are informational only, so you can't return an // error. -// * OnAdd is called when an object is added. -// * OnUpdate is called when an object is modified. Note that oldObj is the -// last known state of the object-- it is possible that several changes -// were combined together, so you can't use this to see every single -// change. OnUpdate is also called when a re-list happens, and it will -// get called even if nothing changed. This is useful for periodically -// evaluating or syncing something. -// * OnDelete will get the final state of the item if it is known, otherwise -// it will get an object of type DeletedFinalStateUnknown. This can -// happen if the watch is closed and misses the delete event and we don't -// notice the deletion until the subsequent re-list. +// - OnAdd is called when an object is added. +// - OnUpdate is called when an object is modified. Note that oldObj is the +// last known state of the object-- it is possible that several changes +// were combined together, so you can't use this to see every single +// change. OnUpdate is also called when a re-list happens, and it will +// get called even if nothing changed. This is useful for periodically +// evaluating or syncing something. +// - OnDelete will get the final state of the item if it is known, otherwise +// it will get an object of type DeletedFinalStateUnknown. This can +// happen if the watch is closed and misses the delete event and we don't +// notice the deletion until the subsequent re-list. +// // idea: allow the On* methods to return an error so that the RateLimited WorkQueue // idea: can requeue the failed event processing. type ResourceEventHandler interface { @@ -134,6 +138,12 @@ type podUpdaterStore interface { List() []interface{} } +// NodeStore is the interface that an object needs to implement to be +// used as a node shared store. +type NodeStore interface { + List() []interface{} +} + // namespacePodUpdater notifies updates on pods when their namespaces are updated. type namespacePodUpdater struct { handler podUpdaterHandlerFunc @@ -184,17 +194,19 @@ func (*namespacePodUpdater) OnDelete(interface{}) {} // nodePodUpdater notifies updates on pods when their nodes are updated. type nodePodUpdater struct { - handler podUpdaterHandlerFunc - store podUpdaterStore - locker sync.Locker + handler podUpdaterHandlerFunc + store podUpdaterStore + nodestore NodeStore + locker sync.Locker } // NewNodePodUpdater creates a nodePodUpdater -func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *nodePodUpdater { +func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodestore NodeStore, locker sync.Locker) *nodePodUpdater { return &nodePodUpdater{ - handler: handler, - store: store, - locker: locker, + handler: handler, + store: store, + nodestore: nodestore, + locker: locker, } } @@ -205,6 +217,9 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { return } + log := logp.NewLogger("NodeWatcherPas") + log.Errorf("ChangeNode: %v", node) + // n.store.List() returns a snapshot at this point. If a delete is received // from the main watcher, this loop may generate an update event after the // delete is processed, leaving configurations that would never be deleted. @@ -214,9 +229,20 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } + labelscheck := true + annotationscheck := true + for _, nodes := range n.nodestore.List() { + nodes, ok := nodes.(*Node) + if ok && node.Name == nodes.Name { + labelscheck = reflect.DeepEqual(node.ObjectMeta.Labels, nodes.ObjectMeta.Labels) + annotationscheck = reflect.DeepEqual(node.ObjectMeta.Annotations, nodes.ObjectMeta.Annotations) + } + } for _, pod := range n.store.List() { + log.Errorf("PodtoCheck %v", pod) + pod, ok := pod.(*Pod) - if ok && pod.Spec.NodeName == node.Name { + if ok && pod.Spec.NodeName == node.Name && (!labelscheck || !annotationscheck) { n.handler(pod) } } From d8ace1189424d5a9cb1a7f3b4d3832d21ffed056 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Fri, 8 Dec 2023 14:27:51 +0200 Subject: [PATCH 02/31] first update for nodePodUpdater with key function --- kubernetes/eventhandler.go | 32 +++++++++++++++++--------------- kubernetes/metadata/pod.go | 1 + 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 374ab9b557..441c92b9ad 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -21,7 +21,7 @@ import ( "reflect" "sync" - "github.com/elastic/elastic-agent-libs/logp" + "k8s.io/client-go/tools/cache" ) // ResourceEventHandler can handle notifications for events that happen to a @@ -141,7 +141,7 @@ type podUpdaterStore interface { // NodeStore is the interface that an object needs to implement to be // used as a node shared store. type NodeStore interface { - List() []interface{} + GetByKey(string) (interface{}, bool, error) } // namespacePodUpdater notifies updates on pods when their namespaces are updated. @@ -217,8 +217,9 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { return } - log := logp.NewLogger("NodeWatcherPas") - log.Errorf("ChangeNode: %v", node) + key, _ := cache.MetaNamespaceKeyFunc(obj) + //Trying to retrieve from the cache. Get returns the accumulator associated with the given object's key + cachednodeobj, exists, err := n.nodestore.GetByKey(key) // n.store.List() returns a snapshot at this point. If a delete is received // from the main watcher, this loop may generate an update event after the @@ -231,19 +232,20 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { } labelscheck := true annotationscheck := true - for _, nodes := range n.nodestore.List() { - nodes, ok := nodes.(*Node) - if ok && node.Name == nodes.Name { - labelscheck = reflect.DeepEqual(node.ObjectMeta.Labels, nodes.ObjectMeta.Labels) - annotationscheck = reflect.DeepEqual(node.ObjectMeta.Annotations, nodes.ObjectMeta.Annotations) + if err == nil && exists { + cachednode, ok := cachednodeobj.(*Node) + if ok { + labelscheck = reflect.DeepEqual(node.ObjectMeta.Labels, cachednode.ObjectMeta.Labels) + annotationscheck = reflect.DeepEqual(node.ObjectMeta.Annotations, cachednode.ObjectMeta.Annotations) } } - for _, pod := range n.store.List() { - log.Errorf("PodtoCheck %v", pod) - - pod, ok := pod.(*Pod) - if ok && pod.Spec.NodeName == node.Name && (!labelscheck || !annotationscheck) { - n.handler(pod) + // Only if an update happend either in Labels or Annotations proceed with Pod updates + if !labelscheck || !annotationscheck { + for _, pod := range n.store.List() { + pod, ok := pod.(*Pod) + if ok && pod.Spec.NodeName == node.Name { + n.handler(pod) + } } } } diff --git a/kubernetes/metadata/pod.go b/kubernetes/metadata/pod.go index 3a5f3d539e..7dfbb6a85c 100644 --- a/kubernetes/metadata/pod.go +++ b/kubernetes/metadata/pod.go @@ -74,6 +74,7 @@ func (p *pod) Generate(obj kubernetes.Resource, opts ...FieldOptions) mapstr.M { "kubernetes": p.GenerateK8s(obj, opts...), } meta.DeepUpdate(ecsFields) + return meta } From ac43d6ffde80a6ceb72f43fdf6ce887c2c063748 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Fri, 8 Dec 2023 17:26:28 +0200 Subject: [PATCH 03/31] first update for namespacePodUpdater --- kubernetes/eventhandler.go | 73 +++++++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 24 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 441c92b9ad..53f8ab2bbb 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -140,23 +140,25 @@ type podUpdaterStore interface { // NodeStore is the interface that an object needs to implement to be // used as a node shared store. -type NodeStore interface { +type NamespaceStore interface { GetByKey(string) (interface{}, bool, error) } // namespacePodUpdater notifies updates on pods when their namespaces are updated. type namespacePodUpdater struct { - handler podUpdaterHandlerFunc - store podUpdaterStore - locker sync.Locker + handler podUpdaterHandlerFunc + store podUpdaterStore + namespacestore NamespaceStore + locker sync.Locker } // NewNamespacePodUpdater creates a namespacePodUpdater -func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater { +func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacestore NamespaceStore, locker sync.Locker) *namespacePodUpdater { return &namespacePodUpdater{ - handler: handler, - store: store, - locker: locker, + handler: handler, + store: store, + namespacestore: namespacestore, + locker: locker, } } @@ -166,20 +168,36 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { if !ok { return } + key, _ := cache.MetaNamespaceKeyFunc(obj) - // n.store.List() returns a snapshot at this point. If a delete is received - // from the main watcher, this loop may generate an update event after the - // delete is processed, leaving configurations that would never be deleted. - // Also this loop can miss updates, what could leave outdated configurations. - // Avoid these issues by locking the processing of events from the main watcher. if n.locker != nil { n.locker.Lock() defer n.locker.Unlock() } - for _, pod := range n.store.List() { - pod, ok := pod.(*Pod) - if ok && pod.Namespace == ns.Name { - n.handler(pod) + //Trying to retrieve from the cache. Getbykey returns the object from cache associated with the given object's key + cachednamespaceobj, exists, err := n.namespacestore.GetByKey(key) + + labelscheck := true + annotationscheck := true + if err == nil && exists { + cachednamespace, ok := cachednamespaceobj.(*Namespace) + if ok { + labelscheck = reflect.DeepEqual(ns.ObjectMeta.Labels, cachednamespace.ObjectMeta.Labels) + annotationscheck = reflect.DeepEqual(ns.ObjectMeta.Annotations, cachednamespace.ObjectMeta.Annotations) + } + } + // Only if an update happend either in Labels or Annotations proceed with Pod updates + if !labelscheck || !annotationscheck { + // n.store.List() returns a snapshot at this point. If a delete is received + // from the main watcher, this loop may generate an update event after the + // delete is processed, leaving configurations that would never be deleted. + // Also this loop can miss updates, what could leave outdated configurations. + // Avoid these issues by locking the processing of events from the main watcher. + for _, pod := range n.store.List() { + pod, ok := pod.(*Pod) + if ok && pod.Namespace == ns.Name { + n.handler(pod) + } } } } @@ -200,6 +218,12 @@ type nodePodUpdater struct { locker sync.Locker } +// NodeStore is the interface that an object needs to implement to be +// used as a node shared store. +type NodeStore interface { + GetByKey(string) (interface{}, bool, error) +} + // NewNodePodUpdater creates a nodePodUpdater func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodestore NodeStore, locker sync.Locker) *nodePodUpdater { return &nodePodUpdater{ @@ -218,18 +242,14 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { } key, _ := cache.MetaNamespaceKeyFunc(obj) - //Trying to retrieve from the cache. Get returns the accumulator associated with the given object's key - cachednodeobj, exists, err := n.nodestore.GetByKey(key) - // n.store.List() returns a snapshot at this point. If a delete is received - // from the main watcher, this loop may generate an update event after the - // delete is processed, leaving configurations that would never be deleted. - // Also this loop can miss updates, what could leave outdated configurations. - // Avoid these issues by locking the processing of events from the main watcher. if n.locker != nil { n.locker.Lock() defer n.locker.Unlock() } + //Trying to retrieve from the cache. Getbykey returns the object from cache associated with the given object's key + cachednodeobj, exists, err := n.nodestore.GetByKey(key) + labelscheck := true annotationscheck := true if err == nil && exists { @@ -241,6 +261,11 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { } // Only if an update happend either in Labels or Annotations proceed with Pod updates if !labelscheck || !annotationscheck { + // n.store.List() returns a snapshot at this point. If a delete is received + // from the main watcher, this loop may generate an update event after the + // delete is processed, leaving configurations that would never be deleted. + // Also this loop can miss updates, what could leave outdated configurations. + // Avoid these issues by locking the processing of events from the main watcher. for _, pod := range n.store.List() { pod, ok := pod.(*Pod) if ok && pod.Spec.NodeName == node.Name { From 979e3c0faee6bf7200eeaafe51a85d23acd87d8f Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Fri, 8 Dec 2023 17:33:55 +0200 Subject: [PATCH 04/31] updating comments for code --- kubernetes/eventhandler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 53f8ab2bbb..f97beedbef 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -139,7 +139,7 @@ type podUpdaterStore interface { } // NodeStore is the interface that an object needs to implement to be -// used as a node shared store. +// used as a namespace shared store. type NamespaceStore interface { GetByKey(string) (interface{}, bool, error) } @@ -168,6 +168,7 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { if !ok { return } + // https://pkg.go.dev/k8s.io/client-go/tools/cache#MetaNamespaceKeyFunc Creates key from provided obj key, _ := cache.MetaNamespaceKeyFunc(obj) if n.locker != nil { @@ -240,7 +241,7 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { if !ok { return } - + // https://pkg.go.dev/k8s.io/client-go/tools/cache#MetaNamespaceKeyFunc Creates key from provided obj key, _ := cache.MetaNamespaceKeyFunc(obj) if n.locker != nil { From 0a9197896a94baca6a8e9093d76e80b0f1727e39 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Tue, 12 Dec 2023 16:58:23 +0200 Subject: [PATCH 05/31] updating with delta --- kubernetes/eventhandler.go | 60 ++++++++++++++++++++++++++------------ kubernetes/watcher.go | 21 ++++++++++++- 2 files changed, 61 insertions(+), 20 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index f97beedbef..705b72f52b 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -21,6 +21,8 @@ import ( "reflect" "sync" + "github.com/elastic/elastic-agent-libs/logp" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" ) @@ -144,20 +146,26 @@ type NamespaceStore interface { GetByKey(string) (interface{}, bool, error) } +type NamespaceDelta interface { + Deltaslice() []runtime.Object +} + // namespacePodUpdater notifies updates on pods when their namespaces are updated. type namespacePodUpdater struct { handler podUpdaterHandlerFunc store podUpdaterStore namespacestore NamespaceStore + namespacedelta NamespaceDelta locker sync.Locker } // NewNamespacePodUpdater creates a namespacePodUpdater -func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacestore NamespaceStore, locker sync.Locker) *namespacePodUpdater { +func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacestore NamespaceStore, namespacedelta NamespaceDelta, locker sync.Locker) *namespacePodUpdater { return &namespacePodUpdater{ handler: handler, store: store, namespacestore: namespacestore, + namespacedelta: namespacedelta, locker: locker, } } @@ -170,37 +178,49 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { } // https://pkg.go.dev/k8s.io/client-go/tools/cache#MetaNamespaceKeyFunc Creates key from provided obj key, _ := cache.MetaNamespaceKeyFunc(obj) + lognode := logp.NewLogger("--------Namespace---------") if n.locker != nil { n.locker.Lock() defer n.locker.Unlock() } //Trying to retrieve from the cache. Getbykey returns the object from cache associated with the given object's key + cachednamespaceobj, exists, err := n.namespacestore.GetByKey(key) + slice := n.namespacedelta.Deltaslice() + lognode.Infof("-----Slice-------->: %v, %v ", slice[0], slice[1]) labelscheck := true annotationscheck := true if err == nil && exists { cachednamespace, ok := cachednamespaceobj.(*Namespace) - if ok { - labelscheck = reflect.DeepEqual(ns.ObjectMeta.Labels, cachednamespace.ObjectMeta.Labels) - annotationscheck = reflect.DeepEqual(ns.ObjectMeta.Annotations, cachednamespace.ObjectMeta.Annotations) - } - } - // Only if an update happend either in Labels or Annotations proceed with Pod updates - if !labelscheck || !annotationscheck { - // n.store.List() returns a snapshot at this point. If a delete is received - // from the main watcher, this loop may generate an update event after the - // delete is processed, leaving configurations that would never be deleted. - // Also this loop can miss updates, what could leave outdated configurations. - // Avoid these issues by locking the processing of events from the main watcher. - for _, pod := range n.store.List() { - pod, ok := pod.(*Pod) - if ok && pod.Namespace == ns.Name { - n.handler(pod) + cachednamespaceold, ok := slice[0].(*Namespace) + if ns.Name == cachednamespace.Name { + labelscheck = reflect.DeepEqual(cachednamespace.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) + annotationscheck = reflect.DeepEqual(cachednamespace.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations) + if ok { + // n.store.List() returns a snapshot at this point. If a delete is received + // from the main watcher, this loop may generate an update event after the + // delete is processed, leaving configurations that would never be deleted. + // Also this loop can miss updates, what could leave outdated configurations. + // Avoid these issues by locking the processing of events from the main watcher. + for _, pod := range n.store.List() { + pod, ok := pod.(*Pod) + + if ok && pod.Namespace == cachednamespace.Name { + // Only if an update happend either in Labels or Annotations proceed with Pod updates + lognode.Infof("-----Values-------->: %v, %v ", pod.Namespace, cachednamespace.Name) + lognode.Infof("------Checks------->: %v, %v ", labelscheck, annotationscheck) + lognode.Infof("------Labels------->: %v, %v ", cachednamespace.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) + if !labelscheck || !annotationscheck { + n.handler(pod) + } + } + } } } } + } // OnAdd handles add events on namespaces. Nothing to do, if pods are added to this @@ -243,6 +263,7 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { } // https://pkg.go.dev/k8s.io/client-go/tools/cache#MetaNamespaceKeyFunc Creates key from provided obj key, _ := cache.MetaNamespaceKeyFunc(obj) + lognode := logp.NewLogger("--------Node---------") if n.locker != nil { n.locker.Lock() @@ -255,9 +276,10 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { annotationscheck := true if err == nil && exists { cachednode, ok := cachednodeobj.(*Node) + lognode.Infof("Node: %v .", cachednode.Labels) if ok { - labelscheck = reflect.DeepEqual(node.ObjectMeta.Labels, cachednode.ObjectMeta.Labels) - annotationscheck = reflect.DeepEqual(node.ObjectMeta.Annotations, cachednode.ObjectMeta.Annotations) + labelscheck = reflect.DeepEqual(node.ObjectMeta.Labels, cachednode.Labels) + annotationscheck = reflect.DeepEqual(node.ObjectMeta.Annotations, cachednode.Annotations) } } // Only if an update happend either in Labels or Annotations proceed with Pod updates diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index 35b308e09a..9da45f7ae6 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -59,6 +59,9 @@ type Watcher interface { // Client returns the kubernetes client object used by the watcher Client() kubernetes.Interface + + // Delta returns the slice of objects changed by the last updated event + Deltaslice() []runtime.Object } // WatchOptions controls watch behaviors @@ -91,6 +94,7 @@ type watcher struct { stop context.CancelFunc handler ResourceEventHandler logger *logp.Logger + delta []runtime.Object } // NewWatcher initializes the watcher client to provide a events handler for @@ -106,7 +110,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { var store cache.Store var queue workqueue.Interface - + var deltaslice []runtime.Object informer, _, err := NewInformer(client, resource, opts, indexers) if err != nil { return nil, err @@ -132,6 +136,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource store: store, queue: queue, ctx: ctx, + delta: deltaslice, stop: cancel, logger: logp.NewLogger("kubernetes"), handler: NoOpEventHandlerFuncs{}, @@ -157,6 +162,8 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource // state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update. w.enqueue(n, add) } + w.deltaslice(o, n) + }, }) @@ -178,6 +185,11 @@ func (w *watcher) Client() kubernetes.Interface { return w.client } +// Client returns the kubernetes client object used by the watcher +func (w *watcher) Deltaslice() []runtime.Object { + return w.delta +} + // Start watching pods func (w *watcher) Start() error { go w.informer.Run(w.ctx.Done()) @@ -217,6 +229,13 @@ func (w *watcher) enqueue(obj interface{}, state string) { w.queue.Add(&item{key, obj, state}) } +func (w *watcher) deltaslice(o interface{}, n interface{}) { + w.delta = w.delta[:0] + w.delta = append(w.delta, o.(runtime.Object)) + w.delta = append(w.delta, n.(runtime.Object)) + +} + // process gets the top of the work queue and processes the object that is received. func (w *watcher) process(_ context.Context) bool { obj, quit := w.queue.Get() From 853661d6eece8ce7208216ad1ee0731ef6bf7dca Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Wed, 13 Dec 2023 10:25:55 +0200 Subject: [PATCH 06/31] fixing referneces only to one wather as an argument --- kubernetes/eventhandler.go | 155 +++++++++++++++---------------------- kubernetes/watcher.go | 13 ++-- 2 files changed, 70 insertions(+), 98 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 705b72f52b..15e6e13469 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/cache" ) // ResourceEventHandler can handle notifications for events that happen to a @@ -140,33 +139,25 @@ type podUpdaterStore interface { List() []interface{} } -// NodeStore is the interface that an object needs to implement to be -// used as a namespace shared store. -type NamespaceStore interface { - GetByKey(string) (interface{}, bool, error) -} - -type NamespaceDelta interface { - Deltaslice() []runtime.Object +type UpdateWatcher interface { + Deltaobjects() []runtime.Object } // namespacePodUpdater notifies updates on pods when their namespaces are updated. type namespacePodUpdater struct { - handler podUpdaterHandlerFunc - store podUpdaterStore - namespacestore NamespaceStore - namespacedelta NamespaceDelta - locker sync.Locker + handler podUpdaterHandlerFunc + store podUpdaterStore + namespacewatcher UpdateWatcher + locker sync.Locker } // NewNamespacePodUpdater creates a namespacePodUpdater -func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacestore NamespaceStore, namespacedelta NamespaceDelta, locker sync.Locker) *namespacePodUpdater { +func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacewatcher UpdateWatcher, locker sync.Locker) *namespacePodUpdater { return &namespacePodUpdater{ - handler: handler, - store: store, - namespacestore: namespacestore, - namespacedelta: namespacedelta, - locker: locker, + handler: handler, + store: store, + namespacewatcher: namespacewatcher, + locker: locker, } } @@ -176,46 +167,33 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { if !ok { return } - // https://pkg.go.dev/k8s.io/client-go/tools/cache#MetaNamespaceKeyFunc Creates key from provided obj - key, _ := cache.MetaNamespaceKeyFunc(obj) lognode := logp.NewLogger("--------Namespace---------") if n.locker != nil { n.locker.Lock() defer n.locker.Unlock() } - //Trying to retrieve from the cache. Getbykey returns the object from cache associated with the given object's key - cachednamespaceobj, exists, err := n.namespacestore.GetByKey(key) - slice := n.namespacedelta.Deltaslice() + // Slice includes the old and new version of caching object that changes in the current update event. slice[0] is the old version and slice[1] the new updated one + slice := n.namespacewatcher.Deltaobjects() + cachednamespaceold, ok := slice[0].(*Namespace) + lognode.Infof("-----Slice-------->: %v, %v ", slice[0], slice[1]) - labelscheck := true - annotationscheck := true - if err == nil && exists { - cachednamespace, ok := cachednamespaceobj.(*Namespace) - cachednamespaceold, ok := slice[0].(*Namespace) - if ns.Name == cachednamespace.Name { - labelscheck = reflect.DeepEqual(cachednamespace.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) - annotationscheck = reflect.DeepEqual(cachednamespace.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations) - if ok { - // n.store.List() returns a snapshot at this point. If a delete is received - // from the main watcher, this loop may generate an update event after the - // delete is processed, leaving configurations that would never be deleted. - // Also this loop can miss updates, what could leave outdated configurations. - // Avoid these issues by locking the processing of events from the main watcher. - for _, pod := range n.store.List() { - pod, ok := pod.(*Pod) - - if ok && pod.Namespace == cachednamespace.Name { - // Only if an update happend either in Labels or Annotations proceed with Pod updates - lognode.Infof("-----Values-------->: %v, %v ", pod.Namespace, cachednamespace.Name) - lognode.Infof("------Checks------->: %v, %v ", labelscheck, annotationscheck) - lognode.Infof("------Labels------->: %v, %v ", cachednamespace.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) - if !labelscheck || !annotationscheck { - n.handler(pod) - } - } + if ns.Name == cachednamespaceold.Name && ok { + labelscheck := checkMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) + annotationscheck := checkMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations) + // Only if there is a diffrence in Metadata labels or annotations proceed to Pod update + if !labelscheck || !annotationscheck { + // n.store.List() returns a snapshot at this point. If a delete is received + // from the main watcher, this loop may generate an update event after the + // delete is processed, leaving configurations that would never be deleted. + // Also this loop can miss updates, what could leave outdated configurations. + // Avoid these issues by locking the processing of events from the main watcher. + for _, pod := range n.store.List() { + pod, ok := pod.(*Pod) + if ok && pod.Namespace == ns.Name { + n.handler(pod) } } } @@ -233,25 +211,19 @@ func (*namespacePodUpdater) OnDelete(interface{}) {} // nodePodUpdater notifies updates on pods when their nodes are updated. type nodePodUpdater struct { - handler podUpdaterHandlerFunc - store podUpdaterStore - nodestore NodeStore - locker sync.Locker -} - -// NodeStore is the interface that an object needs to implement to be -// used as a node shared store. -type NodeStore interface { - GetByKey(string) (interface{}, bool, error) + handler podUpdaterHandlerFunc + store podUpdaterStore + nodewatcher UpdateWatcher + locker sync.Locker } // NewNodePodUpdater creates a nodePodUpdater -func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodestore NodeStore, locker sync.Locker) *nodePodUpdater { +func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodewatcher UpdateWatcher, locker sync.Locker) *nodePodUpdater { return &nodePodUpdater{ - handler: handler, - store: store, - nodestore: nodestore, - locker: locker, + handler: handler, + store: store, + nodewatcher: nodewatcher, + locker: locker, } } @@ -261,38 +233,29 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { if !ok { return } - // https://pkg.go.dev/k8s.io/client-go/tools/cache#MetaNamespaceKeyFunc Creates key from provided obj - key, _ := cache.MetaNamespaceKeyFunc(obj) - lognode := logp.NewLogger("--------Node---------") if n.locker != nil { n.locker.Lock() defer n.locker.Unlock() } - //Trying to retrieve from the cache. Getbykey returns the object from cache associated with the given object's key - cachednodeobj, exists, err := n.nodestore.GetByKey(key) - - labelscheck := true - annotationscheck := true - if err == nil && exists { - cachednode, ok := cachednodeobj.(*Node) - lognode.Infof("Node: %v .", cachednode.Labels) - if ok { - labelscheck = reflect.DeepEqual(node.ObjectMeta.Labels, cachednode.Labels) - annotationscheck = reflect.DeepEqual(node.ObjectMeta.Annotations, cachednode.Annotations) - } - } - // Only if an update happend either in Labels or Annotations proceed with Pod updates - if !labelscheck || !annotationscheck { - // n.store.List() returns a snapshot at this point. If a delete is received - // from the main watcher, this loop may generate an update event after the - // delete is processed, leaving configurations that would never be deleted. - // Also this loop can miss updates, what could leave outdated configurations. - // Avoid these issues by locking the processing of events from the main watcher. - for _, pod := range n.store.List() { - pod, ok := pod.(*Pod) - if ok && pod.Spec.NodeName == node.Name { - n.handler(pod) + slice := n.nodewatcher.Deltaobjects() + cachednodeold, ok := slice[0].(*Node) + + if node.Name == cachednodeold.Name && ok { + labelscheck := checkMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels) + annotationscheck := checkMetadata(node.ObjectMeta.Annotations, cachednodeold.ObjectMeta.Annotations) + // Only if there is a diffrence in Metadata labels or annotations proceed to Pod update + if !labelscheck || !annotationscheck { + // n.store.List() returns a snapshot at this point. If a delete is received + // from the main watcher, this loop may generate an update event after the + // delete is processed, leaving configurations that would never be deleted. + // Also this loop can miss updates, what could leave outdated configurations. + // Avoid these issues by locking the processing of events from the main watcher. + for _, pod := range n.store.List() { + pod, ok := pod.(*Pod) + if ok && pod.Spec.NodeName == node.Name { + n.handler(pod) + } } } } @@ -305,3 +268,9 @@ func (*nodePodUpdater) OnAdd(interface{}) {} // OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this // namespace they will generate their own delete events. func (*nodePodUpdater) OnDelete(interface{}) {} + +// checkMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrenece +func checkMetadata(newmetadata, oldmetadata map[string]string) bool { + check := reflect.DeepEqual(newmetadata, oldmetadata) + return check +} diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index 9da45f7ae6..3bd33cdfdf 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -60,8 +60,8 @@ type Watcher interface { // Client returns the kubernetes client object used by the watcher Client() kubernetes.Interface - // Delta returns the slice of objects changed by the last updated event - Deltaslice() []runtime.Object + // Deltaobjects returns the slice of objects that change during the last updated event + Deltaobjects() []runtime.Object } // WatchOptions controls watch behaviors @@ -162,7 +162,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource // state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update. w.enqueue(n, add) } - w.deltaslice(o, n) + w.deltaobjects(o, n) }, }) @@ -186,7 +186,7 @@ func (w *watcher) Client() kubernetes.Interface { } // Client returns the kubernetes client object used by the watcher -func (w *watcher) Deltaslice() []runtime.Object { +func (w *watcher) Deltaobjects() []runtime.Object { return w.delta } @@ -229,7 +229,10 @@ func (w *watcher) enqueue(obj interface{}, state string) { w.queue.Add(&item{key, obj, state}) } -func (w *watcher) deltaslice(o interface{}, n interface{}) { +// deltaobjects creates a slice with the old and the new version of cache objects that are ready to chane on update events +// returns a delta struct to the watcher. w.delta[0] is the old version and w.delta[1] the new updated one +func (w *watcher) deltaobjects(o interface{}, n interface{}) { + //w.delta[:0] initialises always the delta struct before new assignement w.delta = w.delta[:0] w.delta = append(w.delta, o.(runtime.Object)) w.delta = append(w.delta, n.(runtime.Object)) From 5ea29d15dfc768913c9d1ff5f211b45072f3af1d Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Wed, 13 Dec 2023 11:20:30 +0200 Subject: [PATCH 07/31] removing comments --- kubernetes/eventhandler.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 15e6e13469..0f2f43de04 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -21,7 +21,6 @@ import ( "reflect" "sync" - "github.com/elastic/elastic-agent-libs/logp" "k8s.io/apimachinery/pkg/runtime" ) @@ -167,7 +166,6 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { if !ok { return } - lognode := logp.NewLogger("--------Namespace---------") if n.locker != nil { n.locker.Lock() @@ -178,8 +176,6 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { slice := n.namespacewatcher.Deltaobjects() cachednamespaceold, ok := slice[0].(*Namespace) - lognode.Infof("-----Slice-------->: %v, %v ", slice[0], slice[1]) - if ns.Name == cachednamespaceold.Name && ok { labelscheck := checkMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) annotationscheck := checkMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations) @@ -238,6 +234,8 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } + + // Slice includes the old and new version of caching object that changes in the current update event. slice[0] is the old version and slice[1] the new updated one slice := n.nodewatcher.Deltaobjects() cachednodeold, ok := slice[0].(*Node) From 1c86749452152e9ad2d013a3c53e03fc2cdca554 Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Wed, 13 Dec 2023 12:26:14 +0200 Subject: [PATCH 08/31] Update eventhandler.go - Fixing typo Fixing typo --- kubernetes/eventhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 0f2f43de04..8322e9554d 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -267,7 +267,7 @@ func (*nodePodUpdater) OnAdd(interface{}) {} // namespace they will generate their own delete events. func (*nodePodUpdater) OnDelete(interface{}) {} -// checkMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrenece +// checkMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrence func checkMetadata(newmetadata, oldmetadata map[string]string) bool { check := reflect.DeepEqual(newmetadata, oldmetadata) return check From b01ddcc35590db36099feb42b4aa394a5184282c Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Wed, 13 Dec 2023 12:28:19 +0200 Subject: [PATCH 09/31] Update watcher.go comment in deltaslice function --- kubernetes/watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index 3bd33cdfdf..3dc9c692c8 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -185,7 +185,7 @@ func (w *watcher) Client() kubernetes.Interface { return w.client } -// Client returns the kubernetes client object used by the watcher +// Deltaobjects returns the slice of objects that change during the last updated event func (w *watcher) Deltaobjects() []runtime.Object { return w.delta } From e78177c486b6f9d3c56539290f8e57929626594b Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Tue, 9 Jan 2024 11:17:52 +0200 Subject: [PATCH 10/31] Update kubernetes/eventhandler.go Co-authored-by: Tetiana Kravchenko --- kubernetes/eventhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 8322e9554d..64f271c999 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -179,7 +179,7 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { if ns.Name == cachednamespaceold.Name && ok { labelscheck := checkMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) annotationscheck := checkMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations) - // Only if there is a diffrence in Metadata labels or annotations proceed to Pod update + // Only if there is a difference in Metadata labels or annotations proceed to Pod update if !labelscheck || !annotationscheck { // n.store.List() returns a snapshot at this point. If a delete is received // from the main watcher, this loop may generate an update event after the From cdfab0097f1d5694d577524d51ba2e6381ced6d7 Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Tue, 9 Jan 2024 11:18:08 +0200 Subject: [PATCH 11/31] Update kubernetes/eventhandler.go Co-authored-by: Tetiana Kravchenko --- kubernetes/eventhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 64f271c999..e956707d74 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -242,7 +242,7 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { if node.Name == cachednodeold.Name && ok { labelscheck := checkMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels) annotationscheck := checkMetadata(node.ObjectMeta.Annotations, cachednodeold.ObjectMeta.Annotations) - // Only if there is a diffrence in Metadata labels or annotations proceed to Pod update + // Only if there is a difference in Metadata labels or annotations proceed to Pod update if !labelscheck || !annotationscheck { // n.store.List() returns a snapshot at this point. If a delete is received // from the main watcher, this loop may generate an update event after the From 416ab77ca7076780617a96001b65b9c60d2aa7dc Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Tue, 9 Jan 2024 11:18:27 +0200 Subject: [PATCH 12/31] Update kubernetes/eventhandler.go Co-authored-by: Tetiana Kravchenko --- kubernetes/eventhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index e956707d74..53477fadf5 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -176,7 +176,7 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { slice := n.namespacewatcher.Deltaobjects() cachednamespaceold, ok := slice[0].(*Namespace) - if ns.Name == cachednamespaceold.Name && ok { + if ok && ns.Name == cachednamespaceold.Name { labelscheck := checkMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) annotationscheck := checkMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations) // Only if there is a difference in Metadata labels or annotations proceed to Pod update From a7108642c7b5dd3bd63e4a3903b071119cf5d3b7 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Tue, 9 Jan 2024 12:46:26 +0200 Subject: [PATCH 13/31] updates for delta struct and comments --- kubernetes/eventhandler.go | 54 ++++++++++++++++++++------------------ kubernetes/watcher.go | 29 ++++++++++---------- 2 files changed, 43 insertions(+), 40 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 8322e9554d..78464879d8 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -20,8 +20,6 @@ package kubernetes import ( "reflect" "sync" - - "k8s.io/apimachinery/pkg/runtime" ) // ResourceEventHandler can handle notifications for events that happen to a @@ -138,8 +136,10 @@ type podUpdaterStore interface { List() []interface{} } +// UpdateWatcher is the interface that an object needs to implement to be +// able to use DeltaObject cache event function. type UpdateWatcher interface { - Deltaobjects() []runtime.Object + Deltaobjects() Delta } // namespacePodUpdater notifies updates on pods when their namespaces are updated. @@ -167,25 +167,26 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { return } + // n.store.List() returns a snapshot at this point. If a delete is received + // from the main watcher, this loop may generate an update event after the + // delete is processed, leaving configurations that would never be deleted. + // Also this loop can miss updates, what could leave outdated configurations. + // Avoid these issues by locking the processing of events from the main watcher. if n.locker != nil { n.locker.Lock() defer n.locker.Unlock() } - // Slice includes the old and new version of caching object that changes in the current update event. slice[0] is the old version and slice[1] the new updated one - slice := n.namespacewatcher.Deltaobjects() - cachednamespaceold, ok := slice[0].(*Namespace) + // deltaobjects includes the old and new version of caching object that changes in the current update event. + // We compare the cached old version of object with the new one that triggers the update + deltaobjects := n.namespacewatcher.Deltaobjects() + cachednamespaceold, ok := deltaobjects.old.(*Namespace) if ns.Name == cachednamespaceold.Name && ok { - labelscheck := checkMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) - annotationscheck := checkMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations) + labelscheck := isEqualMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) + annotationscheck := isEqualMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations) // Only if there is a diffrence in Metadata labels or annotations proceed to Pod update if !labelscheck || !annotationscheck { - // n.store.List() returns a snapshot at this point. If a delete is received - // from the main watcher, this loop may generate an update event after the - // delete is processed, leaving configurations that would never be deleted. - // Also this loop can miss updates, what could leave outdated configurations. - // Avoid these issues by locking the processing of events from the main watcher. for _, pod := range n.store.List() { pod, ok := pod.(*Pod) if ok && pod.Namespace == ns.Name { @@ -230,25 +231,26 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { return } + // n.store.List() returns a snapshot at this point. If a delete is received + // from the main watcher, this loop may generate an update event after the + // delete is processed, leaving configurations that would never be deleted. + // Also this loop can miss updates, what could leave outdated configurations. + // Avoid these issues by locking the processing of events from the main watcher. if n.locker != nil { n.locker.Lock() defer n.locker.Unlock() } - // Slice includes the old and new version of caching object that changes in the current update event. slice[0] is the old version and slice[1] the new updated one - slice := n.nodewatcher.Deltaobjects() - cachednodeold, ok := slice[0].(*Node) + // deltaobjects includes the old and new version of caching object that changes in the current update event. + // We compare the cached old version of object with the new one that triggers the update + deltaobjects := n.nodewatcher.Deltaobjects() + cachednodeold, ok := deltaobjects.old.(*Node) - if node.Name == cachednodeold.Name && ok { - labelscheck := checkMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels) - annotationscheck := checkMetadata(node.ObjectMeta.Annotations, cachednodeold.ObjectMeta.Annotations) + if ok && node.Name == cachednodeold.Name { + labelscheck := isEqualMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels) + annotationscheck := isEqualMetadata(node.ObjectMeta.Annotations, cachednodeold.ObjectMeta.Annotations) // Only if there is a diffrence in Metadata labels or annotations proceed to Pod update if !labelscheck || !annotationscheck { - // n.store.List() returns a snapshot at this point. If a delete is received - // from the main watcher, this loop may generate an update event after the - // delete is processed, leaving configurations that would never be deleted. - // Also this loop can miss updates, what could leave outdated configurations. - // Avoid these issues by locking the processing of events from the main watcher. for _, pod := range n.store.List() { pod, ok := pod.(*Pod) if ok && pod.Spec.NodeName == node.Name { @@ -267,8 +269,8 @@ func (*nodePodUpdater) OnAdd(interface{}) {} // namespace they will generate their own delete events. func (*nodePodUpdater) OnDelete(interface{}) {} -// checkMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrence -func checkMetadata(newmetadata, oldmetadata map[string]string) bool { +// isEqualMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrence +func isEqualMetadata(newmetadata, oldmetadata map[string]string) bool { check := reflect.DeepEqual(newmetadata, oldmetadata) return check } diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index 3dc9c692c8..986a87c637 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -60,8 +60,8 @@ type Watcher interface { // Client returns the kubernetes client object used by the watcher Client() kubernetes.Interface - // Deltaobjects returns the slice of objects that change during the last updated event - Deltaobjects() []runtime.Object + // Deltaobjects returns the objects struct that change during the last updated event + Deltaobjects() Delta } // WatchOptions controls watch behaviors @@ -85,6 +85,11 @@ type item struct { state string } +type Delta struct { + old runtime.Object + new runtime.Object +} + type watcher struct { client kubernetes.Interface informer cache.SharedInformer @@ -94,7 +99,7 @@ type watcher struct { stop context.CancelFunc handler ResourceEventHandler logger *logp.Logger - delta []runtime.Object + delta Delta } // NewWatcher initializes the watcher client to provide a events handler for @@ -110,7 +115,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { var store cache.Store var queue workqueue.Interface - var deltaslice []runtime.Object + var delta Delta informer, _, err := NewInformer(client, resource, opts, indexers) if err != nil { return nil, err @@ -136,7 +141,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource store: store, queue: queue, ctx: ctx, - delta: deltaslice, + delta: delta, stop: cancel, logger: logp.NewLogger("kubernetes"), handler: NoOpEventHandlerFuncs{}, @@ -185,8 +190,8 @@ func (w *watcher) Client() kubernetes.Interface { return w.client } -// Deltaobjects returns the slice of objects that change during the last updated event -func (w *watcher) Deltaobjects() []runtime.Object { +// Deltaobjects returns the objects struct that change during the last updated event +func (w *watcher) Deltaobjects() Delta { return w.delta } @@ -229,14 +234,10 @@ func (w *watcher) enqueue(obj interface{}, state string) { w.queue.Add(&item{key, obj, state}) } -// deltaobjects creates a slice with the old and the new version of cache objects that are ready to chane on update events -// returns a delta struct to the watcher. w.delta[0] is the old version and w.delta[1] the new updated one +// deltaobjects updates the delta struct with the old and the new version of cache objects that are ready to change on update events func (w *watcher) deltaobjects(o interface{}, n interface{}) { - //w.delta[:0] initialises always the delta struct before new assignement - w.delta = w.delta[:0] - w.delta = append(w.delta, o.(runtime.Object)) - w.delta = append(w.delta, n.(runtime.Object)) - + w.delta.old = o.(runtime.Object) + w.delta.new = n.(runtime.Object) } // process gets the top of the work queue and processes the object that is received. From 6cb89b9bdf8e615888478fdda561256c396e60c8 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Tue, 9 Jan 2024 14:32:11 +0200 Subject: [PATCH 14/31] returning only old cached object --- kubernetes/eventhandler.go | 18 +++++------ kubernetes/watcher.go | 62 +++++++++++++++++--------------------- 2 files changed, 35 insertions(+), 45 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 70abfefb53..bb5f448c5f 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -20,6 +20,8 @@ package kubernetes import ( "reflect" "sync" + + "k8s.io/apimachinery/pkg/runtime" ) // ResourceEventHandler can handle notifications for events that happen to a @@ -139,7 +141,7 @@ type podUpdaterStore interface { // UpdateWatcher is the interface that an object needs to implement to be // able to use DeltaObject cache event function. type UpdateWatcher interface { - Deltaobjects() Delta + Oldobject() runtime.Object } // namespacePodUpdater notifies updates on pods when their namespaces are updated. @@ -176,11 +178,8 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } - - // deltaobjects includes the old and new version of caching object that changes in the current update event. - // We compare the cached old version of object with the new one that triggers the update - deltaobjects := n.namespacewatcher.Deltaobjects() - cachednamespaceold, ok := deltaobjects.old.(*Namespace) + oldobject := n.namespacewatcher.Oldobject() + cachednamespaceold, ok := oldobject.(*Namespace) if ok && ns.Name == cachednamespaceold.Name { labelscheck := isEqualMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) @@ -240,11 +239,8 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } - - // deltaobjects includes the old and new version of caching object that changes in the current update event. - // We compare the cached old version of object with the new one that triggers the update - deltaobjects := n.nodewatcher.Deltaobjects() - cachednodeold, ok := deltaobjects.old.(*Node) + oldobject := n.nodewatcher.Oldobject() + cachednodeold, ok := oldobject.(*Node) if ok && node.Name == cachednodeold.Name { labelscheck := isEqualMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels) diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index 986a87c637..5ae6be82a1 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -60,8 +60,8 @@ type Watcher interface { // Client returns the kubernetes client object used by the watcher Client() kubernetes.Interface - // Deltaobjects returns the objects struct that change during the last updated event - Deltaobjects() Delta + // Oldobject returns the old object before change during the last updated event + Oldobject() runtime.Object } // WatchOptions controls watch behaviors @@ -85,21 +85,16 @@ type item struct { state string } -type Delta struct { - old runtime.Object - new runtime.Object -} - type watcher struct { - client kubernetes.Interface - informer cache.SharedInformer - store cache.Store - queue workqueue.Interface - ctx context.Context - stop context.CancelFunc - handler ResourceEventHandler - logger *logp.Logger - delta Delta + client kubernetes.Interface + informer cache.SharedInformer + store cache.Store + queue workqueue.Interface + ctx context.Context + stop context.CancelFunc + handler ResourceEventHandler + logger *logp.Logger + oldobject runtime.Object } // NewWatcher initializes the watcher client to provide a events handler for @@ -115,7 +110,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { var store cache.Store var queue workqueue.Interface - var delta Delta + var oldobject runtime.Object informer, _, err := NewInformer(client, resource, opts, indexers) if err != nil { return nil, err @@ -136,15 +131,15 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource ctx, cancel := context.WithCancel(context.TODO()) w := &watcher{ - client: client, - informer: informer, - store: store, - queue: queue, - ctx: ctx, - delta: delta, - stop: cancel, - logger: logp.NewLogger("kubernetes"), - handler: NoOpEventHandlerFuncs{}, + client: client, + informer: informer, + store: store, + queue: queue, + ctx: ctx, + oldobject: oldobject, + stop: cancel, + logger: logp.NewLogger("kubernetes"), + handler: NoOpEventHandlerFuncs{}, } w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -167,7 +162,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource // state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update. w.enqueue(n, add) } - w.deltaobjects(o, n) + w.oldobjectreturn(o) }, }) @@ -190,9 +185,9 @@ func (w *watcher) Client() kubernetes.Interface { return w.client } -// Deltaobjects returns the objects struct that change during the last updated event -func (w *watcher) Deltaobjects() Delta { - return w.delta +// Oldbject returns the old object in cache during the last updated event +func (w *watcher) Oldobject() runtime.Object { + return w.oldobject } // Start watching pods @@ -234,10 +229,9 @@ func (w *watcher) enqueue(obj interface{}, state string) { w.queue.Add(&item{key, obj, state}) } -// deltaobjects updates the delta struct with the old and the new version of cache objects that are ready to change on update events -func (w *watcher) deltaobjects(o interface{}, n interface{}) { - w.delta.old = o.(runtime.Object) - w.delta.new = n.(runtime.Object) +// oldobjectreturn returns the old version of cache objects before change on update events +func (w *watcher) oldobjectreturn(o interface{}) { + w.oldobject = o.(runtime.Object) } // process gets the top of the work queue and processes the object that is received. From 13bff5838fc3b46c8b6b22fe5dc764607c185f1f Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Tue, 9 Jan 2024 14:39:05 +0200 Subject: [PATCH 15/31] fixing comment after upgrade to return old object --- kubernetes/eventhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index bb5f448c5f..8ebe4533bc 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -139,7 +139,7 @@ type podUpdaterStore interface { } // UpdateWatcher is the interface that an object needs to implement to be -// able to use DeltaObject cache event function. +// able to use Oldobject cache event function from watcher. type UpdateWatcher interface { Oldobject() runtime.Object } From 9ee164e2406a851e8b02928826b1ac7ab81125e4 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Tue, 9 Jan 2024 14:48:24 +0200 Subject: [PATCH 16/31] changing name of interface --- kubernetes/eventhandler.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 8ebe4533bc..863590ac29 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -138,9 +138,9 @@ type podUpdaterStore interface { List() []interface{} } -// UpdateWatcher is the interface that an object needs to implement to be +// OldobjectWatcher is the interface that an object needs to implement to be // able to use Oldobject cache event function from watcher. -type UpdateWatcher interface { +type OldobjectWatcher interface { Oldobject() runtime.Object } @@ -148,12 +148,12 @@ type UpdateWatcher interface { type namespacePodUpdater struct { handler podUpdaterHandlerFunc store podUpdaterStore - namespacewatcher UpdateWatcher + namespacewatcher OldobjectWatcher locker sync.Locker } // NewNamespacePodUpdater creates a namespacePodUpdater -func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacewatcher UpdateWatcher, locker sync.Locker) *namespacePodUpdater { +func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacewatcher OldobjectWatcher, locker sync.Locker) *namespacePodUpdater { return &namespacePodUpdater{ handler: handler, store: store, @@ -209,12 +209,12 @@ func (*namespacePodUpdater) OnDelete(interface{}) {} type nodePodUpdater struct { handler podUpdaterHandlerFunc store podUpdaterStore - nodewatcher UpdateWatcher + nodewatcher OldobjectWatcher locker sync.Locker } // NewNodePodUpdater creates a nodePodUpdater -func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodewatcher UpdateWatcher, locker sync.Locker) *nodePodUpdater { +func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodewatcher OldobjectWatcher, locker sync.Locker) *nodePodUpdater { return &nodePodUpdater{ handler: handler, store: store, From 328f78da048175a39068e38a1009e6e2ff2ee31e Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Tue, 9 Jan 2024 15:10:28 +0200 Subject: [PATCH 17/31] adding check for oldobjectreturn --- kubernetes/watcher.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index 5ae6be82a1..cd786a42ac 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -231,7 +231,11 @@ func (w *watcher) enqueue(obj interface{}, state string) { // oldobjectreturn returns the old version of cache objects before change on update events func (w *watcher) oldobjectreturn(o interface{}) { - w.oldobject = o.(runtime.Object) + if old, ok := o.(runtime.Object); !ok { + utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o)) + } else { + w.oldobject = old + } } // process gets the top of the work queue and processes the object that is received. From 2d73a987b32f9eac148ce4987040185db9f84901 Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Tue, 9 Jan 2024 15:19:08 +0200 Subject: [PATCH 18/31] Update kubernetes/eventhandler.go Co-authored-by: Tetiana Kravchenko --- kubernetes/eventhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 863590ac29..bda1536744 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -245,7 +245,7 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { if ok && node.Name == cachednodeold.Name { labelscheck := isEqualMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels) annotationscheck := isEqualMetadata(node.ObjectMeta.Annotations, cachednodeold.ObjectMeta.Annotations) - // Only if there is a diffrence in Metadata labels or annotations proceed to Pod update + // Only if there is a difference in Metadata labels or annotations proceed to Pod update if !labelscheck || !annotationscheck { for _, pod := range n.store.List() { pod, ok := pod.(*Pod) From 76b4dce466607616f80db431437d17b9eacdc70c Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Tue, 9 Jan 2024 16:25:55 +0200 Subject: [PATCH 19/31] Update kubernetes/eventhandler.go Co-authored-by: Giuseppe Santoro --- kubernetes/eventhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index bda1536744..296b77255a 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -265,7 +265,7 @@ func (*nodePodUpdater) OnAdd(interface{}) {} // namespace they will generate their own delete events. func (*nodePodUpdater) OnDelete(interface{}) {} -// isEqualMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrence +// isEqualMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a difference func isEqualMetadata(newmetadata, oldmetadata map[string]string) bool { check := reflect.DeepEqual(newmetadata, oldmetadata) return check From 66f616c494535ba2d49632ac58e8b33f6a651efd Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Tue, 9 Jan 2024 16:26:59 +0200 Subject: [PATCH 20/31] Update kubernetes/watcher.go Co-authored-by: Giuseppe Santoro --- kubernetes/watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index cd786a42ac..fe8b7ee922 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -232,7 +232,7 @@ func (w *watcher) enqueue(obj interface{}, state string) { // oldobjectreturn returns the old version of cache objects before change on update events func (w *watcher) oldobjectreturn(o interface{}) { if old, ok := o.(runtime.Object); !ok { - utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o)) + utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o)) } else { w.oldobject = old } From 16b0fca86b19f8f8f108c219f43f1024687da2ca Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Tue, 9 Jan 2024 16:41:50 +0200 Subject: [PATCH 21/31] renaming functions and providinng camelCase variables --- kubernetes/eventhandler.go | 41 +++++++++++++++++++------------------- kubernetes/watcher.go | 9 +++++---- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 863590ac29..8dd5d82885 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -138,9 +138,9 @@ type podUpdaterStore interface { List() []interface{} } -// OldobjectWatcher is the interface that an object needs to implement to be +// OldObjectWatcher is the interface that an object needs to implement to be // able to use Oldobject cache event function from watcher. -type OldobjectWatcher interface { +type oldObjectWatcher interface { Oldobject() runtime.Object } @@ -148,22 +148,23 @@ type OldobjectWatcher interface { type namespacePodUpdater struct { handler podUpdaterHandlerFunc store podUpdaterStore - namespacewatcher OldobjectWatcher + namespaceWatcher oldObjectWatcher locker sync.Locker } // NewNamespacePodUpdater creates a namespacePodUpdater -func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacewatcher OldobjectWatcher, locker sync.Locker) *namespacePodUpdater { +func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespaceWatcher oldObjectWatcher, locker sync.Locker) *namespacePodUpdater { return &namespacePodUpdater{ handler: handler, store: store, - namespacewatcher: namespacewatcher, + namespaceWatcher: namespaceWatcher, locker: locker, } } // OnUpdate handles update events on namespaces. func (n *namespacePodUpdater) OnUpdate(obj interface{}) { + ns, ok := obj.(*Namespace) if !ok { return @@ -178,12 +179,12 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } - oldobject := n.namespacewatcher.Oldobject() - cachednamespaceold, ok := oldobject.(*Namespace) + cachedObject := n.namespaceWatcher.Oldobject() + cachedNamespace, ok := cachedObject.(*Namespace) - if ok && ns.Name == cachednamespaceold.Name { - labelscheck := isEqualMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) - annotationscheck := isEqualMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations) + if ok && ns.Name == cachedNamespace.Name { + labelscheck := isEqualMetadata(ns.ObjectMeta.Labels, cachedNamespace.ObjectMeta.Labels) + annotationscheck := isEqualMetadata(ns.ObjectMeta.Annotations, cachedNamespace.ObjectMeta.Annotations) // Only if there is a difference in Metadata labels or annotations proceed to Pod update if !labelscheck || !annotationscheck { for _, pod := range n.store.List() { @@ -209,16 +210,16 @@ func (*namespacePodUpdater) OnDelete(interface{}) {} type nodePodUpdater struct { handler podUpdaterHandlerFunc store podUpdaterStore - nodewatcher OldobjectWatcher + nodeWatcher oldObjectWatcher locker sync.Locker } // NewNodePodUpdater creates a nodePodUpdater -func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodewatcher OldobjectWatcher, locker sync.Locker) *nodePodUpdater { +func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodeWatcher oldObjectWatcher, locker sync.Locker) *nodePodUpdater { return &nodePodUpdater{ handler: handler, store: store, - nodewatcher: nodewatcher, + nodeWatcher: nodeWatcher, locker: locker, } } @@ -239,13 +240,13 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } - oldobject := n.nodewatcher.Oldobject() - cachednodeold, ok := oldobject.(*Node) + cachedObject := n.nodeWatcher.Oldobject() + cachedNode, ok := cachedObject.(*Node) - if ok && node.Name == cachednodeold.Name { - labelscheck := isEqualMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels) - annotationscheck := isEqualMetadata(node.ObjectMeta.Annotations, cachednodeold.ObjectMeta.Annotations) - // Only if there is a diffrence in Metadata labels or annotations proceed to Pod update + if ok && node.Name == cachedNode.Name { + labelscheck := isEqualMetadata(node.ObjectMeta.Labels, cachedNode.ObjectMeta.Labels) + annotationscheck := isEqualMetadata(node.ObjectMeta.Annotations, cachedNode.ObjectMeta.Annotations) + // Only if there is a difference in Metadata labels or annotations proceed to Pod update if !labelscheck || !annotationscheck { for _, pod := range n.store.List() { pod, ok := pod.(*Pod) @@ -265,7 +266,7 @@ func (*nodePodUpdater) OnAdd(interface{}) {} // namespace they will generate their own delete events. func (*nodePodUpdater) OnDelete(interface{}) {} -// isEqualMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrence +// isEqualMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a difference func isEqualMetadata(newmetadata, oldmetadata map[string]string) bool { check := reflect.DeepEqual(newmetadata, oldmetadata) return check diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index cd786a42ac..f84c2596ec 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -162,7 +162,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource // state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update. w.enqueue(n, add) } - w.oldobjectreturn(o) + w.cacheObject(o) }, }) @@ -229,10 +229,11 @@ func (w *watcher) enqueue(obj interface{}, state string) { w.queue.Add(&item{key, obj, state}) } -// oldobjectreturn returns the old version of cache objects before change on update events -func (w *watcher) oldobjectreturn(o interface{}) { +// cacheObject updates watcher with the old version of cache objects before change during update events +func (w *watcher) cacheObject(o interface{}) { if old, ok := o.(runtime.Object); !ok { - utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o)) + w.logger.Debugf("Old Object was not retrieved from cache: %v", o) + w.oldobject = nil } else { w.oldobject = old } From 05ed573c93341b59a747893ae4c5413950672955 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Tue, 9 Jan 2024 16:48:26 +0200 Subject: [PATCH 22/31] reflect.DeepEqual added --- kubernetes/eventhandler.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 8dd5d82885..2700742ee0 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -183,8 +183,8 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { cachedNamespace, ok := cachedObject.(*Namespace) if ok && ns.Name == cachedNamespace.Name { - labelscheck := isEqualMetadata(ns.ObjectMeta.Labels, cachedNamespace.ObjectMeta.Labels) - annotationscheck := isEqualMetadata(ns.ObjectMeta.Annotations, cachedNamespace.ObjectMeta.Annotations) + labelscheck := reflect.DeepEqual(ns.ObjectMeta.Labels, cachedNamespace.ObjectMeta.Labels) + annotationscheck := reflect.DeepEqual(ns.ObjectMeta.Annotations, cachedNamespace.ObjectMeta.Annotations) // Only if there is a difference in Metadata labels or annotations proceed to Pod update if !labelscheck || !annotationscheck { for _, pod := range n.store.List() { @@ -244,8 +244,8 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { cachedNode, ok := cachedObject.(*Node) if ok && node.Name == cachedNode.Name { - labelscheck := isEqualMetadata(node.ObjectMeta.Labels, cachedNode.ObjectMeta.Labels) - annotationscheck := isEqualMetadata(node.ObjectMeta.Annotations, cachedNode.ObjectMeta.Annotations) + labelscheck := reflect.DeepEqual(node.ObjectMeta.Labels, cachedNode.ObjectMeta.Labels) + annotationscheck := reflect.DeepEqual(node.ObjectMeta.Annotations, cachedNode.ObjectMeta.Annotations) // Only if there is a difference in Metadata labels or annotations proceed to Pod update if !labelscheck || !annotationscheck { for _, pod := range n.store.List() { @@ -265,9 +265,3 @@ func (*nodePodUpdater) OnAdd(interface{}) {} // OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this // namespace they will generate their own delete events. func (*nodePodUpdater) OnDelete(interface{}) {} - -// isEqualMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a difference -func isEqualMetadata(newmetadata, oldmetadata map[string]string) bool { - check := reflect.DeepEqual(newmetadata, oldmetadata) - return check -} From 8ad0d7ce665a2f80b4c773c5d4ea354099eb8d9f Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Tue, 9 Jan 2024 17:05:05 +0200 Subject: [PATCH 23/31] removing extra lines --- kubernetes/eventhandler.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 2700742ee0..e35c3b0158 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -164,12 +164,10 @@ func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore // OnUpdate handles update events on namespaces. func (n *namespacePodUpdater) OnUpdate(obj interface{}) { - ns, ok := obj.(*Namespace) if !ok { return } - // n.store.List() returns a snapshot at this point. If a delete is received // from the main watcher, this loop may generate an update event after the // delete is processed, leaving configurations that would never be deleted. @@ -230,7 +228,6 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { if !ok { return } - // n.store.List() returns a snapshot at this point. If a delete is received // from the main watcher, this loop may generate an update event after the // delete is processed, leaving configurations that would never be deleted. From a63bbf309f6853da3c2e2f8da5931d922437fe01 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Wed, 10 Jan 2024 13:29:24 +0200 Subject: [PATCH 24/31] replacing oldobject with cachedobject --- kubernetes/eventhandler.go | 20 ++++++++-------- kubernetes/watcher.go | 48 +++++++++++++++++++------------------- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index e35c3b0158..0d866d0670 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -138,22 +138,22 @@ type podUpdaterStore interface { List() []interface{} } -// OldObjectWatcher is the interface that an object needs to implement to be -// able to use Oldobject cache event function from watcher. -type oldObjectWatcher interface { - Oldobject() runtime.Object +// CachedObjectWatcher is the interface that an object needs to implement to be +// able to use CachedObject cache event function from watcher. +type cachedObjectWatcher interface { + CachedObject() runtime.Object } // namespacePodUpdater notifies updates on pods when their namespaces are updated. type namespacePodUpdater struct { handler podUpdaterHandlerFunc store podUpdaterStore - namespaceWatcher oldObjectWatcher + namespaceWatcher cachedObjectWatcher locker sync.Locker } // NewNamespacePodUpdater creates a namespacePodUpdater -func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespaceWatcher oldObjectWatcher, locker sync.Locker) *namespacePodUpdater { +func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespaceWatcher cachedObjectWatcher, locker sync.Locker) *namespacePodUpdater { return &namespacePodUpdater{ handler: handler, store: store, @@ -177,7 +177,7 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } - cachedObject := n.namespaceWatcher.Oldobject() + cachedObject := n.namespaceWatcher.CachedObject() cachedNamespace, ok := cachedObject.(*Namespace) if ok && ns.Name == cachedNamespace.Name { @@ -208,12 +208,12 @@ func (*namespacePodUpdater) OnDelete(interface{}) {} type nodePodUpdater struct { handler podUpdaterHandlerFunc store podUpdaterStore - nodeWatcher oldObjectWatcher + nodeWatcher cachedObjectWatcher locker sync.Locker } // NewNodePodUpdater creates a nodePodUpdater -func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodeWatcher oldObjectWatcher, locker sync.Locker) *nodePodUpdater { +func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodeWatcher cachedObjectWatcher, locker sync.Locker) *nodePodUpdater { return &nodePodUpdater{ handler: handler, store: store, @@ -237,7 +237,7 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } - cachedObject := n.nodeWatcher.Oldobject() + cachedObject := n.nodeWatcher.CachedObject() cachedNode, ok := cachedObject.(*Node) if ok && node.Name == cachedNode.Name { diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index e5f7d7fc30..ac16f4ad62 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -60,8 +60,8 @@ type Watcher interface { // Client returns the kubernetes client object used by the watcher Client() kubernetes.Interface - // Oldobject returns the old object before change during the last updated event - Oldobject() runtime.Object + // CachedObject returns the old object before change during the last updated event + CachedObject() runtime.Object } // WatchOptions controls watch behaviors @@ -86,15 +86,15 @@ type item struct { } type watcher struct { - client kubernetes.Interface - informer cache.SharedInformer - store cache.Store - queue workqueue.Interface - ctx context.Context - stop context.CancelFunc - handler ResourceEventHandler - logger *logp.Logger - oldobject runtime.Object + client kubernetes.Interface + informer cache.SharedInformer + store cache.Store + queue workqueue.Interface + ctx context.Context + stop context.CancelFunc + handler ResourceEventHandler + logger *logp.Logger + cachedObject runtime.Object } // NewWatcher initializes the watcher client to provide a events handler for @@ -110,7 +110,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { var store cache.Store var queue workqueue.Interface - var oldobject runtime.Object + var cachedObject runtime.Object informer, _, err := NewInformer(client, resource, opts, indexers) if err != nil { return nil, err @@ -131,15 +131,15 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource ctx, cancel := context.WithCancel(context.TODO()) w := &watcher{ - client: client, - informer: informer, - store: store, - queue: queue, - ctx: ctx, - oldobject: oldobject, - stop: cancel, - logger: logp.NewLogger("kubernetes"), - handler: NoOpEventHandlerFuncs{}, + client: client, + informer: informer, + store: store, + queue: queue, + ctx: ctx, + cachedObject: cachedObject, + stop: cancel, + logger: logp.NewLogger("kubernetes"), + handler: NoOpEventHandlerFuncs{}, } w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -186,8 +186,8 @@ func (w *watcher) Client() kubernetes.Interface { } // Oldbject returns the old object in cache during the last updated event -func (w *watcher) Oldobject() runtime.Object { - return w.oldobject +func (w *watcher) CachedObject() runtime.Object { + return w.cachedObject } // Start watching pods @@ -234,7 +234,7 @@ func (w *watcher) cacheObject(o interface{}) { if old, ok := o.(runtime.Object); !ok { utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o)) } else { - w.oldobject = old + w.cachedObject = old } } From 51bbd1b0c800a361bc8b18e8d362049ae98d3c1a Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Thu, 11 Jan 2024 15:08:52 +0200 Subject: [PATCH 25/31] keeping only watcher struct --- kubernetes/eventhandler.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 0d866d0670..d542286757 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -20,8 +20,6 @@ package kubernetes import ( "reflect" "sync" - - "k8s.io/apimachinery/pkg/runtime" ) // ResourceEventHandler can handle notifications for events that happen to a @@ -138,22 +136,16 @@ type podUpdaterStore interface { List() []interface{} } -// CachedObjectWatcher is the interface that an object needs to implement to be -// able to use CachedObject cache event function from watcher. -type cachedObjectWatcher interface { - CachedObject() runtime.Object -} - // namespacePodUpdater notifies updates on pods when their namespaces are updated. type namespacePodUpdater struct { handler podUpdaterHandlerFunc store podUpdaterStore - namespaceWatcher cachedObjectWatcher + namespaceWatcher Watcher locker sync.Locker } // NewNamespacePodUpdater creates a namespacePodUpdater -func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespaceWatcher cachedObjectWatcher, locker sync.Locker) *namespacePodUpdater { +func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespaceWatcher Watcher, locker sync.Locker) *namespacePodUpdater { return &namespacePodUpdater{ handler: handler, store: store, @@ -177,6 +169,7 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } + cachedObject := n.namespaceWatcher.CachedObject() cachedNamespace, ok := cachedObject.(*Namespace) @@ -208,12 +201,12 @@ func (*namespacePodUpdater) OnDelete(interface{}) {} type nodePodUpdater struct { handler podUpdaterHandlerFunc store podUpdaterStore - nodeWatcher cachedObjectWatcher + nodeWatcher Watcher locker sync.Locker } // NewNodePodUpdater creates a nodePodUpdater -func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodeWatcher cachedObjectWatcher, locker sync.Locker) *nodePodUpdater { +func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodeWatcher Watcher, locker sync.Locker) *nodePodUpdater { return &nodePodUpdater{ handler: handler, store: store, From 5aac01830bdceb485478a66c0a356e7db84994f9 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Thu, 11 Jan 2024 17:17:37 +0200 Subject: [PATCH 26/31] checks for namespace and node --- kubernetes/watcher.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index ac16f4ad62..ebc35606ba 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -20,6 +20,8 @@ package kubernetes import ( "context" "fmt" + "reflect" + "strings" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -162,7 +164,12 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource // state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update. w.enqueue(n, add) } - w.cacheObject(o) + + //We check the type of resource and only if it is namespace or node return the cacheObject + stringresource := reflect.TypeOf(resource).String() + if strings.Contains(strings.ToLower(stringresource), "namespace") || strings.Contains(strings.ToLower(stringresource), "node") { + w.cacheObject(o) + } }, }) From b89e4908dae57717094dfe415c2af666f9225537 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Fri, 12 Jan 2024 11:04:16 +0200 Subject: [PATCH 27/31] checks for namespace and node --- kubernetes/watcher.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index ebc35606ba..b75177987c 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -20,8 +20,6 @@ package kubernetes import ( "context" "fmt" - "reflect" - "strings" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -166,11 +164,12 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource } //We check the type of resource and only if it is namespace or node return the cacheObject - stringresource := reflect.TypeOf(resource).String() - if strings.Contains(strings.ToLower(stringresource), "namespace") || strings.Contains(strings.ToLower(stringresource), "node") { + switch resource.(type) { + case *Namespace: + w.cacheObject(o) + case *Node: w.cacheObject(o) } - }, }) From c031e0fe6c91b3c45c4e0faf38badd8931dc816e Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Fri, 12 Jan 2024 13:15:21 +0200 Subject: [PATCH 28/31] Update kubernetes/watcher.go Co-authored-by: Tetiana Kravchenko --- kubernetes/watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index b75177987c..c18a6f4682 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -191,7 +191,7 @@ func (w *watcher) Client() kubernetes.Interface { return w.client } -// Oldbject returns the old object in cache during the last updated event +// CachedObject returns the old object in cache during the last updated event func (w *watcher) CachedObject() runtime.Object { return w.cachedObject } From 85a74fed88f83782152cd0cab5fd0c9c3cbe8e2b Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Fri, 12 Jan 2024 13:38:21 +0200 Subject: [PATCH 29/31] udpating changelog --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e22f5b156e..0b2d0e38b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,3 +34,13 @@ This project adheres to [Semantic Versioning](http://semver.org/). [0.6.2]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.1...v0.6.2 + + +## [0.6.6] + +### Changed + +- Update NewNodePodUpdater and NewNamespacePodUpdater functions to conditionally check and update kubernetes metadata enrichment of pods + + +[0.6.6]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.2...v0.6.6 From 2862b18265a033d79506bb4d6b5703050c7f5672 Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Fri, 12 Jan 2024 14:02:50 +0200 Subject: [PATCH 30/31] udpating changelog --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b2d0e38b4..1a3714ccc0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,11 +36,11 @@ This project adheres to [Semantic Versioning](http://semver.org/). [0.6.2]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.1...v0.6.2 -## [0.6.6] +## [0.6.7] ### Changed - Update NewNodePodUpdater and NewNamespacePodUpdater functions to conditionally check and update kubernetes metadata enrichment of pods -[0.6.6]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.2...v0.6.6 +[0.6.6]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.2...v0.6.7 From 7ca8875bdb45b4c0b63d04d26ca4394da9b50f1b Mon Sep 17 00:00:00 2001 From: Andreas Gkizas Date: Fri, 12 Jan 2024 14:03:08 +0200 Subject: [PATCH 31/31] udpating changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a3714ccc0..e378d2ea9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,4 +43,4 @@ This project adheres to [Semantic Versioning](http://semver.org/). - Update NewNodePodUpdater and NewNamespacePodUpdater functions to conditionally check and update kubernetes metadata enrichment of pods -[0.6.6]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.2...v0.6.7 +[0.6.7]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.2...v0.6.7