diff --git a/go.mod b/go.mod index 3bb4c2b0..00d0b44a 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/stretchr/testify v1.8.4 go.uber.org/goleak v1.2.1 golang.org/x/net v0.18.0 + golang.org/x/sync v0.4.0 k8s.io/api v0.28.3 k8s.io/apimachinery v0.28.3 k8s.io/client-go v0.28.3 diff --git a/internal/services/controller/controller.go b/internal/services/controller/controller.go index bf852bf0..50a87a8a 100644 --- a/internal/services/controller/controller.go +++ b/internal/services/controller/controller.go @@ -19,6 +19,7 @@ import ( karpenter "github.com/aws/karpenter/pkg/apis/v1beta1" "github.com/samber/lo" "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" appsv1 "k8s.io/api/apps/v1" authorizationv1 "k8s.io/api/authorization/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" @@ -157,7 +158,7 @@ func New( func (c *Controller) Run(ctx context.Context) error { defer c.queue.ShutDown() - + g := new(errgroup.Group) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -206,15 +207,15 @@ func (c *Controller) Run(ctx context.Context) error { }, dur, ctx.Done()) }() - go func() { + g.Go(func() error { if err := c.collectInitialSnapshot(ctx); err != nil { const maxItems = 5 queueContent := c.debugQueueContent(maxItems) log := c.log.WithField("queue_content", queueContent) - // Crash agent in case it's not able to collect full snapshot from informers cache. - // TODO (CO-1632): refactor crashing to "normal" exit or healthz metric; abruptly - // stopping the agent does not give it a chance to release leader lock. - log.Fatalf("error while collecting initial snapshot: %v", err) + log.Errorf("error while collecting initial snapshot: %v", err) + c.log.Infof("restarting agent after failure to collect initial snapshot") + c.triggerRestart() + return err } // Since both initial snapshot collection and event handlers writes to the same delta queue add @@ -228,7 +229,8 @@ func (c *Controller) Run(ctx context.Context) error { wait.Until(func() { c.send(ctx) }, c.cfg.Interval, ctx.Done()) - }() + return nil + }) go c.startConditionalInformersWithWatcher(ctx, c.conditionalInformers) @@ -239,7 +241,7 @@ func (c *Controller) Run(ctx context.Context) error { c.pollQueueUntilShutdown() - return nil + return g.Wait() } func (c *Controller) startConditionalInformersWithWatcher(ctx context.Context, conditionalInformers []conditionalInformer) {