From a4d6ac903aadcb474c936f5dc57c06922388469f Mon Sep 17 00:00:00 2001 From: Sumit Jaiswal Date: Wed, 5 Jun 2024 11:48:00 +0530 Subject: [PATCH] add fix functionality Signed-off-by: Sumit Jaiswal --- .vscode/launch.json | 23 ++++++++ ansible_risk_insight/cli/__init__.py | 30 +++++++++- ansible_risk_insight/finder.py | 82 ++++++++++++++++++++++++++++ 3 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..22d3ffb1 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,23 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Remote Attach", + "type": "debugpy", + "request": "attach", + "connect": { + "host": "localhost:5678", + "port": 5678 + }, + "pathMappings": [ + { + "localRoot": "${workspaceFolder}", + "remoteRoot": "." + } + ] + } + ] +} \ No newline at end of file diff --git a/ansible_risk_insight/cli/__init__.py b/ansible_risk_insight/cli/__init__.py index a2963804..b09f5bed 100644 --- a/ansible_risk_insight/cli/__init__.py +++ b/ansible_risk_insight/cli/__init__.py @@ -16,6 +16,7 @@ import os import json +import logging import argparse from ..scanner import ARIScanner, config @@ -26,8 +27,11 @@ get_role_metadata, split_name_and_version, ) -from ..finder import list_scan_target +from ..finder import list_scan_target, update_the_yaml_target +logging.basicConfig( + level=os.environ.get('LOGLEVEL', 'WARNING').upper() +) class ARICLI: args = None @@ -70,6 +74,7 @@ def __init__(self): action="store_true", help="if true, do scanning per playbook, role or taskfile (this reduces memory usage while scanning)", ) + parser.add_argument("--fix", action="store_true", help="if true, fix the scanned playbook after performing the inpline replace with ARI suggestions") parser.add_argument( "--task-num-threshold", default="100", @@ -85,6 +90,7 @@ def __init__(self): def run(self): args = self.args + print("ARI args: ", args.target_name) target_name = args.target_name target_version = "" if args.target_type in ["collection", "role"]: @@ -214,9 +220,29 @@ def run(self): for i, fpath in enumerate(list_per_type): index_data[i] = fpath list_file_path = os.path.join(args.out_dir, f"{scan_type}s", "index.json") + logging.info("list_file_path: ", list_file_path) with open(list_file_path, "w") as file: json.dump(index_data, file) - + if args.fix: + for each in index_data.keys(): + ari_suggestion_file_path = os.path.join(args.out_dir, f"{scan_type}s", str(each), "rule_result.json") + logging.info("ARI suggestion file path: %s", ari_suggestion_file_path) + with open(ari_suggestion_file_path) as f: + ari_suggestion_data = json.load(f) + targets = ari_suggestion_data['targets'] + for i in reversed(range(len(targets)-1)): + nodes = targets[i]['nodes'] + for j in reversed(range(len(nodes))): + node_rules = nodes[j]['rules'] + for k in reversed(range(len(node_rules))): + w007_rule = node_rules[k] + if (w007_rule['rule']['rule_id']).lower() == 'w007': + if not w007_rule.get('verdict') and w007_rule: + break + mutated_yaml = w007_rule['detail']['mutated_yaml'] + target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0]) + line_number = w007_rule['file'][1] + update_the_yaml_target(target_file_path, line_number, mutated_yaml) else: if not silent and not pretty: print("Start preparing dependencies") diff --git a/ansible_risk_insight/finder.py b/ansible_risk_insight/finder.py index 4bdd1653..2f5abf71 100644 --- a/ansible_risk_insight/finder.py +++ b/ansible_risk_insight/finder.py @@ -19,8 +19,15 @@ import re import os import json +import logging import yaml import traceback +from ansible_risk_insight.yaml_utils import FormattedYAML +from ruamel.yaml.comments import CommentedMap, CommentedSeq + +logging.basicConfig( + level=os.environ.get('LOGLEVEL', 'WARNING').upper() +) try: # if `libyaml` is available, use C based loader for performance @@ -731,3 +738,78 @@ def list_scan_target(root_dir: str, task_num_threshold: int = -1): all_targets = sorted(all_targets, key=lambda x: x["filepath"]) all_targets = sorted(all_targets, key=lambda x: x["scan_type"]) return all_targets + + +def check_and_replace(new_data, old_data, replaced=False): + if new_data == old_data: + logging.info("Current file data and ARI mutated data are same!") + return True + if new_data['name'] == old_data['name']: + # each_task = new_parsed_data + replaced = True + return new_data, replaced + + +def update_the_yaml_target(file_path, line_number, new_content): + input_line_number = line_number.lstrip("L").split("-") + logging.info("Target file path: %s", file_path) + logging.info("Target line number: %s", input_line_number) + logging.info("Target new content %s", new_content) + + # Read the original YAML file + with open(file_path, 'r') as file: + data = file.read() + + yaml = FormattedYAML( + # Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2) + ) + # Parse the YAML content with preserved formatting + parsed_data = yaml.load(data) + if not isinstance(parsed_data, CommentedMap | CommentedSeq): + # This is an empty vars file or similar which loads as None. + # It is not safe to write this file or data-loss is likely. + # Only maps and sequences can preserve comments. Skip it. + print( + "Ignored reformatting %s because current implementation in ruamel.yaml would drop comments. See https://sourceforge.net/p/ruamel-yaml/tickets/460/", + file, + ) + new_parsed_data = yaml.load(new_content) + if new_parsed_data == parsed_data: + logging.info("Current data and ARI mutated data are same!") + return + if not new_parsed_data: + return + new_parsed_data = new_parsed_data[0] + # variable to keep a check if there's a change in mutated and existing data + no_change = False + # parsed_data = parsed_data[0] + try: + if isinstance(parsed_data, list): + if parsed_data[0].get('tasks'): + tasks = [each_task for each_task in parsed_data[0]['tasks']] + for i in reversed(range(len(tasks))): + each_task = tasks[i] + output = check_and_replace(new_parsed_data, each_task) + if output: + if isinstance(output, tuple): + parsed_data[0]['tasks'][i] = output[0] + break + no_change = True + break + else: + for i in reversed(range(len(parsed_data))): + output = check_and_replace(new_parsed_data, parsed_data[i]) + if output: + if isinstance(output, tuple) and len(output) > 1: + parsed_data[i] = output[0] + break + no_change = True + break + + if not no_change: + with open(file_path, 'w') as file: + yaml.dump(parsed_data, file) + except Exception as ex: + print("ARI fix functionality failed with: %s", ex) + logging.warning("ARI fix functionality failed with: %s", ex) + return