Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "feat: TypeMeta as primary object Kind value source" #200

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func CollectSingleSnapshot(ctx context.Context,
return nil, err
}

log.Debugf("synced %d items", len(d.Cache))
log.Debugf("synced %d items", len(d.CacheLegacy))

return d.ToCASTAIRequest(), nil
}
Expand Down Expand Up @@ -514,7 +514,7 @@ func (c *Controller) send(ctx context.Context) {
nodesByName := map[string]*corev1.Node{}
var nodes []*corev1.Node

for _, item := range c.delta.Cache {
for _, item := range c.delta.CacheLegacy {
n, ok := item.Obj.(*corev1.Node)
if !ok {
continue
Expand Down
126 changes: 121 additions & 5 deletions internal/services/controller/delta/delta.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package delta

import (
"crypto/sha256"
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -23,7 +25,8 @@ func New(log logrus.FieldLogger, clusterID, clusterVersion, agentVersion string)
clusterVersion: clusterVersion,
agentVersion: agentVersion,
FullSnapshot: true,
Cache: map[string]*Item{},
CacheLegacy: map[string]*Item{},
CacheModern: map[string]*Item{},
}
}

Expand All @@ -35,12 +38,18 @@ type Delta struct {
clusterVersion string
agentVersion string
FullSnapshot bool
Cache map[string]*Item
CacheLegacy map[string]*Item
CacheModern map[string]*Item
}

// Add will add an Item to the Delta Cache. It will debounce the objects.
// The item is recorded in both caches. Legacy must run first: addToLegacy
// may rewrite i.event while debouncing, and the modern cache should store
// the normalized event.
func (d *Delta) Add(i *Item) {
	d.addToLegacy(i)
	d.addToModern(i)
}

func (d *Delta) addToModern(i *Item) {
cache := d.CacheModern

if len(i.kind) == 0 {
gvk, err := determineObjectGVK(i.Obj)
Expand All @@ -64,6 +73,26 @@ func (d *Delta) Add(i *Item) {
}
}

// addToLegacy debounces the item into the legacy cache, keyed by the
// object's reflected Go type plus namespace/name (see keyObject).
func (d *Delta) addToLegacy(i *Item) {
	key := keyObject(i.Obj)

	// Merge with any pending event for the same object so the server
	// receives a single coherent event per object.
	if prev, found := d.CacheLegacy[key]; found {
		switch {
		case prev.event == castai.EventAdd && i.event == castai.EventUpdate:
			// Added then updated before sending: still a plain add.
			i.event = castai.EventAdd
		case prev.event == castai.EventDelete && (i.event == castai.EventAdd || i.event == castai.EventUpdate):
			// Deleted then re-created before sending: net effect is an update.
			i.event = castai.EventUpdate
		}
	}

	d.CacheLegacy[key] = i
}

// keyObject builds the legacy cache key: the object's concrete Go type,
// then "::", then "namespace/name".
func keyObject(obj Object) string {
	typeName := reflect.TypeOf(obj).String()
	return typeName + "::" + obj.GetNamespace() + "/" + obj.GetName()
}

// itemCacheKey builds the modern cache key from the item's resolved kind
// and the object's namespace/name, formatted as "<kind>::<namespace>/<name>".
// Unlike keyObject, it relies on i.kind (resolved via GVK) rather than the
// object's reflected Go type.
func itemCacheKey(i *Item) string {
return fmt.Sprintf("%s::%s/%s", i.kind, i.Obj.GetNamespace(), i.Obj.GetName())
}
Expand All @@ -72,14 +101,22 @@ func itemCacheKey(i *Item) string {
// delivered.
// Clear resets the delta to an empty state: both caches are dropped and
// subsequent payloads are no longer marked as full snapshots.
func (d *Delta) Clear() {
	d.FullSnapshot = false
	d.CacheLegacy = map[string]*Item{}
	d.CacheModern = map[string]*Item{}
}

// ToCASTAIRequest maps the collected Delta Cache to the castai.Delta type.
// Both the legacy and the modern payloads are built, mismatches between the
// two are logged for comparison, and the legacy payload is returned as the
// authoritative result.
func (d *Delta) ToCASTAIRequest() *castai.Delta {
	legacy := d.toCASTAIRequestLegacy()
	modern := d.toCASTAIRequestModern()
	logMismatches(d.log, legacy, modern)
	return legacy
}

func (d *Delta) toCASTAIRequestModern() *castai.Delta {
var items []*castai.DeltaItem

for _, i := range d.Cache {
for _, i := range d.CacheModern {
data, err := Encode(i.Obj)
if err != nil {
d.log.Errorf("failed to encode %T: %v", i.Obj, err)
Expand All @@ -102,6 +139,43 @@ func (d *Delta) ToCASTAIRequest() *castai.Delta {
}
}

// toCASTAIRequestLegacy converts the legacy cache into a castai.Delta,
// resolving each object's Kind through the registered scheme (the behavior
// this revert restores). Items that fail to encode or whose kind cannot be
// resolved are logged and skipped rather than aborting the whole payload.
func (d *Delta) toCASTAIRequestLegacy() *castai.Delta {
	var items []*castai.DeltaItem

	for _, cached := range d.CacheLegacy {
		encoded, err := Encode(cached.Obj)
		if err != nil {
			d.log.Errorf("failed to encode %T: %v", cached.Obj, err)
			continue
		}

		kinds, _, err := scheme.Scheme.ObjectKinds(cached.Obj)
		if err != nil {
			d.log.Errorf("failed to find Object %T kind: %v", cached.Obj, err)
			continue
		}
		if len(kinds) == 0 || kinds[0].Kind == "" {
			d.log.Errorf("unknown Object kind for Object %T", cached.Obj)
			continue
		}

		items = append(items, &castai.DeltaItem{
			Event:     cached.event,
			Kind:      kinds[0].Kind,
			Data:      encoded,
			CreatedAt: time.Now().UTC(),
		})
	}

	return &castai.Delta{
		ClusterID:      d.clusterID,
		ClusterVersion: d.clusterVersion,
		AgentVersion:   d.agentVersion,
		FullSnapshot:   d.FullSnapshot,
		Items:          items,
	}
}

func Encode(obj interface{}) (*json.RawMessage, error) {
b, err := json.Marshal(obj)
if err != nil {
Expand Down Expand Up @@ -132,6 +206,48 @@ func determineObjectGVK(obj runtime.Object) (schema.GroupVersionKind, error) {
return kinds[0], nil
}

// logMismatches compares the legacy and modern delta payloads and logs a
// warning for every discrepancy: a differing item count, and every item that
// appears in one list but not the other. Items are matched by an
// event+kind+payload checksum (see toChecksumMap), so two items are "the
// same" only if their encoded data is byte-identical.
func logMismatches(log logrus.FieldLogger, legacy *castai.Delta, modern *castai.Delta) {
	if len(legacy.Items) != len(modern.Items) {
		log.Warnf("delta_modern number of items mismatch: %d legacy.Items vs %d modern.Items", len(legacy.Items), len(modern.Items))
	}

	checkLegacy := toChecksumMap(legacy)
	checkModern := toChecksumMap(modern)

	// Set difference in both directions; no need to mutate the maps while
	// iterating (the original deleted visited keys, which go vet/gofmt-era
	// reviews flag as needless bookkeeping).
	for key := range checkLegacy {
		if _, ok := checkModern[key]; !ok {
			log.Warnf("delta_modern item mismatch: legacy list has item that is missing from modern list: %s", key)
		}
	}

	for key := range checkModern {
		if _, ok := checkLegacy[key]; !ok {
			log.Warnf("delta_modern item mismatch: modern list has item that is missing from legacy list: %s", key)
		}
	}
}

// toChecksumMap indexes a delta's items by a composite "event-kind-hash" key,
// where hash is the SHA-256 hex digest of the encoded payload (empty when the
// item carries no data). Items with identical event, kind and bytes collapse
// to a single entry.
func toChecksumMap(delta *castai.Delta) map[string]*castai.DeltaItem {
	index := make(map[string]*castai.DeltaItem, len(delta.Items))
	for _, item := range delta.Items {
		digest := ""
		if item.Data != nil {
			digest = sha256hash(*item.Data)
		}
		index[fmt.Sprintf("%s-%s-%s", item.Event, item.Kind, digest)] = item
	}
	return index
}

// sha256hash returns the lowercase hex encoding of the SHA-256 digest of data.
func sha256hash(data []byte) string {
	digest := sha256.Sum256(data)
	return fmt.Sprintf("%x", digest)
}

type Object interface {
runtime.Object
metav1.Object
Expand Down
Loading