From abaed7938edabfcab7262b10c964450c84444e74 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 16 Dec 2024 18:56:50 +0100 Subject: [PATCH] fix(operator): improve logic to use svc port --- cmd/beekeeper/cmd/check.go | 5 ++- cmd/beekeeper/cmd/cluster.go | 6 ++-- cmd/beekeeper/cmd/cmd.go | 2 +- cmd/beekeeper/cmd/operator.go | 24 ++++++-------- pkg/funder/operator/operator.go | 55 +++++++++++++++++++++------------ pkg/k8s/pod/client.go | 17 ++++++---- pkg/k8s/service/client.go | 31 +++++++++++++++++++ 7 files changed, 95 insertions(+), 45 deletions(-) diff --git a/cmd/beekeeper/cmd/check.go b/cmd/beekeeper/cmd/check.go index 683aef50..7a732ee9 100644 --- a/cmd/beekeeper/cmd/check.go +++ b/cmd/beekeeper/cmd/check.go @@ -16,7 +16,7 @@ import ( var errMissingClusterName = fmt.Errorf("cluster name not provided") -func (c *command) initCheckCmd() (err error) { +func (c *command) initCheckCmd() error { const ( optionNameCreateCluster = "create-cluster" optionNameChecks = "checks" @@ -24,14 +24,13 @@ func (c *command) initCheckCmd() (err error) { optionNameSeed = "seed" optionNameTimeout = "timeout" optionNameMetricsPusherAddress = "metrics-pusher-address" - // TODO: optionNameStages = "stages" ) cmd := &cobra.Command{ Use: "check", Short: "runs integration tests on a Bee cluster", Long: `runs integration tests on a Bee cluster.`, - RunE: func(cmd *cobra.Command, args []string) (err error) { + RunE: func(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithTimeout(cmd.Context(), c.globalConfig.GetDuration(optionNameTimeout)) defer cancel() diff --git a/cmd/beekeeper/cmd/cluster.go b/cmd/beekeeper/cmd/cluster.go index 25a3ab12..37e4cf6e 100644 --- a/cmd/beekeeper/cmd/cluster.go +++ b/cmd/beekeeper/cmd/cluster.go @@ -19,7 +19,7 @@ type nodeResult struct { err error } -func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *config.Config, deleteStorage bool) (err error) { +func (c *command) deleteCluster(ctx context.Context, 
clusterName string, cfg *config.Config, deleteStorage bool) error { if clusterName == "" { return errMissingClusterName } @@ -74,12 +74,14 @@ func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *co if err != nil { return err } + if len(v.Nodes) > 0 { for i := 0; i < len(v.Nodes); i++ { nName := fmt.Sprintf("%s-%d", ngName, i) if len(v.Nodes[i].Name) > 0 { nName = v.Nodes[i].Name } + if err := ng.DeleteNode(ctx, nName); err != nil { return fmt.Errorf("deleting node %s from the node group %s: %w", nName, ngName, err) } @@ -109,7 +111,7 @@ func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *co } } - return + return nil } func (c *command) setupCluster(ctx context.Context, clusterName string, startCluster bool) (cluster orchestration.Cluster, err error) { diff --git a/cmd/beekeeper/cmd/cmd.go b/cmd/beekeeper/cmd/cmd.go index 47422e7a..203f6fc5 100644 --- a/cmd/beekeeper/cmd/cmd.go +++ b/cmd/beekeeper/cmd/cmd.go @@ -193,7 +193,7 @@ func (c *command) initConfig(loadConfigDir bool) error { } if !loadConfigDir { - c.log.Debugf("Skipping loading configuration directory as the cluster name is not set") + c.log.Debugf("skipping loading configuration directory as the cluster name is not used") return nil } diff --git a/cmd/beekeeper/cmd/operator.go b/cmd/beekeeper/cmd/operator.go index d6ac26b7..9be3f026 100644 --- a/cmd/beekeeper/cmd/operator.go +++ b/cmd/beekeeper/cmd/operator.go @@ -9,9 +9,7 @@ import ( "github.com/spf13/cobra" ) -const nodeOperatorCmd string = "node-operator" - -func (c *command) initOperatorCmd() (err error) { +func (c *command) initOperatorCmd() error { const ( optionNameNamespace = "namespace" optionNameChainNodeEndpoint = "geth-url" @@ -23,24 +21,22 @@ func (c *command) initOperatorCmd() (err error) { ) cmd := &cobra.Command{ - Use: nodeOperatorCmd, - Short: "scans for scheduled pods and funds them", - Long: `Node operator scans for scheduled pods and funds them using node-funder. 
beekeeper node-operator`, + Use: "node-operator", + Short: "scans for scheduled Kubernetes pods and funds them", + Long: `Node operator scans for scheduled Kubernetes pods and funds them using node-funder. beekeeper node-operator`, RunE: func(cmd *cobra.Command, args []string) (err error) { - var namespace string - if namespace = c.globalConfig.GetString(optionNameNamespace); namespace == "" { + namespace := c.globalConfig.GetString(optionNameNamespace) + if namespace == "" { return errors.New("namespace not provided") } - // chain node endpoint check - var chainNodeEndpoint string - if chainNodeEndpoint = c.globalConfig.GetString(optionNameChainNodeEndpoint); chainNodeEndpoint == "" { + chainNodeEndpoint := c.globalConfig.GetString(optionNameChainNodeEndpoint) + if chainNodeEndpoint == "" { return errors.New("chain node endpoint (geth-url) not provided") } - // wallet key check - var walletKey string - if walletKey = c.globalConfig.GetString(optionNameWalletKey); walletKey == "" { + walletKey := c.globalConfig.GetString(optionNameWalletKey) + if walletKey == "" { return errors.New("wallet key not provided") } diff --git a/pkg/funder/operator/operator.go b/pkg/funder/operator/operator.go index df06d259..5ac845a8 100644 --- a/pkg/funder/operator/operator.go +++ b/pkg/funder/operator/operator.go @@ -6,12 +6,13 @@ import ( "fmt" "io" "net/http" - "net/url" + "time" "github.com/ethersphere/beekeeper/pkg/bee" "github.com/ethersphere/beekeeper/pkg/k8s" "github.com/ethersphere/beekeeper/pkg/logging" "github.com/ethersphere/node-funder/pkg/funder" + v1 "k8s.io/api/core/v1" ) type ClientConfig struct { @@ -53,30 +54,53 @@ func NewClient(cfg *ClientConfig) *Client { } func (c *Client) Run(ctx context.Context) error { - c.Log.Infof("operator started") - defer c.Log.Infof("operator done") + c.Log.Infof("operator started for namespace %s", c.Namespace) + defer c.Log.Info("operator done") - newPodIps := make(chan string) + newPods := make(chan *v1.Pod) go func() { for { select { 
case <-ctx.Done(): c.Log.Error("operator context canceled") return - case podIp, ok := <-newPodIps: + case pod, ok := <-newPods: if !ok { c.Log.Error("operator channel closed") return } - c.Log.Debugf("operator received pod ip: %s", podIp) - addresses, err := c.getAddresses(ctx, podIp) + c.Log.Debugf("operator received pod with ip: %s", pod.Status.PodIP) + + nodeInfo, _, err := c.K8sClient.Service.FindNode(ctx, c.Namespace, pod) if err != nil { - c.Log.Errorf("process pod ip: %v", err) + c.Log.Errorf("find service for pod: %v", err) continue } - c.Log.Infof("ethereum address: %s", addresses.Ethereum) + var addresses bee.Addresses + + maxRetries := 5 + for i := 0; i < maxRetries; i++ { + addresses, err = c.getAddresses(ctx, nodeInfo.Endpoint) + if err != nil { + c.Log.Errorf("get addresses (attempt %d/%d): %v", i+1, maxRetries, err) + if i < maxRetries-1 { // Wait before retrying, except on the last attempt + time.Sleep(1 * time.Second) + } + continue + } + + c.Log.Tracef("Successfully fetched addresses on attempt %d/%d", i+1, maxRetries) + break + } + + if err != nil { + c.Log.Errorf("Failed to fetch addresses after %d attempts: %v", maxRetries, err) + continue // no address was obtained; skip funding rather than fund an empty address + } + + c.Log.Infof("node '%s' ethereum address: %s", nodeInfo.Name, addresses.Ethereum) err = funder.Fund(ctx, funder.Config{ Addresses: []string{addresses.Ethereum}, @@ -94,23 +118,17 @@ func (c *Client) Run(ctx context.Context) error { } }() - if err := c.K8sClient.Pods.WatchNewRunning(ctx, c.Namespace, c.LabelSelector, newPodIps); err != nil { + if err := c.K8sClient.Pods.WatchNewRunning(ctx, c.Namespace, c.LabelSelector, newPods); err != nil { return fmt.Errorf("events watch: %w", err) } return nil } -// getAddresses sends a request to the pod IP and retrieves the Addresses struct, +// getAddresses sends a request to the node to get the addresses of the node, // which includes overlay, underlay addresses, Ethereum address, and public keys. 
-func (c *Client) getAddresses(ctx context.Context, podIp string) (bee.Addresses, error) { - url := &url.URL{ - Scheme: "http", - Host: podIp + ":1633", // it is possible to extract port from service - Path: "/addresses", - } - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil) +func (c *Client) getAddresses(ctx context.Context, endpoint string) (bee.Addresses, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/addresses", endpoint), nil) if err != nil { return bee.Addresses{}, fmt.Errorf("new request: %s", err.Error()) } diff --git a/pkg/k8s/pod/client.go b/pkg/k8s/pod/client.go index adde3449..cf5b9c8d 100644 --- a/pkg/k8s/pod/client.go +++ b/pkg/k8s/pod/client.go @@ -57,7 +57,7 @@ func (c *Client) Set(ctx context.Context, name, namespace string, o Options) (po } } - return + return pod, nil } // Delete deletes Pod @@ -104,11 +104,11 @@ func (c *Client) DeletePods(ctx context.Context, namespace, labelSelector string return deletedCount, nil } -// WatchNewRunning detects new running Pods in the namespace and sends their IPs to the channel. -func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector string, newPodIps chan string) (err error) { +// WatchNewRunning detects new running Pods in the namespace and sends them to the channel. 
+func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector string, newPods chan *v1.Pod) error { c.log.Debugf("starting events watch in namespace %s, label selector %s", namespace, labelSelector) - defer c.log.Infof("events watch done") - defer close(newPodIps) + defer c.log.Debug("events watch done") + defer close(newPods) watcher, err := c.clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{ LabelSelector: labelSelector, @@ -133,7 +133,12 @@ func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector s pod, ok := event.Object.(*v1.Pod) if ok { if pod.Status.PodIP != "" && pod.ObjectMeta.DeletionTimestamp == nil && pod.Status.Phase == v1.PodRunning { - newPodIps <- pod.Status.PodIP + for _, condition := range pod.Status.Conditions { + if condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue { + newPods <- pod + break + } + } } } } diff --git a/pkg/k8s/service/client.go b/pkg/k8s/service/client.go index 2e5f96d6..dc842432 100644 --- a/pkg/k8s/service/client.go +++ b/pkg/k8s/service/client.go @@ -103,3 +103,36 @@ func (c *Client) GetNodes(ctx context.Context, namespace, labelSelector string) return } + +// FindNode lists the Services in the namespace and returns the node info and Service for the first one whose selector matches the pod's labels and that has a port named "api"; it returns an error if listing fails or no such Service exists. +func (c *Client) FindNode(ctx context.Context, namespace string, pod *v1.Pod) (*NodeInfo, *v1.Service, error) { + services, err := c.clientset.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, nil, fmt.Errorf("listing services in namespace %s: %w", namespace, err) + } + + for i := range services.Items { + svc := &services.Items[i] // point at the slice element instead of copying each Service and returning the loop variable's address + if selector := svc.Spec.Selector; selector != nil { + matches := true + for key, value := range selector { + if pod.Labels[key] != value { + matches = false + break + } + } + if matches { + for _, port := range svc.Spec.Ports { + if port.Name == "api" { + return &NodeInfo{ + Name: svc.Name, + Endpoint: fmt.Sprintf("http://%s:%d", svc.Spec.ClusterIP, port.Port), + }, svc, nil + } + } + } + } + } + + return nil, nil, fmt.Errorf("no matching service 
found for pod %s", pod.Name) +}