Skip to content

Commit

Permalink
finalize swap
Browse files Browse the repository at this point in the history
  • Loading branch information
willronchetti committed Aug 9, 2024
1 parent 901b625 commit dde8bbc
Showing 1 changed file with 119 additions and 37 deletions.
156 changes: 119 additions & 37 deletions src/commands/identity_swap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io
import json
import sys
from botocore.exceptions import ClientError
from typing import List, Union
from ..base import ConfigManager
from ..base import Settings
Expand All @@ -28,7 +29,7 @@ def download_config(*, bucket, key):
return json.loads(stream.getvalue())


def upload_config(*, bucket, key, data: Union[str, dict], query=True):
def upload_config(*, bucket, key, data: Union[str, dict], query=True, kms_key=None):
""" Uploads config intended for GLOBAL_ENV_BUCKET
Note that this does not support S3_ENCRYPT_KEY_ID at the moment
"""
Expand All @@ -41,7 +42,13 @@ def upload_config(*, bucket, key, data: Union[str, dict], query=True):
data = json.dumps(data, indent=2, default=str) + "\n"
stream = io.BytesIO(data.encode('utf-8'))
s3 = boto3.client('s3')
s3.upload_fileobj(Fileobj=stream, Bucket=bucket, Key=key)
if not key:
s3.upload_fileobj(Fileobj=stream, Bucket=bucket, Key=key)
else:

s3.upload_fileobj(Fileobj=stream, Bucket=bucket, Key=key,
ExtraArgs={'ServerSideEncryption': 'aws:kms',
'SSEKMSKeyId': kms_key})
PRINT("Uploaded.")
else:
PRINT("NOT uploaded.")
Expand All @@ -67,6 +74,8 @@ class C4IdentitySwap:
MAIN_ECOSYSTEM = 'main.ecosystem'
BLUE_ECOSYSTEM = 'blue.ecosystem'
GREEN_ECOSYSTEM = 'green.ecosystem'
SCALE_OUT_COUNT = 32
SCALE_IN_COUNT = 8

@staticmethod
def unseparate(identifier: str) -> str:
Expand Down Expand Up @@ -150,6 +159,35 @@ def _pretty_print_swap_plan(swap_plan: dict) -> None:
short_task_name = task_definition.split('/')[-1]
PRINT(f' {short_service_name} -----> {short_task_name}')

@staticmethod
def extract_resource_id_from_arn(service_arn):
""" Pulls a resource ID from ECS service arn
Example service ARN: arn:aws:ecs:region:account-id:service/cluster-name/service-name
"""
arn_parts = service_arn.split('/')
if len(arn_parts) == 3:
cluster_name = arn_parts[1]
service_name = arn_parts[2]
resource_id = f'service/{cluster_name}/{service_name}'
return resource_id
else:
raise ValueError(f"Invalid ECS service ARN: {service_arn}")

@staticmethod
def _register_scalable_target(autoscaling_client: boto3.client, resource_id: str,
min_capacity: int = 4, max_capacity: int = 48):
""" Registers a scalable target for a given ECS service """
try:
autoscaling_client.register_scalable_target(
ServiceNamespace='ecs',
ResourceId=resource_id,
ScalableDimension='ecs:service:DesiredCount',
MinCapacity=min_capacity,
MaxCapacity=max_capacity
)
except ClientError as e:
PRINT(f'Resource {resource_id} may already be registered: {e}')

@staticmethod
def _delete_scaling_policies(autoscaling_client, resource_id):
""" Helper function that deletes existing scaling policies, since these cannot be modified in place """
Expand All @@ -168,9 +206,9 @@ def _delete_scaling_policies(autoscaling_client, resource_id):

@staticmethod
def _create_scaling_policy(autoscaling_client: boto3.client, policy_name: str,
ecs_service_arn: str, alarm_arn: str, scaling_adjustment: int) -> None:
""" Creates a single scaling policy for use with exact capacity and existing CW alarms """
autoscaling_client.put_scaling_policy(
ecs_service_arn: str, scaling_adjustment: int):
""" Creates a single scaling policy for use with exact capacity """
return autoscaling_client.put_scaling_policy(
PolicyName=policy_name,
ServiceNamespace='ecs',
ResourceId=ecs_service_arn,
Expand All @@ -186,12 +224,45 @@ def _create_scaling_policy(autoscaling_client: boto3.client, policy_name: str,
],
'Cooldown': 300,
'MetricAggregationType': 'Average'
},
Alarms=[
{'AlarmName': alarm_arn}
]
}
)

@staticmethod
def _update_cloudwatch_alarm(cloudwatch_client: boto3.client, alarm_arn: str, scaling_policy_arn: str,
scale_out=True):
""" Creates (or replaces) a CW alarm with a new one pointing to a particular scaling policy """
alarm_name = alarm_arn.split(':')[-1]
# Retrieve the existing alarm configuration
alarm = cloudwatch_client.describe_alarms(AlarmNames=[alarm_name])['MetricAlarms'][0]

cloudwatch_client.put_metric_alarm(
AlarmName=alarm_arn.split(':')[-1], # just take name, rest of ARN is implied
AlarmActions=[
scaling_policy_arn # queue target is constant but application target is swapped
],
MetricName=alarm['MetricName'],
Namespace=alarm['Namespace'],
Statistic=alarm['Statistic'],
Period=alarm['Period'],
EvaluationPeriods=alarm['EvaluationPeriods'],
Threshold=alarm['Threshold'],
Dimensions=alarm['Dimensions'],
ComparisonOperator='GreaterThanOrEqualToThreshold' if scale_out else 'LessThanOrEqualToThreshold'
)

@classmethod
def update_autoscaling_action(cls, autoscaling_client: boto3.client, cloudwatch_client: boto3.client,
policy_name: str, ecs_service_arn: str,
scaling_adjustment: int, alarm_arn: str):
""" Creates a new scaling policy and associates it with a cloudwatch alarm """
scaling_policy_resp = cls._create_scaling_policy(autoscaling_client, policy_name, ecs_service_arn,
scaling_adjustment)
policy_arn = scaling_policy_resp['PolicyARN']
if scaling_adjustment > cls.SCALE_IN_COUNT:
cls._update_cloudwatch_alarm(cloudwatch_client, alarm_arn, policy_arn)
else:
cls._update_cloudwatch_alarm(cloudwatch_client, alarm_arn, policy_arn, scale_out=False)


class CGAPIdentitySwap(C4IdentitySwap):
""" Not implemented, as we do not do blue/green for CGAP. """
Expand All @@ -204,6 +275,7 @@ class SMaHTIdentitySwap(C4IdentitySwap):
has been harmonized, so this issue is no longer a problem.
"""
GLOBAL_ENV_BUCKET = 'smaht-production-foursight-envs'
SMAHT_KMS_KEY_ID = '9777cd71-4b5b-44b7-a8a0-de107c667c64'

# These constants refer to alarms created as part of this repo - if changed, the autoscaling updates
# will be deleted and not updated!
Expand All @@ -212,8 +284,6 @@ class SMaHTIdentitySwap(C4IdentitySwap):
GREEN_SCALE_OUT_ALARM = f'{ARN_PREFIX}c4-ecs-blue-green-smaht-production-stack-IndexingQueueDepthAlarmgreen-TFMKK6TS1NPK'
BLUE_SCALE_IN_ALARM = f'{ARN_PREFIX}c4-ecs-blue-green-smaht-production-stack-IndexingQueueEmptyAlarmblue-1UFIC7AOA2S10'
GREEN_SCALE_IN_ALARM = f'{ARN_PREFIX}c4-ecs-blue-green-smaht-production-stack-IndexingQueueEmptyAlarmgreen-R95GUEMYUEBG'
SCALE_OUT_COUNT = 32
SCALE_IN_COUNT = 8

@staticmethod
def _is_mirror_state(service_mapping: dict):
Expand Down Expand Up @@ -281,56 +351,68 @@ def _update_foursight(cls) -> None:

# Get the current prod env name
current_main = download_config(bucket=cls.GLOBAL_ENV_BUCKET, key=cls.MAIN_ECOSYSTEM)
current_prd_env_name = current_main['prd_env_name']
if current_prd_env_name == 'smaht-production-green': # green is data --> we swapped to blue
current_prd_ecosystem = current_main['ecosystem']
if current_prd_ecosystem == cls.GREEN_ECOSYSTEM: # green is data --> we swapped to blue
new_ecosystem = cls.BLUE_ECOSYSTEM
else: # blue is data --> we swapped to green
new_ecosystem = cls.GREEN_ECOSYSTEM
swapped_data = download_config(bucket=cls.GLOBAL_ENV_BUCKET, key=new_ecosystem)
upload_config(bucket=cls.GLOBAL_ENV_BUCKET, key=cls.MAIN_ECOSYSTEM, data=swapped_data)
swapped_data = {
'ecosystem': new_ecosystem
}
upload_config(bucket=cls.GLOBAL_ENV_BUCKET, key=cls.MAIN_ECOSYSTEM, data=swapped_data,
kms_key=cls.SMAHT_KMS_KEY_ID)

@classmethod
def _update_autoscaling(cls, autoscaling_client: boto3.client, swap_plan: dict) -> None:
def _update_autoscaling(cls, autoscaling_client: boto3.client, cloudwatch_client: boto3.client,
swap_plan: dict) -> None:
""" Updates the Indexer worker autoscaling configuration to point to the correct CW Alarms
This function assumes the hard coded alarms already exist.
It first deletes then re-creates the scaling policies on the applicable services
"""
blue_indexer_service_arn = list(filter(lambda k: cls.INDEXER in k and 'smahtblue' in k, swap_plan.keys()))[0]
green_indexer_service_arn = list(filter(lambda k: cls.INDEXER in k and 'smahtgreen' in k, swap_plan.keys()))[0]
blue_indexer_service_arn = list(filter(lambda k: cls.INDEXER in k and 'smahtblue' in k.lower(), swap_plan.keys()))[0]
green_indexer_service_arn = list(filter(lambda k: cls.INDEXER in k and 'smahtgreen' in k.lower(), swap_plan.keys()))[0]
blue_indexer_service_resource_id = cls.extract_resource_id_from_arn(blue_indexer_service_arn)
green_indexer_service_resource_id = cls.extract_resource_id_from_arn(green_indexer_service_arn)
mirror_state = cls._is_mirror_state(swap_plan) # if True, we just went from green --> blue

# Ensure indexer services are registered as scalable targets
# Should only run once, but may need to rerun if you want to
cls._register_scalable_target(autoscaling_client, blue_indexer_service_resource_id)
cls._register_scalable_target(autoscaling_client, green_indexer_service_resource_id)

# Delete all scaling policies from the indexer services, since we will be replacing them
cls._delete_scaling_policies(autoscaling_client, blue_indexer_service_arn)
cls._delete_scaling_policies(autoscaling_client, green_indexer_service_arn)
cls._delete_scaling_policies(autoscaling_client, blue_indexer_service_resource_id)
cls._delete_scaling_policies(autoscaling_client, green_indexer_service_resource_id)

# if in mirror state, prod is blue, so the green services point to blue tasks, so should point to blue alarms
if mirror_state:
cls._create_scaling_policy(autoscaling_client, 'DataIndexerScaleOut', green_indexer_service_arn,
cls.BLUE_SCALE_OUT_ALARM, cls.SCALE_OUT_COUNT)
cls._create_scaling_policy(autoscaling_client, 'DataIndexerScaleIn', green_indexer_service_arn,
cls.BLUE_SCALE_IN_ALARM, cls.SCALE_IN_COUNT)
cls._create_scaling_policy(autoscaling_client, 'StagingIndexerScaleOut', blue_indexer_service_arn,
cls.GREEN_SCALE_OUT_ALARM, cls.SCALE_OUT_COUNT)
cls._create_scaling_policy(autoscaling_client, 'StagingIndexerScaleIn', blue_indexer_service_arn,
cls.GREEN_SCALE_IN_ALARM, cls.SCALE_IN_COUNT)
cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'DataIndexerScaleOut',
green_indexer_service_resource_id, cls.SCALE_OUT_COUNT, cls.BLUE_SCALE_OUT_ALARM)
cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'DataIndexerScaleIn',
green_indexer_service_resource_id, cls.SCALE_IN_COUNT, cls.BLUE_SCALE_IN_ALARM)
cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'StagingIndexerScaleOut',
blue_indexer_service_resource_id, cls.SCALE_OUT_COUNT, cls.GREEN_SCALE_OUT_ALARM)
cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'StagingIndexerScaleIn',
blue_indexer_service_resource_id, cls.SCALE_IN_COUNT, cls.GREEN_SCALE_IN_ALARM)

# if we are not in mirror state, then green services point to green tasks and we want to track
# green queues
else:
cls._create_scaling_policy(autoscaling_client, 'DataIndexerScaleOut', green_indexer_service_arn,
cls.GREEN_SCALE_OUT_ALARM, cls.SCALE_OUT_COUNT)
cls._create_scaling_policy(autoscaling_client, 'DataIndexerScaleIn', green_indexer_service_arn,
cls.GREEN_SCALE_IN_ALARM, cls.SCALE_IN_COUNT)
cls._create_scaling_policy(autoscaling_client, 'StagingIndexerScaleOut', blue_indexer_service_arn,
cls.BLUE_SCALE_OUT_ALARM, cls.SCALE_OUT_COUNT)
cls._create_scaling_policy(autoscaling_client, 'StagingIndexerScaleIn', blue_indexer_service_arn,
cls.BLUE_SCALE_IN_ALARM, cls.SCALE_IN_COUNT)
cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'DataIndexerScaleOut',
green_indexer_service_resource_id, cls.SCALE_OUT_COUNT, cls.GREEN_SCALE_OUT_ALARM)
cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'DataIndexerScaleIn',
green_indexer_service_resource_id, cls.SCALE_IN_COUNT, cls.GREEN_SCALE_IN_ALARM)
cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'StagingIndexerScaleOut',
blue_indexer_service_resource_id, cls.SCALE_OUT_COUNT, cls.BLUE_SCALE_OUT_ALARM)
cls.update_autoscaling_action(autoscaling_client, cloudwatch_client, 'StagingIndexerScaleIn',
blue_indexer_service_resource_id, cls.SCALE_IN_COUNT, cls.BLUE_SCALE_IN_ALARM)

@classmethod
def identity_swap(cls, *, blue: str, green: str) -> None:
""" Top level execution of the identity swap """
ecs = ECSUtils()
autoscaling = boto3.client('application-autoscaling')
cw = boto3.client('cloudwatch')
app_kind = ConfigManager.get_config_setting(Settings.APP_KIND)
if app_kind != 'smaht':
raise IdentitySwapSetupError(f'{app_kind} is not supported - must be smaht')
Expand All @@ -353,7 +435,7 @@ def identity_swap(cls, *, blue: str, green: str) -> None:

# Update indexer task autoscaling triggers
PRINT(f'Updating indexer autoscaling to reflect new state')
cls._update_autoscaling(autoscaling, swap_plan)
cls._update_autoscaling(autoscaling, cw, swap_plan)

else:
PRINT(f'Swap plan NOT executed - exiting with no further action')
Expand Down

0 comments on commit dde8bbc

Please sign in to comment.