Skip to content

Commit

Permalink
feat: debounce delta items (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
saumas authored Jun 15, 2021
1 parent 0523aeb commit ff7035b
Show file tree
Hide file tree
Showing 4 changed files with 417 additions and 54 deletions.
79 changes: 25 additions & 54 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ package controller

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
"strings"
"sync"
"time"

Expand All @@ -15,6 +11,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
Expand All @@ -29,14 +26,15 @@ import (

type Controller struct {
log logrus.FieldLogger
clusterID string
castaiclient castai.Client
provider types.Provider
queue workqueue.RateLimitingInterface
interval time.Duration
prepDuration time.Duration
informers map[reflect.Type]cache.SharedInformer

delta *castai.Delta
delta *delta
mu sync.Mutex
spotCache map[string]bool
agentVersion *config.AgentVersion
Expand Down Expand Up @@ -74,11 +72,12 @@ func New(

c := &Controller{
log: log,
clusterID: clusterID,
castaiclient: castaiclient,
provider: provider,
interval: interval,
prepDuration: prepDuration,
delta: &castai.Delta{ClusterID: clusterID, ClusterVersion: v.Full(), FullSnapshot: true},
delta: newDelta(log, clusterID, v.Full()),
spotCache: map[string]bool{},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "castai-agent"),
informers: typeInformerMap,
Expand All @@ -95,25 +94,25 @@ func New(
if typ == reflect.TypeOf(&corev1.Node{}) {
h = cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.nodeAddHandler(log, castai.EventAdd, obj)
c.nodeAddHandler(log, eventAdd, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.nodeAddHandler(log, castai.EventUpdate, newObj)
c.nodeAddHandler(log, eventUpdate, newObj)
},
DeleteFunc: func(obj interface{}) {
c.nodeDeleteHandler(log, castai.EventDelete, obj)
c.nodeDeleteHandler(log, eventDelete, obj)
},
}
} else {
h = cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
genericHandler(log, c.queue, typ, castai.EventAdd, obj)
genericHandler(log, c.queue, typ, eventAdd, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
genericHandler(log, c.queue, typ, castai.EventUpdate, newObj)
genericHandler(log, c.queue, typ, eventUpdate, newObj)
},
DeleteFunc: func(obj interface{}) {
genericHandler(log, c.queue, typ, castai.EventDelete, obj)
genericHandler(log, c.queue, typ, eventDelete, obj)
},
}
}
Expand All @@ -124,11 +123,7 @@ func New(
return c
}

func (c *Controller) nodeAddHandler(
log logrus.FieldLogger,
event castai.EventType,
obj interface{},
) {
func (c *Controller) nodeAddHandler(log logrus.FieldLogger, event event, obj interface{}) {
node, ok := obj.(*corev1.Node)
if !ok {
log.Errorf("expected to get *corev1.Node but got %T", obj)
Expand All @@ -153,11 +148,7 @@ func (c *Controller) nodeAddHandler(
genericHandler(log, c.queue, reflect.TypeOf(&corev1.Node{}), event, node)
}

func (c *Controller) nodeDeleteHandler(
log logrus.FieldLogger,
event castai.EventType,
obj interface{},
) {
func (c *Controller) nodeDeleteHandler(log logrus.FieldLogger, event event, obj interface{}) {
node, ok := obj.(*corev1.Node)
if !ok {
log.Errorf("expected to get *corev1.Node but got %T", obj)
Expand All @@ -173,39 +164,20 @@ func genericHandler(
log logrus.FieldLogger,
queue workqueue.RateLimitingInterface,
expected reflect.Type,
event castai.EventType,
event event,
obj interface{},
) {
if reflect.TypeOf(obj) != expected {
log.Errorf("expected to get %v but got %T", expected, obj)
return
}

typeName := expected.String()
kind := typeName[strings.LastIndex(typeName, ".")+1:]

data, err := encode(obj)
if err != nil {
log.Errorf("failed to encode %T: %v", obj, err)
return
}

queue.Add(&castai.DeltaItem{
Event: event,
Kind: kind,
Data: data,
CreatedAt: time.Now().UTC(),
queue.Add(&item{
obj: obj.(runtime.Object),
event: event,
})
}

func encode(obj interface{}) (string, error) {
b, err := json.Marshal(obj)
if err != nil {
return "", fmt.Errorf("marshaling %T to json: %v", obj, err)
}
return base64.StdEncoding.EncodeToString(b), nil
}

func (c *Controller) Run(ctx context.Context) {
defer c.queue.ShutDown()

Expand All @@ -230,13 +202,13 @@ func (c *Controller) Run(ctx context.Context) {
AgentVersion: c.agentVersion.Version,
GitCommit: c.agentVersion.GitCommit,
}
cfg, err := c.castaiclient.ExchangeAgentTelemetry(ctx, c.delta.ClusterID, req)
cfg, err := c.castaiclient.ExchangeAgentTelemetry(ctx, c.clusterID, req)
if err != nil {
c.log.Errorf("failed getting agent configuration: %v", err)
return
}
// Resync only when at least one full snapshot has already been sent.
if cfg.Resync && !c.delta.FullSnapshot {
if cfg.Resync && !c.delta.fullSnapshot {
c.log.Info("restarting controller to resync data")
cancel()
}
Expand All @@ -262,19 +234,19 @@ func (c *Controller) Run(ctx context.Context) {

func (c *Controller) pollQueueUntilDone() {
for {
item, done := c.queue.Get()
i, done := c.queue.Get()
if done {
return
}

di, ok := item.(*castai.DeltaItem)
di, ok := i.(*item)
if !ok {
c.log.Errorf("expected queue item to be of type %T but got %T", &castai.DeltaItem{}, item)
c.log.Errorf("expected queue item to be of type %T but got %T", &item{}, i)
continue
}

c.mu.Lock()
c.delta.Items = append(c.delta.Items, di)
c.delta.add(di)
c.mu.Unlock()
}
}
Expand All @@ -283,11 +255,10 @@ func (c *Controller) send(ctx context.Context) {
c.mu.Lock()
defer c.mu.Unlock()

if err := c.castaiclient.SendDelta(ctx, c.delta); err != nil {
if err := c.castaiclient.SendDelta(ctx, c.delta.toCASTAIRequest()); err != nil {
c.log.Errorf("failed sending delta: %v", err)
return
}

c.delta.Items = nil
c.delta.FullSnapshot = false
c.delta.clear()
}
148 changes: 148 additions & 0 deletions internal/services/controller/delta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package controller

import (
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

"castai-agent/internal/castai"
)

// newDelta initializes the delta struct which is used to collect cluster deltas, debounce them and map to CASTAI
// requests.
func newDelta(log logrus.FieldLogger, clusterID, clusterVersion string) *delta {
return &delta{
log: log,
clusterID: clusterID,
clusterVersion: clusterVersion,
fullSnapshot: true,
cache: map[string]*item{},
}
}

// delta is used to colelct cluster deltas, debounce them and map to CASTAI requests. It holds a cache of queue items
// which is referenced any time a new item is added to debounce the items.
type delta struct {
log logrus.FieldLogger
clusterID string
clusterVersion string
fullSnapshot bool
cache map[string]*item
}

// add will add an item to the delta cache. It will debounce the objects.
func (d *delta) add(i *item) {
key := mustKeyObject(i.obj)

if other, ok := d.cache[key]; ok && other.event == eventAdd && i.event == eventDelete {
delete(d.cache, key)
} else if ok && other.event == eventAdd && i.event == eventUpdate {
i.event = eventAdd
d.cache[key] = i
} else if ok && other.event == eventDelete && (i.event == eventAdd || i.event == eventUpdate) {
i.event = eventUpdate
d.cache[key] = i
} else {
d.cache[key] = i
}
}

// clear resets the delta cache and sets fullSnapshot to false. Should be called after toCASTAIRequest is successfully
// delivered.
func (d *delta) clear() {
d.fullSnapshot = false
d.cache = map[string]*item{}
}

// toCASTAIRequest maps the collected delta cache to the castai.Delta type.
func (d *delta) toCASTAIRequest() *castai.Delta {
var items []*castai.DeltaItem

for _, i := range d.cache {
data, err := encode(i.obj)
if err != nil {
d.log.Errorf("failed to encode %T: %v", i.obj, err)
continue
}

kinds, _, err := scheme.ObjectKinds(i.obj)
if err != nil {
d.log.Errorf("failed to find object %T kind: %v", i.obj, err)
continue
}
if len(kinds) == 0 || kinds[0].Kind == "" {
d.log.Errorf("unknown object kind for object %T", i.obj)
continue
}

items = append(items, &castai.DeltaItem{
Event: i.event.toCASTAIEvent(),
Kind: kinds[0].Kind,
Data: data,
CreatedAt: time.Now().UTC(),
})
}

return &castai.Delta{
ClusterID: d.clusterID,
ClusterVersion: d.clusterVersion,
FullSnapshot: d.fullSnapshot,
Items: items,
}
}

func encode(obj interface{}) (string, error) {
b, err := json.Marshal(obj)
if err != nil {
return "", fmt.Errorf("marshaling %T to json: %v", obj, err)
}
return base64.StdEncoding.EncodeToString(b), nil
}

type item struct {
obj runtime.Object
event event
}

type event string

const (
eventAdd event = "add"
eventDelete event = "delete"
eventUpdate event = "update"
)

func (e event) toCASTAIEvent() castai.EventType {
switch e {
case eventAdd:
return castai.EventAdd
case eventDelete:
return castai.EventDelete
case eventUpdate:
return castai.EventUpdate
}
return ""
}

// keyObject generates a unique key for an object, for example: `*v1.Pod::namespace/name`.
func keyObject(obj runtime.Object) (string, error) {
metaObj, ok := obj.(metav1.Object)
if !ok {
return "", fmt.Errorf("expected object of type %T to implement metav1.Object", obj)
}
return fmt.Sprintf("%s::%s/%s", reflect.TypeOf(obj).String(), metaObj.GetNamespace(), metaObj.GetName()), nil
}

func mustKeyObject(obj runtime.Object) string {
k, err := keyObject(obj)
if err != nil {
panic(fmt.Errorf("getting object key: %w", err))
}
return k
}
Loading

0 comments on commit ff7035b

Please sign in to comment.