Merge pull request gruntwork-io#81 from adamrbennett/delete-local-data-flag

Add delete-local-data flag and pass to kubectl drain if present
yorinasub17 authored Feb 26, 2020
2 parents ec2f429 + fc9aea7 commit 956f129
Showing 4 changed files with 23 additions and 5 deletions.
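
The new flag is opt-in: when it is not passed, draining behaves exactly as before. Assuming this repository is the kubergrunt CLI (the tool's name is not shown on this page), an invocation of the rollout command with the new flag might look like the following, with the other required flags omitted:

    kubergrunt eks deploy ... --delete-local-data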
cmd/eks.go: 7 additions, 0 deletions

@@ -59,6 +59,10 @@ var (
         Value: 15 * time.Minute,
         Usage: "The length of time as duration (e.g 10m = 10 minutes) to wait for draining nodes before giving up, zero means infinite. Defaults to 15 minutes.",
     }
+    deleteLocalDataFlag = cli.BoolFlag{
+        Name: "delete-local-data",
+        Usage: "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).",
+    }
     waitMaxRetriesFlag = cli.IntFlag{
         Name: "max-retries",
         Value: 0,
@@ -164,6 +168,7 @@ If max-retries is unspecified, this command will use a value that translates to
             eksKubectlCAFlag,
             eksKubectlTokenFlag,
             drainTimeoutFlag,
+            deleteLocalDataFlag,
             waitMaxRetriesFlag,
             waitSleepBetweenRetriesFlag,
         },
@@ -285,6 +290,7 @@ func rollOutDeployment(cliContext *cli.Context) error {
         return errors.WithStackTrace(err)
     }
     drainTimeout := cliContext.Duration(drainTimeoutFlag.Name)
+    deleteLocalData := cliContext.Bool(deleteLocalDataFlag.Name)
     waitMaxRetries := cliContext.Int(waitMaxRetriesFlag.Name)
     waitSleepBetweenRetries := cliContext.Duration(waitSleepBetweenRetriesFlag.Name)

@@ -293,6 +299,7 @@ func rollOutDeployment(cliContext *cli.Context) error {
         asgName,
         kubectlOptions,
         drainTimeout,
+        deleteLocalData,
         waitMaxRetries,
         waitSleepBetweenRetries,
     )
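
For readers unfamiliar with urfave/cli, the following is a minimal, self-contained sketch of the same pattern used in cmd/eks.go above: declaring a BoolFlag and reading it off the cli.Context. The app name and the printed output are illustrative only and are not part of this change.

package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli"
)

func main() {
	app := cli.NewApp()
	app.Name = "flag-demo" // illustrative name, not part of this change

	// Declare an opt-in boolean flag, defaulting to false.
	app.Flags = []cli.Flag{
		cli.BoolFlag{
			Name:  "delete-local-data",
			Usage: "Continue even if there are pods using emptyDir.",
		},
	}

	app.Action = func(cliContext *cli.Context) error {
		// Bool returns false unless --delete-local-data was passed.
		deleteLocalData := cliContext.Bool("delete-local-data")
		fmt.Println("delete-local-data:", deleteLocalData)
		return nil
	}

	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}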
eks/asg.go: 2 additions, 1 deletion

@@ -193,14 +193,15 @@ func drainNodesInAsg(
     kubectlOptions *kubectl.KubectlOptions,
     asgInstanceIds []string,
     drainTimeout time.Duration,
+    deleteLocalData bool,
 ) error {
     instances, err := instanceDetailsFromIds(ec2Svc, asgInstanceIds)
     if err != nil {
         return err
     }
     eksKubeNodeNames := kubeNodeNamesFromInstances(instances)

-    return kubectl.DrainNodes(kubectlOptions, eksKubeNodeNames, drainTimeout)
+    return kubectl.DrainNodes(kubectlOptions, eksKubeNodeNames, drainTimeout, deleteLocalData)
 }

 // Make the call to cordon all the provided nodes in Kubernetes so that they won't be used to schedule new Pods.
eks/deploy.go: 2 additions, 1 deletion

@@ -31,6 +31,7 @@ func RollOutDeployment(
     eksAsgName string,
     kubectlOptions *kubectl.KubectlOptions,
     drainTimeout time.Duration,
+    deleteLocalData bool,
     maxRetries int,
     sleepBetweenRetries time.Duration,
 ) error {
@@ -109,7 +110,7 @@ func RollOutDeployment(
     logger.Infof("Successfully cordoned old instances in cluster ASG %s", eksAsgName)

     logger.Infof("Draining Pods on old instances in cluster ASG %s", eksAsgName)
-    err = drainNodesInAsg(ec2Svc, kubectlOptions, currentInstanceIds, drainTimeout)
+    err = drainNodesInAsg(ec2Svc, kubectlOptions, currentInstanceIds, drainTimeout, deleteLocalData)
     if err != nil {
         logger.Errorf("Error while draining nodes.")
         logger.Errorf("Continue to drain nodes that failed manually, and then terminate the underlying instances to complete the rollout.")
kubectl/node.go: 12 additions, 3 deletions

@@ -103,13 +103,13 @@ func filterNodesByID(nodes []corev1.Node, nodeIds []string) []corev1.Node {
 // See
 // https://kubernetes.io/docs/tasks/administer-cluster/safely-drain-node/#use-kubectl-drain-to-remove-a-node-from-service
 // for more information.
-func DrainNodes(kubectlOptions *KubectlOptions, nodeIds []string, timeout time.Duration) error {
+func DrainNodes(kubectlOptions *KubectlOptions, nodeIds []string, timeout time.Duration, deleteLocalData bool) error {
     // Concurrently trigger drain events for all requested nodes.
     var wg sync.WaitGroup // So that we can wait for all the drain calls
     errChannel := make(chan NodeDrainError, 1) // Collect all errors from each command
     for _, nodeID := range nodeIds {
         wg.Add(1)
-        go drainNode(&wg, errChannel, kubectlOptions, nodeID, timeout)
+        go drainNode(&wg, errChannel, kubectlOptions, nodeID, timeout, deleteLocalData)
     }
     go waitForAllDrains(&wg, errChannel)

@@ -131,9 +131,18 @@ func drainNode(
     kubectlOptions *KubectlOptions,
     nodeID string,
     timeout time.Duration,
+    deleteLocalData bool,
 ) {
     defer wg.Done()
-    err := RunKubectl(kubectlOptions, "drain", nodeID, "--ignore-daemonsets", "--timeout", timeout.String())
+
+    args := []string{"drain", nodeID, "--ignore-daemonsets", "--timeout", timeout.String()}
+
+    if deleteLocalData {
+        args = append(args, "--delete-local-data")
+    }
+
+    err := RunKubectl(kubectlOptions, args...)
+
     errChannel <- NodeDrainError{NodeID: nodeID, Error: err}
 }
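
As a quick sanity check of what drainNode now assembles, here is a standalone sketch of the same argument-building pattern. The node name is made up, buildDrainArgs is a helper introduced only for this example, and RunKubectl is replaced by a print so the snippet runs on its own.

package main

import (
	"fmt"
	"strings"
	"time"
)

// buildDrainArgs mirrors the argument assembly in drainNode above:
// --delete-local-data is appended only when requested.
func buildDrainArgs(nodeID string, timeout time.Duration, deleteLocalData bool) []string {
	args := []string{"drain", nodeID, "--ignore-daemonsets", "--timeout", timeout.String()}
	if deleteLocalData {
		args = append(args, "--delete-local-data")
	}
	return args
}

func main() {
	// The node name here is illustrative only.
	args := buildDrainArgs("ip-10-0-1-23.ec2.internal", 15*time.Minute, true)
	// Prints: kubectl drain ip-10-0-1-23.ec2.internal --ignore-daemonsets --timeout 15m0s --delete-local-data
	fmt.Println("kubectl " + strings.Join(args, " "))
}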
