diff --git a/README.md b/README.md
index 137eacd3..3f328b8b 100644
--- a/README.md
+++ b/README.md
@@ -40,21 +40,22 @@ Therefore, this application will not run into any issues if it is restarted, res
 
 ## Usage
 
-| Environment variable | Description | Required | Default |
-|:-----------------------------|:--------------------------------------------------------------------------------------------------------|:---------|:------------|
-| CLUSTER_NAME | Name of the eks-cluster, used in place of `AUTODISCOVERRY_TAGS` and `AUTO_SCALING_GROUP_NAMES`. Checks for `k8s.io/cluster-autoscaler/: owned` and `k8s.io/cluster-autoscaler/enabled: true` tags on ASG | yes | `""` |
-| AUTODISCOVERY_TAGS | Comma separated key value string with tags to autodiscover ASGs, used in place of `CLUSTER_NAME` and `AUTO_SCALING_GROUP_NAMES`. | yes | `""` |
-| AUTO_SCALING_GROUP_NAMES | Comma-separated list of ASGs, CLUSTER_NAME takes priority. | yes | `""` |
-| IGNORE_DAEMON_SETS | Whether to ignore DaemonSets when draining the nodes | no | `true` |
-| DELETE_EMPTY_DIR_DATA | Whether to delete empty dir data when draining the nodes | no | `true` |
-| AWS_REGION | Self-explanatory | no | `us-west-2` |
-| ENVIRONMENT | If set to `dev`, will try to create the Kubernetes client using your local kubeconfig. Any other values will use the in-cluster configuration | no | `""` |
-| EXECUTION_INTERVAL | Duration to sleep between each execution in seconds | no | `20` |
-| EXECUTION_TIMEOUT | Maximum execution duration before timing out in seconds | no | `900` |
-| POD_TERMINATION_GRACE_PERIOD | How long to wait for a pod to terminate in seconds; 0 means "delete immediately"; set to a negative value to use the pod's terminationGracePeriodSeconds. | no | `-1` |
-| METRICS_PORT | Port to bind metrics server to | no | `8080` |
-| METRICS | Expose metrics in Prometheus format at `:${METRICS_PORT}/metrics` | no | `""` |
-| SLOW_MODE | If enabled, every time a node is terminated during an execution, the current execution will stop rather than continuing to the next ASG | no | `false` |
+| Environment variable | Description | Required | Default |
+|:------------------------------|:---------------------------------------------------------------------------------------------------------|:---------|:------------|
+| CLUSTER_NAME | Name of the eks-cluster, used in place of `AUTODISCOVERY_TAGS` and `AUTO_SCALING_GROUP_NAMES`. Checks for `k8s.io/cluster-autoscaler/: owned` and `k8s.io/cluster-autoscaler/enabled: true` tags on ASG | yes | `""` |
+| AUTODISCOVERY_TAGS | Comma-separated key=value string with tags to autodiscover ASGs, used in place of `CLUSTER_NAME` and `AUTO_SCALING_GROUP_NAMES`. | yes | `""` |
+| AUTO_SCALING_GROUP_NAMES | Comma-separated list of ASG names; `CLUSTER_NAME` takes priority. | yes | `""` |
+| IGNORE_DAEMON_SETS | Whether to ignore DaemonSets when draining the nodes | no | `true` |
+| DELETE_EMPTY_DIR_DATA | Whether to delete empty dir data when draining the nodes | no | `true` |
+| AWS_REGION | AWS region to use for AWS API calls | no | `us-west-2` |
+| ENVIRONMENT | If set to `dev`, will try to create the Kubernetes client using your local kubeconfig. Any other value will use the in-cluster configuration | no | `""` |
+| EXECUTION_INTERVAL | Duration to sleep between each execution in seconds | no | `20` |
+| EXECUTION_TIMEOUT | Maximum execution duration before timing out in seconds | no | `900` |
+| POD_TERMINATION_GRACE_PERIOD | How long to wait for a pod to terminate in seconds; 0 means "delete immediately"; set to a negative value to use the pod's terminationGracePeriodSeconds. | no | `-1` |
+| METRICS_PORT | Port to bind the metrics server to | no | `8080` |
+| METRICS | Expose metrics in Prometheus format at `:${METRICS_PORT}/metrics` | no | `""` |
+| SLOW_MODE | If enabled, every time a node is terminated during an execution, the current execution will stop rather than continuing to the next ASG | no | `false` |
+| EAGER_CORDONING | If enabled, all outdated nodes will be cordoned before any rolling update action. The default behavior is to cordon a node just before draining it. See [#41](https://github.com/TwiN/aws-eks-asg-rolling-update-handler/issues/41) for possible consequences of enabling this. | no | `false` |
 
 **NOTE:** Only one of `CLUSTER_NAME`, `AUTODISCOVERY_TAGS` or `AUTO_SCALING_GROUP_NAMES` must be set.
 
diff --git a/config/config.go b/config/config.go
index 26226ef2..3b17e746 100644
--- a/config/config.go
+++ b/config/config.go
@@ -27,6 +27,7 @@ const (
     EnvMetrics = "METRICS"
     EnvMetricsPort = "METRICS_PORT"
     EnvSlowMode = "SLOW_MODE"
+    EnvEagerCordoning = "EAGER_CORDONING"
 )
 
 type config struct {
@@ -43,14 +44,16 @@ type config struct {
     Metrics bool // Defaults to false
     MetricsPort int // Defaults to 8080
     SlowMode bool // Defaults to false
+    EagerCordoning bool // Defaults to false
 }
 
 // Initialize is used to initialize the application's configuration
 func Initialize() error {
     cfg = &config{
-        Environment: strings.ToLower(os.Getenv(EnvEnvironment)),
-        Debug:       strings.ToLower(os.Getenv(EnvDebug)) == "true",
-        SlowMode:    strings.ToLower(os.Getenv(EnvSlowMode)) == "true",
+        Environment:    strings.ToLower(os.Getenv(EnvEnvironment)),
+        Debug:          strings.ToLower(os.Getenv(EnvDebug)) == "true",
+        SlowMode:       strings.ToLower(os.Getenv(EnvSlowMode)) == "true",
+        EagerCordoning: strings.ToLower(os.Getenv(EnvEagerCordoning)) == "true",
     }
     if clusterName := os.Getenv(EnvClusterName); len(clusterName) > 0 {
         cfg.AutodiscoveryTags = fmt.Sprintf("k8s.io/cluster-autoscaler/%s=owned,k8s.io/cluster-autoscaler/enabled=true", clusterName)
diff --git a/k8s/client.go b/k8s/client.go
index 64312b19..703ddee0 100644
--- a/k8s/client.go
+++ b/k8s/client.go
@@ -17,6 +17,7 @@ import (
 
 const (
     AnnotationRollingUpdateStartedTimestamp = "aws-eks-asg-rolling-update-handler.twin.sh/started-at"
+    AnnotationRollingUpdateCordonedTimestamp = "aws-eks-asg-rolling-update-handler.twin.sh/cordoned-at"
     AnnotationRollingUpdateDrainedTimestamp = "aws-eks-asg-rolling-update-handler.twin.sh/drained-at"
     AnnotationRollingUpdateTerminatedTimestamp = "aws-eks-asg-rolling-update-handler.twin.sh/terminated-at"
 
@@ -33,6 +34,7 @@ type ClientAPI interface {
     GetNodeByAutoScalingInstance(instance *autoscaling.Instance) (*v1.Node, error)
     FilterNodeByAutoScalingInstance(nodes []v1.Node, instance *autoscaling.Instance) (*v1.Node, error)
     UpdateNode(node *v1.Node) error
+    Cordon(nodeName string) error
     Drain(nodeName string, ignoreDaemonSets, deleteEmptyDirData bool, podTerminationGracePeriod int) error
 }
 
@@ -108,6 +110,23 @@ func (k *Client) UpdateNode(node *v1.Node) error {
     return err
 }
 
+// Cordon disables scheduling new pods onto the given node
+func (k *Client) Cordon(nodeName string) error {
+    node, err := k.client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+    if err != nil {
+        return err
+    }
+    drainer := &drain.Helper{
+        Client: k.client,
+        Ctx:    context.TODO(),
+    }
+    if err := drain.RunCordonOrUncordon(drainer, node, true); err != nil {
+        log.Printf("[%s][CORDONER] Failed to cordon node: %v", node.Name, err)
+        return err
+    }
+    return nil
+}
+
 // Drain gracefully deletes all pods from a given node
 func (k *Client) Drain(nodeName string, ignoreDaemonSets, deleteEmptyDirData bool, podTerminationGracePeriod int) error {
     node, err := k.client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
@@ -128,10 +147,6 @@ func (k *Client) Drain(nodeName string, ignoreDaemonSets, deleteEmptyDirData boo
             log.Printf("[%s][DRAINER] evicted pod %s/%s", nodeName, pod.Namespace, pod.Name)
         },
     }
-    if err := drain.RunCordonOrUncordon(drainer, node, true); err != nil {
-        log.Printf("[%s][DRAINER] Failed to cordon node: %v", node.Name, err)
-        return err
-    }
     if err := drain.RunNodeDrain(drainer, node.Name); err != nil {
         log.Printf("[%s][DRAINER] Failed to drain node: %v", node.Name, err)
         return err
diff --git a/k8s/client_test.go b/k8s/client_test.go
index 58e7850c..073eaf19 100644
--- a/k8s/client_test.go
+++ b/k8s/client_test.go
@@ -11,8 +11,10 @@ import (
 func TestClient_Drain(t *testing.T) {
     fakeKubernetesClient := fakekubernetes.NewSimpleClientset(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "default"}})
     kc := NewClient(fakeKubernetesClient)
-    err := kc.Drain("default", true, true, -1)
-    if err != nil {
+    if err := kc.Cordon("default"); err != nil {
+        t.Errorf("Unexpected error: %v", err)
+    }
+    if err := kc.Drain("default", true, true, -1); err != nil {
         t.Errorf("Unexpected error: %v", err)
     }
 }
diff --git a/k8stest/k8stest.go b/k8stest/k8stest.go
index 8e78b738..2a2dd336 100644
--- a/k8stest/k8stest.go
+++ b/k8stest/k8stest.go
@@ -76,6 +76,11 @@ func (mock *MockClient) UpdateNode(node *v1.Node) error {
     return nil
 }
 
+func (mock *MockClient) Cordon(nodeName string) error {
+    mock.Counter["Cordon"]++
+    return nil
+}
+
 func (mock *MockClient) Drain(nodeName string, ignoreDaemonSets, deleteLocalData bool, podTerminationGracePeriod int) error {
     mock.Counter["Drain"]++
     return nil
diff --git a/main.go b/main.go
index b100dbd6..c67facb3 100644
--- a/main.go
+++ b/main.go
@@ -151,13 +151,39 @@ func DoHandleRollingUpgrade(client k8s.ClientAPI, ec2Service ec2iface.EC2API, au
         rand.Shuffle(len(outdatedInstances), func(i, j int) {
             outdatedInstances[i], outdatedInstances[j] = outdatedInstances[j], outdatedInstances[i]
         })
+
+        if config.Get().EagerCordoning {
+            for _, outdatedInstance := range outdatedInstances {
+                node, err := client.GetNodeByAutoScalingInstance(outdatedInstance)
+                if err != nil {
+                    log.Printf("[%s][%s] Skipping because unable to get outdated node from Kubernetes: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
+                    continue
+                }
+
+                _, minutesSinceCordoned, _, _ := getRollingUpdateTimestampsFromNode(node)
+                if minutesSinceCordoned == -1 {
+                    log.Printf("[%s][%s] Cordoning node", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
+                    if err := client.Cordon(node.Name); err != nil {
+                        metrics.Server.Errors.Inc()
+                        log.Printf("[%s][%s] Skipping because ran into error while cordoning node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
+                        continue
+                    }
+
+                    if err := k8s.AnnotateNodeByAutoScalingInstance(client, outdatedInstance, k8s.AnnotationRollingUpdateCordonedTimestamp, time.Now().Format(time.RFC3339)); err != nil {
+                        log.Printf("[%s][%s] Skipping because unable to annotate node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
+                        continue
+                    }
+                }
+            }
+        }
+
         for _, outdatedInstance := range outdatedInstances {
             node, err := client.GetNodeByAutoScalingInstance(outdatedInstance)
             if err != nil {
                 log.Printf("[%s][%s] Skipping because unable to get outdated node from Kubernetes: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
                 continue
             }
-            minutesSinceStarted, minutesSinceDrained, minutesSinceTerminated := getRollingUpdateTimestampsFromNode(node)
+            minutesSinceStarted, minutesSinceCordoned, minutesSinceDrained, minutesSinceTerminated := getRollingUpdateTimestampsFromNode(node)
             // Check if outdated nodes in k8s have been marked with annotation from aws-eks-asg-rolling-update-handler
             if minutesSinceStarted == -1 {
                 log.Printf("[%s][%s] Starting node rollout process", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
@@ -173,6 +199,20 @@ func DoHandleRollingUpgrade(client k8s.ClientAPI, ec2Service ec2iface.EC2API, au
             hasEnoughResources := k8s.CheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes(client, node, updatedReadyNodes)
             if hasEnoughResources {
                 log.Printf("[%s][%s] Updated nodes have enough resources available", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
+                if minutesSinceCordoned == -1 {
+                    log.Printf("[%s][%s] Cordoning node", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
+                    err := client.Cordon(node.Name)
+                    if err != nil {
+                        metrics.Server.Errors.Inc()
+                        log.Printf("[%s][%s] Skipping because ran into error while cordoning node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
+                        continue
+                    } else {
+                        // Only annotate if no error was encountered
+                        _ = k8s.AnnotateNodeByAutoScalingInstance(client, outdatedInstance, k8s.AnnotationRollingUpdateCordonedTimestamp, time.Now().Format(time.RFC3339))
+                    }
+                } else {
+                    log.Printf("[%s][%s] Node has already been cordoned %d minutes ago, skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), minutesSinceCordoned)
+                }
                 if minutesSinceDrained == -1 {
                     log.Printf("[%s][%s] Draining node", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
                     err := client.Drain(node.Name, config.Get().IgnoreDaemonSets, config.Get().DeleteEmptyDirData, config.Get().PodTerminationGracePeriod)
@@ -312,7 +352,7 @@ func getReadyNodesAndNumberOfNonReadyNodesOrInstances(client k8s.ClientAPI, upda
     return updatedReadyNodes, numberOfNonReadyNodesOrInstances
 }
 
-func getRollingUpdateTimestampsFromNode(node *v1.Node) (minutesSinceStarted int, minutesSinceDrained int, minutesSinceTerminated int) {
+func getRollingUpdateTimestampsFromNode(node *v1.Node) (minutesSinceStarted int, minutesSinceCordoned int, minutesSinceDrained int, minutesSinceTerminated int) {
     rollingUpdateStartedAt, ok := node.Annotations[k8s.AnnotationRollingUpdateStartedTimestamp]
     if ok {
         startedAt, err := time.Parse(time.RFC3339, rollingUpdateStartedAt)
@@ -322,6 +362,15 @@ func getRollingUpdateTimestampsFromNode(node *v1.Node) (minutesSinceStarted int,
     } else {
         minutesSinceStarted = -1
     }
+    cordonedAtValue, ok := node.Annotations[k8s.AnnotationRollingUpdateCordonedTimestamp]
+    if ok {
+        cordonedAt, err := time.Parse(time.RFC3339, cordonedAtValue)
+        if err == nil {
+            minutesSinceCordoned = int(time.Since(cordonedAt).Minutes())
+        }
+    } else {
+        minutesSinceCordoned = -1
+    }
    drainedAtValue, ok := node.Annotations[k8s.AnnotationRollingUpdateDrainedTimestamp]
     if ok {
         drainedAt, err := time.Parse(time.RFC3339, drainedAtValue)
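The new Cordon method relies on drain.RunCordonOrUncordon to mark the node unschedulable, while Drain now assumes the node has already been cordoned. Below is a minimal test-style sketch of that behavior, using the same fake clientset as k8s/client_test.go; the test name and the Spec.Unschedulable assertion are illustrative additions, not part of this change.

package k8s

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	fakekubernetes "k8s.io/client-go/kubernetes/fake"
)

// Sketch: cordon a fake node, then verify it is marked unschedulable.
func TestClient_Cordon_MarksNodeUnschedulable(t *testing.T) {
	fakeKubernetesClient := fakekubernetes.NewSimpleClientset(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "default"}})
	kc := NewClient(fakeKubernetesClient)
	if err := kc.Cordon("default"); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	node, err := fakeKubernetesClient.CoreV1().Nodes().Get(context.TODO(), "default", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if !node.Spec.Unschedulable {
		t.Error("Expected node to be unschedulable after Cordon")
	}
}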
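The cordoned-at annotation follows the same sentinel convention as the other rolling-update annotations: getRollingUpdateTimestampsFromNode returns -1 when the annotation is absent, which is what both cordoning paths check before calling Cordon. A sketch of that behavior follows, assuming the module path github.com/TwiN/aws-eks-asg-rolling-update-handler (inferred from the issue link above) and a test in package main; both are assumptions, not part of this change.

package main

import (
	"testing"
	"time"

	"github.com/TwiN/aws-eks-asg-rolling-update-handler/k8s"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Sketch: a freshly annotated node reports ~0 minutes since cordoning,
// while a node without the annotation reports the -1 sentinel.
func TestGetRollingUpdateTimestampsFromNode_Cordoned(t *testing.T) {
	annotated := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Annotations: map[string]string{
			k8s.AnnotationRollingUpdateCordonedTimestamp: time.Now().Format(time.RFC3339),
		},
	}}
	if _, minutesSinceCordoned, _, _ := getRollingUpdateTimestampsFromNode(annotated); minutesSinceCordoned != 0 {
		t.Errorf("Expected 0 minutes since cordoned, got %d", minutesSinceCordoned)
	}
	bare := &v1.Node{}
	if _, minutesSinceCordoned, _, _ := getRollingUpdateTimestampsFromNode(bare); minutesSinceCordoned != -1 {
		t.Errorf("Expected -1 for a node without the cordoned-at annotation, got %d", minutesSinceCordoned)
	}
}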