diff --git a/ansible_risk_insight/finder.py b/ansible_risk_insight/finder.py index e21adcd4..550a4ed2 100644 --- a/ansible_risk_insight/finder.py +++ b/ansible_risk_insight/finder.py @@ -114,7 +114,12 @@ def find_module_name(data_block): return "" -def get_task_blocks(fpath="", yaml_str="", task_dict_list=None): +def get_task_blocks( + fpath="", + yaml_str="", + task_dict_list=None, + jsonpath_prefix="", +): d = None yaml_lines = "" if yaml_str: @@ -143,8 +148,9 @@ def get_task_blocks(fpath="", yaml_str="", task_dict_list=None): if not isinstance(d, list): return None, None tasks = [] - for task_dict in d: - task_dict_loop = flatten_block_tasks(task_dict) + for i, task_dict in enumerate(d): + jsonpath = f"{jsonpath_prefix}.{i}" + task_dict_loop = flatten_block_tasks(task_dict=task_dict, jsonpath_prefix=jsonpath) tasks.extend(task_dict_loop) return tasks, yaml_lines @@ -159,7 +165,7 @@ def get_task_blocks(fpath="", yaml_str="", task_dict_list=None): # - some_module2 # - some_module3 # -def flatten_block_tasks(task_dict, module_defaults={}): +def flatten_block_tasks(task_dict, jsonpath_prefix="", module_defaults={}): if task_dict is None: return [] tasks = [] @@ -176,35 +182,202 @@ def flatten_block_tasks(task_dict, module_defaults={}): # load normal tasks first if "block" in task_dict: + task_jsonpath = jsonpath_prefix + ".block" tasks_in_block = task_dict.get("block", []) if isinstance(tasks_in_block, list): - for t_dict in tasks_in_block: - tasks_in_item = flatten_block_tasks(t_dict, _module_defaults) + for i, t_dict in enumerate(tasks_in_block): + _current_prefix = task_jsonpath + f".{i}" + tasks_in_item = flatten_block_tasks(t_dict, _current_prefix, _module_defaults) tasks.extend(tasks_in_item) else: - tasks = [task_dict] + tasks = [(task_dict, task_jsonpath)] else: - tasks = [task_dict] + task_jsonpath = jsonpath_prefix + tasks = [(task_dict, task_jsonpath)] # then add "rescue" block if "rescue" in task_dict: + task_jsonpath = jsonpath_prefix + ".rescue" tasks_in_rescue = task_dict.get("rescue", []) if isinstance(tasks_in_rescue, list): - for t_dict in tasks_in_rescue: - tasks_in_item = flatten_block_tasks(t_dict, _module_defaults) + for i, t_dict in enumerate(tasks_in_rescue): + _current_prefix = task_jsonpath + f".{i}" + tasks_in_item = flatten_block_tasks(t_dict, _current_prefix, _module_defaults) tasks.extend(tasks_in_item) # finally add "always" block if "always" in task_dict: + task_jsonpath = jsonpath_prefix + ".always" tasks_in_always = task_dict.get("always", []) if isinstance(tasks_in_always, list): - for t_dict in tasks_in_always: - tasks_in_item = flatten_block_tasks(t_dict, _module_defaults) + for i, t_dict in enumerate(tasks_in_always): + _current_prefix = task_jsonpath + f".{i}" + tasks_in_item = flatten_block_tasks(t_dict, _current_prefix, _module_defaults) tasks.extend(tasks_in_item) return tasks +def identify_lines_with_jsonpath(fpath: str = "", yaml_str: str = "", jsonpath: str = "") -> tuple[str, tuple[int, int]]: + if not jsonpath: + return None, None + + d = None + yaml_lines = "" + if yaml_str: + try: + d = yaml.load(yaml_str, Loader=Loader) + yaml_lines = yaml_str + except Exception as e: + logger.debug("failed to load this yaml string to identify lines; {}".format(e.args[0])) + return None, None + elif fpath: + if not os.path.exists(fpath): + return None, None + with open(fpath, "r") as file: + try: + yaml_lines = file.read() + d = yaml.load(yaml_lines, Loader=Loader) + except Exception as e: + logger.debug("failed to load this yaml file to identify lines; {}".format(e.args[0])) + return None, None + if not d: + return None, None + + path_parts = jsonpath.strip(".").split(".") + current_lines = yaml_lines + current_line_num = 1 + line_num_tuple = None + for p in path_parts: + if p == "plays": + pass + elif p in ["pre_tasks", "tasks", "post_tasks", "handlers", "block", "rescue", "always"]: + blocks = find_child_yaml_block(current_lines, key=p, line_num_offset=current_line_num) + current_lines, line_num_tuple = blocks[0] + current_line_num = line_num_tuple[0] + else: + try: + p_num = int(p) + blocks = find_child_yaml_block(current_lines, line_num_offset=current_line_num) + current_lines, line_num_tuple = blocks[p_num] + current_line_num = line_num_tuple[0] + except Exception: + print(p) + pass + return current_lines, line_num_tuple + + +def find_child_yaml_block(yaml_str: str, key: str = "", line_num_offset: int = -1) -> list: + skip_condition_funcs = [lambda x: x.strip().startswith("#"), lambda x: x.strip() == "---"] + + def match_condition_func(x): + if key: + return x.strip().startswith(f"{key}:") + else: + return x.strip().startswith("- ") + + def get_indent_level(x): + return len(x) - len(x.lstrip()) + + top_level_indent = 100 + for line in yaml_str.splitlines(): + skip = False + for skip_cond_func in skip_condition_funcs: + if skip_cond_func(line): + skip = True + break + if skip: + continue + if match_condition_func(line): + current_indent = get_indent_level(line) + if current_indent < top_level_indent: + top_level_indent = current_indent + if top_level_indent == 100: + return [] + + blocks = [] + line_buffer = [] + isolated_line_buffer = [] + buffer_begin = -1 + if key: + for i, line in enumerate(yaml_str.splitlines()): + line_num = i + 1 + current_indent = get_indent_level(line) + if current_indent == top_level_indent: + if line_buffer and not blocks: + block_str = "\n".join(line_buffer) + begin = buffer_begin + end = line_num - 1 + if line_num_offset > 0: + begin += line_num_offset - 1 + end += line_num_offset - 1 + line_num_tuple = (begin, end) + blocks.append((block_str, line_num_tuple)) + if match_condition_func(line): + buffer_begin = line_num + 1 + if buffer_begin > 0 and line_num >= buffer_begin: + line_buffer.append(line) + if line_buffer and not blocks: + block_str = "\n".join(line_buffer) + begin = buffer_begin + end = line_num + if line_num_offset > 0: + begin += line_num_offset - 1 + end += line_num_offset - 1 + line_num_tuple = (begin, end) + blocks.append((block_str, line_num_tuple)) + else: + for i, line in enumerate(yaml_str.splitlines()): + line_num = i + 1 + current_indent = get_indent_level(line) + if current_indent == top_level_indent: + new_block = False + if match_condition_func(line): + skip = False + for skip_cond_func in skip_condition_funcs: + if skip_cond_func(line): + skip = True + break + if not skip: + new_block = True + if new_block: + if line_buffer: + block_str = "" + if isolated_line_buffer: + block_str += "\n".join(isolated_line_buffer) + buffer_begin = 1 + block_str += "\n".join(line_buffer) + begin = buffer_begin + end = line_num - 1 + if line_num_offset > 0: + begin += line_num_offset - 1 + end += line_num_offset - 1 + line_num_tuple = (begin, end) + blocks.append((block_str, line_num_tuple)) + line_buffer = [] + isolated_line_buffer = [] + buffer_begin = line_num + line_buffer.append(line) + else: + if buffer_begin < 0: + isolated_line_buffer.append(line) + else: + line_buffer.append(line) + if line_buffer: + block_str = "" + if isolated_line_buffer: + block_str += "\n".join(isolated_line_buffer) + block_str += "\n".join(line_buffer) + begin = buffer_begin + end = line_num + if line_num_offset > 0: + begin += line_num_offset - 1 + end += line_num_offset - 1 + line_num_tuple = (begin, end) + blocks.append((block_str, line_num_tuple)) + return blocks + + def search_module_files(path, module_dir_paths=[]): file_list = [] # must copy the input here; otherwise, the added items are kept forever @@ -741,36 +914,36 @@ def list_scan_target(root_dir: str, task_num_threshold: int = -1): def update_line_with_space(new_line_content, old_line_content, leading_spaces=0): """ - Returns the line of the input lines with mutation, having spaces - exactly same as input yaml lines + Returns the line of the input lines with mutation, having spaces + exactly same as input yaml lines """ - new_line_content = new_line_content.lstrip(' ') + new_line_content = new_line_content.lstrip(" ") if not leading_spaces: leading_spaces = len(old_line_content) - len(old_line_content.lstrip()) - return ' ' * leading_spaces + new_line_content + return " " * leading_spaces + new_line_content def populate_new_data_list(data, line_number_list): """ - Function to check diff in line between the - first mutated results, and then copy the in - between lines to mutated data lines + Function to check diff in line between the + first mutated results, and then copy the in + between lines to mutated data lines """ input_line_number = 0 for each in line_number_list: input_line_number = int(each.lstrip("L").split("-")[0]) break temp_data = data.splitlines(keepends=True) - return temp_data[0:input_line_number - 1] + return temp_data[0 : input_line_number - 1] def check_and_add_diff_lines(start_line, stop_line, lines, data_copy): """ - Function to check diff in line between the mutated results, - and then copy the in between lines to mutated data lines + Function to check diff in line between the mutated results, + and then copy the in between lines to mutated data lines """ diff_in_line = stop_line - start_line - data_copy.append('\n') + data_copy.append("\n") for i in range(start_line, (start_line + diff_in_line) - 1): line = lines[i] data_copy.append(line) @@ -803,7 +976,7 @@ def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy): def update_the_yaml_target(file_path, line_number_list, new_content_list): try: # Read the original YAML file - with open(file_path, 'r') as file: + with open(file_path, "r") as file: data = file.read() yaml = FormattedYAML( # Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2) @@ -845,21 +1018,21 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list): if 0 <= line_number < len(lines): # Preserve the original indentation old_line_content = lines[line_number] - if '---' in old_line_content: + if "---" in old_line_content: continue if new_line_content in old_line_content: leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip()) temp_content.append(new_line_content) - new_line_content = new_line_content.lstrip(' ') - lines[line_number] = ' ' * leading_spaces + new_line_content + new_line_content = new_line_content.lstrip(" ") + lines[line_number] = " " * leading_spaces + new_line_content data_copy.append(lines[line_number]) else: new_line_key = new_line_content.split(':') new_key = new_line_key[0].strip(' ') for k in range(start, end): if k < len(lines): - old_line_key = lines[k].split(':') - if '---' in old_line_key[0]: + old_line_key = lines[k].split(":") + if "---" in old_line_key[0]: continue old_key = old_line_key[0].strip(' ') if '-' in old_line_key[0] and ':' not in lines[k] and '-' in new_key: @@ -867,10 +1040,10 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list): leading_spaces = len(lines[k]) - len(lines[k].lstrip()) if diff_in_lines > len(lines): for i in range(k, k + diff_in_lines): - if lines[i] == '\n': + if lines[i] == "\n": lines.pop(i - 1) break - elif i < len(lines) and ':' not in lines[i]: + elif i < len(lines) and ":" not in lines[i]: lines.pop(i) else: break @@ -893,12 +1066,12 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list): # and copy the old content that's not updated by ARI mutation data_copy = check_diff_and_copy_olddata_to_newdata(line_number_list, lines, data_copy) # Join the lines back to a single string - updated_data = ''.join(data_copy) + updated_data = "".join(data_copy) # Parse the updated YAML content to ensure it is valid updated_parsed_data = yaml.load(updated_data) # Write the updated YAML content back to the file if updated_parsed_data: - with open(file_path, 'w') as file: + with open(file_path, "w") as file: yaml.dump(updated_parsed_data, file) except Exception as ex: logger.warning("YAML LINES: ARI fix update yaml by lines failed for file: '%s', with error: '%s'", file_path, ex) diff --git a/ansible_risk_insight/model_loader.py b/ansible_risk_insight/model_loader.py index 11c9b0a9..a5bac534 100644 --- a/ansible_risk_insight/model_loader.py +++ b/ansible_risk_insight/model_loader.py @@ -440,9 +440,11 @@ def load_play( if not could_be_play: raise PlaybookFormatError(f"this play block does not have any of the following keywords {play_keywords}; maybe this is not a playbook") + jsonpath = f"$.{index}" pbObj.index = index pbObj.role = role_name pbObj.collection = collection_name + pbObj.jsonpath = jsonpath pbObj.set_key(parent_key, parent_local_key) play_name = data_block.get("name", "") collections_in_play = data_block.get("collections", []) @@ -479,11 +481,12 @@ def load_play( elif k == "pre_tasks": if not isinstance(v, list): continue - task_blocks, _ = get_task_blocks(task_dict_list=v) + jsonpath_prefix = f".plays.{index}.pre_tasks" + task_blocks, _ = get_task_blocks(task_dict_list=v, jsonpath_prefix=jsonpath_prefix) if task_blocks is None: continue last_task_line_num = -1 - for task_dict in task_blocks: + for (task_dict, task_jsonpath) in task_blocks: task_loading["total"] += 1 i = task_count error = None @@ -492,6 +495,7 @@ def load_play( path=path, index=i, task_block_dict=task_dict, + task_jsonpath=task_jsonpath, role_name=role_name, collection_name=collection_name, collections_in_play=collections_in_play, @@ -524,11 +528,12 @@ def load_play( elif k == "tasks": if not isinstance(v, list): continue - task_blocks, _ = get_task_blocks(task_dict_list=v) + jsonpath_prefix = f".plays.{index}.tasks" + task_blocks, _ = get_task_blocks(task_dict_list=v, jsonpath_prefix=jsonpath_prefix) if task_blocks is None: continue last_task_line_num = -1 - for task_dict in task_blocks: + for (task_dict, task_jsonpath) in task_blocks: i = task_count task_loading["total"] += 1 error = None @@ -537,6 +542,7 @@ def load_play( path=path, index=i, task_block_dict=task_dict, + task_jsonpath=task_jsonpath, role_name=role_name, collection_name=collection_name, collections_in_play=collections_in_play, @@ -569,11 +575,12 @@ def load_play( elif k == "post_tasks": if not isinstance(v, list): continue - task_blocks, _ = get_task_blocks(task_dict_list=v) + jsonpath_prefix = f".plays.{index}.post_tasks" + task_blocks, _ = get_task_blocks(task_dict_list=v, jsonpath_prefix=jsonpath_prefix) if task_blocks is None: continue last_task_line_num = -1 - for task_dict in task_blocks: + for (task_dict, task_jsonpath) in task_blocks: i = task_count task_loading["total"] += 1 error = None @@ -582,6 +589,7 @@ def load_play( path=path, index=i, task_block_dict=task_dict, + task_jsonpath=task_jsonpath, role_name=role_name, collection_name=collection_name, collections_in_play=collections_in_play, @@ -614,11 +622,12 @@ def load_play( elif k == "handlers": if not isinstance(v, list): continue - task_blocks, _ = get_task_blocks(task_dict_list=v) + jsonpath_prefix = f".plays.{index}.handlers" + task_blocks, _ = get_task_blocks(task_dict_list=v, jsonpath_prefix=jsonpath_prefix) if task_blocks is None: continue last_task_line_num = -1 - for task_dict in task_blocks: + for (task_dict, task_jsonpath) in task_blocks: i = task_count task_loading["total"] += 1 error = None @@ -627,6 +636,7 @@ def load_play( path=path, index=i, task_block_dict=task_dict, + task_jsonpath=task_jsonpath, role_name=role_name, collection_name=collection_name, collections_in_play=collections_in_play, @@ -1450,6 +1460,7 @@ def load_task( path, index, task_block_dict, + task_jsonpath="", role_name="", collection_name="", collections_in_play=[], @@ -1502,6 +1513,7 @@ def load_task( else: task_options.update({k: v}) + taskObj.jsonpath = task_jsonpath taskObj.set_yaml_lines( fullpath=fullpath, yaml_lines=yaml_lines, @@ -1510,6 +1522,7 @@ def load_task( module_options=module_options, task_options=task_options, previous_task_line=previous_task_line, + jsonpath=task_jsonpath, ) # module_options can be passed as a string like below @@ -1672,7 +1685,7 @@ def load_taskfile(path, yaml_str="", role_name="", collection_name="", basedir=" "errors": [], } last_task_line_num = -1 - for i, t_dict in enumerate(task_dicts): + for i, (t_dict, t_jsonpath) in enumerate(task_dicts): task_loading["total"] += 1 error = None try: @@ -1680,6 +1693,7 @@ def load_taskfile(path, yaml_str="", role_name="", collection_name="", basedir=" fullpath, i, t_dict, + t_jsonpath, role_name, collection_name, yaml_lines=yaml_lines, diff --git a/ansible_risk_insight/models.py b/ansible_risk_insight/models.py index a32b43dd..10f5c19e 100644 --- a/ansible_risk_insight/models.py +++ b/ansible_risk_insight/models.py @@ -44,6 +44,9 @@ equal, recursive_copy_dict, ) +from .finder import ( + identify_lines_with_jsonpath, +) class PlaybookFormatError(Exception): @@ -1100,6 +1103,7 @@ class Task(Object, Resolvable): yaml_lines: str = "" line_num_in_file: list = field(default_factory=list) # [begin, end] + jsonpath: str = "" # FQCN for Module and Role. Or a file path for TaskFile. resolved later resolved_name: str = "" @@ -1110,7 +1114,17 @@ class Task(Object, Resolvable): module_info: dict = field(default_factory=dict) include_info: dict = field(default_factory=dict) - def set_yaml_lines(self, fullpath="", yaml_lines="", task_name="", module_name="", module_options=None, task_options=None, previous_task_line=-1): + def set_yaml_lines( + self, + fullpath="", + yaml_lines="", + task_name="", + module_name="", + module_options=None, + task_options=None, + previous_task_line=-1, + jsonpath="", + ): if not task_name and not module_options: return @@ -1120,6 +1134,13 @@ def set_yaml_lines(self, fullpath="", yaml_lines="", task_name="", module_name=" else: lines = open(fullpath, "r").read().splitlines() + if jsonpath: + found_yaml, line_num = identify_lines_with_jsonpath(fpath=fullpath, yaml_str=yaml_lines, jsonpath=jsonpath) + if found_yaml and line_num: + self.yaml_lines = found_yaml + self.line_num_in_file = list(line_num) + return + # search candidates that match either of the following conditions # - task name is included in the line # - if module name is included, @@ -1273,6 +1294,12 @@ def _find_task_block(self, yaml_lines: list, start_line_num: int): end_found = True end_line_num = index - 1 break + else: + _indent = len(_line) - len(_line.lstrip()) + if _indent <= indent_of_block: + end_found = True + end_line_num = index - 1 + break index += 1 if index >= len(lines): end_found = True @@ -1577,6 +1604,7 @@ def replace_with_dict(self, new_dict: dict): path=self._task_spec.defined_in, index=self._task_spec.index, task_block_dict=new_dict, + task_jsonpath=self._task_spec.jsonpath, role_name=self._task_spec.role, collection_name=self._task_spec.collection, collections_in_play=self._task_spec.collections_in_play, @@ -1996,6 +2024,8 @@ class Play(Object, Resolvable): variables: dict = field(default_factory=dict) vars_files: list = field(default_factory=list) + jsonpath: str = "" + task_loading: dict = field(default_factory=dict) def set_key(self, parent_key="", parent_local_key=""): diff --git a/test/test_scanner.py b/test/test_scanner.py index e3534d54..3929fe0f 100644 --- a/test/test_scanner.py +++ b/test/test_scanner.py @@ -59,16 +59,34 @@ def test_scanner_with_role(type, name): assert result.detail["executed_file"][0] == "/etc/install.sh" -def _scan(type, name): +@pytest.mark.parametrize("type, name", [("playbook", "test/testdata/files/test_line_number.yml")]) +def test_scanner_line_number_detection(type, name): + ari_result, _ = _scan(type=type, name=name, playbook_only=True) + assert ari_result + playbook_result = ari_result.playbook(path=name) + assert playbook_result + task_results = playbook_result.tasks() + expected_line_numbers = [[5, 12], [13, 17], [19, 22], [28, 32]] + for i, task_result in enumerate(task_results.nodes): + assert task_result.node.spec.line_num_in_file + detected = task_result.node.spec.line_num_in_file + assert len(detected) == 2 + expected = expected_line_numbers[i] + assert detected == expected + + +def _scan(type, name, **kwargs): + if not kwargs: + kwargs = {} + kwargs["type"] = type + kwargs["name"] = name + s = ARIScanner( root_dir=config.data_dir, use_ansible_doc=False, read_ram=False, write_ram=False, ) - ari_result = s.evaluate( - type=type, - name=name, - ) + ari_result = s.evaluate(**kwargs) scandata = s.get_last_scandata() return ari_result, scandata