Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conditions on podupdater functions of kubernetes autodiscovery #71

Merged
merged 33 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e72955a
first update for nodePodUpdater
gizas Dec 7, 2023
d8ace11
first update for nodePodUpdater with key function
gizas Dec 8, 2023
ac43d6f
first update for namespacePodUpdater
gizas Dec 8, 2023
979e3c0
updating comments for code
gizas Dec 8, 2023
0a91978
updating with delta
gizas Dec 12, 2023
853661d
fixing referneces only to one wather as an argument
gizas Dec 13, 2023
5ea29d1
removing comments
gizas Dec 13, 2023
1c86749
Update eventhandler.go - Fixing typo
gizas Dec 13, 2023
b01ddcc
Update watcher.go comment in deltaslice function
gizas Dec 13, 2023
e78177c
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
cdfab00
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
416ab77
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
a710864
updates for delta struct and comments
gizas Jan 9, 2024
e265b8e
updates for delta struct and comments
gizas Jan 9, 2024
6cb89b9
returning only old cached object
gizas Jan 9, 2024
13bff58
fixing comment after upgrade to return old object
gizas Jan 9, 2024
9ee164e
changing name of interface
gizas Jan 9, 2024
328f78d
adding check for oldobjectreturn
gizas Jan 9, 2024
2d73a98
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
76b4dce
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
66f616c
Update kubernetes/watcher.go
gizas Jan 9, 2024
16b0fca
renaming functions and providinng camelCase variables
gizas Jan 9, 2024
8629378
renaming functions and providinng camelCase variables
gizas Jan 9, 2024
05ed573
reflect.DeepEqual added
gizas Jan 9, 2024
8ad0d7c
removing extra lines
gizas Jan 9, 2024
a63bbf3
replacing oldobject with cachedobject
gizas Jan 10, 2024
51bbd1b
keeping only watcher struct
gizas Jan 11, 2024
5aac018
checks for namespace and node
gizas Jan 11, 2024
b89e490
checks for namespace and node
gizas Jan 12, 2024
c031e0f
Update kubernetes/watcher.go
gizas Jan 12, 2024
85a74fe
udpating changelog
gizas Jan 12, 2024
2862b18
udpating changelog
gizas Jan 12, 2024
7ca8875
udpating changelog
gizas Jan 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 61 additions & 35 deletions kubernetes/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,25 @@
package kubernetes

import (
"reflect"
"sync"
)

// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
// - OnAdd is called when an object is added.
// - OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// - OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
//
// idea: allow the On* methods to return an error so that the RateLimited WorkQueue
// idea: can requeue the failed event processing.
type ResourceEventHandler interface {
Expand Down Expand Up @@ -136,17 +138,19 @@ type podUpdaterStore interface {

// namespacePodUpdater notifies updates on pods when their namespaces are updated.
type namespacePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
locker sync.Locker
handler podUpdaterHandlerFunc
store podUpdaterStore
namespaceWatcher Watcher
locker sync.Locker
}

// NewNamespacePodUpdater creates a namespacePodUpdater
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater {
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespaceWatcher Watcher, locker sync.Locker) *namespacePodUpdater {
return &namespacePodUpdater{
handler: handler,
store: store,
locker: locker,
handler: handler,
store: store,
namespaceWatcher: namespaceWatcher,
locker: locker,
}
}

Expand All @@ -156,7 +160,6 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
if !ok {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
Expand All @@ -166,12 +169,24 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)

cachedObject := n.namespaceWatcher.CachedObject()
cachedNamespace, ok := cachedObject.(*Namespace)

if ok && ns.Name == cachedNamespace.Name {
labelscheck := reflect.DeepEqual(ns.ObjectMeta.Labels, cachedNamespace.ObjectMeta.Labels)
annotationscheck := reflect.DeepEqual(ns.ObjectMeta.Annotations, cachedNamespace.ObjectMeta.Annotations)
// Only if there is a difference in Metadata labels or annotations proceed to Pod update
if !labelscheck || !annotationscheck {
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)
}
}
}
}

}

// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
Expand All @@ -184,17 +199,19 @@ func (*namespacePodUpdater) OnDelete(interface{}) {}

// nodePodUpdater notifies updates on pods when their nodes are updated.
type nodePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
locker sync.Locker
handler podUpdaterHandlerFunc
store podUpdaterStore
nodeWatcher Watcher
locker sync.Locker
}

// NewNodePodUpdater creates a nodePodUpdater
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *nodePodUpdater {
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodeWatcher Watcher, locker sync.Locker) *nodePodUpdater {
return &nodePodUpdater{
handler: handler,
store: store,
locker: locker,
handler: handler,
store: store,
nodeWatcher: nodeWatcher,
locker: locker,
}
}

Expand All @@ -204,7 +221,6 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
if !ok {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
Expand All @@ -214,10 +230,20 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Spec.NodeName == node.Name {
n.handler(pod)
cachedObject := n.nodeWatcher.CachedObject()
cachedNode, ok := cachedObject.(*Node)

if ok && node.Name == cachedNode.Name {
labelscheck := reflect.DeepEqual(node.ObjectMeta.Labels, cachedNode.ObjectMeta.Labels)
annotationscheck := reflect.DeepEqual(node.ObjectMeta.Annotations, cachedNode.ObjectMeta.Annotations)
// Only if there is a difference in Metadata labels or annotations proceed to Pod update
if !labelscheck || !annotationscheck {
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Spec.NodeName == node.Name {
n.handler(pod)
}
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions kubernetes/metadata/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (p *pod) Generate(obj kubernetes.Resource, opts ...FieldOptions) mapstr.M {
"kubernetes": p.GenerateK8s(obj, opts...),
}
meta.DeepUpdate(ecsFields)

return meta
}

Expand Down
62 changes: 45 additions & 17 deletions kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package kubernetes
import (
"context"
"fmt"
"reflect"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -59,6 +61,9 @@ type Watcher interface {

// Client returns the kubernetes client object used by the watcher
Client() kubernetes.Interface

// CachedObject returns the old object before change during the last updated event
CachedObject() runtime.Object
}

// WatchOptions controls watch behaviors
Expand All @@ -83,14 +88,15 @@ type item struct {
}

type watcher struct {
client kubernetes.Interface
informer cache.SharedInformer
store cache.Store
queue workqueue.Interface
ctx context.Context
stop context.CancelFunc
handler ResourceEventHandler
logger *logp.Logger
client kubernetes.Interface
informer cache.SharedInformer
store cache.Store
queue workqueue.Interface
ctx context.Context
stop context.CancelFunc
handler ResourceEventHandler
logger *logp.Logger
cachedObject runtime.Object
}

// NewWatcher initializes the watcher client to provide a events handler for
Expand All @@ -106,7 +112,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface

var cachedObject runtime.Object
informer, _, err := NewInformer(client, resource, opts, indexers)
if err != nil {
return nil, err
Expand All @@ -127,14 +133,15 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource

ctx, cancel := context.WithCancel(context.TODO())
w := &watcher{
client: client,
informer: informer,
store: store,
queue: queue,
ctx: ctx,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
handler: NoOpEventHandlerFuncs{},
client: client,
informer: informer,
store: store,
queue: queue,
ctx: ctx,
cachedObject: cachedObject,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
handler: NoOpEventHandlerFuncs{},
}

w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -157,6 +164,13 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
// state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update.
w.enqueue(n, add)
}

//We check the type of resource and only if it is namespace or node return the cacheObject
stringresource := reflect.TypeOf(resource).String()
if strings.Contains(strings.ToLower(stringresource), "namespace") || strings.Contains(strings.ToLower(stringresource), "node") {
w.cacheObject(o)
}

},
})

Expand All @@ -178,6 +192,11 @@ func (w *watcher) Client() kubernetes.Interface {
return w.client
}

// Oldbject returns the old object in cache during the last updated event
gizas marked this conversation as resolved.
Show resolved Hide resolved
func (w *watcher) CachedObject() runtime.Object {
return w.cachedObject
}

// Start watching pods
func (w *watcher) Start() error {
go w.informer.Run(w.ctx.Done())
Expand Down Expand Up @@ -217,6 +236,15 @@ func (w *watcher) enqueue(obj interface{}, state string) {
w.queue.Add(&item{key, obj, state})
}

// cacheObject updates watcher with the old version of cache objects before change during update events
func (w *watcher) cacheObject(o interface{}) {
if old, ok := o.(runtime.Object); !ok {
utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o))
} else {
w.cachedObject = old
}
}

// process gets the top of the work queue and processes the object that is received.
func (w *watcher) process(_ context.Context) bool {
obj, quit := w.queue.Get()
Expand Down
Loading