Kube-333: Unify the retry behavior around k8s.io/apimachinery/pkg/util/wait (#113)

* Initial extensions funcs

* Change main.go to wait pkg

* Change check_node_deleted.go to wait pkg

* Change kubernetes_helpers.go to wait pkg

* Change drain_node_handler.go to wait pkg

* Fix calls to patchNode to pass logger

* Change actions.go to wait pkg

* Change exporter.go to latest version of Retry

* Change approve_csr_handler.go to new version

* Change chart_loader.go to new version

* Change check_node_status.go to new version

* Change delete_node_handler.go to new version

* Change drain_node_handler.go to new version

* Updating usages of the older extension func

* Fix chart_loader_test.go

* Fix approve_csr_handler_test.go test to use wait pkg

* Change extensions and add tests

* Tests and adjustments

* Update extension pkg with docs

* Remove backoff library

* Add a RetryForever option to simulate what backoff could do

* Cleanup

* Change the "forever" notion again to be easier to get right

* Adjust getNodeForPatching so that it has roughly the same max timeout but even on exhausting retries it surfaces the error instead of context.Cancelled.

* Update logexporter.go after rebase

* Fix TODO

* Delete file leftover after rebase

* Remove double-log-per-retry

* Adjust getaction logging from PR review

* Remove some helpers that are probably overkill (PR) and attempt to simplify a bit.

* Change all usages to new pattern

* Move package (PR comment)

* Change package godoc (PR comment)

* Remove empty line for uuid import (PR comment)

* Make default values internal

* Remove the non-context retry and only leave 1 function

* PR comment to remove unnecessary func

* PR comment to remove unnecessary constructor

* Wrap context error, context cause and last encountered error in return value
Tsonov authored Apr 25, 2024
1 parent 6eba93d commit 5fc3c84
Showing 18 changed files with 683 additions and 274 deletions.
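
The call sites in the diffs below replace github.com/cenkalti/backoff/v4 with a small in-repo waitext helper. Its implementation is not shown in this excerpt; the following is a minimal sketch of the shape implied by the call sites (Retry, NewConstantBackoff, DefaultExponentialBackoff, Forever). Only the signatures are inferred from the diffs — the function bodies, the Forever sentinel value, the default backoff values and the error wrapping are assumptions.

package waitext

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// Forever makes Retry keep retrying until the context is done (assumed sentinel value).
const Forever = -1

// NewConstantBackoff returns a wait.Backoff that waits the same interval between attempts.
func NewConstantBackoff(interval time.Duration) wait.Backoff {
	return wait.Backoff{Duration: interval, Factor: 1}
}

// DefaultExponentialBackoff returns an exponential wait.Backoff
// (illustrative values; the real defaults are internal to the package).
func DefaultExponentialBackoff() wait.Backoff {
	return wait.Backoff{Duration: time.Second, Factor: 1.5, Jitter: 0.1, Steps: 10}
}

// Retry calls op until it succeeds, op reports the error as non-retryable (false),
// the retry budget is used up, or ctx is done. onError, if non-nil, runs after every
// failed attempt that will be retried.
func Retry(ctx context.Context, backoff wait.Backoff, retries int, op func(context.Context) (bool, error), onError func(error)) error {
	var lastErr error
	attempts := retries + 1 // the initial call plus the allowed retries
	for i := 0; attempts <= 0 || i < attempts; i++ {
		retryable, err := op(ctx)
		if err == nil {
			return nil
		}
		lastErr = err
		if !retryable {
			return err
		}
		if onError != nil {
			onError(err)
		}
		select {
		case <-time.After(backoff.Step()):
		case <-ctx.Done():
			// Mirrors the last commit message: surface context error, cause and last attempt error.
			return fmt.Errorf("context done: %w (cause: %v, last error: %v)", ctx.Err(), context.Cause(ctx), lastErr)
		}
	}
	return lastErr
}
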
34 changes: 16 additions & 18 deletions actions/actions.go
@@ -10,14 +10,14 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/castai"
"github.com/castai/cluster-controller/health"
"github.com/castai/cluster-controller/helm"
"github.com/castai/cluster-controller/waitext"
)

const (
@@ -132,16 +132,19 @@ func (s *service) doWork(ctx context.Context) error {
iteration int
)

b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(5*time.Second), 3), ctx)
errR := backoff.Retry(func() error {
boff := waitext.NewConstantBackoff(5 * time.Second)

errR := waitext.Retry(ctx, boff, 3, func(ctx context.Context) (bool, error) {
iteration++
actions, err = s.castAIClient.GetActions(ctx, s.k8sVersion)
if err != nil {
s.log.Errorf("polling actions: get action request failed: iteration: %v %v", iteration, err)
return err
return true, err
}
return nil
}, b)
return false, nil
}, func(err error) {
s.log.Errorf("polling actions: get action request failed: iteration: %v %v", iteration, err)
})

if errR != nil {
return fmt.Errorf("polling actions: %w", err)
}
@@ -242,21 +245,16 @@ func (s *service) ackAction(ctx context.Context, action *castai.ClusterAction, h
"type": actionType.String(),
}).Info("ack action")

return backoff.RetryNotify(func() error {
boff := waitext.NewConstantBackoff(s.cfg.AckRetryWait)

return waitext.Retry(ctx, boff, s.cfg.AckRetriesCount, func(ctx context.Context) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, s.cfg.AckTimeout)
defer cancel()
return s.castAIClient.AckAction(ctx, action.ID, &castai.AckClusterActionRequest{
return true, s.castAIClient.AckAction(ctx, action.ID, &castai.AckClusterActionRequest{
Error: getHandlerError(handleErr),
})
}, backoff.WithContext(
backoff.WithMaxRetries(
backoff.NewConstantBackOff(s.cfg.AckRetryWait), uint64(s.cfg.AckRetriesCount),
),
ctx,
), func(err error, duration time.Duration) {
if err != nil {
s.log.Debugf("ack failed, will retry: %v", err)
}
}, func(err error) {
s.log.Debugf("ack failed, will retry: %v", err)
})
}

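
In the ackAction hunk above, the per-attempt timeout now comes from context.WithTimeout inside the retry operation, so every attempt gets a fresh AckTimeout while AckRetriesCount bounds the number of attempts. A minimal stand-alone sketch of that shape — doCall, the durations and the plain loop are made up for illustration, not taken from the repository:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	ctx := context.Background()
	op := func(ctx context.Context) (bool, error) {
		attemptCtx, cancel := context.WithTimeout(ctx, 2*time.Second) // fresh per-attempt budget
		defer cancel()
		return true, doCall(attemptCtx) // always retryable; the caller bounds the attempt count
	}
	for attempt := 0; attempt < 3; attempt++ { // stand-in for the bounded retry loop
		if _, err := op(ctx); err == nil {
			fmt.Println("acked")
			return
		}
	}
	fmt.Println("gave up after 3 attempts")
}

// doCall simulates a request that honours its context deadline.
func doCall(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(100 * time.Millisecond):
		return errors.New("transient failure")
	}
}
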
64 changes: 38 additions & 26 deletions actions/approve_csr_handler.go
@@ -7,12 +7,17 @@ import (
"reflect"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/castai"
"github.com/castai/cluster-controller/csr"
"github.com/castai/cluster-controller/waitext"
)

const (
approveCSRTimeout = 4 * time.Minute
)

func newApproveCSRHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler {
@@ -53,17 +58,21 @@ func (h *approveCSRHandler) Handle(ctx context.Context, action *castai.ClusterAc
return nil
}

b := backoff.WithContext(
newApproveCSRExponentialBackoff(),
ctx, cancel := context.WithTimeout(ctx, approveCSRTimeout)
defer cancel()

b := newApproveCSRExponentialBackoff()
return waitext.Retry(
ctx,
)
return backoff.RetryNotify(func() error {
return h.handle(ctx, log, cert)
}, b, func(err error, duration time.Duration) {
if err != nil {
b,
waitext.Forever,
func(ctx context.Context) (bool, error) {
return true, h.handle(ctx, log, cert)
},
func(err error) {
log.Warnf("csr approval failed, will retry: %v", err)
}
})
},
)
}

func (h *approveCSRHandler) handle(ctx context.Context, log logrus.FieldLogger, cert *csr.Certificate) (reterr error) {
@@ -122,25 +131,28 @@ func (h *approveCSRHandler) getInitialNodeCSR(ctx context.Context, log logrus.Fi
var cert *csr.Certificate
var err error

logRetry := func(err error, _ time.Duration) {
log.Warnf("getting initial csr, will retry: %v", err)
}
b := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)
err = backoff.RetryNotify(func() error {
cert, err = poll()
if errors.Is(err, context.DeadlineExceeded) {
return backoff.Permanent(err)
}
return err
}, b, logRetry)
b := waitext.DefaultExponentialBackoff()
err = waitext.Retry(
ctx,
b,
3,
func(ctx context.Context) (bool, error) {
cert, err = poll()
if errors.Is(err, context.DeadlineExceeded) {
return false, err
}
return true, err
},
func(err error) {
log.Warnf("getting initial csr, will retry: %v", err)
},
)

return cert, err
}

func newApproveCSRExponentialBackoff() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
b.Multiplier = 2
b.MaxElapsedTime = 4 * time.Minute
b.Reset()
func newApproveCSRExponentialBackoff() wait.Backoff {
b := waitext.DefaultExponentialBackoff()
b.Factor = 2
return b
}
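
The handler now bounds the whole approval loop with a 4-minute context timeout instead of the old backoff.MaxElapsedTime, and the backoff itself is a plain k8s.io/apimachinery wait.Backoff whose delay is multiplied by Factor on every Step() call. A small illustration with made-up values (the real defaults live inside waitext):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Illustrative values only; the commit keeps the real defaults internal.
	b := wait.Backoff{
		Duration: 500 * time.Millisecond, // first delay returned by Step()
		Factor:   2,                      // each subsequent delay doubles
		Steps:    5,                      // grow for at most this many steps
	}
	for i := 0; i < 5; i++ {
		fmt.Println(b.Step()) // prints 500ms, 1s, 2s, 4s, 8s (no jitter configured)
	}
}
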
6 changes: 3 additions & 3 deletions actions/approve_csr_handler_test.go
@@ -4,11 +4,11 @@ import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
certv1 "k8s.io/api/certificates/v1"
@@ -231,10 +231,10 @@ func TestApproveCSRExponentialBackoff(t *testing.T) {
b := newApproveCSRExponentialBackoff()
var sum time.Duration
for i := 0; i < 10; i++ {
tmp := b.NextBackOff()
tmp := b.Step()
sum += tmp
}
r.Truef(100 < sum.Seconds(), "actual elapsed seconds %s", sum.Seconds())
r.Truef(100 < sum.Seconds(), "actual elapsed seconds %v", sum.Seconds())
}

func getCSR() *certv1.CertificateSigningRequest {
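
For intuition on the assertion above: assuming the default backoff starts at roughly one second and, with the Factor of 2 set by newApproveCSRExponentialBackoff, keeps doubling across all ten Step() calls (enough Steps configured and no cap), the summed delays are 1 + 2 + 4 + ... + 512 = 1023 seconds, comfortably above the 100-second threshold. More generally the initial delay only needs to exceed about 100 / (2^10 - 1) ≈ 0.1 seconds for the test to pass.
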
63 changes: 36 additions & 27 deletions actions/check_node_deleted.go
@@ -7,17 +7,17 @@ import (
"reflect"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/castai"
"github.com/castai/cluster-controller/waitext"
)

type checkNodeDeletedConfig struct {
retries uint64
retries int
retryWait time.Duration
}

@@ -52,35 +52,44 @@ func (h *checkNodeDeletedHandler) Handle(ctx context.Context, action *castai.Clu
})
log.Info("checking if node is deleted")

b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.retryWait), h.cfg.retries), ctx)
return backoff.Retry(func() error {
n, err := h.clientset.CoreV1().Nodes().Get(ctx, req.NodeName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil
}
boff := waitext.NewConstantBackoff(h.cfg.retryWait)

if n == nil {
return nil
}
return waitext.Retry(
ctx,
boff,
h.cfg.retries,
func(ctx context.Context) (bool, error) {
n, err := h.clientset.CoreV1().Nodes().Get(ctx, req.NodeName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil
}

if n == nil {
return false, nil
}

currentNodeID, ok := n.Labels[castai.LabelNodeID]
if !ok {
log.Info("node doesn't have castai node id label")
}
if currentNodeID != "" {
if currentNodeID != req.NodeID {
log.Info("node name was reused. Original node is deleted")
return nil
currentNodeID, ok := n.Labels[castai.LabelNodeID]
if !ok {
log.Info("node doesn't have castai node id label")
}
if currentNodeID == req.NodeID {
return backoff.Permanent(errors.New("node is not deleted"))
if currentNodeID != "" {
if currentNodeID != req.NodeID {
log.Info("node name was reused. Original node is deleted")
return false, nil
}
if currentNodeID == req.NodeID {
return false, errors.New("node is not deleted")
}
}
}

if n != nil {
return backoff.Permanent(errors.New("node is not deleted"))
}
if n != nil {
return false, errors.New("node is not deleted")
}

return err
}, b)
return true, err
},
func(err error) {
log.Warnf("node deletion check failed, will retry: %v", err)
},
)
}
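
The deletion check above (and the near-identical one in check_node_status.go below) maps its findings onto the (retry, error) pair the new helper expects. A compact, illustrative restatement — exampleOutcome is not part of the commit, and the node-ID label comparison is omitted for brevity:

package main

import (
	"errors"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// exampleOutcome: (false, nil) = node is gone, stop with success;
// (true, err) = transient API error, retry after the backoff interval;
// (false, err) = definitively "not deleted", stop immediately (the old backoff.Permanent).
func exampleOutcome(n *corev1.Node, err error) (bool, error) {
	switch {
	case apierrors.IsNotFound(err):
		return false, nil
	case err != nil:
		return true, err
	case n == nil:
		return false, nil
	default:
		return false, errors.New("node is not deleted")
	}
}

func main() {
	retry, err := exampleOutcome(nil, apierrors.NewNotFound(corev1.Resource("nodes"), "node-1"))
	fmt.Println(retry, err) // false <nil>: the node was already deleted
}
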
69 changes: 39 additions & 30 deletions actions/check_node_status.go
@@ -7,7 +7,6 @@ import (
"reflect"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -16,6 +15,7 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/castai"
"github.com/castai/cluster-controller/waitext"
)

func newCheckNodeStatusHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler {
@@ -64,42 +64,51 @@ func (h *checkNodeStatusHandler) checkNodeDeleted(ctx context.Context, log *logr
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel()
b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
return backoff.Retry(func() error {
n, err := h.clientset.CoreV1().Nodes().Get(ctx, req.NodeName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil
}

// If node is nil - deleted
// If label is present and doesn't match - node was reused - deleted
// If label is present and matches - node is not deleted
// If label is not present and node is not nil - node is not deleted (potentially corrupted state)
b := waitext.DefaultExponentialBackoff()
return waitext.Retry(
ctx,
b,
waitext.Forever,
func(ctx context.Context) (bool, error) {
n, err := h.clientset.CoreV1().Nodes().Get(ctx, req.NodeName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return false, nil
}

if n == nil {
return nil
}
// If node is nil - deleted
// If label is present and doesn't match - node was reused - deleted
// If label is present and matches - node is not deleted
// If label is not present and node is not nil - node is not deleted (potentially corrupted state)

currentNodeID, ok := n.Labels[castai.LabelNodeID]
if !ok {
log.Info("node doesn't have castai node id label")
}
if currentNodeID != "" {
if currentNodeID != req.NodeID {
log.Info("node name was reused. Original node is deleted")
return nil
if n == nil {
return false, nil
}
if currentNodeID == req.NodeID {
return backoff.Permanent(errors.New("node is not deleted"))

currentNodeID, ok := n.Labels[castai.LabelNodeID]
if !ok {
log.Info("node doesn't have castai node id label")
}
if currentNodeID != "" {
if currentNodeID != req.NodeID {
log.Info("node name was reused. Original node is deleted")
return false, nil
}
if currentNodeID == req.NodeID {
return false, errors.New("node is not deleted")
}
}
}

if n != nil {
return backoff.Permanent(errors.New("node is not deleted"))
}
if n != nil {
return false, errors.New("node is not deleted")
}

return err
}, b)
return true, err
},
func(err error) {
h.log.Warnf("check node %s status failed, will retry: %v", req.NodeName, err)
},
)
}

func (h *checkNodeStatusHandler) checkNodeReady(ctx context.Context, log *logrus.Entry, req *castai.ActionCheckNodeStatus) error {