diff --git a/CHANGELOG.md b/CHANGELOG.md
index e22f5b156..e378d2ea9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -34,3 +34,13 @@ This project adheres to [Semantic Versioning](http://semver.org/).
 
 
 [0.6.2]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.1...v0.6.2
+
+
+## [0.6.7]
+
+### Changed
+
+- Update the NewNodePodUpdater and NewNamespacePodUpdater functions so that Kubernetes metadata enrichment of pods is refreshed only when the labels or annotations of their node or namespace change
+
+
+[0.6.7]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.2...v0.6.7
diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go
index acbc51394..d54228675 100644
--- a/kubernetes/eventhandler.go
+++ b/kubernetes/eventhandler.go
@@ -18,23 +18,25 @@ package kubernetes
 
 import (
+	"reflect"
 	"sync"
 )
 
 // ResourceEventHandler can handle notifications for events that happen to a
 // resource. The events are informational only, so you can't return an
 // error.
-// * OnAdd is called when an object is added.
-// * OnUpdate is called when an object is modified. Note that oldObj is the
-//   last known state of the object-- it is possible that several changes
-//   were combined together, so you can't use this to see every single
-//   change. OnUpdate is also called when a re-list happens, and it will
-//   get called even if nothing changed. This is useful for periodically
-//   evaluating or syncing something.
-// * OnDelete will get the final state of the item if it is known, otherwise
-//   it will get an object of type DeletedFinalStateUnknown. This can
-//   happen if the watch is closed and misses the delete event and we don't
-//   notice the deletion until the subsequent re-list.
+//   - OnAdd is called when an object is added.
+//   - OnUpdate is called when an object is modified. Note that oldObj is the
+//     last known state of the object-- it is possible that several changes
+//     were combined together, so you can't use this to see every single
+//     change. OnUpdate is also called when a re-list happens, and it will
+//     get called even if nothing changed. This is useful for periodically
+//     evaluating or syncing something.
+//   - OnDelete will get the final state of the item if it is known, otherwise
+//     it will get an object of type DeletedFinalStateUnknown. This can
+//     happen if the watch is closed and misses the delete event and we don't
+//     notice the deletion until the subsequent re-list.
+//
 // idea: allow the On* methods to return an error so that the RateLimited WorkQueue
 // idea: can requeue the failed event processing.
 type ResourceEventHandler interface {
@@ -136,17 +138,19 @@ type podUpdaterStore interface {
 
 // namespacePodUpdater notifies updates on pods when their namespaces are updated.
 type namespacePodUpdater struct {
-	handler podUpdaterHandlerFunc
-	store   podUpdaterStore
-	locker  sync.Locker
+	handler          podUpdaterHandlerFunc
+	store            podUpdaterStore
+	namespaceWatcher Watcher
+	locker           sync.Locker
 }
 
 // NewNamespacePodUpdater creates a namespacePodUpdater
-func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater {
+func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespaceWatcher Watcher, locker sync.Locker) *namespacePodUpdater {
 	return &namespacePodUpdater{
-		handler: handler,
-		store:   store,
-		locker:  locker,
+		handler:          handler,
+		store:            store,
+		namespaceWatcher: namespaceWatcher,
+		locker:           locker,
 	}
 }
 
@@ -156,7 +160,6 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
 	if !ok {
 		return
 	}
-
 	// n.store.List() returns a snapshot at this point. If a delete is received
 	// from the main watcher, this loop may generate an update event after the
 	// delete is processed, leaving configurations that would never be deleted.
@@ -166,12 +169,24 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
 		n.locker.Lock()
 		defer n.locker.Unlock()
 	}
-	for _, pod := range n.store.List() {
-		pod, ok := pod.(*Pod)
-		if ok && pod.Namespace == ns.Name {
-			n.handler(pod)
+
+	cachedObject := n.namespaceWatcher.CachedObject()
+	cachedNamespace, ok := cachedObject.(*Namespace)
+
+	if ok && ns.Name == cachedNamespace.Name {
+		labelscheck := reflect.DeepEqual(ns.ObjectMeta.Labels, cachedNamespace.ObjectMeta.Labels)
+		annotationscheck := reflect.DeepEqual(ns.ObjectMeta.Annotations, cachedNamespace.ObjectMeta.Annotations)
+		// Proceed with the pod updates only if the namespace labels or annotations have changed
+		if !labelscheck || !annotationscheck {
+			for _, pod := range n.store.List() {
+				pod, ok := pod.(*Pod)
+				if ok && pod.Namespace == ns.Name {
+					n.handler(pod)
+				}
+			}
 		}
 	}
+
 }
 
 // OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
@@ -184,17 +199,19 @@ func (*namespacePodUpdater) OnDelete(interface{}) {}
 
 // nodePodUpdater notifies updates on pods when their nodes are updated.
 type nodePodUpdater struct {
-	handler podUpdaterHandlerFunc
-	store   podUpdaterStore
-	locker  sync.Locker
+	handler     podUpdaterHandlerFunc
+	store       podUpdaterStore
+	nodeWatcher Watcher
+	locker      sync.Locker
 }
 
 // NewNodePodUpdater creates a nodePodUpdater
-func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *nodePodUpdater {
+func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodeWatcher Watcher, locker sync.Locker) *nodePodUpdater {
 	return &nodePodUpdater{
-		handler: handler,
-		store:   store,
-		locker:  locker,
+		handler:     handler,
+		store:       store,
+		nodeWatcher: nodeWatcher,
+		locker:      locker,
 	}
 }
 
@@ -204,7 +221,6 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
 	if !ok {
 		return
 	}
-
 	// n.store.List() returns a snapshot at this point. If a delete is received
 	// from the main watcher, this loop may generate an update event after the
 	// delete is processed, leaving configurations that would never be deleted.
@@ -214,10 +230,20 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
 		n.locker.Lock()
 		defer n.locker.Unlock()
 	}
-	for _, pod := range n.store.List() {
-		pod, ok := pod.(*Pod)
-		if ok && pod.Spec.NodeName == node.Name {
-			n.handler(pod)
+	cachedObject := n.nodeWatcher.CachedObject()
+	cachedNode, ok := cachedObject.(*Node)
+
+	if ok && node.Name == cachedNode.Name {
+		labelscheck := reflect.DeepEqual(node.ObjectMeta.Labels, cachedNode.ObjectMeta.Labels)
+		annotationscheck := reflect.DeepEqual(node.ObjectMeta.Annotations, cachedNode.ObjectMeta.Annotations)
+		// Proceed with the pod updates only if the node labels or annotations have changed
+		if !labelscheck || !annotationscheck {
+			for _, pod := range n.store.List() {
+				pod, ok := pod.(*Pod)
+				if ok && pod.Spec.NodeName == node.Name {
+					n.handler(pod)
+				}
+			}
 		}
 	}
 }
diff --git a/kubernetes/metadata/pod.go b/kubernetes/metadata/pod.go
index 3a5f3d539..7dfbb6a85 100644
--- a/kubernetes/metadata/pod.go
+++ b/kubernetes/metadata/pod.go
@@ -74,6 +74,7 @@ func (p *pod) Generate(obj kubernetes.Resource, opts ...FieldOptions) mapstr.M {
 		"kubernetes": p.GenerateK8s(obj, opts...),
 	}
 	meta.DeepUpdate(ecsFields)
+
 	return meta
 }
 
diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go
index 35b308e09..c18a6f468 100644
--- a/kubernetes/watcher.go
+++ b/kubernetes/watcher.go
@@ -59,6 +59,9 @@ type Watcher interface {
 
 	// Client returns the kubernetes client object used by the watcher
 	Client() kubernetes.Interface
+
+	// CachedObject returns the old object (the state before the change) from the last update event
+	CachedObject() runtime.Object
 }
 
 // WatchOptions controls watch behaviors
@@ -83,14 +86,15 @@ type item struct {
 }
 
 type watcher struct {
-	client   kubernetes.Interface
-	informer cache.SharedInformer
-	store    cache.Store
-	queue    workqueue.Interface
-	ctx      context.Context
-	stop     context.CancelFunc
-	handler  ResourceEventHandler
-	logger   *logp.Logger
+	client       kubernetes.Interface
+	informer     cache.SharedInformer
+	store        cache.Store
+	queue        workqueue.Interface
+	ctx          context.Context
+	stop         context.CancelFunc
+	handler      ResourceEventHandler
+	logger       *logp.Logger
+	cachedObject runtime.Object
 }
 
 // NewWatcher initializes the watcher client to provide a events handler for
@@ -106,7 +110,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
 func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
 	var store cache.Store
 	var queue workqueue.Interface
-
+	var cachedObject runtime.Object
 	informer, _, err := NewInformer(client, resource, opts, indexers)
 	if err != nil {
 		return nil, err
@@ -127,14 +131,15 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
 
 	ctx, cancel := context.WithCancel(context.TODO())
 	w := &watcher{
-		client:   client,
-		informer: informer,
-		store:    store,
-		queue:    queue,
-		ctx:      ctx,
-		stop:     cancel,
-		logger:   logp.NewLogger("kubernetes"),
-		handler:  NoOpEventHandlerFuncs{},
+		client:       client,
+		informer:     informer,
+		store:        store,
+		queue:        queue,
+		ctx:          ctx,
+		cachedObject: cachedObject,
+		stop:         cancel,
+		logger:       logp.NewLogger("kubernetes"),
+		handler:      NoOpEventHandlerFuncs{},
 	}
 
 	w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -157,6 +162,14 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
 			// state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update.
 				w.enqueue(n, add)
 			}
+
+			// Cache the old object only when the watched resource is a Namespace or a Node
+			switch resource.(type) {
+			case *Namespace:
+				w.cacheObject(o)
+			case *Node:
+				w.cacheObject(o)
+			}
 		},
 	})
 
@@ -178,6 +191,11 @@ func (w *watcher) Client() kubernetes.Interface {
 	return w.client
 }
 
+// CachedObject returns the old object (the state before the change) from the last update event
+func (w *watcher) CachedObject() runtime.Object {
+	return w.cachedObject
+}
+
 // Start watching pods
 func (w *watcher) Start() error {
 	go w.informer.Run(w.ctx.Done())
@@ -217,6 +235,15 @@ func (w *watcher) enqueue(obj interface{}, state string) {
 	w.queue.Add(&item{key, obj, state})
 }
 
+// cacheObject stores the previous version of an object before it is changed during an update event
+func (w *watcher) cacheObject(o interface{}) {
+	if old, ok := o.(runtime.Object); !ok {
+		utilruntime.HandleError(fmt.Errorf("expected object in cache, got %#v", o))
+	} else {
+		w.cachedObject = old
+	}
+}
+
 // process gets the top of the work queue and processes the object that is received.
 func (w *watcher) process(_ context.Context) bool {
 	obj, quit := w.queue.Get()
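Note (not part of the patch): the sketch below shows how a consumer might wire the new watcher argument into NewNodePodUpdater after this change. The helper name, the handler body, and the WatchOptions values are illustrative assumptions; podWatcher.Store() is assumed to satisfy podUpdaterStore and podUpdaterHandlerFunc is assumed to be func(interface{}), as in the existing package. Only the changed constructor signatures come from the diff above.

package example

import (
	"sync"
	"time"

	"github.com/elastic/elastic-agent-autodiscover/kubernetes"
	k8sclient "k8s.io/client-go/kubernetes"
)

// startNodePodUpdater (hypothetical helper) shows the new wiring: the node watcher
// itself is handed to NewNodePodUpdater so that OnUpdate can compare the incoming
// node against nodeWatcher.CachedObject() and only touch pods when the node's
// labels or annotations actually changed.
func startNodePodUpdater(client k8sclient.Interface, nodeName string) error {
	nodeWatcher, err := kubernetes.NewNamedWatcher("node", client, &kubernetes.Node{}, kubernetes.WatchOptions{
		SyncTimeout: 10 * time.Minute, // assumed option values, not taken from the diff
		Node:        nodeName,
	}, nil)
	if err != nil {
		return err
	}

	podWatcher, err := kubernetes.NewNamedWatcher("pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{
		SyncTimeout: 10 * time.Minute,
		Node:        nodeName,
	}, nil)
	if err != nil {
		return err
	}

	// Called for every pod that still needs re-enrichment after a node update.
	handler := func(obj interface{}) {
		if pod, ok := obj.(*kubernetes.Pod); ok {
			_ = pod // e.g. regenerate metadata and emit an autodiscover update event
		}
	}

	// podWatcher.Store() is assumed to satisfy podUpdaterStore (it only needs List()).
	updater := kubernetes.NewNodePodUpdater(handler, podWatcher.Store(), nodeWatcher, &sync.Mutex{})
	nodeWatcher.AddEventHandler(updater)

	if err := nodeWatcher.Start(); err != nil {
		return err
	}
	return podWatcher.Start()
}

The NewNamespacePodUpdater wiring is symmetric, with a namespace watcher passed in place of the node watcher.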