Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update lambda-manager code to handle ThrottlingException [CDS-1331] #153

Merged
merged 7 commits into from
Jul 1, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add retries to boto3 config
  • Loading branch information
guyrenny committed Jul 1, 2024
commit e2c7c2d82f1dc221ddbcaa3ad8f4e8c7ab28d770
152 changes: 95 additions & 57 deletions src/lambda-manager/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@
import re
import uuid
import cfnresponse
from botocore.config import Config

cloudwatch_logs = boto3.client('logs')
config = Config(
retries = {
'max_attempts': 10,
'mode': 'standard'
}
)

cloudwatch_logs = boto3.client('logs', config=config)

def lambda_handler(event, context):
status = cfnresponse.SUCCESS
lambda_client = boto3.client('lambda')
lambda_client = boto3.client('lambda', config=config)
try:
regex_pattern_list = os.environ.get('REGEX_PATTERN').split(',')
destination_type = os.environ.get('DESTINATION_TYPE')
Expand All @@ -20,56 +28,52 @@ def lambda_handler(event, context):
log_group_permission_prefix = os.environ.get('LOG_GROUP_PERMISSION_PREFIX', '').split(',')
region = context.invoked_function_arn.split(":")[3]
account_id = context.invoked_function_arn.split(":")[4]
log_exist_in_regex_pattern = False

if "RequestType" in event and event['RequestType'] == 'Create' and log_group_permission_prefix != ['']:
print("Addning permissions in creation")
add_permissions_first_time(destination_arn, log_group_permission_prefix, region, account_id)

print(f"Scanning all log groups: {scan_all_log_groups}")
if scan_all_log_groups == 'true' and "RequestType" in event:
if scan_all_log_groups == 'true' and "RequestType" in event and event['RequestType'] == 'Create':
print(f"Scanning all log groups: {scan_all_log_groups}")
list_log_groups_and_subscriptions(cloudwatch_logs, regex_pattern_list, logs_filter, destination_arn, role_arn, filter_name, context,log_group_permission_prefix)
update_scan_all_log_groups_status(context, lambda_client)

elif scan_all_log_groups == 'true':
scan_all_log_groups = 'false'
update_scan_all_log_groups_status(context, lambda_client)

function_name = context.function_name

# Fetch the current function configuration
current_config = lambda_client.get_function_configuration(FunctionName=function_name)
current_env_vars = current_config['Environment']['Variables']

# Update the environment variables
current_env_vars['SCAN_OLD_LOGGROUPS'] = 'false'

# Update the Lambda function configuration
try:
response = lambda_client.update_function_configuration(
FunctionName=function_name,
Environment={'Variables': current_env_vars}
)
print("Updated environment variables:", response['Environment']['Variables'])
except Exception as e:
print("Error updating function configuration:", e)
status = cfnresponse.FAILED
elif scan_all_log_groups != 'true' and "RequestType" not in event:
if scan_all_log_groups != 'true' and "RequestType" not in event:
log_group_to_subscribe = event['detail']['requestParameters']['logGroupName']
print(f"Log Group: {log_group_to_subscribe}")
for regex_pattern in regex_pattern_list:
if regex_pattern and re.match(regex_pattern, log_group_to_subscribe):
log_exist_in_regex_pattern = True
if destination_type == 'firehose':
print(f"Adding subscription filter for {log_group_to_subscribe}")
status = add_subscription(filter_name, logs_filter, log_group_to_subscribe, destination_arn, role_arn)
status = add_subscription(filter_name, logs_filter, log_group_to_subscribe, destination_arn)
if status == cfnresponse.FAILED:
print(f"retrying to add subscription filter for {log_group_to_subscribe}")
add_subscription(filter_name, logs_filter, log_group_to_subscribe, destination_arn)
elif destination_type == 'lambda':
try:
if not check_if_log_group_exist_in_log_group_permission_prefix(log_group_to_subscribe, log_group_permission_prefix):
print("Adding permission to lambda")
add_permission_to_lambda(destination_arn, log_group_to_subscribe, region, account_id)
print(f"Adding subscription filter for {log_group_to_subscribe}")
status = add_subscription(filter_name, logs_filter, log_group_to_subscribe, destination_arn)
if status == cfnresponse.FAILED:
print(f"retrying to add subscription filter for {log_group_to_subscribe}")
add_subscription(filter_name, logs_filter, log_group_to_subscribe, destination_arn)
except Exception as e:
print(f"Failed to put subscription filter for {log_group_to_subscribe}: {e}")
status = cfnresponse.FAILED
else:
print(f"Invalid destination type {destination_type}")
status = cfnresponse.FAILED
else:
print(f"Loggroup {log_group_to_subscribe} excluded")

if not log_exist_in_regex_pattern:
print(f"Loggroup {log_group_to_subscribe} excluded")
except Exception as e:
print(f"Failed with exception: {e}")
status = cfnresponse.FAILED
Expand All @@ -87,7 +91,6 @@ def lambda_handler(event, context):
def list_log_groups_and_subscriptions(cloudwatch_logs, regex_pattern_list, logs_filter, destination_arn, role_arn, filter_name, context,log_group_permission_prefix):
'''Scan for all of the log groups in the region and add subscription to the log groups that match the regex pattern, this function will only run 1 time'''
log_groups = []
create_subscription = False
response = {'nextToken': None} # Initialize with a dict containing nextToken as None
print("Scanning all log groups")
while response.get('nextToken') is not None or 'logGroups' not in response:
Expand All @@ -99,34 +102,47 @@ def list_log_groups_and_subscriptions(cloudwatch_logs, regex_pattern_list, logs_
region = context.invoked_function_arn.split(":")[3]
account_id = context.invoked_function_arn.split(":")[4]
for log_group in log_groups:
create_subscription = False
log_group_name = log_group['logGroupName']

subscriptions = cloudwatch_logs.describe_subscription_filters(logGroupName=log_group_name)
subscriptions = subscriptions.get('subscriptionFilters')

if subscriptions == None:
create_subscription = True
elif len(subscriptions) < 2:
create_subscription = True
for subscription in subscriptions:
if subscription['destinationArn'] == destination_arn:
print(f" Subscription already exists for {log_group_name}")
create_subscription = False
break

if create_subscription:
for regex_pattern in regex_pattern_list:
if regex_pattern and re.match(regex_pattern, log_group_name):
print(f"Log Group: {log_group_name}")
if identify_arn_service(destination_arn) == "lambda":
if not check_if_log_group_exist_in_log_group_permission_prefix(log_group_name, log_group_permission_prefix):
add_permission_to_lambda(destination_arn, log_group_name, region, account_id)
print(f"Adding subscription filter for {log_group_name}")
add_subscription(filter_name, logs_filter, log_group_name, destination_arn)
else:
print(f"Adding subscription filter for {log_group_name}")
add_subscription(filter_name, logs_filter, log_group_name, destination_arn, role_arn)
break # no need to continue the loop if we find a match for the log group

for regex_pattern in regex_pattern_list:
if regex_pattern and re.match(regex_pattern, log_group_name):

subscriptions = cloudwatch_logs.describe_subscription_filters(logGroupName=log_group_name)
subscriptions = subscriptions.get('subscriptionFilters')

if subscriptions == None:
create_subscription = True

elif len(subscriptions) < 2:
create_subscription = True
for subscription in subscriptions:
if subscription['destinationArn'] == destination_arn:
print(f" Subscription already exists for {log_group_name}")
create_subscription = False
break

elif len(subscriptions) >= 2:
print(f" Skipping {log_group_name} as it already has 2 subscriptions")
continue

if create_subscription:
print(f"Log Group: {log_group_name}")
if identify_arn_service(destination_arn) == "lambda":
if not check_if_log_group_exist_in_log_group_permission_prefix(log_group_name, log_group_permission_prefix):
add_permission_to_lambda(destination_arn, log_group_name, region, account_id)
print(f"Adding subscription filter for {log_group_name}")
status = add_subscription(filter_name, logs_filter, log_group_name, destination_arn)
if status == cfnresponse.FAILED:
print(f"retrying to add subscription filter for {log_group_name}")
add_subscription(filter_name, logs_filter, log_group_name, destination_arn)
else:
print(f"Adding subscription filter for {log_group_name}")
status = add_subscription(filter_name, logs_filter, log_group_name, destination_arn)
if status == cfnresponse.FAILED:
print(f"retrying to add subscription filter for {log_group_name}")
add_subscription(filter_name, logs_filter, log_group_name, destination_arn)
break # no need to continue the loop if we find a match for the log group

def add_subscription(filter_name: str, logs_filter: str, log_group_to_subscribe: str, destination_arn: str, role_arn: str = None) -> str:
'''Add subscription to CloudWatch log group'''
Expand All @@ -153,7 +169,7 @@ def add_subscription(filter_name: str, logs_filter: str, log_group_to_subscribe:

def add_permissions_first_time(destination_arn: str, log_group_permission_prefix: list[str], region: str, account_id: str):
'''Add permissions to the lambda on the creation of the lambda function for the first time'''
lambda_client = boto3.client('lambda')
lambda_client = boto3.client('lambda', config=config)
for prefix in log_group_permission_prefix:
try:
lambda_client.add_permission(
Expand All @@ -168,7 +184,7 @@ def add_permissions_first_time(destination_arn: str, log_group_permission_prefix

def add_permission_to_lambda(destination_arn: str, log_group_name: str, region: str, account_id: str):
'''In case that the log group is not part of the log_group_permission_prefix then add permissions for it to the lambda function'''
lambda_client = boto3.client('lambda')
lambda_client = boto3.client('lambda', config=config)
try:
lambda_client.add_permission(
FunctionName=destination_arn,
Expand Down Expand Up @@ -200,3 +216,25 @@ def identify_arn_service(arn: str) -> str:
return "firehose"
else:
return "Unknown AWS Service"

def update_scan_all_log_groups_status(context, lambda_client):

function_name = context.function_name

# Fetch the current function configuration
current_config = lambda_client.get_function_configuration(FunctionName=function_name)
current_env_vars = current_config['Environment']['Variables']

# Update the environment variables
current_env_vars['SCAN_OLD_LOGGROUPS'] = 'false'

# Update the Lambda function configuration
try:
response = lambda_client.update_function_configuration(
FunctionName=function_name,
Environment={'Variables': current_env_vars}
)
print("Updated environment variables:", response['Environment']['Variables'])
except Exception as e:
print("Error updating function configuration:", e)
status = cfnresponse.FAILED
Loading