Skip to content

Commit

Permalink
Even more appropriate dynamic references (#3071)
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong authored Feb 23, 2024
1 parent 4325382 commit fa77ca3
Show file tree
Hide file tree
Showing 13 changed files with 521 additions and 230 deletions.
3 changes: 0 additions & 3 deletions src/cfnlint/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ class Context:
init=True, default_factory=lambda: set(PSEUDOPARAMS)
)

# can we use dynamic references
dynamic_references: bool = field(init=True, default=False)

# Combiniation of storing any resolved ref
# and adds in any Refs available from things like Fn::Sub
ref_values: Dict[str, Any] = field(init=True, default_factory=dict)
Expand Down
8 changes: 3 additions & 5 deletions src/cfnlint/jsonschema/_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,9 @@ def _filter_schemas(self, schema, validator: Any) -> Tuple[Any, Any]:
def filter(self, validator: Any, instance: Any, schema: Any):
# Lets validate dynamic references when appropriate
if validator.is_type(instance, "string"):
if validator.context:
if validator.context.dynamic_references:
if REGEX_DYN_REF.findall(instance):
yield (instance, {"dynamicReference": schema})
return
if REGEX_DYN_REF.findall(instance):
yield (instance, {"dynamicReference": schema})
return

# dependencies, required, minProperties, maxProperties
# need to have filtered properties to validate
Expand Down
42 changes: 29 additions & 13 deletions src/cfnlint/rules/functions/DynamicReference.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from cfnlint.helpers import REGEX_DYN_REF
from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.rules import CloudFormationLintRule
from cfnlint.rules.functions._BaseFn import BaseFn

_all = {
"items": [
Expand Down Expand Up @@ -62,14 +62,9 @@
}


class DynamicReference(CloudFormationLintRule):
"""
Check if Dynamic Reference Secure Strings are
only used in the correct locations
"""

class DynamicReference(BaseFn):
id = "E1050"
shortdesc = "Check dynamic references secure strings are in supported locations"
shortdesc = "Validate the structure of a dynamic reference"
description = (
"Dynamic References Secure Strings are only supported for a small set of"
" resource properties. Validate that they are being used in the correct"
Expand All @@ -79,6 +74,13 @@ class DynamicReference(CloudFormationLintRule):
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html"
tags = ["functions", "dynamic reference"]

def __init__(self) -> None:
super().__init__()
self.child_rules = {
"E1051": None,
"E1027": None,
}

def _clean_errors(self, err: ValidationError) -> ValidationError:
err.rule = self
err.path = deque([])
Expand All @@ -91,6 +93,10 @@ def dynamicReference(
if not validator.is_type(instance, "string"):
return

# SSM parameters can be used in Resources and Outputs and Parameters
# SSM secrets are only used in a small number of locations
# Secrets manager can be used only in resource properties

for v in REGEX_DYN_REF.findall(instance):
parts = v.split(":")

Expand All @@ -101,14 +107,24 @@ def dynamicReference(
found = True

if found:
return
continue

if parts[1] in ["ssm", "ssm-secure"]:
if parts[1] == "ssm":
evolved = validator.evolve(schema=_ssm)
elif parts[2] == "arn":
evolved = validator.evolve(schema=_secrets_manager_arn)
elif parts[1] == "ssm-secure":
evolved = validator.evolve(schema=_ssm)
rule = self.child_rules["E1027"]
if rule:
yield from rule.validate(validator, {}, v, schema)
else:
evolved = validator.evolve(schema=_secrets_manager)
if parts[2] == "arn":
evolved = validator.evolve(schema=_secrets_manager_arn)
else: # this is secrets manager
evolved = validator.evolve(schema=_secrets_manager)
rule = self.child_rules["E1051"]
if rule:
yield from rule.validate(validator, {}, v, schema)

for err in evolved.iter_errors(parts):
yield self._clean_errors(err)
found = True
38 changes: 38 additions & 0 deletions src/cfnlint/rules/functions/DynamicReferenceSecretsManagerPath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from typing import Any

from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.rules import CloudFormationLintRule


class DynamicReferenceSecretsManagerPath(CloudFormationLintRule):
id = "E1051"
shortdesc = (
"Validate dynamic references to secrets manager are only in resource properties"
)
description = (
"Dynamic references from secrets manager can only be used "
"in resource properties"
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html#dynamic-references-secretsmanager"
tags = ["functions", "dynamic reference"]

def validate(self, validator: Validator, s: Any, instance: Any, schema: Any):
if len(validator.context.path) >= 3:
if (
validator.context.path[0] == "Resources"
and validator.context.path[2] == "Properties"
):
return

yield ValidationError(
(
f"Dynamic reference {instance!r} to secrets manager can only be "
"used in resource properties"
),
rule=self,
)
138 changes: 30 additions & 108 deletions src/cfnlint/rules/functions/DynamicReferenceSecureString.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@
SPDX-License-Identifier: MIT-0
"""

import regex as re
from typing import Any

import cfnlint.helpers
from cfnlint.rules import CloudFormationLintRule, RuleMatch
from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.rules.functions._BaseFn import BaseFn


class DynamicReferenceSecureString(CloudFormationLintRule):
"""
Check if Dynamic Reference Secure Strings are
only used in the correct locations
"""

class DynamicReferenceSecureString(BaseFn):
id = "E1027"
shortdesc = "Check dynamic references secure strings are in supported locations"
description = (
Expand All @@ -29,102 +24,29 @@ class DynamicReferenceSecureString(CloudFormationLintRule):
def __init__(self):
"""Init"""
super().__init__()
self.property_specs = []
self.resource_specs = []
self.exceptions = {
"AWS::DirectoryService::MicrosoftAD": "Password",
"AWS::DirectoryService::SimpleAD": "Password",
"AWS::ElastiCache::ReplicationGroup": "AuthToken",
"AWS::IAM::User": {
"LoginProfile": "Password",
},
"AWS::KinesisFirehose::DeliveryStream": {
"RedshiftDestinationConfiguration": "Password",
},
"AWS::OpsWorks::App": {
"AppSource": "Password",
},
"AWS::OpsWorks::Stack": {
"RdsDbInstances": "DbPassword",
"CustomCookbooksSource": "Password",
},
"AWS::RDS::DBCluster": "MasterUserPassword",
"AWS::RDS::DBInstance": "MasterUserPassword",
"AWS::Redshift::Cluster": "MasterUserPassword",
}

def _match_values(self, cfnelem, path):
"""Recursively search for values matching the searchRegex"""
values = []
if isinstance(cfnelem, dict):
for key in cfnelem:
pathprop = path[:]
pathprop.append(key)
values.extend(self._match_values(cfnelem[key], pathprop))
elif isinstance(cfnelem, list):
for index, item in enumerate(cfnelem):
pathprop = path[:]
pathprop.append(index)
values.extend(self._match_values(item, pathprop))
else:
# Leaf node
if isinstance(cfnelem, str): # and re.match(searchRegex, cfnelem):
for variable in re.findall(
cfnlint.helpers.REGEX_DYN_REF_SSM_SECURE, cfnelem
):
values.append(path + [variable])

return values

def match_values(self, cfn):
"""
Search for values in all parts of the templates that match the searchRegex
"""
results = []
results.extend(self._match_values(cfn.template, []))
# Globals are removed during a transform. They need to be checked manually
results.extend(self._match_values(cfn.template.get("Globals", {}), []))
return results

def _test_list(self, parent_list, child_list):
result = False
for idx in range(len(parent_list) - len(child_list) + 1):
if parent_list[idx : idx + len(child_list)] == child_list:
result = True
break

return result

def match(self, cfn):
matches = []
paths = self.match_values(cfn)

for path in paths:
message = (
"Dynamic reference secure strings are not supported at this location"
)
if path[0] == "Resources":
resource_type = (
cfn.template.get("Resources", {}).get(path[1]).get("Type")
)
exception = self.exceptions.get(resource_type)
if not exception:
matches.append(RuleMatch(path[:-1], message=message))
continue

if isinstance(exception, dict):
matched = False
for k, v in exception.items():
if self._test_list(path, ["Properties", k]):
if v in path:
matched = True
if not matched:
matches.append(RuleMatch(path[:-1], message=message))
continue
if not self._test_list(path, ["Properties", exception]):
matches.append(RuleMatch(path[:-1], message=message))

else:
matches.append(RuleMatch(path[:-1], message=message))

return matches
self.exceptions = [
"Resources/AWS::DirectoryService::MicrosoftAD/Properties/Password",
"Resources/AWS::DirectoryService::SimpleAD/Properties/Password",
"Resources/AWS::ElastiCache::ReplicationGroup/Properties/AuthToken",
"Resources/AWS::IAM::User/Properties/LoginProfile/Password",
"Resources/AWS::KinesisFirehose::DeliveryStream/Properties/RedshiftDestinationConfiguration/Password",
"Resources/AWS::OpsWorks::App/Properties/AppSource/Password",
"Resources/AWS::OpsWorks::Stack/Properties/RdsDbInstances/DbPassword",
"Resources/AWS::OpsWorks::Stack/Properties/CustomCookbooksSource/Password",
"Resources/AWS::RDS::DBCluster/Properties/MasterUserPassword",
"Resources/AWS::RDS::DBInstance/Properties/MasterUserPassword",
"Resources/AWS::Redshift::Cluster/Properties/MasterUserPassword",
]

def validate(self, validator: Validator, s: Any, instance: Any, schema: Any):
keyword = self.get_keyword(validator)
if keyword in self.exceptions:
return

yield ValidationError(
(
f"Dynamic reference {instance!r} to SSM secure strings "
"can only be used in resource properties"
),
rule=self,
)
40 changes: 40 additions & 0 deletions src/cfnlint/rules/functions/DynamicReferenceSsmPath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from typing import Any

from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.rules import CloudFormationLintRule


class DynamicReferenceSsmPath(CloudFormationLintRule):
id = "E1052"
shortdesc = "Validate dynamic references to SSM are in a valid location"
description = (
"Dynamic references to SSM parameters are only supported "
"in certain locations"
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html#dynamic-references-ssm"
tags = ["functions", "dynamic reference"]

def validate(self, validator: Validator, s: Any, instance: Any, schema: Any):
if len(validator.context.path) > 0:
if validator.context.path[0] == "Parameters":
if len(validator.context.path) >= 3:
if validator.context.path[2] in ["Default", "AllowedValues"]:
return
elif validator.context.path[0] == "Resources":
if len(validator.context.path) >= 3:
if validator.context.path[2] in ["Properties", "Metadata"]:
return
elif validator.context.path[0] == "Outputs":
if len(validator.context.path) >= 3:
if validator.context.path[2] in ["Value"]:
return

yield ValidationError(
(f"Dynamic reference {instance!r} to SSM parameters are not allowed here"),
rule=self,
)
1 change: 0 additions & 1 deletion src/cfnlint/rules/resources/properties/Properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def cfnresourceproperties(self, validator: Validator, _, instance: Any, schema):
validator = validator.evolve(
context=validator.context.evolve(
functions=FUNCTIONS,
dynamic_references=True,
)
)

Expand Down
40 changes: 0 additions & 40 deletions test/fixtures/templates/bad/functions/dynamic_reference.yaml

This file was deleted.

Loading

0 comments on commit fa77ca3

Please sign in to comment.