diff --git a/cmd/replaceautoscalinggroupinstances.go b/cmd/replaceautoscalinggroupinstances.go index 63c309d..a8d18b6 100644 --- a/cmd/replaceautoscalinggroupinstances.go +++ b/cmd/replaceautoscalinggroupinstances.go @@ -35,7 +35,7 @@ launches new ones.`, func replaceAutoScalingGroupInstances(cmd *cobra.Command, args []string) error { name, _ := replaceAutoScalingGroupInstancesCmd.Flags().GetString("auto-scaling-group-name") - cluster, _ := replaceAutoScalingGroupInstancesCmd.Flags().GetString("cluster") + clusterName, _ := replaceAutoScalingGroupInstancesCmd.Flags().GetString("cluster") batchSize, _ := replaceAutoScalingGroupInstancesCmd.Flags().GetInt32("batch-size") cfg, err := newConfig(cmd.Context()) @@ -48,12 +48,13 @@ func replaceAutoScalingGroupInstances(cmd *cobra.Command, args []string) error { return newRuntimeError("failed to initialize a AutoScalingGroup: %w", err) } - drainer, err := capacity.NewDrainer(cluster, batchSize, ecs.NewFromConfig(cfg)) + ecsSvc := ecs.NewFromConfig(cfg) + drainer, err := capacity.NewDrainer(clusterName, batchSize, ecsSvc) if err != nil { return newRuntimeError("failed to initialize a Drainer: %w", err) } - if err := asg.ReplaceInstances(cmd.Context(), drainer); err != nil { + if err := asg.ReplaceInstances(cmd.Context(), drainer, capacity.NewCluster(clusterName, ecsSvc)); err != nil { return newRuntimeError("failed to replace instances: %w", err) } return nil diff --git a/internal/capacity/autoscalinggroup.go b/internal/capacity/autoscalinggroup.go index 4269da8..d9ec9ca 100644 --- a/internal/capacity/autoscalinggroup.go +++ b/internal/capacity/autoscalinggroup.go @@ -39,7 +39,7 @@ func NewAutoScalingGroup(name string, asSvc AutoScalingAPI, ec2Svc EC2API) (*Aut return &asg, nil } -func (asg *AutoScalingGroup) ReplaceInstances(ctx context.Context, drainer Drainer) error { +func (asg *AutoScalingGroup) ReplaceInstances(ctx context.Context, drainer Drainer, cluster Cluster) error { oldInstanceIDs := make([]string, 0) baseTime := asg.StateSavedAt if baseTime == nil { @@ -60,7 +60,13 @@ func (asg *AutoScalingGroup) ReplaceInstances(ctx context.Context, drainer Drain return xerrors.Errorf("failed to launch new instances: %w", err) } - if err := asg.terminateInstances(ctx, *asg.DesiredCapacity-*asg.OriginalDesiredCapacity, drainer); err != nil { + newInstanceCount := *asg.DesiredCapacity - *asg.OriginalDesiredCapacity + log.Printf("Wait for all the new instances to be registered in the cluster %q\n", cluster.Name()) + if err := cluster.WaitUntilContainerInstancesRegistered(ctx, int(newInstanceCount), asg.StateSavedAt); err != nil { + return xerrors.Errorf("failed to wait until container instances are registered: %w", err) + } + + if err := asg.terminateInstances(ctx, newInstanceCount, drainer); err != nil { return xerrors.Errorf("failed to terminate instances: %w", err) } diff --git a/internal/capacity/autoscalinggroup_test.go b/internal/capacity/autoscalinggroup_test.go index c223468..adbf2ef 100644 --- a/internal/capacity/autoscalinggroup_test.go +++ b/internal/capacity/autoscalinggroup_test.go @@ -425,6 +425,8 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { asMock := capacitymock.NewMockAutoScalingAPI(ctrl) ec2Mock := capacitymock.NewMockEC2API(ctrl) drainerMock := capacitymock.NewMockDrainer(ctrl) + clusterMock := capacitymock.NewMockCluster(ctrl) + clusterMock.EXPECT().Name() now := time.Now().UTC() stateSavedAt := now.Format(time.RFC3339) @@ -448,11 +450,13 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { }, }, nil), + // For fetchInstances ec2Mock.EXPECT().DescribeInstances(ctx, gomock.Any()).Return(&ec2.DescribeInstancesOutput{ Reservations: oldReservations, }, nil), expectLaunchNewInstances(t, ctx, asMock, tt.oldInstances, tt.newInstances, tt.desiredCapacity, tt.maxSize, stateSavedAt), + clusterMock.EXPECT().WaitUntilContainerInstancesRegistered(ctx, len(tt.newInstances), gomock.AssignableToTypeOf(&time.Time{})), expectTerminateInstances(t, ctx, asMock, ec2Mock, drainerMock, tt.oldInstances, tt.newInstances, oldReservations, newReservations, tt.desiredCapacity, tt.maxSize), expectRestoreState(t, ctx, asMock, tt.desiredCapacity, tt.maxSize, stateSavedAt), ) @@ -462,7 +466,7 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { t.Fatal(err) } - if err := group.ReplaceInstances(ctx, drainerMock); err != nil { + if err := group.ReplaceInstances(ctx, drainerMock, clusterMock); err != nil { t.Errorf("err = %#v; want nil", err) } }) @@ -480,6 +484,8 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { asMock := capacitymock.NewMockAutoScalingAPI(ctrl) ec2Mock := capacitymock.NewMockEC2API(ctrl) drainerMock := capacitymock.NewMockDrainer(ctrl) + clusterMock := capacitymock.NewMockCluster(ctrl) + clusterMock.EXPECT().Name() now := time.Now().UTC() stateSavedAt := now.Format(time.RFC3339) @@ -527,11 +533,13 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { }, }, nil), + // For fetchInstances ec2Mock.EXPECT().DescribeInstances(ctx, gomock.Any()).Return(&ec2.DescribeInstancesOutput{ Reservations: oldReservations, }, nil), expectLaunchNewInstances(t, ctx, asMock, oldInstances, newInstances, desiredCapacity, maxSize, stateSavedAt), + clusterMock.EXPECT().WaitUntilContainerInstancesRegistered(ctx, len(newInstances), gomock.AssignableToTypeOf(&time.Time{})), expectTerminateInstances(t, ctx, asMock, ec2Mock, drainerMock, instancesToTerminate, instancesToKeep, reservationsToTerminate, reservationsToKeep, desiredCapacity, maxSize), expectRestoreState(t, ctx, asMock, desiredCapacity, maxSize, stateSavedAt), ) @@ -541,12 +549,12 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { t.Fatal(err) } - if err := group.ReplaceInstances(ctx, drainerMock); err != nil { + if err := group.ReplaceInstances(ctx, drainerMock, clusterMock); err != nil { t.Errorf("err = %#v; want nil", err) } }) - t.Run("replacement is already finished", func(t *testing.T) { + t.Run("replacement has already finished", func(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -558,6 +566,8 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { asMock := capacitymock.NewMockAutoScalingAPI(ctrl) ec2Mock := capacitymock.NewMockEC2API(ctrl) drainerMock := capacitymock.NewMockDrainer(ctrl) + clusterMock := capacitymock.NewMockCluster(ctrl) + clusterMock.EXPECT().Name() now := time.Now().UTC() stateSavedAt := now.Format(time.RFC3339) @@ -570,6 +580,7 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { gomock.InOrder( asMock.EXPECT().DescribeAutoScalingGroups(ctx, gomock.Any()).Return(&autoscaling.DescribeAutoScalingGroupsOutput{ AutoScalingGroups: []autoscalingtypes.AutoScalingGroup{ + // NOTE: If replacement has already finished, DesiredCapacity is equal to OriginalDesiredCapacity { AutoScalingGroupName: aws.String("autoscaling-group-name"), AvailabilityZones: []string{ @@ -584,10 +595,12 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { }, }, nil), + // For fetchInstances ec2Mock.EXPECT().DescribeInstances(ctx, gomock.Any()).Return(&ec2.DescribeInstancesOutput{ Reservations: createReservations(instances, now), }, nil), + clusterMock.EXPECT().WaitUntilContainerInstancesRegistered(ctx, 0, gomock.AssignableToTypeOf(&time.Time{})), expectRestoreState(t, ctx, asMock, desiredCapacity, maxSize, stateSavedAt), ) @@ -596,7 +609,7 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { t.Fatal(err) } - if err := group.ReplaceInstances(ctx, drainerMock); err != nil { + if err := group.ReplaceInstances(ctx, drainerMock, clusterMock); err != nil { t.Errorf("err = %#v; want nil", err) } }) @@ -614,6 +627,7 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { ec2Mock := capacitymock.NewMockEC2API(ctrl) drainerMock := capacitymock.NewMockDrainer(ctrl) clusterMock := capacitymock.NewMockCluster(ctrl) + clusterMock.EXPECT().Name() now := time.Now().UTC() stateSavedAt := now.Format(time.RFC3339) @@ -630,27 +644,34 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { ) newReservations := createReservations(newInstances, now) + asg := autoscalingtypes.AutoScalingGroup{ + AutoScalingGroupName: aws.String("autoscaling-group-name"), + AvailabilityZones: []string{ + "ap-northeast-1a", + "ap-northeast-1c", + }, + DesiredCapacity: aws.Int32(int32(len(oldInstances) + len(newInstances))), + Instances: append(oldInstances, newInstances...), + MaxSize: aws.Int32(int32(len(oldInstances) + len(newInstances))), + Tags: createTagDescriptions(desiredCapacity, maxSize, stateSavedAt), + } + gomock.InOrder( - asMock.EXPECT().DescribeAutoScalingGroups(ctx, gomock.Any()).Times(2).Return(&autoscaling.DescribeAutoScalingGroupsOutput{ - AutoScalingGroups: []autoscalingtypes.AutoScalingGroup{ - { - AutoScalingGroupName: aws.String("autoscaling-group-name"), - AvailabilityZones: []string{ - "ap-northeast-1a", - "ap-northeast-1c", - }, - DesiredCapacity: aws.Int32(int32(len(oldInstances) + len(newInstances))), - Instances: append(oldInstances, newInstances...), - MaxSize: aws.Int32(int32(len(oldInstances) + len(newInstances))), - Tags: createTagDescriptions(desiredCapacity, maxSize, stateSavedAt), - }, - }, + asMock.EXPECT().DescribeAutoScalingGroups(ctx, gomock.Any()).Return(&autoscaling.DescribeAutoScalingGroupsOutput{ + AutoScalingGroups: []autoscalingtypes.AutoScalingGroup{asg}, }, nil), + // For fetchInstances ec2Mock.EXPECT().DescribeInstances(ctx, gomock.Any()).Return(&ec2.DescribeInstancesOutput{ - Reservations: oldReservations, + Reservations: append(oldReservations, newReservations...), + }, nil), + + // For launchNewInstances that calls waitUntilInstancesInService only once + asMock.EXPECT().DescribeAutoScalingGroups(ctx, gomock.Any()).Return(&autoscaling.DescribeAutoScalingGroupsOutput{ + AutoScalingGroups: []autoscalingtypes.AutoScalingGroup{asg}, }, nil), + clusterMock.EXPECT().WaitUntilContainerInstancesRegistered(ctx, len(newInstances), gomock.AssignableToTypeOf(&time.Time{})), expectTerminateInstances(t, ctx, asMock, ec2Mock, drainerMock, oldInstances, newInstances, oldReservations, newReservations, desiredCapacity, maxSize), expectRestoreState(t, ctx, asMock, desiredCapacity, maxSize, stateSavedAt), ) @@ -660,7 +681,7 @@ func TestAutoScalingGroup_ReplaceInstances(t *testing.T) { t.Fatal(err) } - if err := group.ReplaceInstances(ctx, drainerMock); err != nil { + if err := group.ReplaceInstances(ctx, drainerMock, clusterMock); err != nil { t.Errorf("err = %#v; want nil", err) } }) diff --git a/internal/capacity/cluster.go b/internal/capacity/cluster.go new file mode 100644 index 0000000..556cd9c --- /dev/null +++ b/internal/capacity/cluster.go @@ -0,0 +1,71 @@ +package capacity + +import ( + "context" + "fmt" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ecs" + "golang.org/x/xerrors" +) + +type Cluster interface { + Name() string + WaitUntilContainerInstancesRegistered(context.Context, int, *time.Time) error +} + +type cluster struct { + name string + ecsSvc ECSAPI +} + +func NewCluster(name string, ecsSvc ECSAPI) Cluster { + return &cluster{ + name: name, + ecsSvc: ecsSvc, + } +} + +func (c *cluster) Name() string { + return c.name +} + +func (c *cluster) WaitUntilContainerInstancesRegistered(ctx context.Context, count int, registeredAt *time.Time) error { + if count == 0 { + return nil + } + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + timeout := 5 * time.Minute + timer := time.NewTimer(timeout) + defer timer.Stop() + + params := &ecs.ListContainerInstancesInput{ + Cluster: aws.String(c.name), + Filter: aws.String(fmt.Sprintf("registeredAt >= %s", registeredAt.UTC().Format(time.RFC3339))), + } + for { + foundCount := 0 + paginator := ecs.NewListContainerInstancesPaginator(c.ecsSvc, params) + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return xerrors.Errorf("failed to list container instances: %w", err) + } + foundCount += len(page.ContainerInstanceArns) + } + if foundCount == count { + return nil + } + + select { + case <-ticker.C: + continue + case <-timer.C: + return xerrors.Errorf("%d container instances expect to be registered but only %d instances were registered within %v", count, foundCount, timeout) + } + } +} diff --git a/internal/capacity/cluster_test.go b/internal/capacity/cluster_test.go new file mode 100644 index 0000000..ceef879 --- /dev/null +++ b/internal/capacity/cluster_test.go @@ -0,0 +1,36 @@ +package capacity + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/abicky/ecsmec/internal/testing/capacitymock" + "github.com/aws/aws-sdk-go-v2/service/ecs" + "go.uber.org/mock/gomock" +) + +func TestCluster_WaitUntilContainerInstancesRegistered(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.Background() + + ecsMock := capacitymock.NewMockECSAPI(ctrl) + + ecsMock.EXPECT().ListContainerInstances(ctx, gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, params *ecs.ListContainerInstancesInput, _ ...func(options *ecs.Options)) (*ecs.ListContainerInstancesOutput, error) { + return &ecs.ListContainerInstancesOutput{ + ContainerInstanceArns: []string{ + fmt.Sprintf("arn:aws:ecs:ap-northeast-1:1234:container-instance/test/xxxxxxxxxx"), + }, + }, nil + }) + + cluster := NewCluster("cluster", ecsMock) + now := time.Now() + if err := cluster.WaitUntilContainerInstancesRegistered(ctx, 1, &now); err != nil { + t.Errorf("err = %#v; want nil", err) + } +} diff --git a/internal/capacity/drainer.go b/internal/capacity/drainer.go index 6ca56a1..06e427b 100644 --- a/internal/capacity/drainer.go +++ b/internal/capacity/drainer.go @@ -55,7 +55,7 @@ func (d *drainer) Drain(ctx context.Context, instanceIDs []string) error { processedCount += len(instances) arns := make([]*string, len(instances)) - fmt.Printf("Drain the following container instances in the cluster \"%s\":\n", d.cluster) + fmt.Printf("Drain the following %d container instances in the cluster \"%s\":\n", len(instances), d.cluster) for i, instance := range instances { arns[i] = instance.ContainerInstanceArn fmt.Printf("\t%s (%s)\n", getContainerInstanceID(*instance.ContainerInstanceArn), *instance.Ec2InstanceId) @@ -72,7 +72,7 @@ func (d *drainer) Drain(ctx context.Context, instanceIDs []string) error { return xerrors.Errorf("no target instances exist in the cluster \"%s\"", d.cluster) } if processedCount != len(instanceIDs) { - return xerrors.Errorf("%d instances should be drained but only %d instances was drained", len(instanceIDs), processedCount) + return xerrors.Errorf("%d instances should be drained but only %d instances were drained", len(instanceIDs), processedCount) } return nil @@ -250,7 +250,7 @@ func (d *drainer) processContainerInstances(ctx context.Context, instanceIDs []s } if err := callback(resp.ContainerInstances); err != nil { - return xerrors.Errorf("failed to list container instances: %w", err) + return xerrors.Errorf("failed to execute the callback: %w", err) } } diff --git a/internal/testing/capacitymock/generate.go b/internal/testing/capacitymock/generate.go index 947e1e0..9c81f8a 100644 --- a/internal/testing/capacitymock/generate.go +++ b/internal/testing/capacitymock/generate.go @@ -1,3 +1,3 @@ package capacitymock -//go:generate mockgen -package capacitymock -destination mocks.go github.com/abicky/ecsmec/internal/capacity AutoScalingAPI,Drainer,EC2API,ECSAPI,Poller,SQSAPI +//go:generate mockgen -package capacitymock -destination mocks.go github.com/abicky/ecsmec/internal/capacity AutoScalingAPI,Drainer,EC2API,ECSAPI,Poller,SQSAPI,Cluster