-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conditions on podupdater functions of kubernetes autodiscovery #71
Changes from 9 commits
e72955a
d8ace11
ac43d6f
979e3c0
0a91978
853661d
5ea29d1
1c86749
b01ddcc
e78177c
cdfab00
416ab77
a710864
e265b8e
6cb89b9
13bff58
9ee164e
328f78d
2d73a98
76b4dce
66f616c
16b0fca
8629378
05ed573
8ad0d7c
a63bbf3
51bbd1b
5aac018
b89e490
c031e0f
85a74fe
2862b18
7ca8875
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,23 +18,27 @@ | |
package kubernetes | ||
|
||
import ( | ||
"reflect" | ||
"sync" | ||
|
||
"k8s.io/apimachinery/pkg/runtime" | ||
) | ||
|
||
// ResourceEventHandler can handle notifications for events that happen to a | ||
// resource. The events are informational only, so you can't return an | ||
// error. | ||
// * OnAdd is called when an object is added. | ||
// * OnUpdate is called when an object is modified. Note that oldObj is the | ||
// last known state of the object-- it is possible that several changes | ||
// were combined together, so you can't use this to see every single | ||
// change. OnUpdate is also called when a re-list happens, and it will | ||
// get called even if nothing changed. This is useful for periodically | ||
// evaluating or syncing something. | ||
// * OnDelete will get the final state of the item if it is known, otherwise | ||
// it will get an object of type DeletedFinalStateUnknown. This can | ||
// happen if the watch is closed and misses the delete event and we don't | ||
// notice the deletion until the subsequent re-list. | ||
// - OnAdd is called when an object is added. | ||
// - OnUpdate is called when an object is modified. Note that oldObj is the | ||
// last known state of the object-- it is possible that several changes | ||
// were combined together, so you can't use this to see every single | ||
// change. OnUpdate is also called when a re-list happens, and it will | ||
// get called even if nothing changed. This is useful for periodically | ||
// evaluating or syncing something. | ||
// - OnDelete will get the final state of the item if it is known, otherwise | ||
// it will get an object of type DeletedFinalStateUnknown. This can | ||
// happen if the watch is closed and misses the delete event and we don't | ||
// notice the deletion until the subsequent re-list. | ||
// | ||
// idea: allow the On* methods to return an error so that the RateLimited WorkQueue | ||
// idea: can requeue the failed event processing. | ||
type ResourceEventHandler interface { | ||
|
@@ -134,19 +138,25 @@ type podUpdaterStore interface { | |
List() []interface{} | ||
} | ||
|
||
type UpdateWatcher interface { | ||
Deltaobjects() []runtime.Object | ||
} | ||
|
||
// namespacePodUpdater notifies updates on pods when their namespaces are updated. | ||
type namespacePodUpdater struct { | ||
handler podUpdaterHandlerFunc | ||
store podUpdaterStore | ||
locker sync.Locker | ||
handler podUpdaterHandlerFunc | ||
store podUpdaterStore | ||
namespacewatcher UpdateWatcher | ||
locker sync.Locker | ||
} | ||
|
||
// NewNamespacePodUpdater creates a namespacePodUpdater | ||
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater { | ||
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacewatcher UpdateWatcher, locker sync.Locker) *namespacePodUpdater { | ||
return &namespacePodUpdater{ | ||
handler: handler, | ||
store: store, | ||
locker: locker, | ||
handler: handler, | ||
store: store, | ||
namespacewatcher: namespacewatcher, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we use MixedCaps (aka camelCase) here and for all other variables? as suggested at https://go.dev/doc/effective_go? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update in 16b0fca Let me know If I missed anything |
||
locker: locker, | ||
} | ||
} | ||
|
||
|
@@ -157,21 +167,34 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { | |
return | ||
} | ||
|
||
// n.store.List() returns a snapshot at this point. If a delete is received | ||
// from the main watcher, this loop may generate an update event after the | ||
// delete is processed, leaving configurations that would never be deleted. | ||
// Also this loop can miss updates, what could leave outdated configurations. | ||
// Avoid these issues by locking the processing of events from the main watcher. | ||
if n.locker != nil { | ||
n.locker.Lock() | ||
defer n.locker.Unlock() | ||
} | ||
for _, pod := range n.store.List() { | ||
pod, ok := pod.(*Pod) | ||
if ok && pod.Namespace == ns.Name { | ||
n.handler(pod) | ||
|
||
// Slice includes the old and new version of caching object that changes in the current update event. slice[0] is the old version and slice[1] the new updated one | ||
slice := n.namespacewatcher.Deltaobjects() | ||
cachednamespaceold, ok := slice[0].(*Namespace) | ||
|
||
if ns.Name == cachednamespaceold.Name && ok { | ||
gizas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
labelscheck := checkMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) | ||
annotationscheck := checkMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations) | ||
// Only if there is a diffrence in Metadata labels or annotations proceed to Pod update | ||
gizas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if !labelscheck || !annotationscheck { | ||
// n.store.List() returns a snapshot at this point. If a delete is received | ||
// from the main watcher, this loop may generate an update event after the | ||
// delete is processed, leaving configurations that would never be deleted. | ||
// Also this loop can miss updates, what could leave outdated configurations. | ||
// Avoid these issues by locking the processing of events from the main watcher. | ||
gizas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for _, pod := range n.store.List() { | ||
pod, ok := pod.(*Pod) | ||
if ok && pod.Namespace == ns.Name { | ||
n.handler(pod) | ||
} | ||
} | ||
} | ||
} | ||
|
||
} | ||
|
||
// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this | ||
|
@@ -184,17 +207,19 @@ func (*namespacePodUpdater) OnDelete(interface{}) {} | |
|
||
// nodePodUpdater notifies updates on pods when their nodes are updated. | ||
type nodePodUpdater struct { | ||
handler podUpdaterHandlerFunc | ||
store podUpdaterStore | ||
locker sync.Locker | ||
handler podUpdaterHandlerFunc | ||
store podUpdaterStore | ||
nodewatcher UpdateWatcher | ||
locker sync.Locker | ||
} | ||
|
||
// NewNodePodUpdater creates a nodePodUpdater | ||
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *nodePodUpdater { | ||
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodewatcher UpdateWatcher, locker sync.Locker) *nodePodUpdater { | ||
return &nodePodUpdater{ | ||
handler: handler, | ||
store: store, | ||
locker: locker, | ||
handler: handler, | ||
store: store, | ||
nodewatcher: nodewatcher, | ||
locker: locker, | ||
} | ||
} | ||
|
||
|
@@ -205,19 +230,31 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { | |
return | ||
} | ||
|
||
// n.store.List() returns a snapshot at this point. If a delete is received | ||
// from the main watcher, this loop may generate an update event after the | ||
// delete is processed, leaving configurations that would never be deleted. | ||
// Also this loop can miss updates, what could leave outdated configurations. | ||
// Avoid these issues by locking the processing of events from the main watcher. | ||
if n.locker != nil { | ||
n.locker.Lock() | ||
defer n.locker.Unlock() | ||
} | ||
for _, pod := range n.store.List() { | ||
pod, ok := pod.(*Pod) | ||
if ok && pod.Spec.NodeName == node.Name { | ||
n.handler(pod) | ||
|
||
// Slice includes the old and new version of caching object that changes in the current update event. slice[0] is the old version and slice[1] the new updated one | ||
slice := n.nodewatcher.Deltaobjects() | ||
cachednodeold, ok := slice[0].(*Node) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't get the meaning of checking the current namespace name is the same as the cached version. Maybe you can clarify that in a comment. Should we instead check the new name has not changed compared to the cached version? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gsantoro the one you call "current" is the one that triggers the OnUpdate. So still the "current" is not saved in the cache until we make the checks. The actual checks to make the replacement is the ones that come from isEqualMetadata function. And if we have diffrences in metadata we proceed with the Pod updates FYI : The check we make @tetianakravchenko indeed only the old version is needed (former slice[0]). I initially thought the wather to rerutn only the old version. Then I changed my mind that probably we will need in the future the deltas if we need to make more checks. I dont think that it is a big diff to return both? But tbh I have not tested the memory in scale here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tetianakravchenko I was biased due to this and was thinking that we might use it in the future. But not needed for now for sure. I chanhed again the code only to return old object. |
||
|
||
if node.Name == cachednodeold.Name && ok { | ||
gizas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
labelscheck := checkMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels) | ||
annotationscheck := checkMetadata(node.ObjectMeta.Annotations, cachednodeold.ObjectMeta.Annotations) | ||
// Only if there is a diffrence in Metadata labels or annotations proceed to Pod update | ||
gizas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if !labelscheck || !annotationscheck { | ||
// n.store.List() returns a snapshot at this point. If a delete is received | ||
// from the main watcher, this loop may generate an update event after the | ||
// delete is processed, leaving configurations that would never be deleted. | ||
// Also this loop can miss updates, what could leave outdated configurations. | ||
// Avoid these issues by locking the processing of events from the main watcher. | ||
gizas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for _, pod := range n.store.List() { | ||
pod, ok := pod.(*Pod) | ||
if ok && pod.Spec.NodeName == node.Name { | ||
n.handler(pod) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -229,3 +266,9 @@ func (*nodePodUpdater) OnAdd(interface{}) {} | |
// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this | ||
// namespace they will generate their own delete events. | ||
func (*nodePodUpdater) OnDelete(interface{}) {} | ||
|
||
// checkMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrence | ||
func checkMetadata(newmetadata, oldmetadata map[string]string) bool { | ||
gizas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
check := reflect.DeepEqual(newmetadata, oldmetadata) | ||
return check | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gizas could you please add
Doc comments
for the exported type?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let me know if ok?