Update lambda-manager code to handle ThrottlingException [CDS-1331] #153

Merged
merged 7 commits into from
Jul 1, 2024
4 changes: 4 additions & 0 deletions src/lambda-manager/CHANGELOG.md
@@ -2,6 +2,10 @@

## lambda-manager

## 2.0.4 / 1-07-2024
### 🧰 Bug fixes 🧰
- Add a retry config to the boto3 clients so the lambda can handle ThrottlingException, and update error handling in the lambda.

## 2.0.3 / 26-06-2024
### 💡 Enhancements 💡
- Add a new parameter, LogGroupPermissionPreFix. When defined, the lambda creates one permission for the prefix set in the parameter instead of one permission per log group.
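
The 2.0.4 entry is about retrying AWS API calls that get throttled. As a rough illustration of the general idea (retry a throttled call with exponential backoff and jitter), here is a minimal standard-library sketch. It is not botocore's implementation; `ThrottledError`, `call_with_retries`, and the delay values are hypothetical names and numbers chosen for this example.

```python
import random
import time


class ThrottledError(Exception):
    """Hypothetical stand-in for a throttling error returned by an API."""


def call_with_retries(func, max_retries=10, base_delay=0.1, max_delay=20.0, sleep=time.sleep):
    """Call func(), retrying on ThrottledError with exponential backoff and jitter.

    max_retries counts retries made after the initial attempt.
    """
    attempt = 0
    while True:
        try:
            return func()
        except ThrottledError:
            if attempt >= max_retries:
                raise  # retries exhausted, surface the error to the caller
            # The delay ceiling doubles per attempt, is capped, and is randomized.
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))
            attempt += 1
```

The `sleep` parameter is injectable only so the sketch can be exercised without real waiting.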
1 change: 1 addition & 0 deletions src/lambda-manager/README.md
@@ -13,6 +13,7 @@ Environment variables:
| DESTINATION_TYPE | Type of destination (Lambda or Firehose) | | :heavy_check_mark: |
| SCAN_OLD_LOGGROUPS | When set to true, the lambda scans all existing log groups and adds those that match the RegexPattern as triggers. The scan happens only when the lambda is created; after that, the lambda only detects new log groups. | false | |
| LogGroupPermissionPreFix | Instead of creating one permission for each log group in the destination lambda, the code takes the prefix set in this parameter and creates one permission for all log groups that match the prefix. For example, if you define "/aws/log/logs", the lambda creates a single permission for all log groups that start with /aws/log/logs instead of one permission per log group. Use this parameter when you have more than 50 log groups. Note that the log groups will not appear as triggers in the lambda when you use this parameter. | n/a | |
| AWSApiRequestsLimit | If the lambda fails with a ThrottlingException, increase this value to raise the maximum number of retry attempts the lambda makes for AWS API requests. | 10 | |
| FunctionMemorySize | The maximum allocated memory this lambda may consume. The default value is the minimum recommended setting; please consult Coralogix support before changing it. | 1024 | |
| FunctionTimeout | The maximum time in seconds the function may be allowed to run. The default value is the minimum recommended setting; please consult Coralogix support before changing it. | 300 | |
| NotificationEmail | Failure notification email address | | |
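
In this PR the `AWSApiRequestsLimit` parameter reaches the lambda as the environment variable `AWS_API_REUESTS_LIMIT` (spelled that way in both `template.yaml` and `lambda_function.py`). The sketch below shows one defensive way to read it, assuming you want to fall back to the default of 10 on empty or invalid input. `read_retry_limit` is a hypothetical helper, not part of the PR.

```python
import os


def read_retry_limit(env=os.environ, name="AWS_API_REUESTS_LIMIT", default=10):
    """Return the retry limit from the environment, falling back to default.

    The variable name is spelled as in this PR's template and lambda code.
    Missing, non-numeric, or negative values fall back to the default.
    """
    raw = env.get(name, "")
    try:
        value = int(raw)
    except (TypeError, ValueError):
        return default
    return value if value >= 0 else default
```

The PR itself calls `int(os.environ.get(...))` directly, which raises `ValueError` at import time if the variable is set to a non-numeric string.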
155 changes: 98 additions & 57 deletions src/lambda-manager/lambda_function.py
@@ -3,12 +3,20 @@
import re
import uuid
import cfnresponse
from botocore.config import Config

cloudwatch_logs = boto3.client('logs')
config = Config(
retries = {
'max_attempts': int(os.environ.get('AWS_API_REUESTS_LIMIT', 10)),
'mode': 'standard'
}
)

cloudwatch_logs = boto3.client('logs', config=config)

def lambda_handler(event, context):
status = cfnresponse.SUCCESS
lambda_client = boto3.client('lambda')
lambda_client = boto3.client('lambda', config=config)
try:
regex_pattern_list = os.environ.get('REGEX_PATTERN').split(',')
destination_type = os.environ.get('DESTINATION_TYPE')
@@ -20,56 +28,53 @@ def lambda_handler(event, context):
log_group_permission_prefix = os.environ.get('LOG_GROUP_PERMISSION_PREFIX', '').split(',')
region = context.invoked_function_arn.split(":")[3]
account_id = context.invoked_function_arn.split(":")[4]
log_exist_in_regex_pattern = False

if "RequestType" in event and event['RequestType'] == 'Create' and log_group_permission_prefix != ['']:
print("Adding permissions in creation")
add_permissions_first_time(destination_arn, log_group_permission_prefix, region, account_id)

print(f"Scanning all log groups: {scan_all_log_groups}")
if scan_all_log_groups == 'true' and "RequestType" in event:
if scan_all_log_groups == 'true' and "RequestType" in event and event['RequestType'] == 'Create':
print(f"Scanning all log groups: {scan_all_log_groups}")
list_log_groups_and_subscriptions(cloudwatch_logs, regex_pattern_list, logs_filter, destination_arn, role_arn, filter_name, context,log_group_permission_prefix)
update_scan_all_log_groups_status(context, lambda_client)

elif scan_all_log_groups == 'true':
scan_all_log_groups = 'false'
update_scan_all_log_groups_status(context, lambda_client)

function_name = context.function_name

# Fetch the current function configuration
current_config = lambda_client.get_function_configuration(FunctionName=function_name)
current_env_vars = current_config['Environment']['Variables']

# Update the environment variables
current_env_vars['SCAN_OLD_LOGGROUPS'] = 'false'

# Update the Lambda function configuration
try:
response = lambda_client.update_function_configuration(
FunctionName=function_name,
Environment={'Variables': current_env_vars}
)
print("Updated environment variables:", response['Environment']['Variables'])
except Exception as e:
print("Error updating function configuration:", e)
status = cfnresponse.FAILED
elif scan_all_log_groups != 'true' and "RequestType" not in event:
if scan_all_log_groups != 'true' and "RequestType" not in event:
log_group_to_subscribe = event['detail']['requestParameters']['logGroupName']
print(f"Log Group: {log_group_to_subscribe}")
for regex_pattern in regex_pattern_list:
if regex_pattern and re.match(regex_pattern, log_group_to_subscribe):
log_exist_in_regex_pattern = True
if destination_type == 'firehose':
print(f"Adding subscription filter for {log_group_to_subscribe}")
status = add_subscription(filter_name, logs_filter, log_group_to_subscribe, destination_arn, role_arn)
status = add_subscription(filter_name, logs_filter, log_group_to_subscribe, destination_arn)
if status == cfnresponse.FAILED:
print(f"retrying to add subscription filter for {log_group_to_subscribe}")
add_subscription(filter_name, logs_filter, log_group_to_subscribe, destination_arn)
elif destination_type == 'lambda':
try:
if not check_if_log_group_exist_in_log_group_permission_prefix(log_group_to_subscribe, log_group_permission_prefix):
print("Adding permission to lambda")
add_permission_to_lambda(destination_arn, log_group_to_subscribe, region, account_id)
print(f"Adding subscription filter for {log_group_to_subscribe}")
status = add_subscription(filter_name, logs_filter, log_group_to_subscribe, destination_arn)
if status == cfnresponse.FAILED:
print(f"retrying to add subscription filter for {log_group_to_subscribe}")
add_permission_to_lambda(destination_arn, log_group_to_subscribe, region, account_id)
add_subscription(filter_name, logs_filter, log_group_to_subscribe, destination_arn)
except Exception as e:
print(f"Failed to put subscription filter for {log_group_to_subscribe}: {e}")
status = cfnresponse.FAILED
else:
print(f"Invalid destination type {destination_type}")
status = cfnresponse.FAILED
else:
print(f"Loggroup {log_group_to_subscribe} excluded")

if not log_exist_in_regex_pattern:
print(f"Loggroup {log_group_to_subscribe} excluded")
except Exception as e:
print(f"Failed with exception: {e}")
status = cfnresponse.FAILED
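
The handler above sets `log_exist_in_regex_pattern` when any non-empty pattern matches the log group name, and reports the log group as excluded only if none do. A minimal sketch of that check as a standalone function follows; `matches_any_pattern` is a hypothetical name, and the behavior is inferred from the diff rather than taken from the repository.

```python
import re


def matches_any_pattern(log_group_name, regex_pattern_list):
    """Return True if the name matches at least one non-empty pattern.

    Uses re.match, so each pattern is anchored at the start of the name.
    Empty strings (for example from splitting an empty setting) are skipped.
    """
    return any(
        pattern and re.match(pattern, log_group_name)
        for pattern in regex_pattern_list
    )
```

For instance, `matches_any_pattern("/aws/lambda/app", ["/aws/lambda/.*"])` is true, while a list containing only `""` never matches.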
@@ -87,7 +92,6 @@ def lambda_handler(event, context):
def list_log_groups_and_subscriptions(cloudwatch_logs, regex_pattern_list, logs_filter, destination_arn, role_arn, filter_name, context,log_group_permission_prefix):
'''Scan for all of the log groups in the region and add subscription to the log groups that match the regex pattern, this function will only run 1 time'''
log_groups = []
create_subscription = False
response = {'nextToken': None} # Initialize with a dict containing nextToken as None
print("Scanning all log groups")
while response.get('nextToken') is not None or 'logGroups' not in response:
@@ -99,34 +103,48 @@ def list_log_groups_and_subscriptions(cloudwatch_logs, regex_pattern_list, logs_
region = context.invoked_function_arn.split(":")[3]
account_id = context.invoked_function_arn.split(":")[4]
for log_group in log_groups:
create_subscription = False
log_group_name = log_group['logGroupName']

subscriptions = cloudwatch_logs.describe_subscription_filters(logGroupName=log_group_name)
subscriptions = subscriptions.get('subscriptionFilters')

if subscriptions == None:
create_subscription = True
elif len(subscriptions) < 2:
create_subscription = True
for subscription in subscriptions:
if subscription['destinationArn'] == destination_arn:
print(f" Subscription already exists for {log_group_name}")
create_subscription = False
break

if create_subscription:
for regex_pattern in regex_pattern_list:
if regex_pattern and re.match(regex_pattern, log_group_name):
print(f"Log Group: {log_group_name}")
if identify_arn_service(destination_arn) == "lambda":
if not check_if_log_group_exist_in_log_group_permission_prefix(log_group_name, log_group_permission_prefix):
add_permission_to_lambda(destination_arn, log_group_name, region, account_id)
print(f"Adding subscription filter for {log_group_name}")
add_subscription(filter_name, logs_filter, log_group_name, destination_arn)
else:
print(f"Adding subscription filter for {log_group_name}")
add_subscription(filter_name, logs_filter, log_group_name, destination_arn, role_arn)
break # no need to continue the loop if we find a match for the log group

for regex_pattern in regex_pattern_list:
if regex_pattern and re.match(regex_pattern, log_group_name):

subscriptions = cloudwatch_logs.describe_subscription_filters(logGroupName=log_group_name)
subscriptions = subscriptions.get('subscriptionFilters')

if subscriptions == None:
create_subscription = True

elif len(subscriptions) < 2:
create_subscription = True
for subscription in subscriptions:
if subscription['destinationArn'] == destination_arn:
print(f" Subscription already exists for {log_group_name}")
create_subscription = False
break

elif len(subscriptions) >= 2:
print(f" Skipping {log_group_name} as it already has 2 subscriptions")
continue

if create_subscription:
print(f"Log Group: {log_group_name}")
if identify_arn_service(destination_arn) == "lambda":
if not check_if_log_group_exist_in_log_group_permission_prefix(log_group_name, log_group_permission_prefix):
add_permission_to_lambda(destination_arn, log_group_name, region, account_id)
print(f"Adding subscription filter for {log_group_name}")
status = add_subscription(filter_name, logs_filter, log_group_name, destination_arn)
if status == cfnresponse.FAILED:
print(f"retrying to add subscription filter for {log_group_name}")
add_permission_to_lambda(destination_arn, log_group_name, region, account_id)
add_subscription(filter_name, logs_filter, log_group_name, destination_arn)
else:
print(f"Adding subscription filter for {log_group_name}")
status = add_subscription(filter_name, logs_filter, log_group_name, destination_arn)
if status == cfnresponse.FAILED:
print(f"retrying to add subscription filter for {log_group_name}")
add_subscription(filter_name, logs_filter, log_group_name, destination_arn)
break # no need to continue the loop if we find a match for the log group
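
The scan above decides per log group whether to add a subscription filter: add one when no filters exist, skip when the same destination is already subscribed, and skip when the log group already has 2 filters. A minimal sketch of that decision as a pure function follows. `should_create_subscription` and `max_filters` are hypothetical names, and the input is assumed to be the `subscriptionFilters` list returned by `describe_subscription_filters`.

```python
def should_create_subscription(subscriptions, destination_arn, max_filters=2):
    """Decide whether a new subscription filter should be added.

    subscriptions is the 'subscriptionFilters' list for one log group,
    or None if the key is absent from the response.
    """
    if not subscriptions:
        return True  # nothing attached yet
    if len(subscriptions) >= max_filters:
        return False  # log group is already at the filter limit
    # Below the limit: add only if this destination is not already subscribed.
    return all(s["destinationArn"] != destination_arn for s in subscriptions)
```

Keeping the decision separate from the boto3 calls makes it testable without AWS access.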

def add_subscription(filter_name: str, logs_filter: str, log_group_to_subscribe: str, destination_arn: str, role_arn: str = None) -> str:
'''Add subscription to CloudWatch log group'''
@@ -146,14 +164,15 @@ def add_subscription(filter_name: str, logs_filter: str, log_group_to_subscribe:
filterPattern=logs_filter,
logGroupName=log_group_to_subscribe,
)
print("Successfully put subscription filter for", log_group_to_subscribe)
return cfnresponse.SUCCESS
except Exception as e:
print(f"Failed to put subscription filter for {log_group_to_subscribe}: {e}")
return cfnresponse.FAILED

def add_permissions_first_time(destination_arn: str, log_group_permission_prefix: list[str], region: str, account_id: str):
'''Add permissions to the lambda on the creation of the lambda function for the first time'''
lambda_client = boto3.client('lambda')
lambda_client = boto3.client('lambda', config=config)
for prefix in log_group_permission_prefix:
try:
lambda_client.add_permission(
@@ -168,7 +187,7 @@ def add_permissions_first_time(destination_arn: str, log_group_permission_prefix

def add_permission_to_lambda(destination_arn: str, log_group_name: str, region: str, account_id: str):
'''In case that the log group is not part of the log_group_permission_prefix then add permissions for it to the lambda function'''
lambda_client = boto3.client('lambda')
lambda_client = boto3.client('lambda', config=config)
try:
lambda_client.add_permission(
FunctionName=destination_arn,
@@ -200,3 +219,25 @@ def identify_arn_service(arn: str) -> str:
return "firehose"
else:
return "Unknown AWS Service"

def update_scan_all_log_groups_status(context, lambda_client):

function_name = context.function_name

# Fetch the current function configuration
current_config = lambda_client.get_function_configuration(FunctionName=function_name)
current_env_vars = current_config['Environment']['Variables']

# Update the environment variables
current_env_vars['SCAN_OLD_LOGGROUPS'] = 'false'

# Update the Lambda function configuration
try:
response = lambda_client.update_function_configuration(
FunctionName=function_name,
Environment={'Variables': current_env_vars}
)
print("Updated environment variables:", response['Environment']['Variables'])
except Exception as e:
print("Error updating function configuration:", e)
status = cfnresponse.FAILED
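
In the helper above, `status` is assigned inside the `except` branch but never returned, so the caller's `status` is not affected by a failed update. A minimal sketch of one way to surface the result follows, assuming the caller assigns the return value to its own `status`. The signature differs from the PR (it takes `function_name` rather than `context`), and the string constants are stand-ins for `cfnresponse.SUCCESS` and `cfnresponse.FAILED` so the sketch runs without `cfnresponse`.

```python
SUCCESS = "SUCCESS"  # stand-in for cfnresponse.SUCCESS
FAILED = "FAILED"    # stand-in for cfnresponse.FAILED


def update_scan_all_log_groups_status(function_name, lambda_client):
    """Set SCAN_OLD_LOGGROUPS to 'false' and report whether the update worked."""
    try:
        # Fetch the current environment so other variables are preserved.
        current = lambda_client.get_function_configuration(FunctionName=function_name)
        env_vars = dict(current["Environment"]["Variables"])
        env_vars["SCAN_OLD_LOGGROUPS"] = "false"
        lambda_client.update_function_configuration(
            FunctionName=function_name,
            Environment={"Variables": env_vars},
        )
        return SUCCESS
    except Exception as e:
        print("Error updating function configuration:", e)
        return FAILED
```

A caller could then write `status = update_scan_all_log_groups_status(context.function_name, lambda_client)`.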
8 changes: 7 additions & 1 deletion src/lambda-manager/template.yaml
@@ -16,7 +16,7 @@ Metadata:
- cloudwatch
- lambda
HomePageUrl: https://coralogix.com
SemanticVersion: 2.0.3
SemanticVersion: 2.0.4
SourceCodeUrl: https://github.com/coralogix/coralogix-aws-serverless
AWS::CloudFormation::Interface:
ParameterGroups:
@@ -62,6 +62,10 @@ Parameters:
Type: String
Description: Instead of creating one permission for each log group in the destination lambda, the code takes the prefix set in this parameter and creates one permission for all log groups that match the prefix.
Default: ""
AWSApiRequestsLimit:
Type: Number
Description: If the lambda fails with a ThrottlingException, increase this value to raise the maximum number of retry attempts the lambda makes for AWS API requests.
Default: 10
FunctionMemorySize:
Type: Number
Description: Lambda function memory limit
@@ -138,6 +142,8 @@ Resources:
Ref: ScanOldLogGroups
LOG_GROUP_PERMISSION_PREFIX:
Ref: LogGroupPermissionPreFix
AWS_API_REUESTS_LIMIT:
Ref: AWSApiRequestsLimit
Policies:
- Statement:
- Sid: CXLambdaUpdateConfig