From a691d2cfd35008b5f672e8674075b56906a9d9c0 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 27 Feb 2025 23:53:09 +0900 Subject: [PATCH 01/10] feat: add test case and bugfix Signed-off-by: hlts2 --- go.mod | 4 +- main.go | 32 ++++-- pkg/watcher/fake.go | 10 +- pkg/watcher/options.go | 32 ++++++ pkg/watcher/watcher.go | 125 +++++++++++++--------- pkg/watcher/watcher_test.go | 199 ++++++++++++++++++++++++++++++++++++ 6 files changed, 339 insertions(+), 63 deletions(-) create mode 100644 pkg/watcher/watcher_test.go diff --git a/go.mod b/go.mod index 5adf1ee..231a4f4 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.23.5 require ( github.com/civo/civogo v0.3.94 + k8s.io/api v0.32.2 + k8s.io/apimachinery v0.32.2 k8s.io/client-go v0.32.2 ) @@ -42,8 +44,6 @@ require ( gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.32.2 // indirect - k8s.io/apimachinery v0.32.2 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect diff --git a/main.go b/main.go index 5cf5ae6..5bcda1f 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,7 @@ package main import ( "context" "flag" - "log" + "log/slog" "os" "os/signal" "strings" @@ -16,17 +16,16 @@ import ( var versionInfo = flag.Bool("version", false, "Print the driver version") var ( - region = strings.TrimSpace(os.Getenv("CIVO_REGION")) - clusterName = strings.TrimSpace(os.Getenv("CIVO_CLUSTER_NAME")) - apiKey = strings.TrimSpace(os.Getenv("CIVO_API_KEY")) + apiURL = strings.TrimSpace(os.Getenv("CIVO_API_URL")) + apiKey = strings.TrimSpace(os.Getenv("CIVO_API_KEY")) + region = strings.TrimSpace(os.Getenv("CIVO_REGION")) + clusterID = strings.TrimSpace(os.Getenv("CIVO_CLUSTER_ID")) + nodePoolID = strings.TrimSpace(os.Getenv("CIVO_NODE_POOL_ID")) + + // TODO: GPU count ) func run(ctx context.Context) error { - w, err := watcher.NewWatcher(ctx, clusterName, region, apiKey) // TODO: Add options - if err != nil { - return err - } - ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -44,17 +43,28 @@ func run(ctx context.Context) error { cancel() }() + w, err := watcher.NewWatcher(ctx, apiURL, apiKey, region, clusterID, nodePoolID) + if err != nil { + return err + } return w.Run(ctx) } func main() { flag.Parse() if *versionInfo { - // TOD: log + slog.Info("node-agent", "version", watcher.Version) return } + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil).WithAttrs([]slog.Attr{ + slog.String("clusterID", clusterID), + slog.String("region", region), + slog.String("nodePoolID", nodePoolID), + }))) + if err := run(context.Background()); err != nil { - log.Fatal(err) + slog.Error("The node-agent encountered a critical error and will exit", "error", err) + os.Exit(1) } } diff --git a/pkg/watcher/fake.go b/pkg/watcher/fake.go index fab73cb..40c37d9 100644 --- a/pkg/watcher/fake.go +++ b/pkg/watcher/fake.go @@ -5,7 +5,8 @@ import "github.com/civo/civogo" // FakeClient is a test client used for more flexible behavior control // when FakeClient alone is not sufficient. 
type FakeClient struct { - HardRebootInstanceFunc func(id string) (*civogo.SimpleResponse, error) + HardRebootInstanceFunc func(id string) (*civogo.SimpleResponse, error) + FindKubernetesClusterInstanceFunc func(clusterID, search string) (*civogo.Instance, error) *civogo.FakeClient } @@ -17,4 +18,11 @@ func (f *FakeClient) HardRebootInstance(id string) (*civogo.SimpleResponse, erro return f.FakeClient.HardRebootInstance(id) } +func (f *FakeClient) FindKubernetesClusterInstance(clusterID, search string) (*civogo.Instance, error) { + if f.FindKubernetesClusterInstanceFunc != nil { + return f.FindKubernetesClusterInstanceFunc(clusterID, search) + } + return f.FakeClient.FindKubernetesClusterInstance(clusterID, search) +} + var _ civogo.Clienter = (*FakeClient)(nil) diff --git a/pkg/watcher/options.go b/pkg/watcher/options.go index 73dd492..e5a3a9b 100644 --- a/pkg/watcher/options.go +++ b/pkg/watcher/options.go @@ -1,6 +1,38 @@ package watcher +import ( + "github.com/civo/civogo" + "k8s.io/client-go/kubernetes" +) + // Option represents a configuration function that modifies watcher object. type Option func(*watcher) var defaultOptions = []Option{} + +// WithKubernetesClient returns Option to set Kubernetes API client. +func WithKubernetesClient(client kubernetes.Interface) Option { + return func(w *watcher) { + if client != nil { + w.client = client + } + } +} + +// WithKubernetesClient returns Option to set Kubernetes config path. +func WithKubernetesClientConfigPath(path string) Option { + return func(w *watcher) { + if path != "" { + w.clientCfgPath = path + } + } +} + +// WithCivoClient returns Option to set Civo API client. +func WithCivoClient(client civogo.Clienter) Option { + return func(w *watcher) { + if client != nil { + w.civoClient = client + } + } +} diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 2011d5d..e414815 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -3,10 +3,11 @@ package watcher import ( "context" "fmt" - "os" + "log/slog" "time" "github.com/civo/civogo" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,30 +19,53 @@ import ( // Version is the current version of the this watcher var Version string = "0.0.1" +const nodePoolLabelKey = "kubernetes.civo.com/civo-node-pool" + type Watcher interface { Run(ctx context.Context) error } type watcher struct { - client kubernetes.Interface - civoClient civogo.Clienter - clusterName string - region string - apiKey string + client kubernetes.Interface + civoClient civogo.Clienter + clientCfgPath string + + clusterID string + region string + apiKey string + apiURL string + + nodeSelector *metav1.LabelSelector } -func NewWatcher(ctx context.Context, clusterName, region, apiKey string, opts ...Option) (Watcher, error) { +func NewWatcher(ctx context.Context, apiURL, apiKey, region, clusterID, nodePoolID string, opts ...Option) (Watcher, error) { w := new(watcher) for _, opt := range append(defaultOptions, opts...) 
{ opt(w) } + + if clusterID == "" { + return nil, fmt.Errorf("CIVO_CLUSTER_ID not set") + } + if nodePoolID == "" { + return nil, fmt.Errorf("CIVO_NODE_POOL_ID not set") + } + if w.civoClient == nil && apiKey == "" { + return nil, fmt.Errorf("CIVO_API_KEY not set") + } + + w.nodeSelector = &metav1.LabelSelector{ + MatchLabels: map[string]string{ + nodePoolLabelKey: nodePoolID, + }, + } + if err := w.setupKubernetesClient(); err != nil { return nil, err } - if err := w.setupCivoClient(ctx); err != nil { + if err := w.setupCivoClient(); err != nil { return nil, err } - return w, nil } @@ -49,12 +73,10 @@ func NewWatcher(ctx context.Context, clusterName, region, apiKey string, opts .. // If kubeconfig path is not empty, the client will be created using that path. // Otherwise, if the kubeconfig path is empty, the client will be created using the in-clustetr config. func (w *watcher) setupKubernetesClient() (err error) { - kubeconfig := os.Getenv("KUBECONFIG") - - if kubeconfig != "" && w.client == nil { - cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if w.clientCfgPath != "" && w.client == nil { + cfg, err := clientcmd.BuildConfigFromFlags("", w.clientCfgPath) if err != nil { - return fmt.Errorf("failed to build kubeconfig from path %q: %w", kubeconfig, err) + return fmt.Errorf("failed to build kubeconfig from path %q: %w", cfg, err) } w.client, err = kubernetes.NewForConfig(cfg) if err != nil { @@ -76,80 +98,85 @@ func (w *watcher) setupKubernetesClient() (err error) { return nil } -func (w *watcher) setupCivoClient(_ context.Context) error { - - if len(w.apiKey) == 0 { - return fmt.Errorf("CIVO_API_KEY not set") +func (w *watcher) setupCivoClient() error { + if w.civoClient != nil { + return nil } - civoClient, err := civogo.NewClient(w.apiKey, w.region) + client, err := civogo.NewClientWithURL(w.apiKey, w.apiURL, w.region) if err != nil { return err } - w.civoClient = civoClient + + userAgent := &civogo.Component{ + ID: w.clusterID, + Name: "node-agent", + Version: Version, + } + client.SetUserAgent(userAgent) + + w.civoClient = client return nil } func (w *watcher) Run(ctx context.Context) error { ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() for { select { case <-ticker.C: - w.listNodes(ctx) + slog.Info("Started the watcher process...") + if err := w.run(ctx); err != nil { + slog.Error("An error occurred while running the watcher process", "error", err) + } case <-ctx.Done(): return nil } } } -func (w *watcher) listNodes(ctx context.Context) { - nodes, err := w.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) +func (w *watcher) run(ctx context.Context) error { + nodes, err := w.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(w.nodeSelector), + }) if err != nil { - fmt.Printf("Error listing nodes: %v\n", err) - return + return err } - cluster, err := w.civoClient.GetKubernetesCluster(w.clusterName) - if err != nil { - fmt.Printf("Error getting cluster: %v\n", err) - return - } + // TODO: add logic to check gpu count. 
- fmt.Println("\nNodes List:") for _, node := range nodes.Items { - condition := getNodeCondition(node) - if condition != "Ready" { - if err := w.restart(cluster); err != nil { - fmt.Printf("Error restarting instance: %v\n", err) + if !isNodeReady(&node) { + slog.Info("Node is not ready, attempting to reboot", "node", node.GetName()) + if err := w.rebootNode(node.GetName()); err != nil { + slog.Error("Failed to reboot Node", "node", node.GetName(), "error", err) + return err } } } + return nil } -func getNodeCondition(node v1.Node) string { +func isNodeReady(node *v1.Node) bool { for _, cond := range node.Status.Conditions { - if cond.Type == v1.NodeReady { - if cond.Status == v1.ConditionTrue { - return "Ready" - } - return "NotReady" + if cond.Type == corev1.NodeReady { + return cond.Status == corev1.ConditionTrue } } - return "Unknown" + return false } -func (w *watcher) restart(cluster *civogo.KubernetesCluster) error { - instance, err := w.civoClient.GetKubernetesCluster(cluster.ID) +func (w *watcher) rebootNode(name string) error { + instance, err := w.civoClient.FindKubernetesClusterInstance(w.clusterID, name) if err != nil { - return fmt.Errorf("failed to get instance: %w", err) + return fmt.Errorf("failed to find instance, clusterID: %s, nodeName: %s: %w", w.clusterID, name, err) } - res, err := w.civoClient.RebootInstance(instance.ID) + _, err = w.civoClient.HardRebootInstance(instance.ID) if err != nil { - return fmt.Errorf("failed to reboot instance: %w", err) + return fmt.Errorf("failed to reboot instance, clusterID: %s, instanceID: %s: %w", w.clusterID, instance.ID, err) } - - fmt.Printf("Instance %s is rebooting: %v\n", instance.ID, res) + slog.Info("Instance is rebooting", "instanceID", instance.ID) return nil } diff --git a/pkg/watcher/watcher_test.go b/pkg/watcher/watcher_test.go new file mode 100644 index 0000000..342696b --- /dev/null +++ b/pkg/watcher/watcher_test.go @@ -0,0 +1,199 @@ +package watcher + +import ( + "errors" + "testing" + + "github.com/civo/civogo" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +var ( + testClusterID = "test-cluster-123" + testRegion = "lon1" + testApiKey = "test-api-key" + testApiURL = "https://test.civo.com" + testnodePoolID = "test-node-pool" +) + +func TestIsNodeReady(t *testing.T) { + type test struct { + name string + node *corev1.Node + want bool + } + + tests := []test{ + { + name: "Returns true when Node is ready state", + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + want: true, + }, + { + name: "Returns false when Node is not ready state", + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + }, + { + name: "Returns false when no conditions for the node", + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := isNodeReady(test.node) + if got != test.want { + t.Errorf("got = %v, want %v", got, test.want) + } + }) + } +} + +func TestRebootNode(t 
*testing.T) { + type args struct { + nodeName string + opts []Option + } + type test struct { + name string + args args + beforeFunc func(*testing.T, *watcher) + wantErr bool + } + + tests := []test{ + { + name: "Returns nil when there is no error finding and rebooting the instance", + args: args{ + nodeName: "node-01", + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + }, + beforeFunc: func(t *testing.T, w *watcher) { + t.Helper() + client := w.civoClient.(*FakeClient) + + instance := &civogo.Instance{ + ID: "instance-01", + } + + client.FindKubernetesClusterInstanceFunc = func(clusterID, search string) (*civogo.Instance, error) { + return instance, nil + } + client.HardRebootInstanceFunc = func(id string) (*civogo.SimpleResponse, error) { + if instance.ID != id { + t.Errorf("instanceId dose not match. want: %s, but got: %s", instance.ID, id) + } + return new(civogo.SimpleResponse), nil + } + }, + }, + { + name: "Returns an error when instance lookup fails", + args: args{ + nodeName: "node-01", + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + }, + beforeFunc: func(t *testing.T, w *watcher) { + t.Helper() + client := w.civoClient.(*FakeClient) + + client.FindKubernetesClusterInstanceFunc = func(clusterID, search string) (*civogo.Instance, error) { + return nil, errors.New("invalid error") + } + }, + wantErr: true, + }, + { + name: "Returns an error when instance reboot fails", + args: args{ + nodeName: "node-01", + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + }, + beforeFunc: func(t *testing.T, w *watcher) { + t.Helper() + client := w.civoClient.(*FakeClient) + + instance := &civogo.Instance{ + ID: "instance-01", + } + + client.FindKubernetesClusterInstanceFunc = func(clusterID, search string) (*civogo.Instance, error) { + return instance, nil + } + client.HardRebootInstanceFunc = func(id string) (*civogo.SimpleResponse, error) { + if instance.ID != id { + t.Errorf("instanceId dose not match. want: %s, but got: %s", instance.ID, id) + } + return nil, errors.New("invalid error") + } + }, + wantErr: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + w, err := NewWatcher(t.Context(), + testApiURL, testApiKey, testRegion, testClusterID, testnodePoolID, test.args.opts...) 
+ if err != nil { + t.Fatal(err) + } + + obj := w.(*watcher) + if test.beforeFunc != nil { + test.beforeFunc(t, obj) + } + + err = obj.rebootNode(test.args.nodeName) + if (err != nil) != test.wantErr { + t.Errorf("error = %v, wantErr %v", err, test.wantErr) + } + }) + } +} From e1a0db4a6c0089c117550cfa576204b8745fcb82 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 28 Feb 2025 01:27:58 +0900 Subject: [PATCH 02/10] feat: add base logic for checking gpc Signed-off-by: hlts2 --- main.go | 13 ++++++------- pkg/watcher/watcher.go | 32 +++++++++++++++++++++++++------- pkg/watcher/watcher_test.go | 13 +++++++------ 3 files changed, 38 insertions(+), 20 deletions(-) diff --git a/main.go b/main.go index 5bcda1f..375c9b8 100644 --- a/main.go +++ b/main.go @@ -16,13 +16,12 @@ import ( var versionInfo = flag.Bool("version", false, "Print the driver version") var ( - apiURL = strings.TrimSpace(os.Getenv("CIVO_API_URL")) - apiKey = strings.TrimSpace(os.Getenv("CIVO_API_KEY")) - region = strings.TrimSpace(os.Getenv("CIVO_REGION")) - clusterID = strings.TrimSpace(os.Getenv("CIVO_CLUSTER_ID")) - nodePoolID = strings.TrimSpace(os.Getenv("CIVO_NODE_POOL_ID")) - - // TODO: GPU count + apiURL = strings.TrimSpace(os.Getenv("CIVO_API_URL")) + apiKey = strings.TrimSpace(os.Getenv("CIVO_API_KEY")) + region = strings.TrimSpace(os.Getenv("CIVO_REGION")) + clusterID = strings.TrimSpace(os.Getenv("CIVO_CLUSTER_ID")) + nodePoolID = strings.TrimSpace(os.Getenv("CIVO_NODE_POOL_ID")) + nodeDesiredGPUCount = strings.TrimSpace(os.Getenv("CIVO_NODE_DESIRED_GPU_COUNT")) ) func run(ctx context.Context) error { diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index e414815..eb48a0a 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "strconv" "time" "github.com/civo/civogo" @@ -19,7 +20,10 @@ import ( // Version is the current version of the this watcher var Version string = "0.0.1" -const nodePoolLabelKey = "kubernetes.civo.com/civo-node-pool" +const ( + nodePoolLabelKey = "kubernetes.civo.com/civo-node-pool" + gpuStasName = "nvidia.com/gpu" +) type Watcher interface { Run(ctx context.Context) error @@ -30,15 +34,16 @@ type watcher struct { civoClient civogo.Clienter clientCfgPath string - clusterID string - region string - apiKey string - apiURL string + clusterID string + region string + apiKey string + apiURL string + nodeDesiredGPUCount int nodeSelector *metav1.LabelSelector } -func NewWatcher(ctx context.Context, apiURL, apiKey, region, clusterID, nodePoolID string, opts ...Option) (Watcher, error) { +func NewWatcher(ctx context.Context, apiURL, apiKey, region, clusterID, nodePoolID, nodeDesiredGPUCount string, opts ...Option) (Watcher, error) { w := new(watcher) for _, opt := range append(defaultOptions, opts...) { opt(w) @@ -54,6 +59,15 @@ func NewWatcher(ctx context.Context, apiURL, apiKey, region, clusterID, nodePool return nil, fmt.Errorf("CIVO_API_KEY not set") } + n, err := strconv.Atoi(nodeDesiredGPUCount) + if err != nil { + return nil, fmt.Errorf("CIVO_NODE_DESIRED_GPU_COUNT has an invalid value, %s: %w", nodeDesiredGPUCount, err) + } + if n < 1 { + return nil, fmt.Errorf("CIVO_NODE_DESIRED_GPU_COUNT must be at least 1: %s", nodeDesiredGPUCount) + } + + w.nodeDesiredGPUCount = n w.nodeSelector = &metav1.LabelSelector{ MatchLabels: map[string]string{ nodePoolLabelKey: nodePoolID, @@ -147,7 +161,7 @@ func (w *watcher) run(ctx context.Context) error { // TODO: add logic to check gpu count. 
for _, node := range nodes.Items { - if !isNodeReady(&node) { + if !isNodeDesiredGPU(&node, w.nodeDesiredGPUCount) || !isNodeReady(&node) { slog.Info("Node is not ready, attempting to reboot", "node", node.GetName()) if err := w.rebootNode(node.GetName()); err != nil { slog.Error("Failed to reboot Node", "node", node.GetName(), "error", err) @@ -167,6 +181,10 @@ func isNodeReady(node *v1.Node) bool { return false } +func isNodeDesiredGPU(node *v1.Node, desired int) bool { + return false +} + func (w *watcher) rebootNode(name string) error { instance, err := w.civoClient.FindKubernetesClusterInstance(w.clusterID, name) if err != nil { diff --git a/pkg/watcher/watcher_test.go b/pkg/watcher/watcher_test.go index 342696b..ec3e738 100644 --- a/pkg/watcher/watcher_test.go +++ b/pkg/watcher/watcher_test.go @@ -11,11 +11,12 @@ import ( ) var ( - testClusterID = "test-cluster-123" - testRegion = "lon1" - testApiKey = "test-api-key" - testApiURL = "https://test.civo.com" - testnodePoolID = "test-node-pool" + testClusterID = "test-cluster-123" + testRegion = "lon1" + testApiKey = "test-api-key" + testApiURL = "https://test.civo.com" + testnodePoolID = "test-node-pool" + testNodeDesiredGPUCount = "8" ) func TestIsNodeReady(t *testing.T) { @@ -180,7 +181,7 @@ func TestRebootNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { w, err := NewWatcher(t.Context(), - testApiURL, testApiKey, testRegion, testClusterID, testnodePoolID, test.args.opts...) + testApiURL, testApiKey, testRegion, testClusterID, testnodePoolID, testNodeDesiredGPUCount, test.args.opts...) if err != nil { t.Fatal(err) } From 749ca210736807f9c220d87c8efbdf058fd3d33b Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 28 Feb 2025 01:41:57 +0900 Subject: [PATCH 03/10] feat: add node gpc check logic Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index eb48a0a..5622c26 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -158,8 +158,6 @@ func (w *watcher) run(ctx context.Context) error { return err } - // TODO: add logic to check gpu count. 
- for _, node := range nodes.Items { if !isNodeDesiredGPU(&node, w.nodeDesiredGPUCount) || !isNodeReady(&node) { slog.Info("Node is not ready, attempting to reboot", "node", node.GetName()) @@ -182,7 +180,15 @@ func isNodeReady(node *v1.Node) bool { } func isNodeDesiredGPU(node *v1.Node, desired int) bool { - return false + quantity := node.Status.Allocatable[gpuStasName] + if quantity.IsZero() { + return false + } + gpuCount, ok := quantity.AsInt64() + if !ok { + return false + } + return gpuCount == int64(desired) } func (w *watcher) rebootNode(name string) error { From aa95462c5913da82a47cd872ef89acd110c53e08 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 28 Feb 2025 01:51:53 +0900 Subject: [PATCH 04/10] feat: add test case for gpc check Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 9 +++-- pkg/watcher/watcher_test.go | 67 +++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 5622c26..bc89ed0 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -9,7 +9,6 @@ import ( "github.com/civo/civogo" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -22,7 +21,7 @@ var Version string = "0.0.1" const ( nodePoolLabelKey = "kubernetes.civo.com/civo-node-pool" - gpuStasName = "nvidia.com/gpu" + gpuResourceName = "nvidia.com/gpu" ) type Watcher interface { @@ -170,7 +169,7 @@ func (w *watcher) run(ctx context.Context) error { return nil } -func isNodeReady(node *v1.Node) bool { +func isNodeReady(node *corev1.Node) bool { for _, cond := range node.Status.Conditions { if cond.Type == corev1.NodeReady { return cond.Status == corev1.ConditionTrue @@ -179,8 +178,8 @@ func isNodeReady(node *v1.Node) bool { return false } -func isNodeDesiredGPU(node *v1.Node, desired int) bool { - quantity := node.Status.Allocatable[gpuStasName] +func isNodeDesiredGPU(node *corev1.Node, desired int) bool { + quantity := node.Status.Allocatable[gpuResourceName] if quantity.IsZero() { return false } diff --git a/pkg/watcher/watcher_test.go b/pkg/watcher/watcher_test.go index ec3e738..5904f12 100644 --- a/pkg/watcher/watcher_test.go +++ b/pkg/watcher/watcher_test.go @@ -6,6 +6,7 @@ import ( "github.com/civo/civogo" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" ) @@ -87,6 +88,72 @@ func TestIsNodeReady(t *testing.T) { } } +func TestIsNodeDesiredGPU(t *testing.T) { + type test struct { + name string + node *corev1.Node + desired int + want bool + } + + tests := []test{ + { + name: "Returns true when GPU count matches desired value", + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + gpuResourceName: resource.MustParse("8"), + }, + }, + }, + desired: 8, + want: true, + }, + { + name: "Returns false when GPU count is 0", + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + gpuResourceName: resource.MustParse("0"), + }, + }, + }, + desired: 8, + want: false, + }, + { + name: "Returns false when GPU count is less than desired value", + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + gpuResourceName: resource.MustParse("7"), + }, + }, + }, + desired: 8, + 
want: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := isNodeDesiredGPU(test.node, test.desired) + if got != test.want { + t.Errorf("got = %v, want %v", got, test.want) + } + }) + } +} + func TestRebootNode(t *testing.T) { type args struct { nodeName string From e83113eded2030551788f5acae794cc6c8b086e5 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 28 Feb 2025 01:55:36 +0900 Subject: [PATCH 05/10] fix: add todo comment Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index bc89ed0..614eb3c 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -150,6 +150,7 @@ func (w *watcher) Run(ctx context.Context) error { } func (w *watcher) run(ctx context.Context) error { + // TODO: Change to WatchAPI later. nodes, err := w.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{ LabelSelector: metav1.FormatLabelSelector(w.nodeSelector), }) From f58b3750677411be262df0510bd05384043d0b46 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 28 Feb 2025 01:56:38 +0900 Subject: [PATCH 06/10] fix: build error Signed-off-by: hlts2 --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 375c9b8..c7c27e9 100644 --- a/main.go +++ b/main.go @@ -42,7 +42,7 @@ func run(ctx context.Context) error { cancel() }() - w, err := watcher.NewWatcher(ctx, apiURL, apiKey, region, clusterID, nodePoolID) + w, err := watcher.NewWatcher(ctx, apiURL, apiKey, region, clusterID, nodePoolID, nodeDesiredGPUCount) if err != nil { return err } From 3c5cf231487afd93a7a973ca4933f214ab6442f9 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 28 Feb 2025 02:30:27 +0900 Subject: [PATCH 07/10] feat: add test case for main check process Signed-off-by: hlts2 --- pkg/watcher/watcher_test.go | 268 +++++++++++++++++++++++++++++++++++- 1 file changed, 266 insertions(+), 2 deletions(-) diff --git a/pkg/watcher/watcher_test.go b/pkg/watcher/watcher_test.go index 5904f12..cec9027 100644 --- a/pkg/watcher/watcher_test.go +++ b/pkg/watcher/watcher_test.go @@ -8,7 +8,9 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" ) var ( @@ -16,10 +18,272 @@ var ( testRegion = "lon1" testApiKey = "test-api-key" testApiURL = "https://test.civo.com" - testnodePoolID = "test-node-pool" + testNodePoolID = "test-node-pool" testNodeDesiredGPUCount = "8" ) +func TestRun(t *testing.T) { + type args struct { + opts []Option + nodeDesiredGPUCount string + nodePoolID string + } + type test struct { + name string + args args + beforeFunc func(*watcher) + wantErr bool + } + + tests := []test{ + { + name: "Returns nil when node GPU count is 8 and no reboot needed", + args: args{ + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + nodeDesiredGPUCount: testNodeDesiredGPUCount, + nodePoolID: testNodePoolID, + }, + beforeFunc: func(w *watcher) { + t.Helper() + client := w.client.(*fake.Clientset) + + nodes := &corev1.NodeList{ + Items: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + Labels: map[string]string{ + nodePoolLabelKey: testNodePoolID, + }, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + { + Type: corev1.NodeReady, + Status: 
corev1.ConditionFalse, + }, + }, + Allocatable: corev1.ResourceList{ + gpuResourceName: resource.MustParse("8"), + }, + }, + }, + }, + } + client.Fake.PrependReactor("list", "nodes", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nodes, nil + }) + }, + }, + { + name: "Returns nil and triggers reboot when GPU count drops below desired (7 GPUs available)", + args: args{ + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + nodeDesiredGPUCount: testNodeDesiredGPUCount, + nodePoolID: testNodePoolID, + }, + beforeFunc: func(w *watcher) { + t.Helper() + client := w.client.(*fake.Clientset) + + nodes := &corev1.NodeList{ + Items: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + Labels: map[string]string{ + nodePoolLabelKey: testNodePoolID, + }, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + }, + Allocatable: corev1.ResourceList{ + gpuResourceName: resource.MustParse("7"), + }, + }, + }, + }, + } + client.Fake.PrependReactor("list", "nodes", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nodes, nil + }) + + civoClient := w.civoClient.(*FakeClient) + instance := &civogo.Instance{ + ID: "instance-01", + } + civoClient.FindKubernetesClusterInstanceFunc = func(clusterID, search string) (*civogo.Instance, error) { + return instance, nil + } + civoClient.HardRebootInstanceFunc = func(id string) (*civogo.SimpleResponse, error) { + return new(civogo.SimpleResponse), nil + } + }, + }, + { + name: "Returns nil and triggers reboot when GPU count matches desired but node is not ready", + args: args{ + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + nodeDesiredGPUCount: testNodeDesiredGPUCount, + nodePoolID: testNodePoolID, + }, + beforeFunc: func(w *watcher) { + t.Helper() + client := w.client.(*fake.Clientset) + + nodes := &corev1.NodeList{ + Items: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + Labels: map[string]string{ + nodePoolLabelKey: testNodePoolID, + }, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + }, + Allocatable: corev1.ResourceList{ + gpuResourceName: resource.MustParse("8"), + }, + }, + }, + }, + } + client.Fake.PrependReactor("list", "nodes", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nodes, nil + }) + + civoClient := w.civoClient.(*FakeClient) + instance := &civogo.Instance{ + ID: "instance-01", + } + civoClient.FindKubernetesClusterInstanceFunc = func(clusterID, search string) (*civogo.Instance, error) { + return instance, nil + } + civoClient.HardRebootInstanceFunc = func(id string) (*civogo.SimpleResponse, error) { + return new(civogo.SimpleResponse), nil + } + }, + }, + { + name: "Returns an error when unable to list nodes", + args: args{ + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + nodeDesiredGPUCount: testNodeDesiredGPUCount, + nodePoolID: testNodePoolID, + }, + beforeFunc: func(w *watcher) { + t.Helper() + client := w.client.(*fake.Clientset) + + client.Fake.PrependReactor("list", "nodes", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { 
+ return true, &corev1.NodeList{}, errors.New("invalid error") + }) + }, + wantErr: true, + }, + + { + name: "Returns an error when finding the Kubernetes cluster instance fails during reboot", + args: args{ + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + nodeDesiredGPUCount: testNodeDesiredGPUCount, + nodePoolID: testNodePoolID, + }, + beforeFunc: func(w *watcher) { + t.Helper() + client := w.client.(*fake.Clientset) + + nodes := &corev1.NodeList{ + Items: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + Labels: map[string]string{ + nodePoolLabelKey: testNodePoolID, + }, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + }, + Allocatable: corev1.ResourceList{ + gpuResourceName: resource.MustParse("8"), + }, + }, + }, + }, + } + client.Fake.PrependReactor("list", "nodes", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nodes, nil + }) + + civoClient := w.civoClient.(*FakeClient) + civoClient.FindKubernetesClusterInstanceFunc = func(clusterID, search string) (*civogo.Instance, error) { + return nil, errors.New("invalid error") + } + }, + wantErr: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + w, err := NewWatcher(t.Context(), + testApiURL, testApiKey, testRegion, testClusterID, test.args.nodePoolID, test.args.nodeDesiredGPUCount, test.args.opts...) + if err != nil { + t.Fatal(err) + } + + obj := w.(*watcher) + if test.beforeFunc != nil { + test.beforeFunc(obj) + } + + err = obj.run(t.Context()) + if (err != nil) != test.wantErr { + t.Errorf("error = %v, wantErr %v", err, test.wantErr) + } + }) + } +} + func TestIsNodeReady(t *testing.T) { type test struct { name string @@ -248,7 +512,7 @@ func TestRebootNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { w, err := NewWatcher(t.Context(), - testApiURL, testApiKey, testRegion, testClusterID, testnodePoolID, testNodeDesiredGPUCount, test.args.opts...) + testApiURL, testApiKey, testRegion, testClusterID, testNodePoolID, testNodeDesiredGPUCount, test.args.opts...) if err != nil { t.Fatal(err) } From 38544d5c8bee65c736d28405a9f93cec1fece7a1 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 28 Feb 2025 02:32:13 +0900 Subject: [PATCH 08/10] tweak Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 614eb3c..bc89ed0 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -150,7 +150,6 @@ func (w *watcher) Run(ctx context.Context) error { } func (w *watcher) run(ctx context.Context) error { - // TODO: Change to WatchAPI later. 
nodes, err := w.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{ LabelSelector: metav1.FormatLabelSelector(w.nodeSelector), }) From 0100f351f6b4a42dbb829372f91c0a17fe51e09e Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 28 Feb 2025 04:22:26 +0900 Subject: [PATCH 09/10] fix: fix code based on suggestion Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index bc89ed0..e240fc4 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -118,7 +118,7 @@ func (w *watcher) setupCivoClient() error { client, err := civogo.NewClientWithURL(w.apiKey, w.apiURL, w.region) if err != nil { - return err + return fmt.Errorf("failed to intiliase civo client: %w", err) } userAgent := &civogo.Component{ @@ -162,7 +162,7 @@ func (w *watcher) run(ctx context.Context) error { slog.Info("Node is not ready, attempting to reboot", "node", node.GetName()) if err := w.rebootNode(node.GetName()); err != nil { slog.Error("Failed to reboot Node", "node", node.GetName(), "error", err) - return err + return fmt.Errorf("failed to reboot node: %w", err) } } } From 5029dd8c2eb70b012878d0ba96c9cc25219ce6d7 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 28 Feb 2025 04:29:59 +0900 Subject: [PATCH 10/10] fix: update go.mod Signed-off-by: hlts2 --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 231a4f4..e7ac7e5 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/civo/node-agent -go 1.23.5 +go 1.24.0 require ( github.com/civo/civogo v0.3.94
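Taken together, the patches above make the watcher list the nodes in its pool every 10 seconds and hard-reboot the backing Civo instance whenever a node is not Ready or reports fewer allocatable nvidia.com/gpu resources than CIVO_NODE_DESIRED_GPU_COUNT. Because both health checks are plain functions over a corev1.Node, the decision logic can be exercised outside the agent. The sketch below is a minimal, standalone rendition of that logic, not code from the series itself: the node fixture, the desiredGPUs constant, and the helper names hasDesiredGPUs/needsReboot are illustrative assumptions, while the nvidia.com/gpu resource name, the Ready-condition check, and the AsInt64 comparison mirror what the patches add.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isNodeReady reports whether the node carries a Ready condition with status True,
// matching the check the watcher performs before deciding to reboot.
func isNodeReady(node *corev1.Node) bool {
    for _, cond := range node.Status.Conditions {
        if cond.Type == corev1.NodeReady {
            return cond.Status == corev1.ConditionTrue
        }
    }
    return false
}

// hasDesiredGPUs compares the node's allocatable "nvidia.com/gpu" quantity with the
// desired count, following the isNodeDesiredGPU helper added in the series.
func hasDesiredGPUs(node *corev1.Node, desired int64) bool {
    quantity := node.Status.Allocatable[corev1.ResourceName("nvidia.com/gpu")]
    if quantity.IsZero() {
        return false
    }
    gpus, ok := quantity.AsInt64()
    if !ok {
        return false
    }
    return gpus == desired
}

func main() {
    // Hypothetical fixture: a Ready node in the watched pool that exposes only 7 of
    // the 8 expected GPUs. The pool label is the one the watcher's selector matches on.
    node := &corev1.Node{
        ObjectMeta: metav1.ObjectMeta{
            Name:   "node-01",
            Labels: map[string]string{"kubernetes.civo.com/civo-node-pool": "test-node-pool"},
        },
        Status: corev1.NodeStatus{
            Conditions: []corev1.NodeCondition{
                {Type: corev1.NodeReady, Status: corev1.ConditionTrue},
            },
            Allocatable: corev1.ResourceList{
                corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("7"),
            },
        },
    }

    const desiredGPUs = 8 // illustrative stand-in for CIVO_NODE_DESIRED_GPU_COUNT
    needsReboot := !hasDesiredGPUs(node, desiredGPUs) || !isNodeReady(node)
    fmt.Printf("node=%s ready=%t gpusOK=%t needsReboot=%t\n",
        node.Name, isNodeReady(node), hasDesiredGPUs(node, desiredGPUs), needsReboot)
}

With k8s.io/api and k8s.io/apimachinery v0.32.2 on the module path (the direct requirements PATCH 01 adds to go.mod), running this prints needsReboot=true for the fixture, which is the same outcome that sends the watcher's run loop into rebootNode.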