From ee6fcfe6c308b1bb7205dcd9d778c578f0835891 Mon Sep 17 00:00:00 2001 From: Guilherme Cassolato Date: Fri, 4 Oct 2024 14:37:58 +0200 Subject: [PATCH] controller: do not propagate cache events when there were no updates to the snapshot Signed-off-by: Guilherme Cassolato --- controller/controller.go | 101 +++++++++++++++++++--------------- controller/controller_test.go | 22 ++++++++ 2 files changed, 80 insertions(+), 43 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index 13f112b..884d829 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" "github.com/samber/lo" + "github.com/telepresenceio/watchable" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" @@ -264,54 +265,68 @@ func (c *Controller) propagate(resourceEvents []ResourceEvent) { } func (c *Controller) subscribe(ctx context.Context) { - oldObjs := make(Store) + // init and subscribe resource store + c.cache.LoadOrStore(resourceStoreId, Store{}) subscription := c.cache.SubscribeSubset(ctx, func(storeId string, _ Store) bool { return storeId == resourceStoreId }) + // handle cache events + objs := make(Store) go func() { for snapshot := range subscription { - c.Lock() - - newObjs := snapshot.State[resourceStoreId] - - events := lo.FilterMap(lo.Keys(newObjs), func(uid string, _ int) (ResourceEvent, bool) { - newObj := newObjs[uid] - event := ResourceEvent{ - Kind: newObj.GetObjectKind().GroupVersionKind().GroupKind(), - NewObject: newObj, - } - if oldObj, exists := oldObjs[uid]; !exists { - event.EventType = CreateEvent - oldObjs[uid] = newObj - return event, true - } else if !reflect.DeepEqual(oldObj, newObj) { - event.EventType = UpdateEvent - event.OldObject = oldObj - oldObjs[uid] = newObj - return event, true - } - return event, false - }) - - deleteEvents := lo.FilterMap(lo.Keys(oldObjs), func(uid string, _ int) (ResourceEvent, bool) { - oldObj := oldObjs[uid] - event := ResourceEvent{ - EventType: DeleteEvent, - Kind: oldObj.GetObjectKind().GroupVersionKind().GroupKind(), - OldObject: oldObj, - } - _, exists := newObjs[uid] - if !exists { - delete(oldObjs, uid) - } - return event, !exists - }) - - events = append(events, deleteEvents...) - - c.propagate(events) - - c.Unlock() + objs = c.handleCacheEvent(snapshot, objs) } }() } + +func (c *Controller) handleCacheEvent(snapshot watchable.Snapshot[string, Store], objs Store) Store { + c.Lock() + defer c.Unlock() + + if len(snapshot.Updates) == 0 { + return objs + } + + newObjs := snapshot.State[resourceStoreId] + + events := lo.FilterMap(lo.Keys(newObjs), func(uid string, _ int) (ResourceEvent, bool) { + newObj := newObjs[uid] + event := ResourceEvent{ + Kind: newObj.GetObjectKind().GroupVersionKind().GroupKind(), + NewObject: newObj, + } + if obj, exists := objs[uid]; !exists { + event.EventType = CreateEvent + objs[uid] = newObj + return event, true + } else if !reflect.DeepEqual(obj, newObj) { + event.EventType = UpdateEvent + event.OldObject = obj + objs[uid] = newObj + return event, true + } + return event, false + }) + + deleteEvents := lo.FilterMap(lo.Keys(objs), func(uid string, _ int) (ResourceEvent, bool) { + obj := objs[uid] + event := ResourceEvent{ + EventType: DeleteEvent, + Kind: obj.GetObjectKind().GroupVersionKind().GroupKind(), + OldObject: obj, + } + _, exists := newObjs[uid] + if !exists { + delete(objs, uid) + } + return event, !exists + }) + + events = append(events, deleteEvents...) + + if len(events) > 0 { // this condition is actually redundant; if the snapshot has updates, there must be events + c.propagate(events) + } + + return objs +} diff --git a/controller/controller_test.go b/controller/controller_test.go index 4042ebf..b15823b 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -225,3 +225,25 @@ func TestStartControllerUnmanaged(t *testing.T) { }() time.Sleep(3 * time.Second) } + +func TestCacheSubscription(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + count := 0 + c := NewController(WithReconcile(func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error { + count++ + return nil + })) + + c.subscribe(ctx) + time.Sleep(1 * time.Second) + if count != 0 { + t.Errorf("expected no reconcile call, got %d", count) + } + + c.add(&corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-service", UID: "7ed703a2-635d-4002-a825-5624823760a5"}}) + time.Sleep(1 * time.Second) + if count != 1 { + t.Errorf("expected 1 reconcile call, got %d", count) + } +}