From dde8bbcd9b204a28c481033aca144d8d6a80ce67 Mon Sep 17 00:00:00 2001 From: William Ronchetti Date: Fri, 9 Aug 2024 11:29:08 -0400 Subject: [PATCH] finalize swap --- src/commands/identity_swap.py | 156 ++++++++++++++++++++++++++-------- 1 file changed, 119 insertions(+), 37 deletions(-) diff --git a/src/commands/identity_swap.py b/src/commands/identity_swap.py index 3fc4ebb6..5e9c4ab5 100644 --- a/src/commands/identity_swap.py +++ b/src/commands/identity_swap.py @@ -3,6 +3,7 @@ import io import json import sys +from botocore.exceptions import ClientError from typing import List, Union from ..base import ConfigManager from ..base import Settings @@ -28,7 +29,7 @@ def download_config(*, bucket, key): return json.loads(stream.getvalue()) -def upload_config(*, bucket, key, data: Union[str, dict], query=True): +def upload_config(*, bucket, key, data: Union[str, dict], query=True, kms_key=None): """ Uploads config intended for GLOBAL_ENV_BUCKET Note that this does not support S3_ENCRYPT_KEY_ID at the moment """ @@ -41,7 +42,13 @@ def upload_config(*, bucket, key, data: Union[str, dict], query=True): data = json.dumps(data, indent=2, default=str) + "\n" stream = io.BytesIO(data.encode('utf-8')) s3 = boto3.client('s3') - s3.upload_fileobj(Fileobj=stream, Bucket=bucket, Key=key) + if not key: + s3.upload_fileobj(Fileobj=stream, Bucket=bucket, Key=key) + else: + + s3.upload_fileobj(Fileobj=stream, Bucket=bucket, Key=key, + ExtraArgs={'ServerSideEncryption': 'aws:kms', + 'SSEKMSKeyId': kms_key}) PRINT("Uploaded.") else: PRINT("NOT uploaded.") @@ -67,6 +74,8 @@ class C4IdentitySwap: MAIN_ECOSYSTEM = 'main.ecosystem' BLUE_ECOSYSTEM = 'blue.ecosystem' GREEN_ECOSYSTEM = 'green.ecosystem' + SCALE_OUT_COUNT = 32 + SCALE_IN_COUNT = 8 @staticmethod def unseparate(identifier: str) -> str: @@ -150,6 +159,35 @@ def _pretty_print_swap_plan(swap_plan: dict) -> None: short_task_name = task_definition.split('/')[-1] PRINT(f' {short_service_name} -----> {short_task_name}') + @staticmethod + def extract_resource_id_from_arn(service_arn): + """ Pulls a resource ID from ECS service arn + Example service ARN: arn:aws:ecs:region:account-id:service/cluster-name/service-name + """ + arn_parts = service_arn.split('/') + if len(arn_parts) == 3: + cluster_name = arn_parts[1] + service_name = arn_parts[2] + resource_id = f'service/{cluster_name}/{service_name}' + return resource_id + else: + raise ValueError(f"Invalid ECS service ARN: {service_arn}") + + @staticmethod + def _register_scalable_target(autoscaling_client: boto3.client, resource_id: str, + min_capacity: int = 4, max_capacity: int = 48): + """ Registers a scalable target for a given ECS service """ + try: + autoscaling_client.register_scalable_target( + ServiceNamespace='ecs', + ResourceId=resource_id, + ScalableDimension='ecs:service:DesiredCount', + MinCapacity=min_capacity, + MaxCapacity=max_capacity + ) + except ClientError as e: + PRINT(f'Resource {resource_id} may already be registered: {e}') + @staticmethod def _delete_scaling_policies(autoscaling_client, resource_id): """ Helper function that deletes existing scaling policies, since these cannot be modified in place """ @@ -168,9 +206,9 @@ def _delete_scaling_policies(autoscaling_client, resource_id): @staticmethod def _create_scaling_policy(autoscaling_client: boto3.client, policy_name: str, - ecs_service_arn: str, alarm_arn: str, scaling_adjustment: int) -> None: - """ Creates a single scaling policy for use with exact capacity and existing CW alarms """ - autoscaling_client.put_scaling_policy( + ecs_service_arn: str, scaling_adjustment: int): + """ Creates a single scaling policy for use with exact capacity """ + return autoscaling_client.put_scaling_policy( PolicyName=policy_name, ServiceNamespace='ecs', ResourceId=ecs_service_arn, @@ -186,12 +224,45 @@ def _create_scaling_policy(autoscaling_client: boto3.client, policy_name: str, ], 'Cooldown': 300, 'MetricAggregationType': 'Average' - }, - Alarms=[ - {'AlarmName': alarm_arn} - ] + } ) + @staticmethod + def _update_cloudwatch_alarm(cloudwatch_client: boto3.client, alarm_arn: str, scaling_policy_arn: str, + scale_out=True): + """ Creates (or replaces) a CW alarm with a new one pointing to a particular scaling policy """ + alarm_name = alarm_arn.split(':')[-1] + # Retrieve the existing alarm configuration + alarm = cloudwatch_client.describe_alarms(AlarmNames=[alarm_name])['MetricAlarms'][0] + + cloudwatch_client.put_metric_alarm( + AlarmName=alarm_arn.split(':')[-1], # just take name, rest of ARN is implied + AlarmActions=[ + scaling_policy_arn # queue target is constant but application target is swapped + ], + MetricName=alarm['MetricName'], + Namespace=alarm['Namespace'], + Statistic=alarm['Statistic'], + Period=alarm['Period'], + EvaluationPeriods=alarm['EvaluationPeriods'], + Threshold=alarm['Threshold'], + Dimensions=alarm['Dimensions'], + ComparisonOperator='GreaterThanOrEqualToThreshold' if scale_out else 'LessThanOrEqualToThreshold' + ) + + @classmethod + def update_autoscaling_action(cls, autoscaling_client: boto3.client, cloudwatch_client: boto3.client, + policy_name: str, ecs_service_arn: str, + scaling_adjustment: int, alarm_arn: str): + """ Creates a new scaling policy and associates it with a cloudwatch alarm """ + scaling_policy_resp = cls._create_scaling_policy(autoscaling_client, policy_name, ecs_service_arn, + scaling_adjustment) + policy_arn = scaling_policy_resp['PolicyARN'] + if scaling_adjustment > cls.SCALE_IN_COUNT: + cls._update_cloudwatch_alarm(cloudwatch_client, alarm_arn, policy_arn) + else: + cls._update_cloudwatch_alarm(cloudwatch_client, alarm_arn, policy_arn, scale_out=False) + class CGAPIdentitySwap(C4IdentitySwap): """ Not implemented, as we do not do blue/green for CGAP. """ @@ -204,6 +275,7 @@ class SMaHTIdentitySwap(C4IdentitySwap): has been harmonized, so this issue is no longer a problem. """ GLOBAL_ENV_BUCKET = 'smaht-production-foursight-envs' + SMAHT_KMS_KEY_ID = '9777cd71-4b5b-44b7-a8a0-de107c667c64' # These constants refer to alarms created as part of this repo - if changed, the autoscaling updates # will be deleted and not updated! @@ -212,8 +284,6 @@ class SMaHTIdentitySwap(C4IdentitySwap): GREEN_SCALE_OUT_ALARM = f'{ARN_PREFIX}c4-ecs-blue-green-smaht-production-stack-IndexingQueueDepthAlarmgreen-TFMKK6TS1NPK' BLUE_SCALE_IN_ALARM = f'{ARN_PREFIX}c4-ecs-blue-green-smaht-production-stack-IndexingQueueEmptyAlarmblue-1UFIC7AOA2S10' GREEN_SCALE_IN_ALARM = f'{ARN_PREFIX}c4-ecs-blue-green-smaht-production-stack-IndexingQueueEmptyAlarmgreen-R95GUEMYUEBG' - SCALE_OUT_COUNT = 32 - SCALE_IN_COUNT = 8 @staticmethod def _is_mirror_state(service_mapping: dict): @@ -281,56 +351,68 @@ def _update_foursight(cls) -> None: # Get the current prod env name current_main = download_config(bucket=cls.GLOBAL_ENV_BUCKET, key=cls.MAIN_ECOSYSTEM) - current_prd_env_name = current_main['prd_env_name'] - if current_prd_env_name == 'smaht-production-green': # green is data --> we swapped to blue + current_prd_ecosystem = current_main['ecosystem'] + if current_prd_ecosystem == cls.GREEN_ECOSYSTEM: # green is data --> we swapped to blue new_ecosystem = cls.BLUE_ECOSYSTEM else: # blue is data --> we swapped to green new_ecosystem = cls.GREEN_ECOSYSTEM - swapped_data = download_config(bucket=cls.GLOBAL_ENV_BUCKET, key=new_ecosystem) - upload_config(bucket=cls.GLOBAL_ENV_BUCKET, key=cls.MAIN_ECOSYSTEM, data=swapped_data) + swapped_data = { + 'ecosystem': new_ecosystem + } + upload_config(bucket=cls.GLOBAL_ENV_BUCKET, key=cls.MAIN_ECOSYSTEM, data=swapped_data, + kms_key=cls.SMAHT_KMS_KEY_ID) @classmethod - def _update_autoscaling(cls, autoscaling_client: boto3.client, swap_plan: dict) -> None: + def _update_autoscaling(cls, autoscaling_client: boto3.client, cloudwatch_client: boto3.client, + swap_plan: dict) -> None: """ Updates the Indexer worker autoscaling configuration to point to the correct CW Alarms This function assumes the hard coded alarms already exist. It first deletes then re-creates the scaling policies on the applicable services """ - blue_indexer_service_arn = list(filter(lambda k: cls.INDEXER in k and 'smahtblue' in k, swap_plan.keys()))[0] - green_indexer_service_arn = list(filter(lambda k: cls.INDEXER in k and 'smahtgreen' in k, swap_plan.keys()))[0] + blue_indexer_service_arn = list(filter(lambda k: cls.INDEXER in k and 'smahtblue' in k.lower(), swap_plan.keys()))[0] + green_indexer_service_arn = list(filter(lambda k: cls.INDEXER in k and 'smahtgreen' in k.lower(), swap_plan.keys()))[0] + blue_indexer_service_resource_id = cls.extract_resource_id_from_arn(blue_indexer_service_arn) + green_indexer_service_resource_id = cls.extract_resource_id_from_arn(green_indexer_service_arn) mirror_state = cls._is_mirror_state(swap_plan) # if True, we just went from green --> blue + # Ensure indexer services are registered as scalable targets + # Should only run once, but may need to rerun if you want to + cls._register_scalable_target(autoscaling_client, blue_indexer_service_resource_id) + cls._register_scalable_target(autoscaling_client, green_indexer_service_resource_id) + # Delete all scaling policies from the indexer services, since we will be replacing them - cls._delete_scaling_policies(autoscaling_client, blue_indexer_service_arn) - cls._delete_scaling_policies(autoscaling_client, green_indexer_service_arn) + cls._delete_scaling_policies(autoscaling_client, blue_indexer_service_resource_id) + cls._delete_scaling_policies(autoscaling_client, green_indexer_service_resource_id) # if in mirror state, prod is blue, so the green services point to blue tasks, so should point to blue alarms if mirror_state: - cls._create_scaling_policy(autoscaling_client, 'DataIndexerScaleOut', green_indexer_service_arn, - cls.BLUE_SCALE_OUT_ALARM, cls.SCALE_OUT_COUNT) - cls._create_scaling_policy(autoscaling_client, 'DataIndexerScaleIn', green_indexer_service_arn, - cls.BLUE_SCALE_IN_ALARM, cls.SCALE_IN_COUNT) - cls._create_scaling_policy(autoscaling_client, 'StagingIndexerScaleOut', blue_indexer_service_arn, - cls.GREEN_SCALE_OUT_ALARM, cls.SCALE_OUT_COUNT) - cls._create_scaling_policy(autoscaling_client, 'StagingIndexerScaleIn', blue_indexer_service_arn, - cls.GREEN_SCALE_IN_ALARM, cls.SCALE_IN_COUNT) + cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'DataIndexerScaleOut', + green_indexer_service_resource_id, cls.SCALE_OUT_COUNT, cls.BLUE_SCALE_OUT_ALARM) + cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'DataIndexerScaleIn', + green_indexer_service_resource_id, cls.SCALE_IN_COUNT, cls.BLUE_SCALE_IN_ALARM) + cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'StagingIndexerScaleOut', + blue_indexer_service_resource_id, cls.SCALE_OUT_COUNT, cls.GREEN_SCALE_OUT_ALARM) + cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'StagingIndexerScaleIn', + blue_indexer_service_resource_id, cls.SCALE_IN_COUNT, cls.GREEN_SCALE_IN_ALARM) # if we are not in mirror state, then green services point to green tasks and we want to track # green queues else: - cls._create_scaling_policy(autoscaling_client, 'DataIndexerScaleOut', green_indexer_service_arn, - cls.GREEN_SCALE_OUT_ALARM, cls.SCALE_OUT_COUNT) - cls._create_scaling_policy(autoscaling_client, 'DataIndexerScaleIn', green_indexer_service_arn, - cls.GREEN_SCALE_IN_ALARM, cls.SCALE_IN_COUNT) - cls._create_scaling_policy(autoscaling_client, 'StagingIndexerScaleOut', blue_indexer_service_arn, - cls.BLUE_SCALE_OUT_ALARM, cls.SCALE_OUT_COUNT) - cls._create_scaling_policy(autoscaling_client, 'StagingIndexerScaleIn', blue_indexer_service_arn, - cls.BLUE_SCALE_IN_ALARM, cls.SCALE_IN_COUNT) + cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'DataIndexerScaleOut', + green_indexer_service_resource_id, cls.SCALE_OUT_COUNT, cls.GREEN_SCALE_OUT_ALARM) + cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'DataIndexerScaleIn', + green_indexer_service_resource_id, cls.SCALE_IN_COUNT, cls.GREEN_SCALE_IN_ALARM) + cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'StagingIndexerScaleOut', + blue_indexer_service_resource_id, cls.SCALE_OUT_COUNT, cls.BLUE_SCALE_OUT_ALARM) + cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'StagingIndexerScaleIn', + blue_indexer_service_resource_id, cls.SCALE_IN_COUNT, cls.BLUE_SCALE_IN_ALARM) @classmethod def identity_swap(cls, *, blue: str, green: str) -> None: """ Top level execution of the identity swap """ ecs = ECSUtils() autoscaling = boto3.client('application-autoscaling') + cw = boto3.client('cloudwatch') app_kind = ConfigManager.get_config_setting(Settings.APP_KIND) if app_kind != 'smaht': raise IdentitySwapSetupError(f'{app_kind} is not supported - must be smaht') @@ -353,7 +435,7 @@ def identity_swap(cls, *, blue: str, green: str) -> None: # Update indexer task autoscaling triggers PRINT(f'Updating indexer autoscaling to reflect new state') - cls._update_autoscaling(autoscaling, swap_plan) + cls._update_autoscaling(autoscaling, cw, swap_plan) else: PRINT(f'Swap plan NOT executed - exiting with no further action')