Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conditions on podupdater functions of kubernetes autodiscovery #71

Merged
merged 33 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e72955a
first update for nodePodUpdater
gizas Dec 7, 2023
d8ace11
first update for nodePodUpdater with key function
gizas Dec 8, 2023
ac43d6f
first update for namespacePodUpdater
gizas Dec 8, 2023
979e3c0
updating comments for code
gizas Dec 8, 2023
0a91978
updating with delta
gizas Dec 12, 2023
853661d
fixing referneces only to one wather as an argument
gizas Dec 13, 2023
5ea29d1
removing comments
gizas Dec 13, 2023
1c86749
Update eventhandler.go - Fixing typo
gizas Dec 13, 2023
b01ddcc
Update watcher.go comment in deltaslice function
gizas Dec 13, 2023
e78177c
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
cdfab00
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
416ab77
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
a710864
updates for delta struct and comments
gizas Jan 9, 2024
e265b8e
updates for delta struct and comments
gizas Jan 9, 2024
6cb89b9
returning only old cached object
gizas Jan 9, 2024
13bff58
fixing comment after upgrade to return old object
gizas Jan 9, 2024
9ee164e
changing name of interface
gizas Jan 9, 2024
328f78d
adding check for oldobjectreturn
gizas Jan 9, 2024
2d73a98
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
76b4dce
Update kubernetes/eventhandler.go
gizas Jan 9, 2024
66f616c
Update kubernetes/watcher.go
gizas Jan 9, 2024
16b0fca
renaming functions and providinng camelCase variables
gizas Jan 9, 2024
8629378
renaming functions and providinng camelCase variables
gizas Jan 9, 2024
05ed573
reflect.DeepEqual added
gizas Jan 9, 2024
8ad0d7c
removing extra lines
gizas Jan 9, 2024
a63bbf3
replacing oldobject with cachedobject
gizas Jan 10, 2024
51bbd1b
keeping only watcher struct
gizas Jan 11, 2024
5aac018
checks for namespace and node
gizas Jan 11, 2024
b89e490
checks for namespace and node
gizas Jan 12, 2024
c031e0f
Update kubernetes/watcher.go
gizas Jan 12, 2024
85a74fe
udpating changelog
gizas Jan 12, 2024
2862b18
udpating changelog
gizas Jan 12, 2024
7ca8875
udpating changelog
gizas Jan 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 86 additions & 43 deletions kubernetes/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,27 @@
package kubernetes

import (
"reflect"
"sync"

"k8s.io/apimachinery/pkg/runtime"
)

// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
// - OnAdd is called when an object is added.
// - OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// - OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
//
// idea: allow the On* methods to return an error so that the RateLimited WorkQueue
// idea: can requeue the failed event processing.
type ResourceEventHandler interface {
Expand Down Expand Up @@ -134,19 +138,25 @@ type podUpdaterStore interface {
List() []interface{}
}

type UpdateWatcher interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gizas could you please add Doc comments for the exported type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me know if ok?

Deltaobjects() []runtime.Object
}

// namespacePodUpdater notifies updates on pods when their namespaces are updated.
type namespacePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
locker sync.Locker
handler podUpdaterHandlerFunc
store podUpdaterStore
namespacewatcher UpdateWatcher
locker sync.Locker
}

// NewNamespacePodUpdater creates a namespacePodUpdater
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater {
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespacewatcher UpdateWatcher, locker sync.Locker) *namespacePodUpdater {
return &namespacePodUpdater{
handler: handler,
store: store,
locker: locker,
handler: handler,
store: store,
namespacewatcher: namespacewatcher,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use MixedCaps (aka camelCase) here and for all other variables? as suggested at https://go.dev/doc/effective_go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update in 16b0fca

Let me know If I missed anything

locker: locker,
}
}

Expand All @@ -157,21 +167,34 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
// Also this loop can miss updates, what could leave outdated configurations.
// Avoid these issues by locking the processing of events from the main watcher.
if n.locker != nil {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)

// Slice includes the old and new version of caching object that changes in the current update event. slice[0] is the old version and slice[1] the new updated one
slice := n.namespacewatcher.Deltaobjects()
cachednamespaceold, ok := slice[0].(*Namespace)

if ns.Name == cachednamespaceold.Name && ok {
gizas marked this conversation as resolved.
Show resolved Hide resolved
labelscheck := checkMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels)
annotationscheck := checkMetadata(ns.ObjectMeta.Annotations, cachednamespaceold.ObjectMeta.Annotations)
// Only if there is a diffrence in Metadata labels or annotations proceed to Pod update
gizas marked this conversation as resolved.
Show resolved Hide resolved
if !labelscheck || !annotationscheck {
// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
// Also this loop can miss updates, what could leave outdated configurations.
// Avoid these issues by locking the processing of events from the main watcher.
gizas marked this conversation as resolved.
Show resolved Hide resolved
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)
}
}
}
}

}

// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
Expand All @@ -184,17 +207,19 @@ func (*namespacePodUpdater) OnDelete(interface{}) {}

// nodePodUpdater notifies updates on pods when their nodes are updated.
type nodePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
locker sync.Locker
handler podUpdaterHandlerFunc
store podUpdaterStore
nodewatcher UpdateWatcher
locker sync.Locker
}

// NewNodePodUpdater creates a nodePodUpdater
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *nodePodUpdater {
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodewatcher UpdateWatcher, locker sync.Locker) *nodePodUpdater {
return &nodePodUpdater{
handler: handler,
store: store,
locker: locker,
handler: handler,
store: store,
nodewatcher: nodewatcher,
locker: locker,
}
}

Expand All @@ -205,19 +230,31 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
// Also this loop can miss updates, what could leave outdated configurations.
// Avoid these issues by locking the processing of events from the main watcher.
if n.locker != nil {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Spec.NodeName == node.Name {
n.handler(pod)

// Slice includes the old and new version of caching object that changes in the current update event. slice[0] is the old version and slice[1] the new updated one
slice := n.nodewatcher.Deltaobjects()
cachednodeold, ok := slice[0].(*Node)
Copy link
Contributor

@tetianakravchenko tetianakravchenko Jan 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slice[1] seems to be not used and instead the old version (slice[0]) is compared with the obj.(*Node) passed to the OnUpdate. The same for the namespace
Do we actually need the updated version?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get the meaning of checking the current namespace name is the same as the cached version. Maybe you can clarify that in a comment. Should we instead check the new name has not changed compared to the cached version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gsantoro the one you call "current" is the one that triggers the OnUpdate. So still the "current" is not saved in the cache until we make the checks. The actual checks to make the replacement is the ones that come from isEqualMetadata function. And if we have diffrences in metadata we proceed with the Pod updates

FYI : The check we make ns.Name == cachednamespaceold.Name is just for safety in order to make sure that we talk for the same object in cache

@tetianakravchenko indeed only the old version is needed (former slice[0]). I initially thought the wather to rerutn only the old version. Then I changed my mind that probably we will need in the future the deltas if we need to make more checks. I dont think that it is a big diff to return both? But tbh I have not tested the memory in scale here

Copy link
Contributor

@tetianakravchenko tetianakravchenko Jan 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tetianakravchenko indeed only the old version is needed (former slice[0]). I initially thought the wather to rerutn only the old version. Then I changed my mind that probably we will need in the future the deltas if we need to make more checks. I don't think that it is a big diff to return both? But tbh I have not tested the memory in scale here

since obj.(*Node) passed to the OnUpdate is equal to the slice[1] (new version) - then we are just duplicating data here and I can't imagine any use case to use this slice[1] for future (instead of obj.(*Node)), do you have anything in mind?
I also don't think that it is a big diff to return, but I think it is better to not introduce obscurity/duplication

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tetianakravchenko I was biased due to this and was thinking that we might use it in the future. But not needed for now for sure.

I chanhed again the code only to return old object.


if node.Name == cachednodeold.Name && ok {
gizas marked this conversation as resolved.
Show resolved Hide resolved
labelscheck := checkMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels)
annotationscheck := checkMetadata(node.ObjectMeta.Annotations, cachednodeold.ObjectMeta.Annotations)
// Only if there is a diffrence in Metadata labels or annotations proceed to Pod update
gizas marked this conversation as resolved.
Show resolved Hide resolved
if !labelscheck || !annotationscheck {
// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
// Also this loop can miss updates, what could leave outdated configurations.
// Avoid these issues by locking the processing of events from the main watcher.
gizas marked this conversation as resolved.
Show resolved Hide resolved
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Spec.NodeName == node.Name {
n.handler(pod)
}
}
}
}
}
Expand All @@ -229,3 +266,9 @@ func (*nodePodUpdater) OnAdd(interface{}) {}
// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this
// namespace they will generate their own delete events.
func (*nodePodUpdater) OnDelete(interface{}) {}

// checkMetadata receives labels or annotations maps and checks their equality. Returns True if equal, False if there is a diffrence
func checkMetadata(newmetadata, oldmetadata map[string]string) bool {
gizas marked this conversation as resolved.
Show resolved Hide resolved
check := reflect.DeepEqual(newmetadata, oldmetadata)
return check
}
1 change: 1 addition & 0 deletions kubernetes/metadata/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (p *pod) Generate(obj kubernetes.Resource, opts ...FieldOptions) mapstr.M {
"kubernetes": p.GenerateK8s(obj, opts...),
}
meta.DeepUpdate(ecsFields)

return meta
}

Expand Down
24 changes: 23 additions & 1 deletion kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type Watcher interface {

// Client returns the kubernetes client object used by the watcher
Client() kubernetes.Interface

// Deltaobjects returns the slice of objects that change during the last updated event
Deltaobjects() []runtime.Object
}

// WatchOptions controls watch behaviors
Expand Down Expand Up @@ -91,6 +94,7 @@ type watcher struct {
stop context.CancelFunc
handler ResourceEventHandler
logger *logp.Logger
delta []runtime.Object
}

// NewWatcher initializes the watcher client to provide a events handler for
Expand All @@ -106,7 +110,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface

var deltaslice []runtime.Object
informer, _, err := NewInformer(client, resource, opts, indexers)
if err != nil {
return nil, err
Expand All @@ -132,6 +136,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
store: store,
queue: queue,
ctx: ctx,
delta: deltaslice,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
handler: NoOpEventHandlerFuncs{},
Expand All @@ -157,6 +162,8 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
// state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update.
w.enqueue(n, add)
}
w.deltaobjects(o, n)

},
})

Expand All @@ -178,6 +185,11 @@ func (w *watcher) Client() kubernetes.Interface {
return w.client
}

// Deltaobjects returns the slice of objects that change during the last updated event
func (w *watcher) Deltaobjects() []runtime.Object {
return w.delta
}

// Start watching pods
func (w *watcher) Start() error {
go w.informer.Run(w.ctx.Done())
Expand Down Expand Up @@ -217,6 +229,16 @@ func (w *watcher) enqueue(obj interface{}, state string) {
w.queue.Add(&item{key, obj, state})
}

// deltaobjects creates a slice with the old and the new version of cache objects that are ready to chane on update events
// returns a delta struct to the watcher. w.delta[0] is the old version and w.delta[1] the new updated one
func (w *watcher) deltaobjects(o interface{}, n interface{}) {
gizas marked this conversation as resolved.
Show resolved Hide resolved
//w.delta[:0] initialises always the delta struct before new assignement
w.delta = w.delta[:0]
w.delta = append(w.delta, o.(runtime.Object))
w.delta = append(w.delta, n.(runtime.Object))

}

// process gets the top of the work queue and processes the object that is received.
func (w *watcher) process(_ context.Context) bool {
obj, quit := w.queue.Get()
Expand Down
Loading