Skip to content

Commit

Permalink
Conditions on podupdater functions of kubernetes autodiscovery (#71)
Browse files Browse the repository at this point in the history
- Enhancement

## Proposed commit message

- WHAT: Update the PodUpdater function with additional checks before
actually triggering Pod watcher restarts
- WHY: There were node and namespace events that don't always include
metadata changes. We want only update events that change the labels and
annotations of a node or namespace to trigger pod updates. This leads to
better management of watchers


## How to test this PR locally

See info elastic/beats#37338

## Related issues

- Relates elastic/beats#37338
- Relates elastic/beats#37431

---------

Co-authored-by: Tetiana Kravchenko <[email protected]>
Co-authored-by: Giuseppe Santoro <[email protected]>
  • Loading branch information
3 people authored Jan 12, 2024
1 parent 4f4898d commit 018039d
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 52 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ This project adheres to [Semantic Versioning](http://semver.org/).


[0.6.2]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.1...v0.6.2


## [0.6.7]

### Changed

- Update NewNodePodUpdater and NewNamespacePodUpdater functions to conditionally check and update kubernetes metadata enrichment of pods


[0.6.7]: https://github.com/elastic/elastic-agent-autodiscover/compare/v0.6.2...v0.6.7
96 changes: 61 additions & 35 deletions kubernetes/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,25 @@
package kubernetes

import (
"reflect"
"sync"
)

// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
// - OnAdd is called when an object is added.
// - OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// - OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
//
// idea: allow the On* methods to return an error so that the RateLimited WorkQueue
// idea: can requeue the failed event processing.
type ResourceEventHandler interface {
Expand Down Expand Up @@ -136,17 +138,19 @@ type podUpdaterStore interface {

// namespacePodUpdater notifies updates on pods when their namespaces are updated.
type namespacePodUpdater struct {
	handler          podUpdaterHandlerFunc // invoked for each pod of an updated namespace
	store            podUpdaterStore       // snapshot source for the currently known pods
	namespaceWatcher Watcher               // exposes the cached pre-update namespace object
	locker           sync.Locker           // serializes pod updates with the main pod watcher; appears nilable — confirm at call sites
}

// NewNamespacePodUpdater creates a namespacePodUpdater
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater {
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespaceWatcher Watcher, locker sync.Locker) *namespacePodUpdater {
return &namespacePodUpdater{
handler: handler,
store: store,
locker: locker,
handler: handler,
store: store,
namespaceWatcher: namespaceWatcher,
locker: locker,
}
}

Expand All @@ -156,7 +160,6 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
if !ok {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
Expand All @@ -166,12 +169,24 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)

cachedObject := n.namespaceWatcher.CachedObject()
cachedNamespace, ok := cachedObject.(*Namespace)

if ok && ns.Name == cachedNamespace.Name {
labelscheck := reflect.DeepEqual(ns.ObjectMeta.Labels, cachedNamespace.ObjectMeta.Labels)
annotationscheck := reflect.DeepEqual(ns.ObjectMeta.Annotations, cachedNamespace.ObjectMeta.Annotations)
// Only if there is a difference in Metadata labels or annotations proceed to Pod update
if !labelscheck || !annotationscheck {
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)
}
}
}
}

}

// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
Expand All @@ -184,17 +199,19 @@ func (*namespacePodUpdater) OnDelete(interface{}) {}

// nodePodUpdater notifies updates on pods when their nodes are updated.
type nodePodUpdater struct {
	handler     podUpdaterHandlerFunc // invoked for each pod scheduled on an updated node
	store       podUpdaterStore       // snapshot source for the currently known pods
	nodeWatcher Watcher               // exposes the cached pre-update node object
	locker      sync.Locker           // serializes pod updates with the main pod watcher; appears nilable — confirm at call sites
}

// NewNodePodUpdater creates a nodePodUpdater
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *nodePodUpdater {
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodeWatcher Watcher, locker sync.Locker) *nodePodUpdater {
return &nodePodUpdater{
handler: handler,
store: store,
locker: locker,
handler: handler,
store: store,
nodeWatcher: nodeWatcher,
locker: locker,
}
}

Expand All @@ -204,7 +221,6 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
if !ok {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
Expand All @@ -214,10 +230,20 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Spec.NodeName == node.Name {
n.handler(pod)
cachedObject := n.nodeWatcher.CachedObject()
cachedNode, ok := cachedObject.(*Node)

if ok && node.Name == cachedNode.Name {
labelscheck := reflect.DeepEqual(node.ObjectMeta.Labels, cachedNode.ObjectMeta.Labels)
annotationscheck := reflect.DeepEqual(node.ObjectMeta.Annotations, cachedNode.ObjectMeta.Annotations)
// Only if there is a difference in Metadata labels or annotations proceed to Pod update
if !labelscheck || !annotationscheck {
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Spec.NodeName == node.Name {
n.handler(pod)
}
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions kubernetes/metadata/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (p *pod) Generate(obj kubernetes.Resource, opts ...FieldOptions) mapstr.M {
"kubernetes": p.GenerateK8s(obj, opts...),
}
meta.DeepUpdate(ecsFields)

return meta
}

Expand Down
61 changes: 44 additions & 17 deletions kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type Watcher interface {

// Client returns the kubernetes client object used by the watcher
Client() kubernetes.Interface

// CachedObject returns the old object before change during the last updated event
CachedObject() runtime.Object
}

// WatchOptions controls watch behaviors
Expand All @@ -83,14 +86,15 @@ type item struct {
}

// watcher wraps a shared informer and dispatches its events through a work
// queue to a ResourceEventHandler.
type watcher struct {
	client       kubernetes.Interface
	informer     cache.SharedInformer
	store        cache.Store
	queue        workqueue.Interface
	ctx          context.Context
	stop         context.CancelFunc
	handler      ResourceEventHandler
	logger       *logp.Logger
	cachedObject runtime.Object // pre-update copy of the last updated Node/Namespace object (set by cacheObject)
}

// NewWatcher initializes the watcher client to provide a events handler for
Expand All @@ -106,7 +110,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface

var cachedObject runtime.Object
informer, _, err := NewInformer(client, resource, opts, indexers)
if err != nil {
return nil, err
Expand All @@ -127,14 +131,15 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource

ctx, cancel := context.WithCancel(context.TODO())
w := &watcher{
client: client,
informer: informer,
store: store,
queue: queue,
ctx: ctx,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
handler: NoOpEventHandlerFuncs{},
client: client,
informer: informer,
store: store,
queue: queue,
ctx: ctx,
cachedObject: cachedObject,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
handler: NoOpEventHandlerFuncs{},
}

w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -157,6 +162,14 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
// state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update.
w.enqueue(n, add)
}

//We check the type of resource and only if it is namespace or node return the cacheObject
switch resource.(type) {
case *Namespace:
w.cacheObject(o)
case *Node:
w.cacheObject(o)
}
},
})

Expand All @@ -178,6 +191,11 @@ func (w *watcher) Client() kubernetes.Interface {
return w.client
}

// CachedObject returns the previous (pre-update) version of the last object
// recorded by cacheObject during an update event. It is nil until the first
// update event for a cached resource kind has been observed.
func (w *watcher) CachedObject() runtime.Object {
return w.cachedObject
}

// Start watching pods
func (w *watcher) Start() error {
go w.informer.Run(w.ctx.Done())
Expand Down Expand Up @@ -217,6 +235,15 @@ func (w *watcher) enqueue(obj interface{}, state string) {
w.queue.Add(&item{key, obj, state})
}

// cacheObject records the old version of an object seen during an update
// event so that CachedObject can later expose it.
func (w *watcher) cacheObject(o interface{}) {
	old, ok := o.(runtime.Object)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o))
		return
	}
	w.cachedObject = old
}

// process gets the top of the work queue and processes the object that is received.
func (w *watcher) process(_ context.Context) bool {
obj, quit := w.queue.Get()
Expand Down

0 comments on commit 018039d

Please sign in to comment.