Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use k8s service when beekeeper deployed in-cluster #439

Merged
merged 11 commits into from
Dec 11, 2024
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
GO ?= go
GOLANGCI_LINT ?= golangci-lint
GOLANGCI_LINT_VERSION ?= v1.61.0

COMMIT ?= "$(shell git describe --long --dirty --always --match "" || true)"
LDFLAGS ?= -s -w -X github.com/ethersphere/beekeeper.commit=$(COMMIT)

Expand Down
37 changes: 28 additions & 9 deletions cmd/beekeeper/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con
nodeResultChan := make(chan nodeResult)
defer close(nodeResultChan)

inCluster := c.globalConfig.GetBool(optionNameInCluster)

// setup bootnode node group
fundAddresses, bootnodes, err := setupNodes(ctx, clusterConfig, cfg, true, cluster, startCluster, "", nodeResultChan)
fundAddresses, bootnodes, err := setupNodes(ctx, clusterConfig, cfg, true, cluster, startCluster, inCluster, "", nodeResultChan)
if err != nil {
return nil, fmt.Errorf("setup node group bootnode: %w", err)
}
Expand All @@ -151,7 +153,7 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con
}

// setup other node groups
fundAddresses, _, err = setupNodes(ctx, clusterConfig, cfg, false, cluster, startCluster, bootnodes, nodeResultChan)
fundAddresses, _, err = setupNodes(ctx, clusterConfig, cfg, false, cluster, startCluster, inCluster, bootnodes, nodeResultChan)
if err != nil {
return nil, fmt.Errorf("setup other node groups: %w", err)
}
Expand Down Expand Up @@ -188,7 +190,16 @@ func initializeCluster(clusterConfig config.Cluster, c *command) orchestration.C
return orchestrationK8S.NewCluster(clusterConfig.GetName(), clusterOpts, c.log)
}

func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.Config, bootnode bool, cluster orchestration.Cluster, startCluster bool, bootnodesIn string, nodeResultCh chan nodeResult) (fundAddresses []string, bootnodesOut string, err error) {
func setupNodes(ctx context.Context,
clusterConfig config.Cluster,
cfg *config.Config,
bootnode bool,
cluster orchestration.Cluster,
startCluster bool,
inCluster bool,
bootnodesIn string,
nodeResultCh chan nodeResult,
) (fundAddresses []string, bootnodesOut string, err error) {
var nodeCount uint32

for ngName, v := range clusterConfig.GetNodeGroups() {
Expand Down Expand Up @@ -226,7 +237,7 @@ func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.C
for nodeName, endpoint := range v.GetEndpoints() {
beeOpt := orchestration.WithURL(endpoint.APIURL)
nodeCount++
go setupOrAddNode(ctx, false, ng, nodeName, orchestration.NodeOptions{
go setupOrAddNode(ctx, false, inCluster, ng, nodeName, orchestration.NodeOptions{
Config: &bConfig,
}, nodeResultCh, beeOpt)
}
Expand All @@ -250,15 +261,15 @@ func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.C
nodeOpts = setupNodeOptions(node, nil)
}
nodeCount++
go setupOrAddNode(ctx, startCluster, ng, nodeName, nodeOpts, nodeResultCh, orchestration.WithNoOptions())
go setupOrAddNode(ctx, startCluster, inCluster, ng, nodeName, nodeOpts, nodeResultCh, orchestration.WithNoOptions())
}

if len(v.Nodes) == 0 && !bootnode {
for i := 0; i < v.Count; i++ {
// set node name
nodeName := fmt.Sprintf("%s-%d", ngName, i)
nodeCount++
go setupOrAddNode(ctx, startCluster, ng, nodeName, orchestration.NodeOptions{}, nodeResultCh, orchestration.WithNoOptions())
go setupOrAddNode(ctx, startCluster, inCluster, ng, nodeName, orchestration.NodeOptions{}, nodeResultCh, orchestration.WithNoOptions())
}
}
}
Expand All @@ -278,16 +289,24 @@ func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.C
return fundAddresses, bootnodesOut, nil
}

func setupOrAddNode(ctx context.Context, startCluster bool, ng orchestration.NodeGroup, nName string, nodeOpts orchestration.NodeOptions, ch chan<- nodeResult, beeOpt orchestration.BeeClientOption) {
func setupOrAddNode(ctx context.Context,
startCluster bool,
inCluster bool,
ng orchestration.NodeGroup,
nodeName string,
nodeOpts orchestration.NodeOptions,
ch chan<- nodeResult,
beeOpt orchestration.BeeClientOption,
) {
if startCluster {
ethAddress, err := ng.SetupNode(ctx, nName, nodeOpts)
ethAddress, err := ng.SetupNode(ctx, nodeName, inCluster, nodeOpts)
ch <- nodeResult{
ethAddress: ethAddress,
err: err,
}
} else {
ch <- nodeResult{
err: ng.AddNode(ctx, nName, nodeOpts, beeOpt),
err: ng.AddNode(ctx, nodeName, inCluster, nodeOpts, beeOpt),
}
}
}
Expand Down
42 changes: 22 additions & 20 deletions cmd/beekeeper/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,24 +144,27 @@ func Execute() (err error) {

func (c *command) initGlobalFlags() {
globalFlags := c.root.PersistentFlags()
globalFlags.StringVar(&c.globalConfigFile, "config", "", "config file (default is $HOME/.beekeeper.yaml)")
globalFlags.String(optionNameConfigDir, filepath.Join(c.homeDir, "/.beekeeper/"), "config directory (default is $HOME/.beekeeper/)")
globalFlags.String(optionNameConfigGitRepo, "", "Git repository with configurations (uses config directory when Git repo is not specified) (default \"\")")
globalFlags.String(optionNameConfigGitDir, ".", "Git directory in the repository with configurations (default \".\")")
globalFlags.String(optionNameConfigGitBranch, "main", "Git branch")
globalFlags.String(optionNameConfigGitUsername, "", "Git username (needed for private repos)")
globalFlags.String(optionNameConfigGitPassword, "", "Git password or personal access tokens (needed for private repos)")
globalFlags.String(optionNameLogVerbosity, "info", "log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace")
globalFlags.String(optionNameLokiEndpoint, "", "loki http endpoint for pushing local logs (use http://loki.testnet.internal/loki/api/v1/push)")
globalFlags.Bool(optionNameTracingEnabled, false, "enable tracing")
globalFlags.String(optionNameTracingEndpoint, "tempo-tempo-distributed-distributor.observability:6831", "endpoint to send tracing data")
globalFlags.String(optionNameTracingHost, "", "host to send tracing data")
globalFlags.String(optionNameTracingPort, "", "port to send tracing data")
globalFlags.String(optionNameTracingServiceName, "beekeeper", "service name identifier for tracing")
globalFlags.StringVar(&c.globalConfigFile, "config", "", "Path to the configuration file (default is $HOME/.beekeeper.yaml)")
globalFlags.String(optionNameConfigDir, filepath.Join(c.homeDir, "/.beekeeper/"), "Directory for configuration files")
globalFlags.String(optionNameConfigGitRepo, "", "URL of the Git repository containing configuration files (uses the config-dir if not specified)")
globalFlags.String(optionNameConfigGitDir, ".", "Directory within the Git repository containing configuration files. Defaults to the root directory")
globalFlags.String(optionNameConfigGitBranch, "main", "Git branch to use for configuration files")
globalFlags.String(optionNameConfigGitUsername, "", "Git username for authentication (required for private repositories)")
globalFlags.String(optionNameConfigGitPassword, "", "Git password or personal access token for authentication (required for private repositories)")
globalFlags.String(optionNameLogVerbosity, "info", "Log verbosity level (0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace;")
globalFlags.String(optionNameLokiEndpoint, "", "HTTP endpoint for sending logs to Loki (e.g., http://loki.testnet.internal/loki/api/v1/push)")
globalFlags.Bool(optionNameTracingEnabled, false, "Enable tracing for performance monitoring and debugging")
globalFlags.String(optionNameTracingEndpoint, "127.0.0.1:6831", "Endpoint for sending tracing data, specified as host:port")
globalFlags.String(optionNameTracingHost, "", "Host address for sending tracing data")
globalFlags.String(optionNameTracingPort, "", "Port for sending tracing data")
globalFlags.String(optionNameTracingServiceName, "beekeeper", "Service name identifier used in tracing data")
globalFlags.Bool(optionNameEnableK8S, true, "Enable Kubernetes client functionality")
globalFlags.Bool(optionNameInCluster, false, "Use the in-cluster Kubernetes client")
globalFlags.String(optionNameKubeconfig, "~/.kube/config", "Path to the kubeconfig file")
}

func (c *command) bindGlobalFlags() (err error) {
for _, flag := range []string{optionNameConfigDir, optionNameConfigGitRepo, optionNameConfigGitBranch, optionNameConfigGitUsername, optionNameConfigGitPassword, optionNameLogVerbosity, optionNameLokiEndpoint} {
for _, flag := range []string{optionNameConfigDir, optionNameConfigGitRepo, optionNameConfigGitBranch, optionNameConfigGitDir, optionNameConfigGitUsername, optionNameConfigGitPassword, optionNameLogVerbosity, optionNameLokiEndpoint} {
if err := c.globalConfig.BindPFlag(flag, c.root.PersistentFlags().Lookup(flag)); err != nil {
return err
}
Expand Down Expand Up @@ -213,6 +216,7 @@ func (c *command) initConfig() (err error) {
}

if c.globalConfig.GetString(optionNameConfigGitRepo) != "" {
c.log.Debugf("using configuration from Git repository %s, branch %s, directory %s", c.globalConfig.GetString(optionNameConfigGitRepo), c.globalConfig.GetString(optionNameConfigGitBranch), c.globalConfig.GetString(optionNameConfigGitDir))
// read configuration from git repo
fs := memfs.New()
if _, err := git.Clone(memory.NewStorage(), fs, &git.CloneOptions{
Expand Down Expand Up @@ -261,6 +265,7 @@ func (c *command) initConfig() (err error) {
return err
}
} else {
c.log.Debugf("using configuration from directory %s", c.globalConfig.GetString(optionNameConfigDir))
// read configuration from directory
files, err := os.ReadDir(c.globalConfig.GetString(optionNameConfigDir))
if err != nil {
Expand Down Expand Up @@ -322,13 +327,10 @@ func (c *command) preRunE(cmd *cobra.Command, args []string) (err error) {

func (c *command) setK8S() (err error) {
if c.globalConfig.GetBool(optionNameEnableK8S) {
inCluster := c.globalConfig.GetBool(optionNameInCluster)
kubeconfigPath := c.globalConfig.GetString(optionNameKubeconfig)

options := []k8s.ClientOption{
k8s.WithLogger(c.log),
k8s.WithInCluster(inCluster),
k8s.WithKubeconfigPath(kubeconfigPath),
k8s.WithInCluster(c.globalConfig.GetBool(optionNameInCluster)),
k8s.WithKubeconfigPath(c.globalConfig.GetString(optionNameKubeconfig)),
}

if c.k8sClient, err = k8s.NewClient(options...); err != nil && !errors.Is(err, k8s.ErrKubeconfigNotSet) {
Expand Down
52 changes: 2 additions & 50 deletions cmd/beekeeper/cmd/node_funder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/ethersphere/beekeeper/pkg/config"
"github.com/ethersphere/beekeeper/pkg/k8s"
"github.com/ethersphere/beekeeper/pkg/logging"
nodefunder "github.com/ethersphere/beekeeper/pkg/funder/node"
"github.com/ethersphere/node-funder/pkg/funder"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -74,7 +72,6 @@ func (c *command) initNodeFunderCmd() (err error) {
ctx, cancel := context.WithTimeout(cmd.Context(), c.globalConfig.GetDuration(optionNameTimeout))
defer cancel()

c.log.Infof("node-funder started")
defer c.log.Infof("node-funder done")

// NOTE: Swarm key address is the same as the nodeEndpoint/wallet walletAddress.
Expand All @@ -86,7 +83,7 @@ func (c *command) initNodeFunderCmd() (err error) {
// if addresses are provided, use them, not k8s client to list nodes
if cfg.Namespace != "" {
label := c.globalConfig.GetString(optionNameLabelSelector)
nodeLister = newNodeLister(c.k8sClient, label, c.log)
nodeLister = nodefunder.NewClient(c.k8sClient, c.globalConfig.GetBool(optionNameInCluster), label, c.log)
}

return funder.Fund(ctx, funder.Config{
Expand Down Expand Up @@ -117,48 +114,3 @@ func (c *command) initNodeFunderCmd() (err error) {

return nil
}

type nodeLister struct {
k8sClient *k8s.Client
label string
log logging.Logger
}

func newNodeLister(k8sClient *k8s.Client, label string, l logging.Logger) *nodeLister {
return &nodeLister{
k8sClient: k8sClient,
label: label,
log: l,
}
}

func (nf *nodeLister) List(ctx context.Context, namespace string) (nodes []funder.NodeInfo, err error) {
if nf.k8sClient == nil {
return nil, fmt.Errorf("k8s client not initialized")
}

if namespace == "" {
return nil, fmt.Errorf("namespace not provided")
}

ingressHosts, err := nf.k8sClient.Ingress.GetIngressHosts(ctx, namespace, nf.label)
if err != nil {
return nil, fmt.Errorf("list ingress api nodes hosts: %s", err.Error())
}

ingressRouteHosts, err := nf.k8sClient.IngressRoute.GetIngressHosts(ctx, namespace, nf.label)
if err != nil {
return nil, fmt.Errorf("list ingress route api nodes hosts: %s", err.Error())
}

ingressHosts = append(ingressHosts, ingressRouteHosts...)

for _, node := range ingressHosts {
nodes = append(nodes, funder.NodeInfo{
Name: strings.TrimSuffix(node.Name, "-api"),
Address: fmt.Sprintf("http://%s", node.Host),
})
}

return nodes, nil
}
2 changes: 1 addition & 1 deletion cmd/beekeeper/cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/ethersphere/beekeeper/pkg/config"
"github.com/ethersphere/beekeeper/pkg/operator"
"github.com/ethersphere/beekeeper/pkg/funder/operator"
"github.com/spf13/cobra"
)

Expand Down
3 changes: 1 addition & 2 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
clusters:
default:
_inherit: ""
name: bee
namespace: bee-playground
disable-namespace: false
use-static-endpoints: false
api-domain: testnet.internal # testnet.ethswarm.org
api-domain-internal: svc.swarm1.local:1633 # Internal API domain with port when in-cluster is set to true
api-insecure-tls: true
api-scheme: http
admin-password: test
funding:
eth: 0.1
bzz: 100.0
Expand Down
1 change: 0 additions & 1 deletion config/helm-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
clusters:
helm:
_inherit: ""
name: bee
namespace: beekeeper
disable-namespace: false
api-domain: staging.internal
Expand Down
3 changes: 1 addition & 2 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
clusters:
local:
_inherit: ""
name: bee
namespace: local
disable-namespace: true
api-domain: localhost
api-domain-internal: local.svc.cluster.local:1633
api-scheme: http
admin-password: test
funding:
eth: 0.1
bzz: 100.0
Expand Down
1 change: 0 additions & 1 deletion config/mainnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
clusters:
mainnet:
_inherit: ""
name: bee
namespace: beekeeper
disable-namespace: true
api-domain: gateway.ethswarm.org
Expand Down
1 change: 0 additions & 1 deletion config/staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
clusters:
staging:
_inherit: ""
name: bee
namespace: staging
disable-namespace: false
api-domain: testnet.internal
Expand Down
1 change: 0 additions & 1 deletion config/testnet-giant.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
clusters:
testnet-giant:
_inherit: ""
name: bee
namespace: testnet-giant
disable-namespace: false
api-domain: staging.internal
Expand Down
1 change: 0 additions & 1 deletion config/testnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
clusters:
testnet:
_inherit: ""
name: bee
namespace: beekeeper
disable-namespace: false
api-domain: staging.internal
Expand Down
12 changes: 9 additions & 3 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,16 @@ func NewClient(opts ClientOptions, log logging.Logger) (c *Client) {
log: log,
}

httpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: opts.APIInsecureTLS,
},
},
}

if opts.APIURL != nil {
c.api = api.NewClient(opts.APIURL, &api.ClientOptions{HTTPClient: &http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: opts.APIInsecureTLS},
}}})
c.api = api.NewClient(opts.APIURL, &api.ClientOptions{HTTPClient: httpClient})
}
if opts.Retry > 0 {
c.retry = opts.Retry
Expand Down
Loading
Loading