-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrds_vscale_alarm_lambda.py
323 lines (279 loc) · 13.3 KB
/
rds_vscale_alarm_lambda.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import json
import os
import random
import re
from datetime import datetime, timezone, timedelta
from botocore.exceptions import ClientError
import boto3
# JSON-encoded list of instance classes read from the SIZE_ORDER environment
# variable (an empty list when the variable is unset). lambda_handler scales
# up by moving to the next index, so entries are expected in ascending size.
size_order_str = os.environ.get("SIZE_ORDER", "[]")
SIZE_ORDER = json.loads(size_order_str)
# Cooldown in seconds (default 900); lambda_handler passes it to
# modification_timestamps, which compares it with the age of the
# 'modificationTimestamp' tag.
MODIFY_COOLDOWN_PERIOD = int(os.environ.get("MODIFY_COOLDOWN_PERIOD", "900"))
# DBInstanceStatus values that any_member_modifying treats as "being modified".
MODIFYING_STATUSES = ["modifying", "storage-optimization", "creating", "rebooting"]
# boto3 clients are created once at import time and shared by the functions below.
rds_client = boto3.client('rds')
sns_client = boto3.client('sns')
# ARN of the SNS topic used by send_sns_alert; None when ALARMS_SNS is unset.
sns_topic_arn = os.environ.get('ALARMS_SNS')
def send_sns_alert(message):
    """
    Publish an alert message to the SNS topic configured in ALARMS_SNS.

    Alerting is best-effort: a missing topic ARN or a ClientError from SNS is
    printed and not raised, so a broken alert channel does not abort the
    caller.

    Args:
        message: Text used as the SNS message body.

    Returns:
        None.
    """
    if not sns_topic_arn:
        # publish(TopicArn=None) fails botocore's parameter validation with
        # ParamValidationError, which is not a ClientError and would escape
        # the handler below and crash every error path that tries to alert.
        print(f"SNS alert not sent: ALARMS_SNS is not configured. Message: {message}")
        return
    try:
        sns_client.publish(TopicArn=sns_topic_arn, Message=message)
        print(f"SNS alert sent. Message: {message}")
    except ClientError as e:
        print(f"Failed to send SNS alert. Error: {e}")
def get_instance_details(client, instance_identifier):
    """
    Look up an RDS instance and report its class and owning cluster.

    Returns an ``(instance_class, cluster_identifier)`` tuple; the cluster
    identifier is None when the describe response carries no
    'DBClusterIdentifier' key. On a ClientError the error is printed, sent
    through send_sns_alert, and ``(None, None)`` is returned.
    """
    try:
        described = client.describe_db_instances(DBInstanceIdentifier=instance_identifier)
    except ClientError as err:
        failure = f"Error getting the DB instance details: {err}"
        print(failure)
        send_sns_alert(failure)
        return None, None
    else:
        details = described['DBInstances'][0]
        return details['DBInstanceClass'], details.get('DBClusterIdentifier')
def get_cluster_version(client, cluster_identifier):
    """
    Return the 'EngineVersion' of the given DB cluster.

    On a ClientError the error is printed, sent through send_sns_alert, and
    None is returned.
    """
    try:
        described = client.describe_db_clusters(DBClusterIdentifier=cluster_identifier)
    except ClientError as err:
        failure = f"Error getting the DB cluster version: {err}"
        print(failure)
        send_sns_alert(failure)
        return None
    else:
        first_cluster = described['DBClusters'][0]
        return first_cluster['EngineVersion']
def instance_type_sorter(instance_type):
    """
    Return the position of ``instance_type`` in SIZE_ORDER, or -1 when the
    type is not listed there.
    """
    try:
        return SIZE_ORDER.index(instance_type)
    except ValueError:
        # list.index signals "not present" with ValueError.
        return -1
def change_instance_type(client, instance_identifier, new_instance_type):
    """
    Request an immediate change of an instance's DB instance class.

    Returns ``(response, None)`` when modify_db_instance succeeds. On a
    ClientError the error is printed, sent through send_sns_alert, and
    ``(None, str(error))`` is returned.
    """
    modify_kwargs = {
        'DBInstanceIdentifier': instance_identifier,
        'DBInstanceClass': new_instance_type,
        'ApplyImmediately': True,
    }
    try:
        result = client.modify_db_instance(**modify_kwargs)
    except ClientError as err:
        failure = f"Error during an attempt to vertically scale the RDS instance {err}"
        print(failure)
        send_sns_alert(failure)
        return None, str(err)
    return result, None
def get_instance_arn(client, instance_identifier):
    """
    Return the 'DBInstanceArn' of the given instance.

    On a ClientError the error is printed, sent through send_sns_alert, and
    None is returned.
    """
    try:
        described = client.describe_db_instances(DBInstanceIdentifier=instance_identifier)
        first_instance = described['DBInstances'][0]
        return first_instance['DBInstanceArn']
    except ClientError as err:
        failure = f"Error getting the instance ARN for {instance_identifier}: {err}"
        print(failure)
        send_sns_alert(failure)
    return None
def add_modifying_tag(client, instance_identifier):
    """
    Tag an instance with 'modifying' = 'true' and a 'modificationTimestamp'
    holding the current UTC time in ISO format, to prevent simultaneous
    actions on the cluster.

    Does nothing beyond printing a message when the instance ARN cannot be
    resolved. A ClientError from tagging is printed and sent through
    send_sns_alert.
    """
    resource_arn = get_instance_arn(client, instance_identifier)
    if not resource_arn:
        print(f"ARN not found for the instance {instance_identifier}")
        return

    marker_tags = [
        {'Key': 'modifying', 'Value': 'true'},
        {'Key': 'modificationTimestamp', 'Value': datetime.now(timezone.utc).isoformat()},
    ]
    try:
        client.add_tags_to_resource(ResourceName=resource_arn, Tags=marker_tags)
    except ClientError as err:
        failure = f"Error adding the 'modifying' tag to {instance_identifier}: {err}"
        print(failure)
        send_sns_alert(failure)
    else:
        print(f"Added the 'modifying' tag to instance {instance_identifier}")
def any_instance_has_modifying_tag(client, cluster_instances):
    """
    Report whether any cluster member carries the 'modifying' tag.

    Args:
        client: RDS client used for every lookup (previously the tag lookup
            silently used the module-level client instead of this argument).
        cluster_instances: Iterable of cluster member dicts, each with a
            'DBInstanceIdentifier' key.

    Returns:
        True when a member has a tag whose key is 'modifying', or when a
        member's ARN cannot be resolved; False otherwise.
    """
    for member in cluster_instances:
        instance_identifier = member['DBInstanceIdentifier']
        instance_arn = get_instance_arn(client, instance_identifier)
        if not instance_arn:
            # get_instance_arn returns None after a ClientError. Passing None
            # on to list_tags_for_resource would fail parameter validation,
            # so fail closed: an instance we cannot inspect blocks scaling.
            print(f"ARN not found for the instance {instance_identifier}; treating it as being modified")
            return True
        tags = client.list_tags_for_resource(ResourceName=instance_arn)['TagList']
        if any(tag['Key'] == 'modifying' for tag in tags):
            return True
    return False
def modification_timestamps(client, cluster_instances, cooldown_period):
    """
    Enforce the scaling cooldown using the 'modificationTimestamp' tags.

    For each cluster member the tag's ISO timestamp is compared with the
    current UTC time. Tags at least ``cooldown_period`` seconds old are
    removed once all members have been inspected.

    Args:
        client: RDS client used for all lookups and for tag removal.
        cluster_instances: Iterable of cluster member dicts, each with a
            'DBInstanceIdentifier' key.
        cooldown_period: Cooldown length in seconds.

    Returns:
        True when at least one member is still inside its cooldown (or its
        ARN could not be resolved, so the cooldown cannot be verified);
        False otherwise.
    """
    now = datetime.now(timezone.utc)
    expired_arns = []
    cooldown_not_expired = False

    for member in cluster_instances:
        instance_identifier = member['DBInstanceIdentifier']
        instance_arn = get_instance_arn(client, instance_identifier)
        if not instance_arn:
            # A None ARN would fail parameter validation in the tag lookup;
            # without the tags the cooldown cannot be checked, so fail closed.
            print(f"ARN not found for the instance {instance_identifier}; assuming its cooldown has not expired")
            cooldown_not_expired = True
            continue

        tags = client.list_tags_for_resource(ResourceName=instance_arn)['TagList']
        for tag in tags:
            if tag['Key'] != 'modificationTimestamp':
                continue
            tag_timestamp = datetime.fromisoformat(tag['Value'])
            if tag_timestamp.tzinfo is None:
                # add_modifying_tag writes UTC-aware timestamps; a naive value
                # (e.g. hand-edited) is assumed to be UTC so the subtraction
                # below does not raise TypeError.
                tag_timestamp = tag_timestamp.replace(tzinfo=timezone.utc)
            if (now - tag_timestamp).total_seconds() >= cooldown_period:
                # Keep the ARN so removal needs no second describe call.
                expired_arns.append(instance_arn)
            else:
                cooldown_not_expired = True

    for instance_arn in expired_arns:
        client.remove_tags_from_resource(
            ResourceName=instance_arn,
            TagKeys=['modificationTimestamp'],
        )
    return cooldown_not_expired
def any_member_modifying(client, cluster_instances):
    """
    Return True as soon as one cluster member's DBInstanceStatus is listed in
    MODIFYING_STATUSES; return False when none is.
    """
    def _current_status(member):
        described = client.describe_db_instances(
            DBInstanceIdentifier=member['DBInstanceIdentifier']
        )
        return described['DBInstances'][0]['DBInstanceStatus']

    # any() stops at the first match, so later members are not described.
    return any(_current_status(member) in MODIFYING_STATUSES for member in cluster_instances)
def _alarm_instance_identifier(sns_message):
    """Return the 'DBInstanceIdentifier' dimension value of an alarm message, or None."""
    for dimension in sns_message['Trigger']['Dimensions']:
        if dimension['name'] == 'DBInstanceIdentifier':
            return dimension['value']
    return None


def _size_index(instance_identifier, instance_type):
    """Return the position of instance_type in SIZE_ORDER; raise ValueError when absent."""
    try:
        return SIZE_ORDER.index(instance_type)
    except ValueError:
        raise ValueError(
            f"Instance type {instance_type!r} of {instance_identifier} is not listed in SIZE_ORDER"
        ) from None


def _scale_instance(role, instance_identifier, new_instance_type):
    """Resize one instance, alert on the outcome and tag the instance on success."""
    print(f"Attempting to change the instance type for {instance_identifier} to {new_instance_type}")
    _, error = change_instance_type(rds_client, instance_identifier, new_instance_type)
    if error:
        error_message = f"Failed to change the {role} instance type. Error: {error}"
        print(error_message)
        send_sns_alert(error_message)
        return
    message = f"Changed the {role} instance type to {new_instance_type}"
    print(message)
    send_sns_alert(message)
    add_modifying_tag(rds_client, instance_identifier)


def lambda_handler(event, _):
    """
    Lambda entry point triggered by a CloudWatch alarm delivered through SNS.

    For each SNS record the alarm's DBInstanceIdentifier is resolved to its
    cluster. If no member is being modified, tagged 'modifying', or inside
    its cooldown, one instance is scaled to the next entry of SIZE_ORDER:
    the writer when every reader is strictly larger than it, otherwise a
    randomly chosen reader among the smallest ones.

    Args:
        event: SNS event; each record's 'Sns'/'Message' is the alarm JSON.
        _: Lambda context (unused).

    Returns:
        A dict with statusCode 200 after all records are processed, a dict
        with statusCode 500 when a ClientError escapes, or None when
        processing stops early (member being modified, 'modifying' tag
        present, or cooldown not expired).

    Raises:
        ValueError: when the alarm has no DBInstanceIdentifier dimension,
            the instance is not in a cluster, no writer type can be
            determined, or a member's type is missing from SIZE_ORDER.
    """
    print("Received event: " + json.dumps(event, indent=2))
    try:
        for record in event['Records']:
            sns_message = json.loads(record['Sns']['Message'])
            db_instance_identifier = _alarm_instance_identifier(sns_message)
            if db_instance_identifier is None:
                raise ValueError("DBInstanceIdentifier not found in the CloudWatch Alarm event")

            _, cluster_identifier = get_instance_details(rds_client, db_instance_identifier)
            if not cluster_identifier:
                raise ValueError("Instance is not a part of any RDS cluster")

            cluster_response = rds_client.describe_db_clusters(DBClusterIdentifier=cluster_identifier)
            cluster_instances = cluster_response['DBClusters'][0]['DBClusterMembers']

            if any_member_modifying(rds_client, cluster_instances):
                print("At least one instance in the cluster is currently being modified.")
                return

            # Describe every member once and reuse the result below instead
            # of re-querying RDS for each comparison.
            writer_instance_identifier, writer_instance_type = None, None
            reader_types = {}
            for member in cluster_instances:
                member_identifier = member['DBInstanceIdentifier']
                member_instance_type, _ = get_instance_details(rds_client, member_identifier)
                if member['IsClusterWriter']:
                    writer_instance_identifier = member_identifier
                    writer_instance_type = member_instance_type
                else:
                    reader_types[member_identifier] = member_instance_type

            if writer_instance_type is None:
                raise ValueError("The writer instance type not found in the cluster")

            writer_size_index = _size_index(writer_instance_identifier, writer_instance_type)
            reader_size_indexes = {
                identifier: _size_index(identifier, instance_type)
                for identifier, instance_type in reader_types.items()
            }
            # The writer is scaled only when every reader is strictly larger;
            # a reader of equal or smaller size is scaled first.
            is_writer_smallest = all(
                index > writer_size_index for index in reader_size_indexes.values()
            )

            # Check if any instance has the modifying tag
            if any_instance_has_modifying_tag(rds_client, cluster_instances):
                print("An instance in the cluster has the 'modifying' tag.")
                return

            cooldown_not_expired = modification_timestamps(rds_client, cluster_instances, MODIFY_COOLDOWN_PERIOD)
            if cooldown_not_expired:
                message = "We tried to vertically scale the RDS instance in the cluster. However, the Cooldown period has not expired for at least one instance in the cluster."
                print(message)
                # Send the alert (the function object used to be printed instead).
                send_sns_alert(message)
                return

            if is_writer_smallest:
                # Scaling up the writer; guard the index so a writer already
                # at the last SIZE_ORDER entry reports it instead of raising
                # IndexError.
                if writer_size_index < len(SIZE_ORDER) - 1:
                    new_writer_instance_type = SIZE_ORDER[writer_size_index + 1]
                    print(f"Selected new instance type for the writer: {new_writer_instance_type}")
                    _scale_instance("writer", writer_instance_identifier, new_writer_instance_type)
                else:
                    error_message = "The writer instance is at the maximum size already; scaling is not possible"
                    print(error_message)
                    send_sns_alert(error_message)
                continue

            # Process the reader instances: pick one of the smallest readers.
            min_size_index = min(reader_size_indexes.values(), default=None)
            eligible_readers = [
                identifier
                for identifier, index in reader_size_indexes.items()
                if index == min_size_index
            ]
            if eligible_readers and min_size_index < len(SIZE_ORDER) - 1:
                reader_to_scale = random.choice(eligible_readers)
                new_reader_instance_type = SIZE_ORDER[min_size_index + 1]
                _scale_instance("reader", reader_to_scale, new_reader_instance_type)
            else:
                print("No eligible readers to scale up.")
                send_sns_alert("We tried to vertically scale the RDS instance. However, the required conditions were not met.")

        return {
            'statusCode': 200,
            'body': json.dumps("Processed instances in the cluster.")
        }
    except ClientError as e:
        error_message = f"Failed to execute the function. Error: {str(e)}"
        print(error_message)
        send_sns_alert(error_message)
        return {
            'statusCode': 500,
            'body': json.dumps(f"Failed to execute the function. Error: {str(e)}")
        }