diff --git a/actions/actions.go b/actions/actions.go
index 10fc14ad..79abae7d 100644
--- a/actions/actions.go
+++ b/actions/actions.go
@@ -63,6 +63,7 @@ func NewService(
 			reflect.TypeOf(&castai.ActionDisconnectCluster{}): newDisconnectClusterHandler(log, clientset),
 			reflect.TypeOf(&castai.ActionSendAKSInitData{}):   newSendAKSInitDataHandler(log, castaiClient),
 			reflect.TypeOf(&castai.ActionCheckNodeDeleted{}):  newCheckNodeDeletedHandler(log, clientset),
+			reflect.TypeOf(&castai.ActionCheckNodeStatus{}):   newCheckNodeStatusHandler(log, clientset),
 		},
 		healthCheck: healthCheck,
 	}
diff --git a/actions/check_node_status.go b/actions/check_node_status.go
new file mode 100644
index 00000000..437acd9e
--- /dev/null
+++ b/actions/check_node_status.go
@@ -0,0 +1,110 @@
+package actions
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	"github.com/castai/cluster-controller/castai"
+)
+
+func newCheckNodeStatusHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler {
+	return &checkNodeStatusHandler{
+		log:       log,
+		clientset: clientset,
+	}
+}
+
+type checkNodeStatusHandler struct {
+	log       logrus.FieldLogger
+	clientset kubernetes.Interface
+}
+
+func (h *checkNodeStatusHandler) Handle(ctx context.Context, data interface{}) error {
+	req, ok := data.(*castai.ActionCheckNodeStatus)
+	if !ok {
+		return fmt.Errorf("unexpected type %T for check node status handler", data)
+	}
+
+	log := h.log.WithFields(logrus.Fields{
+		"node_name":   req.NodeName,
+		"node_status": req.NodeStatus,
+	})
+
+	switch req.NodeStatus {
+	case castai.ActionCheckNodeStatus_READY:
+		log.Info("checking node ready")
+		return h.checkNodeReady(ctx, req)
+	case castai.ActionCheckNodeStatus_DELETED:
+		log.Info("checking node deleted")
+		return h.checkNodeDeleted(ctx, req)
+	}
+
+	return fmt.Errorf("unknown node status to check: node=%s status=%s", req.NodeName, req.NodeStatus)
+}
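+
+// checkNodeDeleted polls the API server with exponential backoff until either
+// the node is no longer found or the wait timeout (10s by default) cancels
+// the backoff context. A Get that still returns the node is reported as a
+// permanent error so the retry loop stops immediately.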
ready", req.NodeName) +} + +func isNodeReady(conditions []corev1.NodeCondition) bool { + for _, cond := range conditions { + if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue { + return true + } + } + + return false +} diff --git a/actions/check_node_status_test.go b/actions/check_node_status_test.go new file mode 100644 index 00000000..009158f7 --- /dev/null +++ b/actions/check_node_status_test.go @@ -0,0 +1,182 @@ +package actions + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + k8stest "k8s.io/client-go/testing" + + "github.com/castai/cluster-controller/castai" +) + +func TestCheckStatus_Deleted(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.DebugLevel) + + t.Run("return error when node is not deleted", func(t *testing.T) { + r := require.New(t) + nodeName := "node1" + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + }, + } + clientset := fake.NewSimpleClientset(node) + + h := checkNodeStatusHandler{ + log: log, + clientset: clientset, + } + + req := &castai.ActionCheckNodeStatus{ + NodeName: "node1", + NodeStatus: castai.ActionCheckNodeStatus_DELETED, + } + + err := h.Handle(context.Background(), req) + r.EqualError(err, "node is not deleted") + }) + + t.Run("handle check successfully when node is not found", func(t *testing.T) { + r := require.New(t) + clientset := fake.NewSimpleClientset() + + h := checkNodeStatusHandler{ + log: log, + clientset: clientset, + } + + req := &castai.ActionCheckNodeStatus{ + NodeName: "node1", + NodeStatus: castai.ActionCheckNodeStatus_DELETED, + } + + err := h.Handle(context.Background(), req) + r.NoError(err) + }) +} + +func TestCheckStatus_Ready(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.DebugLevel) + + t.Run("return error when node is not found", func(t *testing.T) { + r := require.New(t) + clientset := fake.NewSimpleClientset() + + h := checkNodeStatusHandler{ + log: log, + clientset: clientset, + } + + watcher := watch.NewFake() + + clientset.PrependWatchReactor("nodes", k8stest.DefaultWatchReactor(watcher, nil)) + go func() { + time.Sleep(time.Second) + watcher.Stop() + }() + + timeout := int32(1) + req := &castai.ActionCheckNodeStatus{ + NodeName: "node1", + NodeStatus: castai.ActionCheckNodeStatus_READY, + WaitTimeoutSeconds: &timeout, + } + + err := h.Handle(context.Background(), req) + r.EqualError(err, "timeout waiting for node node1 to become ready") + }) + + t.Run("handle check successfully when node become ready", func(t *testing.T) { + r := require.New(t) + nodeName := "node1" + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionFalse, + }, + }, + }, + } + clientset := fake.NewSimpleClientset(node) + + h := checkNodeStatusHandler{ + log: log, + clientset: clientset, + } + + timeout := int32(60) + req := &castai.ActionCheckNodeStatus{ + NodeName: "node1", + NodeStatus: castai.ActionCheckNodeStatus_READY, + WaitTimeoutSeconds: &timeout, + } + + var wg sync.WaitGroup + wg.Add(2) + var err error + go func() { + err = h.Handle(context.Background(), req) + wg.Done() + }() + + go func() { + time.Sleep(1 * time.Second) + node.Status.Conditions[0].Status = v1.ConditionTrue + 
+func (h *checkNodeStatusHandler) checkNodeReady(ctx context.Context, req *castai.ActionCheckNodeStatus) error {
+	timeout := 9 * time.Minute
+	if req.WaitTimeoutSeconds != nil {
+		timeout = time.Duration(*req.WaitTimeoutSeconds) * time.Second
+	}
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	watchObject := metav1.SingleObject(metav1.ObjectMeta{Name: req.NodeName})
+	watch, err := h.clientset.CoreV1().Nodes().Watch(ctx, watchObject)
+	if err != nil {
+		return fmt.Errorf("creating node watch: %w", err)
+	}
+	defer watch.Stop()
+
+	for r := range watch.ResultChan() {
+		if node, ok := r.Object.(*corev1.Node); ok {
+			if isNodeReady(node.Status.Conditions) {
+				return nil
+			}
+		}
+	}
+
+	return fmt.Errorf("timeout waiting for node %s to become ready", req.NodeName)
+}
+
+func isNodeReady(conditions []corev1.NodeCondition) bool {
+	for _, cond := range conditions {
+		if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
+			return true
+		}
+	}
+
+	return false
+}
diff --git a/actions/check_node_status_test.go b/actions/check_node_status_test.go
new file mode 100644
index 00000000..009158f7
--- /dev/null
+++ b/actions/check_node_status_test.go
@@ -0,0 +1,182 @@
+package actions
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/require"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes/fake"
+	k8stest "k8s.io/client-go/testing"
+
+	"github.com/castai/cluster-controller/castai"
+)
+
+func TestCheckStatus_Deleted(t *testing.T) {
+	log := logrus.New()
+	log.SetLevel(logrus.DebugLevel)
+
+	t.Run("return error when node is not deleted", func(t *testing.T) {
+		r := require.New(t)
+		nodeName := "node1"
+		node := &v1.Node{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: nodeName,
+			},
+		}
+		clientset := fake.NewSimpleClientset(node)
+
+		h := checkNodeStatusHandler{
+			log:       log,
+			clientset: clientset,
+		}
+
+		req := &castai.ActionCheckNodeStatus{
+			NodeName:   "node1",
+			NodeStatus: castai.ActionCheckNodeStatus_DELETED,
+		}
+
+		err := h.Handle(context.Background(), req)
+		r.EqualError(err, "node is not deleted")
+	})
+
+	t.Run("handle check successfully when node is not found", func(t *testing.T) {
+		r := require.New(t)
+		clientset := fake.NewSimpleClientset()
+
+		h := checkNodeStatusHandler{
+			log:       log,
+			clientset: clientset,
+		}
+
+		req := &castai.ActionCheckNodeStatus{
+			NodeName:   "node1",
+			NodeStatus: castai.ActionCheckNodeStatus_DELETED,
+		}
+
+		err := h.Handle(context.Background(), req)
+		r.NoError(err)
+	})
+}
+
+func TestCheckStatus_Ready(t *testing.T) {
+	log := logrus.New()
+	log.SetLevel(logrus.DebugLevel)
+
+	t.Run("return error when node is not found", func(t *testing.T) {
+		r := require.New(t)
+		clientset := fake.NewSimpleClientset()
+
+		h := checkNodeStatusHandler{
+			log:       log,
+			clientset: clientset,
+		}
+
+		watcher := watch.NewFake()
+		clientset.PrependWatchReactor("nodes", k8stest.DefaultWatchReactor(watcher, nil))
+		go func() {
+			// Closing the watch channel without sending the node triggers the timeout error.
+			time.Sleep(time.Second)
+			watcher.Stop()
+		}()
+
+		timeout := int32(1)
+		req := &castai.ActionCheckNodeStatus{
+			NodeName:           "node1",
+			NodeStatus:         castai.ActionCheckNodeStatus_READY,
+			WaitTimeoutSeconds: &timeout,
+		}
+
+		err := h.Handle(context.Background(), req)
+		r.EqualError(err, "timeout waiting for node node1 to become ready")
+	})
+
+	t.Run("handle check successfully when node becomes ready", func(t *testing.T) {
+		r := require.New(t)
+		nodeName := "node1"
+		node := &v1.Node{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: nodeName,
+			},
+			Status: v1.NodeStatus{
+				Conditions: []v1.NodeCondition{
+					{
+						Type:   v1.NodeReady,
+						Status: v1.ConditionFalse,
+					},
+				},
+			},
+		}
+		clientset := fake.NewSimpleClientset(node)
+
+		h := checkNodeStatusHandler{
+			log:       log,
+			clientset: clientset,
+		}
+
+		timeout := int32(60)
+		req := &castai.ActionCheckNodeStatus{
+			NodeName:           "node1",
+			NodeStatus:         castai.ActionCheckNodeStatus_READY,
+			WaitTimeoutSeconds: &timeout,
+		}
+
+		var wg sync.WaitGroup
+		wg.Add(2)
+		var err error
+		go func() {
+			err = h.Handle(context.Background(), req)
+			wg.Done()
+		}()
+
+		go func() {
+			time.Sleep(1 * time.Second)
+			node.Status.Conditions[0].Status = v1.ConditionTrue
+			_, _ = clientset.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{})
+			wg.Done()
+		}()
+		wg.Wait()
+
+		r.NoError(err)
+	})
+
+	t.Run("handle error when node is not ready", func(t *testing.T) {
+		r := require.New(t)
+		nodeName := "node1"
+		node := &v1.Node{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: nodeName,
+			},
+			Status: v1.NodeStatus{
+				Conditions: []v1.NodeCondition{},
+			},
+		}
+		clientset := fake.NewSimpleClientset(node)
+		watcher := watch.NewFake()
+		clientset.PrependWatchReactor("nodes", k8stest.DefaultWatchReactor(watcher, nil))
+		go func() {
+			time.Sleep(time.Second)
+			watcher.Stop()
+		}()
+
+		h := checkNodeStatusHandler{
+			log:       log,
+			clientset: clientset,
+		}
+
+		req := &castai.ActionCheckNodeStatus{
+			NodeName:   "node1",
+			NodeStatus: castai.ActionCheckNodeStatus_READY,
+		}
+
+		err := h.Handle(context.Background(), req)
+		r.EqualError(err, "timeout waiting for node node1 to become ready")
+	})
+}
diff --git a/actions/drain_node_handler.go b/actions/drain_node_handler.go
index 7d0da6f0..5796ac28 100644
--- a/actions/drain_node_handler.go
+++ b/actions/drain_node_handler.go
@@ -76,7 +76,7 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
 	defer evictCancel()
 	err = h.evictNodePods(evictCtx, log, node)
 	if err != nil && !errors.Is(err, context.DeadlineExceeded) {
-		return err
+		return fmt.Errorf("evicting node pods: %w", err)
 	}
 
 	if errors.Is(err, context.DeadlineExceeded) {
@@ -87,7 +87,7 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
 		deleteCtx, deleteCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout)
 		defer deleteCancel()
 		if err := h.deleteNodePods(deleteCtx, log, node); err != nil {
-			return err
+			return fmt.Errorf("forcefully deleting pods: %w", err)
 		}
 	}
 
diff --git a/actions/drain_node_handler_test.go b/actions/drain_node_handler_test.go
index e4b25e40..2474e1c3 100644
--- a/actions/drain_node_handler_test.go
+++ b/actions/drain_node_handler_test.go
@@ -103,7 +103,7 @@ func TestDrainNodeHandler(t *testing.T) {
 	}
 
 	err := h.Handle(context.Background(), req)
-	r.EqualError(err, "sending evict pods requests: evicting pod pod1 in namespace default: internal")
+	r.EqualError(err, "evicting node pods: sending evict pods requests: evicting pod pod1 in namespace default: internal")
 
 	n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
 	r.NoError(err)
diff --git a/castai/types.go b/castai/types.go
index 27744d6c..3b9ce39e 100644
--- a/castai/types.go
+++ b/castai/types.go
@@ -29,6 +29,7 @@ type ClusterAction struct {
 	ActionDisconnectCluster *ActionDisconnectCluster `json:"actionDisconnectCluster,omitempty"`
 	ActionSendAKSInitData   *ActionSendAKSInitData   `json:"actionSendAksInitData,omitempty"`
 	ActionCheckNodeDeleted  *ActionCheckNodeDeleted  `json:"actionCheckNodeDeleted,omitempty"`
+	ActionCheckNodeStatus   *ActionCheckNodeStatus   `json:"actionCheckNodeStatus,omitempty"`
 	CreatedAt               time.Time                `json:"createdAt"`
 	DoneAt                  *time.Time               `json:"doneAt,omitempty"`
 	Error                   *string                  `json:"error,omitempty"`
@@ -68,6 +69,9 @@ func (c *ClusterAction) Data() interface{} {
 	if c.ActionCheckNodeDeleted != nil {
 		return c.ActionCheckNodeDeleted
 	}
+	if c.ActionCheckNodeStatus != nil {
+		return c.ActionCheckNodeStatus
+	}
 	return nil
 }
 
@@ -125,6 +129,19 @@ type ActionCheckNodeDeleted struct {
 	NodeName string `json:"nodeName"`
 }
 
+type ActionCheckNodeStatus_Status string
+
+const (
+	ActionCheckNodeStatus_READY   ActionCheckNodeStatus_Status = "NodeStatus_READY"
+	ActionCheckNodeStatus_DELETED ActionCheckNodeStatus_Status = "NodeStatus_DELETED"
+)
+
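+// An illustrative wire payload for this action (field names follow the json
+// tags on ActionCheckNodeStatus below; the values are examples only):
+//
+//	{
+//	  "nodeName": "node1",
+//	  "nodeStatus": "NodeStatus_READY",
+//	  "waitTimeoutSeconds": 600
+//	}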
+type ActionCheckNodeStatus struct {
+	NodeName           string                       `json:"nodeName"`
+	NodeStatus         ActionCheckNodeStatus_Status `json:"nodeStatus,omitempty"`
+	WaitTimeoutSeconds *int32                       `json:"waitTimeoutSeconds,omitempty"`
+}
+
 type ActionChartUpsert struct {
 	Namespace   string `json:"namespace"`
 	ReleaseName string `json:"releaseName"`