Skip to content

Commit

Permalink
Update inline docs and use meta.Accessor for generic objects
Browse files Browse the repository at this point in the history
Signed-off-by: jannfis <[email protected]>
  • Loading branch information
jannfis committed Mar 11, 2024
1 parent 7d02770 commit 93990cd
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 46 deletions.
8 changes: 5 additions & 3 deletions internal/informer/appproject/projectinformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ type AppProjectInformer struct {
addFunc func(proj *v1alpha1.AppProject)
updateFunc func(oldProj *v1alpha1.AppProject, newProj *v1alpha1.AppProject)
deleteFunc func(proj *v1alpha1.AppProject)

projLister applisters.AppProjectLister
}

type AppProjectInformerOption func(pi *AppProjectInformer) error

// WithNamespaces restricts the informer to certain namespaces. If no namespace
// is set, or if multiple are set, the informer requires cluster-wide access to
// AppProjects. When one or more namespaces are set, only objects in these
// namespaces will be considered by the informer.
func WithNamespaces(namespaces ...string) AppProjectInformerOption {
return func(pi *AppProjectInformer) error {
pi.namespaces = namespaces
Expand Down Expand Up @@ -84,7 +86,7 @@ func NewAppProjectInformer(ctx context.Context, client appclientset.Interface, o
if pi.logger == nil {
pi.logger = logrus.WithField("module", "AppProjectInformer")
}
i, err := informer.NewInformer(&v1alpha1.AppProject{},
i, err := informer.NewGenericInformer(&v1alpha1.AppProject{},
informer.WithListCallback(func(options v1.ListOptions, namespace string) (runtime.Object, error) {
projects, err := client.ArgoprojV1alpha1().AppProjects(namespace).List(ctx, options)
pi.logger.Debugf("Lister returned %d AppProjects", len(projects.Items))
Expand Down
12 changes: 12 additions & 0 deletions internal/informer/appproject/projectinformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,25 +88,31 @@ func Test_FilterFunc(t *testing.T) {
numAdded := atomic.Uint32{}
numUpdated := atomic.Uint32{}
numDeleted := atomic.Uint32{}
addCh := make(chan bool)
updateCh := make(chan bool)
deleteCh := make(chan bool)
ac := fakeappclient.NewSimpleClientset()
pi, lister, err := NewAppProjectInformer(context.TODO(), ac,
WithAddFunc(func(proj *v1alpha1.AppProject) {
numAdded.Add(1)
if numAdded.Load() > 1 {
t.Fatalf("AddFunc called for %s", proj.GetName())
}
addCh <- true
}),
WithUpdateFunc(func(oldProj, newProj *v1alpha1.AppProject) {
numUpdated.Add(1)
if numUpdated.Load() > 1 {
t.Fatalf("UpdateFunc called for %s", newProj.GetName())
}
updateCh <- true
}),
WithDeleteFunc(func(proj *v1alpha1.AppProject) {
numDeleted.Add(1)
if numDeleted.Load() > 1 {
t.Fatalf("DeleteFunc called for %s", proj.GetName())
}
deleteCh <- true
}),
WithListFilter(func(proj *v1alpha1.AppProject) bool {
if proj.Name == "proj1" {
Expand All @@ -130,6 +136,7 @@ func Test_FilterFunc(t *testing.T) {
ObjectMeta: v1.ObjectMeta{Name: fmt.Sprintf("proj%d", i)},
}, v1.CreateOptions{})
}
<-addCh
p, err := lister.AppProjects("argocd").Get("proj1")
assert.NotNil(t, p)
assert.NoError(t, err)
Expand All @@ -140,11 +147,16 @@ func Test_FilterFunc(t *testing.T) {
ObjectMeta: v1.ObjectMeta{Name: fmt.Sprintf("proj%d", i)}, Spec: v1alpha1.AppProjectSpec{Description: "Foo"},
}, v1.UpdateOptions{})
}
<-updateCh
})
t.Run("Delete AppProjects", func(t *testing.T) {
for _, i := range []int{1, 2, 3, 4, 5} {
ac.ArgoprojV1alpha1().AppProjects("argocd").Delete(context.TODO(), fmt.Sprintf("proj%d", i), v1.DeleteOptions{})
}
<-deleteCh
p, err := lister.AppProjects("argocd").Get("proj1")
assert.Nil(t, p)
assert.ErrorContains(t, err, "not found")
})

}
Expand Down
81 changes: 38 additions & 43 deletions internal/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import (
"time"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -107,6 +106,11 @@ func WithFieldSelector(sel string) InformerOption {
}
}

// WithNamespaces sets the namespaces for which the informer will process any
// event. If an event is seen for an object in a namespace that is not in this
// list, the event will be ignored. If either zero or multiple namespaces are
// set, the informer will require cluster-wide permissions to list and watch
// the kind of resource to be handled by this informer.
func WithNamespaces(namespaces ...string) InformerOption {
return func(i *GenericInformer) error {
for _, ns := range namespaces {
Expand All @@ -116,15 +120,17 @@ func WithNamespaces(namespaces ...string) InformerOption {
}
}

func NewInformer(objType runtime.Object, options ...InformerOption) (*GenericInformer, error) {
// NewGenericInformer returns a new instance of a GenericInformer for the given
// object type and with the given objects.
func NewGenericInformer(objType runtime.Object, options ...InformerOption) (*GenericInformer, error) {
i := &GenericInformer{
namespaces: make(map[string]interface{}),
}
i.logger = log().WithFields(logrus.Fields{
"resource": fmt.Sprintf("%T", objType),
})
i.runch = make(chan struct{})
i.SetSynced(false)
i.setSynced(false)
for _, o := range options {
err := o(i)
if err != nil {
Expand All @@ -145,8 +151,8 @@ func NewInformer(objType runtime.Object, options ...InformerOption) (*GenericInf
return i.listFunc(options, i.watchAndListNamespace())
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
i.SetSynced(false)
defer i.SetSynced(true)
i.setSynced(false)
defer i.setSynced(true)
logCtx.Trace("Starting watcher")
return i.watchFunc(options, i.watchAndListNamespace())
},
Expand All @@ -161,13 +167,13 @@ func NewInformer(objType runtime.Object, options ...InformerOption) (*GenericInf
)
_, err := i.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
un, err := toUnstructured(obj)
mobj, err := meta.Accessor(obj)
if err != nil {
i.logger.WithError(err).Errorf("Could not convert type %T to unstructured", obj)
return
}
if !i.isNamespaceAllowed(un) {
i.logger.Tracef("Namespace %s not allowed", un.GetNamespace())
if !i.isNamespaceAllowed(mobj) {
i.logger.Tracef("Namespace %s not allowed", mobj.GetNamespace())
return
}
if i.filterFunc != nil {
Expand All @@ -180,13 +186,13 @@ func NewInformer(objType runtime.Object, options ...InformerOption) (*GenericInf
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
un, err := toUnstructured(newObj)
mobj, err := meta.Accessor(newObj)
if err != nil {
i.logger.WithError(err).Errorf("Could not convert type %T to unstructured", newObj)
return
}
if !i.isNamespaceAllowed(un) {
i.logger.Tracef("Namespace %s not allowed", un.GetNamespace())
if !i.isNamespaceAllowed(mobj) {
i.logger.Tracef("Namespace %s not allowed", mobj.GetNamespace())
return
}
if i.filterFunc != nil {
Expand All @@ -199,13 +205,13 @@ func NewInformer(objType runtime.Object, options ...InformerOption) (*GenericInf
}
},
DeleteFunc: func(obj interface{}) {
un, err := toUnstructured(obj)
mobj, err := meta.Accessor(obj)
if err != nil {
i.logger.WithError(err).Errorf("Could not convert type %T to unstructured", obj)
return
}
if !i.isNamespaceAllowed(un) {
i.logger.Tracef("Namespace %s not allowed", un.GetNamespace())
if !i.isNamespaceAllowed(mobj) {
i.logger.Tracef("Namespace %s not allowed", mobj.GetNamespace())
return
}
if i.filterFunc != nil {
Expand All @@ -229,33 +235,40 @@ func (i *GenericInformer) IsSynced() bool {
return i.synced.Load()
}

// SetSynced sets the sync state to either true or false
func (i *GenericInformer) SetSynced(synced bool) {
// setSynced sets the sync state to either true or false
func (i *GenericInformer) setSynced(synced bool) {
i.logger.Tracef("Setting informer sync state to %v", synced)
i.synced.Store(synced)
}

// IsRunning returns whether the GenericInformer is running or not
func (i *GenericInformer) IsRunning() bool {
return i.running.Load()
}

func (i *GenericInformer) SetRunning(running bool) {
// setRunning sets whether this informer is running to either true or false
func (i *GenericInformer) setRunning(running bool) {
i.logger.Tracef("Setting informer run state to %v", running)
i.running.Store(running)
}

// Start starts the GenericInformer with its current options in a goroutine.
// If this method does not return an error, you can use Stop to stop the
// informer and terminate the goroutine it has created. If Start returns an
// error, no goroutine will have been created.
func (i *GenericInformer) Start(ctx context.Context) error {
i.mutex.Lock()
defer i.mutex.Unlock()
if i.IsRunning() {
return fmt.Errorf("cannot start informer: already running")
}
i.SetRunning(true)
i.setRunning(true)
i.logger.Debug("Starting informer goroutine")
go i.informer.Run(i.runch)
return nil
}

// Stop stops the GenericInformer and terminates the associated goroutine.
func (i *GenericInformer) Stop() error {
i.mutex.Lock()
defer i.mutex.Unlock()
Expand All @@ -264,10 +277,11 @@ func (i *GenericInformer) Stop() error {
}
i.logger.Debug("Stopping informer")
close(i.runch)
i.SetRunning(false)
i.setRunning(false)
return nil
}

// AddNamespace adds a namespace to the list of allowed namespaces.
func (i *GenericInformer) AddNamespace(namespace string) error {
i.mutex.Lock()
defer i.mutex.Unlock()
Expand All @@ -278,6 +292,7 @@ func (i *GenericInformer) AddNamespace(namespace string) error {
return nil
}

// RemoveNamespace removes a namespace from the list of allowed namespaces
func (i *GenericInformer) RemoveNamespace(namespace string) error {
i.mutex.Lock()
defer i.mutex.Unlock()
Expand All @@ -304,35 +319,15 @@ func (i *GenericInformer) watchAndListNamespace() string {

// isNamespaceAllowed returns whether the namespace of an event's object is
// permitted.
func (i *GenericInformer) isNamespaceAllowed(un unstructured.Unstructured) bool {
func (i *GenericInformer) isNamespaceAllowed(obj v1.Object) bool {
if len(i.namespaces) == 0 {
return true
}
_, ok := i.namespaces[un.GetNamespace()]
_, ok := i.namespaces[obj.GetNamespace()]
return ok
}

// toUnstructured converts any resource implementing runtime.Object to an
// unstructured data type and returns it. If the resource could not be
// converted, returns the appropriate error.
func toUnstructured(obj interface{}) (unstructured.Unstructured, error) {
var ext runtime.RawExtension
var scope conversion.Scope
o, ok := obj.(runtime.Object)
if !ok {
return unstructured.Unstructured{}, fmt.Errorf("failed type assertion to runtime.Object")
}
if err := runtime.Convert_runtime_Object_To_runtime_RawExtension(&o, &ext, scope); err != nil {
return unstructured.Unstructured{}, err
}
if ro, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj); err != nil {
return unstructured.Unstructured{}, err
} else {
uo := unstructured.Unstructured{Object: ro}
return uo, nil
}
}

// Indexer returns the underlying shared informer's indexer
func (i *GenericInformer) Indexer() cache.Indexer {
return i.informer.GetIndexer()
}
Expand Down

0 comments on commit 93990cd

Please sign in to comment.