Skip to content

Commit

Permalink
fix(operator): improve logic to use svc port
Browse files Browse the repository at this point in the history
  • Loading branch information
gacevicljubisa committed Dec 16, 2024
1 parent 131ba6f commit abaed79
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 45 deletions.
5 changes: 2 additions & 3 deletions cmd/beekeeper/cmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@ import (

var errMissingClusterName = fmt.Errorf("cluster name not provided")

func (c *command) initCheckCmd() (err error) {
func (c *command) initCheckCmd() error {
const (
optionNameCreateCluster = "create-cluster"
optionNameChecks = "checks"
optionNameMetricsEnabled = "metrics-enabled"
optionNameSeed = "seed"
optionNameTimeout = "timeout"
optionNameMetricsPusherAddress = "metrics-pusher-address"
// TODO: optionNameStages = "stages"
)

cmd := &cobra.Command{
Use: "check",
Short: "runs integration tests on a Bee cluster",
Long: `runs integration tests on a Bee cluster.`,
RunE: func(cmd *cobra.Command, args []string) (err error) {
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithTimeout(cmd.Context(), c.globalConfig.GetDuration(optionNameTimeout))
defer cancel()

Expand Down
6 changes: 4 additions & 2 deletions cmd/beekeeper/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type nodeResult struct {
err error
}

func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *config.Config, deleteStorage bool) (err error) {
func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *config.Config, deleteStorage bool) error {
if clusterName == "" {
return errMissingClusterName
}
Expand Down Expand Up @@ -74,12 +74,14 @@ func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *co
if err != nil {
return err
}

if len(v.Nodes) > 0 {
for i := 0; i < len(v.Nodes); i++ {
nName := fmt.Sprintf("%s-%d", ngName, i)
if len(v.Nodes[i].Name) > 0 {
nName = v.Nodes[i].Name
}

if err := ng.DeleteNode(ctx, nName); err != nil {
return fmt.Errorf("deleting node %s from the node group %s: %w", nName, ngName, err)
}
Expand Down Expand Up @@ -109,7 +111,7 @@ func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *co
}
}

return
return nil
}

func (c *command) setupCluster(ctx context.Context, clusterName string, startCluster bool) (cluster orchestration.Cluster, err error) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/beekeeper/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (c *command) initConfig(loadConfigDir bool) error {
}

if !loadConfigDir {
c.log.Debugf("Skipping loading configuration directory as the cluster name is not set")
c.log.Debugf("skpping loading configuration directory as the cluster name is not used")
return nil
}

Expand Down
24 changes: 10 additions & 14 deletions cmd/beekeeper/cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import (
"github.com/spf13/cobra"
)

const nodeOperatorCmd string = "node-operator"

func (c *command) initOperatorCmd() (err error) {
func (c *command) initOperatorCmd() error {
const (
optionNameNamespace = "namespace"
optionNameChainNodeEndpoint = "geth-url"
Expand All @@ -23,24 +21,22 @@ func (c *command) initOperatorCmd() (err error) {
)

cmd := &cobra.Command{
Use: nodeOperatorCmd,
Short: "scans for scheduled pods and funds them",
Long: `Node operator scans for scheduled pods and funds them using node-funder. beekeeper node-operator`,
Use: "node-operator",
Short: "scans for scheduled Kubernetes pods and funds them",
Long: `Node operator scans for scheduled Kubernetes pods and funds them using node-funder. beekeeper node-operator`,
RunE: func(cmd *cobra.Command, args []string) (err error) {
var namespace string
if namespace = c.globalConfig.GetString(optionNameNamespace); namespace == "" {
namespace := c.globalConfig.GetString(optionNameNamespace)
if namespace == "" {
return errors.New("namespace not provided")
}

// chain node endpoint check
var chainNodeEndpoint string
if chainNodeEndpoint = c.globalConfig.GetString(optionNameChainNodeEndpoint); chainNodeEndpoint == "" {
chainNodeEndpoint := c.globalConfig.GetString(optionNameChainNodeEndpoint)
if chainNodeEndpoint == "" {
return errors.New("chain node endpoint (geth-url) not provided")
}

// wallet key check
var walletKey string
if walletKey = c.globalConfig.GetString(optionNameWalletKey); walletKey == "" {
walletKey := c.globalConfig.GetString(optionNameWalletKey)
if walletKey == "" {
return errors.New("wallet key not provided")
}

Expand Down
55 changes: 36 additions & 19 deletions pkg/funder/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/ethersphere/beekeeper/pkg/bee"
"github.com/ethersphere/beekeeper/pkg/k8s"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/node-funder/pkg/funder"
v1 "k8s.io/api/core/v1"
)

type ClientConfig struct {
Expand Down Expand Up @@ -53,30 +54,52 @@ func NewClient(cfg *ClientConfig) *Client {
}

func (c *Client) Run(ctx context.Context) error {
c.Log.Infof("operator started")
defer c.Log.Infof("operator done")
c.Log.Infof("operator started for namespace %s", c.Namespace)
defer c.Log.Info("operator done")

newPodIps := make(chan string)
newPods := make(chan *v1.Pod)
go func() {
for {
select {
case <-ctx.Done():
c.Log.Error("operator context canceled")
return
case podIp, ok := <-newPodIps:
case pod, ok := <-newPods:
if !ok {
c.Log.Error("operator channel closed")
return
}
c.Log.Debugf("operator received pod ip: %s", podIp)

addresses, err := c.getAddresses(ctx, podIp)
c.Log.Debugf("operator received pod with ip: %s", pod.Status.PodIP)

nodeInfo, _, err := c.K8sClient.Service.FindNode(ctx, c.Namespace, pod)
if err != nil {
c.Log.Errorf("process pod ip: %v", err)
c.Log.Errorf("find service for pod: %v", err)
continue
}

c.Log.Infof("ethereum address: %s", addresses.Ethereum)
var addresses bee.Addresses

maxRetries := 5
for i := 0; i < maxRetries; i++ {
addresses, err = c.getAddresses(ctx, nodeInfo.Endpoint)
if err != nil {
c.Log.Errorf("get addresses (attempt %d/%d): %v", i+1, maxRetries, err)
if i < maxRetries-1 { // Wait before retrying, except on the last attempt
time.Sleep(1 * time.Second)
}
continue
}

c.Log.Tracef("Successfully fetched addresses on attempt %d/%d", i+1, maxRetries)
break
}

if err != nil {
c.Log.Errorf("Failed to fetch addresses after %d attempts: %v", maxRetries, err)
}

c.Log.Infof("node '%s' ethereum address: %s", nodeInfo.Name, addresses.Ethereum)

err = funder.Fund(ctx, funder.Config{
Addresses: []string{addresses.Ethereum},
Expand All @@ -94,23 +117,17 @@ func (c *Client) Run(ctx context.Context) error {
}
}()

if err := c.K8sClient.Pods.WatchNewRunning(ctx, c.Namespace, c.LabelSelector, newPodIps); err != nil {
if err := c.K8sClient.Pods.WatchNewRunning(ctx, c.Namespace, c.LabelSelector, newPods); err != nil {
return fmt.Errorf("events watch: %w", err)
}

return nil
}

// getAddresses sends a request to the pod IP and retrieves the Addresses struct,
// getAddresses requests the node's addresses from the given endpoint. The
// response includes the overlay and underlay addresses, the Ethereum address, and public keys.
func (c *Client) getAddresses(ctx context.Context, podIp string) (bee.Addresses, error) {
url := &url.URL{
Scheme: "http",
Host: podIp + ":1633", // it is possible to extract port from service
Path: "/addresses",
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
func (c *Client) getAddresses(ctx context.Context, endpoint string) (bee.Addresses, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/addresses", endpoint), nil)
if err != nil {
return bee.Addresses{}, fmt.Errorf("new request: %s", err.Error())
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/k8s/pod/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *Client) Set(ctx context.Context, name, namespace string, o Options) (po
}
}

return
return pod, nil
}

// Delete deletes Pod
Expand Down Expand Up @@ -104,11 +104,11 @@ func (c *Client) DeletePods(ctx context.Context, namespace, labelSelector string
return deletedCount, nil
}

// WatchNewRunning detects new running Pods in the namespace and sends their IPs to the channel.
func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector string, newPodIps chan string) (err error) {
// WatchNewRunning detects new running Pods in the namespace and sends them to the channel.
func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector string, newPods chan *v1.Pod) error {
c.log.Debugf("starting events watch in namespace %s, label selector %s", namespace, labelSelector)
defer c.log.Infof("events watch done")
defer close(newPodIps)
defer c.log.Debug("events watch done")
defer close(newPods)

watcher, err := c.clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
Expand All @@ -133,7 +133,12 @@ func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector s
pod, ok := event.Object.(*v1.Pod)
if ok {
if pod.Status.PodIP != "" && pod.ObjectMeta.DeletionTimestamp == nil && pod.Status.Phase == v1.PodRunning {
newPodIps <- pod.Status.PodIP
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodReady && condition.Status == v1.ConditionTrue {
newPods <- pod
break
}
}
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/k8s/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,34 @@ func (c *Client) GetNodes(ctx context.Context, namespace, labelSelector string)

return
}

// FindNode locates the Service in the given namespace that selects pod and
// exposes a port named "api", and derives the node information from it.
//
// A Service matches when every key/value pair of its selector is present in
// the pod's labels. Services are skipped when they cannot yield a usable
// endpoint for this pod:
//   - Services with a nil or empty selector, which would otherwise match
//     every pod because there is nothing to compare;
//   - Services without a cluster IP (empty or headless "None"), which would
//     produce an endpoint such as "http://None:1633".
//
// On success it returns the NodeInfo (Name is the Service name, Endpoint is
// "http://<clusterIP>:<api port>") together with a copy of the matched
// Service. It returns an error if pod is nil, if listing Services fails, or if
// no Service matches.
func (c *Client) FindNode(ctx context.Context, namespace string, pod *v1.Pod) (*NodeInfo, *v1.Service, error) {
	if pod == nil {
		return nil, nil, fmt.Errorf("finding node in namespace %s: pod not provided", namespace)
	}

	services, err := c.clientset.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, nil, fmt.Errorf("listing services in namespace %s: %w", namespace, err)
	}

	// Iterate by index so each (large) Service struct is not copied per
	// iteration and no pointer to a loop variable is ever taken.
	for i := range services.Items {
		spec := &services.Items[i].Spec

		if len(spec.Selector) == 0 {
			continue
		}

		if spec.ClusterIP == "" || spec.ClusterIP == v1.ClusterIPNone {
			continue
		}

		matches := true
		for key, value := range spec.Selector {
			// The label must exist; a missing label must not match an
			// empty selector value.
			if got, ok := pod.Labels[key]; !ok || got != value {
				matches = false
				break
			}
		}
		if !matches {
			continue
		}

		for _, port := range spec.Ports {
			if port.Name != "api" {
				continue
			}

			// Copy only the matched Service so the returned pointer does
			// not keep the whole listing alive.
			svc := services.Items[i]
			return &NodeInfo{
				Name:     svc.Name,
				Endpoint: fmt.Sprintf("http://%s:%d", svc.Spec.ClusterIP, port.Port),
			}, &svc, nil
		}
	}

	return nil, nil, fmt.Errorf("no matching service found for pod %s", pod.Name)
}

0 comments on commit abaed79

Please sign in to comment.