Add node watcher functionality and test case #3

Merged
merged 11 commits on Feb 27, 2025
6 changes: 3 additions & 3 deletions go.mod
@@ -1,9 +1,11 @@
module github.com/civo/node-agent

go 1.23.5
go 1.24.0

require (
github.com/civo/civogo v0.3.94
k8s.io/api v0.32.2
k8s.io/apimachinery v0.32.2
k8s.io/client-go v0.32.2
)

@@ -42,8 +44,6 @@ require (
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.32.2 // indirect
k8s.io/apimachinery v0.32.2 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
31 changes: 20 additions & 11 deletions main.go
@@ -3,7 +3,7 @@ package main
import (
"context"
"flag"
"log"
"log/slog"
"os"
"os/signal"
"strings"
@@ -16,17 +16,15 @@ import (
var versionInfo = flag.Bool("version", false, "Print the driver version")

var (
region = strings.TrimSpace(os.Getenv("CIVO_REGION"))
clusterName = strings.TrimSpace(os.Getenv("CIVO_CLUSTER_NAME"))
apiKey = strings.TrimSpace(os.Getenv("CIVO_API_KEY"))
apiURL = strings.TrimSpace(os.Getenv("CIVO_API_URL"))
apiKey = strings.TrimSpace(os.Getenv("CIVO_API_KEY"))
region = strings.TrimSpace(os.Getenv("CIVO_REGION"))
clusterID = strings.TrimSpace(os.Getenv("CIVO_CLUSTER_ID"))
nodePoolID = strings.TrimSpace(os.Getenv("CIVO_NODE_POOL_ID"))
nodeDesiredGPUCount = strings.TrimSpace(os.Getenv("CIVO_NODE_DESIRED_GPU_COUNT"))
)

func run(ctx context.Context) error {
w, err := watcher.NewWatcher(ctx, clusterName, region, apiKey) // TODO: Add options
if err != nil {
return err
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -44,17 +42,28 @@ func run(ctx context.Context) error {
cancel()
}()

w, err := watcher.NewWatcher(ctx, apiURL, apiKey, region, clusterID, nodePoolID, nodeDesiredGPUCount)
if err != nil {
return err
}
return w.Run(ctx)
}

func main() {
flag.Parse()
if *versionInfo {
// TODO: log
slog.Info("node-agent", "version", watcher.Version)
return
}

slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil).WithAttrs([]slog.Attr{
slog.String("clusterID", clusterID),
slog.String("region", region),
slog.String("nodePoolID", nodePoolID),
})))

if err := run(context.Background()); err != nil {
log.Fatal(err)
slog.Error("The node-agent encountered a critical error and will exit", "error", err)
os.Exit(1)
}
}
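The logging setup above can be tried in isolation. Below is a minimal, self-contained sketch — the three attribute values are placeholders, not real cluster identifiers — showing how the JSON handler's WithAttrs call stamps cluster-scoped fields onto every record the agent emits:

package main

import (
	"log/slog"
	"os"
)

func main() {
	// Same wiring as main.go above, with placeholder attribute values.
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil).WithAttrs([]slog.Attr{
		slog.String("clusterID", "cluster-1"),
		slog.String("region", "lon1"),
		slog.String("nodePoolID", "pool-1"),
	})))

	// Emits one JSON line carrying the three baked-in attributes, e.g.:
	// {"time":"...","level":"INFO","msg":"node-agent","clusterID":"cluster-1","region":"lon1","nodePoolID":"pool-1","version":"0.0.1"}
	slog.Info("node-agent", "version", "0.0.1")
}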
10 changes: 9 additions & 1 deletion pkg/watcher/fake.go
@@ -5,7 +5,8 @@ import "github.com/civo/civogo"
// FakeClient is a test client used for more flexible behavior control
// when the embedded civogo.FakeClient alone is not sufficient.
type FakeClient struct {
HardRebootInstanceFunc func(id string) (*civogo.SimpleResponse, error)
HardRebootInstanceFunc func(id string) (*civogo.SimpleResponse, error)
FindKubernetesClusterInstanceFunc func(clusterID, search string) (*civogo.Instance, error)

*civogo.FakeClient
}
@@ -17,4 +18,11 @@ func (f *FakeClient) HardRebootInstance(id string) (*civogo.SimpleResponse, erro
return f.FakeClient.HardRebootInstance(id)
}

func (f *FakeClient) FindKubernetesClusterInstance(clusterID, search string) (*civogo.Instance, error) {
if f.FindKubernetesClusterInstanceFunc != nil {
return f.FindKubernetesClusterInstanceFunc(clusterID, search)
}
return f.FakeClient.FindKubernetesClusterInstance(clusterID, search)
}

var _ civogo.Clienter = (*FakeClient)(nil)
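A short sketch of how these hooks might be used from a test in package watcher (the test name and values are illustrative; rebootNode is defined in pkg/watcher/watcher.go below): only the function field exercised by the code under test needs to be set, and any other method call falls through to the embedded *civogo.FakeClient.

package watcher

import (
	"errors"
	"testing"

	"github.com/civo/civogo"
)

func TestRebootNodeLookupFailure(t *testing.T) {
	// Stub only FindKubernetesClusterInstance; the embedded *civogo.FakeClient
	// can stay nil because no other method is reached on this path.
	client := &FakeClient{
		FindKubernetesClusterInstanceFunc: func(clusterID, search string) (*civogo.Instance, error) {
			return nil, errors.New("instance not found")
		},
	}

	w := &watcher{civoClient: client, clusterID: "cluster-1"}
	if err := w.rebootNode("node-1"); err == nil {
		t.Fatal("expected an error when the instance lookup fails")
	}
}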
32 changes: 32 additions & 0 deletions pkg/watcher/options.go
@@ -1,6 +1,38 @@
package watcher

import (
"github.com/civo/civogo"
"k8s.io/client-go/kubernetes"
)

// Option represents a configuration function that modifies the watcher object.
type Option func(*watcher)

var defaultOptions = []Option{}

// WithKubernetesClient returns an Option that sets the Kubernetes API client.
func WithKubernetesClient(client kubernetes.Interface) Option {
return func(w *watcher) {
if client != nil {
w.client = client
}
}
}

// WithKubernetesClientConfigPath returns an Option that sets the Kubernetes config path.
func WithKubernetesClientConfigPath(path string) Option {
return func(w *watcher) {
if path != "" {
w.clientCfgPath = path
}
}
}

// WithCivoClient returns an Option that sets the Civo API client.
func WithCivoClient(client civogo.Clienter) Option {
return func(w *watcher) {
if client != nil {
w.civoClient = client
}
}
}
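For illustration, a hypothetical out-of-cluster wiring of these options; the API URL, region, and IDs below are placeholders, and a real deployment reads them from the CIVO_* environment variables instead. Note that WithKubernetesClientConfigPath ignores an empty path, so with KUBECONFIG unset the watcher falls back to the in-cluster config and would fail outside a cluster.

package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/civo/node-agent/pkg/watcher"
)

func main() {
	// Placeholder arguments: apiURL, apiKey, region, clusterID, nodePoolID,
	// desired GPU count, then any options.
	w, err := watcher.NewWatcher(context.Background(),
		"https://api.civo.com", os.Getenv("CIVO_API_KEY"), "lon1",
		"cluster-1", "pool-1", "8",
		watcher.WithKubernetesClientConfigPath(os.Getenv("KUBECONFIG")),
	)
	if err != nil {
		slog.Error("failed to create watcher", "error", err)
		os.Exit(1)
	}
	if err := w.Run(context.Background()); err != nil {
		slog.Error("watcher exited", "error", err)
		os.Exit(1)
	}
}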
154 changes: 102 additions & 52 deletions pkg/watcher/watcher.go
@@ -3,11 +3,12 @@ package watcher
import (
"context"
"fmt"
"os"
"log/slog"
"strconv"
"time"

"github.com/civo/civogo"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/client-go/kubernetes"
@@ -18,43 +19,77 @@ import (
// Version is the current version of this watcher.
var Version string = "0.0.1"

const (
nodePoolLabelKey = "kubernetes.civo.com/civo-node-pool"
gpuResourceName = "nvidia.com/gpu"
)

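// Watcher periodically checks the GPU nodes in the target node pool and
// hard-reboots the backing instance of any node that is not in the desired state.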
type Watcher interface {
Run(ctx context.Context) error
}

type watcher struct {
client kubernetes.Interface
civoClient civogo.Clienter
clusterName string
region string
apiKey string
client kubernetes.Interface
civoClient civogo.Clienter
clientCfgPath string

clusterID string
region string
apiKey string
apiURL string
nodeDesiredGPUCount int

nodeSelector *metav1.LabelSelector
}

func NewWatcher(ctx context.Context, clusterName, region, apiKey string, opts ...Option) (Watcher, error) {
func NewWatcher(ctx context.Context, apiURL, apiKey, region, clusterID, nodePoolID, nodeDesiredGPUCount string, opts ...Option) (Watcher, error) {
w := new(watcher)
for _, opt := range append(defaultOptions, opts...) {
opt(w)
}

if clusterID == "" {
return nil, fmt.Errorf("CIVO_CLUSTER_ID not set")
}
if nodePoolID == "" {
return nil, fmt.Errorf("CIVO_NODE_POOL_ID not set")
}
if w.civoClient == nil && apiKey == "" {
return nil, fmt.Errorf("CIVO_API_KEY not set")
}

n, err := strconv.Atoi(nodeDesiredGPUCount)
if err != nil {
return nil, fmt.Errorf("CIVO_NODE_DESIRED_GPU_COUNT has an invalid value, %s: %w", nodeDesiredGPUCount, err)
}
if n < 1 {
return nil, fmt.Errorf("CIVO_NODE_DESIRED_GPU_COUNT must be at least 1: %s", nodeDesiredGPUCount)
}

w.nodeDesiredGPUCount = n
w.nodeSelector = &metav1.LabelSelector{
MatchLabels: map[string]string{
nodePoolLabelKey: nodePoolID,
},
}

if err := w.setupKubernetesClient(); err != nil {
return nil, err
}
if err := w.setupCivoClient(ctx); err != nil {
if err := w.setupCivoClient(); err != nil {
return nil, err
}

return w, nil
}

// setupKubernetesClient creates the Kubernetes client based on the kubeconfig path.
// If the kubeconfig path is not empty, the client is created using that path;
// otherwise it is created using the in-cluster config.
func (w *watcher) setupKubernetesClient() (err error) {
kubeconfig := os.Getenv("KUBECONFIG")

if kubeconfig != "" && w.client == nil {
cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if w.clientCfgPath != "" && w.client == nil {
cfg, err := clientcmd.BuildConfigFromFlags("", w.clientCfgPath)
if err != nil {
return fmt.Errorf("failed to build kubeconfig from path %q: %w", kubeconfig, err)
return fmt.Errorf("failed to build kubeconfig from path %q: %w", cfg, err)
}
w.client, err = kubernetes.NewForConfig(cfg)
if err != nil {
@@ -76,80 +111,95 @@ func (w *watcher) setupKubernetesClient() (err error) {
return nil
}

func (w *watcher) setupCivoClient(_ context.Context) error {

if len(w.apiKey) == 0 {
return fmt.Errorf("CIVO_API_KEY not set")
func (w *watcher) setupCivoClient() error {
if w.civoClient != nil {
return nil
}

civoClient, err := civogo.NewClient(w.apiKey, w.region)
client, err := civogo.NewClientWithURL(w.apiKey, w.apiURL, w.region)
if err != nil {
return err
return fmt.Errorf("failed to intiliase civo client: %w", err)
}
w.civoClient = civoClient

userAgent := &civogo.Component{
ID: w.clusterID,
Name: "node-agent",
Version: Version,
}
client.SetUserAgent(userAgent)

w.civoClient = client
return nil
}

func (w *watcher) Run(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
w.listNodes(ctx)
slog.Info("Started the watcher process...")
if err := w.run(ctx); err != nil {
slog.Error("An error occurred while running the watcher process", "error", err)
}
case <-ctx.Done():
return nil
}
}
}

func (w *watcher) listNodes(ctx context.Context) {
nodes, err := w.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
fmt.Printf("Error listing nodes: %v\n", err)
return
}

cluster, err := w.civoClient.GetKubernetesCluster(w.clusterName)
func (w *watcher) run(ctx context.Context) error {
nodes, err := w.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(w.nodeSelector),
})
if err != nil {
fmt.Printf("Error getting cluster: %v\n", err)
return
return err
}

fmt.Println("\nNodes List:")
for _, node := range nodes.Items {
condition := getNodeCondition(node)
if condition != "Ready" {
if err := w.restart(cluster); err != nil {
fmt.Printf("Error restarting instance: %v\n", err)
if !isNodeDesiredGPU(&node, w.nodeDesiredGPUCount) || !isNodeReady(&node) {
slog.Info("Node is not ready, attempting to reboot", "node", node.GetName())
if err := w.rebootNode(node.GetName()); err != nil {
slog.Error("Failed to reboot Node", "node", node.GetName(), "error", err)
return fmt.Errorf("failed to reboot node: %w", err)
}
}
}
return nil
}

func getNodeCondition(node v1.Node) string {
func isNodeReady(node *corev1.Node) bool {
for _, cond := range node.Status.Conditions {
if cond.Type == v1.NodeReady {
if cond.Status == v1.ConditionTrue {
return "Ready"
}
return "NotReady"
if cond.Type == corev1.NodeReady {
return cond.Status == corev1.ConditionTrue
}
}
return "Unknown"
return false
}

func (w *watcher) restart(cluster *civogo.KubernetesCluster) error {
instance, err := w.civoClient.GetKubernetesCluster(cluster.ID)
if err != nil {
return fmt.Errorf("failed to get instance: %w", err)
func isNodeDesiredGPU(node *corev1.Node, desired int) bool {
quantity := node.Status.Allocatable[gpuResourceName]
if quantity.IsZero() {
return false
}
gpuCount, ok := quantity.AsInt64()
if !ok {
return false
}
return gpuCount == int64(desired)
}

res, err := w.civoClient.RebootInstance(instance.ID)
func (w *watcher) rebootNode(name string) error {
instance, err := w.civoClient.FindKubernetesClusterInstance(w.clusterID, name)
if err != nil {
return fmt.Errorf("failed to reboot instance: %w", err)
return fmt.Errorf("failed to find instance, clusterID: %s, nodeName: %s: %w", w.clusterID, name, err)
}

fmt.Printf("Instance %s is rebooting: %v\n", instance.ID, res)
_, err = w.civoClient.HardRebootInstance(instance.ID)
if err != nil {
return fmt.Errorf("failed to reboot instance, clusterID: %s, instanceID: %s: %w", w.clusterID, instance.ID, err)
}
slog.Info("Instance is rebooting", "instanceID", instance.ID)
return nil
}
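Tying the pieces together, here is a sketch of an end-to-end test that could live in package watcher (the test name and fixture values are illustrative). It combines the fake Kubernetes clientset from k8s.io/client-go/kubernetes/fake with the FakeClient from pkg/watcher/fake.go, and type-asserts back to the unexported *watcher so it can drive a single reconciliation pass instead of waiting on Run's 10-second ticker:

package watcher

import (
	"context"
	"testing"

	"github.com/civo/civogo"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestRunRebootsNotReadyNode(t *testing.T) {
	// A node in the watched pool with the desired GPU count but NotReady,
	// so the watcher should reboot its backing instance.
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "node-1",
			Labels: map[string]string{nodePoolLabelKey: "pool-1"},
		},
		Status: corev1.NodeStatus{
			Conditions: []corev1.NodeCondition{
				{Type: corev1.NodeReady, Status: corev1.ConditionFalse},
			},
			Allocatable: corev1.ResourceList{
				gpuResourceName: resource.MustParse("8"),
			},
		},
	}

	rebooted := false
	civoClient := &FakeClient{
		FindKubernetesClusterInstanceFunc: func(clusterID, search string) (*civogo.Instance, error) {
			return &civogo.Instance{ID: "instance-1"}, nil
		},
		HardRebootInstanceFunc: func(id string) (*civogo.SimpleResponse, error) {
			rebooted = true
			return &civogo.SimpleResponse{Result: "success"}, nil
		},
	}

	w, err := NewWatcher(context.Background(),
		"https://api.civo.com", "test-key", "lon1", "cluster-1", "pool-1", "8",
		WithKubernetesClient(fake.NewSimpleClientset(node)),
		WithCivoClient(civoClient),
	)
	if err != nil {
		t.Fatal(err)
	}

	// Drive one pass of the reconciliation loop directly.
	if err := w.(*watcher).run(context.Background()); err != nil {
		t.Fatal(err)
	}
	if !rebooted {
		t.Fatal("expected the NotReady node to be hard-rebooted")
	}
}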