Wait for autopilot CRDs before starting Autopilot worker component
Instead of having multiple brute-force retry loops in a row, just have
one that actually watches the CRDs and waits until it has observed
all the Autopilot CRDs to be established. This covers API server
reachability and CRD availability in one go.

It also ensures that controller-runtime registration is done only once.
Since controller-runtime v0.19.0, registering the same thing twice will
result in an error.
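
For reference, "established" is the Established condition in a CRD's status, which the API server sets to True once the CRD's names are accepted and it is ready to serve. Below is a minimal sketch of that waiting technique using the stock apiextensions clientset and a plain poll loop; it is not the k0s implementation (the diff further down uses k0s's own watch helper), and all names are illustrative only:

package example

import (
	"context"
	"time"

	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForCRDEstablished polls the named CRD until its Established condition
// is True, or until the timeout expires.
func waitForCRDEstablished(ctx context.Context, client apiextensionsclient.Interface, name string) error {
	return wait.PollUntilContextTimeout(ctx, time.Second, 2*time.Minute, true,
		func(ctx context.Context) (bool, error) {
			crd, err := client.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{})
			if err != nil {
				return false, nil // not created yet, or transient error: keep polling
			}
			for _, cond := range crd.Status.Conditions {
				if cond.Type == apiextensionsv1.Established {
					return cond.Status == apiextensionsv1.ConditionTrue, nil
				}
			}
			return false, nil
		})
}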
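
The "register only once" point can also be illustrated with a bare controller-runtime sketch: a controller is wired into a manager exactly once, because (per the commit message's note about v0.19.0) repeating the registration returns an error. The reconciler and watched type below are placeholders, not the actual autopilot controllers:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	cr "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// nodeReconciler is a placeholder reconciler used only for illustration.
type nodeReconciler struct{ client.Client }

func (r *nodeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	return reconcile.Result{}, nil // placeholder: no real reconciliation
}

// registerOnce wires the placeholder controller into the manager exactly once.
// Doing this a second time against the same manager would create two
// controllers with the same default name, which recent controller-runtime
// versions reject with an error.
func registerOnce(mgr cr.Manager) error {
	return builder.ControllerManagedBy(mgr).
		For(&corev1.Node{}).
		Complete(&nodeReconciler{Client: mgr.GetClient()})
}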

Signed-off-by: Tom Wieczorek <[email protected]>
twz123 committed Jan 31, 2025
1 parent 06efc82 commit f0293f9
Showing 3 changed files with 157 additions and 82 deletions.
66 changes: 22 additions & 44 deletions pkg/autopilot/controller/root_worker.go
@@ -19,22 +19,19 @@ package controller
import (
"context"
"fmt"
"time"

apcli "github.com/k0sproject/k0s/pkg/autopilot/client"
apdel "github.com/k0sproject/k0s/pkg/autopilot/controller/delegate"
aproot "github.com/k0sproject/k0s/pkg/autopilot/controller/root"
apsig "github.com/k0sproject/k0s/pkg/autopilot/controller/signal"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
k8sretry "k8s.io/client-go/util/retry"

cr "sigs.k8s.io/controller-runtime"
crman "sigs.k8s.io/controller-runtime/pkg/manager"
crmetricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
crwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/avast/retry-go"
"github.com/sirupsen/logrus"
)

@@ -71,49 +68,30 @@ func (w *rootWorker) Run(ctx context.Context) error {
HealthProbeBindAddress: w.cfg.HealthProbeBindAddr,
}

var mgr crman.Manager
if err := retry.Do(
func() (err error) {
mgr, err = cr.NewManager(w.clientFactory.RESTConfig(), managerOpts)
return err
},
retry.Context(ctx),
retry.LastErrorOnly(true),
retry.Delay(1*time.Second),
retry.OnRetry(func(attempt uint, err error) {
logger.WithError(err).Debugf("Failed to start controller manager in attempt #%d, retrying after backoff", attempt+1)
}),
); err != nil {
clusterID, err := w.getClusterID(ctx)
if err != nil {
return err
}

mgr, err := cr.NewManager(w.clientFactory.RESTConfig(), managerOpts)
if err != nil {
return fmt.Errorf("unable to start controller manager: %w", err)
}

// In some cases, we need to wait on the worker side until controller deploys all autopilot CRDs
return k8sretry.OnError(wait.Backoff{
Steps: 120,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0.1,
}, func(err error) bool {
return true
}, func() error {
clusterID, err := w.getClusterID(ctx)
if err != nil {
return err
}

if err := RegisterIndexers(ctx, mgr, "worker"); err != nil {
return fmt.Errorf("unable to register indexers: %w", err)
}

if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil {
return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err)
}
// The controller-runtime start blocks until the context is cancelled.
if err := mgr.Start(ctx); err != nil {
return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err)
}
return nil
})
if err := RegisterIndexers(ctx, mgr, "worker"); err != nil {
return fmt.Errorf("unable to register indexers: %w", err)
}

if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil {
return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err)
}

// The controller-runtime start blocks until the context is cancelled.
if err := mgr.Start(ctx); err != nil {
return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err)
}

return nil
}

func (w *rootWorker) getClusterID(ctx context.Context) (string, error) {
3 changes: 3 additions & 0 deletions pkg/component/controller/systemrbac.go
@@ -124,6 +124,9 @@ rules:
- apiGroups: ["apps"]
resources: ["*"]
verbs: ["*"]
- apiGroups: ["apiextensions.k8s.io"]
resources: ["customresourcedefinitions"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
170 changes: 132 additions & 38 deletions pkg/component/worker/autopilot.go
@@ -19,81 +19,89 @@ limitations under the License.
package worker

import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"time"

autopilotv1beta2 "github.com/k0sproject/k0s/pkg/apis/autopilot/v1beta2"
apcli "github.com/k0sproject/k0s/pkg/autopilot/client"
apcont "github.com/k0sproject/k0s/pkg/autopilot/controller"
aproot "github.com/k0sproject/k0s/pkg/autopilot/controller/root"
k0sscheme "github.com/k0sproject/k0s/pkg/client/clientset/scheme"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/config"
"github.com/sirupsen/logrus"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
)
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

const (
defaultPollDuration = 5 * time.Second
defaultPollTimeout = 5 * time.Minute
"github.com/avast/retry-go"
"github.com/sirupsen/logrus"
)

var _ manager.Component = (*Autopilot)(nil)

type Autopilot struct {
K0sVars *config.CfgVars
CertManager *CertificateManager

clientFactory apcli.FactoryInterface
}

func (a *Autopilot) Init(ctx context.Context) error {
return nil
}

func (a *Autopilot) Start(ctx context.Context) error {
log := logrus.WithFields(logrus.Fields{"component": "autopilot"})

// Wait 5 mins till we see kubelet auth config in place
timeout, cancel := context.WithTimeout(ctx, defaultPollTimeout)
defer cancel()

var restConfig *rest.Config
// wait.PollUntilWithContext passes its own ctx argument as a ctx to the given function
// Poll until the kubelet config can be loaded successfully, as this is the access to the kube api
// needed by autopilot.
if err := wait.PollUntilWithContext(timeout, defaultPollDuration, func(ctx context.Context) (done bool, err error) {
log.Debugf("Attempting to load autopilot client config")
if restConfig, err = a.CertManager.GetRestConfig(ctx); err != nil {
log.WithError(err).Warnf("Failed to load autopilot client config, retrying in %v", defaultPollDuration)
return false, nil
}

return true, nil
}); err != nil {
return fmt.Errorf("unable to create autopilot client: %w", err)
}

// Without the config, there is nothing that we can do.
log := logrus.WithField("component", "autopilot")

if restConfig == nil {
return errors.New("unable to create an autopilot client -- timed out")
kinds, err := getAutopilotKinds()
if err != nil {
return err
}

autopilotClientFactory, err := apcli.NewClientFactory(restConfig)
if err != nil {
return fmt.Errorf("creating autopilot client factory error: %w", err)
var lastErr error
if err := retry.Do(
func() (err error) {
defer func() { lastErr = err }()

restConfig, err := a.CertManager.GetRestConfig(ctx)
if err != nil {
return err
}

a.clientFactory, err = apcli.NewClientFactory(restConfig)
if err != nil {
return err
}

// We need to wait until all autopilot CRDs are established.
kinds := slices.Clone(kinds) // take a copy to avoid side effects
if err := waitUntilCRDsEstablished(ctx, a.clientFactory, kinds); err != nil {
return fmt.Errorf("while waiting for Autopilot CRDs %v to become established: %w", kinds, err)
}

return nil
},
retry.Context(ctx),
retry.LastErrorOnly(true),
retry.OnRetry(func(attempt uint, err error) {
log.WithField("attempt", attempt+1).WithError(err).Debug("Retrying after backoff")
}),
); err != nil {
return fmt.Errorf("failed to initialize autopilot: %w", cmp.Or(lastErr, err))
}

log.Info("Autopilot client factory created, booting up worker root controller")
autopilotRoot, err := apcont.NewRootWorker(aproot.RootConfig{
KubeConfig: a.K0sVars.KubeletAuthConfigPath,
K0sDataDir: a.K0sVars.DataDir,
Mode: "worker",
ManagerPort: 8899,
MetricsBindAddr: "0",
HealthProbeBindAddr: "0",
}, log, autopilotClientFactory)
}, log, a.clientFactory)
if err != nil {
return fmt.Errorf("failed to create autopilot worker: %w", err)
}
@@ -113,3 +121,89 @@ func (a *Autopilot) Start(ctx context.Context) error {
func (a *Autopilot) Stop() error {
return nil
}

// Gathers all kinds in the autopilot API group.
func getAutopilotKinds() ([]string, error) {
var kinds []string

gv := autopilotv1beta2.SchemeGroupVersion
for kind := range k0sscheme.Scheme.KnownTypes(gv) {
// For some reason, the scheme also returns types from core/v1. Filter
// those out by only adding kinds which are _only_ in the autopilot
// group, and not in some other group as well. The only way to get all
// the GVKs for a certain type is by creating a new instance of that
// type and then asking the scheme about it.
obj, err := k0sscheme.Scheme.New(gv.WithKind(kind))
if err != nil {
return nil, err
}
gvks, _, err := k0sscheme.Scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}

// Skip the kind if there's at least one GVK which is not in the
// autopilot group
if !slices.ContainsFunc(gvks, func(gvk schema.GroupVersionKind) bool {
return gvk.Group != autopilotv1beta2.GroupName
}) {
kinds = append(kinds, kind)
}
}

slices.Sort(kinds) // for cosmetic purposes
return kinds, nil
}

func waitUntilCRDsEstablished(ctx context.Context, clientFactory apcli.FactoryInterface, kinds []string) error {
client, err := clientFactory.GetExtensionClient()
if err != nil {
return err
}

// Watch all the CRDs until all the required ones are established.
log := logrus.WithField("component", "autopilot")
return watch.CRDs(client.CustomResourceDefinitions()).
WithErrorCallback(func(err error) (time.Duration, error) {
if retryAfter, e := watch.IsRetryable(err); e == nil {
log.WithError(err).Info(
"Transient error while watching for CRDs",
", starting over after ", retryAfter, " ...",
)
return retryAfter, nil
}

retryAfter := 10 * time.Second
log.WithError(err).Error(
"Error while watching CRDs",
", starting over after ", retryAfter, " ...",
)
return retryAfter, nil
}).
Until(ctx, func(item *apiextensionsv1.CustomResourceDefinition) (bool, error) {
if item.Spec.Group != autopilotv1beta2.GroupName {
return false, nil // Not an autopilot CRD.
}

// Find the established status for the CRD.
var established apiextensionsv1.ConditionStatus
for _, cond := range item.Status.Conditions {
if cond.Type == apiextensionsv1.Established {
established = cond.Status
break
}
}

if established != apiextensionsv1.ConditionTrue {
return false, nil // CRD not yet established.
}

// Remove the CRD's (list) kind from the list.
kinds = slices.DeleteFunc(kinds, func(kind string) bool {
return kind == item.Spec.Names.Kind || kind == item.Spec.Names.ListKind
})

// If the list is empty, all required CRDs are established.
return len(kinds) < 1, nil
})
}
