feat: improve api request count for node drain action (#36)
- batch pod requests in chunks of 5 so the control plane API is not overloaded
- when waiting for pods to terminate on a node, list the node's pods instead of calling the API for each pod individually
- expose the k8s clientset rate-limiting config; defaults: 25 QPS, 150 Burst (a wiring sketch follows the changed-files summary below)
r0kas authored Jun 8, 2022
1 parent 69d4f28 commit 4b1812b
Showing 7 changed files with 128 additions and 123 deletions.
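The rate-limiter settings from the last bullet are consumed where the clientset is built (main.go, which is not part of this diff). Below is a minimal sketch of that wiring, not the commit's actual code: the config import path, the function names, and the in-cluster setup are assumptions made for illustration.

package main

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	"github.com/castai/cluster-controller/config"
)

// newClientset sketches how the new KubeClient.QPS / KubeClient.Burst settings
// plug into client-go's token-bucket rate limiter. The real wiring lives in
// main.go, which is not shown in this diff.
func newClientset(cfg config.Config) (kubernetes.Interface, error) {
	restConfig, err := rest.InClusterConfig() // kubeconfig-based setup omitted for brevity
	if err != nil {
		return nil, err
	}
	// Bursts of up to Burst requests are allowed, refilled at QPS per second
	// (defaults from config.Get(): 25 QPS, 150 Burst).
	restConfig.QPS = float32(cfg.KubeClient.QPS)
	restConfig.Burst = cfg.KubeClient.Burst

	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, err
	}
	return clientset, nil
}

func main() {
	cfg := config.Get()
	if _, err := newClientset(cfg); err != nil {
		panic(err)
	}
}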
211 changes: 96 additions & 115 deletions actions/drain_node_handler.go
@@ -20,26 +20,24 @@ import (
"github.com/castai/cluster-controller/castai"
)

var (
errPodPresent = errors.New("pod is still present")
)

type drainNodeConfig struct {
podsDeleteTimeout time.Duration
podDeleteRetries uint64
podDeleteRetryDelay time.Duration
podEvictRetryDelay time.Duration
podsDeleteTimeout time.Duration
podDeleteRetries uint64
podDeleteRetryDelay time.Duration
podEvictRetryDelay time.Duration
podsTerminationWaitRetryDelay time.Duration
}

func newDrainNodeHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler {
return &drainNodeHandler{
log: log,
clientset: clientset,
cfg: drainNodeConfig{
podsDeleteTimeout: 5 * time.Minute,
podDeleteRetries: 5,
podDeleteRetryDelay: 5 * time.Second,
podEvictRetryDelay: 5 * time.Second,
podsDeleteTimeout: 5 * time.Minute,
podDeleteRetries: 5,
podDeleteRetryDelay: 5 * time.Second,
podEvictRetryDelay: 5 * time.Second,
podsTerminationWaitRetryDelay: 10 * time.Second,
},
}
}
@@ -73,19 +71,10 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
return fmt.Errorf("tainting node %q: %w", req.NodeName, err)
}

allNodePods, err := h.listNodePods(ctx, node)
if err != nil {
return fmt.Errorf("listing pods for node %q: %w", req.NodeName, err)
}

podsToEvict := lo.Filter(allNodePods.Items, func(pod v1.Pod, _ int) bool {
return !isDaemonSetPod(&pod) && !isStaticPod(&pod)
})

// First try to evict pods gracefully.
evictCtx, evictCancel := context.WithTimeout(ctx, time.Duration(req.DrainTimeoutSeconds)*time.Second)
defer evictCancel()
err = h.evictPods(evictCtx, log, podsToEvict)
err = h.evictNodePods(evictCtx, log, node)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
@@ -97,7 +86,7 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
// If force is set and evict timeout exceeded delete pods.
deleteCtx, deleteCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout)
defer deleteCancel()
if err := h.deletePods(deleteCtx, log, podsToEvict); err != nil {
if err := h.deleteNodePods(deleteCtx, log, node); err != nil {
return err
}
}
@@ -107,74 +96,71 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
return nil
}

func (h *drainNodeHandler) deletePods(ctx context.Context, log logrus.FieldLogger, pods []v1.Pod) error {
log.Infof("forcefully deleting %d pods", len(pods))

g, ctx := errgroup.WithContext(ctx)
for _, pod := range pods {
pod := pod

g.Go(func() error {
err := h.deletePod(ctx, pod)
if err != nil {
return err
}
return h.waitPodTerminated(ctx, log, pod)
})
func (h *drainNodeHandler) taintNode(ctx context.Context, node *v1.Node) error {
if node.Spec.Unschedulable {
return nil
}

if err := g.Wait(); err != nil {
return fmt.Errorf("deleting pods: %w", err)
err := patchNode(ctx, h.clientset, node, func(n *v1.Node) error {
n.Spec.Unschedulable = true
return nil
})
if err != nil {
return fmt.Errorf("patching node unschedulable: %w", err)
}

return nil
}

func (h *drainNodeHandler) deletePod(ctx context.Context, pod v1.Pod) error {
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.podDeleteRetryDelay), h.cfg.podDeleteRetries), ctx) // nolint:gomnd
action := func() error {
err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
// Pod is not found - ignore.
if apierrors.IsNotFound(err) {
return nil
}
func (h *drainNodeHandler) evictNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) error {
pods, err := h.listNodePodsToEvict(ctx, node)
if err != nil {
return err
}

// Pod is misconfigured - stop retry.
if apierrors.IsInternalError(err) {
return backoff.Permanent(err)
}
}
log.Infof("evicting %d pods", len(pods))

// Other errors - retry.
if err := h.sendPodsRequests(ctx, pods, h.evictPod); err != nil {
return fmt.Errorf("sending evict pods requests: %w", err)
}

return h.waitNodePodsTerminated(ctx, node)
}

func (h *drainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) error {
pods, err := h.listNodePodsToEvict(ctx, node)
if err != nil {
return err
}
if err := backoff.Retry(action, b); err != nil {
return fmt.Errorf("deleting pod %s in namespace %s: %w", pod.Name, pod.Namespace, err)

log.Infof("forcefully deleting %d pods", len(pods))

if err := h.sendPodsRequests(ctx, pods, h.deletePod); err != nil {
return fmt.Errorf("sending delete pods requests: %w", err)
}
return nil

return h.waitNodePodsTerminated(ctx, node)
}

// taintNode to make it unshedulable.
func (h *drainNodeHandler) taintNode(ctx context.Context, node *v1.Node) error {
if node.Spec.Unschedulable {
return nil
}
func (h *drainNodeHandler) sendPodsRequests(ctx context.Context, pods []v1.Pod, f func(context.Context, v1.Pod) error) error {
const batchSize = 5

err := patchNode(ctx, h.clientset, node, func(n *v1.Node) error {
n.Spec.Unschedulable = true
return nil
})
if err != nil {
return fmt.Errorf("patching node unschedulable: %w", err)
for _, batch := range lo.Chunk(pods, batchSize) {
g, ctx := errgroup.WithContext(ctx)
for _, pod := range batch {
pod := pod
g.Go(func() error { return f(ctx, pod) })
}
if err := g.Wait(); err != nil {
return err
}
}

return nil
}

// listNodePods returns a list of all pods scheduled on the provided node.
func (h *drainNodeHandler) listNodePods(ctx context.Context, node *v1.Node) (*v1.PodList, error) {
func (h *drainNodeHandler) listNodePodsToEvict(ctx context.Context, node *v1.Node) ([]v1.Pod, error) {
var pods *v1.PodList
err := backoff.Retry(func() error {
if err := backoff.Retry(func() error {
p, err := h.clientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node.Name}).String(),
})
@@ -183,58 +169,28 @@ func (h *drainNodeHandler) listNodePods(ctx context.Context, node *v1.Node) (*v1
}
pods = p
return nil
}, defaultBackoff(ctx))
return pods, err
}

func (h *drainNodeHandler) evictPods(ctx context.Context, log logrus.FieldLogger, pods []v1.Pod) error {
log.Infof("evicting %d pods", len(pods))

g, ctx := errgroup.WithContext(ctx)
for _, pod := range pods {
pod := pod

g.Go(func() error {
err := h.evictPod(ctx, pod)
if err != nil {
return err
}
return h.waitPodTerminated(ctx, log, pod)
})
}, defaultBackoff(ctx)); err != nil {
return nil, fmt.Errorf("listing node %v pods: %w", node.Name, err)
}

if err := g.Wait(); err != nil {
return fmt.Errorf("evicting pods: %w", err)
}
podsToEvict := lo.Filter(pods.Items, func(pod v1.Pod, _ int) bool {
return !isDaemonSetPod(&pod) && !isStaticPod(&pod)
})

return nil
return podsToEvict, nil
}

func (h *drainNodeHandler) waitPodTerminated(ctx context.Context, log logrus.FieldLogger, pod v1.Pod) error {
b := backoff.WithContext(backoff.NewConstantBackOff(5*time.Second), ctx) // nolint:gomnd

err := backoff.Retry(func() error {
p, err := h.clientset.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
return nil
}
func (h *drainNodeHandler) waitNodePodsTerminated(ctx context.Context, node *v1.Node) error {
return backoff.Retry(func() error {
pods, err := h.listNodePodsToEvict(ctx, node)
if err != nil {
return err
return fmt.Errorf("waiting for node %q pods to be terminated: %w", node.Name, err)
}
// replicaSets will recreate pods with equal name and namespace, therefore we compare UIDs
if p.GetUID() == pod.GetUID() {
return errPodPresent
if len(pods) > 0 {
return fmt.Errorf("waiting for %d pods to be terminated on node %v", len(pods), node.Name)
}
return nil
}, b)
if err != nil && errors.Is(err, errPodPresent) {
log.Infof("timeout waiting for pod %s in namespace %s to terminate", pod.Name, pod.Namespace)
return nil
}
if err != nil {
return fmt.Errorf("waiting for pod %s in namespace %s termination: %w", pod.Name, pod.Namespace, err)
}
return nil
}, backoff.WithContext(backoff.NewConstantBackOff(h.cfg.podsTerminationWaitRetryDelay), ctx))
}

// evictPod from the k8s node. Error handling is based on eviction api documentation:
@@ -274,6 +230,31 @@ func (h *drainNodeHandler) evictPod(ctx context.Context, pod v1.Pod) error {
return nil
}

func (h *drainNodeHandler) deletePod(ctx context.Context, pod v1.Pod) error {
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.podDeleteRetryDelay), h.cfg.podDeleteRetries), ctx) // nolint:gomnd
action := func() error {
err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
// Pod is not found - ignore.
if apierrors.IsNotFound(err) {
return nil
}

// Pod is misconfigured - stop retry.
if apierrors.IsInternalError(err) {
return backoff.Permanent(err)
}
}

// Other errors - retry.
return err
}
if err := backoff.Retry(action, b); err != nil {
return fmt.Errorf("deleting pod %s in namespace %s: %w", pod.Name, pod.Namespace, err)
}
return nil
}

func isDaemonSetPod(p *v1.Pod) bool {
return isControlledBy(p, "DaemonSet")
}
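The batching introduced by sendPodsRequests can be read in isolation as the following sketch. It uses the same samber/lo Chunk and errgroup primitives as the handler above; processOne and the string items are illustrative stand-ins for evictPod/deletePod and the node's pods, not code from this commit.

package main

import (
	"context"
	"fmt"

	"github.com/samber/lo"
	"golang.org/x/sync/errgroup"
)

// processOne stands in for evictPod or deletePod.
func processOne(ctx context.Context, item string) error {
	fmt.Println("processing", item)
	return nil
}

// processInBatches mirrors sendPodsRequests: at most batchSize requests are in
// flight at once, and each batch must finish before the next one starts, which
// keeps the burst against the control plane API bounded.
func processInBatches(ctx context.Context, items []string, batchSize int) error {
	for _, batch := range lo.Chunk(items, batchSize) {
		g, ctx := errgroup.WithContext(ctx)
		for _, item := range batch {
			item := item // capture the loop variable for the goroutine
			g.Go(func() error { return processOne(ctx, item) })
		}
		if err := g.Wait(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	items := []string{"pod-a", "pod-b", "pod-c", "pod-d", "pod-e", "pod-f"}
	if err := processInBatches(context.Background(), items, 5); err != nil {
		fmt.Println("error:", err)
	}
}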
2 changes: 1 addition & 1 deletion actions/drain_node_handler_test.go
@@ -103,7 +103,7 @@ func TestDrainNodeHandler(t *testing.T) {
}

err := h.Handle(context.Background(), req)
r.EqualError(err, "evicting pods: evicting pod pod1 in namespace default: internal")
r.EqualError(err, "sending evict pods requests: evicting pod pod1 in namespace default: internal")

n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
r.NoError(err)
18 changes: 18 additions & 0 deletions config/config.go
@@ -11,6 +11,7 @@ type Config struct {
Log Log
API API
Kubeconfig string
KubeClient KubeClient
ClusterID string
PprofPort int
LeaderElection LeaderElection
@@ -32,6 +33,15 @@ type LeaderElection struct {
LockName string
}

type KubeClient struct {
// K8S client rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
// smoothed qps rate of 'qps'.
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
// The maximum number of tokens in the bucket is capped at 'burst'.
QPS int
Burst int
}

var cfg *Config

// Get configuration bound to environment variables.
@@ -45,6 +55,8 @@ func Get() Config {
_ = viper.BindEnv("api.url", "API_URL")
_ = viper.BindEnv("clusterid", "CLUSTER_ID")
_ = viper.BindEnv("kubeconfig")
_ = viper.BindEnv("kubeclient.qps", "KUBECLIENT_QPS")
_ = viper.BindEnv("kubeclient.burst", "KUBECLIENT_BURST")
_ = viper.BindEnv("pprofport", "PPROF_PORT")
_ = viper.BindEnv("leaderelection.enabled", "LEADER_ELECTION_ENABLED")
_ = viper.BindEnv("leaderelection.namespace", "LEADER_ELECTION_NAMESPACE")
@@ -79,6 +91,12 @@ func Get() Config {
required("LEADER_ELECTION_LOCK_NAME")
}
}
if cfg.KubeClient.QPS == 0 {
cfg.KubeClient.QPS = 25
}
if cfg.KubeClient.Burst == 0 {
cfg.KubeClient.Burst = 150
}

return *cfg
}
2 changes: 2 additions & 0 deletions config/config_test.go
@@ -25,4 +25,6 @@ func TestConfig(t *testing.T) {
require.Equal(t, true, cfg.LeaderElection.Enabled)
require.Equal(t, "castai-agent", cfg.LeaderElection.Namespace)
require.Equal(t, "castai-cluster-controller", cfg.LeaderElection.LockName)
require.Equal(t, 25, cfg.KubeClient.QPS)
require.Equal(t, 150, cfg.KubeClient.Burst)
}
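The test above only pins the defaults. Since the new values are bound to KUBECLIENT_QPS and KUBECLIENT_BURST, an override test could look roughly like the sketch below; it is not part of this commit, it assumes the other variables Get() requires are already set, and it glosses over the fact that Get() caches its result after the first call.

package config_test

import (
	"os"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/castai/cluster-controller/config"
)

// Sketch only: config.Get() caches its result and also needs the API/cluster
// variables the existing TestConfig sets, so in practice these overrides would
// be folded into that test rather than run standalone.
func TestKubeClientRateLimitOverride(t *testing.T) {
	require.NoError(t, os.Setenv("KUBECLIENT_QPS", "50"))
	require.NoError(t, os.Setenv("KUBECLIENT_BURST", "300"))

	cfg := config.Get()
	require.Equal(t, 50, cfg.KubeClient.QPS)
	require.Equal(t, 300, cfg.KubeClient.Burst)
}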
4 changes: 2 additions & 2 deletions go.mod
@@ -55,7 +55,7 @@ require (
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/go-gorp/gorp/v3 v3.0.2 // indirect
github.com/go-logr/logr v1.2.2 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
@@ -127,7 +127,7 @@ require (
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/net v0.0.0-20220107192237-5cfca573fb4d // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
6 changes: 4 additions & 2 deletions go.sum
Expand Up @@ -458,8 +458,9 @@ github.com/go-logr/logr v0.3.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTg
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.2 h1:ahHml/yUpnlb96Rp8HCvtYVPY8ZYpxq3g7UYchIYwbs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-logr/zapr v0.2.0/go.mod h1:qhKdvif7YF5GI9NWEpyxTSSBdGmzkNguibrdCNVPunU=
@@ -1477,8 +1478,9 @@ golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=