From a3ec701556f533b27e57576bbe170e656fc2458f Mon Sep 17 00:00:00 2001 From: Sumit Jaiswal Date: Tue, 17 Sep 2024 17:54:59 +0530 Subject: [PATCH] fix issue 257 Signed-off-by: Sumit Jaiswal --- ansible_risk_insight/cli/__init__.py | 8 +- ansible_risk_insight/finder.py | 118 ++++++++---- ansible_risk_insight/ram_generator.py | 4 +- ansible_risk_insight/yaml_utils.py | 253 +++++++++++++++++++++++++- scripts/gen_ram_slim.py | 2 +- 5 files changed, 333 insertions(+), 52 deletions(-) diff --git a/ansible_risk_insight/cli/__init__.py b/ansible_risk_insight/cli/__init__.py index f0926aee..5351d788 100644 --- a/ansible_risk_insight/cli/__init__.py +++ b/ansible_risk_insight/cli/__init__.py @@ -190,7 +190,7 @@ def run(self): fpath_from_root = target_info["path_from_root"] scan_type = target_info["scan_type"] count_in_type = len(file_list[scan_type]) - print(f"\r[{i+1}/{total}] {scan_type} {fpath_from_root} ", end="") + print(f"\r[{i + 1}/{total}] {scan_type} {fpath_from_root} ", end="") out_dir = os.path.join(args.out_dir, f"{scan_type}s", str(count_in_type)) c.evaluate( type=scan_type, @@ -223,8 +223,8 @@ def run(self): with open(list_file_path, "w") as file: json.dump(index_data, file) if args.fix: - for each in index_data.keys(): - ari_suggestion_file_path = os.path.join(args.out_dir, f"{scan_type}s", str(each), "rule_result.json") + for key, val in index_data.items(): + ari_suggestion_file_path = os.path.join(args.out_dir, f"{scan_type}s", str(key), "rule_result.json") logger.debug("ARI suggestion file path: %s", ari_suggestion_file_path) with open(ari_suggestion_file_path) as f: ari_suggestion_data = json.load(f) @@ -246,7 +246,7 @@ def run(self): mutated_yaml = w007_rule["detail"]["mutated_yaml"] if mutated_yaml == "": break - temp_data = index_data[each] + temp_data = index_data[key] if w007_rule["file"][0] not in temp_data: target_file_path = os.path.join(args.target_name, temp_data, w007_rule["file"][0]) if temp_file_path != "" and target_file_path != temp_file_path: diff --git a/ansible_risk_insight/finder.py b/ansible_risk_insight/finder.py index 6fe03bec..2fd90a5a 100644 --- a/ansible_risk_insight/finder.py +++ b/ansible_risk_insight/finder.py @@ -970,13 +970,18 @@ def check_diff_and_copy_olddata_to_newdata(line_number_list, lines, new_data): return new_data -def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy): +def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy, line_number): """ Function to get the leading space for the new ARI mutated line, with its equivaltent old line with space similar to the old line """ + if "-" in old_line and "-" not in new_line and len(new_line.split(".")) >= 3: + data_copy.append(new_line) + data_copy.append("\n") + return "" line_with_adjusted_space = update_line_with_space(new_line, old_line, leading_spaces) - data_copy.append(line_with_adjusted_space) + # data_copy.append(line_with_adjusted_space) + data_copy[line_number] = line_with_adjusted_space return "" @@ -986,35 +991,50 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list): with open(file_path, "r") as file: data = file.read() yaml = FormattedYAML( + version=(1, 1) # Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2) ) - data_copy = populate_new_data_list(data, line_number_list) + data_copy = data.splitlines(keepends=True) stop_line_number = 0 new_lines = [] - for iter in range(len(line_number_list)): - line_number = line_number_list[iter] + count = 0 + line_number = 0 + # for iter in range(len(line_number_list)): + lines = data.splitlines(keepends=True) + new_task = False + for iter, line in enumerate(line_number_list): + ari_line_number = line new_content = new_content_list[iter] - input_line_number = line_number.lstrip("L").split("-") - lines = data.splitlines(keepends=True) + input_line_number = ari_line_number.lstrip("L").split("-") + if new_lines: + if count > 1 and new_task: + line_number = line_number - (count - 1) for i in range(len(new_lines)): try: - data_copy.append(new_lines.pop(i)) + leading_spaces = len(lines[stop_line_number + 1]) - len(lines[stop_line_number + 1].lstrip()) + new_line_content = update_and_append_new_line( + new_lines.pop(i), + lines[stop_line_number + 1], + leading_spaces, data_copy, + line_number + ) + count -= 1 except IndexError: break new_lines = new_content.splitlines(keepends=True) # Update the specific line with new content start_line_number = int(input_line_number[0]) - if stop_line_number > 0 and (start_line_number - stop_line_number) > 1: - check_and_add_diff_lines(stop_line_number, start_line_number, lines, data_copy) stop_line_number = int(input_line_number[1]) diff_in_lines = stop_line_number - start_line_number temp_content = [] start = start_line_number - 1 - end = stop_line_number - 1 - data_copy.append("\n") + end = stop_line_number + not_replaced = False for i in range(start, end): line_number = i + if data_copy[line_number] == '\n': + continue if len(lines) == i: break try: @@ -1031,54 +1051,74 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list): leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip()) temp_content.append(new_line_content) new_line_content = new_line_content.lstrip(" ") - lines[line_number] = " " * leading_spaces + new_line_content - data_copy.append(lines[line_number]) + data_copy[line_number] = " " * leading_spaces + new_line_content else: new_line_key = new_line_content.split(":") new_key = new_line_key[0].strip(" ") - for k in range(start, end): + if count > 1 and new_task: + line_number = line_number - (count - 1) + for k in range(start, end + 1): if k < len(lines): old_line_key = lines[k].split(":") if "---" in old_line_key[0]: continue old_key = old_line_key[0].strip(" ") + if old_key == '\n': + continue if "-" in old_line_key[0] and ":" not in lines[k] and "-" in new_key: - # diff_in_lines = len(lines) - len(new_lines) leading_spaces = len(lines[k]) - len(lines[k].lstrip()) - if diff_in_lines > len(lines): - for i in range(k, k + diff_in_lines): - if lines[i] == "\n": - lines.pop(i - 1) - break - elif i < len(lines) and ":" not in lines[i]: - lines.pop(i) - else: - break - new_line_content = update_and_append_new_line(new_line_content, lines[k], leading_spaces, data_copy) + for i in range(k, k + diff_in_lines): + if data_copy[i] == "\n": + data_copy.pop(i - 1) + lines.pop(i - 1) + break + elif old_key == new_key: + break + elif i < len(data_copy) and ":" not in data_copy[i]: + data_copy.pop(i) + lines.pop(i) + else: + break + count += 1 + new_line_content = update_and_append_new_line(new_line_content, lines[k], leading_spaces, data_copy, k) + new_task = False break elif old_key == new_key: - new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy) + new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k) break elif old_key.rstrip("\n") == new_key: - new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy) + new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k) break - elif old_key.rstrip("\n") in new_key.split("."): - new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy) + elif "." in new_key and old_key.rstrip("\n") in new_key.split("."): + new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k) break - if new_line_content: # if there wasn't a match with old line, so this seems updated by ARI and added to w/o any change - data_copy.append(new_line_content) - else: - return IndexError("Line number out of range.") - # check for diff b/w new content and old contents, - # and copy the old content that's not updated by ARI mutation - data_copy = check_diff_and_copy_olddata_to_newdata(line_number_list, lines, data_copy) - # Join the lines back to a single string + elif "." in new_key and new_key.split(".")[-1] in old_key.rstrip("\n"): + new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k) + break + if new_line_content and not_replaced and len(new_lines) > 0: + leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip()) + new_line_content = update_and_append_new_line(new_line_content, lines[i], leading_spaces, data_copy, line_number) + new_line_content = "" + if new_line_content and len(new_lines) > 0: + try: + k += 1 + leading_spaces = len(lines[k]) - len(lines[k].lstrip()) + new_line_content = update_and_append_new_line(new_line_content, lines[i], leading_spaces, data_copy, k) + for each in new_lines: + k += 1 + leading_spaces = len(lines[k]) - len(lines[k].lstrip()) + new_line_content = update_and_append_new_line(each, lines[i], leading_spaces, data_copy, k) + new_lines = [] + except IndexError: + break + new_task = True updated_data = "".join(data_copy) # Parse the updated YAML content to ensure it is valid updated_parsed_data = yaml.load(updated_data) # Write the updated YAML content back to the file if updated_parsed_data: + file_content = yaml.dumps(updated_parsed_data) with open(file_path, "w") as file: - yaml.dump(updated_parsed_data, file) + file.write(file_content) except Exception as ex: logger.warning("YAML LINES: ARI fix update yaml by lines failed for file: '%s', with error: '%s'", file_path, ex) diff --git a/ansible_risk_insight/ram_generator.py b/ansible_risk_insight/ram_generator.py index 00709965..1fed5338 100644 --- a/ansible_risk_insight/ram_generator.py +++ b/ansible_risk_insight/ram_generator.py @@ -102,7 +102,7 @@ def scan(self, i, num, type, name): elapsed = round(time.time() - self.start, 2) start_of_this_scan = time.time() thread_id = threading.get_native_id() - print(f"[{i+1}/{num}] start {type} {name} ({elapsed} sec. elapsed) (thread: {thread_id})") + print(f"[{i + 1}/{num}] start {type} {name} ({elapsed} sec. elapsed) (thread: {thread_id})") use_src_cache = True if self.skip_scan(type, name): @@ -133,7 +133,7 @@ def scan(self, i, num, type, name): elapsed_for_this_scan = round(time.time() - start_of_this_scan, 2) if elapsed_for_this_scan > 60: - print(f"WARNING: It took {elapsed_for_this_scan} sec. to process [{i+1}/{num}] {type} {name}") + print(f"WARNING: It took {elapsed_for_this_scan} sec. to process [{i + 1}/{num}] {type} {name}") def save_ram_log(self, type, name, fail): out_dir = os.path.join(self._scanner.root_dir, "log", type, name) diff --git a/ansible_risk_insight/yaml_utils.py b/ansible_risk_insight/yaml_utils.py index e9ecc4dc..dc656f85 100644 --- a/ansible_risk_insight/yaml_utils.py +++ b/ansible_risk_insight/yaml_utils.py @@ -3,11 +3,16 @@ # pylint: disable=too-many-lines from __future__ import annotations +import functools +import re +from collections.abc import Callable, Iterator, Sequence from io import StringIO from pathlib import Path -from typing import TYPE_CHECKING, Any -import re -from ruamel.yaml.comments import CommentedMap, CommentedSeq +from re import Pattern +from typing import TYPE_CHECKING, Any, cast + +import ruamel.yaml.events +from ruamel.yaml.comments import CommentedMap, CommentedSeq, Format from ruamel.yaml.composer import ComposerError from ruamel.yaml.constructor import RoundTripConstructor from ruamel.yaml.emitter import Emitter @@ -17,6 +22,10 @@ from ruamel.yaml.main import YAML from ruamel.yaml.parser import ParserError from ruamel.yaml.scalarint import HexInt, ScalarInt +from ansiblelint.constants import ( + ANNOTATION_KEYS +) +from ansiblelint.utils import Task import ansible_risk_insight.logger as logger @@ -25,6 +34,120 @@ from ruamel.yaml.compat import StreamTextType from ruamel.yaml.nodes import ScalarNode from ruamel.yaml.representer import RoundTripRepresenter + from ruamel.yaml.tokens import CommentToken + + +def nested_items_path( + data_collection: dict[Any, Any] | list[Any], + ignored_keys: Sequence[str] = (), +) -> Iterator[tuple[Any, Any, list[str | int]]]: + """Iterate a nested data structure, yielding key/index, value, and parent_path. + + This is a recursive function that calls itself for each nested layer of data. + Each iteration yields: + + 1. the current item's dictionary key or list index, + 2. the current item's value, and + 3. the path to the current item from the outermost data structure. + + For dicts, the yielded (1) key and (2) value are what ``dict.items()`` yields. + For lists, the yielded (1) index and (2) value are what ``enumerate()`` yields. + The final component, the parent path, is a list of dict keys and list indexes. + The parent path can be helpful in providing error messages that indicate + precisely which part of a yaml file (or other data structure) needs to be fixed. + + For example, given this playbook: + + .. code-block:: yaml + + - name: A play + tasks: + - name: A task + debug: + msg: foobar + + Here's the first and last yielded items: + + .. code-block:: python + + >>> playbook=[{"name": "a play", "tasks": [{"name": "a task", "debug": {"msg": "foobar"}}]}] + >>> next( nested_items_path( playbook ) ) + (0, {'name': 'a play', 'tasks': [{'name': 'a task', 'debug': {'msg': 'foobar'}}]}, []) + >>> list( nested_items_path( playbook ) )[-1] + ('msg', 'foobar', [0, 'tasks', 0, 'debug']) + + Note that, for outermost data structure, the parent path is ``[]`` because + you do not need to descend into any nested dicts or lists to find the indicated + key and value. + + If a rule were designed to prohibit "foobar" debug messages, it could use the + parent path to provide a path to the problematic ``msg``. It might use a jq-style + path in its error message: "the error is at ``.[0].tasks[0].debug.msg``". + Or if a utility could automatically fix issues, it could use the path to descend + to the parent object using something like this: + + .. code-block:: python + + target = data + for segment in parent_path: + target = target[segment] + + :param data_collection: The nested data (dicts or lists). + + :returns: each iteration yields the key (of the parent dict) or the index (lists) + """ + # As typing and mypy cannot effectively ensure we are called only with + # valid data, we better ignore NoneType + if data_collection is None: + return + data: dict[Any, Any] | list[Any] + if isinstance(data_collection, Task): + data = data_collection.normalized_task + else: + data = data_collection + yield from _nested_items_path( + data_collection=data, + parent_path=[], + ignored_keys=ignored_keys, + ) + + +def _nested_items_path( + data_collection: dict[Any, Any] | list[Any], + parent_path: list[str | int], + ignored_keys: Sequence[str] = (), +) -> Iterator[tuple[Any, Any, list[str | int]]]: + """Iterate through data_collection (internal implementation of nested_items_path). + + This is a separate function because callers of nested_items_path should + not be using the parent_path param which is used in recursive _nested_items_path + calls to build up the path to the parent object of the current key/index, value. + """ + # we have to cast each convert_to_tuples assignment or mypy complains + # that both assignments (for dict and list) do not have the same type + convert_to_tuples_type = Callable[[], Iterator[tuple[str | int, Any]]] + if isinstance(data_collection, dict): + convert_data_collection_to_tuples = cast( + convert_to_tuples_type, + functools.partial(data_collection.items), + ) + elif isinstance(data_collection, list): + convert_data_collection_to_tuples = cast( + convert_to_tuples_type, + functools.partial(enumerate, data_collection), + ) + else: + msg = f"Expected a dict or a list but got {data_collection!r} of type '{type(data_collection)}'" + raise TypeError(msg) + for key, value in convert_data_collection_to_tuples(): + if key in (*ANNOTATION_KEYS, *ignored_keys): + continue + yield key, value, parent_path + if isinstance(value, dict | list): + yield from _nested_items_path( + data_collection=value, + parent_path=[*parent_path, key], + ) class OctalIntYAML11(ScalarInt): @@ -129,6 +252,76 @@ class FormattedEmitter(Emitter): _in_empty_flow_map = False + # "/n/n" results in one blank line (end the previous line, then newline). + # So, "/n/n/n" or more is too many new lines. Clean it up. + _re_repeat_blank_lines: Pattern[str] = re.compile(r"\n{3,}") + + @staticmethod + def drop_octothorpe_protection(string: str) -> str: + """Remove string protection of "#" after full-line-comment post-processing.""" + try: + if "\uFF03#\uFE5F" in string: + # # is \uFF03 (fullwidth number sign) + # ﹟ is \uFE5F (small number sign) + string = string.replace("\uFF03#\uFE5F", "#") + except (ValueError, TypeError): + # probably not really a string. Whatever. + pass + return string + + # comment is a CommentToken, not Any (Any is ruamel.yaml's lazy type hint). + def write_comment( + self, + comment: CommentToken, + pre: bool = False, # noqa: FBT002 + ) -> None: + """Clean up extra new lines and spaces in comments. + + ruamel.yaml treats new or empty lines as comments. + See: https://stackoverflow.com/questions/42708668/removing-all-blank-lines-but-not-comments-in-ruamel-yaml/42712747#42712747 + """ + value: str = comment.value + if ( + pre + and not value.strip() + and not isinstance( + self.event, + ruamel.yaml.events.CollectionEndEvent + | ruamel.yaml.events.DocumentEndEvent + | ruamel.yaml.events.StreamEndEvent + | ruamel.yaml.events.MappingStartEvent, + ) + ): + # drop pure whitespace pre comments + # does not apply to End events since they consume one of the newlines. + value = "" + elif ( + pre + and not value.strip() + and isinstance(self.event, ruamel.yaml.events.MappingStartEvent) + ): + value = self._re_repeat_blank_lines.sub("", value) + elif pre: + # preserve content in pre comment with at least one newline, + # but no extra blank lines. + value = self._re_repeat_blank_lines.sub("\n", value) + else: + # single blank lines in post comments + value = self._re_repeat_blank_lines.sub("\n\n", value) + comment.value = value + + # make sure that the eol comment only has one space before it. + if comment.column > self.column + 1 and not pre: + comment.column = self.column + 1 + + return super().write_comment(comment, pre) + + def write_version_directive(self, version_text: Any) -> None: + """Skip writing '%YAML 1.1'.""" + if version_text == "1.1": + return + super().write_version_directive(version_text) + # pylint: disable=too-many-instance-attributes class FormattedYAML(YAML): @@ -304,9 +497,13 @@ def load(self, stream: Path | StreamTextType) -> Any: data = self.load_all(stream=text) except ParserError as ex: data = None - logger.error("Invalid yaml, verify the file contents and try again. %s", ex) # noqa: TRY400 + logger.error( # noqa: TRY400 + "Invalid yaml, verify the file contents and try again. %s", ex + ) except Exception as ex: - print(ex) + logger.error( # noqa: TRY400 + "Yaml load failed with exception: %s", ex + ) if preamble_comment is not None and isinstance( data, CommentedMap | CommentedSeq, @@ -332,6 +529,48 @@ def dumps(self, data: Any) -> str: strip_version_directive=strip_version_directive, ) + def _prevent_wrapping_flow_style(self, data: Any) -> None: + if not isinstance(data, CommentedMap | CommentedSeq): + return + for key, value, parent_path in nested_items_path(data): + if not isinstance(value, CommentedMap | CommentedSeq): + continue + fa: Format = value.fa + if fa.flow_style(): + predicted_indent = self._predict_indent_length(parent_path, key) + predicted_width = len(str(value)) + if predicted_indent + predicted_width > self.width: + # this flow-style map will probably get line-wrapped, + # so, switch it to block style to avoid the line wrap. + fa.set_block_style() + + def _predict_indent_length(self, parent_path: list[str | int], key: Any) -> int: + indent = 0 + + # each parent_key type tells us what the indent is for the next level. + for parent_key in parent_path: + if isinstance(parent_key, int) and indent == 0: + # root level is a sequence + indent += self.sequence_dash_offset + elif isinstance(parent_key, int): + # next level is a sequence + indent += cast(int, self.sequence_indent) + elif isinstance(parent_key, str): + # next level is a map + indent += cast(int, self.map_indent) + + if isinstance(key, int) and indent == 0: + # flow map is an item in a root-level sequence + indent += self.sequence_dash_offset + elif isinstance(key, int) and indent > 0: + # flow map is in a sequence + indent += cast(int, self.sequence_indent) + elif isinstance(key, str): + # flow map is in a map + indent += len(key + ": ") + + return indent + # ruamel.yaml only preserves empty (no whitespace) blank lines # (ie "/n/n" becomes "/n/n" but "/n /n" becomes "/n"). # So, we need to identify whitespace-only lines to drop spaces before reading. @@ -429,5 +668,7 @@ def _post_process_yaml(text: str, *, strip_version_directive: bool = False) -> s # got an empty list item. drop any trailing spaces. lines[i] = line.rstrip() + "\n" - text = "".join(FormattedEmitter.drop_octothorpe_protection(line) for line in lines) + text = "".join( + FormattedEmitter.drop_octothorpe_protection(line) for line in lines + ) return text diff --git a/scripts/gen_ram_slim.py b/scripts/gen_ram_slim.py index f763fcc9..ba760450 100644 --- a/scripts/gen_ram_slim.py +++ b/scripts/gen_ram_slim.py @@ -39,7 +39,7 @@ def gen_slim(self, priority_file=None): for i, f_json in enumerate(files): _name = f_json.split("/collections/findings/")[1].split("/")[0] - print(f"\r[{i+1}/{total}] {_name} ", end="") + print(f"\r[{i + 1}/{total}] {_name} ", end="") findings = Findings.load(fpath=f_json) if not findings: