From d49c2ed4dc02aa744b5264eadca6a68cc5186693 Mon Sep 17 00:00:00 2001 From: Pijus Navickas Date: Thu, 7 Nov 2024 10:32:32 +0200 Subject: [PATCH] Revert "feat: TypeMeta as primary object Kind value source (#198)" This reverts commit ab924828a62ac19a5f0dbed6caecbd31863af46b. --- internal/services/controller/controller.go | 4 +- internal/services/controller/delta/delta.go | 126 +++++++++++++++++++- 2 files changed, 123 insertions(+), 7 deletions(-) diff --git a/internal/services/controller/controller.go b/internal/services/controller/controller.go index 0f4939a..c079f92 100644 --- a/internal/services/controller/controller.go +++ b/internal/services/controller/controller.go @@ -180,7 +180,7 @@ func CollectSingleSnapshot(ctx context.Context, return nil, err } - log.Debugf("synced %d items", len(d.Cache)) + log.Debugf("synced %d items", len(d.CacheLegacy)) return d.ToCASTAIRequest(), nil } @@ -514,7 +514,7 @@ func (c *Controller) send(ctx context.Context) { nodesByName := map[string]*corev1.Node{} var nodes []*corev1.Node - for _, item := range c.delta.Cache { + for _, item := range c.delta.CacheLegacy { n, ok := item.Obj.(*corev1.Node) if !ok { continue diff --git a/internal/services/controller/delta/delta.go b/internal/services/controller/delta/delta.go index 0175c58..133f846 100644 --- a/internal/services/controller/delta/delta.go +++ b/internal/services/controller/delta/delta.go @@ -1,8 +1,10 @@ package delta import ( + "crypto/sha256" "encoding/json" "fmt" + "reflect" "time" "github.com/sirupsen/logrus" @@ -23,7 +25,8 @@ func New(log logrus.FieldLogger, clusterID, clusterVersion, agentVersion string) clusterVersion: clusterVersion, agentVersion: agentVersion, FullSnapshot: true, - Cache: map[string]*Item{}, + CacheLegacy: map[string]*Item{}, + CacheModern: map[string]*Item{}, } } @@ -35,12 +38,18 @@ type Delta struct { clusterVersion string agentVersion string FullSnapshot bool - Cache map[string]*Item + CacheLegacy map[string]*Item + CacheModern map[string]*Item } // Add will add an Item to the Delta Cache. It will debounce the objects. func (d *Delta) Add(i *Item) { - cache := d.Cache + d.addToLegacy(i) + d.addToModern(i) +} + +func (d *Delta) addToModern(i *Item) { + cache := d.CacheModern if len(i.kind) == 0 { gvk, err := determineObjectGVK(i.Obj) @@ -64,6 +73,26 @@ func (d *Delta) Add(i *Item) { } } +func (d *Delta) addToLegacy(i *Item) { + cache := d.CacheLegacy + + key := keyObject(i.Obj) + + if other, ok := cache[key]; ok && other.event == castai.EventAdd && i.event == castai.EventUpdate { + i.event = castai.EventAdd + cache[key] = i + } else if ok && other.event == castai.EventDelete && (i.event == castai.EventAdd || i.event == castai.EventUpdate) { + i.event = castai.EventUpdate + cache[key] = i + } else { + cache[key] = i + } +} + +func keyObject(obj Object) string { + return fmt.Sprintf("%s::%s/%s", reflect.TypeOf(obj).String(), obj.GetNamespace(), obj.GetName()) +} + func itemCacheKey(i *Item) string { return fmt.Sprintf("%s::%s/%s", i.kind, i.Obj.GetNamespace(), i.Obj.GetName()) } @@ -72,14 +101,22 @@ func itemCacheKey(i *Item) string { // delivered. func (d *Delta) Clear() { d.FullSnapshot = false - d.Cache = map[string]*Item{} + d.CacheLegacy = map[string]*Item{} + d.CacheModern = map[string]*Item{} } // ToCASTAIRequest maps the collected Delta Cache to the castai.Delta type. func (d *Delta) ToCASTAIRequest() *castai.Delta { + resultLegacy := d.toCASTAIRequestLegacy() + resultModern := d.toCASTAIRequestModern() + logMismatches(d.log, resultLegacy, resultModern) + return resultLegacy +} + +func (d *Delta) toCASTAIRequestModern() *castai.Delta { var items []*castai.DeltaItem - for _, i := range d.Cache { + for _, i := range d.CacheModern { data, err := Encode(i.Obj) if err != nil { d.log.Errorf("failed to encode %T: %v", i.Obj, err) @@ -102,6 +139,43 @@ func (d *Delta) ToCASTAIRequest() *castai.Delta { } } +func (d *Delta) toCASTAIRequestLegacy() *castai.Delta { + var items []*castai.DeltaItem + + for _, i := range d.CacheLegacy { + data, err := Encode(i.Obj) + if err != nil { + d.log.Errorf("failed to encode %T: %v", i.Obj, err) + continue + } + + kinds, _, err := scheme.Scheme.ObjectKinds(i.Obj) + if err != nil { + d.log.Errorf("failed to find Object %T kind: %v", i.Obj, err) + continue + } + if len(kinds) == 0 || kinds[0].Kind == "" { + d.log.Errorf("unknown Object kind for Object %T", i.Obj) + continue + } + + items = append(items, &castai.DeltaItem{ + Event: i.event, + Kind: kinds[0].Kind, + Data: data, + CreatedAt: time.Now().UTC(), + }) + } + + return &castai.Delta{ + ClusterID: d.clusterID, + ClusterVersion: d.clusterVersion, + AgentVersion: d.agentVersion, + FullSnapshot: d.FullSnapshot, + Items: items, + } +} + func Encode(obj interface{}) (*json.RawMessage, error) { b, err := json.Marshal(obj) if err != nil { @@ -132,6 +206,48 @@ func determineObjectGVK(obj runtime.Object) (schema.GroupVersionKind, error) { return kinds[0], nil } +func logMismatches(log logrus.FieldLogger, legacy *castai.Delta, modern *castai.Delta) { + if len(legacy.Items) != len(modern.Items) { + log.Warnf("delta_modern number of items mismatch: %d legacy.Items vs %d modern.Items", len(legacy.Items), len(modern.Items)) + } + + checkLegacy := toChecksumMap(legacy) + checkModern := toChecksumMap(modern) + + for key, _ := range checkLegacy { + if checkModern[key] == nil { + log.Warnf("delta_modern item mismatch: legacy list has item that is missing from modern list: %s", key) + } + delete(checkLegacy, key) + delete(checkModern, key) + } + + for key, _ := range checkModern { + if checkLegacy[key] == nil { + log.Warnf("delta_modern item mismatch: modern list has item that is missing from legacy list: %s", key) + } + delete(checkLegacy, key) + delete(checkModern, key) + } +} + +func toChecksumMap(delta *castai.Delta) map[string]*castai.DeltaItem { + out := map[string]*castai.DeltaItem{} + for _, i := range delta.Items { + var hash string + if i.Data != nil { + hash = sha256hash(*i.Data) + } + key := fmt.Sprintf("%s-%s-%s", i.Event, i.Kind, hash) + out[key] = i + } + return out +} + +func sha256hash(data []byte) string { + return fmt.Sprintf("%x", sha256.Sum256(data)) +} + type Object interface { runtime.Object metav1.Object