Skip to content

Commit

Permalink
fix: add csr approval retry (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored May 13, 2022
1 parent d560044 commit 7d3815c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
19 changes: 19 additions & 0 deletions actions/approve_csr_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"

Expand All @@ -19,6 +20,8 @@ func newApproveCSRHandler(log logrus.FieldLogger, clientset kubernetes.Interface
clientset: clientset,
csrFetchInterval: 5 * time.Second,
initialCSRFetchTimeout: 5 * time.Minute,
maxRetries: 10,
retryAfter: 1 * time.Second,
}
}

Expand All @@ -27,6 +30,8 @@ type approveCSRHandler struct {
clientset kubernetes.Interface
csrFetchInterval time.Duration
initialCSRFetchTimeout time.Duration
maxRetries uint64
retryAfter time.Duration
}

func (h *approveCSRHandler) Handle(ctx context.Context, data interface{}) error {
Expand All @@ -37,6 +42,20 @@ func (h *approveCSRHandler) Handle(ctx context.Context, data interface{}) error

log := h.log.WithField("node_name", req.NodeName)

b := backoff.WithContext(
backoff.WithMaxRetries(backoff.NewConstantBackOff(h.retryAfter), h.maxRetries),
ctx,
)
return backoff.RetryNotify(func() error {
return h.handle(ctx, log, req)
}, b, func(err error, duration time.Duration) {
if err != nil {
log.Warnf("csr approval failed, will retry: %v", err)
}
})
}

func (h *approveCSRHandler) handle(ctx context.Context, log logrus.FieldLogger, req *castai.ActionApproveCSR) error {
// First get original csr which is created by kubelet.
log.Debug("getting initial csr")
cert, err := h.getInitialNodeCSR(ctx, req.NodeName)
Expand Down
11 changes: 11 additions & 0 deletions actions/approve_csr_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package actions

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -61,6 +63,7 @@ AiAHVYZXHxxspoV0hcfn2Pdsl89fIPCOFy/K1PqSUR6QNAIgYdt51ZbQt9rgM2BD
}
return
})
var approveCalls int32
client.PrependReactor("update", "certificatesigningrequests", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
approved := csr.DeepCopy()
approved.Status.Conditions = []certv1.CertificateSigningRequestCondition{
Expand All @@ -72,6 +75,12 @@ AiAHVYZXHxxspoV0hcfn2Pdsl89fIPCOFy/K1PqSUR6QNAIgYdt51ZbQt9rgM2BD
Status: v1.ConditionTrue,
},
}
// Simulate failure for some initial calls to test retry.
calls := atomic.LoadInt32(&approveCalls)
if calls < 2 {
atomic.AddInt32(&approveCalls, 1)
return true, approved, fmt.Errorf("ups")
}
return true, approved, nil
})

Expand All @@ -80,6 +89,8 @@ AiAHVYZXHxxspoV0hcfn2Pdsl89fIPCOFy/K1PqSUR6QNAIgYdt51ZbQt9rgM2BD
clientset: client,
csrFetchInterval: 1 * time.Millisecond,
initialCSRFetchTimeout: 10 * time.Millisecond,
retryAfter: 100 * time.Millisecond,
maxRetries: 5,
}

ctx := context.Background()
Expand Down

0 comments on commit 7d3815c

Please sign in to comment.