Skip to content

Commit

Permalink
Kube-637: continue csr handling on pod restart (#151)
Browse files Browse the repository at this point in the history
* KUBE-637: handle csr approving after pod restart
  • Loading branch information
ValyaB authored Oct 24, 2024
1 parent f7ac9e9 commit 117875a
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 84 deletions.
4 changes: 2 additions & 2 deletions internal/actions/approve_csr_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (h *ApproveCSRHandler) handleWithRetry(ctx context.Context, log *logrus.Ent
func (h *ApproveCSRHandler) handle(ctx context.Context, log logrus.FieldLogger, cert *csr.Certificate) (reterr error) {
// Since this new csr may be denied we need to delete it.
log.Debug("deleting old csr")
if err := cert.DeleteCertificate(ctx, h.clientset); err != nil {
if err := cert.DeleteCSR(ctx, h.clientset); err != nil {
return fmt.Errorf("deleting csr: %w", err)
}

Expand All @@ -105,7 +105,7 @@ func (h *ApproveCSRHandler) handle(ctx context.Context, log logrus.FieldLogger,

// Approve new csr.
log.Debug("approving new csr")
resp, err := newCert.ApproveCertificate(ctx, h.clientset)
resp, err := newCert.ApproveCSRCertificate(ctx, h.clientset)
if err != nil {
return fmt.Errorf("approving csr: %w", err)
}
Expand Down
56 changes: 28 additions & 28 deletions internal/actions/csr/csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func isAlreadyApproved(err error) bool {
return strings.Contains(err.Error(), "Duplicate value: \"Approved\"")
}

// ApproveCertificate approves csr.
func (c *Certificate) ApproveCertificate(ctx context.Context, client kubernetes.Interface) (*Certificate, error) {
// ApproveCSRCertificate approves csr.
func (c *Certificate) ApproveCSRCertificate(ctx context.Context, client kubernetes.Interface) (*Certificate, error) {
if err := c.Validate(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -105,8 +105,8 @@ func (c *Certificate) ApproveCertificate(ctx context.Context, client kubernetes.
return &Certificate{v1: resp}, nil
}

// DeleteCertificate deletes csr.
func (c *Certificate) DeleteCertificate(ctx context.Context, client kubernetes.Interface) error {
// DeleteCSR deletes csr.
func (c *Certificate) DeleteCSR(ctx context.Context, client kubernetes.Interface) error {
if err := c.Validate(); err != nil {
return err
}
Expand Down Expand Up @@ -315,7 +315,7 @@ func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kub
return
}

cert, name, request, err := toCertificate(event)
cert, err := toCertificate(event)
if err != nil {
log.Warnf("toCertificate: skipping csr event: %v", err)
continue
Expand All @@ -325,26 +325,10 @@ func WatchCastAINodeCSRs(ctx context.Context, log logrus.FieldLogger, client kub
continue
}

cn, err := getSubjectCommonName(name, request)
if err != nil {
log.WithFields(logrus.Fields{
"csr": name,
"node_name": cn,
}).Infof("skipping csr unable to get common name: %v", err)
continue
}
if cert.Approved() {
continue
}

if !isCastAINodeCsr(cn) {
log.WithFields(logrus.Fields{
"csr": name,
"node_name": cn,
}).Infof("skipping csr not CAST AI node")
continue
}
cert.Name = cn
sendCertificate(ctx, c, cert)
}
}
Expand All @@ -365,9 +349,13 @@ var (
errUnexpectedObjectType = errors.New("unexpected object type")
errCSRTooOld = errors.New("csr is too old")
errOwner = errors.New("owner is not bootstrap")
errNonCastAINode = errors.New("not a castai node")
)

func toCertificate(event watch.Event) (cert *Certificate, name string, request []byte, err error) {
func toCertificate(event watch.Event) (cert *Certificate, err error) {
var name string
var request []byte

isOutdated := false
switch e := event.Object.(type) {
case *certv1.CertificateSigningRequest:
Expand All @@ -381,19 +369,31 @@ func toCertificate(event watch.Event) (cert *Certificate, name string, request [
cert = &Certificate{Name: name, v1Beta1: e, RequestingUser: e.Spec.Username}
isOutdated = e.CreationTimestamp.Add(csrTTL).Before(time.Now())
default:
return nil, "", nil, errUnexpectedObjectType
return nil, errUnexpectedObjectType
}

if isOutdated {
return nil, "", nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errCSRTooOld)
return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errCSRTooOld)
}

// Since we only have one handler per CSR/certificate name,
// which is the node name, we can process the controller's certificates and kubelet-bootstrap`s.
// This covers the case when the controller restarts but the bootstrap certificate was deleted without our own certificate being approved.
if cert.RequestingUser != "kubelet-bootstrap" && cert.RequestingUser != "system:serviceaccount:castai-agent:castai-cluster-controller" {
return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errOwner)
}

cn, err := getSubjectCommonName(name, request)
if err != nil {
return nil, fmt.Errorf("getSubjectCommonName: Name: %v RequestingUser: %v request: %v %w", cert.Name, cert.RequestingUser, string(request), err)
}

// We are only interested in kubelet-bootstrap csr. SKIP own CSR due to the infinite loop of deleting->creating new->deleting.
if cert.RequestingUser != "kubelet-bootstrap" {
return nil, "", nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v %w", cert.Name, cert.RequestingUser, errOwner)
if !isCastAINodeCsr(cn) {
return nil, fmt.Errorf("csr with certificate Name: %v RequestingUser: %v cn: %v %w", cert.Name, cert.RequestingUser, cn, errNonCastAINode)
}
cert.Name = cn

return cert, name, request, nil
return cert, nil
}

func isCastAINodeCsr(subjectCommonName string) bool {
Expand Down
53 changes: 13 additions & 40 deletions internal/actions/csr/csr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/stretchr/testify/require"
certv1 "k8s.io/api/certificates/v1"
certv1beta1 "k8s.io/api/certificates/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
Expand All @@ -33,13 +32,13 @@ func TestApproveCSR(t *testing.T) {
cert, err := GetCertificateByNodeName(ctx, client, "gke-csr-cast-pool-ab259afb")
r.NoError(err)

err = cert.DeleteCertificate(ctx, client)
err = cert.DeleteCSR(ctx, client)
r.NoError(err)

cert, err = cert.NewCSR(ctx, client)
r.NoError(err)

_, err = cert.ApproveCertificate(ctx, client)
_, err = cert.ApproveCSRCertificate(ctx, client)
r.NoError(err)
}

Expand Down Expand Up @@ -102,34 +101,16 @@ func Test_isCastAINodeCsr(t *testing.T) {
}

func Test_toCertificate(t *testing.T) {
testCSRv1 := &certv1.CertificateSigningRequest{
Spec: certv1.CertificateSigningRequestSpec{
Username: "kubelet-bootstrap",
},
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: metav1.Time{Time: time.Now().Add(csrTTL)},
Name: "test",
},
}
testCSRv1beta1 := &certv1beta1.CertificateSigningRequest{
Spec: certv1beta1.CertificateSigningRequestSpec{
Username: "kubelet-bootstrap",
},
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: metav1.Time{Time: time.Now().Add(csrTTL)},
Name: "test",
},
}
testCSRv1 := getCSRv1("node-csr", "kubelet-bootstrap")
testCSRv1beta1 := getCSRv1betav1("node-csr", "kubelet-bootstrap")
type args struct {
event watch.Event
}
tests := []struct {
name string
args args
wantCert *Certificate
wantName string
wantRequest []byte
wantErr bool
name string
args args
wantCert *Certificate
wantErr bool
}{
{
name: "empty event",
Expand Down Expand Up @@ -174,10 +155,9 @@ func Test_toCertificate(t *testing.T) {
Object: testCSRv1,
},
},
wantErr: false,
wantName: "test",
wantErr: false,
wantCert: &Certificate{
Name: "test",
Name: "system:node:gke-dev-master-cast-pool-cb53177b",
RequestingUser: "kubelet-bootstrap",
v1: testCSRv1,
},
Expand All @@ -189,31 +169,24 @@ func Test_toCertificate(t *testing.T) {
Object: testCSRv1beta1,
},
},
wantErr: false,
wantName: "test",
wantErr: false,
wantCert: &Certificate{
Name: "test",
Name: "system:node:gke-dev-master-cast-pool-cb53177b",
RequestingUser: "kubelet-bootstrap",
v1Beta1: testCSRv1beta1,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotCert, gotName, gotRequest, err := toCertificate(tt.args.event)
gotCert, err := toCertificate(tt.args.event)
if (err != nil) != tt.wantErr {
t.Errorf("toCertificate() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotCert, tt.wantCert) {
t.Errorf("toCertificate() gotCert = %v, want %v", gotCert, tt.wantCert)
}
if gotName != tt.wantName {
t.Errorf("toCertificate() gotName = %v, want %v", gotName, tt.wantName)
}
if !reflect.DeepEqual(gotRequest, tt.wantRequest) {
t.Errorf("toCertificate() gotRequest = %v, want %v", gotRequest, tt.wantRequest)
}
})
}
}
21 changes: 12 additions & 9 deletions internal/actions/csr/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,9 @@ func (h *ApprovalManager) handle(ctx context.Context, log logrus.FieldLogger, ce
return nil
}
log = log.WithField("csr_name", cert.Name)
// Since this new csr may be denied we need to delete it.
log.Info("deleting old csr")
if err := cert.DeleteCertificate(ctx, h.clientset); err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("deleting csr: %w", err)
}
}

// Create a new CSR with the same request data as the original one.
// Create a new CSR with the same request data as the original one,
// since the old csr maybe denied.
log.Info("requesting new csr")
newCert, err := cert.NewCSR(ctx, h.clientset)
if err != nil {
Expand All @@ -83,14 +77,23 @@ func (h *ApprovalManager) handle(ctx context.Context, log logrus.FieldLogger, ce

// Approve new csr.
log.Info("approving new csr")
resp, err := newCert.ApproveCertificate(ctx, h.clientset)
resp, err := newCert.ApproveCSRCertificate(ctx, h.clientset)
if err != nil {
return fmt.Errorf("approving csr: %w", err)
}
if resp.Approved() {
return nil
}

// clean original csr. should be the last step for having the possibility.
// continue approving csr: old deleted-> restart-> node never join.
log.Info("deleting old csr")
if err := cert.DeleteCSR(ctx, h.clientset); err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("deleting csr: %w", err)
}
}

return errors.New("certificate signing request was not approved")
}

Expand Down
36 changes: 31 additions & 5 deletions internal/actions/csr/svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import (
"testing"
"time"

"github.com/samber/lo"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
certv1 "k8s.io/api/certificates/v1"
certv1beta1 "k8s.io/api/certificates/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
ktest "k8s.io/client-go/testing"
)

func getCSR(name, username string) *certv1.CertificateSigningRequest {
func getCSRv1(name, username string) *certv1.CertificateSigningRequest {
return &certv1.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand All @@ -39,6 +41,30 @@ S59zc2bEaJ3y4aSMXLY3gmri14jZvvnFrxaPDT2PAiEA7C3hvZwrCJsoO61JWKqc
}
}

func getCSRv1betav1(name, username string) *certv1beta1.CertificateSigningRequest {
return &certv1beta1.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: name,
CreationTimestamp: metav1.Now(),
},
Spec: certv1beta1.CertificateSigningRequestSpec{
Request: []byte(`-----BEGIN CERTIFICATE REQUEST-----
MIIBLTCB0wIBADBPMRUwEwYDVQQKEwxzeXN0ZW06bm9kZXMxNjA0BgNVBAMTLXN5
c3RlbTpub2RlOmdrZS1kZXYtbWFzdGVyLWNhc3QtcG9vbC1jYjUzMTc3YjBZMBMG
ByqGSM49AgEGCCqGSM49AwEHA0IABMZKNQROiVpxfH4nHaPnE6NaY9Mr8/HBnxCl
mPe4mrvNGRnlJV+LvYCUAVlfinzLcMJSmRjJADgzN0Pn+i+4ra6gIjAgBgkqhkiG
9w0BCQ4xEzARMA8GA1UdEQQIMAaHBAoKADIwCgYIKoZIzj0EAwIDSQAwRgIhAOKQ
S59zc2bEaJ3y4aSMXLY3gmri14jZvvnFrxaPDT2PAiEA7C3hvZwrCJsoO61JWKqc
1ElMb/fzAVBcP34rfsE7qmQ=
-----END CERTIFICATE REQUEST-----`),
SignerName: lo.ToPtr(certv1beta1.KubeAPIServerClientKubeletSignerName),
Usages: []certv1beta1.KeyUsage{"kubelet"},
Username: username,
},
// Status: certv1.CertificateSigningRequestStatus{},.
}
}

func TestCSRApprove(t *testing.T) {
log := logrus.New()
log.SetLevel(logrus.DebugLevel)
Expand All @@ -49,7 +75,7 @@ func TestCSRApprove(t *testing.T) {

csrName := "node-csr-123"
userName := "kubelet-bootstrap"
client := fake.NewClientset(getCSR(csrName, userName))
client := fake.NewClientset(getCSRv1(csrName, userName))
s := NewApprovalManager(log, client)
watcher := watch.NewFake()
client.PrependWatchReactor("certificatesigningrequests", ktest.DefaultWatchReactor(watcher, nil))
Expand All @@ -63,7 +89,7 @@ func TestCSRApprove(t *testing.T) {
}()
go func() {
defer wg.Done()
watcher.Add(getCSR(csrName, userName))
watcher.Add(getCSRv1(csrName, userName))
time.Sleep(100 * time.Millisecond)
s.Stop()
}()
Expand All @@ -82,7 +108,7 @@ func TestCSRApprove(t *testing.T) {

csrName := "123"
userName := "kubelet-bootstrap"
client := fake.NewClientset(getCSR(csrName, userName))
client := fake.NewClientset(getCSRv1(csrName, userName))
s := NewApprovalManager(log, client)
watcher := watch.NewFake()
client.PrependWatchReactor("certificatesigningrequests", ktest.DefaultWatchReactor(watcher, nil))
Expand All @@ -96,7 +122,7 @@ func TestCSRApprove(t *testing.T) {
}()
go func() {
defer wg.Done()
watcher.Add(getCSR(csrName, userName))
watcher.Add(getCSRv1(csrName, userName))
time.Sleep(100 * time.Millisecond)
s.Stop()
}()
Expand Down

0 comments on commit 117875a

Please sign in to comment.