Skip to content

Commit

Permalink
Fix "N instances should be drained but only M instances was drained"
Browse files Browse the repository at this point in the history
This commit fixes the following error:

```
Error: failed to replace instances:
    github.com/abicky/ecsmec/cmd.newRuntimeError
        github.com/abicky/ecsmec/cmd/root.go:38
  - failed to terminate instances:
    github.com/abicky/ecsmec/internal/capacity.(*AutoScalingGroup).ReplaceInstances
        github.com/abicky/ecsmec/internal/capacity/autoscalinggroup.go:64
  - failed to drain instances:
    github.com/abicky/ecsmec/internal/capacity.(*AutoScalingGroup).terminateInstances
        github.com/abicky/ecsmec/internal/capacity/autoscalinggroup.go:212
  - 9 instances should be drained but only 8 instances was drained:
    github.com/abicky/ecsmec/internal/capacity.(*drainer).Drain
        github.com/abicky/ecsmec/internal/capacity/drainer.go:75
```
  • Loading branch information
abicky committed Oct 30, 2024
1 parent b6337e5 commit f9ff433
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 29 deletions.
7 changes: 4 additions & 3 deletions cmd/replaceautoscalinggroupinstances.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ launches new ones.`,

func replaceAutoScalingGroupInstances(cmd *cobra.Command, args []string) error {
name, _ := replaceAutoScalingGroupInstancesCmd.Flags().GetString("auto-scaling-group-name")
cluster, _ := replaceAutoScalingGroupInstancesCmd.Flags().GetString("cluster")
clusterName, _ := replaceAutoScalingGroupInstancesCmd.Flags().GetString("cluster")
batchSize, _ := replaceAutoScalingGroupInstancesCmd.Flags().GetInt32("batch-size")

cfg, err := newConfig(cmd.Context())
Expand All @@ -48,12 +48,13 @@ func replaceAutoScalingGroupInstances(cmd *cobra.Command, args []string) error {
return newRuntimeError("failed to initialize a AutoScalingGroup: %w", err)
}

drainer, err := capacity.NewDrainer(cluster, batchSize, ecs.NewFromConfig(cfg))
ecsSvc := ecs.NewFromConfig(cfg)
drainer, err := capacity.NewDrainer(clusterName, batchSize, ecsSvc)
if err != nil {
return newRuntimeError("failed to initialize a Drainer: %w", err)
}

if err := asg.ReplaceInstances(cmd.Context(), drainer); err != nil {
if err := asg.ReplaceInstances(cmd.Context(), drainer, capacity.NewCluster(clusterName, ecsSvc)); err != nil {
return newRuntimeError("failed to replace instances: %w", err)
}
return nil
Expand Down
10 changes: 8 additions & 2 deletions internal/capacity/autoscalinggroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewAutoScalingGroup(name string, asSvc AutoScalingAPI, ec2Svc EC2API) (*Aut
return &asg, nil
}

func (asg *AutoScalingGroup) ReplaceInstances(ctx context.Context, drainer Drainer) error {
func (asg *AutoScalingGroup) ReplaceInstances(ctx context.Context, drainer Drainer, cluster Cluster) error {
oldInstanceIDs := make([]string, 0)
baseTime := asg.StateSavedAt
if baseTime == nil {
Expand All @@ -60,7 +60,13 @@ func (asg *AutoScalingGroup) ReplaceInstances(ctx context.Context, drainer Drain
return xerrors.Errorf("failed to launch new instances: %w", err)
}

if err := asg.terminateInstances(ctx, *asg.DesiredCapacity-*asg.OriginalDesiredCapacity, drainer); err != nil {
newInstanceCount := *asg.DesiredCapacity - *asg.OriginalDesiredCapacity
log.Printf("Wait for all the new instances to be registered in the cluster %q\n", cluster.Name())
if err := cluster.WaitUntilContainerInstancesRegistered(ctx, int(newInstanceCount), asg.StateSavedAt); err != nil {
return xerrors.Errorf("failed to wait until container instances are registered: %w", err)
}

if err := asg.terminateInstances(ctx, newInstanceCount, drainer); err != nil {
return xerrors.Errorf("failed to terminate instances: %w", err)
}

Expand Down
61 changes: 41 additions & 20 deletions internal/capacity/autoscalinggroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
asMock := capacitymock.NewMockAutoScalingAPI(ctrl)
ec2Mock := capacitymock.NewMockEC2API(ctrl)
drainerMock := capacitymock.NewMockDrainer(ctrl)
clusterMock := capacitymock.NewMockCluster(ctrl)
clusterMock.EXPECT().Name()

now := time.Now().UTC()
stateSavedAt := now.Format(time.RFC3339)
Expand All @@ -448,11 +450,13 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
},
}, nil),

// For fetchInstances
ec2Mock.EXPECT().DescribeInstances(ctx, gomock.Any()).Return(&ec2.DescribeInstancesOutput{
Reservations: oldReservations,
}, nil),

expectLaunchNewInstances(t, ctx, asMock, tt.oldInstances, tt.newInstances, tt.desiredCapacity, tt.maxSize, stateSavedAt),
clusterMock.EXPECT().WaitUntilContainerInstancesRegistered(ctx, len(tt.newInstances), gomock.AssignableToTypeOf(&time.Time{})),
expectTerminateInstances(t, ctx, asMock, ec2Mock, drainerMock, tt.oldInstances, tt.newInstances, oldReservations, newReservations, tt.desiredCapacity, tt.maxSize),
expectRestoreState(t, ctx, asMock, tt.desiredCapacity, tt.maxSize, stateSavedAt),
)
Expand All @@ -462,7 +466,7 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
t.Fatal(err)
}

if err := group.ReplaceInstances(ctx, drainerMock); err != nil {
if err := group.ReplaceInstances(ctx, drainerMock, clusterMock); err != nil {
t.Errorf("err = %#v; want nil", err)
}
})
Expand All @@ -480,6 +484,8 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
asMock := capacitymock.NewMockAutoScalingAPI(ctrl)
ec2Mock := capacitymock.NewMockEC2API(ctrl)
drainerMock := capacitymock.NewMockDrainer(ctrl)
clusterMock := capacitymock.NewMockCluster(ctrl)
clusterMock.EXPECT().Name()

now := time.Now().UTC()
stateSavedAt := now.Format(time.RFC3339)
Expand Down Expand Up @@ -527,11 +533,13 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
},
}, nil),

// For fetchInstances
ec2Mock.EXPECT().DescribeInstances(ctx, gomock.Any()).Return(&ec2.DescribeInstancesOutput{
Reservations: oldReservations,
}, nil),

expectLaunchNewInstances(t, ctx, asMock, oldInstances, newInstances, desiredCapacity, maxSize, stateSavedAt),
clusterMock.EXPECT().WaitUntilContainerInstancesRegistered(ctx, len(newInstances), gomock.AssignableToTypeOf(&time.Time{})),
expectTerminateInstances(t, ctx, asMock, ec2Mock, drainerMock, instancesToTerminate, instancesToKeep, reservationsToTerminate, reservationsToKeep, desiredCapacity, maxSize),
expectRestoreState(t, ctx, asMock, desiredCapacity, maxSize, stateSavedAt),
)
Expand All @@ -541,12 +549,12 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
t.Fatal(err)
}

if err := group.ReplaceInstances(ctx, drainerMock); err != nil {
if err := group.ReplaceInstances(ctx, drainerMock, clusterMock); err != nil {
t.Errorf("err = %#v; want nil", err)
}
})

t.Run("replacement is already finished", func(t *testing.T) {
t.Run("replacement has already finished", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand All @@ -558,6 +566,8 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
asMock := capacitymock.NewMockAutoScalingAPI(ctrl)
ec2Mock := capacitymock.NewMockEC2API(ctrl)
drainerMock := capacitymock.NewMockDrainer(ctrl)
clusterMock := capacitymock.NewMockCluster(ctrl)
clusterMock.EXPECT().Name()

now := time.Now().UTC()
stateSavedAt := now.Format(time.RFC3339)
Expand All @@ -570,6 +580,7 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
gomock.InOrder(
asMock.EXPECT().DescribeAutoScalingGroups(ctx, gomock.Any()).Return(&autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []autoscalingtypes.AutoScalingGroup{
// NOTE: If replacement has already finished, DesiredCapacity is equal to OriginalDesiredCapacity
{
AutoScalingGroupName: aws.String("autoscaling-group-name"),
AvailabilityZones: []string{
Expand All @@ -584,10 +595,12 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
},
}, nil),

// For fetchInstances
ec2Mock.EXPECT().DescribeInstances(ctx, gomock.Any()).Return(&ec2.DescribeInstancesOutput{
Reservations: createReservations(instances, now),
}, nil),

clusterMock.EXPECT().WaitUntilContainerInstancesRegistered(ctx, 0, gomock.AssignableToTypeOf(&time.Time{})),
expectRestoreState(t, ctx, asMock, desiredCapacity, maxSize, stateSavedAt),
)

Expand All @@ -596,7 +609,7 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
t.Fatal(err)
}

if err := group.ReplaceInstances(ctx, drainerMock); err != nil {
if err := group.ReplaceInstances(ctx, drainerMock, clusterMock); err != nil {
t.Errorf("err = %#v; want nil", err)
}
})
Expand All @@ -614,6 +627,7 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
ec2Mock := capacitymock.NewMockEC2API(ctrl)
drainerMock := capacitymock.NewMockDrainer(ctrl)
clusterMock := capacitymock.NewMockCluster(ctrl)
clusterMock.EXPECT().Name()

now := time.Now().UTC()
stateSavedAt := now.Format(time.RFC3339)
Expand All @@ -630,27 +644,34 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
)
newReservations := createReservations(newInstances, now)

asg := autoscalingtypes.AutoScalingGroup{
AutoScalingGroupName: aws.String("autoscaling-group-name"),
AvailabilityZones: []string{
"ap-northeast-1a",
"ap-northeast-1c",
},
DesiredCapacity: aws.Int32(int32(len(oldInstances) + len(newInstances))),
Instances: append(oldInstances, newInstances...),
MaxSize: aws.Int32(int32(len(oldInstances) + len(newInstances))),
Tags: createTagDescriptions(desiredCapacity, maxSize, stateSavedAt),
}

gomock.InOrder(
asMock.EXPECT().DescribeAutoScalingGroups(ctx, gomock.Any()).Times(2).Return(&autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []autoscalingtypes.AutoScalingGroup{
{
AutoScalingGroupName: aws.String("autoscaling-group-name"),
AvailabilityZones: []string{
"ap-northeast-1a",
"ap-northeast-1c",
},
DesiredCapacity: aws.Int32(int32(len(oldInstances) + len(newInstances))),
Instances: append(oldInstances, newInstances...),
MaxSize: aws.Int32(int32(len(oldInstances) + len(newInstances))),
Tags: createTagDescriptions(desiredCapacity, maxSize, stateSavedAt),
},
},
asMock.EXPECT().DescribeAutoScalingGroups(ctx, gomock.Any()).Return(&autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []autoscalingtypes.AutoScalingGroup{asg},
}, nil),

// For fetchInstances
ec2Mock.EXPECT().DescribeInstances(ctx, gomock.Any()).Return(&ec2.DescribeInstancesOutput{
Reservations: oldReservations,
Reservations: append(oldReservations, newReservations...),
}, nil),

// For launchNewInstances that calls waitUntilInstancesInService only once
asMock.EXPECT().DescribeAutoScalingGroups(ctx, gomock.Any()).Return(&autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []autoscalingtypes.AutoScalingGroup{asg},
}, nil),

clusterMock.EXPECT().WaitUntilContainerInstancesRegistered(ctx, len(newInstances), gomock.AssignableToTypeOf(&time.Time{})),
expectTerminateInstances(t, ctx, asMock, ec2Mock, drainerMock, oldInstances, newInstances, oldReservations, newReservations, desiredCapacity, maxSize),
expectRestoreState(t, ctx, asMock, desiredCapacity, maxSize, stateSavedAt),
)
Expand All @@ -660,7 +681,7 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) {
t.Fatal(err)
}

if err := group.ReplaceInstances(ctx, drainerMock); err != nil {
if err := group.ReplaceInstances(ctx, drainerMock, clusterMock); err != nil {
t.Errorf("err = %#v; want nil", err)
}
})
Expand Down
71 changes: 71 additions & 0 deletions internal/capacity/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package capacity

import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ecs"
"golang.org/x/xerrors"
)

// Cluster is the view of an ECS cluster needed while replacing the instances
// of an auto scaling group.
type Cluster interface {
	// Name returns the name of the cluster.
	Name() string

	// WaitUntilContainerInstancesRegistered waits until count container
	// instances registered at or after registeredAt exist in the cluster.
	WaitUntilContainerInstancesRegistered(ctx context.Context, count int, registeredAt *time.Time) error
}

// cluster is the Cluster implementation returned by NewCluster.
type cluster struct {
// name is the cluster name; it is sent as the Cluster parameter of ECS requests.
name string
// ecsSvc is the ECS client used to list container instances.
ecsSvc ECSAPI
}

// NewCluster returns a Cluster for the ECS cluster called name that talks to
// ECS through ecsSvc.
func NewCluster(name string, ecsSvc ECSAPI) Cluster {
	c := new(cluster)
	c.name = name
	c.ecsSvc = ecsSvc
	return c
}

// Name returns the name of the cluster.
func (c *cluster) Name() string {
return c.name
}

// WaitUntilContainerInstancesRegistered blocks until at least count container
// instances whose registration time is at or after registeredAt are listed in
// the cluster.
//
// It returns nil immediately when count is zero or negative. Otherwise the
// container instances are listed right away and then every 10 seconds. An
// error is returned when registeredAt is nil, when listing fails, when ctx is
// done, or when the expected number of instances has not shown up within
// 5 minutes.
func (c *cluster) WaitUntilContainerInstancesRegistered(ctx context.Context, count int, registeredAt *time.Time) error {
	if count <= 0 {
		return nil
	}
	// The filter below dereferences registeredAt, so reject nil explicitly
	// instead of panicking.
	if registeredAt == nil {
		return xerrors.Errorf("registeredAt must not be nil")
	}

	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	timeout := 5 * time.Minute
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	params := &ecs.ListContainerInstancesInput{
		Cluster: aws.String(c.name),
		Filter:  aws.String(fmt.Sprintf("registeredAt >= %s", registeredAt.UTC().Format(time.RFC3339))),
	}
	for {
		// A fresh paginator is needed for every poll because a paginator
		// cannot be rewound once it has been consumed.
		foundCount := 0
		paginator := ecs.NewListContainerInstancesPaginator(c.ecsSvc, params)
		for paginator.HasMorePages() {
			page, err := paginator.NextPage(ctx)
			if err != nil {
				return xerrors.Errorf("failed to list container instances: %w", err)
			}
			foundCount += len(page.ContainerInstanceArns)
		}
		// Use >= rather than ==: if more instances than expected have
		// registered since registeredAt, the expected ones are still
		// present, and == would spin until the timeout.
		if foundCount >= count {
			return nil
		}

		select {
		case <-ctx.Done():
			// Stop waiting as soon as the caller cancels instead of
			// sleeping until the next tick or the timeout.
			return xerrors.Errorf("canceled while waiting for container instances to be registered: %w", ctx.Err())
		case <-ticker.C:
			// Poll again.
		case <-timer.C:
			return xerrors.Errorf("%d container instances expect to be registered but only %d instances were registered within %v", count, foundCount, timeout)
		}
	}
}
36 changes: 36 additions & 0 deletions internal/capacity/cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package capacity

import (
"context"
"fmt"
"testing"
"time"

"github.com/abicky/ecsmec/internal/testing/capacitymock"
"github.com/aws/aws-sdk-go-v2/service/ecs"
"go.uber.org/mock/gomock"
)

// TestCluster_WaitUntilContainerInstancesRegistered checks that the wait
// returns without error once ECS lists the expected number of container
// instances, and that the request targets the right cluster with a
// registeredAt filter derived from the given time.
func TestCluster_WaitUntilContainerInstancesRegistered(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	ctx := context.Background()
	now := time.Now()

	const clusterName = "cluster"
	wantFilter := fmt.Sprintf("registeredAt >= %s", now.UTC().Format(time.RFC3339))

	// deref renders an optional string for comparison and error messages.
	deref := func(s *string) string {
		if s == nil {
			return "<nil>"
		}
		return *s
	}

	ecsMock := capacitymock.NewMockECSAPI(ctrl)

	ecsMock.EXPECT().ListContainerInstances(ctx, gomock.Any(), gomock.Any()).
		DoAndReturn(func(_ context.Context, params *ecs.ListContainerInstancesInput, _ ...func(options *ecs.Options)) (*ecs.ListContainerInstancesOutput, error) {
			if got := deref(params.Cluster); got != clusterName {
				t.Errorf("Cluster = %q; want %q", got, clusterName)
			}
			if got := deref(params.Filter); got != wantFilter {
				t.Errorf("Filter = %q; want %q", got, wantFilter)
			}
			return &ecs.ListContainerInstancesOutput{
				ContainerInstanceArns: []string{
					"arn:aws:ecs:ap-northeast-1:1234:container-instance/test/xxxxxxxxxx",
				},
			}, nil
		})

	cluster := NewCluster(clusterName, ecsMock)
	if err := cluster.WaitUntilContainerInstancesRegistered(ctx, 1, &now); err != nil {
		t.Errorf("err = %#v; want nil", err)
	}
}
6 changes: 3 additions & 3 deletions internal/capacity/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (d *drainer) Drain(ctx context.Context, instanceIDs []string) error {
processedCount += len(instances)

arns := make([]*string, len(instances))
fmt.Printf("Drain the following container instances in the cluster \"%s\":\n", d.cluster)
fmt.Printf("Drain the following %d container instances in the cluster \"%s\":\n", len(instances), d.cluster)
for i, instance := range instances {
arns[i] = instance.ContainerInstanceArn
fmt.Printf("\t%s (%s)\n", getContainerInstanceID(*instance.ContainerInstanceArn), *instance.Ec2InstanceId)
Expand All @@ -72,7 +72,7 @@ func (d *drainer) Drain(ctx context.Context, instanceIDs []string) error {
return xerrors.Errorf("no target instances exist in the cluster \"%s\"", d.cluster)
}
if processedCount != len(instanceIDs) {
return xerrors.Errorf("%d instances should be drained but only %d instances was drained", len(instanceIDs), processedCount)
return xerrors.Errorf("%d instances should be drained but only %d instances were drained", len(instanceIDs), processedCount)
}

return nil
Expand Down Expand Up @@ -250,7 +250,7 @@ func (d *drainer) processContainerInstances(ctx context.Context, instanceIDs []s
}

if err := callback(resp.ContainerInstances); err != nil {
return xerrors.Errorf("failed to list container instances: %w", err)
return xerrors.Errorf("failed to execute the callback: %w", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/testing/capacitymock/generate.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package capacitymock

//go:generate mockgen -package capacitymock -destination mocks.go github.com/abicky/ecsmec/internal/capacity AutoScalingAPI,Drainer,EC2API,ECSAPI,Poller,SQSAPI
//go:generate mockgen -package capacitymock -destination mocks.go github.com/abicky/ecsmec/internal/capacity AutoScalingAPI,Drainer,EC2API,ECSAPI,Poller,SQSAPI,Cluster

0 comments on commit f9ff433

Please sign in to comment.