Skip to content

Commit

Permalink
refactor(setup-cluster): reduce duplicated code
Browse files Browse the repository at this point in the history
  • Loading branch information
gacevicljubisa committed Oct 18, 2023
1 parent 8e93a1b commit 45c7943
Showing 1 changed file with 119 additions and 182 deletions.
301 changes: 119 additions & 182 deletions cmd/beekeeper/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"github.com/ethersphere/node-funder/pkg/funder"
)

const bootnodeMode = "bootnode"

type nodeResult struct {
ethAddress string
err error
}

func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *config.Config, deleteStorage bool) (err error) {
clusterConfig, ok := cfg.Clusters[clusterName]
if !ok {
Expand Down Expand Up @@ -116,8 +123,6 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con
return nil, fmt.Errorf("cluster %s not defined", clusterName)
}

fundOpts := clusterConfig.Funding.Export()

var chainNodeEndpoint string
if chainNodeEndpoint = c.globalConfig.GetString(optionNameChainNodeEndpoint); chainNodeEndpoint == "" {
return nil, errors.New("chain node endpoint (geth-url) not provided")
Expand All @@ -128,231 +133,163 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con
return nil, errors.New("wallet key not provided")
}

clusterOptions := clusterConfig.Export()
clusterOptions.K8SClient = c.k8sClient
clusterOptions.SwapClient = c.swapClient
fundOpts := clusterConfig.Funding.Export()
clusterOpts := clusterConfig.Export()
clusterOpts.K8SClient = c.k8sClient
clusterOpts.SwapClient = c.swapClient

cluster = orchestrationK8S.NewCluster(clusterConfig.GetName(), clusterOptions, c.log)
bootnodes := ""
cluster = orchestrationK8S.NewCluster(clusterConfig.GetName(), clusterOpts, c.log)

type nodeResult struct {
ethAddress string
err error
}
var nodeCount uint32
nodeResultCh := make(chan nodeResult)
defer close(nodeResultCh)
nodeResultChan := make(chan nodeResult)
defer close(nodeResultChan)

for ng, v := range clusterConfig.GetNodeGroups() {
ngConfig, ok := cfg.NodeGroups[v.Config]
if !ok {
return nil, fmt.Errorf("node group profile %s not defined", v.Config)
}
fundAddresses, bootnodes, err := setupNodes(ctx, clusterConfig, cfg, true, cluster, startCluster, "", nodeResultChan)
if err != nil {
return nil, fmt.Errorf("setup node group bootnode: %w", err)
}

if v.Mode == "bootnode" { // TODO: implement standalone mode
beeConfig, ok := cfg.BeeConfigs[v.BeeConfig]
if !ok {
return nil, fmt.Errorf("bee profile %s not defined", v.BeeConfig)
}
err = fund(ctx, startCluster, err, fundAddresses, chainNodeEndpoint, walletKey, fundOpts)
if err != nil {
return nil, fmt.Errorf("funding node group bootnode: %w", err)
}

// add node group to the cluster
cluster.AddNodeGroup(ng, ngConfig.Export())
fundAddresses, _, err = setupNodes(ctx, clusterConfig, cfg, false, cluster, startCluster, bootnodes, nodeResultChan)
if err != nil {
return nil, fmt.Errorf("setup other node groups: %w", err)
}

// start nodes in the node group
g, err := cluster.NodeGroup(ng)
if err != nil {
return nil, err
}
err = fund(ctx, startCluster, err, fundAddresses, chainNodeEndpoint, walletKey, fundOpts)
if err != nil {
return nil, fmt.Errorf("fund other node groups: %w", err)
}

for i, node := range v.Nodes {
// set node name
nName := fmt.Sprintf("%s-%d", ng, i)
if len(node.Name) > 0 {
nName = node.Name
}
return
}

// set bootnodes
bConfig := beeConfig.Export()
bConfig.Bootnodes = fmt.Sprintf(node.Bootnodes, clusterConfig.GetNamespace()) // TODO: improve bootnode management, support more than 2 bootnodes
bootnodes += bConfig.Bootnodes + " "
func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.Config, bootnode bool, cluster orchestration.Cluster, startCluster bool, bootnodesIn string, nodeResultCh chan nodeResult) (fundAddresses []string, bootnodesOut string, err error) {
var nodeCount uint32
fundOpts := clusterConfig.Funding.Export()
for ngName, v := range clusterConfig.GetNodeGroups() {

// set NodeOptions
nOptions := orchestration.NodeOptions{
Config: &bConfig,
}
if len(node.Clef.Key) > 0 {
nOptions.ClefKey = node.Clef.Key
}
if len(node.Clef.Password) > 0 {
nOptions.ClefPassword = node.Clef.Password
}
if len(node.LibP2PKey) > 0 {
nOptions.LibP2PKey = node.LibP2PKey
}
if len(node.SwarmKey) > 0 {
nOptions.SwarmKey = orchestration.EncryptedKey(node.SwarmKey)
}
nodeCount++
go func() {
if startCluster {
ethAddress, err := g.SetupNode(ctx, nName, nOptions, fundOpts)
nodeResultCh <- nodeResult{
ethAddress: ethAddress,
err: err,
}
} else {
err := g.AddNode(ctx, nName, nOptions)
nodeResultCh <- nodeResult{
err: err,
}
}
}()
}
if (v.Mode != bootnodeMode && bootnode) || (v.Mode == bootnodeMode && !bootnode) {
continue
}
}

var fundAddresses []string

for i := uint32(0); i < nodeCount; i++ {
nodeResult := <-nodeResultCh
if nodeResult.err != nil {
return nil, fmt.Errorf("starting node group bootnode: %w", nodeResult.err)
ngConfig, ok := cfg.NodeGroups[v.Config]
if !ok {
return nil, "", fmt.Errorf("node group profile %s not defined", v.Config)
}
if nodeResult.ethAddress != "" {
fundAddresses = append(fundAddresses, nodeResult.ethAddress)
ngOptions := ngConfig.Export()

beeConfig, ok := cfg.BeeConfigs[v.BeeConfig]
if !ok {
return nil, "", fmt.Errorf("bee profile %s not defined", v.BeeConfig)
}
}
bConfig := beeConfig.Export()

if startCluster {
err = funder.Fund(ctx, funder.Config{
Addresses: fundAddresses,
ChainNodeEndpoint: chainNodeEndpoint,
WalletKey: walletKey,
MinAmounts: funder.MinAmounts{
NativeCoin: fundOpts.Eth,
SwarmToken: fundOpts.Bzz,
},
}, nil, nil)
if err != nil {
return nil, fmt.Errorf("funding node group bootnode: %w", err)
if !bootnode {
bConfig.Bootnodes = bootnodesIn
ngOptions.BeeConfig = &bConfig
}
}

nodeCount = 0
cluster.AddNodeGroup(ngName, ngOptions)

for ng, v := range clusterConfig.GetNodeGroups() {
ngConfig, ok := cfg.NodeGroups[v.Config]
if !ok {
return nil, fmt.Errorf("node group profile %s not defined", v.Config)
// start nodes in the node group
ng, err := cluster.NodeGroup(ngName)
if err != nil {
return nil, "", err
}

if v.Mode != "bootnode" { // TODO: support standalone nodes
// set bootnodes
beeConfig, ok := cfg.BeeConfigs[v.BeeConfig]
if !ok {
return nil, fmt.Errorf("bee profile %s not defined", v.BeeConfig)
for i, node := range v.Nodes {
// set node name
nodeName := fmt.Sprintf("%s-%d", ngName, i)
if len(node.Name) > 0 {
nodeName = node.Name
}

bConfig := beeConfig.Export()
bConfig.Bootnodes = bootnodes
// add node group to the cluster
ngOptions := ngConfig.Export()
ngOptions.BeeConfig = &bConfig
cluster.AddNodeGroup(ng, ngOptions)
var nodeOpts orchestration.NodeOptions

// start nodes in the node group
g, err := cluster.NodeGroup(ng)
if err != nil {
return nil, err
if bootnode {
// set bootnodes
bConfig.Bootnodes = fmt.Sprintf(node.Bootnodes, clusterConfig.GetNamespace()) // TODO: improve bootnode management, support more than 2 bootnodes
bootnodesOut += bootnodesIn + bConfig.Bootnodes + " "
nodeOpts = setupNodeOptions(node, &bConfig)
} else {
nodeOpts = setupNodeOptions(node, nil)
}

for i, node := range v.Nodes {
nodeCount++
go setupOrAddNode(ctx, startCluster, ng, nodeName, nodeOpts, fundOpts, nodeResultCh)
}

if len(v.Nodes) == 0 && !bootnode {
for i := 0; i < v.Count; i++ {
// set node name
nName := fmt.Sprintf("%s-%d", ng, i)
if len(node.Name) > 0 {
nName = node.Name
}
// set NodeOptions
nOptions := orchestration.NodeOptions{}
if len(node.Clef.Key) > 0 {
nOptions.ClefKey = node.Clef.Key
}
if len(node.Clef.Password) > 0 {
nOptions.ClefPassword = node.Clef.Password
}
if len(node.LibP2PKey) > 0 {
nOptions.LibP2PKey = node.LibP2PKey
}
if len(node.SwarmKey) > 0 {
nOptions.SwarmKey = orchestration.EncryptedKey(node.SwarmKey)
}
nodeName := fmt.Sprintf("%s-%d", ngName, i)
nodeCount++
go func() {
if startCluster {
ethAddress, err := g.SetupNode(ctx, nName, nOptions, fundOpts)
nodeResultCh <- nodeResult{
ethAddress: ethAddress,
err: err,
}
} else {
err := g.AddNode(ctx, nName, nOptions)
nodeResultCh <- nodeResult{
err: err,
}
}
}()
}

if len(v.Nodes) == 0 {
for i := 0; i < v.Count; i++ {
// set node name
nName := fmt.Sprintf("%s-%d", ng, i)
nodeCount++
go func() {
if startCluster {
ethAddress, err := g.SetupNode(ctx, nName, orchestration.NodeOptions{}, fundOpts)
nodeResultCh <- nodeResult{
ethAddress: ethAddress,
err: err,
}
} else {
err := g.AddNode(ctx, nName, orchestration.NodeOptions{})
nodeResultCh <- nodeResult{
err: err,
}
}
}()
}
go setupOrAddNode(ctx, startCluster, ng, nodeName, orchestration.NodeOptions{}, fundOpts, nodeResultCh)
}
}
}

var fundAddresses2 []string

for i := uint32(0); i < nodeCount; i++ {
nodeResult := <-nodeResultCh
if nodeResult.err != nil {
return nil, fmt.Errorf("starting nodes: %w", nodeResult.err)
return nil, "", fmt.Errorf("setup or add node result: %w", nodeResult.err)
}
if nodeResult.ethAddress != "" {
fundAddresses2 = append(fundAddresses2, nodeResult.ethAddress)
fundAddresses = append(fundAddresses, nodeResult.ethAddress)
}
}

return fundAddresses, bootnodesOut, nil
}

func setupOrAddNode(ctx context.Context, startCluster bool, ng orchestration.NodeGroup, nName string, nodeOpts orchestration.NodeOptions, fundOpts orchestration.FundingOptions, ch chan<- nodeResult) {
if startCluster {
err = funder.Fund(ctx, funder.Config{
Addresses: fundAddresses2,
ethAddress, err := ng.SetupNode(ctx, nName, nodeOpts, fundOpts)
ch <- nodeResult{
ethAddress: ethAddress,
err: err,
}
} else {
err := ng.AddNode(ctx, nName, nodeOpts)
ch <- nodeResult{
err: err,
}
}
}

func setupNodeOptions(node config.ClusterNode, bConfig *orchestration.Config) orchestration.NodeOptions {
nOptions := orchestration.NodeOptions{
Config: bConfig,
}
if len(node.Clef.Key) > 0 {
nOptions.ClefKey = node.Clef.Key
}
if len(node.Clef.Password) > 0 {
nOptions.ClefPassword = node.Clef.Password
}
if len(node.LibP2PKey) > 0 {
nOptions.LibP2PKey = node.LibP2PKey
}
if len(node.SwarmKey) > 0 {
nOptions.SwarmKey = orchestration.EncryptedKey(node.SwarmKey)
}
return nOptions
}

func fund(ctx context.Context, startCluster bool, err error, fundAddresses []string, chainNodeEndpoint string, walletKey string, fundOpts orchestration.FundingOptions) error {
if startCluster {
return funder.Fund(ctx, funder.Config{
Addresses: fundAddresses,
ChainNodeEndpoint: chainNodeEndpoint,
WalletKey: walletKey,
MinAmounts: funder.MinAmounts{
NativeCoin: fundOpts.Eth,
SwarmToken: fundOpts.Bzz,
},
}, nil, nil)
if err != nil {
return nil, fmt.Errorf("funding nodes: %w", err)
}
}

return
return nil
}

0 comments on commit 45c7943

Please sign in to comment.