From bd04a53c35486e01c07eaffb96a9055984b4e7af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ljubi=C5=A1a=20Ga=C4=8Devi=C4=87?= <35105035+gacevicljubisa@users.noreply.github.com> Date: Wed, 11 Dec 2024 11:52:28 +0100 Subject: [PATCH] feat: use k8s service when beekeeper deployed in-cluster (#439) * refactor: move operator to funder pkg * refactor: move node-funder to funder/node pkg * refactor: remove -api suffix * feat(node-funder): user services when in-cluster * fix: remove in-cluster-domain flag * fix: set in-cluster properly and add Dockerfile.dev * fix: bind config-git-dir as global flag * chore: add missing namespace in front of internal domain * chore: enable namespace in local.yaml * fix: disable namespace in local.yaml * chore: remove not needed Dockerfile.dev --- Makefile | 1 - cmd/beekeeper/cmd/cluster.go | 37 +- cmd/beekeeper/cmd/cmd.go | 42 +- cmd/beekeeper/cmd/node_funder.go | 52 +-- cmd/beekeeper/cmd/operator.go | 2 +- config/config.yaml | 3 +- config/helm-cluster.yaml | 1 - config/local.yaml | 3 +- config/mainnet.yaml | 1 - config/staging.yaml | 1 - config/testnet-giant.yaml | 1 - config/testnet.yaml | 1 - pkg/bee/client.go | 12 +- pkg/beekeeper/beekeeper.go | 373 ------------------ pkg/config/cluster.go | 5 +- pkg/config/config.go | 12 + pkg/funder/node/node.go | 84 ++++ pkg/{ => funder}/operator/operator.go | 0 pkg/k8s/customresource/ingressroute/client.go | 4 +- pkg/k8s/ingress/client.go | 4 +- pkg/k8s/k8s.go | 12 +- pkg/k8s/service/client.go | 28 ++ pkg/orchestration/cluster.go | 31 +- pkg/orchestration/k8s/node_orchestrator.go | 33 +- pkg/orchestration/k8s/nodegroup.go | 8 +- pkg/orchestration/nodegroup.go | 4 +- version.go | 2 +- 27 files changed, 241 insertions(+), 516 deletions(-) create mode 100644 pkg/funder/node/node.go rename pkg/{ => funder}/operator/operator.go (100%) diff --git a/Makefile b/Makefile index be979b602..809fb13ba 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,6 @@ GO ?= go GOLANGCI_LINT ?= golangci-lint GOLANGCI_LINT_VERSION ?= v1.61.0 - COMMIT ?= "$(shell git describe --long --dirty --always --match "" || true)" LDFLAGS ?= -s -w -X github.com/ethersphere/beekeeper.commit=$(COMMIT) diff --git a/cmd/beekeeper/cmd/cluster.go b/cmd/beekeeper/cmd/cluster.go index 4b4215e51..9888f10fb 100644 --- a/cmd/beekeeper/cmd/cluster.go +++ b/cmd/beekeeper/cmd/cluster.go @@ -136,8 +136,10 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con nodeResultChan := make(chan nodeResult) defer close(nodeResultChan) + inCluster := c.globalConfig.GetBool(optionNameInCluster) + // setup bootnode node group - fundAddresses, bootnodes, err := setupNodes(ctx, clusterConfig, cfg, true, cluster, startCluster, "", nodeResultChan) + fundAddresses, bootnodes, err := setupNodes(ctx, clusterConfig, cfg, true, cluster, startCluster, inCluster, "", nodeResultChan) if err != nil { return nil, fmt.Errorf("setup node group bootnode: %w", err) } @@ -151,7 +153,7 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con } // setup other node groups - fundAddresses, _, err = setupNodes(ctx, clusterConfig, cfg, false, cluster, startCluster, bootnodes, nodeResultChan) + fundAddresses, _, err = setupNodes(ctx, clusterConfig, cfg, false, cluster, startCluster, inCluster, bootnodes, nodeResultChan) if err != nil { return nil, fmt.Errorf("setup other node groups: %w", err) } @@ -188,7 +190,16 @@ func initializeCluster(clusterConfig config.Cluster, c *command) orchestration.C return orchestrationK8S.NewCluster(clusterConfig.GetName(), clusterOpts, c.log) } -func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.Config, bootnode bool, cluster orchestration.Cluster, startCluster bool, bootnodesIn string, nodeResultCh chan nodeResult) (fundAddresses []string, bootnodesOut string, err error) { +func setupNodes(ctx context.Context, + clusterConfig config.Cluster, + cfg *config.Config, + bootnode bool, + cluster orchestration.Cluster, + startCluster bool, + inCluster bool, + bootnodesIn string, + nodeResultCh chan nodeResult, +) (fundAddresses []string, bootnodesOut string, err error) { var nodeCount uint32 for ngName, v := range clusterConfig.GetNodeGroups() { @@ -226,7 +237,7 @@ func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.C for nodeName, endpoint := range v.GetEndpoints() { beeOpt := orchestration.WithURL(endpoint.APIURL) nodeCount++ - go setupOrAddNode(ctx, false, ng, nodeName, orchestration.NodeOptions{ + go setupOrAddNode(ctx, false, inCluster, ng, nodeName, orchestration.NodeOptions{ Config: &bConfig, }, nodeResultCh, beeOpt) } @@ -250,7 +261,7 @@ func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.C nodeOpts = setupNodeOptions(node, nil) } nodeCount++ - go setupOrAddNode(ctx, startCluster, ng, nodeName, nodeOpts, nodeResultCh, orchestration.WithNoOptions()) + go setupOrAddNode(ctx, startCluster, inCluster, ng, nodeName, nodeOpts, nodeResultCh, orchestration.WithNoOptions()) } if len(v.Nodes) == 0 && !bootnode { @@ -258,7 +269,7 @@ func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.C // set node name nodeName := fmt.Sprintf("%s-%d", ngName, i) nodeCount++ - go setupOrAddNode(ctx, startCluster, ng, nodeName, orchestration.NodeOptions{}, nodeResultCh, orchestration.WithNoOptions()) + go setupOrAddNode(ctx, startCluster, inCluster, ng, nodeName, orchestration.NodeOptions{}, nodeResultCh, orchestration.WithNoOptions()) } } } @@ -278,16 +289,24 @@ func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.C return fundAddresses, bootnodesOut, nil } -func setupOrAddNode(ctx context.Context, startCluster bool, ng orchestration.NodeGroup, nName string, nodeOpts orchestration.NodeOptions, ch chan<- nodeResult, beeOpt orchestration.BeeClientOption) { +func setupOrAddNode(ctx context.Context, + startCluster bool, + inCluster bool, + ng orchestration.NodeGroup, + nodeName string, + nodeOpts orchestration.NodeOptions, + ch chan<- nodeResult, + beeOpt orchestration.BeeClientOption, +) { if startCluster { - ethAddress, err := ng.SetupNode(ctx, nName, nodeOpts) + ethAddress, err := ng.SetupNode(ctx, nodeName, inCluster, nodeOpts) ch <- nodeResult{ ethAddress: ethAddress, err: err, } } else { ch <- nodeResult{ - err: ng.AddNode(ctx, nName, nodeOpts, beeOpt), + err: ng.AddNode(ctx, nodeName, inCluster, nodeOpts, beeOpt), } } } diff --git a/cmd/beekeeper/cmd/cmd.go b/cmd/beekeeper/cmd/cmd.go index 7efc53883..f1ae7cf93 100644 --- a/cmd/beekeeper/cmd/cmd.go +++ b/cmd/beekeeper/cmd/cmd.go @@ -144,24 +144,27 @@ func Execute() (err error) { func (c *command) initGlobalFlags() { globalFlags := c.root.PersistentFlags() - globalFlags.StringVar(&c.globalConfigFile, "config", "", "config file (default is $HOME/.beekeeper.yaml)") - globalFlags.String(optionNameConfigDir, filepath.Join(c.homeDir, "/.beekeeper/"), "config directory (default is $HOME/.beekeeper/)") - globalFlags.String(optionNameConfigGitRepo, "", "Git repository with configurations (uses config directory when Git repo is not specified) (default \"\")") - globalFlags.String(optionNameConfigGitDir, ".", "Git directory in the repository with configurations (default \".\")") - globalFlags.String(optionNameConfigGitBranch, "main", "Git branch") - globalFlags.String(optionNameConfigGitUsername, "", "Git username (needed for private repos)") - globalFlags.String(optionNameConfigGitPassword, "", "Git password or personal access tokens (needed for private repos)") - globalFlags.String(optionNameLogVerbosity, "info", "log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace") - globalFlags.String(optionNameLokiEndpoint, "", "loki http endpoint for pushing local logs (use http://loki.testnet.internal/loki/api/v1/push)") - globalFlags.Bool(optionNameTracingEnabled, false, "enable tracing") - globalFlags.String(optionNameTracingEndpoint, "tempo-tempo-distributed-distributor.observability:6831", "endpoint to send tracing data") - globalFlags.String(optionNameTracingHost, "", "host to send tracing data") - globalFlags.String(optionNameTracingPort, "", "port to send tracing data") - globalFlags.String(optionNameTracingServiceName, "beekeeper", "service name identifier for tracing") + globalFlags.StringVar(&c.globalConfigFile, "config", "", "Path to the configuration file (default is $HOME/.beekeeper.yaml)") + globalFlags.String(optionNameConfigDir, filepath.Join(c.homeDir, "/.beekeeper/"), "Directory for configuration files") + globalFlags.String(optionNameConfigGitRepo, "", "URL of the Git repository containing configuration files (uses the config-dir if not specified)") + globalFlags.String(optionNameConfigGitDir, ".", "Directory within the Git repository containing configuration files. Defaults to the root directory") + globalFlags.String(optionNameConfigGitBranch, "main", "Git branch to use for configuration files") + globalFlags.String(optionNameConfigGitUsername, "", "Git username for authentication (required for private repositories)") + globalFlags.String(optionNameConfigGitPassword, "", "Git password or personal access token for authentication (required for private repositories)") + globalFlags.String(optionNameLogVerbosity, "info", "Log verbosity level (0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace;") + globalFlags.String(optionNameLokiEndpoint, "", "HTTP endpoint for sending logs to Loki (e.g., http://loki.testnet.internal/loki/api/v1/push)") + globalFlags.Bool(optionNameTracingEnabled, false, "Enable tracing for performance monitoring and debugging") + globalFlags.String(optionNameTracingEndpoint, "127.0.0.1:6831", "Endpoint for sending tracing data, specified as host:port") + globalFlags.String(optionNameTracingHost, "", "Host address for sending tracing data") + globalFlags.String(optionNameTracingPort, "", "Port for sending tracing data") + globalFlags.String(optionNameTracingServiceName, "beekeeper", "Service name identifier used in tracing data") + globalFlags.Bool(optionNameEnableK8S, true, "Enable Kubernetes client functionality") + globalFlags.Bool(optionNameInCluster, false, "Use the in-cluster Kubernetes client") + globalFlags.String(optionNameKubeconfig, "~/.kube/config", "Path to the kubeconfig file") } func (c *command) bindGlobalFlags() (err error) { - for _, flag := range []string{optionNameConfigDir, optionNameConfigGitRepo, optionNameConfigGitBranch, optionNameConfigGitUsername, optionNameConfigGitPassword, optionNameLogVerbosity, optionNameLokiEndpoint} { + for _, flag := range []string{optionNameConfigDir, optionNameConfigGitRepo, optionNameConfigGitBranch, optionNameConfigGitDir, optionNameConfigGitUsername, optionNameConfigGitPassword, optionNameLogVerbosity, optionNameLokiEndpoint} { if err := c.globalConfig.BindPFlag(flag, c.root.PersistentFlags().Lookup(flag)); err != nil { return err } @@ -213,6 +216,7 @@ func (c *command) initConfig() (err error) { } if c.globalConfig.GetString(optionNameConfigGitRepo) != "" { + c.log.Debugf("using configuration from Git repository %s, branch %s, directory %s", c.globalConfig.GetString(optionNameConfigGitRepo), c.globalConfig.GetString(optionNameConfigGitBranch), c.globalConfig.GetString(optionNameConfigGitDir)) // read configuration from git repo fs := memfs.New() if _, err := git.Clone(memory.NewStorage(), fs, &git.CloneOptions{ @@ -261,6 +265,7 @@ func (c *command) initConfig() (err error) { return err } } else { + c.log.Debugf("using configuration from directory %s", c.globalConfig.GetString(optionNameConfigDir)) // read configuration from directory files, err := os.ReadDir(c.globalConfig.GetString(optionNameConfigDir)) if err != nil { @@ -322,13 +327,10 @@ func (c *command) preRunE(cmd *cobra.Command, args []string) (err error) { func (c *command) setK8S() (err error) { if c.globalConfig.GetBool(optionNameEnableK8S) { - inCluster := c.globalConfig.GetBool(optionNameInCluster) - kubeconfigPath := c.globalConfig.GetString(optionNameKubeconfig) - options := []k8s.ClientOption{ k8s.WithLogger(c.log), - k8s.WithInCluster(inCluster), - k8s.WithKubeconfigPath(kubeconfigPath), + k8s.WithInCluster(c.globalConfig.GetBool(optionNameInCluster)), + k8s.WithKubeconfigPath(c.globalConfig.GetString(optionNameKubeconfig)), } if c.k8sClient, err = k8s.NewClient(options...); err != nil && !errors.Is(err, k8s.ErrKubeconfigNotSet) { diff --git a/cmd/beekeeper/cmd/node_funder.go b/cmd/beekeeper/cmd/node_funder.go index 0d3345d4c..706a10570 100644 --- a/cmd/beekeeper/cmd/node_funder.go +++ b/cmd/beekeeper/cmd/node_funder.go @@ -4,12 +4,10 @@ import ( "context" "errors" "fmt" - "strings" "time" "github.com/ethersphere/beekeeper/pkg/config" - "github.com/ethersphere/beekeeper/pkg/k8s" - "github.com/ethersphere/beekeeper/pkg/logging" + nodefunder "github.com/ethersphere/beekeeper/pkg/funder/node" "github.com/ethersphere/node-funder/pkg/funder" "github.com/spf13/cobra" ) @@ -74,7 +72,6 @@ func (c *command) initNodeFunderCmd() (err error) { ctx, cancel := context.WithTimeout(cmd.Context(), c.globalConfig.GetDuration(optionNameTimeout)) defer cancel() - c.log.Infof("node-funder started") defer c.log.Infof("node-funder done") // NOTE: Swarm key address is the same as the nodeEndpoint/wallet walletAddress. @@ -86,7 +83,7 @@ func (c *command) initNodeFunderCmd() (err error) { // if addresses are provided, use them, not k8s client to list nodes if cfg.Namespace != "" { label := c.globalConfig.GetString(optionNameLabelSelector) - nodeLister = newNodeLister(c.k8sClient, label, c.log) + nodeLister = nodefunder.NewClient(c.k8sClient, c.globalConfig.GetBool(optionNameInCluster), label, c.log) } return funder.Fund(ctx, funder.Config{ @@ -117,48 +114,3 @@ func (c *command) initNodeFunderCmd() (err error) { return nil } - -type nodeLister struct { - k8sClient *k8s.Client - label string - log logging.Logger -} - -func newNodeLister(k8sClient *k8s.Client, label string, l logging.Logger) *nodeLister { - return &nodeLister{ - k8sClient: k8sClient, - label: label, - log: l, - } -} - -func (nf *nodeLister) List(ctx context.Context, namespace string) (nodes []funder.NodeInfo, err error) { - if nf.k8sClient == nil { - return nil, fmt.Errorf("k8s client not initialized") - } - - if namespace == "" { - return nil, fmt.Errorf("namespace not provided") - } - - ingressHosts, err := nf.k8sClient.Ingress.GetIngressHosts(ctx, namespace, nf.label) - if err != nil { - return nil, fmt.Errorf("list ingress api nodes hosts: %s", err.Error()) - } - - ingressRouteHosts, err := nf.k8sClient.IngressRoute.GetIngressHosts(ctx, namespace, nf.label) - if err != nil { - return nil, fmt.Errorf("list ingress route api nodes hosts: %s", err.Error()) - } - - ingressHosts = append(ingressHosts, ingressRouteHosts...) - - for _, node := range ingressHosts { - nodes = append(nodes, funder.NodeInfo{ - Name: strings.TrimSuffix(node.Name, "-api"), - Address: fmt.Sprintf("http://%s", node.Host), - }) - } - - return nodes, nil -} diff --git a/cmd/beekeeper/cmd/operator.go b/cmd/beekeeper/cmd/operator.go index b6ddf40af..e663fe458 100644 --- a/cmd/beekeeper/cmd/operator.go +++ b/cmd/beekeeper/cmd/operator.go @@ -6,7 +6,7 @@ import ( "time" "github.com/ethersphere/beekeeper/pkg/config" - "github.com/ethersphere/beekeeper/pkg/operator" + "github.com/ethersphere/beekeeper/pkg/funder/operator" "github.com/spf13/cobra" ) diff --git a/config/config.yaml b/config/config.yaml index 7b83d5bb7..6076a166b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -3,14 +3,13 @@ clusters: default: _inherit: "" - name: bee namespace: bee-playground disable-namespace: false use-static-endpoints: false api-domain: testnet.internal # testnet.ethswarm.org + api-domain-internal: svc.swarm1.local:1633 # Internal API domain with port when in-cluster is set to true api-insecure-tls: true api-scheme: http - admin-password: test funding: eth: 0.1 bzz: 100.0 diff --git a/config/helm-cluster.yaml b/config/helm-cluster.yaml index ff12eb752..52f4b629b 100644 --- a/config/helm-cluster.yaml +++ b/config/helm-cluster.yaml @@ -2,7 +2,6 @@ clusters: helm: _inherit: "" - name: bee namespace: beekeeper disable-namespace: false api-domain: staging.internal diff --git a/config/local.yaml b/config/local.yaml index a735a4764..38fba9682 100644 --- a/config/local.yaml +++ b/config/local.yaml @@ -2,12 +2,11 @@ clusters: local: _inherit: "" - name: bee namespace: local disable-namespace: true api-domain: localhost + api-domain-internal: local.svc.cluster.local:1633 api-scheme: http - admin-password: test funding: eth: 0.1 bzz: 100.0 diff --git a/config/mainnet.yaml b/config/mainnet.yaml index e3b1c9cee..43f1855e1 100644 --- a/config/mainnet.yaml +++ b/config/mainnet.yaml @@ -3,7 +3,6 @@ clusters: mainnet: _inherit: "" - name: bee namespace: beekeeper disable-namespace: true api-domain: gateway.ethswarm.org diff --git a/config/staging.yaml b/config/staging.yaml index a3da33132..41ecfe8ea 100644 --- a/config/staging.yaml +++ b/config/staging.yaml @@ -3,7 +3,6 @@ clusters: staging: _inherit: "" - name: bee namespace: staging disable-namespace: false api-domain: testnet.internal diff --git a/config/testnet-giant.yaml b/config/testnet-giant.yaml index b67c52ad5..9199e7273 100644 --- a/config/testnet-giant.yaml +++ b/config/testnet-giant.yaml @@ -3,7 +3,6 @@ clusters: testnet-giant: _inherit: "" - name: bee namespace: testnet-giant disable-namespace: false api-domain: staging.internal diff --git a/config/testnet.yaml b/config/testnet.yaml index afcd2f248..7b698425d 100644 --- a/config/testnet.yaml +++ b/config/testnet.yaml @@ -3,7 +3,6 @@ clusters: testnet: _inherit: "" - name: bee namespace: beekeeper disable-namespace: false api-domain: staging.internal diff --git a/pkg/bee/client.go b/pkg/bee/client.go index 1462c27e4..9ea528e63 100644 --- a/pkg/bee/client.go +++ b/pkg/bee/client.go @@ -47,10 +47,16 @@ func NewClient(opts ClientOptions, log logging.Logger) (c *Client) { log: log, } + httpClient := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: opts.APIInsecureTLS, + }, + }, + } + if opts.APIURL != nil { - c.api = api.NewClient(opts.APIURL, &api.ClientOptions{HTTPClient: &http.Client{Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: opts.APIInsecureTLS}, - }}}) + c.api = api.NewClient(opts.APIURL, &api.ClientOptions{HTTPClient: httpClient}) } if opts.Retry > 0 { c.retry = opts.Retry diff --git a/pkg/beekeeper/beekeeper.go b/pkg/beekeeper/beekeeper.go index cf07c4726..28f61d4d5 100644 --- a/pkg/beekeeper/beekeeper.go +++ b/pkg/beekeeper/beekeeper.go @@ -2,14 +2,8 @@ package beekeeper import ( "context" - "fmt" - "math/rand" - "time" - "github.com/ethersphere/beekeeper/pkg/logging" "github.com/ethersphere/beekeeper/pkg/orchestration" - "github.com/ethersphere/beekeeper/pkg/random" - "golang.org/x/sync/errgroup" ) // Action defines Beekeeper Action's interface. An action that @@ -18,370 +12,3 @@ import ( type Action interface { Run(ctx context.Context, cluster orchestration.Cluster, o interface{}) (err error) } - -// Stage define stages for updating Bee -type Stage []Update - -// Update represents details for updating a node group -type Update struct { - NodeGroup string - Actions Actions -} - -// Actions represents node group update actions -type Actions struct { - AddCount int - StartCount int - StopCount int - DeleteCount int -} - -// Run runs check against the cluster -func Run(ctx context.Context, cluster orchestration.Cluster, action Action, options interface{}, stages []Stage, seed int64, logger logging.Logger) (err error) { - logger.Infof("root seed: %d", seed) - - if err := action.Run(ctx, cluster, options); err != nil { - return err - } - - for i, s := range stages { - waitDeleted := false - for _, u := range s { - if u.Actions.DeleteCount > 0 { - waitDeleted = true - } - - logger.Infof("stage %d, node group %s, add %d, delete %d, start %d, stop %d", i, u.NodeGroup, u.Actions.AddCount, u.Actions.DeleteCount, u.Actions.StartCount, u.Actions.StopCount) - - rnd := random.PseudoGenerator(seed) - ng, err := cluster.NodeGroup(u.NodeGroup) - if err != nil { - return err - } - if err := updateNodeGroup(ctx, ng, u.Actions, rnd, i, logger); err != nil { - return err - } - } - - // wait at least 60s for deleted nodes to be removed from the peers list - if waitDeleted { - time.Sleep(60 * time.Second) - } - - if err := action.Run(ctx, cluster, options); err != nil { - return err - } - } - - return -} - -// RunConcurrently runs check against the cluster, cluster updates are executed concurrently -func RunConcurrently(ctx context.Context, cluster orchestration.Cluster, action Action, options interface{}, stages []Stage, buffer int, seed int64, logger logging.Logger) (err error) { - logger.Infof("root seed: %d", seed) - - if err := action.Run(ctx, cluster, options); err != nil { - return err - } - - for i, s := range stages { - logger.Infof("starting stage %d", i) - buffers := weightedBuffers(buffer, s) - rnds := random.PseudoGenerators(seed, len(s)) - - stageGroup := new(errgroup.Group) - stageSemaphore := make(chan struct{}, buffer) - - waitDeleted := false - for j, u := range s { - if u.Actions.DeleteCount > 0 { - waitDeleted = true - } - - stageSemaphore <- struct{}{} - stageGroup.Go(func() error { - defer func() { - <-stageSemaphore - }() - - logger.Infof("node group %s, add %d, delete %d, start %d, stop %d", u.NodeGroup, u.Actions.AddCount, u.Actions.DeleteCount, u.Actions.StartCount, u.Actions.StopCount) - ng, err := cluster.NodeGroup(u.NodeGroup) - if err != nil { - return err - } - if err := updateNodeGroupConcurrently(ctx, ng, u.Actions, rnds[j], i, buffers[j], logger); err != nil { - return err - } - - logger.Infof("node group %s updated successfully", u.NodeGroup) - return nil - }) - } - - if err := stageGroup.Wait(); err != nil { - return fmt.Errorf("stage %d failed: %w", i, err) - } - - // wait 60s for deleted nodes to be removed from the peers list - if waitDeleted { - time.Sleep(60 * time.Second) - } - - if err := action.Run(ctx, cluster, options); err != nil { - return err - } - } - - return -} - -// updateNodeGroup updates node group by adding, deleting, starting and stopping it's nodes -func updateNodeGroup(ctx context.Context, ng orchestration.NodeGroup, a Actions, rnd *rand.Rand, stage int, logger logging.Logger) (err error) { - // get info from the cluster - running, err := ng.RunningNodes(ctx) - if err != nil { - return fmt.Errorf("running nodes: %w", err) - } - if len(running) < a.DeleteCount+a.StopCount { - return fmt.Errorf("not enough running nodes for given parameters, running: %d, delete: %d, stop %d", len(running), a.DeleteCount, a.StopCount) - } - - stopped, err := ng.StoppedNodes(ctx) - if err != nil { - return fmt.Errorf("stoped nodes: %w", err) - } - if len(stopped) < a.StartCount { - return fmt.Errorf("not enough stopped nodes for given parameters, stopped: %d, start: %d", len(running), a.StartCount) - } - - // plan execution - var toAdd []string - for i := 0; i < a.AddCount; i++ { - toAdd = append(toAdd, fmt.Sprintf("%s-s%dn%d", ng.Name(), stage, i)) - } - toDelete, running := randomPick(rnd, running, a.DeleteCount) - toStart, _ := randomPick(rnd, stopped, a.StartCount) - toStop, _ := randomPick(rnd, running, a.StopCount) - - // add nodes - for _, n := range toAdd { - if _, err := ng.SetupNode(ctx, n, orchestration.NodeOptions{}); err != nil { - return fmt.Errorf("add start node %s: %w", n, err) - } - c, err := ng.NodeClient(n) - if err != nil { - return err - } - overlay, err := c.Overlay(ctx) - if err != nil { - return fmt.Errorf("get node %s overlay: %w", n, err) - } - logger.Infof("node %s (%s) is added", n, overlay) - } - - // delete nodes - for _, n := range toDelete { - c, err := ng.NodeClient(n) - if err != nil { - return err - } - overlay, err := c.Overlay(ctx) - if err != nil { - return fmt.Errorf("get node %s overlay: %w", n, err) - } - if err := ng.DeleteNode(ctx, n); err != nil { - return fmt.Errorf("delete node %s: %w", n, err) - } - logger.Infof("node %s (%s) is deleted", n, overlay) - } - - // start nodes - for _, n := range toStart { - if err := ng.StartNode(ctx, n); err != nil { - return fmt.Errorf("start node %s: %w", n, err) - } - c, err := ng.NodeClient(n) - if err != nil { - return err - } - overlay, err := c.Overlay(ctx) - if err != nil { - return fmt.Errorf("get node %s overlay: %w", n, err) - } - logger.Infof("node %s (%s) is started", n, overlay) - } - - // stop nodes - for _, n := range toStop { - c, err := ng.NodeClient(n) - if err != nil { - return err - } - overlay, err := c.Overlay(ctx) - if err != nil { - return fmt.Errorf("get node %s overlay: %w", n, err) - } - if err := ng.StopNode(ctx, n); err != nil { - return fmt.Errorf("stop node %s: %w", n, err) - } - logger.Infof("node %s (%s) is stopped", n, overlay) - } - - return -} - -// updateNodeGroupConcurrently updates node group concurrently -func updateNodeGroupConcurrently(ctx context.Context, ng orchestration.NodeGroup, a Actions, rnd *rand.Rand, stage, buff int, logger logging.Logger) (err error) { - // get info from the cluster - running, err := ng.RunningNodes(ctx) - if err != nil { - return fmt.Errorf("running nodes: %w", err) - } - if len(running) < a.DeleteCount+a.StopCount { - return fmt.Errorf("not enough running nodes for given parameters, running: %d, delete: %d, stop %d", len(running), a.DeleteCount, a.StopCount) - } - - stopped, err := ng.StoppedNodes(ctx) - if err != nil { - return fmt.Errorf("stoped nodes: %w", err) - } - if len(stopped) < a.StartCount { - return fmt.Errorf("not enough stopped nodes for given parameters, stopped: %d, start: %d", len(running), a.StartCount) - } - - // plan execution - var toAdd []string - for i := 0; i < a.AddCount; i++ { - toAdd = append(toAdd, fmt.Sprintf("%s-s%dn%d", ng.Name(), stage, i)) - } - toDelete, running := randomPick(rnd, running, a.DeleteCount) - toStart, _ := randomPick(rnd, stopped, a.StartCount) - toStop, _ := randomPick(rnd, running, a.StopCount) - - updateGroup := new(errgroup.Group) - updateSemaphore := make(chan struct{}, buff) - - // add nodes - for _, n := range toAdd { - updateSemaphore <- struct{}{} - updateGroup.Go(func() error { - defer func() { - <-updateSemaphore - }() - - if _, err := ng.SetupNode(ctx, n, orchestration.NodeOptions{}); err != nil { - return fmt.Errorf("add start node %s: %w", n, err) - } - c, err := ng.NodeClient(n) - if err != nil { - return err - } - overlay, err := c.Overlay(ctx) - if err != nil { - return fmt.Errorf("get node %s overlay: %w", n, err) - } - logger.Infof("node %s (%s) is added", n, overlay) - return nil - }) - } - - // delete nodes - for _, n := range toDelete { - updateSemaphore <- struct{}{} - updateGroup.Go(func() error { - defer func() { - <-updateSemaphore - }() - - c, err := ng.NodeClient(n) - if err != nil { - return err - } - overlay, err := c.Overlay(ctx) - if err != nil { - return fmt.Errorf("get node %s overlay: %w", n, err) - } - if err := ng.DeleteNode(ctx, n); err != nil { - return fmt.Errorf("delete node %s: %w", n, err) - } - logger.Infof("node %s (%s) is deleted", n, overlay) - return nil - }) - } - - // start nodes - for _, n := range toStart { - updateSemaphore <- struct{}{} - updateGroup.Go(func() error { - defer func() { - <-updateSemaphore - }() - - if err := ng.StartNode(ctx, n); err != nil { - return fmt.Errorf("start node %s: %w", n, err) - } - c, err := ng.NodeClient(n) - if err != nil { - return err - } - overlay, err := c.Overlay(ctx) - if err != nil { - return fmt.Errorf("get node %s overlay: %w", n, err) - } - logger.Infof("node %s (%s) is started", n, overlay) - return nil - }) - } - - // stop nodes - for _, n := range toStop { - updateSemaphore <- struct{}{} - updateGroup.Go(func() error { - defer func() { - <-updateSemaphore - }() - - c, err := ng.NodeClient(n) - if err != nil { - return err - } - overlay, err := c.Overlay(ctx) - if err != nil { - return fmt.Errorf("get node %s overlay: %w", n, err) - } - if err := ng.StopNode(ctx, n); err != nil { - return fmt.Errorf("stop node %s: %w", n, err) - } - logger.Infof("node %s (%s) is stopped", n, overlay) - return nil - }) - } - - return updateGroup.Wait() -} - -// randomPick randomly picks n elements from the list, and returns lists of picked and unpicked elements -func randomPick(rnd *rand.Rand, list []string, n int) (picked, unpicked []string) { - for i := 0; i < n; i++ { - index := rnd.Intn(len(list)) - picked = append(picked, list[index]) - list = append(list[:index], list[index+1:]...) - } - return picked, list -} - -// weightedBuffers breaks buffer into smaller buffers for each update -func weightedBuffers(buffer int, s Stage) (buffers []int) { - total := 0 - for _, u := range s { - actions := u.Actions.AddCount + u.Actions.DeleteCount + u.Actions.StartCount + u.Actions.StopCount - total += actions - } - - for _, u := range s { - actions := u.Actions.AddCount + u.Actions.DeleteCount + u.Actions.StartCount + u.Actions.StopCount - buffers = append(buffers, buffer*actions/total) - } - - return -} diff --git a/pkg/config/cluster.go b/pkg/config/cluster.go index 536f39722..79e78245e 100644 --- a/pkg/config/cluster.go +++ b/pkg/config/cluster.go @@ -11,11 +11,12 @@ type Cluster struct { // parent to inherit settings from *Inherit `yaml:",inline"` // Cluster configuration - Name *string `yaml:"name"` + Name *string `yaml:"name,omitempty"` Namespace *string `yaml:"namespace"` DisableNamespace *bool `yaml:"disable-namespace"` UseStaticEndpoints *bool `yaml:"use-static-endpoints"` APIDomain *string `yaml:"api-domain"` + APIDomainInternal *string `yaml:"api-domain-internal"` APIInsecureTLS *bool `yaml:"api-insecure-tls"` APIScheme *string `yaml:"api-scheme"` Funding *Funding `yaml:"funding"` @@ -85,7 +86,7 @@ func (c *Cluster) GetName() string { // GetNamespace returns cluster namespace func (c *Cluster) GetNamespace() string { - if c.Name == nil { + if c.Namespace == nil { return "nonamespace" } return *c.Namespace diff --git a/pkg/config/config.go b/pkg/config/config.go index 0ab8af08e..d851b9682 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -129,11 +129,19 @@ func Read(log logging.Logger, yamlFiles []YamlFile) (*Config, error) { } for _, file := range yamlFiles { + log.Tracef("reading file %s", file.Name) + var tmp *Config if err := yaml.Unmarshal(file.Content, &tmp); err != nil { return nil, fmt.Errorf("unmarshaling yaml file: %w", err) } + // Set the cluster name to the key + for key, cluster := range tmp.Clusters { + cluster.Name = &key + tmp.Clusters[key] = cluster + } + // join Clusters for k, v := range tmp.Clusters { _, ok := c.Clusters[k] @@ -143,6 +151,7 @@ func Read(log logging.Logger, yamlFiles []YamlFile) (*Config, error) { log.Warningf("cluster '%s' in file '%s' already exits in configuration", k, file.Name) } } + // join NodeGroups for k, v := range tmp.NodeGroups { _, ok := c.NodeGroups[k] @@ -152,6 +161,7 @@ func Read(log logging.Logger, yamlFiles []YamlFile) (*Config, error) { log.Warningf("node group '%s' in file '%s' already exits in configuration", k, file.Name) } } + // join BeeConfigs for k, v := range tmp.BeeConfigs { _, ok := c.BeeConfigs[k] @@ -161,6 +171,7 @@ func Read(log logging.Logger, yamlFiles []YamlFile) (*Config, error) { log.Warningf("bee config '%s' in file '%s' already exits in configuration", k, file.Name) } } + // join Checks for k, v := range tmp.Checks { _, ok := c.Checks[k] @@ -170,6 +181,7 @@ func Read(log logging.Logger, yamlFiles []YamlFile) (*Config, error) { log.Warningf("check '%s' in file '%s' already exits in configuration", k, file.Name) } } + // join Simulations for k, v := range tmp.Simulations { _, ok := c.Simulations[k] diff --git a/pkg/funder/node/node.go b/pkg/funder/node/node.go new file mode 100644 index 000000000..c31d50fa9 --- /dev/null +++ b/pkg/funder/node/node.go @@ -0,0 +1,84 @@ +package node + +import ( + "context" + "fmt" + + "github.com/ethersphere/beekeeper/pkg/k8s" + "github.com/ethersphere/beekeeper/pkg/logging" + "github.com/ethersphere/node-funder/pkg/funder" +) + +type Client struct { + k8sClient *k8s.Client + inCluster bool + label string + log logging.Logger +} + +func NewClient(k8sClient *k8s.Client, inCluster bool, label string, l logging.Logger) *Client { + return &Client{ + k8sClient: k8sClient, + label: label, + inCluster: inCluster, + log: l, + } +} + +func (c *Client) List(ctx context.Context, namespace string) ([]funder.NodeInfo, error) { + if c.k8sClient == nil { + return nil, fmt.Errorf("k8s client not initialized") + } + + if namespace == "" { + return nil, fmt.Errorf("namespace not provided") + } + + c.log.Debugf("Listing nodes with parameters: namespace=%s, label=%s, inCluster=%v", namespace, c.label, c.inCluster) + + if c.inCluster { + return c.getServiceNodes(ctx, namespace) + } + + return c.getIngressNodes(ctx, namespace) +} + +func (c *Client) getServiceNodes(ctx context.Context, namespace string) ([]funder.NodeInfo, error) { + svcNodes, err := c.k8sClient.Service.GetNodes(ctx, namespace, c.label) + if err != nil { + return nil, fmt.Errorf("list api services: %w", err) + } + + nodes := make([]funder.NodeInfo, len(svcNodes)) + for i, node := range svcNodes { + nodes[i] = funder.NodeInfo{ + Name: node.Name, + Address: node.Endpoint, + } + } + + return nodes, nil +} + +func (c *Client) getIngressNodes(ctx context.Context, namespace string) ([]funder.NodeInfo, error) { + ingressNodes, err := c.k8sClient.Ingress.GetNodes(ctx, namespace, c.label) + if err != nil { + return nil, fmt.Errorf("list ingress api nodes hosts: %w", err) + } + + ingressRouteNodes, err := c.k8sClient.IngressRoute.GetNodes(ctx, namespace, c.label) + if err != nil { + return nil, fmt.Errorf("list ingress route api nodes hosts: %w", err) + } + + allNodes := append(ingressNodes, ingressRouteNodes...) + nodes := make([]funder.NodeInfo, len(allNodes)) + for i, node := range allNodes { + nodes[i] = funder.NodeInfo{ + Name: node.Name, + Address: fmt.Sprintf("http://%s", node.Host), + } + } + + return nodes, nil +} diff --git a/pkg/operator/operator.go b/pkg/funder/operator/operator.go similarity index 100% rename from pkg/operator/operator.go rename to pkg/funder/operator/operator.go diff --git a/pkg/k8s/customresource/ingressroute/client.go b/pkg/k8s/customresource/ingressroute/client.go index 87e22d513..77958f76f 100644 --- a/pkg/k8s/customresource/ingressroute/client.go +++ b/pkg/k8s/customresource/ingressroute/client.go @@ -81,8 +81,8 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (err error) return } -// GetIngressHosts list Ingress Routes hosts using label as selector, for the given namespace. If label is empty, all Ingresses are listed. -func (c *Client) GetIngressHosts(ctx context.Context, namespace, label string) (nodes []ingress.NodeInfo, err error) { +// GetNodes list Ingress Routes hosts using label as selector, for the given namespace. If label is empty, all Ingresses are listed. +func (c *Client) GetNodes(ctx context.Context, namespace, label string) (nodes []ingress.NodeInfo, err error) { ingressRoutes, err := c.clientset.IngressRoutes(namespace).List(ctx, metav1.ListOptions{ LabelSelector: label, }) diff --git a/pkg/k8s/ingress/client.go b/pkg/k8s/ingress/client.go index b7bf6d6a9..9fe28a9e9 100644 --- a/pkg/k8s/ingress/client.go +++ b/pkg/k8s/ingress/client.go @@ -78,8 +78,8 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (err error) return } -// GetIngressHosts list Ingresses hosts using label as selector, for the given namespace. If label is empty, all Ingresses are listed. -func (c *Client) GetIngressHosts(ctx context.Context, namespace, label string) (nodes []NodeInfo, err error) { +// GetNodes list Ingresses hosts using label as selector, for the given namespace. If label is empty, all Ingresses are listed. +func (c *Client) GetNodes(ctx context.Context, namespace, label string) (nodes []NodeInfo, err error) { c.logger.Debugf("listing Ingresses in namespace %s, label selector %s", namespace, label) ingreses, err := c.clientset.NetworkingV1().Ingresses(namespace).List(ctx, metav1.ListOptions{ LabelSelector: label, diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index 5a93d3f80..05a29d386 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -30,12 +30,12 @@ type ClientOption func(*Client) // Client manages communication with the Kubernetes. type Client struct { - logger logging.Logger // logger - clientConfig *ClientConfig // ClientConfig holds functions for configuration of the Client. - inCluster bool // inCluster - kubeconfigPath string // kubeconfigPath - rateLimiter flowcontrol.RateLimiter // rateLimiter - maxConcurrentRequests int // maxConcurentRequests (semaphore) + logger logging.Logger + clientConfig *ClientConfig + inCluster bool + kubeconfigPath string + rateLimiter flowcontrol.RateLimiter + maxConcurrentRequests int // exported services that K8S provides ConfigMap *configmap.Client diff --git a/pkg/k8s/service/client.go b/pkg/k8s/service/client.go index 41a94a319..2e5f96d69 100644 --- a/pkg/k8s/service/client.go +++ b/pkg/k8s/service/client.go @@ -75,3 +75,31 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (err error) return } + +type NodeInfo struct { + Name string + Endpoint string +} + +// GetNodes returns list of nodes in the namespace with labelSelector. Nodes are filtered by api port. +// Endpoint is constructed from ClusterIP and api port. +func (c *Client) GetNodes(ctx context.Context, namespace, labelSelector string) (nodes []NodeInfo, err error) { + svcs, err := c.clientset.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return nil, fmt.Errorf("listing services in namespace %s: %w", namespace, err) + } + + // filter out services with api port and return clusterIP as Endpoint + for _, svc := range svcs.Items { + for _, port := range svc.Spec.Ports { + if port.Name == "api" { + nodes = append(nodes, NodeInfo{ + Name: svc.Name, + Endpoint: fmt.Sprintf("http://%s:%v", svc.Spec.ClusterIP, port.Port), + }) + } + } + } + + return +} diff --git a/pkg/orchestration/cluster.go b/pkg/orchestration/cluster.go index 6d2bfd751..9bb7b8082 100644 --- a/pkg/orchestration/cluster.go +++ b/pkg/orchestration/cluster.go @@ -43,15 +43,16 @@ type Cluster interface { // ClusterOptions represents Bee cluster options type ClusterOptions struct { - Annotations map[string]string - APIDomain string - APIInsecureTLS bool - APIScheme string - K8SClient *k8s.Client - SwapClient swap.Client - Labels map[string]string - Namespace string - DisableNamespace bool + Annotations map[string]string + APIDomain string + APIDomainInternal string + APIInsecureTLS bool + APIScheme string + K8SClient *k8s.Client + SwapClient swap.Client + Labels map[string]string + Namespace string + DisableNamespace bool } // ClusterAddresses represents addresses of all nodes in the cluster @@ -106,11 +107,17 @@ func (c ClusterOverlays) Random(r *rand.Rand) (nodeGroup string, nodeName string } // ApiURL generates URL for node's API -func (c ClusterOptions) ApiURL(name string) (u *url.URL, err error) { +func (c ClusterOptions) ApiURL(name string, inCluster bool) (u *url.URL, err error) { + apiDomain := c.APIDomain + apiScheme := c.APIScheme + if inCluster { + apiDomain = c.APIDomainInternal + apiScheme = "http" + } if c.DisableNamespace { - u, err = url.Parse(fmt.Sprintf("%s://%s.%s", c.APIScheme, name, c.APIDomain)) + u, err = url.Parse(fmt.Sprintf("%s://%s.%s", apiScheme, name, apiDomain)) } else { - u, err = url.Parse(fmt.Sprintf("%s://%s.%s.%s", c.APIScheme, name, c.Namespace, c.APIDomain)) + u, err = url.Parse(fmt.Sprintf("%s://%s.%s.%s", apiScheme, name, c.Namespace, apiDomain)) } if err != nil { return nil, fmt.Errorf("bad API url for node %s: %w", name, err) diff --git a/pkg/orchestration/k8s/node_orchestrator.go b/pkg/orchestration/k8s/node_orchestrator.go index df1cb4fa9..0d96a4c3e 100644 --- a/pkg/orchestration/k8s/node_orchestrator.go +++ b/pkg/orchestration/k8s/node_orchestrator.go @@ -108,8 +108,7 @@ func (n *nodeOrchestrator) Create(ctx context.Context, o orchestration.CreateOpt return fmt.Errorf("parsing API port from config: %w", err) } - apiSvc := fmt.Sprintf("%s-api", o.Name) - if _, err := n.k8s.Service.Set(ctx, apiSvc, o.Namespace, service.Options{ + if _, err := n.k8s.Service.Set(ctx, o.Name, o.Namespace, service.Options{ Annotations: o.Annotations, Labels: o.Labels, ServiceSpec: service.Spec{ @@ -128,12 +127,11 @@ func (n *nodeOrchestrator) Create(ctx context.Context, o orchestration.CreateOpt }); err != nil { return fmt.Errorf("set service in namespace %s: %w", o.Namespace, err) } - n.log.Infof("service %s is set in namespace %s", apiSvc, o.Namespace) + n.log.Infof("service %s is set in namespace %s", o.Name, o.Namespace) if o.IngressClass == "traefik" { // api service's ingressroute - apiIn := fmt.Sprintf("%s-api", o.Name) - if _, err := n.k8s.IngressRoute.Set(ctx, apiIn, o.Namespace, ingressroute.Options{ + if _, err := n.k8s.IngressRoute.Set(ctx, o.Name, o.Namespace, ingressroute.Options{ Annotations: mergeMaps(o.Annotations, o.IngressAnnotations), Labels: o.Labels, Spec: ingressroute.IngressRouteSpec{ @@ -144,7 +142,7 @@ func (n *nodeOrchestrator) Create(ctx context.Context, o orchestration.CreateOpt Services: []ingressroute.Service{ { Kind: "Service", - Name: apiIn, + Name: o.Name, Namespace: "local", Port: "api", }, @@ -155,11 +153,10 @@ func (n *nodeOrchestrator) Create(ctx context.Context, o orchestration.CreateOpt }); err != nil { return fmt.Errorf("set ingressroute in namespace %s: %w", o.Namespace, err) } - n.log.Infof("ingressroute %s is set in namespace %s", apiIn, o.Namespace) + n.log.Infof("ingressroute %s is set in namespace %s", o.Name, o.Namespace) } else { // api service's ingress - apiIn := fmt.Sprintf("%s-api", o.Name) - if _, err := n.k8s.Ingress.Set(ctx, apiIn, o.Namespace, ingress.Options{ + if _, err := n.k8s.Ingress.Set(ctx, o.Name, o.Namespace, ingress.Options{ Annotations: mergeMaps(o.Annotations, o.IngressAnnotations), Labels: o.Labels, Spec: ingress.Spec{ @@ -168,7 +165,7 @@ func (n *nodeOrchestrator) Create(ctx context.Context, o orchestration.CreateOpt Host: o.IngressHost, Paths: ingress.Paths{{ Backend: ingress.Backend{ - ServiceName: apiSvc, + ServiceName: o.Name, ServicePortName: "api", }, Path: "/", @@ -179,7 +176,7 @@ func (n *nodeOrchestrator) Create(ctx context.Context, o orchestration.CreateOpt }); err != nil { return fmt.Errorf("set ingress in namespace %s: %w", o.Namespace, err) } - n.log.Infof("ingress %s is set in namespace %s", apiIn, o.Namespace) + n.log.Infof("ingress %s is set in namespace %s", o.Name, o.Namespace) } // p2p service @@ -337,24 +334,22 @@ func (n *nodeOrchestrator) Delete(ctx context.Context, name string, namespace st n.log.Infof("service %s is deleted in namespace %s", p2pSvc, namespace) // api service's ingress - apiIn := fmt.Sprintf("%s-api", name) - if err := n.k8s.Ingress.Delete(ctx, apiIn, namespace); err != nil { + if err := n.k8s.Ingress.Delete(ctx, name, namespace); err != nil { return fmt.Errorf("deleting ingress in namespace %s: %w", namespace, err) } - n.log.Infof("ingress %s is deleted in namespace %s", apiIn, namespace) + n.log.Infof("ingress %s is deleted in namespace %s", name, namespace) // api service's ingress route - if err := n.k8s.IngressRoute.Delete(ctx, apiIn, namespace); err != nil { + if err := n.k8s.IngressRoute.Delete(ctx, name, namespace); err != nil { return fmt.Errorf("deleting ingress route in namespace %s: %w", namespace, err) } - n.log.Infof("ingress route %s is deleted in namespace %s", apiIn, namespace) + n.log.Infof("ingress route %s is deleted in namespace %s", name, namespace) // api service - apiSvc := fmt.Sprintf("%s-api", name) - if err := n.k8s.Service.Delete(ctx, apiSvc, namespace); err != nil { + if err := n.k8s.Service.Delete(ctx, name, namespace); err != nil { return fmt.Errorf("deleting service in namespace %s: %w", namespace, err) } - n.log.Infof("service %s is deleted in namespace %s", apiSvc, namespace) + n.log.Infof("service %s is deleted in namespace %s", name, namespace) // service account svcAccount := name diff --git a/pkg/orchestration/k8s/nodegroup.go b/pkg/orchestration/k8s/nodegroup.go index 2efdf9bcb..0d4d84f96 100644 --- a/pkg/orchestration/k8s/nodegroup.go +++ b/pkg/orchestration/k8s/nodegroup.go @@ -50,10 +50,10 @@ func NewNodeGroup(name string, copts orchestration.ClusterOptions, no orchestrat } // AddNode adss new node to the node group -func (g *NodeGroup) AddNode(ctx context.Context, name string, o orchestration.NodeOptions, opts ...orchestration.BeeClientOption) (err error) { +func (g *NodeGroup) AddNode(ctx context.Context, name string, inCluster bool, o orchestration.NodeOptions, opts ...orchestration.BeeClientOption) (err error) { var aURL *url.URL - aURL, err = g.clusterOpts.ApiURL(name) + aURL, err = g.clusterOpts.ApiURL(name, inCluster) if err != nil { return fmt.Errorf("API URL %s: %w", name, err) } @@ -718,10 +718,10 @@ func (g *NodeGroup) RunningNodes(ctx context.Context) (running []string, err err } // SetupNode creates new node in the node group, starts it in the k8s cluster and funds it -func (g *NodeGroup) SetupNode(ctx context.Context, name string, o orchestration.NodeOptions) (ethAddress string, err error) { +func (g *NodeGroup) SetupNode(ctx context.Context, name string, inCluster bool, o orchestration.NodeOptions) (ethAddress string, err error) { g.log.Infof("starting setup node: %s", name) - if err := g.AddNode(ctx, name, o); err != nil { + if err := g.AddNode(ctx, name, inCluster, o); err != nil { return "", fmt.Errorf("add node %s: %w", name, err) } diff --git a/pkg/orchestration/nodegroup.go b/pkg/orchestration/nodegroup.go index 788a026ef..c4898bf33 100644 --- a/pkg/orchestration/nodegroup.go +++ b/pkg/orchestration/nodegroup.go @@ -11,7 +11,7 @@ import ( type NodeGroup interface { Accounting(ctx context.Context) (infos NodeGroupAccounting, err error) - AddNode(ctx context.Context, name string, o NodeOptions, opts ...BeeClientOption) (err error) + AddNode(ctx context.Context, name string, inCluster bool, o NodeOptions, opts ...BeeClientOption) (err error) Addresses(ctx context.Context) (addrs NodeGroupAddresses, err error) Balances(ctx context.Context) (balances NodeGroupBalances, err error) CreateNode(ctx context.Context, name string) (err error) @@ -30,7 +30,7 @@ type NodeGroup interface { Peers(ctx context.Context) (peers NodeGroupPeers, err error) RunningNodes(ctx context.Context) (running []string, err error) Settlements(ctx context.Context) (settlements NodeGroupSettlements, err error) - SetupNode(ctx context.Context, name string, o NodeOptions) (ethAddress string, err error) + SetupNode(ctx context.Context, name string, inCluster bool, o NodeOptions) (ethAddress string, err error) Size() int StartNode(ctx context.Context, name string) (err error) StopNode(ctx context.Context, name string) (err error) diff --git a/version.go b/version.go index 5f97ffce7..a881adca7 100644 --- a/version.go +++ b/version.go @@ -1,7 +1,7 @@ package beekeeper var ( - version = "0.21.1" // manually set semantic version number + version = "0.22.0" // manually set semantic version number commit string // automatically set git commit hash // Version TODO