Skip to content

Commit

Permalink
fix: use non-ratelimiting informers workqueue (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
saumas authored Aug 13, 2021
1 parent e59a483 commit af10c3c
Showing 1 changed file with 65 additions and 17 deletions.
82 changes: 65 additions & 17 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"errors"
"reflect"
"regexp"
"sync"
Expand Down Expand Up @@ -37,7 +38,7 @@ type Controller struct {
clusterID string
castaiclient castai.Client
provider types.Provider
queue workqueue.RateLimitingInterface
queue workqueue.Interface
interval time.Duration
prepDuration time.Duration
informers map[reflect.Type]cache.SharedInformer
Expand Down Expand Up @@ -87,7 +88,7 @@ func New(
prepDuration: prepDuration,
delta: newDelta(log, clusterID, v.Full()),
spotCache: map[string]bool{},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "castai-agent"),
queue: workqueue.NewNamed("castai-agent"),
informers: typeInformerMap,
agentVersion: agentVersion,
}
Expand Down Expand Up @@ -170,7 +171,7 @@ func (c *Controller) nodeDeleteHandler(log logrus.FieldLogger, event event, obj

func genericHandler(
log logrus.FieldLogger,
queue workqueue.RateLimitingInterface,
queue workqueue.Interface,
expected reflect.Type,
event event,
obj interface{},
Expand Down Expand Up @@ -288,8 +289,10 @@ func (c *Controller) Run(ctx context.Context) {
}()

go func() {
c.log.Info("collecting initial cluster snapshot")
time.Sleep(c.prepDuration)
if err := c.collectInitialSnapshot(ctx); err != nil {
c.log.Errorf("error while collecting initial snapshot: %v", err)
}

c.log.Infof("sending cluster deltas every %s", c.interval)
wait.Until(func() {
c.send(ctx)
Expand All @@ -301,26 +304,71 @@ func (c *Controller) Run(ctx context.Context) {
c.queue.ShutDown()
}()

c.pollQueueUntilDone()
c.pollQueueUntilShutdown()
}

// collectInitialSnapshot is used to add a time buffer to collect the initial snapshot which is larger than periodic
// delta because it contains a significant portion of the Kubernetes state. The function has multiple exit points:
// 1. Exits when the workqueue.Interface reports queue length of 0.
// 2. The deadline of prepDuration has expired.
func (c *Controller) collectInitialSnapshot(ctx context.Context) error {
c.log.Info("collecting initial cluster snapshot")
start := time.Now().UTC()
deadline := start.Add(c.prepDuration)

cond := func() (done bool, err error) {
defer func() {
if !done {
return
}
c.log.Infof("done waiting for initial cluster snapshot collection after %v", time.Now().UTC().Sub(start))
}()

queueLen := c.queue.Len()
log := c.log.WithField("queue_length", queueLen)
log.Debug("waiting until initial queue empty")

if queueLen == 0 {
log.Debug("initial workqueue is empty")
return true, nil
}

if time.Now().UTC().After(deadline) {
log.Debug("initial cluster snapshot collection deadline reached")
return true, nil
}

return false, nil
}

if err := wait.PollImmediateUntil(time.Second, cond, ctx.Done()); err != nil && !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
return nil
}

func (c *Controller) pollQueueUntilDone() {
func (c *Controller) pollQueueUntilShutdown() {
for {
i, done := c.queue.Get()
if done {
i, shutdown := c.queue.Get()
if shutdown {
return
}
c.processItem(i)
}
}

di, ok := i.(*item)
if !ok {
c.log.Errorf("expected queue item to be of type %T but got %T", &item{}, i)
continue
}
func (c *Controller) processItem(i interface{}) {
defer c.queue.Done(i)

c.mu.Lock()
c.delta.add(di)
c.mu.Unlock()
di, ok := i.(*item)
if !ok {
c.log.Errorf("expected queue item to be of type %T but got %T", &item{}, i)
return
}

c.mu.Lock()
c.delta.add(di)
c.mu.Unlock()
}

func (c *Controller) send(ctx context.Context) {
Expand Down

0 comments on commit af10c3c

Please sign in to comment.