Add utilities for ECS #73
base: master
Changes from 3 commits
@@ -0,0 +1,219 @@
package awscommons

import (
	"fmt"
	"math"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/ecs"
	ecs_types "github.com/aws/aws-sdk-go-v2/service/ecs/types"
	"github.com/gruntwork-io/go-commons/collections"
	"github.com/gruntwork-io/go-commons/errors"
	"github.com/gruntwork-io/go-commons/retry"
)

// GetContainerInstanceArns gets the container instance ARNs of all the EC2 instances in an ECS Cluster.
// ECS container instance ARNs are different from EC2 instance IDs!
// An ECS container instance is an EC2 instance that runs the ECS container agent and has been registered
// into an ECS cluster.
// Example identifiers:
//   - EC2 instance ID: i-08e8cfc073db135a9
//   - container instance ID: 2db66342-5f69-4782-89a3-f9b707f979ab
//   - container instance ARN: arn:aws:ecs:us-east-1:012345678910:container-instance/2db66342-5f69-4782-89a3-f9b707f979ab
func GetContainerInstanceArns(opts *Options, clusterName string) ([]string, error) {
	client, err := NewECSClient(opts)
	if err != nil {
		return nil, err
	}

	if opts.Logger != nil {
		opts.Logger.Debugf("Looking up Container Instance ARNs for ECS cluster %s", clusterName)
	}

	input := &ecs.ListContainerInstancesInput{Cluster: aws.String(clusterName)}
	arns := []string{}
	// Handle pagination by repeatedly making the API call while there is a next token set.
	// TODO: Add a maximum number of iterations as a defensive guard against an infinite
	// pagination loop, per the review discussion below.
	for {
		result, err := client.ListContainerInstances(opts.Context, input)
		if err != nil {
			return nil, errors.WithStackTrace(err)
		}
		arns = append(arns, result.ContainerInstanceArns...)
		if result.NextToken == nil {
			break
		}
		input.NextToken = result.NextToken
	}

	return arns, nil
}

// StartDrainingContainerInstances puts ECS container instances in DRAINING state so that all ECS Tasks
// running on them are migrated to other container instances. Batches into chunks of 10 because of AWS API
// limitations ("An error occurred InvalidParameterException when calling the UpdateContainerInstancesState
// operation: instanceIds can have at most 10 items.").
func StartDrainingContainerInstances(opts *Options, clusterName string, containerInstanceArns []string) error {
	client, err := NewECSClient(opts)
	if err != nil {
		return err
	}

	batchSize := 10
	// Convert to float64 before dividing; otherwise the integer division truncates first
	// and math.Ceil has nothing left to round up.
	numBatches := int(math.Ceil(float64(len(containerInstanceArns)) / float64(batchSize)))

	errList := NewMultipleDrainContainerInstanceErrors()
	for batchIdx, batchedArnList := range collections.BatchListIntoGroupsOf(containerInstanceArns, batchSize) {
		if opts.Logger != nil {
			opts.Logger.Debugf("Putting batch %d/%d of container instances in cluster %s into DRAINING state", batchIdx, numBatches, clusterName)
		}
		// The v2 SDK takes []string directly, so no round-trip through aws.StringSlice /
		// aws.ToStringSlice is needed.
		input := &ecs.UpdateContainerInstancesStateInput{
			Cluster:            aws.String(clusterName),
			ContainerInstances: batchedArnList,
			Status:             ecs_types.ContainerInstanceStatusDraining,
		}
		_, err := client.UpdateContainerInstancesState(opts.Context, input)
		if err != nil {
			errList.AddError(err)
			if opts.Logger != nil {
				opts.Logger.Errorf("Encountered error starting to drain container instances in batch %d: %s", batchIdx, err)
				opts.Logger.Errorf("Container Instance ARNs: %s", strings.Join(batchedArnList, ","))
			}
			continue
		}

		if opts.Logger != nil {
			opts.Logger.Debugf("Started draining %d container instances from batch %d", len(batchedArnList), batchIdx)
		}
	}

	if !errList.IsEmpty() {
		return errors.WithStackTrace(errList)
	}

	if opts.Logger != nil {
		opts.Logger.Debugf("Successfully started draining all %d container instances", len(containerInstanceArns))
	}
	return nil
}

// WaitForContainerInstancesToDrain waits until there are no more ECS Tasks running on any of the ECS
// container instances. Batches container instances in groups of 100 because of AWS API limitations.
func WaitForContainerInstancesToDrain(opts *Options, clusterName string, containerInstanceArns []string, start time.Time, timeout time.Duration, maxRetries int, sleepBetweenRetries time.Duration) error {
	client, err := NewECSClient(opts)
	if err != nil {
		return err
	}

	if opts.Logger != nil {
		opts.Logger.Debugf("Checking if all ECS Tasks have been drained from the ECS Container Instances in Cluster %s.", clusterName)
	}

	batchSize := 100
	// As above, convert to float64 before dividing so math.Ceil can round up.
	numBatches := int(math.Ceil(float64(len(containerInstanceArns)) / float64(batchSize)))

	err = retry.DoWithRetry(
		opts.Logger,
		"Wait for Container Instances to be Drained",
		maxRetries, sleepBetweenRetries,
		func() error {
			responses := []*ecs.DescribeContainerInstancesOutput{}
			for batchIdx, batchedArnList := range collections.BatchListIntoGroupsOf(containerInstanceArns, batchSize) {
				if opts.Logger != nil {
					opts.Logger.Debugf("Fetching description of batch %d/%d of ECS Instances in Cluster %s.", batchIdx, numBatches, clusterName)
				}
				// As above, the v2 SDK takes []string directly.
				input := &ecs.DescribeContainerInstancesInput{
					Cluster:            aws.String(clusterName),
					ContainerInstances: batchedArnList,
				}
				result, err := client.DescribeContainerInstances(opts.Context, input)
				if err != nil {
					return errors.WithStackTrace(err)
				}
				responses = append(responses, result)
			}

			// If we exceeded the timeout, halt with error. (%s on a time.Duration already
			// includes the unit, so don't append "seconds".)
			if timeoutExceeded(start, timeout) {
				return retry.FatalError{Underlying: fmt.Errorf("maximum drain timeout of %s has elapsed and instances are still draining", timeout)}
			}

			// Check whether all instances are fully drained, handling the error from the
			// check per the review discussion below.
			drained, drainedErr := allInstancesFullyDrained(opts, responses)
			if drainedErr != nil {
				// Return a regular (non-fatal) error so that DoWithRetry retries.
				return errors.WithStackTrace(drainedErr)
			}

			// Yay, all done.
			if drained {
				if opts.Logger != nil {
					opts.Logger.Debugf("All container instances have been drained in Cluster %s!", clusterName)
				}
				return nil
			}

			// Otherwise the instances are still draining; return a regular error so that
			// DoWithRetry retries.
			return errors.WithStackTrace(fmt.Errorf("container instances still draining"))
		})
	return errors.WithStackTrace(err)
}

Review comment: This should handle the error or add a comment as to why it's ok not to handle the error.

Review comment: You can reduce this to return the error.

Author reply: Ah, I see. I think the best thing is to retry in all other cases. If timeout exceeded, exit with error. If drained, return nil. All other cases, retry.

// timeoutExceeded returns true if the amount of time since start has exceeded the timeout.
func timeoutExceeded(start time.Time, timeout time.Duration) bool {
	return time.Since(start) > timeout
}

// NewECSClient returns a new AWS SDK client for interacting with AWS ECS.
func NewECSClient(opts *Options) (*ecs.Client, error) {
	cfg, err := NewDefaultConfig(opts)
	if err != nil {
		return nil, errors.WithStackTrace(err)
	}
	return ecs.NewFromConfig(cfg), nil
}

// allInstancesFullyDrained returns true when none of the described container instances are active or
// have tasks remaining. opts is threaded through so that instanceFullyDrained can log.
func allInstancesFullyDrained(opts *Options, responses []*ecs.DescribeContainerInstancesOutput) (bool, error) {
	for _, response := range responses {
		instances := response.ContainerInstances
		if len(instances) == 0 {
			return false, errors.WithStackTrace(fmt.Errorf("querying DescribeContainerInstances returned no instances"))
		}

		for _, instance := range instances {
			if !instanceFullyDrained(opts, instance) {
				return false, nil
			}
		}
	}
	return true, nil
}

// instanceFullyDrained takes opts as a parameter (it was previously referenced without being in scope,
// which would not compile) and uses aws.ToString to dereference the SDK's pointer fields safely.
func instanceFullyDrained(opts *Options, instance ecs_types.ContainerInstance) bool {
	instanceArn := aws.ToString(instance.ContainerInstanceArn)

	if aws.ToString(instance.Status) == "ACTIVE" {
		if opts.Logger != nil {
			opts.Logger.Debugf("The ECS Container Instance %s is still in ACTIVE status", instanceArn)
		}
		return false
	}
	if instance.PendingTasksCount > 0 {
		if opts.Logger != nil {
			opts.Logger.Debugf("The ECS Container Instance %s still has pending tasks", instanceArn)
		}
		return false
	}
	if instance.RunningTasksCount > 0 {
		if opts.Logger != nil {
			opts.Logger.Debugf("The ECS Container Instance %s still has running tasks", instanceArn)
		}
		return false
	}

	return true
}
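
For orientation, here is a minimal sketch of how these helpers are intended to compose into a drain workflow. It assumes an Options value (with Context and an optional Logger) has been constructed elsewhere; drainClusterInstances is a hypothetical caller, not part of this PR, and the retry/timeout values are illustrative only.

// drainClusterInstances is a hypothetical wrapper showing the intended call sequence.
func drainClusterInstances(opts *Options, clusterName string) error {
	arns, err := GetContainerInstanceArns(opts, clusterName)
	if err != nil {
		return err
	}
	if err := StartDrainingContainerInstances(opts, clusterName, arns); err != nil {
		return err
	}
	// Poll every 15 seconds, up to 40 times, with a 10 minute overall cap.
	return WaitForContainerInstancesToDrain(opts, clusterName, arns, time.Now(), 10*time.Minute, 40, 15*time.Second)
}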
@@ -0,0 +1,34 @@
package awscommons

import (
	"fmt"
	"strings"
)

// MultipleDrainContainerInstanceErrors represents multiple errors found while draining container instances.
type MultipleDrainContainerInstanceErrors struct {
	errors []error
}

func (err MultipleDrainContainerInstanceErrors) Error() string {
	messages := []string{
		fmt.Sprintf("%d errors found while draining container instances:", len(err.errors)),
	}

	for _, individualErr := range err.errors {
		messages = append(messages, individualErr.Error())
	}
	return strings.Join(messages, "\n")
}

// AddError uses a pointer receiver so that the append is visible to the caller; with a value
// receiver, the new error would be appended to a copy and silently dropped.
func (err *MultipleDrainContainerInstanceErrors) AddError(newErr error) {
	err.errors = append(err.errors, newErr)
}

func (err MultipleDrainContainerInstanceErrors) IsEmpty() bool {
	return len(err.errors) == 0
}

func NewMultipleDrainContainerInstanceErrors() MultipleDrainContainerInstanceErrors {
	return MultipleDrainContainerInstanceErrors{[]error{}}
}
Review comment: NIT: consider adding a maximum to the loop iteration to avoid an infinite loop. Logically, the AWS API should prevent such an infinite loop, but it's always good to practice defensive coding to avoid unknown unknowns causing unexpected behaviors.

Author reply: Ah, that's true. I'll take care of this in the next review cycle, putting a TODO here for now.
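
As a sketch of the defensive bound suggested above, the pagination loop in GetContainerInstanceArns could be capped roughly like this. listContainerInstanceArnsBounded is a hypothetical variant, not code from this PR, and maxPages is an arbitrary illustrative value, not an AWS limit.

// listContainerInstanceArnsBounded is the same pagination loop with a defensive upper bound.
func listContainerInstanceArnsBounded(opts *Options, client *ecs.Client, clusterName string) ([]string, error) {
	// Arbitrary defensive cap; a well-behaved API should never come close to it.
	const maxPages = 1000

	input := &ecs.ListContainerInstancesInput{Cluster: aws.String(clusterName)}
	arns := []string{}
	for page := 0; page < maxPages; page++ {
		result, err := client.ListContainerInstances(opts.Context, input)
		if err != nil {
			return nil, errors.WithStackTrace(err)
		}
		arns = append(arns, result.ContainerInstanceArns...)
		if result.NextToken == nil {
			return arns, nil
		}
		input.NextToken = result.NextToken
	}
	return nil, errors.WithStackTrace(fmt.Errorf("ListContainerInstances did not terminate after %d pages", maxPages))
}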