Skip to content

Commit ec65045

Browse files
authored
Merge pull request #13 from civo/fix/node-reboot
Add configurable reboot threshold time and skip reboot if status recently updated
2 parents 842aa57 + 876dbe0 commit ec65045

File tree

4 files changed

+220
-19
lines changed

4 files changed

+220
-19
lines changed

main.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,22 @@ import (
1515
var versionInfo = flag.Bool("version", false, "Print the driver version")
1616

1717
var (
18-
apiURL = strings.TrimSpace(os.Getenv("CIVO_API_URL"))
19-
apiKey = strings.TrimSpace(os.Getenv("CIVO_API_KEY"))
20-
region = strings.TrimSpace(os.Getenv("CIVO_REGION"))
21-
clusterID = strings.TrimSpace(os.Getenv("CIVO_CLUSTER_ID"))
22-
nodePoolID = strings.TrimSpace(os.Getenv("CIVO_NODE_POOL_ID"))
23-
nodeDesiredGPUCount = strings.TrimSpace(os.Getenv("CIVO_NODE_DESIRED_GPU_COUNT"))
18+
apiURL = strings.TrimSpace(os.Getenv("CIVO_API_URL"))
19+
apiKey = strings.TrimSpace(os.Getenv("CIVO_API_KEY"))
20+
region = strings.TrimSpace(os.Getenv("CIVO_REGION"))
21+
clusterID = strings.TrimSpace(os.Getenv("CIVO_CLUSTER_ID"))
22+
nodePoolID = strings.TrimSpace(os.Getenv("CIVO_NODE_POOL_ID"))
23+
nodeDesiredGPUCount = strings.TrimSpace(os.Getenv("CIVO_NODE_DESIRED_GPU_COUNT"))
24+
rebootTimeWindowMinutes = strings.TrimSpace(os.Getenv("CIVO_NODE_REBOOT_TIME_WINDOW_MINUTES"))
2425
)
2526

2627
func run(ctx context.Context) error {
2728
ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
2829
defer stop()
2930

30-
w, err := watcher.NewWatcher(ctx, apiURL, apiKey, region, clusterID, nodePoolID, nodeDesiredGPUCount)
31+
w, err := watcher.NewWatcher(ctx, apiURL, apiKey, region, clusterID, nodePoolID, nodeDesiredGPUCount,
32+
watcher.WithRebootTimeWindowMinutes(rebootTimeWindowMinutes),
33+
)
3134
if err != nil {
3235
return err
3336
}

pkg/watcher/options.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
package watcher
22

33
import (
4+
"strconv"
5+
"time"
6+
47
"github.com/civo/civogo"
58
"k8s.io/client-go/kubernetes"
69
)
710

811
// Option represents a configuration function that modifies watcher object.
type Option func(*watcher)

// defaultOptions holds the options applied to every watcher; the reboot
// time window defaults to 40 minutes. NOTE(review): presumably these run
// before caller-supplied options so callers can override — confirm in
// NewWatcher.
var defaultOptions = []Option{
	WithRebootTimeWindowMinutes("40"),
}
1217

1318
// WithKubernetesClient returns Option to set Kubernetes API client.
1419
func WithKubernetesClient(client kubernetes.Interface) Option {
@@ -36,3 +41,13 @@ func WithCivoClient(client civogo.Clienter) Option {
3641
}
3742
}
3843
}
44+
45+
// WithRebootTimeWindowMinutes returns Option to set reboot time window.
46+
func WithRebootTimeWindowMinutes(s string) Option {
47+
return func(w *watcher) {
48+
n, err := strconv.Atoi(s)
49+
if err == nil && n > 0 {
50+
w.rebootTimeWindowMinutes = time.Duration(n)
51+
}
52+
}
53+
}

pkg/watcher/watcher.go

+28-5
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@ type watcher struct {
3232
civoClient civogo.Clienter
3333
clientCfgPath string
3434

35-
clusterID string
36-
region string
37-
apiKey string
38-
apiURL string
39-
nodeDesiredGPUCount int
35+
clusterID string
36+
region string
37+
apiKey string
38+
apiURL string
39+
nodeDesiredGPUCount int
40+
rebootTimeWindowMinutes time.Duration
4041

4142
nodeSelector *metav1.LabelSelector
4243
}
@@ -158,9 +159,15 @@ func (w *watcher) run(ctx context.Context) error {
158159
return err
159160
}
160161

162+
thresholdTime := time.Now().Add(-w.rebootTimeWindowMinutes * time.Minute)
163+
161164
for _, node := range nodes.Items {
162165
if !isNodeDesiredGPU(&node, w.nodeDesiredGPUCount) || !isNodeReady(&node) {
163166
slog.Info("Node is not ready, attempting to reboot", "node", node.GetName())
167+
if isReadyOrNotReadyStatusChangedAfter(&node, thresholdTime) {
168+
slog.Info("Skipping reboot because Ready/NotReady status was updated recently", "node", node.GetName())
169+
continue
170+
}
164171
if err := w.rebootNode(node.GetName()); err != nil {
165172
slog.Error("Failed to reboot Node", "node", node.GetName(), "error", err)
166173
return fmt.Errorf("failed to reboot node: %w", err)
@@ -170,6 +177,22 @@ func (w *watcher) run(ctx context.Context) error {
170177
return nil
171178
}
172179

180+
func isReadyOrNotReadyStatusChangedAfter(node *corev1.Node, thresholdTime time.Time) bool {
181+
var lastChangedTime time.Time
182+
for _, cond := range node.Status.Conditions {
183+
if cond.Type == corev1.NodeReady {
184+
if cond.LastTransitionTime.After(lastChangedTime) {
185+
lastChangedTime = cond.LastTransitionTime.Time
186+
}
187+
}
188+
}
189+
if lastChangedTime.IsZero() {
190+
slog.Error("Node is in an invalid state, NodeReady condition not found", "node", node.GetName())
191+
return false
192+
}
193+
return lastChangedTime.After(thresholdTime)
194+
}
195+
173196
func isNodeReady(node *corev1.Node) bool {
174197
for _, cond := range node.Status.Conditions {
175198
if cond.Type == corev1.NodeReady {

pkg/watcher/watcher_test.go

+166-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"strconv"
77
"testing"
8+
"time"
89

910
"github.com/civo/civogo"
1011
corev1 "k8s.io/api/core/v1"
@@ -16,12 +17,13 @@ import (
1617
)
1718

1819
var (
19-
testClusterID = "test-cluster-123"
20-
testRegion = "lon1"
21-
testApiKey = "test-api-key"
22-
testApiURL = "https://test.civo.com"
23-
testNodePoolID = "test-node-pool"
24-
testNodeDesiredGPUCount = "8"
20+
testClusterID = "test-cluster-123"
21+
testRegion = "lon1"
22+
testApiKey = "test-api-key"
23+
testApiURL = "https://test.civo.com"
24+
testNodePoolID = "test-node-pool"
25+
testNodeDesiredGPUCount = "8"
26+
testRebootTimeWindowMinutes = time.Duration(40)
2527
)
2628

2729
func TestNew(t *testing.T) {
@@ -54,6 +56,8 @@ func TestNew(t *testing.T) {
5456
opts: []Option{
5557
WithKubernetesClient(fake.NewSimpleClientset()),
5658
WithCivoClient(&FakeClient{}),
59+
WithRebootTimeWindowMinutes("invalid time"), // It is invalid, but the default time (40) will be used.
60+
WithRebootTimeWindowMinutes("0"), // It is invalid, but the default time (40) will be used.
5761
},
5862
},
5963
checkFunc: func(w *watcher) error {
@@ -86,6 +90,9 @@ func TestNew(t *testing.T) {
8690
if w.civoClient == nil {
8791
return fmt.Errorf("civoClient is nil")
8892
}
93+
if w.rebootTimeWindowMinutes != testRebootTimeWindowMinutes {
94+
return fmt.Errorf("w.rebootTimeWindowMinutes mismatch: got %v, want %s", w.nodeSelector, testNodePoolID)
95+
}
8996
return nil
9097
},
9198
},
@@ -338,6 +345,49 @@ func TestRun(t *testing.T) {
338345
}
339346
},
340347
},
348+
{
349+
name: "Returns nil and skips reboot when GPU count matches desired but node is not ready, and LastTransitionTime is more recent than thresholdTime",
350+
args: args{
351+
opts: []Option{
352+
WithKubernetesClient(fake.NewSimpleClientset()),
353+
WithCivoClient(&FakeClient{}),
354+
},
355+
nodeDesiredGPUCount: testNodeDesiredGPUCount,
356+
nodePoolID: testNodePoolID,
357+
},
358+
beforeFunc: func(w *watcher) {
359+
t.Helper()
360+
client := w.client.(*fake.Clientset)
361+
362+
nodes := &corev1.NodeList{
363+
Items: []corev1.Node{
364+
{
365+
ObjectMeta: metav1.ObjectMeta{
366+
Name: "node-01",
367+
Labels: map[string]string{
368+
nodePoolLabelKey: testNodePoolID,
369+
},
370+
},
371+
Status: corev1.NodeStatus{
372+
Conditions: []corev1.NodeCondition{
373+
{
374+
Type: corev1.NodeReady,
375+
Status: corev1.ConditionFalse,
376+
LastTransitionTime: metav1.NewTime(time.Now()),
377+
},
378+
},
379+
Allocatable: corev1.ResourceList{
380+
gpuResourceName: resource.MustParse("8"),
381+
},
382+
},
383+
},
384+
},
385+
}
386+
client.Fake.PrependReactor("list", "nodes", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
387+
return true, nodes, nil
388+
})
389+
},
390+
},
341391
{
342392
name: "Returns an error when unable to list nodes",
343393
args: args{
@@ -430,6 +480,116 @@ func TestRun(t *testing.T) {
430480
}
431481
}
432482

483+
// TestIsReadyOrNotReadyStatusChangedAfter covers the threshold comparison
// for the NodeReady condition: recent Ready/NotReady transitions return
// true, stale transitions return false, and nodes with no NodeReady
// condition (empty list, or only unrelated condition types) return false.
func TestIsReadyOrNotReadyStatusChangedAfter(t *testing.T) {
	type test struct {
		name          string
		node          *corev1.Node
		thresholdTime time.Time
		want          bool
	}

	tests := []test{
		{
			name: "Returns true when NodeReady condition is true (Ready) and last transition time is after threshold",
			node: &corev1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: "node-01",
				},
				Status: corev1.NodeStatus{
					Conditions: []corev1.NodeCondition{
						{
							Type:               corev1.NodeReady,
							Status:             corev1.ConditionTrue,
							LastTransitionTime: metav1.NewTime(time.Now()),
						},
					},
				},
			},
			thresholdTime: time.Now().Add(-time.Hour),
			want:          true,
		},
		{
			name: "Returns true when NodeReady condition is false (NotReady) and last transition time is after threshold",
			node: &corev1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: "node-01",
				},
				Status: corev1.NodeStatus{
					Conditions: []corev1.NodeCondition{
						{
							Type:               corev1.NodeReady,
							Status:             corev1.ConditionFalse,
							LastTransitionTime: metav1.NewTime(time.Now()),
						},
					},
				},
			},
			thresholdTime: time.Now().Add(-time.Hour),
			want:          true,
		},
		{
			name: "Returns false when the latest NodeReady condition is older than thresholdTime",
			node: &corev1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: "node-01",
				},
				Status: corev1.NodeStatus{
					Conditions: []corev1.NodeCondition{
						{
							Type:               corev1.NodeReady,
							Status:             corev1.ConditionFalse,
							LastTransitionTime: metav1.NewTime(time.Now().Add(-time.Hour)),
						},
					},
				},
			},
			thresholdTime: time.Now(),
			want:          false,
		},
		{
			name: "Returns false when no conditions are present on the node",
			node: &corev1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: "node-01",
				},
				Status: corev1.NodeStatus{
					Conditions: []corev1.NodeCondition{},
				},
			},
			thresholdTime: time.Now().Add(-time.Hour),
			want:          false,
		},
		{
			// Only LastHeartbeatTime is set here: the function must ignore
			// non-NodeReady condition types entirely.
			name: "Returns false when there is only NodeDiskPressure condition",
			node: &corev1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: "node-01",
				},
				Status: corev1.NodeStatus{
					Conditions: []corev1.NodeCondition{
						{
							Type:              corev1.NodeDiskPressure,
							Status:            corev1.ConditionFalse,
							LastHeartbeatTime: metav1.NewTime(time.Now()),
						},
					},
				},
			},
			thresholdTime: time.Now().Add(-time.Hour),
			want:          false,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			got := isReadyOrNotReadyStatusChangedAfter(test.node, test.thresholdTime)
			if got != test.want {
				t.Errorf("got = %v, want %v", got, test.want)
			}
		})
	}
}
592+
433593
func TestIsNodeReady(t *testing.T) {
434594
type test struct {
435595
name string

0 commit comments

Comments
 (0)