Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conditions on podupdater functions of kubernetes autodiscovery #71

Merged
merged 33 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e72955a
first update for nodePodUpdater
gizas Dec 7, 2023
d8ace11
first update for nodePodUpdater with key function
gizas Dec 8, 2023
ac43d6f
first update for namespacePodUpdater
gizas Dec 8, 2023
979e3c0
updating comments for code
gizas Dec 8, 2023
0a91978
updating with delta
gizas Dec 12, 2023
853661d
fixing referneces only to one wather as an argument
gizas Dec 13, 2023
5ea29d1
removing comments
gizas Dec 13, 2023
1c86749
Update eventhandler.go - Fixing typo
gizas Dec 13, 2023
b01ddcc
Update watcher.go comment in deltaslice function
gizas Dec 13, 2023
e78177c
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
cdfab00
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
416ab77
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
a710864
updates for delta struct and comments
gizas Jan 9, 2024
e265b8e
updates for delta struct and comments
gizas Jan 9, 2024
6cb89b9
returning only old cached object
gizas Jan 9, 2024
13bff58
fixing comment after upgrade to return old object
gizas Jan 9, 2024
9ee164e
changing name of interface
gizas Jan 9, 2024
328f78d
adding check for oldobjectreturn
gizas Jan 9, 2024
2d73a98
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
76b4dce
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
66f616c
Update kubernetes/watcher.go
gizas Jan 9, 2024
16b0fca
renaming functions and providinng camelCase variables
gizas Jan 9, 2024
8629378
renaming functions and providinng camelCase variables
gizas Jan 9, 2024
05ed573
reflect.DeepEqual added
gizas Jan 9, 2024
8ad0d7c
removing extra lines
gizas Jan 9, 2024
a63bbf3
replacing oldobject with cachedobject
gizas Jan 10, 2024
51bbd1b
keeping only watcher struct
gizas Jan 11, 2024
5aac018
checks for namespace and node
gizas Jan 11, 2024
b89e490
checks for namespace and node
gizas Jan 12, 2024
c031e0f
Update kubernetes/watcher.go
gizas Jan 12, 2024
85a74fe
udpating changelog
gizas Jan 12, 2024
2862b18
udpating changelog
gizas Jan 12, 2024
7ca8875
udpating changelog
gizas Jan 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 18 additions & 26 deletions kubernetes/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,26 +138,26 @@ type podUpdaterStore interface {
List() []interface{}
}

// OldobjectWatcher is the interface that an object needs to implement to be
// OldObjectWatcher is the interface that an object needs to implement to be
// able to use Oldobject cache event function from watcher.
type OldobjectWatcher interface {
type oldObjectWatcher interface {
Oldobject() runtime.Object
}

// namespacePodUpdater notifies updates on pods when their namespaces are updated.
type namespacePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
namespacewatcher OldobjectWatcher
namespaceWatcher oldObjectWatcher
locker sync.Locker
}

// NewNamespacePodUpdater creates a namespacePodUpdater
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacewatcher OldobjectWatcher, locker sync.Locker) *namespacePodUpdater {
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespaceWatcher oldObjectWatcher, locker sync.Locker) *namespacePodUpdater {
return &namespacePodUpdater{
handler: handler,
store: store,
namespacewatcher: namespacewatcher,
namespaceWatcher: namespaceWatcher,
locker: locker,
}
}
Expand All @@ -168,7 +168,6 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
if !ok {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
Expand All @@ -178,12 +177,12 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
n.locker.Lock()
defer n.locker.Unlock()
}
oldobject := n.namespacewatcher.Oldobject()
cachednamespaceold, ok := oldobject.(*Namespace)
cachedObject := n.namespaceWatcher.Oldobject()
cachedNamespace, ok := cachedObject.(*Namespace)

if ok && ns.Name == cachednamespaceold.Name {
labelscheck := isEqualMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels)
annotationscheck := isEqualMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations)
if ok && ns.Name == cachedNamespace.Name {
labelscheck := reflect.DeepEqual(ns.ObjectMeta.Labels, cachedNamespace.ObjectMeta.Labels)
annotationscheck := reflect.DeepEqual(ns.ObjectMeta.Annotations, cachedNamespace.ObjectMeta.Annotations)
// Only if there is a difference in Metadata labels or annotations proceed to Pod update
if !labelscheck || !annotationscheck {
for _, pod := range n.store.List() {
Expand All @@ -209,16 +208,16 @@ func (*namespacePodUpdater) OnDelete(interface{}) {}
type nodePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
nodewatcher OldobjectWatcher
nodeWatcher oldObjectWatcher
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I meant to say to replace any other reference to oldObject -> cachedObject

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done @gsantoro
thank for insisting on this

Also relevant updates done in tests of elastic/beats#37431
Could you also have a look and approve there as well?

locker sync.Locker
}

// NewNodePodUpdater creates a nodePodUpdater
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodewatcher OldobjectWatcher, locker sync.Locker) *nodePodUpdater {
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodeWatcher oldObjectWatcher, locker sync.Locker) *nodePodUpdater {
return &nodePodUpdater{
handler: handler,
store: store,
nodewatcher: nodewatcher,
nodeWatcher: nodeWatcher,
locker: locker,
}
}
Expand All @@ -229,7 +228,6 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
if !ok {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
Expand All @@ -239,12 +237,12 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
n.locker.Lock()
defer n.locker.Unlock()
}
oldobject := n.nodewatcher.Oldobject()
cachednodeold, ok := oldobject.(*Node)
cachedObject := n.nodeWatcher.Oldobject()
cachedNode, ok := cachedObject.(*Node)

if ok && node.Name == cachednodeold.Name {
labelscheck := isEqualMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels)
annotationscheck := isEqualMetadata(node.ObjectMeta.Annotations, cachednodeold.ObjectMeta.Annotations)
if ok && node.Name == cachedNode.Name {
labelscheck := reflect.DeepEqual(node.ObjectMeta.Labels, cachedNode.ObjectMeta.Labels)
annotationscheck := reflect.DeepEqual(node.ObjectMeta.Annotations, cachedNode.ObjectMeta.Annotations)
// Only if there is a difference in Metadata labels or annotations proceed to Pod update
if !labelscheck || !annotationscheck {
for _, pod := range n.store.List() {
Expand All @@ -264,9 +262,3 @@ func (*nodePodUpdater) OnAdd(interface{}) {}
// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this
// namespace they will generate their own delete events.
func (*nodePodUpdater) OnDelete(interface{}) {}

// isEqualMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrence
func isEqualMetadata(newmetadata, oldmetadata map[string]string) bool {
check := reflect.DeepEqual(newmetadata, oldmetadata)
return check
}
8 changes: 4 additions & 4 deletions kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
// state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update.
w.enqueue(n, add)
}
w.oldobjectreturn(o)
w.cacheObject(o)

},
})
Expand Down Expand Up @@ -229,10 +229,10 @@ func (w *watcher) enqueue(obj interface{}, state string) {
w.queue.Add(&item{key, obj, state})
}

// oldobjectreturn returns the old version of cache objects before change on update events
func (w *watcher) oldobjectreturn(o interface{}) {
// cacheObject updates watcher with the old version of cache objects before change during update events
func (w *watcher) cacheObject(o interface{}) {
if old, ok := o.(runtime.Object); !ok {
utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o))
utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o))
} else {
w.oldobject = old
}
Expand Down
Loading