diff --git a/internal/services/controller/controller.go b/internal/services/controller/controller.go
index 78293c5c..8c538ff0 100644
--- a/internal/services/controller/controller.go
+++ b/internal/services/controller/controller.go
@@ -2,6 +2,7 @@ package controller
 
 import (
 	"context"
+	"errors"
 	"reflect"
 	"regexp"
 	"sync"
@@ -37,7 +38,7 @@ type Controller struct {
 	clusterID    string
 	castaiclient castai.Client
 	provider     types.Provider
-	queue        workqueue.RateLimitingInterface
+	queue        workqueue.Interface
 	interval     time.Duration
 	prepDuration time.Duration
 	informers    map[reflect.Type]cache.SharedInformer
@@ -87,7 +88,7 @@ func New(
 		prepDuration: prepDuration,
 		delta:        newDelta(log, clusterID, v.Full()),
 		spotCache:    map[string]bool{},
-		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "castai-agent"),
+		queue:        workqueue.NewNamed("castai-agent"),
 		informers:    typeInformerMap,
 		agentVersion: agentVersion,
 	}
@@ -170,7 +171,7 @@ func (c *Controller) nodeDeleteHandler(log logrus.FieldLogger, event event, obj
 
 func genericHandler(
 	log logrus.FieldLogger,
-	queue workqueue.RateLimitingInterface,
+	queue workqueue.Interface,
 	expected reflect.Type,
 	event event,
 	obj interface{},
@@ -288,8 +289,10 @@ func (c *Controller) Run(ctx context.Context) {
 	}()
 
 	go func() {
-		c.log.Info("collecting initial cluster snapshot")
-		time.Sleep(c.prepDuration)
+		if err := c.collectInitialSnapshot(ctx); err != nil {
+			c.log.Errorf("error while collecting initial snapshot: %v", err)
+		}
+
 		c.log.Infof("sending cluster deltas every %s", c.interval)
 		wait.Until(func() {
 			c.send(ctx)
@@ -301,26 +304,71 @@ func (c *Controller) Run(ctx context.Context) {
 		c.queue.ShutDown()
 	}()
 
-	c.pollQueueUntilDone()
+	c.pollQueueUntilShutdown()
+}
+
+// collectInitialSnapshot adds a time buffer for collecting the initial snapshot, which is larger than a periodic
+// delta because it contains a significant portion of the Kubernetes state. The function has multiple exit points:
+// 1. The workqueue.Interface reports a queue length of 0.
+// 2. The prepDuration deadline has expired.
+func (c *Controller) collectInitialSnapshot(ctx context.Context) error {
+	c.log.Info("collecting initial cluster snapshot")
+	start := time.Now().UTC()
+	deadline := start.Add(c.prepDuration)
+
+	cond := func() (done bool, err error) {
+		defer func() {
+			if !done {
+				return
+			}
+			c.log.Infof("done waiting for initial cluster snapshot collection after %v", time.Now().UTC().Sub(start))
+		}()
+
+		queueLen := c.queue.Len()
+		log := c.log.WithField("queue_length", queueLen)
+		log.Debug("waiting until initial queue empty")
+
+		if queueLen == 0 {
+			log.Debug("initial workqueue is empty")
+			return true, nil
+		}
+
+		if time.Now().UTC().After(deadline) {
+			log.Debug("initial cluster snapshot collection deadline reached")
+			return true, nil
+		}
+
+		return false, nil
+	}
+
+	if err := wait.PollImmediateUntil(time.Second, cond, ctx.Done()); err != nil && !errors.Is(err, wait.ErrWaitTimeout) {
+		return err
+	}
+	return nil
 }
 
-func (c *Controller) pollQueueUntilDone() {
+func (c *Controller) pollQueueUntilShutdown() {
 	for {
-		i, done := c.queue.Get()
-		if done {
+		i, shutdown := c.queue.Get()
+		if shutdown {
 			return
 		}
+		c.processItem(i)
+	}
+}
 
-		di, ok := i.(*item)
-		if !ok {
-			c.log.Errorf("expected queue item to be of type %T but got %T", &item{}, i)
-			continue
-		}
+func (c *Controller) processItem(i interface{}) {
+	defer c.queue.Done(i)
 
-		c.mu.Lock()
-		c.delta.add(di)
-		c.mu.Unlock()
+	di, ok := i.(*item)
+	if !ok {
+		c.log.Errorf("expected queue item to be of type %T but got %T", &item{}, i)
+		return
 	}
+
+	c.mu.Lock()
+	c.delta.add(di)
+	c.mu.Unlock()
 }
 
 func (c *Controller) send(ctx context.Context) {
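
For reviewers unfamiliar with the plain `workqueue.Interface` this patch switches to, here is a minimal, self-contained sketch of the Get/Done/ShutDown contract that `pollQueueUntilShutdown` and `processItem` rely on. The `main` package, item strings, and sleep are illustrative only, not part of this change: `Get` blocks until an item is available or `ShutDown` is called, and every `Get` must be paired with a `Done`, which is why `processItem` defers it.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same constructor the patch switches to: a plain named FIFO queue,
	// with no rate limiting.
	queue := workqueue.NewNamed("example")

	// Producer: in the agent, informer event handlers Add items.
	go func() {
		for i := 0; i < 3; i++ {
			queue.Add(fmt.Sprintf("item-%d", i))
		}
		time.Sleep(100 * time.Millisecond)
		queue.ShutDown() // unblocks Get with shutdown=true
	}()

	// Consumer: mirrors pollQueueUntilShutdown/processItem.
	for {
		i, shutdown := queue.Get()
		if shutdown {
			return
		}
		func() {
			// Done must be paired with every Get; deferring it keeps the
			// pairing intact even if processing returns early.
			defer queue.Done(i)
			fmt.Println("processing", i)
		}()
	}
}
```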
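Likewise, a sketch of the `wait.PollImmediateUntil` shape that `collectInitialSnapshot` is built on, with a hypothetical deadline-only condition standing in for the real queue-length check: the condition runs once immediately and then every interval, and closing the stop channel (the agent passes `ctx.Done()`) surfaces as `wait.ErrWaitTimeout`, which the patch deliberately treats as benign so context cancellation is not reported as an error.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	deadline := time.Now().Add(3 * time.Second)
	stopCh := make(chan struct{}) // in the agent this is ctx.Done()

	// Hypothetical condition: done on deadline, mirroring the snapshot
	// collector's "queue empty or deadline reached" exit points.
	cond := func() (bool, error) {
		if time.Now().After(deadline) {
			return true, nil // stop waiting, but not an error
		}
		return false, nil
	}

	// Runs cond immediately, then once per second, until cond returns
	// true, cond errors, or stopCh closes (reported as ErrWaitTimeout).
	err := wait.PollImmediateUntil(time.Second, cond, stopCh)
	if err != nil && !errors.Is(err, wait.ErrWaitTimeout) {
		fmt.Println("unexpected error:", err)
	}
}
```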