feat: restart cluster #432

Merged 5 commits on Nov 14, 2024
27 changes: 27 additions & 0 deletions README.md
@@ -402,6 +402,33 @@ example:
beekeeper node-operator --geth-url="http://geth-swap.default.testnet.internal" --wallet-key="4663c222787e30c1994b59044aa5045377a6e79193a8ead88293926b535c722d" --namespace=default --min-swarm=180 --min-native=2.2 --log-verbosity=3
```

### restart

Command **restart** restarts bee nodes in Kubernetes, with optional targeting by namespace, label selector, and node groups.

It has the following flags:

```console
--cluster-name string Kubernetes cluster to operate on (overrides namespace and label selector).
--namespace string Namespace to delete pods from (only used if cluster name is not set).
--label-selector string Label selector for resources in the namespace (only used with namespace). An empty string disables filtering, allowing all resources to be selected.
--image string Container image to use when restarting pods (defaults to current image if not set).
--node-groups strings Comma-separated list of node groups to target for restarts (applies to all groups if not set).
--timeout duration Operation timeout (e.g., 5s, 10m, 1.5h). (default 5m0s)
```

example:

```bash
beekeeper restart --cluster-name=default --image="bee:latest" --node-groups="group1,group2" --timeout=10m
```

or

```bash
beekeeper restart --namespace=default --label-selector="app=bee" --timeout=10m
```
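
A minimal invocation, assuming a cluster named `default` is defined in the Beekeeper config, restarts every node group with its current image:

```bash
beekeeper restart --cluster-name=default
```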

## Global flags

Global flags can be used with any command.
16 changes: 9 additions & 7 deletions cmd/beekeeper/cmd/cluster.go
@@ -144,8 +144,7 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con

// fund bootnode node group if cluster is started
if startCluster {
- err = fund(ctx, fundAddresses, chainNodeEndpoint, walletKey, fundOpts, c.log)
- if err != nil {
+ if err = fund(ctx, fundAddresses, chainNodeEndpoint, walletKey, fundOpts, c.log); err != nil {
return nil, fmt.Errorf("funding node group bootnode: %w", err)
}
c.log.Infof("bootnode node group funded")
@@ -159,8 +158,7 @@ func (c *command) setupCluster(ctx context.Context, clusterName string, cfg *con

// fund other node groups if cluster is started
if startCluster {
- err = fund(ctx, fundAddresses, chainNodeEndpoint, walletKey, fundOpts, c.log)
- if err != nil {
+ if err = fund(ctx, fundAddresses, chainNodeEndpoint, walletKey, fundOpts, c.log); err != nil {
return nil, fmt.Errorf("fund other node groups: %w", err)
}
c.log.Infof("node groups funded")
@@ -283,10 +281,14 @@ func setupNodes(ctx context.Context, clusterConfig config.Cluster, cfg *config.C
func setupOrAddNode(ctx context.Context, startCluster bool, ng orchestration.NodeGroup, nName string, nodeOpts orchestration.NodeOptions, ch chan<- nodeResult, beeOpt orchestration.BeeClientOption) {
if startCluster {
ethAddress, err := ng.SetupNode(ctx, nName, nodeOpts)
- ch <- nodeResult{ethAddress: ethAddress, err: err}
+ ch <- nodeResult{
+ ethAddress: ethAddress,
+ err: err,
+ }
} else {
- err := ng.AddNode(ctx, nName, nodeOpts, beeOpt)
- ch <- nodeResult{err: err}
+ ch <- nodeResult{
+ err: ng.AddNode(ctx, nName, nodeOpts, beeOpt),
+ }
}
}
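
For orientation, the channel writes above pair with a collector on the receiving side. A minimal sketch of such a consumer, assuming the standard `errors` package is imported; the function name and the `errors.Join` aggregation are assumptions, not code from this PR:

```go
// collectNodeResults is a hypothetical helper in the same package, shown for illustration.
// It drains one nodeResult per node that setupOrAddNode was started for.
func collectNodeResults(ch <-chan nodeResult, expected int) ([]string, error) {
	var (
		ethAddresses []string
		errs         []error
	)
	for i := 0; i < expected; i++ {
		res := <-ch
		if res.err != nil {
			errs = append(errs, res.err)
			continue
		}
		if res.ethAddress != "" {
			ethAddresses = append(ethAddresses, res.ethAddress)
		}
	}
	// errors.Join (Go 1.20+) collapses the collected errors into a single error value.
	return ethAddresses, errors.Join(errs...)
}
```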

1 change: 0 additions & 1 deletion cmd/beekeeper/cmd/cmd.go
@@ -209,7 +209,6 @@ func (c *command) initConfig() (err error) {
if err != nil {
return fmt.Errorf("new logger: %w", err)
}
c.log.Infof("verbosity log level: %v", c.log.GetLevel())

if c.globalConfig.GetString(optionNameConfigGitRepo) != "" {
// read configuration from git repo
70 changes: 30 additions & 40 deletions cmd/beekeeper/cmd/restart.go
@@ -6,7 +6,7 @@ import (
"fmt"
"time"

"github.com/ethersphere/beekeeper/pkg/config"
"github.com/ethersphere/beekeeper/pkg/restart"
"github.com/spf13/cobra"
)

@@ -15,6 +15,8 @@ func (c *command) initRestartCmd() (err error) {
optionNameClusterName = "cluster-name"
optionNameLabelSelector = "label-selector"
optionNameNamespace = "namespace"
optionNameImage = "image"
optionNameNodeGroups = "node-groups"
optionNameTimeout = "timeout"
)

@@ -33,14 +35,34 @@ func (c *command) initRestartCmd() (err error) {
return errors.New("either cluster name or namespace must be provided")
}

restartClient := restart.NewClient(c.k8sClient, c.log)

if clusterName != "" {
- if err := c.restartCluster(ctx, clusterName, c.config); err != nil {
+ clusterConfig, ok := c.config.Clusters[clusterName]
+ if !ok {
+ return fmt.Errorf("cluster config %s not defined", clusterName)
+ }
+
+ cluster, err := c.setupCluster(ctx, clusterName, c.config, false)
+ if err != nil {
+ return fmt.Errorf("setting up cluster %s: %w", clusterName, err)
+ }
+
+ c.log.Infof("restarting cluster %s", clusterName)
+
+ if err := restartClient.RestartCluster(ctx,
+ cluster,
+ clusterConfig.GetNamespace(),
+ c.globalConfig.GetString(optionNameImage),
+ c.globalConfig.GetStringSlice(optionNameNodeGroups),
+ ); err != nil {
return fmt.Errorf("restarting cluster %s: %w", clusterName, err)
}

return nil
}

- if err := c.k8sClient.Pods.DeletePods(ctx, namespace, c.globalConfig.GetString(optionNameLabelSelector)); err != nil {
+ if err := restartClient.RestartPods(ctx, namespace, c.globalConfig.GetString(optionNameLabelSelector)); err != nil {
return fmt.Errorf("restarting pods in namespace %s: %w", namespace, err)
}

@@ -49,46 +71,14 @@ func (c *command) initRestartCmd() (err error) {
PreRunE: c.preRunE,
}

- cmd.Flags().String(optionNameClusterName, "", "Kubernetes cluster to operate on (overrides namespace).")
- cmd.Flags().StringP(optionNameNamespace, "n", "", "Namespace to delete pods from (used if cluster name is not set).")
- cmd.Flags().String(optionNameLabelSelector, "", "Label selector for resources in the namespace. Ignored if cluster name is set.")
+ cmd.Flags().String(optionNameClusterName, "", "Kubernetes cluster to operate on (overrides namespace and label selector).")
+ cmd.Flags().StringP(optionNameNamespace, "n", "", "Namespace to delete pods from (only used if cluster name is not set).")
+ cmd.Flags().String(optionNameLabelSelector, "", "Label selector for resources in the namespace (only used with namespace).")
+ cmd.Flags().String(optionNameImage, "", "Container image to use when restarting pods (defaults to current image if not set).")
+ cmd.Flags().StringSlice(optionNameNodeGroups, nil, "List of node groups to target for restarts (applies to all groups if not set).")
cmd.Flags().Duration(optionNameTimeout, 5*time.Minute, "Operation timeout (e.g., 5s, 10m, 1.5h).")

c.root.AddCommand(cmd)

return nil
}

- func (c *command) restartCluster(ctx context.Context, clusterName string, cfg *config.Config) (err error) {
- c.log.Infof("restarting cluster %s", clusterName)
-
- clusterConfig, ok := cfg.Clusters[clusterName]
- if !ok {
- return fmt.Errorf("cluster config %s not defined", clusterName)
- }
-
- cluster, err := c.setupCluster(ctx, clusterName, c.config, false)
- if err != nil {
- return fmt.Errorf("setting up cluster %s: %w", clusterName, err)
- }
-
- nodes := cluster.NodeNames()
-
- count := 0
-
- for _, node := range nodes {
- podName := fmt.Sprintf("%s-0", node) // Suffix "-0" added as StatefulSet names pods based on replica count.
- ok, err := c.k8sClient.Pods.Delete(ctx, podName, clusterConfig.GetNamespace())
- if err != nil {
- return fmt.Errorf("deleting pod %s in namespace %s: %w", node, clusterConfig.GetNamespace(), err)
- }
- if ok {
- count++
- c.log.Debugf("pod %s in namespace %s deleted", podName, clusterConfig.GetNamespace())
- }
- }
-
- c.log.Infof("cluster %s restarted %d/%d nodes", clusterName, count, len(nodes))
-
- return nil
- }
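
The new `pkg/restart` package itself is not part of the diff shown here. Based only on the calls above (`restart.NewClient(c.k8sClient, c.log)`, `RestartCluster`, `RestartPods`), its surface could be sketched roughly as follows; the field names, parameter types, and internals are assumptions, not the merged implementation:

```go
// Illustrative sketch of the restart client's surface; not the code merged in this PR.
package restart

import (
	"context"

	"github.com/ethersphere/beekeeper/pkg/k8s"
	"github.com/ethersphere/beekeeper/pkg/logging"
	"github.com/ethersphere/beekeeper/pkg/orchestration"
)

// Client restarts Bee nodes either cluster-wide (per node group) or per namespace.
type Client struct {
	k8s *k8s.Client
	log logging.Logger
}

// NewClient matches the constructor call used in cmd/beekeeper/cmd/restart.go.
func NewClient(k8sClient *k8s.Client, log logging.Logger) *Client {
	return &Client{k8s: k8sClient, log: log}
}

// RestartCluster restarts the listed node groups (all groups when nodeGroups is empty),
// optionally switching the container image first.
func (c *Client) RestartCluster(ctx context.Context, cluster orchestration.Cluster, namespace, image string, nodeGroups []string) error {
	// ... update StatefulSet images and delete pods per node group ...
	return nil
}

// RestartPods deletes all pods in the namespace matching the label selector.
func (c *Client) RestartPods(ctx context.Context, namespace, labelSelector string) error {
	_, err := c.k8s.Pods.DeletePods(ctx, namespace, labelSelector)
	return err
}
```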
25 changes: 16 additions & 9 deletions pkg/k8s/pod/client.go
@@ -62,8 +62,7 @@ func (c *Client) Set(ctx context.Context, name, namespace string, o Options) (po

// Delete deletes Pod
func (c *Client) Delete(ctx context.Context, name, namespace string) (ok bool, err error) {
- err = c.clientset.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{})
- if err != nil {
+ if err = c.clientset.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
if errors.IsNotFound(err) {
c.log.Warningf("pod %s in namespace %s not found", name, namespace)
return false, nil
@@ -74,31 +73,39 @@ func (c *Client) Delete(ctx context.Context, name, namespace string) (ok bool, e
return true, nil
}

- func (c *Client) DeletePods(ctx context.Context, namespace, labelSelector string) error {
- c.log.Infof("restarting pods in namespace %s, label selector %s", namespace, labelSelector)
+ func (c *Client) DeletePods(ctx context.Context, namespace, labelSelector string) (int, error) {
pods, err := c.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return fmt.Errorf("listing pods in namespace %s: %w", namespace, err)
return 0, fmt.Errorf("listing pods in namespace %s: %w", namespace, err)
}

deletedCount := 0
var deletionErrors []error

for _, pod := range pods.Items {
if pod.ObjectMeta.DeletionTimestamp == nil {
if err := c.clientset.CoreV1().Pods(namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("deleting pod %s in namespace %s: %w", pod.Name, namespace, err)
c.log.Errorf("failed to delete pod %s in namespace %s: %v", pod.Name, namespace, err)
deletionErrors = append(deletionErrors, err)
continue
}
deletedCount++
}
}

c.log.Infof("found and deleted %d pods in namespace %s", len(pods.Items), namespace)
c.log.Debugf("attempted to delete %d pods; successfully deleted %d pods in namespace %s", len(pods.Items), deletedCount, namespace)

- return nil
+ if len(deletionErrors) > 0 {
+ return deletedCount, fmt.Errorf("some pods failed to delete: %v", deletionErrors)
+ }
+ return deletedCount, nil
}
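
With the new `(int, error)` return, callers can report partial progress when some deletions fail. A brief, hypothetical caller (not from this PR, assuming the usual `context`, `fmt`, `pod`, and `logging` imports) illustrating the intended use:

```go
// restartNamespace is an illustrative caller of the reworked DeletePods.
func restartNamespace(ctx context.Context, pods *pod.Client, log logging.Logger, namespace, selector string) error {
	deleted, err := pods.DeletePods(ctx, namespace, selector)
	if err != nil {
		// deleted still reports how many pods were removed before/around the failures.
		return fmt.Errorf("restarted only %d pods in namespace %s: %w", deleted, namespace, err)
	}
	log.Infof("restarted %d pods in namespace %s", deleted, namespace)
	return nil
}
```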

// WatchNewRunning detects new running Pods in the namespace and sends their IPs to the channel.
func (c *Client) WatchNewRunning(ctx context.Context, namespace, labelSelector string, newPodIps chan string) (err error) {
c.log.Infof("starting events watch in namespace %s, label selector %s", namespace, labelSelector)
c.log.Debugf("starting events watch in namespace %s, label selector %s", namespace, labelSelector)
defer c.log.Infof("events watch done")
defer close(newPodIps)

33 changes: 33 additions & 0 deletions pkg/k8s/statefulset/client.go
@@ -179,3 +179,36 @@ func (c *Client) StoppedStatefulSets(ctx context.Context, namespace string) (sto

return
}

// UpdateImage updates StatefulSet image
func (c *Client) UpdateImage(ctx context.Context, name, namespace, image string) (err error) {
statefulSet, err := c.clientset.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("getting statefulset %s in namespace %s: %w", name, namespace, err)
}

statefulSet.Spec.Template.Spec.Containers[0].Image = image

_, err = c.clientset.AppsV1().StatefulSets(namespace).Update(ctx, statefulSet, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("updating statefulset %s in namespace %s: %w", name, namespace, err)
}

return nil
}

// GetUpdateStrategy returns the update strategy of the StatefulSet
func (c *Client) GetUpdateStrategy(ctx context.Context, name, namespace string) (strategy UpdateStrategy, err error) {
statefulSet, err := c.clientset.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return UpdateStrategy{}, nil
}
return UpdateStrategy{}, fmt.Errorf("getting statefulset %s in namespace %s: %w", name, namespace, err)
}

return newUpdateStrategy(statefulSet.Spec.UpdateStrategy), nil
}
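
Together, `GetUpdateStrategy` and `UpdateImage` support the restart flow: with a `RollingUpdate` strategy the controller replaces pods by itself once the image changes, while `OnDelete` requires deleting pods explicitly. A hedged sketch of that decision, combining the new helpers; the function and the label selector are illustrative assumptions, not the PR's actual `pkg/restart` logic:

```go
// restartStatefulSet is an illustrative composition of the new helpers; not code from this PR.
func restartStatefulSet(ctx context.Context, sets *statefulset.Client, pods *pod.Client, name, namespace, image string) error {
	strategy, err := sets.GetUpdateStrategy(ctx, name, namespace)
	if err != nil {
		return fmt.Errorf("get update strategy: %w", err)
	}

	if image != "" {
		if err := sets.UpdateImage(ctx, name, namespace, image); err != nil {
			return fmt.Errorf("update image: %w", err)
		}
	}

	// RollingUpdate rolls pods automatically after the spec change; OnDelete (and a plain
	// restart without an image change) needs the pods deleted by hand.
	if image == "" || strategy.Type == statefulset.UpdateStrategyOnDelete {
		// The label selector below is an assumed convention, not taken from the repository.
		if _, err := pods.DeletePods(ctx, namespace, fmt.Sprintf("app.kubernetes.io/instance=%s", name)); err != nil {
			return fmt.Errorf("delete pods: %w", err)
		}
	}
	return nil
}
```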
20 changes: 19 additions & 1 deletion pkg/k8s/statefulset/statefulset.go
@@ -7,6 +7,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
UpdateStrategyOnDelete = "OnDelete"
UpdateStrategyRolling = "RollingUpdate"
)

// StatefulSetSpec represents Kubernetes StatefulSetSpec
type StatefulSetSpec struct {
PodManagementPolicy string
@@ -39,9 +44,22 @@ type UpdateStrategy struct {
RollingUpdatePartition int32
}

func newUpdateStrategy(us appsv1.StatefulSetUpdateStrategy) UpdateStrategy {
if us.Type == appsv1.OnDeleteStatefulSetStrategyType {
return UpdateStrategy{
Type: UpdateStrategyOnDelete,
}
}

return UpdateStrategy{
Type: UpdateStrategyRolling,
RollingUpdatePartition: *us.RollingUpdate.Partition,
}
}

// toK8S converts UpdateStrategy to Kuberntes client object
func (u *UpdateStrategy) toK8S() appsv1.StatefulSetUpdateStrategy {
if u.Type == "OnDelete" {
if u.Type == UpdateStrategyOnDelete {
return appsv1.StatefulSetUpdateStrategy{
Type: appsv1.OnDeleteStatefulSetStrategyType,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/orchestration/k8s/nodegroup.go
@@ -741,7 +741,7 @@ func (g *NodeGroup) SetupNode(ctx context.Context, name string, o orchestration.
return "", fmt.Errorf("get eth address for funding: %w", err)
}

- return
+ return ethAddress, nil
}

// Settlements returns NodeGroupSettlements