Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conditions on podupdater functions of kubernetes autodiscovery #71

Merged
merged 33 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e72955a
first update for nodePodUpdater
gizas Dec 7, 2023
d8ace11
first update for nodePodUpdater with key function
gizas Dec 8, 2023
ac43d6f
first update for namespacePodUpdater
gizas Dec 8, 2023
979e3c0
updating comments for code
gizas Dec 8, 2023
0a91978
updating with delta
gizas Dec 12, 2023
853661d
fixing referneces only to one wather as an argument
gizas Dec 13, 2023
5ea29d1
removing comments
gizas Dec 13, 2023
1c86749
Update eventhandler.go - Fixing typo
gizas Dec 13, 2023
b01ddcc
Update watcher.go comment in deltaslice function
gizas Dec 13, 2023
e78177c
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
cdfab00
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
416ab77
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
a710864
updates for delta struct and comments
gizas Jan 9, 2024
e265b8e
updates for delta struct and comments
gizas Jan 9, 2024
6cb89b9
returning only old cached object
gizas Jan 9, 2024
13bff58
fixing comment after upgrade to return old object
gizas Jan 9, 2024
9ee164e
changing name of interface
gizas Jan 9, 2024
328f78d
adding check for oldobjectreturn
gizas Jan 9, 2024
2d73a98
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
76b4dce
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
66f616c
Update kubernetes/watcher.go
gizas Jan 9, 2024
16b0fca
renaming functions and providinng camelCase variables
gizas Jan 9, 2024
8629378
renaming functions and providinng camelCase variables
gizas Jan 9, 2024
05ed573
reflect.DeepEqual added
gizas Jan 9, 2024
8ad0d7c
removing extra lines
gizas Jan 9, 2024
a63bbf3
replacing oldobject with cachedobject
gizas Jan 10, 2024
51bbd1b
keeping only watcher struct
gizas Jan 11, 2024
5aac018
checks for namespace and node
gizas Jan 11, 2024
b89e490
checks for namespace and node
gizas Jan 12, 2024
c031e0f
Update kubernetes/watcher.go
gizas Jan 12, 2024
85a74fe
udpating changelog
gizas Jan 12, 2024
2862b18
udpating changelog
gizas Jan 12, 2024
7ca8875
udpating changelog
gizas Jan 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 74 additions & 33 deletions kubernetes/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,27 @@
package kubernetes

import (
"reflect"
"sync"

"k8s.io/apimachinery/pkg/runtime"
)

// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
// - OnAdd is called when an object is added.
// - OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// - OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
//
// idea: allow the On* methods to return an error so that the RateLimited WorkQueue
// idea: can requeue the failed event processing.
type ResourceEventHandler interface {
Expand Down Expand Up @@ -134,19 +138,27 @@ type podUpdaterStore interface {
List() []interface{}
}

// OldobjectWatcher is the interface that an object needs to implement to be
// able to use Oldobject cache event function from watcher.
type OldobjectWatcher interface {
Oldobject() runtime.Object
}

// namespacePodUpdater notifies updates on pods when their namespaces are updated.
type namespacePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
locker sync.Locker
handler podUpdaterHandlerFunc
store podUpdaterStore
namespacewatcher OldobjectWatcher
locker sync.Locker
}

// NewNamespacePodUpdater creates a namespacePodUpdater
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater {
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacewatcher OldobjectWatcher, locker sync.Locker) *namespacePodUpdater {
return &namespacePodUpdater{
handler: handler,
store: store,
locker: locker,
handler: handler,
store: store,
namespacewatcher: namespacewatcher,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use MixedCaps (aka camelCase) here and for all other variables? as suggested at https://go.dev/doc/effective_go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update in 16b0fca

Let me know If I missed anything

locker: locker,
}
}

Expand All @@ -166,12 +178,23 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)
oldobject := n.namespacewatcher.Oldobject()
gizas marked this conversation as resolved.
Show resolved Hide resolved
cachednamespaceold, ok := oldobject.(*Namespace)

if ok && ns.Name == cachednamespaceold.Name {
labelscheck := isEqualMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels)
annotationscheck := isEqualMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations)
// Only if there is a difference in Metadata labels or annotations proceed to Pod update
if !labelscheck || !annotationscheck {
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)
}
}
}
}

}

// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
Expand All @@ -184,17 +207,19 @@ func (*namespacePodUpdater) OnDelete(interface{}) {}

// nodePodUpdater notifies updates on pods when their nodes are updated.
type nodePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
locker sync.Locker
handler podUpdaterHandlerFunc
store podUpdaterStore
nodewatcher OldobjectWatcher
locker sync.Locker
}

// NewNodePodUpdater creates a nodePodUpdater
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *nodePodUpdater {
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodewatcher OldobjectWatcher, locker sync.Locker) *nodePodUpdater {
return &nodePodUpdater{
handler: handler,
store: store,
locker: locker,
handler: handler,
store: store,
nodewatcher: nodewatcher,
locker: locker,
}
}

Expand All @@ -214,10 +239,20 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Spec.NodeName == node.Name {
n.handler(pod)
oldobject := n.nodewatcher.Oldobject()
gizas marked this conversation as resolved.
Show resolved Hide resolved
cachednodeold, ok := oldobject.(*Node)

if ok && node.Name == cachednodeold.Name {
labelscheck := isEqualMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels)
annotationscheck := isEqualMetadata(node.ObjectMeta.Annotations, cachednodeold.ObjectMeta.Annotations)
// Only if there is a difference in Metadata labels or annotations proceed to Pod update
if !labelscheck || !annotationscheck {
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Spec.NodeName == node.Name {
n.handler(pod)
}
}
}
}
}
Expand All @@ -229,3 +264,9 @@ func (*nodePodUpdater) OnAdd(interface{}) {}
// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this
// namespace they will generate their own delete events.
func (*nodePodUpdater) OnDelete(interface{}) {}

// isEqualMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrence
gizas marked this conversation as resolved.
Show resolved Hide resolved
gizas marked this conversation as resolved.
Show resolved Hide resolved
func isEqualMetadata(newmetadata, oldmetadata map[string]string) bool {
check := reflect.DeepEqual(newmetadata, oldmetadata)
return check
}
1 change: 1 addition & 0 deletions kubernetes/metadata/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (p *pod) Generate(obj kubernetes.Resource, opts ...FieldOptions) mapstr.M {
"kubernetes": p.GenerateK8s(obj, opts...),
}
meta.DeepUpdate(ecsFields)

return meta
}

Expand Down
55 changes: 38 additions & 17 deletions kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type Watcher interface {

// Client returns the kubernetes client object used by the watcher
Client() kubernetes.Interface

// Oldobject returns the old object before change during the last updated event
Oldobject() runtime.Object
}

// WatchOptions controls watch behaviors
Expand All @@ -83,14 +86,15 @@ type item struct {
}

type watcher struct {
client kubernetes.Interface
informer cache.SharedInformer
store cache.Store
queue workqueue.Interface
ctx context.Context
stop context.CancelFunc
handler ResourceEventHandler
logger *logp.Logger
client kubernetes.Interface
informer cache.SharedInformer
store cache.Store
queue workqueue.Interface
ctx context.Context
stop context.CancelFunc
handler ResourceEventHandler
logger *logp.Logger
oldobject runtime.Object
}

// NewWatcher initializes the watcher client to provide a events handler for
Expand All @@ -106,7 +110,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface

var oldobject runtime.Object
informer, _, err := NewInformer(client, resource, opts, indexers)
if err != nil {
return nil, err
Expand All @@ -127,14 +131,15 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource

ctx, cancel := context.WithCancel(context.TODO())
w := &watcher{
client: client,
informer: informer,
store: store,
queue: queue,
ctx: ctx,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
handler: NoOpEventHandlerFuncs{},
client: client,
informer: informer,
store: store,
queue: queue,
ctx: ctx,
oldobject: oldobject,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
handler: NoOpEventHandlerFuncs{},
}

w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -157,6 +162,8 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
// state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update.
w.enqueue(n, add)
}
w.oldobjectreturn(o)

},
})

Expand All @@ -178,6 +185,11 @@ func (w *watcher) Client() kubernetes.Interface {
return w.client
}

// Oldbject returns the old object in cache during the last updated event
gizas marked this conversation as resolved.
Show resolved Hide resolved
func (w *watcher) Oldobject() runtime.Object {
return w.oldobject
}

// Start watching pods
func (w *watcher) Start() error {
go w.informer.Run(w.ctx.Done())
Expand Down Expand Up @@ -217,6 +229,15 @@ func (w *watcher) enqueue(obj interface{}, state string) {
w.queue.Add(&item{key, obj, state})
}

// oldobjectreturn returns the old version of cache objects before change on update events
func (w *watcher) oldobjectreturn(o interface{}) {
gizas marked this conversation as resolved.
Show resolved Hide resolved
if old, ok := o.(runtime.Object); !ok {
utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o))
gizas marked this conversation as resolved.
Show resolved Hide resolved
} else {
w.oldobject = old
}
}

// process gets the top of the work queue and processes the object that is received.
func (w *watcher) process(_ context.Context) bool {
obj, quit := w.queue.Get()
Expand Down