diff --git a/actions/approve_csr_handler.go b/actions/approve_csr_handler.go index 0ba9530b..0d9875d9 100644 --- a/actions/approve_csr_handler.go +++ b/actions/approve_csr_handler.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "reflect" - "sync" "time" "github.com/sirupsen/logrus" @@ -35,8 +34,6 @@ type approveCSRHandler struct { clientset kubernetes.Interface initialCSRFetchTimeout time.Duration csrFetchInterval time.Duration - cancelAutoApprove context.CancelFunc - m sync.Mutex // Used to make sure there is just one watcher running as it may be triggered from multiple CSR actions. } func (h *approveCSRHandler) Handle(ctx context.Context, action *castai.ClusterAction) error { @@ -51,15 +48,7 @@ func (h *approveCSRHandler) Handle(ctx context.Context, action *castai.ClusterAc actionIDLogField: action.ID, }) - // If AllowAutoApprove is enabled, the CSR watcher will be triggered to handle Certificate Signing Requests (CSRs) - // for nodes that are older than 24 hours and managed by CastAI if req.AllowAutoApprove != nil { - if *req.AllowAutoApprove { - go h.RunAutoApproveForCastAINodes(ctx) - } else { - h.StopAutoApproveForCastAINodes() - } - // CSR action may be used only to instruct whether to start / stop watcher responsible for auto-approving; in // this case, there is nothing more to do. if req.NodeName == "" { @@ -171,73 +160,6 @@ func (h *approveCSRHandler) getInitialNodeCSR(ctx context.Context, log logrus.Fi return cert, err } -func (h *approveCSRHandler) RunAutoApproveForCastAINodes(ctx context.Context) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - if !h.startAutoApprove(cancel) { - return // already running - } - defer h.StopAutoApproveForCastAINodes() - - log := h.log.WithField("RunAutoApprove", "auto-approve-csr") - c := make(chan *csr.Certificate, 1) - go csr.WatchCastAINodeCSRs(ctx, log, h.clientset, c) - - for { - select { - case <-ctx.Done(): - log.WithError(ctx.Err()).Errorf("auto approve csr finished") - return - case cert := <-c: - if cert == nil { - continue - } - go func(cert *csr.Certificate) { - log := log.WithField("node_name", cert.Name) - log.Info("auto approving csr") - err := h.handleWithRetry(ctx, log, cert) - if err != nil { - log.WithError(err).Errorf("failed to approve csr: %+v", cert) - } - }(cert) - } - } -} - -func (h *approveCSRHandler) startAutoApprove(cancelFunc context.CancelFunc) bool { - h.m.Lock() - defer h.m.Unlock() - if h.cancelAutoApprove != nil { - return false - } - - h.log.Info("starting auto approve CSRs for managed by Cast AI nodes") - h.cancelAutoApprove = cancelFunc - - return true -} - -func (h *approveCSRHandler) StopAutoApproveForCastAINodes() { - h.m.Lock() - defer h.m.Unlock() - - if h.cancelAutoApprove == nil { - return - } - - h.log.Info("stopping auto approve CSRs for managed by Cast AI nodes") - h.cancelAutoApprove() - h.cancelAutoApprove = nil -} - -func (h *approveCSRHandler) getCancelAutoApprove() context.CancelFunc { - h.m.Lock() - defer h.m.Unlock() - - return h.cancelAutoApprove -} - func newApproveCSRExponentialBackoff() wait.Backoff { b := waitext.DefaultExponentialBackoff() b.Factor = 2 diff --git a/actions/approve_csr_handler_test.go b/actions/approve_csr_handler_test.go index b8e4b528..f10900de 100644 --- a/actions/approve_csr_handler_test.go +++ b/actions/approve_csr_handler_test.go @@ -224,177 +224,6 @@ AiAHVYZXHxxspoV0hcfn2Pdsl89fIPCOFy/K1PqSUR6QNAIgYdt51ZbQt9rgM2BD err := h.Handle(ctx, actionApproveCSR) r.EqualError(err, "getting initial csr: context deadline exceeded") }) - - t.Run("enable-->disable auto-approve", func(t *testing.T) { - r := require.New(t) - - client := fake.NewSimpleClientset() - - boolTrue := true - boolFalse := false - actionRunAutoApprove := &castai.ClusterAction{ - ActionApproveCSR: &castai.ActionApproveCSR{AllowAutoApprove: &boolTrue}, - CreatedAt: time.Time{}, - } - actionStopAutoApprove := &castai.ClusterAction{ - ActionApproveCSR: &castai.ActionApproveCSR{AllowAutoApprove: &boolFalse}, - CreatedAt: time.Time{}, - } - h := &approveCSRHandler{ - log: log, - clientset: client, - csrFetchInterval: 100 * time.Millisecond, - initialCSRFetchTimeout: 1000 * time.Millisecond, - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - err := h.Handle(ctx, actionRunAutoApprove) - time.Sleep(time.Second) - r.NoError(err) - r.NotNil(h.getCancelAutoApprove()) - err = h.Handle(ctx, actionStopAutoApprove) - time.Sleep(time.Second) - r.NoError(err) - r.Nil(h.getCancelAutoApprove()) - }) - - t.Run("enable auto-approve + approve", func(t *testing.T) { - r := require.New(t) - - csrRes := getCSR() - csr2 := csrRes.DeepCopy() - csr2.Name = "node-csr-456" - ch := make(chan struct{}) - defer close(ch) - - client := fake.NewSimpleClientset(csrRes, csr2) - client.PrependReactor("update", "certificatesigningrequests", func(action ktest.Action) (handled bool, ret runtime.Object, err error) { - approved := csrRes.DeepCopy() - approved.Status.Conditions = []certv1.CertificateSigningRequestCondition{ - { - Type: certv1.CertificateApproved, - Reason: csr.ReasonApproved, - Message: "approved", - LastUpdateTime: metav1.Now(), - Status: v1.ConditionTrue, - }, - } - ch <- struct{}{} - return true, approved, nil - }) - client.PrependReactor("get", "nodes", func(action ktest.Action) (handled bool, ret runtime.Object, err error) { - return true, &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Hour * 25)), - Labels: map[string]string{ - castai.LabelManagedBy: castai.LabelValueManagedByCASTAI, - }, - }, - }, nil - }) - - watcher := watch.NewFake() - defer watcher.Stop() - var count int // check retry on error - client.PrependWatchReactor("certificatesigningrequests", - func(action ktest.Action) (handled bool, ret watch.Interface, err error) { - if count == 5 { - return true, watcher, nil - } else { - count++ - return true, nil, fmt.Errorf("ups error") - } - }) - - boolTrue := true - actionRunAutoApprove := &castai.ClusterAction{ - ActionApproveCSR: &castai.ActionApproveCSR{AllowAutoApprove: &boolTrue}, - CreatedAt: time.Time{}, - } - - h := &approveCSRHandler{ - log: log, - clientset: client, - csrFetchInterval: 100 * time.Millisecond, - initialCSRFetchTimeout: 1000 * time.Millisecond, - } - - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - - for i := 0; i < 5; i++ { - go func() { - err := h.Handle(ctx, actionRunAutoApprove) - time.Sleep(time.Millisecond) - r.NoError(err) - }() - } - - time.Sleep(time.Second) - r.NotNil(h.getCancelAutoApprove()) - go watcher.Add(csrRes) - go watcher.Add(csr2) - for i := 0; i < 2; i++ { - select { - case <-ch: - case <-ctx.Done(): - r.Fail("timeout waiting for auto-approve") - } - } - }) - t.Run("enable auto-approve + skip approve", func(t *testing.T) { - r := require.New(t) - - csrRes := getCSR() - client := fake.NewSimpleClientset(csrRes) - ch := make(chan struct{}) - - client.PrependReactor("get", "nodes", func(action ktest.Action) (handled bool, ret runtime.Object, err error) { - close(ch) - return true, &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Hour * 25)), - Labels: map[string]string{ - castai.LabelManagedBy: castai.LabelValueManagedByCASTAI, - }, - }, - }, nil - }) - - watcher := watch.NewFake() - defer watcher.Stop() - - client.PrependWatchReactor("certificatesigningrequests", ktest.DefaultWatchReactor(watcher, nil)) - - boolTrue := true - actionRunAutoApprove := &castai.ClusterAction{ - ActionApproveCSR: &castai.ActionApproveCSR{AllowAutoApprove: &boolTrue}, - CreatedAt: time.Time{}, - } - - h := &approveCSRHandler{ - log: log, - clientset: client, - csrFetchInterval: 100 * time.Millisecond, - initialCSRFetchTimeout: 1000 * time.Millisecond, - } - - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - - err := h.Handle(ctx, actionRunAutoApprove) - time.Sleep(time.Millisecond) - r.NoError(err) - r.NotNil(h.getCancelAutoApprove()) - watcher.Add(csrRes) - - select { - case <-ch: - case <-ctx.Done(): - r.Fail("timeout waiting for auto-approve") - } - }) } func TestApproveCSRExponentialBackoff(t *testing.T) { diff --git a/config/config.go b/config/config.go index 938d8eb9..155fe01c 100644 --- a/config/config.go +++ b/config/config.go @@ -19,6 +19,7 @@ type Config struct { LeaderElection LeaderElection PodName string NodeName string + AutoApproveCSR bool } type Log struct { @@ -73,9 +74,9 @@ func Get() Config { _ = viper.BindEnv("leaderelection.lockname", "LEADER_ELECTION_LOCK_NAME") _ = viper.BindEnv("leaderelection.leaseduration", "LEADER_ELECTION_LEASE_DURATION") _ = viper.BindEnv("leaderelection.leaserenewdeadline", "LEADER_ELECTION_LEASE_RENEW_DEADLINE") - _ = viper.BindEnv("aksinitdata", "AKS_INIT_DATA") _ = viper.BindEnv("nodename", "KUBERNETES_NODE_NAME") _ = viper.BindEnv("podname", "KUBERNETES_POD") + _ = viper.BindEnv("autoapprovecsr", "AUTO_APPROVE_CSR") cfg = &Config{} if err := viper.Unmarshal(&cfg); err != nil { diff --git a/csr/csr.go b/csr/csr.go index 5ddfed99..70d15748 100644 --- a/csr/csr.go +++ b/csr/csr.go @@ -8,7 +8,6 @@ import ( "fmt" "sort" "strings" - "time" "github.com/sirupsen/logrus" certv1 "k8s.io/api/certificates/v1" @@ -20,7 +19,6 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "github.com/castai/cluster-controller/castai" "github.com/castai/cluster-controller/waitext" ) @@ -66,6 +64,13 @@ func (c *Certificate) Approved() bool { return false } +func isAlreadyApproved(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), "Duplicate value: \"Approved\"") +} + // ApproveCertificate approves csr. func (c *Certificate) ApproveCertificate(ctx context.Context, client kubernetes.Interface) (*Certificate, error) { if err := c.Validate(); err != nil { @@ -80,7 +85,7 @@ func (c *Certificate) ApproveCertificate(ctx context.Context, client kubernetes. LastUpdateTime: metav1.Now(), }) resp, err := client.CertificatesV1beta1().CertificateSigningRequests().UpdateApproval(ctx, c.v1Beta1, metav1.UpdateOptions{}) - if err != nil { + if err != nil && !isAlreadyApproved(err) { return nil, fmt.Errorf("v1beta csr approve: %w", err) } return &Certificate{v1Beta1: resp}, nil @@ -94,7 +99,7 @@ func (c *Certificate) ApproveCertificate(ctx context.Context, client kubernetes. LastUpdateTime: metav1.Now(), }) resp, err := client.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, c.v1.Name, c.v1, metav1.UpdateOptions{}) - if err != nil { + if err != nil && !isAlreadyApproved(err) { return nil, fmt.Errorf("v1 csr approve: %w", err) } return &Certificate{v1: resp}, nil @@ -321,18 +326,18 @@ func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kub log.WithFields(logrus.Fields{ "csr": name, "node_name": cn, - }).Debugf("skipping csr: %v", err) + }).Debugf("skipping csr unable to get common name: %v", err) continue } if csrResult.Approved() { continue } - if err := autoApprovalValidation(ctx, client, cn); err != nil { + if !isCastAINodeCsr(cn) { log.WithFields(logrus.Fields{ "csr": name, "node_name": cn, - }).Debugf("skipping csr: %s, node: %s %v", name, cn, err) + }).Debug("skipping csr not CAST AI node") continue } csrResult.Name = cn @@ -357,11 +362,11 @@ func toCertificate(event watch.Event) (cert *Certificate, name string, request [ case *certv1.CertificateSigningRequest: name = e.Name request = e.Spec.Request - cert = &Certificate{v1: e, RequestingUser: e.Spec.Username} + cert = &Certificate{Name: name, v1: e, RequestingUser: e.Spec.Username} case *certv1beta1.CertificateSigningRequest: name = e.Name request = e.Spec.Request - cert = &Certificate{v1Beta1: e, RequestingUser: e.Spec.Username} + cert = &Certificate{Name: name, v1Beta1: e, RequestingUser: e.Spec.Username} default: return nil, "", nil } @@ -369,42 +374,16 @@ func toCertificate(event watch.Event) (cert *Certificate, name string, request [ return cert, name, request } -var ( - errNoNodeName = errors.New("no node name") - errCouldNotFindNode = errors.New("could not find node") - errNotManagedByCastAI = errors.New("node is not managed by CAST AI") - errNotOlderThan24Hours = errors.New("node is not older than 24 hours") -) - -func autoApprovalValidation(ctx context.Context, client kubernetes.Interface, subjectCommonName string) error { +func isCastAINodeCsr(subjectCommonName string) bool { if subjectCommonName == "" { - return errNoNodeName - } - - nodeName := strings.TrimPrefix(subjectCommonName, "system:node:") - n, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) - if err != nil { - return err - } - - if n == nil { - return errCouldNotFindNode - } - - managedBy, ok := n.Labels[castai.LabelManagedBy] - if !ok { - return errNotManagedByCastAI - } - - if managedBy != castai.LabelValueManagedByCASTAI { - return fmt.Errorf("label value: %s %w", managedBy, errNotManagedByCastAI) + return false } - if n.CreationTimestamp.After(time.Now().Add(-time.Hour * 24)) { - return errNotOlderThan24Hours + if strings.HasPrefix(subjectCommonName, "system:node") && strings.Contains(subjectCommonName, "cast-pool") { + return true } - return nil + return false } func sendCertificate(ctx context.Context, c chan *Certificate, cert *Certificate) { diff --git a/csr/csr_test.go b/csr/csr_test.go index 7e7c6245..51313c2e 100644 --- a/csr/csr_test.go +++ b/csr/csr_test.go @@ -2,24 +2,15 @@ package csr import ( "context" - "errors" - "fmt" "path/filepath" "testing" "time" "github.com/stretchr/testify/require" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - ktest "k8s.io/client-go/testing" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" - - "github.com/castai/cluster-controller/castai" ) func TestApproveCSR(t *testing.T) { @@ -62,111 +53,45 @@ func getClient() (*kubernetes.Clientset, error) { return clientset, err } -func Test_isAutoApproveAllowedForNode(t *testing.T) { - err := fmt.Errorf("error") +func Test_isCastAINodeCsr(t *testing.T) { type args struct { - tuneMockNode runtime.Object - tuneMockErr error subjectCommonName string } tests := []struct { name string args args - want error + want bool }{ { name: "empty node name", - want: errNoNodeName, + want: false, }, { - name: "empty node get response", + name: "not cast in subjectComma ", args: args{ subjectCommonName: "system:node:node1", }, - want: errCouldNotFindNode, - }, - { - name: "empty node get response", - args: args{ - subjectCommonName: "system:node:node1", - tuneMockErr: err, - }, - want: err, + want: false, }, { name: "not CastAI node", args: args{ subjectCommonName: "node1", - tuneMockNode: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "system:node:node1", - CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Hour * 25)), - }, - }, - }, - want: errNotManagedByCastAI, - }, - { - name: "not old enough CastAI node", - args: args{ - subjectCommonName: "system:node:node1", - tuneMockNode: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - castai.LabelManagedBy: castai.LabelValueManagedByCASTAI, - }, - CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Hour)), - Name: "node1", - }, - }, }, - want: errNotOlderThan24Hours, + want: false, }, { - name: "not proper value of CastAI label", + name: "CAST AI node", args: args{ - subjectCommonName: "system:node:node1", - tuneMockNode: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Hour * 25)), - Name: "node1", - Labels: map[string]string{ - castai.LabelManagedBy: "tests", - }, - }, - }, + subjectCommonName: "system:node:node1-cast-pool-123", }, - want: errNotManagedByCastAI, - }, - { - name: "true", - args: args{ - subjectCommonName: "system:node:node1", - tuneMockNode: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Hour * 25)), - Name: "node1", - Labels: map[string]string{ - castai.LabelManagedBy: castai.LabelValueManagedByCASTAI, - }, - }, - }, - }, - want: nil, + want: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - client := fake.NewSimpleClientset() - ch := make(chan struct{}) - - client.PrependReactor("get", "nodes", func(action ktest.Action) (handled bool, ret runtime.Object, err error) { - close(ch) - return true, tt.args.tuneMockNode, tt.args.tuneMockErr - }) - if got := autoApprovalValidation(context.Background(), client, tt.args.subjectCommonName); !errors.Is(got, tt.want) { - t.Errorf("autoApprovalValidation() = %v, want %v", got, tt.want) - } + got := isCastAINodeCsr(tt.args.subjectCommonName) + require.Equal(t, tt.want, got) }) } } diff --git a/csr/svc.go b/csr/svc.go new file mode 100644 index 00000000..a70d91f2 --- /dev/null +++ b/csr/svc.go @@ -0,0 +1,158 @@ +package csr + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + + "github.com/castai/cluster-controller/waitext" +) + +const ( + approveCSRTimeout = 4 * time.Minute +) + +func NewApprovalManager(log logrus.FieldLogger, clientset kubernetes.Interface) *ApprovalManager { + return &ApprovalManager{ + log: log, + clientset: clientset, + } +} + +type ApprovalManager struct { + log logrus.FieldLogger + clientset kubernetes.Interface + cancelAutoApprove context.CancelFunc + m sync.Mutex // Used to make sure there is just one watcher running. +} + +func (h *ApprovalManager) Start(ctx context.Context) { + go h.runAutoApproveForCastAINodes(ctx) +} +func (h *ApprovalManager) Stop(ctx context.Context) { + h.stopAutoApproveForCastAINodes() +} + +func (h *ApprovalManager) handleWithRetry(ctx context.Context, log *logrus.Entry, cert *Certificate) error { + ctx, cancel := context.WithTimeout(ctx, approveCSRTimeout) + defer cancel() + + b := newApproveCSRExponentialBackoff() + return waitext.Retry( + ctx, + b, + waitext.Forever, + func(ctx context.Context) (bool, error) { + return true, h.handle(ctx, log, cert) + }, + func(err error) { + log.Warnf("csr approval failed, will retry: %v", err) + }, + ) +} + +func (h *ApprovalManager) handle(ctx context.Context, log logrus.FieldLogger, cert *Certificate) (reterr error) { + if cert.Approved() { + return nil + } + log = log.WithField("csr_name", cert.Name) + // Since this new csr may be denied we need to delete it. + log.Debug("deleting old csr") + if err := cert.DeleteCertificate(ctx, h.clientset); err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("deleting csr: %w", err) + } + } + + // Create a new CSR with the same request data as the original one. + log.Debug("requesting new csr") + newCert, err := cert.NewCSR(ctx, h.clientset) + if err != nil { + return fmt.Errorf("requesting new csr: %w", err) + } + + // Approve new csr. + log.Debug("approving new csr") + resp, err := newCert.ApproveCertificate(ctx, h.clientset) + if err != nil { + return fmt.Errorf("approving csr: %w", err) + } + if resp.Approved() { + return nil + } + + return errors.New("certificate signing request was not approved") +} + +func (h *ApprovalManager) runAutoApproveForCastAINodes(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if !h.startAutoApprove(cancel) { + return // already running + } + defer h.stopAutoApproveForCastAINodes() + + log := h.log.WithField("RunAutoApprove", "auto-approve-csr") + c := make(chan *Certificate, 1) + go WatchCastAINodeCSRs(ctx, log, h.clientset, c) + + for { + select { + case <-ctx.Done(): + log.WithError(ctx.Err()).Errorf("auto approve csr finished") + return + case cert := <-c: + if cert == nil { + continue + } + go func(cert *Certificate) { + log := log.WithField("node_name", cert.Name) + log.Info("auto approving csr") + err := h.handleWithRetry(ctx, log, cert) + if err != nil { + log.WithError(err).Errorf("failed to approve csr: %+v", cert) + } + }(cert) + } + } +} + +func (h *ApprovalManager) startAutoApprove(cancelFunc context.CancelFunc) bool { + h.m.Lock() + defer h.m.Unlock() + if h.cancelAutoApprove != nil { + return false + } + + h.log.Info("starting auto approve CSRs for managed by CAST AI nodes") + h.cancelAutoApprove = cancelFunc + + return true +} + +func (h *ApprovalManager) stopAutoApproveForCastAINodes() { + h.m.Lock() + defer h.m.Unlock() + + if h.cancelAutoApprove == nil { + return + } + + h.log.Info("stopping auto approve CSRs for managed by CAST AI nodes") + h.cancelAutoApprove() + h.cancelAutoApprove = nil +} + +func newApproveCSRExponentialBackoff() wait.Backoff { + b := waitext.DefaultExponentialBackoff() + b.Factor = 2 + return b +} diff --git a/csr/svc_test.go b/csr/svc_test.go new file mode 100644 index 00000000..7ed2c512 --- /dev/null +++ b/csr/svc_test.go @@ -0,0 +1,108 @@ +package csr + +import ( + "context" + "sync" + + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + certv1 "k8s.io/api/certificates/v1" + "k8s.io/apimachinery/pkg/watch" + ktest "k8s.io/client-go/testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) +import "testing" + +func getCSR(name, username string) *certv1.CertificateSigningRequest { + return &certv1.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: certv1.CertificateSigningRequestSpec{ + Request: []byte(`-----BEGIN CERTIFICATE REQUEST----- +MIIBLTCB0wIBADBPMRUwEwYDVQQKEwxzeXN0ZW06bm9kZXMxNjA0BgNVBAMTLXN5 +c3RlbTpub2RlOmdrZS1kZXYtbWFzdGVyLWNhc3QtcG9vbC1jYjUzMTc3YjBZMBMG +ByqGSM49AgEGCCqGSM49AwEHA0IABMZKNQROiVpxfH4nHaPnE6NaY9Mr8/HBnxCl +mPe4mrvNGRnlJV+LvYCUAVlfinzLcMJSmRjJADgzN0Pn+i+4ra6gIjAgBgkqhkiG +9w0BCQ4xEzARMA8GA1UdEQQIMAaHBAoKADIwCgYIKoZIzj0EAwIDSQAwRgIhAOKQ +S59zc2bEaJ3y4aSMXLY3gmri14jZvvnFrxaPDT2PAiEA7C3hvZwrCJsoO61JWKqc +1ElMb/fzAVBcP34rfsE7qmQ= +-----END CERTIFICATE REQUEST-----`), + SignerName: certv1.KubeAPIServerClientKubeletSignerName, + Usages: []certv1.KeyUsage{"kubelet"}, + Username: username, + }, + //Status: certv1.CertificateSigningRequestStatus{}, + } +} + +func TestCSRApprove(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.DebugLevel) + + t.Run("approve v1 csr successfully", func(t *testing.T) { + r := require.New(t) + t.Parallel() + + csrName := "node-csr-123" + userName := "kubelet-bootstrap" + client := fake.NewSimpleClientset(getCSR(csrName, userName)) + s := NewApprovalManager(log, client) + watcher := watch.NewFake() + client.PrependWatchReactor("certificatesigningrequests", ktest.DefaultWatchReactor(watcher, nil)) + + ctx := context.Background() + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + s.Start(ctx) + }() + go func() { + defer wg.Done() + watcher.Add(getCSR(csrName, userName)) + time.Sleep(100 * time.Millisecond) + s.Stop(ctx) + }() + + wg.Wait() + + csrResult, err := client.CertificatesV1().CertificateSigningRequests().Get(ctx, csrName, metav1.GetOptions{}) + r.NoError(err) + r.Equal(csrResult.Status.Conditions[0].Type, certv1.CertificateApproved) + }) + + t.Run("not node csr do nothing", func(t *testing.T) { + r := require.New(t) + t.Parallel() + + csrName := "123" + userName := "kubelet-bootstrap" + client := fake.NewSimpleClientset(getCSR(csrName, userName)) + s := NewApprovalManager(log, client) + watcher := watch.NewFake() + client.PrependWatchReactor("certificatesigningrequests", ktest.DefaultWatchReactor(watcher, nil)) + + ctx := context.Background() + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + s.Start(ctx) + }() + go func() { + defer wg.Done() + watcher.Add(getCSR(csrName, userName)) + time.Sleep(100 * time.Millisecond) + s.Stop(ctx) + }() + + wg.Wait() + + csrResult, err := client.CertificatesV1().CertificateSigningRequests().Get(ctx, csrName, metav1.GetOptions{}) + r.NoError(err) + r.Len(csrResult.Status.Conditions, 0) + }) +} diff --git a/main.go b/main.go index 75e1cc71..f9e82c0b 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "os" "path" "runtime" + "strings" "time" "github.com/bombsimon/logrusr/v4" @@ -30,6 +31,7 @@ import ( "github.com/castai/cluster-controller/actions" "github.com/castai/cluster-controller/castai" "github.com/castai/cluster-controller/config" + "github.com/castai/cluster-controller/csr" "github.com/castai/cluster-controller/health" "github.com/castai/cluster-controller/helm" "github.com/castai/cluster-controller/version" @@ -196,13 +198,28 @@ func run( } }() + runSvc := func(ctx context.Context) { + isGKE, err := runningOnGKE(clientset, cfg) + if err != nil { + log.Fatalf("failed to determine if running on GKE: %v", err) + } + + log.Debugf("auto approve csr: %v, running on GKE: %v", cfg.AutoApproveCSR, isGKE) + if cfg.AutoApproveCSR && isGKE { + csrMgr := csr.NewApprovalManager(log, clientset) + csrMgr.Start(ctx) + } + + svc.Run(ctx) + } + if cfg.LeaderElection.Enabled { // Run actions service with leader election. Blocks. - return runWithLeaderElection(ctx, log, clientSetLeader, leaderHealthCheck, cfg.LeaderElection, svc.Run) + return runWithLeaderElection(ctx, log, clientSetLeader, leaderHealthCheck, cfg.LeaderElection, runSvc) } // Run action service. Blocks. - svc.Run(ctx) + runSvc(ctx) return nil } @@ -373,3 +390,25 @@ func (e *logContextErr) Error() string { func (e *logContextErr) Unwrap() error { return e.err } + +func runningOnGKE(clientset *kubernetes.Clientset, cfg config.Config) (isGKE bool, err error) { + err = waitext.Retry(context.Background(), waitext.DefaultExponentialBackoff(), 3, func(ctx context.Context) (bool, error) { + node, err := clientset.CoreV1().Nodes().Get(ctx, cfg.NodeName, metav1.GetOptions{}) + if err != nil { + return true, fmt.Errorf("getting node: %w", err) + } + + for k, _ := range node.Labels { + if strings.HasPrefix(k, "cloud.google.com/") { + isGKE = true + return false, nil + } + } + + return false, nil + }, func(err error) { + + }) + + return +}