Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Even more appropriate dynamic references #3071

Merged
merged 1 commit into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/cfnlint/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ class Context:
init=True, default_factory=lambda: set(PSEUDOPARAMS)
)

# can we use dynamic references
dynamic_references: bool = field(init=True, default=False)

# Combiniation of storing any resolved ref
# and adds in any Refs available from things like Fn::Sub
ref_values: Dict[str, Any] = field(init=True, default_factory=dict)
Expand Down
8 changes: 3 additions & 5 deletions src/cfnlint/jsonschema/_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,9 @@ def _filter_schemas(self, schema, validator: Any) -> Tuple[Any, Any]:
def filter(self, validator: Any, instance: Any, schema: Any):
# Lets validate dynamic references when appropriate
if validator.is_type(instance, "string"):
if validator.context:
if validator.context.dynamic_references:
if REGEX_DYN_REF.findall(instance):
yield (instance, {"dynamicReference": schema})
return
if REGEX_DYN_REF.findall(instance):
yield (instance, {"dynamicReference": schema})
return

# dependencies, required, minProperties, maxProperties
# need to have filtered properties to validate
Expand Down
42 changes: 29 additions & 13 deletions src/cfnlint/rules/functions/DynamicReference.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from cfnlint.helpers import REGEX_DYN_REF
from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.rules import CloudFormationLintRule
from cfnlint.rules.functions._BaseFn import BaseFn

_all = {
"items": [
Expand Down Expand Up @@ -62,14 +62,9 @@
}


class DynamicReference(CloudFormationLintRule):
"""
Check if Dynamic Reference Secure Strings are
only used in the correct locations
"""

class DynamicReference(BaseFn):
id = "E1050"
shortdesc = "Check dynamic references secure strings are in supported locations"
shortdesc = "Validate the structure of a dynamic reference"
description = (
"Dynamic References Secure Strings are only supported for a small set of"
" resource properties. Validate that they are being used in the correct"
Expand All @@ -79,6 +74,13 @@ class DynamicReference(CloudFormationLintRule):
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html"
tags = ["functions", "dynamic reference"]

def __init__(self) -> None:
super().__init__()
self.child_rules = {
"E1051": None,
"E1027": None,
}

def _clean_errors(self, err: ValidationError) -> ValidationError:
err.rule = self
err.path = deque([])
Expand All @@ -91,6 +93,10 @@ def dynamicReference(
if not validator.is_type(instance, "string"):
return

# SSM parameters can be used in Resources and Outputs and Parameters
# SSM secrets are only used in a small number of locations
# Secrets manager can be used only in resource properties

for v in REGEX_DYN_REF.findall(instance):
parts = v.split(":")

Expand All @@ -101,14 +107,24 @@ def dynamicReference(
found = True

if found:
return
continue

if parts[1] in ["ssm", "ssm-secure"]:
if parts[1] == "ssm":
evolved = validator.evolve(schema=_ssm)
elif parts[2] == "arn":
evolved = validator.evolve(schema=_secrets_manager_arn)
elif parts[1] == "ssm-secure":
evolved = validator.evolve(schema=_ssm)
rule = self.child_rules["E1027"]
if rule:
yield from rule.validate(validator, {}, v, schema)
else:
evolved = validator.evolve(schema=_secrets_manager)
if parts[2] == "arn":
evolved = validator.evolve(schema=_secrets_manager_arn)
else: # this is secrets manager
evolved = validator.evolve(schema=_secrets_manager)
rule = self.child_rules["E1051"]
if rule:
yield from rule.validate(validator, {}, v, schema)

for err in evolved.iter_errors(parts):
yield self._clean_errors(err)
found = True
38 changes: 38 additions & 0 deletions src/cfnlint/rules/functions/DynamicReferenceSecretsManagerPath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from typing import Any

from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.rules import CloudFormationLintRule


class DynamicReferenceSecretsManagerPath(CloudFormationLintRule):
id = "E1051"
shortdesc = (
"Validate dynamic references to secrets manager are only in resource properties"
)
description = (
"Dynamic references from secrets manager can only be used "
"in resource properties"
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html#dynamic-references-secretsmanager"
tags = ["functions", "dynamic reference"]

def validate(self, validator: Validator, s: Any, instance: Any, schema: Any):
if len(validator.context.path) >= 3:
if (
validator.context.path[0] == "Resources"
and validator.context.path[2] == "Properties"
):
return

yield ValidationError(
(
f"Dynamic reference {instance!r} to secrets manager can only be "
"used in resource properties"
),
rule=self,
)
138 changes: 30 additions & 108 deletions src/cfnlint/rules/functions/DynamicReferenceSecureString.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@
SPDX-License-Identifier: MIT-0
"""

import regex as re
from typing import Any

import cfnlint.helpers
from cfnlint.rules import CloudFormationLintRule, RuleMatch
from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.rules.functions._BaseFn import BaseFn


class DynamicReferenceSecureString(CloudFormationLintRule):
"""
Check if Dynamic Reference Secure Strings are
only used in the correct locations
"""

class DynamicReferenceSecureString(BaseFn):
id = "E1027"
shortdesc = "Check dynamic references secure strings are in supported locations"
description = (
Expand All @@ -29,102 +24,29 @@ class DynamicReferenceSecureString(CloudFormationLintRule):
def __init__(self):
"""Init"""
super().__init__()
self.property_specs = []
self.resource_specs = []
self.exceptions = {
"AWS::DirectoryService::MicrosoftAD": "Password",
"AWS::DirectoryService::SimpleAD": "Password",
"AWS::ElastiCache::ReplicationGroup": "AuthToken",
"AWS::IAM::User": {
"LoginProfile": "Password",
},
"AWS::KinesisFirehose::DeliveryStream": {
"RedshiftDestinationConfiguration": "Password",
},
"AWS::OpsWorks::App": {
"AppSource": "Password",
},
"AWS::OpsWorks::Stack": {
"RdsDbInstances": "DbPassword",
"CustomCookbooksSource": "Password",
},
"AWS::RDS::DBCluster": "MasterUserPassword",
"AWS::RDS::DBInstance": "MasterUserPassword",
"AWS::Redshift::Cluster": "MasterUserPassword",
}

def _match_values(self, cfnelem, path):
"""Recursively search for values matching the searchRegex"""
values = []
if isinstance(cfnelem, dict):
for key in cfnelem:
pathprop = path[:]
pathprop.append(key)
values.extend(self._match_values(cfnelem[key], pathprop))
elif isinstance(cfnelem, list):
for index, item in enumerate(cfnelem):
pathprop = path[:]
pathprop.append(index)
values.extend(self._match_values(item, pathprop))
else:
# Leaf node
if isinstance(cfnelem, str): # and re.match(searchRegex, cfnelem):
for variable in re.findall(
cfnlint.helpers.REGEX_DYN_REF_SSM_SECURE, cfnelem
):
values.append(path + [variable])

return values

def match_values(self, cfn):
"""
Search for values in all parts of the templates that match the searchRegex
"""
results = []
results.extend(self._match_values(cfn.template, []))
# Globals are removed during a transform. They need to be checked manually
results.extend(self._match_values(cfn.template.get("Globals", {}), []))
return results

def _test_list(self, parent_list, child_list):
result = False
for idx in range(len(parent_list) - len(child_list) + 1):
if parent_list[idx : idx + len(child_list)] == child_list:
result = True
break

return result

def match(self, cfn):
matches = []
paths = self.match_values(cfn)

for path in paths:
message = (
"Dynamic reference secure strings are not supported at this location"
)
if path[0] == "Resources":
resource_type = (
cfn.template.get("Resources", {}).get(path[1]).get("Type")
)
exception = self.exceptions.get(resource_type)
if not exception:
matches.append(RuleMatch(path[:-1], message=message))
continue

if isinstance(exception, dict):
matched = False
for k, v in exception.items():
if self._test_list(path, ["Properties", k]):
if v in path:
matched = True
if not matched:
matches.append(RuleMatch(path[:-1], message=message))
continue
if not self._test_list(path, ["Properties", exception]):
matches.append(RuleMatch(path[:-1], message=message))

else:
matches.append(RuleMatch(path[:-1], message=message))

return matches
self.exceptions = [
"Resources/AWS::DirectoryService::MicrosoftAD/Properties/Password",
"Resources/AWS::DirectoryService::SimpleAD/Properties/Password",
"Resources/AWS::ElastiCache::ReplicationGroup/Properties/AuthToken",
"Resources/AWS::IAM::User/Properties/LoginProfile/Password",
"Resources/AWS::KinesisFirehose::DeliveryStream/Properties/RedshiftDestinationConfiguration/Password",
"Resources/AWS::OpsWorks::App/Properties/AppSource/Password",
"Resources/AWS::OpsWorks::Stack/Properties/RdsDbInstances/DbPassword",
"Resources/AWS::OpsWorks::Stack/Properties/CustomCookbooksSource/Password",
"Resources/AWS::RDS::DBCluster/Properties/MasterUserPassword",
"Resources/AWS::RDS::DBInstance/Properties/MasterUserPassword",
"Resources/AWS::Redshift::Cluster/Properties/MasterUserPassword",
]

def validate(self, validator: Validator, s: Any, instance: Any, schema: Any):
keyword = self.get_keyword(validator)
if keyword in self.exceptions:
return

yield ValidationError(
(
f"Dynamic reference {instance!r} to SSM secure strings "
"can only be used in resource properties"
),
rule=self,
)
40 changes: 40 additions & 0 deletions src/cfnlint/rules/functions/DynamicReferenceSsmPath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from typing import Any

from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.rules import CloudFormationLintRule


class DynamicReferenceSsmPath(CloudFormationLintRule):
id = "E1052"
shortdesc = "Validate dynamic references to SSM are in a valid location"
description = (
"Dynamic references to SSM parameters are only supported "
"in certain locations"
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html#dynamic-references-ssm"
tags = ["functions", "dynamic reference"]

def validate(self, validator: Validator, s: Any, instance: Any, schema: Any):
if len(validator.context.path) > 0:
if validator.context.path[0] == "Parameters":
if len(validator.context.path) >= 3:
if validator.context.path[2] in ["Default", "AllowedValues"]:
return
elif validator.context.path[0] == "Resources":
if len(validator.context.path) >= 3:
if validator.context.path[2] in ["Properties", "Metadata"]:
return
elif validator.context.path[0] == "Outputs":
if len(validator.context.path) >= 3:
if validator.context.path[2] in ["Value"]:
return

yield ValidationError(
(f"Dynamic reference {instance!r} to SSM parameters are not allowed here"),
rule=self,
)
1 change: 0 additions & 1 deletion src/cfnlint/rules/resources/properties/Properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def cfnresourceproperties(self, validator: Validator, _, instance: Any, schema):
validator = validator.evolve(
context=validator.context.evolve(
functions=FUNCTIONS,
dynamic_references=True,
)
)

Expand Down
40 changes: 0 additions & 40 deletions test/fixtures/templates/bad/functions/dynamic_reference.yaml

This file was deleted.

Loading