From 8961249d0035d5ee0630a1a2dcbd0aa12f9f615a Mon Sep 17 00:00:00 2001 From: Sumit Jaiswal Date: Thu, 13 Jun 2024 09:45:04 +0530 Subject: [PATCH] PR update inline replace '--fix' implementation logic (#245) * update fix logic Signed-off-by: Sumit Jaiswal * fix flake8 issues Signed-off-by: Sumit Jaiswal --------- Signed-off-by: Sumit Jaiswal --- ansible_risk_insight/cli/__init__.py | 17 ++- ansible_risk_insight/finder.py | 182 +++++++++++++++++---------- 2 files changed, 132 insertions(+), 67 deletions(-) diff --git a/ansible_risk_insight/cli/__init__.py b/ansible_risk_insight/cli/__init__.py index 34ad5a02..a669bb38 100644 --- a/ansible_risk_insight/cli/__init__.py +++ b/ansible_risk_insight/cli/__init__.py @@ -234,9 +234,12 @@ def run(self): for i in reversed(range(len(targets))): logger.debug("Nodes dir number: %s", i) nodes = targets[i]['nodes'] - for j in reversed(range(1, len(nodes))): + line_number_list = [] + mutated_yaml_list = [] + target_file_path = '' + for j in range(1, len(nodes)): node_rules = nodes[j]['rules'] - for k in reversed(range(len(node_rules))): + for k in range(len(node_rules)): w007_rule = node_rules[k] if (w007_rule['rule']['rule_id']).lower() == 'w007': if not w007_rule.get('verdict') and w007_rule: @@ -244,14 +247,20 @@ def run(self): mutated_yaml = w007_rule['detail']['mutated_yaml'] if mutated_yaml == '': break + mutated_yaml_list.append(mutated_yaml) if w007_rule['file'][0] not in index_data[each]: target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0]) else: target_file_path = os.path.join(args.target_name, index_data[each]) - target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0]) line_number = w007_rule['file'][1] - update_the_yaml_target(target_file_path, line_number, mutated_yaml) + line_number_list.append(line_number) break # w007 rule with mutated yaml is processed, breaking out of iteration + try: + if target_file_path == '' or not mutated_yaml_list or not line_number_list: + continue + update_the_yaml_target(target_file_path, line_number_list, mutated_yaml_list) + except Exception as ex: + logger.warning("ARI inline replace mutation failed with exception: %s", ex) else: if not silent and not pretty: print("Start preparing dependencies") diff --git a/ansible_risk_insight/finder.py b/ansible_risk_insight/finder.py index 58b72702..67391743 100644 --- a/ansible_risk_insight/finder.py +++ b/ansible_risk_insight/finder.py @@ -22,7 +22,6 @@ import yaml import traceback from ansible_risk_insight.yaml_utils import FormattedYAML -from ruamel.yaml.comments import CommentedMap, CommentedSeq try: # if `libyaml` is available, use C based loader for performance @@ -740,74 +739,131 @@ def list_scan_target(root_dir: str, task_num_threshold: int = -1): return all_targets -def check_and_replace(new_data, old_data, replaced=False): - if new_data == old_data: - logger.info("Current file data and ARI mutated data are same!") - return True - if new_data['name'] == old_data['name']: - replaced = True - return new_data, replaced - - -def update_the_yaml_target(file_path, line_number, new_content): - input_line_number = line_number.lstrip("L").split("-") - logger.debug("Target file path: %s", file_path) - logger.debug("Target line number: %s", input_line_number) - logger.debug("Target new content %s", new_content) +def update_line_logic(new_line_content, old_line_content, leading_spaces=0): + """ + Returns the line of the input lines with mutation, having spaces + exactly same as input yaml lines + """ + new_line_content = new_line_content.lstrip(' ') + if not leading_spaces: + leading_spaces = len(old_line_content) - len(old_line_content.lstrip()) + return ' ' * leading_spaces + new_line_content + + +def populate_new_data_list(data, line_number_list): + """ + Function to check diff in line between the + first mutated results, and then copy the in + between lines to mutated data lines + """ + input_line_number = 0 + for each in line_number_list: + input_line_number = int(each.lstrip("L").split("-")[0]) + break + temp_data = data.splitlines(keepends=True) + return temp_data[0:input_line_number - 1] + + +def check_and_add_diff_lines(start_line, stop_line, lines, data_copy): + """ + Function to check diff in line between the mutated results, + and then copy the in between lines to mutated data lines + """ + diff_in_line = stop_line - start_line + data_copy.append('\n') + for i in range(start_line, (start_line + diff_in_line) - 1): + line = lines[i] + data_copy.append(line) + + +def update_the_yaml_target(file_path, line_number_list, new_content_list): try: # Read the original YAML file with open(file_path, 'r') as file: data = file.read() - yaml = FormattedYAML( # Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2) ) - # Parse the YAML content with preserved formatting - parsed_data = yaml.load(data) - if not isinstance(parsed_data, CommentedMap | CommentedSeq): - # This is an empty vars file or similar which loads as None. - # It is not safe to write this file or data-loss is likely. - # Only maps and sequences can preserve comments. Skip it. - print( - "Ignored reformatting %s because current implementation in ruamel.yaml would drop comments." - + " See https://sourceforge.net/p/ruamel-yaml/tickets/460/", - file, - ) - new_parsed_data = yaml.load(new_content) - if new_parsed_data == parsed_data: - logger.info("Current data and ARI mutated data are same!") - return - if not new_parsed_data: - return - new_parsed_data = new_parsed_data[0] - # variable to keep a check if there's a change in mutated and existing data - no_change = False - - if isinstance(parsed_data, list): - if parsed_data[0].get('tasks'): - tasks = [each_task for each_task in parsed_data[0]['tasks']] - for i in reversed(range(len(tasks))): - each_task = tasks[i] - output = check_and_replace(new_parsed_data, each_task) - if output: - if isinstance(output, tuple): - parsed_data[0]['tasks'][i] = output[0] - break - no_change = True - break - else: - for i in reversed(range(len(parsed_data))): - output = check_and_replace(new_parsed_data, parsed_data[i]) - if output: - if isinstance(output, tuple) and len(output) > 1: - parsed_data[i] = output[0] - break - no_change = True - break - - if not no_change: + data_copy = populate_new_data_list(data, line_number_list) + stop_line_number = 0 + for iter in range(len(line_number_list)): + line_number = line_number_list[iter] + new_content = new_content_list[iter] + input_line_number = line_number.lstrip("L").split("-") + lines = data.splitlines(keepends=True) + new_lines = new_content.splitlines(keepends=True) + # Update the specific line with new content + j = 0 + start_line_number = int(input_line_number[0]) + if stop_line_number > 0 and (start_line_number - stop_line_number) > 1: + check_and_add_diff_lines(stop_line_number, start_line_number, lines, data_copy) + stop_line_number = int(input_line_number[1]) + diff_in_lines = stop_line_number - start_line_number + temp_content = [] + data_copy.append('\n') + for i in range(start_line_number - 1, stop_line_number - 1): + line_number = i + if len(lines) == i or j >= len(new_lines): + break + new_line_content = new_lines[j] + if 0 <= line_number < len(lines): + # Preserve the original indentation + old_line_content = lines[line_number] + if '---' in old_line_content: + continue + if new_line_content in old_line_content: + leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip()) + temp_content.append(new_line_content) + new_line_content = new_line_content.lstrip(' ') + lines[line_number] = ' ' * leading_spaces + new_line_content + data_copy.append(lines[line_number]) + else: + new_line_key = new_line_content.split(':') + for k in range(start_line_number - 1, stop_line_number - 1): + if k < len(lines): + old_line_key = lines[k].split(':') + if '---' in old_line_key[0]: + continue + old_key = old_line_key[0].strip(' ') + new_key = new_line_key[0].strip(' ') + if '-' in old_line_key[0] and ':' not in lines[k]: + # diff_in_lines = len(lines) - len(new_lines) + leading_spaces = len(lines[k]) - len(lines[k].lstrip()) + if diff_in_lines > len(lines): + for i in range(k, k + diff_in_lines): + if lines[i] == '\n': + lines.pop(i - 1) + break + elif i < len(lines) and ':' not in lines[i]: + lines.pop(i) + else: + break + lines[k] = update_line_logic(new_line_content, lines[k], leading_spaces) + data_copy.append(lines[k]) + break + elif old_key == new_key: + lines[k] = update_line_logic(new_line_content, lines[k]) + data_copy.append(lines[k]) + break + elif old_key.rstrip('\n') == new_key: + lines[k] = update_line_logic(new_line_content, lines[k]) + data_copy.append(lines[k]) + break + elif old_key.rstrip('\n') in new_key.split('.'): + lines[k] = update_line_logic(new_line_content, lines[k]) + data_copy.append(lines[k]) + break + else: + return IndexError("Line number out of range.") + j += 1 + # Join the lines back to a single string + updated_data = ''.join(data_copy) + # Parse the updated YAML content to ensure it is valid + updated_parsed_data = yaml.load(updated_data) + # Write the updated YAML content back to the file + if updated_parsed_data: with open(file_path, 'w') as file: - yaml.dump(parsed_data, file) + yaml.dump(updated_parsed_data, file) except Exception as ex: - logger.warning("ARI yaml update fix functionality failed with: %s for file: %s", ex, file_path) - return + logger.warning("YAML LINES: ARI fix update yaml by lines failed for file: '%s', with error: '%s'", file_path, ex) + raise ex