From 4b1812b71138e80f1a129cb6395ea63471141e6f Mon Sep 17 00:00:00 2001 From: Rokas Bilevicius Date: Wed, 8 Jun 2022 18:39:14 +0300 Subject: [PATCH] feat: improve api request count for node drain action (#36) - batching pod requests in chunks of 5 to not overload control plane api - do not call for each pod individually when waiting for them to be terminated on the node, list node pods instead - expose k8s clientset rate limiting config. Defaults: 25 QPS, 150 Burst --- actions/drain_node_handler.go | 211 +++++++++++++---------------- actions/drain_node_handler_test.go | 2 +- config/config.go | 18 +++ config/config_test.go | 2 + go.mod | 4 +- go.sum | 6 +- main.go | 8 +- 7 files changed, 128 insertions(+), 123 deletions(-) diff --git a/actions/drain_node_handler.go b/actions/drain_node_handler.go index ccb51d48..7d0da6f0 100644 --- a/actions/drain_node_handler.go +++ b/actions/drain_node_handler.go @@ -20,15 +20,12 @@ import ( "github.com/castai/cluster-controller/castai" ) -var ( - errPodPresent = errors.New("pod is still present") -) - type drainNodeConfig struct { - podsDeleteTimeout time.Duration - podDeleteRetries uint64 - podDeleteRetryDelay time.Duration - podEvictRetryDelay time.Duration + podsDeleteTimeout time.Duration + podDeleteRetries uint64 + podDeleteRetryDelay time.Duration + podEvictRetryDelay time.Duration + podsTerminationWaitRetryDelay time.Duration } func newDrainNodeHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler { @@ -36,10 +33,11 @@ func newDrainNodeHandler(log logrus.FieldLogger, clientset kubernetes.Interface) log: log, clientset: clientset, cfg: drainNodeConfig{ - podsDeleteTimeout: 5 * time.Minute, - podDeleteRetries: 5, - podDeleteRetryDelay: 5 * time.Second, - podEvictRetryDelay: 5 * time.Second, + podsDeleteTimeout: 5 * time.Minute, + podDeleteRetries: 5, + podDeleteRetryDelay: 5 * time.Second, + podEvictRetryDelay: 5 * time.Second, + podsTerminationWaitRetryDelay: 10 * time.Second, }, } } @@ -73,19 +71,10 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error { return fmt.Errorf("tainting node %q: %w", req.NodeName, err) } - allNodePods, err := h.listNodePods(ctx, node) - if err != nil { - return fmt.Errorf("listing pods for node %q: %w", req.NodeName, err) - } - - podsToEvict := lo.Filter(allNodePods.Items, func(pod v1.Pod, _ int) bool { - return !isDaemonSetPod(&pod) && !isStaticPod(&pod) - }) - // First try to evict pods gracefully. evictCtx, evictCancel := context.WithTimeout(ctx, time.Duration(req.DrainTimeoutSeconds)*time.Second) defer evictCancel() - err = h.evictPods(evictCtx, log, podsToEvict) + err = h.evictNodePods(evictCtx, log, node) if err != nil && !errors.Is(err, context.DeadlineExceeded) { return err } @@ -97,7 +86,7 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error { // If force is set and evict timeout exceeded delete pods. 
deleteCtx, deleteCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout) defer deleteCancel() - if err := h.deletePods(deleteCtx, log, podsToEvict); err != nil { + if err := h.deleteNodePods(deleteCtx, log, node); err != nil { return err } } @@ -107,74 +96,71 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error { return nil } -func (h *drainNodeHandler) deletePods(ctx context.Context, log logrus.FieldLogger, pods []v1.Pod) error { - log.Infof("forcefully deleting %d pods", len(pods)) - - g, ctx := errgroup.WithContext(ctx) - for _, pod := range pods { - pod := pod - - g.Go(func() error { - err := h.deletePod(ctx, pod) - if err != nil { - return err - } - return h.waitPodTerminated(ctx, log, pod) - }) +func (h *drainNodeHandler) taintNode(ctx context.Context, node *v1.Node) error { + if node.Spec.Unschedulable { + return nil } - if err := g.Wait(); err != nil { - return fmt.Errorf("deleting pods: %w", err) + err := patchNode(ctx, h.clientset, node, func(n *v1.Node) error { + n.Spec.Unschedulable = true + return nil + }) + if err != nil { + return fmt.Errorf("patching node unschedulable: %w", err) } - return nil } -func (h *drainNodeHandler) deletePod(ctx context.Context, pod v1.Pod) error { - b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.podDeleteRetryDelay), h.cfg.podDeleteRetries), ctx) // nolint:gomnd - action := func() error { - err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) - if err != nil { - // Pod is not found - ignore. - if apierrors.IsNotFound(err) { - return nil - } +func (h *drainNodeHandler) evictNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) error { + pods, err := h.listNodePodsToEvict(ctx, node) + if err != nil { + return err + } - // Pod is misconfigured - stop retry. - if apierrors.IsInternalError(err) { - return backoff.Permanent(err) - } - } + log.Infof("evicting %d pods", len(pods)) - // Other errors - retry. + if err := h.sendPodsRequests(ctx, pods, h.evictPod); err != nil { + return fmt.Errorf("sending evict pods requests: %w", err) + } + + return h.waitNodePodsTerminated(ctx, node) +} + +func (h *drainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) error { + pods, err := h.listNodePodsToEvict(ctx, node) + if err != nil { return err } - if err := backoff.Retry(action, b); err != nil { - return fmt.Errorf("deleting pod %s in namespace %s: %w", pod.Name, pod.Namespace, err) + + log.Infof("forcefully deleting %d pods", len(pods)) + + if err := h.sendPodsRequests(ctx, pods, h.deletePod); err != nil { + return fmt.Errorf("sending delete pods requests: %w", err) } - return nil + + return h.waitNodePodsTerminated(ctx, node) } -// taintNode to make it unshedulable. 
-func (h *drainNodeHandler) taintNode(ctx context.Context, node *v1.Node) error { - if node.Spec.Unschedulable { - return nil - } +func (h *drainNodeHandler) sendPodsRequests(ctx context.Context, pods []v1.Pod, f func(context.Context, v1.Pod) error) error { + const batchSize = 5 - err := patchNode(ctx, h.clientset, node, func(n *v1.Node) error { - n.Spec.Unschedulable = true - return nil - }) - if err != nil { - return fmt.Errorf("patching node unschedulable: %w", err) + for _, batch := range lo.Chunk(pods, batchSize) { + g, ctx := errgroup.WithContext(ctx) + for _, pod := range batch { + pod := pod + g.Go(func() error { return f(ctx, pod) }) + } + if err := g.Wait(); err != nil { + return err + } } + return nil } -// listNodePods returns a list of all pods scheduled on the provided node. -func (h *drainNodeHandler) listNodePods(ctx context.Context, node *v1.Node) (*v1.PodList, error) { +func (h *drainNodeHandler) listNodePodsToEvict(ctx context.Context, node *v1.Node) ([]v1.Pod, error) { var pods *v1.PodList - err := backoff.Retry(func() error { + if err := backoff.Retry(func() error { p, err := h.clientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{ FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node.Name}).String(), }) @@ -183,58 +169,28 @@ func (h *drainNodeHandler) listNodePods(ctx context.Context, node *v1.Node) (*v1 } pods = p return nil - }, defaultBackoff(ctx)) - return pods, err -} - -func (h *drainNodeHandler) evictPods(ctx context.Context, log logrus.FieldLogger, pods []v1.Pod) error { - log.Infof("evicting %d pods", len(pods)) - - g, ctx := errgroup.WithContext(ctx) - for _, pod := range pods { - pod := pod - - g.Go(func() error { - err := h.evictPod(ctx, pod) - if err != nil { - return err - } - return h.waitPodTerminated(ctx, log, pod) - }) + }, defaultBackoff(ctx)); err != nil { + return nil, fmt.Errorf("listing node %v pods: %w", node.Name, err) } - if err := g.Wait(); err != nil { - return fmt.Errorf("evicting pods: %w", err) - } + podsToEvict := lo.Filter(pods.Items, func(pod v1.Pod, _ int) bool { + return !isDaemonSetPod(&pod) && !isStaticPod(&pod) + }) - return nil + return podsToEvict, nil } -func (h *drainNodeHandler) waitPodTerminated(ctx context.Context, log logrus.FieldLogger, pod v1.Pod) error { - b := backoff.WithContext(backoff.NewConstantBackOff(5*time.Second), ctx) // nolint:gomnd - - err := backoff.Retry(func() error { - p, err := h.clientset.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) - if err != nil && apierrors.IsNotFound(err) { - return nil - } +func (h *drainNodeHandler) waitNodePodsTerminated(ctx context.Context, node *v1.Node) error { + return backoff.Retry(func() error { + pods, err := h.listNodePodsToEvict(ctx, node) if err != nil { - return err + return fmt.Errorf("waiting for node %q pods to be terminated: %w", node.Name, err) } - // replicaSets will recreate pods with equal name and namespace, therefore we compare UIDs - if p.GetUID() == pod.GetUID() { - return errPodPresent + if len(pods) > 0 { + return fmt.Errorf("waiting for %d pods to be terminated on node %v", len(pods), node.Name) } return nil - }, b) - if err != nil && errors.Is(err, errPodPresent) { - log.Infof("timeout waiting for pod %s in namespace %s to terminate", pod.Name, pod.Namespace) - return nil - } - if err != nil { - return fmt.Errorf("waiting for pod %s in namespace %s termination: %w", pod.Name, pod.Namespace, err) - } - return nil + }, 
backoff.WithContext(backoff.NewConstantBackOff(h.cfg.podsTerminationWaitRetryDelay), ctx)) } // evictPod from the k8s node. Error handling is based on eviction api documentation: @@ -274,6 +230,31 @@ func (h *drainNodeHandler) evictPod(ctx context.Context, pod v1.Pod) error { return nil } +func (h *drainNodeHandler) deletePod(ctx context.Context, pod v1.Pod) error { + b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.podDeleteRetryDelay), h.cfg.podDeleteRetries), ctx) // nolint:gomnd + action := func() error { + err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + if err != nil { + // Pod is not found - ignore. + if apierrors.IsNotFound(err) { + return nil + } + + // Pod is misconfigured - stop retry. + if apierrors.IsInternalError(err) { + return backoff.Permanent(err) + } + } + + // Other errors - retry. + return err + } + if err := backoff.Retry(action, b); err != nil { + return fmt.Errorf("deleting pod %s in namespace %s: %w", pod.Name, pod.Namespace, err) + } + return nil +} + func isDaemonSetPod(p *v1.Pod) bool { return isControlledBy(p, "DaemonSet") } diff --git a/actions/drain_node_handler_test.go b/actions/drain_node_handler_test.go index 867d07c8..e4b25e40 100644 --- a/actions/drain_node_handler_test.go +++ b/actions/drain_node_handler_test.go @@ -103,7 +103,7 @@ func TestDrainNodeHandler(t *testing.T) { } err := h.Handle(context.Background(), req) - r.EqualError(err, "evicting pods: evicting pod pod1 in namespace default: internal") + r.EqualError(err, "sending evict pods requests: evicting pod pod1 in namespace default: internal") n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) r.NoError(err) diff --git a/config/config.go b/config/config.go index c328ed31..bc5bd689 100644 --- a/config/config.go +++ b/config/config.go @@ -11,6 +11,7 @@ type Config struct { Log Log API API Kubeconfig string + KubeClient KubeClient ClusterID string PprofPort int LeaderElection LeaderElection @@ -32,6 +33,15 @@ type LeaderElection struct { LockName string } +type KubeClient struct { + // K8S client rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a + // smoothed qps rate of 'qps'. + // The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'. + // The maximum number of tokens in the bucket is capped at 'burst'. + QPS int + Burst int +} + var cfg *Config // Get configuration bound to environment variables. 
@@ -45,6 +55,8 @@ func Get() Config { _ = viper.BindEnv("api.url", "API_URL") _ = viper.BindEnv("clusterid", "CLUSTER_ID") _ = viper.BindEnv("kubeconfig") + _ = viper.BindEnv("kubeclient.qps", "KUBECLIENT_QPS") + _ = viper.BindEnv("kubeclient.burst", "KUBECLIENT_BURST") _ = viper.BindEnv("pprofport", "PPROF_PORT") _ = viper.BindEnv("leaderelection.enabled", "LEADER_ELECTION_ENABLED") _ = viper.BindEnv("leaderelection.namespace", "LEADER_ELECTION_NAMESPACE") @@ -79,6 +91,12 @@ func Get() Config { required("LEADER_ELECTION_LOCK_NAME") } } + if cfg.KubeClient.QPS == 0 { + cfg.KubeClient.QPS = 25 + } + if cfg.KubeClient.Burst == 0 { + cfg.KubeClient.Burst = 150 + } return *cfg } diff --git a/config/config_test.go b/config/config_test.go index d4ab12d4..7785bba0 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -25,4 +25,6 @@ func TestConfig(t *testing.T) { require.Equal(t, true, cfg.LeaderElection.Enabled) require.Equal(t, "castai-agent", cfg.LeaderElection.Namespace) require.Equal(t, "castai-cluster-controller", cfg.LeaderElection.LockName) + require.Equal(t, 25, cfg.KubeClient.QPS) + require.Equal(t, 150, cfg.KubeClient.Burst) } diff --git a/go.mod b/go.mod index 1fa4a4ac..a2a27c39 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-errors/errors v1.0.1 // indirect github.com/go-gorp/gorp/v3 v3.0.2 // indirect - github.com/go-logr/logr v1.2.2 // indirect + github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.5 // indirect github.com/go-openapi/swag v0.19.14 // indirect @@ -127,7 +127,7 @@ require ( golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/net v0.0.0-20220107192237-5cfca573fb4d // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect - golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect + golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect diff --git a/go.sum b/go.sum index 855028bb..4e389c8c 100644 --- a/go.sum +++ b/go.sum @@ -458,8 +458,9 @@ github.com/go-logr/logr v0.3.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTg github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.2 h1:ahHml/yUpnlb96Rp8HCvtYVPY8ZYpxq3g7UYchIYwbs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v0.2.0/go.mod h1:qhKdvif7YF5GI9NWEpyxTSSBdGmzkNguibrdCNVPunU= @@ -1477,8 +1478,9 @@ golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc= +golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/main.go b/main.go index 371ac63e..7b0aac8d 100644 --- a/main.go +++ b/main.go @@ -21,14 +21,14 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/util/flowcontrol" "sigs.k8s.io/controller-runtime/pkg/manager/signals" - "github.com/castai/cluster-controller/aks" - "github.com/castai/cluster-controller/helm" - "github.com/castai/cluster-controller/actions" + "github.com/castai/cluster-controller/aks" "github.com/castai/cluster-controller/castai" "github.com/castai/cluster-controller/config" + "github.com/castai/cluster-controller/helm" ctrlog "github.com/castai/cluster-controller/log" "github.com/castai/cluster-controller/version" ) @@ -109,6 +109,7 @@ func run( if err != nil { return err } + restconfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(cfg.KubeClient.QPS), cfg.KubeClient.Burst) helmClient := helm.NewClient(logger, helm.NewChartLoader(), restconfig) @@ -285,6 +286,7 @@ func retrieveKubeConfig(log logrus.FieldLogger) (*rest.Config, error) { } }) log.Debug("using in cluster kubeconfig") + return inClusterConfig, nil }
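For reference, the core of this change is the chunked fan-out added in sendPodsRequests: instead of spawning one goroutine per pod, eviction and deletion requests are issued in batches of 5, and each batch must complete before the next one starts. Below is a minimal standalone sketch of that pattern, assuming Go 1.18+ for github.com/samber/lo; the sendInBatches helper, the string items, and the print-only work function are illustrative stand-ins, not code from this repository.

package main

import (
	"context"
	"fmt"

	"github.com/samber/lo"
	"golang.org/x/sync/errgroup"
)

// sendInBatches runs f over items in fixed-size chunks: calls are issued
// concurrently only within a chunk, and each chunk must finish before the
// next one starts, so at most batchSize requests are in flight at any time.
func sendInBatches[T any](ctx context.Context, items []T, batchSize int, f func(context.Context, T) error) error {
	for _, batch := range lo.Chunk(items, batchSize) {
		g, ctx := errgroup.WithContext(ctx)
		for _, item := range batch {
			item := item // capture loop variable for the goroutine (pre-Go 1.22)
			g.Go(func() error { return f(ctx, item) })
		}
		if err := g.Wait(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	pods := []string{"pod-a", "pod-b", "pod-c", "pod-d", "pod-e", "pod-f", "pod-g"}
	err := sendInBatches(context.Background(), pods, 5, func(ctx context.Context, name string) error {
		fmt.Println("evicting", name) // stand-in for an Eviction API request
		return nil
	})
	if err != nil {
		fmt.Println("drain failed:", err)
	}
}

As in the patch, the batch-local errgroup caps in-flight requests at the batch size, and the wait-for-termination loop then polls the node's pod list once per retry instead of polling every pod individually.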
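The other half of the change is the client-side rate limit: KUBECLIENT_QPS and KUBECLIENT_BURST default to 25 and 150 and are applied to the rest config in main.go via flowcontrol.NewTokenBucketRateLimiter. A rough sketch of that wiring is below, assuming an in-cluster config for brevity; the package and function names are illustrative placeholders.

package kubeclient

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/util/flowcontrol"
)

// newClientset builds a clientset whose requests go through a token-bucket
// rate limiter: the bucket starts with `burst` tokens and refills at `qps`
// tokens per second, so short spikes up to `burst` requests are allowed
// while the sustained request rate stays at `qps`.
func newClientset(qps float32, burst int) (kubernetes.Interface, error) {
	// In-cluster config is used here for brevity; the controller can also
	// fall back to a kubeconfig file.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	cfg.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
	return kubernetes.NewForConfig(cfg)
}

The relatively high burst (150) lets a drain's short batches of evictions pass without queuing, while the QPS value caps the sustained request rate against the control plane.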