feat: node status check action (#40)
* feat: node status check action

* Update actions/check_node_status.go

Co-authored-by: Tadeuš Varnas <[email protected]>

* feat: use watch for checking node status

* chore: more detailed error when draining node

* feat: use watch for checking node status and on timeout fallback to get

* tests: fix unit tests

* refactor: move require to each testing function

* refactor: code review remarks

* refactor: code review remarks

Co-authored-by: Tadeuš Varnas <[email protected]>
aldor007 and varnastadeus authored Sep 21, 2022
1 parent 4160883 commit 8d3c7d2
Showing 6 changed files with 313 additions and 3 deletions.
1 change: 1 addition & 0 deletions actions/actions.go
@@ -63,6 +63,7 @@ func NewService(
        reflect.TypeOf(&castai.ActionDisconnectCluster{}): newDisconnectClusterHandler(log, clientset),
        reflect.TypeOf(&castai.ActionSendAKSInitData{}):   newSendAKSInitDataHandler(log, castaiClient),
        reflect.TypeOf(&castai.ActionCheckNodeDeleted{}):  newCheckNodeDeletedHandler(log, clientset),
        reflect.TypeOf(&castai.ActionCheckNodeStatus{}):   newCheckNodeStatusHandler(log, clientset),
    },
    healthCheck: healthCheck,
}
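
For context, the handlers map above is keyed by the dynamic type of the action payload. A minimal, self-contained sketch of that dispatch pattern (names here are illustrative, not the actual service code):

package main

import (
    "context"
    "fmt"
    "reflect"
)

type ActionHandler interface {
    Handle(ctx context.Context, data interface{}) error
}

type checkNodeStatus struct{ NodeName string }

type logHandler struct{}

func (logHandler) Handle(_ context.Context, data interface{}) error {
    fmt.Printf("handling %T\n", data)
    return nil
}

func main() {
    // Handlers are looked up by reflect.TypeOf of the action payload,
    // mirroring the map built in NewService above.
    handlers := map[reflect.Type]ActionHandler{
        reflect.TypeOf(&checkNodeStatus{}): logHandler{},
    }
    data := &checkNodeStatus{NodeName: "node1"}
    if h, ok := handlers[reflect.TypeOf(data)]; ok {
        _ = h.Handle(context.Background(), data)
    }
}
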
110 changes: 110 additions & 0 deletions actions/check_node_status.go
@@ -0,0 +1,110 @@
package actions

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/cenkalti/backoff/v4"
    "github.com/sirupsen/logrus"
    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "k8s.io/client-go/kubernetes"

    "github.com/castai/cluster-controller/castai"
)

func newCheckNodeStatusHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler {
    return &checkNodeStatusHandler{
        log:       log,
        clientset: clientset,
    }
}

type checkNodeStatusHandler struct {
    log       logrus.FieldLogger
    clientset kubernetes.Interface
}

func (h *checkNodeStatusHandler) Handle(ctx context.Context, data interface{}) error {
    req, ok := data.(*castai.ActionCheckNodeStatus)
    if !ok {
        return fmt.Errorf("unexpected type %T for check node status handler", data)
    }

    log := h.log.WithFields(logrus.Fields{
        "node_name":   req.NodeName,
        "node_status": req.NodeStatus,
    })

    switch req.NodeStatus {
    case castai.ActionCheckNodeStatus_READY:
        log.Info("checking node ready")
        return h.checkNodeReady(ctx, req)
    case castai.ActionCheckNodeStatus_DELETED:
        log.Info("checking node deleted")
        return h.checkNodeDeleted(ctx, req)
    }

    return fmt.Errorf("unknown status to check provided node=%s status=%s", req.NodeName, req.NodeStatus)
}

// checkNodeDeleted polls the API with exponential backoff until the node is
// gone or the wait timeout (default 10s) expires.
func (h *checkNodeStatusHandler) checkNodeDeleted(ctx context.Context, req *castai.ActionCheckNodeStatus) error {
    timeout := 10
    if req.WaitTimeoutSeconds != nil {
        timeout = int(*req.WaitTimeoutSeconds)
    }
    ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
    defer cancel()
    b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
    return backoff.Retry(func() error {
        n, err := h.clientset.CoreV1().Nodes().Get(ctx, req.NodeName, metav1.GetOptions{})
        if apierrors.IsNotFound(err) {
            return nil
        }
        if n != nil {
            // The node still exists; no point in retrying.
            return backoff.Permanent(errors.New("node is not deleted"))
        }
        return err
    }, b)
}

// checkNodeReady watches the node until it reports a Ready condition or the
// wait timeout (default 9min) expires.
func (h *checkNodeStatusHandler) checkNodeReady(ctx context.Context, req *castai.ActionCheckNodeStatus) error {
    timeout := 9 * time.Minute
    watchObject := metav1.SingleObject(metav1.ObjectMeta{Name: req.NodeName})
    if req.WaitTimeoutSeconds != nil {
        timeout = time.Duration(*req.WaitTimeoutSeconds) * time.Second
    }
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    watch, err := h.clientset.CoreV1().Nodes().Watch(ctx, watchObject)
    if err != nil {
        return fmt.Errorf("creating node watch: %w", err)
    }

    defer watch.Stop()
    // The result channel closes when the context times out, ending the loop.
    for r := range watch.ResultChan() {
        if node, ok := r.Object.(*corev1.Node); ok {
            if isNodeReady(node.Status.Conditions) {
                return nil
            }
        }
    }

    return fmt.Errorf("timeout waiting for node %s to become ready", req.NodeName)
}

func isNodeReady(conditions []corev1.NodeCondition) bool {
    for _, cond := range conditions {
        if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
            return true
        }
    }

    return false
}
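
One of the commit messages mentions falling back to a plain get when the watch times out, while the final checkNodeReady above simply returns a timeout error. A hypothetical sketch of such a fallback, reusing isNodeReady from this file and the same imports (an assumption, not part of this commit):

// checkNodeReadyOnce is a hypothetical single-shot fallback: one Get call
// instead of a watch, checked against the same readiness condition.
func checkNodeReadyOnce(ctx context.Context, clientset kubernetes.Interface, nodeName string) error {
    n, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
    if err != nil {
        return fmt.Errorf("getting node: %w", err)
    }
    if !isNodeReady(n.Status.Conditions) {
        return fmt.Errorf("node %s is not ready", nodeName)
    }
    return nil
}
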
182 changes: 182 additions & 0 deletions actions/check_node_status_test.go
@@ -0,0 +1,182 @@
package actions

import (
    "context"
    "sync"
    "testing"
    "time"

    "github.com/sirupsen/logrus"
    "github.com/stretchr/testify/require"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes/fake"
    k8stest "k8s.io/client-go/testing"

    "github.com/castai/cluster-controller/castai"
)

func TestCheckStatus_Deleted(t *testing.T) {
    log := logrus.New()
    log.SetLevel(logrus.DebugLevel)

    t.Run("return error when node is not deleted", func(t *testing.T) {
        r := require.New(t)
        nodeName := "node1"
        node := &v1.Node{
            ObjectMeta: metav1.ObjectMeta{
                Name: nodeName,
            },
        }
        clientset := fake.NewSimpleClientset(node)

        h := checkNodeStatusHandler{
            log:       log,
            clientset: clientset,
        }

        req := &castai.ActionCheckNodeStatus{
            NodeName:   "node1",
            NodeStatus: castai.ActionCheckNodeStatus_DELETED,
        }

        err := h.Handle(context.Background(), req)
        r.EqualError(err, "node is not deleted")
    })

    t.Run("handle check successfully when node is not found", func(t *testing.T) {
        r := require.New(t)
        clientset := fake.NewSimpleClientset()

        h := checkNodeStatusHandler{
            log:       log,
            clientset: clientset,
        }

        req := &castai.ActionCheckNodeStatus{
            NodeName:   "node1",
            NodeStatus: castai.ActionCheckNodeStatus_DELETED,
        }

        err := h.Handle(context.Background(), req)
        r.NoError(err)
    })
}

func TestCheckStatus_Ready(t *testing.T) {
    log := logrus.New()
    log.SetLevel(logrus.DebugLevel)

    t.Run("return error when node is not found", func(t *testing.T) {
        r := require.New(t)
        clientset := fake.NewSimpleClientset()

        h := checkNodeStatusHandler{
            log:       log,
            clientset: clientset,
        }

        watcher := watch.NewFake()

        clientset.PrependWatchReactor("nodes", k8stest.DefaultWatchReactor(watcher, nil))
        go func() {
            time.Sleep(time.Second)
            watcher.Stop()
        }()

        timeout := int32(1)
        req := &castai.ActionCheckNodeStatus{
            NodeName:           "node1",
            NodeStatus:         castai.ActionCheckNodeStatus_READY,
            WaitTimeoutSeconds: &timeout,
        }

        err := h.Handle(context.Background(), req)
        r.EqualError(err, "timeout waiting for node node1 to become ready")
    })

    t.Run("handle check successfully when node becomes ready", func(t *testing.T) {
        r := require.New(t)
        nodeName := "node1"
        node := &v1.Node{
            ObjectMeta: metav1.ObjectMeta{
                Name: nodeName,
            },
            Status: v1.NodeStatus{
                Conditions: []v1.NodeCondition{
                    {
                        Type:   v1.NodeReady,
                        Status: v1.ConditionFalse,
                    },
                },
            },
        }
        clientset := fake.NewSimpleClientset(node)

        h := checkNodeStatusHandler{
            log:       log,
            clientset: clientset,
        }

        timeout := int32(60)
        req := &castai.ActionCheckNodeStatus{
            NodeName:           "node1",
            NodeStatus:         castai.ActionCheckNodeStatus_READY,
            WaitTimeoutSeconds: &timeout,
        }

        var wg sync.WaitGroup
        wg.Add(2)
        var err error
        go func() {
            err = h.Handle(context.Background(), req)
            wg.Done()
        }()

        go func() {
            time.Sleep(1 * time.Second)
            node.Status.Conditions[0].Status = v1.ConditionTrue
            clientset.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{})
            wg.Done()
        }()
        wg.Wait()

        r.NoError(err)
    })

    t.Run("handle error when node is not ready", func(t *testing.T) {
        r := require.New(t)
        nodeName := "node1"
        node := &v1.Node{
            ObjectMeta: metav1.ObjectMeta{
                Name: nodeName,
            },
            Status: v1.NodeStatus{
                Conditions: []v1.NodeCondition{},
            },
        }
        clientset := fake.NewSimpleClientset(node)
        watcher := watch.NewFake()

        clientset.PrependWatchReactor("nodes", k8stest.DefaultWatchReactor(watcher, nil))
        go func() {
            time.Sleep(time.Second)
            watcher.Stop()
        }()

        h := checkNodeStatusHandler{
            log:       log,
            clientset: clientset,
        }

        req := &castai.ActionCheckNodeStatus{
            NodeName:   "node1",
            NodeStatus: castai.ActionCheckNodeStatus_READY,
        }

        err := h.Handle(context.Background(), req)
        r.Error(err)
        r.EqualError(err, "timeout waiting for node node1 to become ready")
    })
}
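
In the "becomes ready" subtest above, the readiness event reaches the handler through the fake clientset's built-in watch tracker when Update is called. An alternative sketch (same imports as this file) drives the event explicitly through the fake watcher; note that watch.FakeWatcher's Modify blocks until the event is consumed, hence the goroutine:

func TestCheckStatus_Ready_FakeWatcherSketch(t *testing.T) {
    r := require.New(t)
    node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}
    clientset := fake.NewSimpleClientset(node)

    watcher := watch.NewFake()
    clientset.PrependWatchReactor("nodes", k8stest.DefaultWatchReactor(watcher, nil))

    h := checkNodeStatusHandler{log: logrus.New(), clientset: clientset}

    go func() {
        ready := node.DeepCopy()
        ready.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}
        watcher.Modify(ready) // blocks until the handler reads the event
    }()

    timeout := int32(10)
    err := h.Handle(context.Background(), &castai.ActionCheckNodeStatus{
        NodeName:           "node1",
        NodeStatus:         castai.ActionCheckNodeStatus_READY,
        WaitTimeoutSeconds: &timeout,
    })
    r.NoError(err)
}
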
4 changes: 2 additions & 2 deletions actions/drain_node_handler.go
@@ -76,7 +76,7 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
    defer evictCancel()
    err = h.evictNodePods(evictCtx, log, node)
    if err != nil && !errors.Is(err, context.DeadlineExceeded) {
-       return err
+       return fmt.Errorf("evicting node pods: %w", err)
    }

    if errors.Is(err, context.DeadlineExceeded) {
@@ -87,7 +87,7 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
    deleteCtx, deleteCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout)
    defer deleteCancel()
    if err := h.deleteNodePods(deleteCtx, log, node); err != nil {
-       return err
+       return fmt.Errorf("forcefully deleting pods: %w", err)
    }
}
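
A quick aside on the change above: wrapping with %w keeps the original error matchable, so the errors.Is(err, context.DeadlineExceeded) guard still behaves the same. A minimal, runnable sketch:

package main

import (
    "context"
    "errors"
    "fmt"
)

func main() {
    err := fmt.Errorf("evicting node pods: %w", context.DeadlineExceeded)
    // The wrapped sentinel is still detectable through the error chain.
    fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
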
2 changes: 1 addition & 1 deletion actions/drain_node_handler_test.go
@@ -103,7 +103,7 @@ func TestDrainNodeHandler(t *testing.T) {
    }

    err := h.Handle(context.Background(), req)
-   r.EqualError(err, "sending evict pods requests: evicting pod pod1 in namespace default: internal")
+   r.EqualError(err, "evicting node pods: sending evict pods requests: evicting pod pod1 in namespace default: internal")

    n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
    r.NoError(err)
17 changes: 17 additions & 0 deletions castai/types.go
@@ -29,6 +29,7 @@ type ClusterAction struct {
    ActionDisconnectCluster *ActionDisconnectCluster `json:"actionDisconnectCluster,omitempty"`
    ActionSendAKSInitData   *ActionSendAKSInitData   `json:"actionSendAksInitData,omitempty"`
    ActionCheckNodeDeleted  *ActionCheckNodeDeleted  `json:"actionCheckNodeDeleted,omitempty"`
    ActionCheckNodeStatus   *ActionCheckNodeStatus   `json:"actionCheckNodeStatus,omitempty"`
    CreatedAt               time.Time                `json:"createdAt"`
    DoneAt                  *time.Time               `json:"doneAt,omitempty"`
    Error                   *string                  `json:"error,omitempty"`
@@ -68,6 +69,9 @@ func (c *ClusterAction) Data() interface{} {
    if c.ActionCheckNodeDeleted != nil {
        return c.ActionCheckNodeDeleted
    }
    if c.ActionCheckNodeStatus != nil {
        return c.ActionCheckNodeStatus
    }
    return nil
}
@@ -125,6 +129,19 @@ type ActionCheckNodeDeleted struct {
    NodeName string `json:"nodeName"`
}

type ActionCheckNodeStatus_Status string

const (
    ActionCheckNodeStatus_READY   ActionCheckNodeStatus_Status = "NodeStatus_READY"
    ActionCheckNodeStatus_DELETED ActionCheckNodeStatus_Status = "NodeStatus_DELETED"
)

type ActionCheckNodeStatus struct {
    NodeName           string                       `json:"nodeName"`
    NodeStatus         ActionCheckNodeStatus_Status `json:"nodeStatus,omitempty"`
    WaitTimeoutSeconds *int32                       `json:"waitTimeoutSeconds,omitempty"`
}

type ActionChartUpsert struct {
    Namespace   string `json:"namespace"`
    ReleaseName string `json:"releaseName"`
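
For reference, a sketch of what the new action's JSON payload looks like, derived from the json tags above (the ClusterAction envelope is omitted; the import path matches the one used in the diffs):

package main

import (
    "encoding/json"
    "fmt"

    "github.com/castai/cluster-controller/castai"
)

func main() {
    timeout := int32(120)
    b, err := json.Marshal(castai.ActionCheckNodeStatus{
        NodeName:           "node1",
        NodeStatus:         castai.ActionCheckNodeStatus_READY,
        WaitTimeoutSeconds: &timeout,
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(string(b))
    // {"nodeName":"node1","nodeStatus":"NodeStatus_READY","waitTimeoutSeconds":120}
}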
