Skip to content

Commit

Permalink
KUBE-644: use informers to watch CSRs (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
Trojan295 authored Nov 4, 2024
1 parent 4a3f219 commit 94411f2
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 162 deletions.
7 changes: 5 additions & 2 deletions cmd/controller/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,12 @@ func runController(
}

if isGKE {
log.Info("auto approve csr started as running on GKE")
csrMgr := csr.NewApprovalManager(log, clientset)
csrMgr.Start(ctx)
if err := csrMgr.Start(ctx); err != nil {
log.WithError(err).Fatal("failed to start approval manager")
}

log.Info("auto approve csr started as running on GKE")
}

svc.Run(ctx)
Expand Down
188 changes: 84 additions & 104 deletions internal/actions/csr/csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/internal/waitext"
"k8s.io/client-go/tools/cache"
)

const (
// ReasonApproved is the reason string ("AutoApproved") associated with CSRs approved by this package.
// NOTE(review): presumably written to the CSR approval condition — confirm against the approval code.
ReasonApproved = "AutoApproved"
// approvedMessage is the human-readable message that accompanies an approval.
// NOTE(review): presumably stored next to ReasonApproved on the approval condition — confirm.
approvedMessage = "This CSR was approved by CAST AI"
// csrTTL is the maximum age of a CSR, measured from its creation timestamp.
// Certificate.Outdated reports CSRs older than this as outdated.
csrTTL = time.Hour

// csrInformerResyncPeriod is the resync period passed to the CSR shared informer factory.
// CSRs should be approved when they are created, so the resync period can be long.
// A resync replays the events (create, update, delete) for objects already in the informer cache.
// It does not involve talking to the API server; it is not a relist.
csrInformerResyncPeriod = 12 * time.Hour
)

// ErrNodeCertificateNotFound is the sentinel error returned when a lookup for a node's
// certificate signing request finds no matching CSR (for example by getNodeCSRV1Beta1).
var ErrNodeCertificateNotFound = errors.New("node certificate not found")
Expand Down Expand Up @@ -66,6 +70,34 @@ func (c *Certificate) Approved() bool {
return false
}

// Outdated reports whether the certificate signing request was created more than
// csrTTL ago and therefore should no longer be processed by cluster-controller.
// It is unrelated to the expiration of the certificate itself.
//
// The v1beta1 object is consulted when present; otherwise the v1 object is used.
func (c *Certificate) Outdated() bool {
	var createdAt time.Time
	if c.v1Beta1 != nil {
		createdAt = c.v1Beta1.CreationTimestamp.Time
	} else {
		createdAt = c.v1.CreationTimestamp.Time
	}

	// The request is outdated once "now" has passed creation time + TTL.
	return time.Now().After(createdAt.Add(csrTTL))
}

// ForCASTAINode reports whether the certificate name identifies a CAST AI managed node:
// the name must be non-empty, start with "system:node" and contain "cast-pool".
func (c *Certificate) ForCASTAINode() bool {
	name := c.Name

	switch {
	case name == "":
		return false
	case !strings.HasPrefix(name, "system:node"):
		return false
	default:
		return strings.Contains(name, "cast-pool")
	}
}

// NodeBootstrap reports whether the CSR was requested by one of the two accepted users:
// kubelet-bootstrap or the cluster-controller service account.
//
// There is only one handler per CSR/certificate name (the node name), so both the
// controller's own certificates and those of kubelet-bootstrap can be processed.
// Accepting the controller's user covers the case where the controller restarts after the
// bootstrap certificate was deleted but before our own certificate was approved.
func (c *Certificate) NodeBootstrap() bool {
	switch c.RequestingUser {
	case "kubelet-bootstrap",
		"system:serviceaccount:castai-agent:castai-cluster-controller":
		return true
	default:
		return false
	}
}

func isAlreadyApproved(err error) bool {
if err == nil {
return false
Expand Down Expand Up @@ -147,6 +179,17 @@ func (c *Certificate) NewCSR(ctx context.Context, client kubernetes.Interface) (
return &Certificate{v1: resp}, nil
}

// startInformer starts every informer registered on factory and then blocks until ctx is done.
//
// ctx:     controls the lifetime of the informers; cancelling it stops them and unblocks this call.
// log:     receives an info message when watching starts and another when it finishes.
// factory: the shared informer factory whose informers should be run.
func startInformer(ctx context.Context, log logrus.FieldLogger, factory informers.SharedInformerFactory) {
	// ctx.Done() already is a receive-only stop channel, so hand it to the factory directly
	// instead of allocating a second channel that merely mirrors context cancellation.
	// The informers stop as soon as ctx is cancelled.
	factory.Start(ctx.Done())
	log.Info("watching for new node csr")

	// Keep the caller blocked for as long as the informers are meant to run.
	<-ctx.Done()
	log.WithField("context", ctx.Err()).Info("finished watching for new node csr")
}

func get(ctx context.Context, client kubernetes.Interface, cert *Certificate) (*Certificate, error) {
if cert.v1Beta1 != nil {
v1beta1req, err := client.CertificatesV1beta1().CertificateSigningRequests().Get(ctx, cert.v1Beta1.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -273,144 +316,81 @@ func getNodeCSRV1Beta1(ctx context.Context, client kubernetes.Interface, nodeNam
return nil, ErrNodeCertificateNotFound
}

func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kubernetes.Interface, c chan *Certificate) {
var w watch.Interface
var err error
b := waitext.DefaultExponentialBackoff()
err = waitext.Retry(
ctx,
b,
waitext.Forever,
func(ctx context.Context) (bool, error) {
w, err = getWatcher(ctx, client)
// Context canceled is when the cluster-controller is stopped.
// In that case context.Canceled is not an error.
if errors.Is(err, context.Canceled) {
return false, err
}
if err != nil {
return true, fmt.Errorf("getWatcher: %w", err)
}
return false, nil
},
func(err error) {
log.Warnf("retrying: %v", err)
},
func createInformer(ctx context.Context, client kubernetes.Interface) (informers.SharedInformerFactory, cache.SharedIndexInformer, error) {
var (
errv1 error
errv1beta1 error
)
if err != nil {
log.Warnf("finished: %v", err)
return
}

defer w.Stop()

log.Info("watching for new node csr")
if _, errv1 = client.CertificatesV1().CertificateSigningRequests().List(ctx, metav1.ListOptions{}); errv1 == nil {
v1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod,
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.FieldSelector = getOptions(certv1.KubeAPIServerClientKubeletSignerName).FieldSelector
}))
v1Informer := v1Factory.Certificates().V1().CertificateSigningRequests().Informer()
return v1Factory, v1Informer, nil
}

for {
select {
case <-ctx.Done():
return
case event, ok := <-w.ResultChan():
if !ok {
log.Info("watcher closed")
go WatchCastAINodeCSRs(ctx, log, client, c) // start over in case of any error.
return
}
if _, errv1beta1 = client.CertificatesV1beta1().CertificateSigningRequests().List(ctx, metav1.ListOptions{}); errv1beta1 == nil {
v1Factory := informers.NewSharedInformerFactoryWithOptions(client, csrInformerResyncPeriod,
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.FieldSelector = getOptions(certv1beta1.KubeAPIServerClientKubeletSignerName).FieldSelector
}))
v1Informer := v1Factory.Certificates().V1beta1().CertificateSigningRequests().Informer()
return v1Factory, v1Informer, nil
}

cert, err := toCertificate(event)
if err != nil {
log.Warnf("toCertificate: skipping csr event: %v", err)
continue
}
return nil, nil, fmt.Errorf("failed to create informer: v1: %w, v1beta1: %w", errv1, errv1beta1)
}

if cert == nil {
continue
}
var errUnexpectedObjectType = errors.New("unexpected object type")

if cert.Approved() {
continue
}
func processCSREvent(ctx context.Context, c chan<- *Certificate, csrObj interface{}) error {
cert, err := toCertificate(csrObj)
if err != nil {
return err
}

sendCertificate(ctx, c, cert)
}
if cert == nil {
return nil
}
}

func getWatcher(ctx context.Context, client kubernetes.Interface) (watch.Interface, error) {
w, err := client.CertificatesV1().CertificateSigningRequests().Watch(ctx, getOptions(certv1.KubeAPIServerClientKubeletSignerName))
if err != nil {
w, err = client.CertificatesV1beta1().CertificateSigningRequests().Watch(ctx, getOptions(certv1beta1.KubeAPIServerClientKubeletSignerName))
if err != nil {
return nil, fmt.Errorf("fail to open v1 and v1beta watching client: %w", err)
}
if cert.Approved() || !cert.ForCASTAINode() || !cert.NodeBootstrap() || cert.Outdated() {
return nil
}
return w, nil
}

var (
errUnexpectedObjectType = errors.New("unexpected object type")
errCSRTooOld = errors.New("csr is too old")
errOwner = errors.New("owner is not bootstrap")
errNonCastAINode = errors.New("not a castai node")
)
sendCertificate(ctx, c, cert)
return nil
}

func toCertificate(event watch.Event) (cert *Certificate, err error) {
func toCertificate(obj interface{}) (cert *Certificate, err error) {
var name string
var request []byte

isOutdated := false
switch e := event.Object.(type) {
switch e := obj.(type) {
case *certv1.CertificateSigningRequest:
name = e.Name
request = e.Spec.Request
cert = &Certificate{Name: name, v1: e, RequestingUser: e.Spec.Username}
isOutdated = e.CreationTimestamp.Add(csrTTL).Before(time.Now())
case *certv1beta1.CertificateSigningRequest:
name = e.Name
request = e.Spec.Request
cert = &Certificate{Name: name, v1Beta1: e, RequestingUser: e.Spec.Username}
isOutdated = e.CreationTimestamp.Add(csrTTL).Before(time.Now())
default:
return nil, errUnexpectedObjectType
}

if isOutdated {
return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errCSRTooOld)
}

// Since we only have one handler per CSR/certificate name,
// which is the node name, we can process the controller's certificates and kubelet-bootstrap`s.
// This covers the case when the controller restarts but the bootstrap certificate was deleted without our own certificate being approved.
if cert.RequestingUser != "kubelet-bootstrap" && cert.RequestingUser != "system:serviceaccount:castai-agent:castai-cluster-controller" {
return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errOwner)
}

cn, err := getSubjectCommonName(name, request)
if err != nil {
return nil, fmt.Errorf("getSubjectCommonName: Name: %v RequestingUser: %v request: %v %w", cert.Name, cert.RequestingUser, string(request), err)
}

if !isCastAINodeCsr(cn) {
return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v cn: %v %w", cert.Name, cert.RequestingUser, cn, errNonCastAINode)
}
cert.Name = cn

return cert, nil
}

// isCastAINodeCsr reports whether subjectCommonName belongs to a CAST AI managed node:
// it must be non-empty, start with "system:node" and contain "cast-pool".
func isCastAINodeCsr(subjectCommonName string) bool {
	return subjectCommonName != "" &&
		strings.HasPrefix(subjectCommonName, "system:node") &&
		strings.Contains(subjectCommonName, "cast-pool")
}

func sendCertificate(ctx context.Context, c chan *Certificate, cert *Certificate) {
func sendCertificate(ctx context.Context, c chan<- *Certificate, cert *Certificate) {
select {
case c <- cert:
case <-ctx.Done():
Expand Down
Loading

0 comments on commit 94411f2

Please sign in to comment.