Skip to content

Commit

Permalink
rewrite signature of function StartEventWatcher
Browse files Browse the repository at this point in the history
Kubernetes-commit: c8291e393b32f8ce4e8fd1d4f18e848247be13f8
  • Loading branch information
jlsong01 authored and k8s-publishing-bot committed Jul 29, 2022
1 parent cbe28cf commit 3467961
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
32 changes: 21 additions & 11 deletions tools/events/event_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,9 @@ func getKey(event *eventsv1.Event) eventKey {

// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// The return value can be ignored or used to stop recording, if desired.
// TODO: this function should also return an error.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
return e.StartEventWatcher(
stopWatcher, err := e.StartEventWatcher(
func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
Expand All @@ -302,19 +303,20 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func
}
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
})
if err != nil {
klog.Errorf("failed to start event watcher: '%v'", err)
return func() {}
}
return stopWatcher
}

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error) {
watcher, err := e.Watch()
if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
// TODO: Rewrite the function signature to return an error, for
// now just return a no-op function
return func() {
klog.Error("The event watcher failed to start")
}
return nil, err
}
go func() {
defer utilruntime.HandleCrash()
Expand All @@ -326,10 +328,10 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime
eventHandler(watchEvent.Object)
}
}()
return watcher.Stop
return watcher.Stop, nil
}

func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {
func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) error {
eventHandler := func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
Expand All @@ -338,18 +340,26 @@ func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {
}
e.recordToSink(event, clock.RealClock{})
}
stopWatcher := e.StartEventWatcher(eventHandler)
stopWatcher, err := e.StartEventWatcher(eventHandler)
if err != nil {
return err
}
go func() {
<-stopCh
stopWatcher()
}()
return nil
}

// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
go wait.Until(e.finishSeries, finishTime, stopCh)
e.startRecordingEvents(stopCh)
err := e.startRecordingEvents(stopCh)
if err != nil {
klog.Errorf("unexpected type, expected eventsv1.Event")
return
}
}

type eventBroadcasterAdapterImpl struct {
Expand Down
5 changes: 4 additions & 1 deletion tools/events/eventseries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ func TestEventSeriesf(t *testing.T) {
// Don't call StartRecordingToSink, as we don't need neither refreshing event
// series nor finishing them in this tests and additional events updated would
// race with our expected ones.
broadcaster.startRecordingEvents(stopCh)
err = broadcaster.startRecordingEvents(stopCh)
if err != nil {
t.Fatal(err)
}
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
// read from the chan as this was needed only to populate the cache
<-createEvent
Expand Down
2 changes: 1 addition & 1 deletion tools/events/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type EventBroadcaster interface {
// of StartRecordingToSink. This lets you also process events in a custom way (e.g. in tests).
// NOTE: events received on your eventHandler should be copied before being used.
// TODO: figure out if this can be removed.
StartEventWatcher(eventHandler func(event runtime.Object)) func()
StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error)

// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured
// logging function. The return value can be ignored or used to stop recording, if desired.
Expand Down

0 comments on commit 3467961

Please sign in to comment.