Skip to content

Commit

Permalink
When using a list param in foreach pass back select statements when n…
Browse files Browse the repository at this point in the history
…o allowed value (#3176)
  • Loading branch information
kddejong authored Apr 30, 2024
1 parent 6918363 commit c53b104
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 4 deletions.
1 change: 0 additions & 1 deletion src/cfnlint/decode/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ class node_class(cls):
def __init__(
self, x, start_mark: Mark | None = None, end_mark: Mark | None = None
):
LOGGER.debug(type(start_mark))
try:
cls.__init__(self, x)
except TypeError:
Expand Down
13 changes: 11 additions & 2 deletions src/cfnlint/template/transforms/_language_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from __future__ import annotations

import hashlib
import json
import logging
import random
import string
Expand Down Expand Up @@ -193,14 +195,18 @@ def _walk(self, item: Any, params: MutableMapping[str, Any], cfn: Any):
return obj

def _replace_string_params(
self, s: str, params: Mapping[str, Any]
self,
s: str,
params: Mapping[str, Any],
) -> Tuple[bool, str]:
pattern = r"(\$|&){[a-zA-Z0-9\.:]+}"
if not re.search(pattern, s):
return (True, s)

new_s = deepcopy(s)
for k, v in params.items():
if isinstance(v, dict):
v = hashlib.md5(json.dumps(v).encode("utf-8")).digest().hex()[0:4]
new_s = re.sub(rf"\$\{{{k}\}}", v, new_s)
new_s = re.sub(rf"\&\{{{k}\}}", re.sub("[^0-9a-zA-Z]+", "", v), new_s)

Expand Down Expand Up @@ -453,6 +459,9 @@ def value(
return [x.strip() for x in allowed_values[0].split(",")]
return allowed_values[0]

if "List" in t:
return [{"Fn::Select": [0, {"Ref": v}]}, {"Fn::Select": [1, {"Ref": v}]}]

raise _ResolveError("Can't resolve Fn::Ref", self._obj)


Expand Down Expand Up @@ -490,7 +499,7 @@ def values(
if values:
if isinstance(values, list):
for value in values:
if isinstance(value, str):
if isinstance(value, (str, dict)):
yield value
else:
raise _ValueError(
Expand Down
111 changes: 110 additions & 1 deletion test/unit/module/template/transforms/test_language_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from cfnlint.template import Template
from cfnlint.template.transforms._language_extensions import (
_ForEach,
_ForEachCollection,
_ForEachValue,
_ForEachValueFnFindInMap,
_ForEachValueRef,
_ResolveError,
_Transform,
_TypeError,
Expand All @@ -27,6 +27,7 @@ def test_valid(self):
_ForEach(
"key", [{"Ref": "Parameter"}, {"Ref": "AWS::NotificationArns"}, {}], {}
)
_ForEach("key", ["AccountId", {"Ref": "AccountIds"}, {}], {})

def test_wrong_type(self):
with self.assertRaises(_TypeError):
Expand Down Expand Up @@ -54,6 +55,32 @@ def test_output_type(self):
_ForEach("key", ["foo", ["bar"], []], {})


class TestForEach(TestCase):
def setUp(self) -> None:
super().setUp()
self.cfn = Template(
"",
{
"Parameters": {
"AccountIds": {
"Type": "CommaDelimitedList",
},
},
},
regions=["us-west-2"],
)

def test_valid(self):
fec = _ForEachCollection({"Ref": "AccountIds"})
self.assertListEqual(
list(fec.values(self.cfn, {})),
[
{"Fn::Select": [0, {"Ref": "AccountIds"}]},
{"Fn::Select": [1, {"Ref": "AccountIds"}]},
],
)


class TestRef(TestCase):
def setUp(self) -> None:
self.template_obj = convert_dict(
Expand All @@ -75,6 +102,9 @@ def setUp(self) -> None:
"Type": "List<AWS::EC2::Subnet::Id>",
"AllowedValues": ["sg-12345678, sg-87654321"],
},
"AccountIds": {
"Type": "CommaDelimitedList",
},
},
}
)
Expand Down Expand Up @@ -115,6 +145,15 @@ def test_ref(self):
fe = _ForEachValue.create({"Ref": "SecurityGroups"})
self.assertEqual(fe.value(self.cfn), ["sg-12345678", "sg-87654321"])

fe = _ForEachValue.create({"Ref": "AccountIds"})
self.assertEqual(
fe.value(self.cfn),
[
{"Fn::Select": [0, {"Ref": "AccountIds"}]},
{"Fn::Select": [1, {"Ref": "AccountIds"}]},
],
)


class TestFindInMap(TestCase):
def setUp(self) -> None:
Expand Down Expand Up @@ -416,6 +455,76 @@ def test_transform_findinmap_function(self):
result,
)

def test_transform_list_parameter(self):
template_obj = deepcopy(self.template_obj)
parameters = {"AccountIds": {"Type": "CommaDelimitedList"}}
template_obj["Parameters"] = parameters

nested_set(
template_obj,
[
"Resources",
"Fn::ForEach::SpecialCharacters",
1,
],
{"Ref": "AccountIds"},
)
nested_set(
template_obj,
[
"Resources",
"Fn::ForEach::SpecialCharacters",
2,
],
{
"S3Bucket&{Identifier}": {
"Type": "AWS::S3::Bucket",
"Properties": {
"BucketName": {"Ref": "Identifier"},
"Tags": [
{"Key": "Name", "Value": {"Fn::Sub": "Name-${Identifier}"}},
],
},
}
},
)
cfn = Template(filename="", template=template_obj, regions=["us-east-1"])
matches, template = language_extension(cfn)
self.assertListEqual(matches, [])

result = deepcopy(self.result)
result["Parameters"] = parameters
result["Resources"]["S3Bucket5096"] = {
"Properties": {
"BucketName": {"Fn::Select": [1, {"Ref": "AccountIds"}]},
"Tags": [
{
"Key": "Name",
"Value": "Name-5096",
},
],
},
"Type": "AWS::S3::Bucket",
}
result["Resources"]["S3Bucketa72a"] = {
"Properties": {
"BucketName": {"Fn::Select": [0, {"Ref": "AccountIds"}]},
"Tags": [
{
"Key": "Name",
"Value": "Name-a72a",
},
],
},
"Type": "AWS::S3::Bucket",
}
del result["Resources"]["S3Bucketab"]
del result["Resources"]["S3Bucketcd"]
self.assertDictEqual(
template,
result,
)

def test_bad_collection_ref(self):
template_obj = deepcopy(self.template_obj)
nested_set(
Expand Down

0 comments on commit c53b104

Please sign in to comment.