From c53b1040aedb23ba5f1ed3981aef0040679a96e7 Mon Sep 17 00:00:00 2001 From: Kevin DeJong Date: Tue, 30 Apr 2024 09:45:33 -0700 Subject: [PATCH] When using a list param in foreach pass back select statements when no allowed value (#3176) --- src/cfnlint/decode/node.py | 1 - .../transforms/_language_extensions.py | 13 +- .../transforms/test_language_extensions.py | 111 +++++++++++++++++- 3 files changed, 121 insertions(+), 4 deletions(-) diff --git a/src/cfnlint/decode/node.py b/src/cfnlint/decode/node.py index 7ceca7d75c..fe4c506884 100644 --- a/src/cfnlint/decode/node.py +++ b/src/cfnlint/decode/node.py @@ -76,7 +76,6 @@ class node_class(cls): def __init__( self, x, start_mark: Mark | None = None, end_mark: Mark | None = None ): - LOGGER.debug(type(start_mark)) try: cls.__init__(self, x) except TypeError: diff --git a/src/cfnlint/template/transforms/_language_extensions.py b/src/cfnlint/template/transforms/_language_extensions.py index d966cfa031..3a5ce67d66 100644 --- a/src/cfnlint/template/transforms/_language_extensions.py +++ b/src/cfnlint/template/transforms/_language_extensions.py @@ -5,6 +5,8 @@ from __future__ import annotations +import hashlib +import json import logging import random import string @@ -193,7 +195,9 @@ def _walk(self, item: Any, params: MutableMapping[str, Any], cfn: Any): return obj def _replace_string_params( - self, s: str, params: Mapping[str, Any] + self, + s: str, + params: Mapping[str, Any], ) -> Tuple[bool, str]: pattern = r"(\$|&){[a-zA-Z0-9\.:]+}" if not re.search(pattern, s): @@ -201,6 +205,8 @@ def _replace_string_params( new_s = deepcopy(s) for k, v in params.items(): + if isinstance(v, dict): + v = hashlib.md5(json.dumps(v).encode("utf-8")).digest().hex()[0:4] new_s = re.sub(rf"\$\{{{k}\}}", v, new_s) new_s = re.sub(rf"\&\{{{k}\}}", re.sub("[^0-9a-zA-Z]+", "", v), new_s) @@ -453,6 +459,9 @@ def value( return [x.strip() for x in allowed_values[0].split(",")] return allowed_values[0] + if "List" in t: + return [{"Fn::Select": [0, {"Ref": v}]}, {"Fn::Select": [1, {"Ref": v}]}] + raise _ResolveError("Can't resolve Fn::Ref", self._obj) @@ -490,7 +499,7 @@ def values( if values: if isinstance(values, list): for value in values: - if isinstance(value, str): + if isinstance(value, (str, dict)): yield value else: raise _ValueError( diff --git a/test/unit/module/template/transforms/test_language_extensions.py b/test/unit/module/template/transforms/test_language_extensions.py index eed28f4cd1..5e2ab5dab6 100644 --- a/test/unit/module/template/transforms/test_language_extensions.py +++ b/test/unit/module/template/transforms/test_language_extensions.py @@ -10,9 +10,9 @@ from cfnlint.template import Template from cfnlint.template.transforms._language_extensions import ( _ForEach, + _ForEachCollection, _ForEachValue, _ForEachValueFnFindInMap, - _ForEachValueRef, _ResolveError, _Transform, _TypeError, @@ -27,6 +27,7 @@ def test_valid(self): _ForEach( "key", [{"Ref": "Parameter"}, {"Ref": "AWS::NotificationArns"}, {}], {} ) + _ForEach("key", ["AccountId", {"Ref": "AccountIds"}, {}], {}) def test_wrong_type(self): with self.assertRaises(_TypeError): @@ -54,6 +55,32 @@ def test_output_type(self): _ForEach("key", ["foo", ["bar"], []], {}) +class TestForEach(TestCase): + def setUp(self) -> None: + super().setUp() + self.cfn = Template( + "", + { + "Parameters": { + "AccountIds": { + "Type": "CommaDelimitedList", + }, + }, + }, + regions=["us-west-2"], + ) + + def test_valid(self): + fec = _ForEachCollection({"Ref": "AccountIds"}) + self.assertListEqual( + list(fec.values(self.cfn, {})), + [ + {"Fn::Select": [0, {"Ref": "AccountIds"}]}, + {"Fn::Select": [1, {"Ref": "AccountIds"}]}, + ], + ) + + class TestRef(TestCase): def setUp(self) -> None: self.template_obj = convert_dict( @@ -75,6 +102,9 @@ def setUp(self) -> None: "Type": "List", "AllowedValues": ["sg-12345678, sg-87654321"], }, + "AccountIds": { + "Type": "CommaDelimitedList", + }, }, } ) @@ -115,6 +145,15 @@ def test_ref(self): fe = _ForEachValue.create({"Ref": "SecurityGroups"}) self.assertEqual(fe.value(self.cfn), ["sg-12345678", "sg-87654321"]) + fe = _ForEachValue.create({"Ref": "AccountIds"}) + self.assertEqual( + fe.value(self.cfn), + [ + {"Fn::Select": [0, {"Ref": "AccountIds"}]}, + {"Fn::Select": [1, {"Ref": "AccountIds"}]}, + ], + ) + class TestFindInMap(TestCase): def setUp(self) -> None: @@ -416,6 +455,76 @@ def test_transform_findinmap_function(self): result, ) + def test_transform_list_parameter(self): + template_obj = deepcopy(self.template_obj) + parameters = {"AccountIds": {"Type": "CommaDelimitedList"}} + template_obj["Parameters"] = parameters + + nested_set( + template_obj, + [ + "Resources", + "Fn::ForEach::SpecialCharacters", + 1, + ], + {"Ref": "AccountIds"}, + ) + nested_set( + template_obj, + [ + "Resources", + "Fn::ForEach::SpecialCharacters", + 2, + ], + { + "S3Bucket&{Identifier}": { + "Type": "AWS::S3::Bucket", + "Properties": { + "BucketName": {"Ref": "Identifier"}, + "Tags": [ + {"Key": "Name", "Value": {"Fn::Sub": "Name-${Identifier}"}}, + ], + }, + } + }, + ) + cfn = Template(filename="", template=template_obj, regions=["us-east-1"]) + matches, template = language_extension(cfn) + self.assertListEqual(matches, []) + + result = deepcopy(self.result) + result["Parameters"] = parameters + result["Resources"]["S3Bucket5096"] = { + "Properties": { + "BucketName": {"Fn::Select": [1, {"Ref": "AccountIds"}]}, + "Tags": [ + { + "Key": "Name", + "Value": "Name-5096", + }, + ], + }, + "Type": "AWS::S3::Bucket", + } + result["Resources"]["S3Bucketa72a"] = { + "Properties": { + "BucketName": {"Fn::Select": [0, {"Ref": "AccountIds"}]}, + "Tags": [ + { + "Key": "Name", + "Value": "Name-a72a", + }, + ], + }, + "Type": "AWS::S3::Bucket", + } + del result["Resources"]["S3Bucketab"] + del result["Resources"]["S3Bucketcd"] + self.assertDictEqual( + template, + result, + ) + def test_bad_collection_ref(self): template_obj = deepcopy(self.template_obj) nested_set(