Skip to content

Commit

Permalink
add fix functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Sumit Jaiswal <[email protected]>
  • Loading branch information
justjais committed Jun 5, 2024
1 parent 2a8946e commit a4d6ac9
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 2 deletions.
23 changes: 23 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Remote Attach",
"type": "debugpy",
"request": "attach",
"connect": {
"host": "localhost:5678",
"port": 5678
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
]
}
]
}
30 changes: 28 additions & 2 deletions ansible_risk_insight/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import os
import json
import logging
import argparse

from ..scanner import ARIScanner, config
Expand All @@ -26,8 +27,11 @@
get_role_metadata,
split_name_and_version,
)
from ..finder import list_scan_target
from ..finder import list_scan_target, update_the_yaml_target

logging.basicConfig(
level=os.environ.get('LOGLEVEL', 'WARNING').upper()
)

class ARICLI:
args = None
Expand Down Expand Up @@ -70,6 +74,7 @@ def __init__(self):
action="store_true",
help="if true, do scanning per playbook, role or taskfile (this reduces memory usage while scanning)",
)
parser.add_argument("--fix", action="store_true", help="if true, fix the scanned playbook after performing the inpline replace with ARI suggestions")
parser.add_argument(
"--task-num-threshold",
default="100",
Expand All @@ -85,6 +90,7 @@ def __init__(self):

def run(self):
args = self.args
print("ARI args: ", args.target_name)
target_name = args.target_name
target_version = ""
if args.target_type in ["collection", "role"]:
Expand Down Expand Up @@ -214,9 +220,29 @@ def run(self):
for i, fpath in enumerate(list_per_type):
index_data[i] = fpath
list_file_path = os.path.join(args.out_dir, f"{scan_type}s", "index.json")
logging.info("list_file_path: ", list_file_path)
with open(list_file_path, "w") as file:
json.dump(index_data, file)

if args.fix:
for each in index_data.keys():
ari_suggestion_file_path = os.path.join(args.out_dir, f"{scan_type}s", str(each), "rule_result.json")
logging.info("ARI suggestion file path: %s", ari_suggestion_file_path)
with open(ari_suggestion_file_path) as f:
ari_suggestion_data = json.load(f)
targets = ari_suggestion_data['targets']
for i in reversed(range(len(targets)-1)):
nodes = targets[i]['nodes']
for j in reversed(range(len(nodes))):
node_rules = nodes[j]['rules']
for k in reversed(range(len(node_rules))):
w007_rule = node_rules[k]
if (w007_rule['rule']['rule_id']).lower() == 'w007':
if not w007_rule.get('verdict') and w007_rule:
break
mutated_yaml = w007_rule['detail']['mutated_yaml']
target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0])
line_number = w007_rule['file'][1]
update_the_yaml_target(target_file_path, line_number, mutated_yaml)
else:
if not silent and not pretty:
print("Start preparing dependencies")
Expand Down
82 changes: 82 additions & 0 deletions ansible_risk_insight/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@
import re
import os
import json
import logging
import yaml
import traceback
from ansible_risk_insight.yaml_utils import FormattedYAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

logging.basicConfig(
level=os.environ.get('LOGLEVEL', 'WARNING').upper()
)

try:
# if `libyaml` is available, use C based loader for performance
Expand Down Expand Up @@ -731,3 +738,78 @@ def list_scan_target(root_dir: str, task_num_threshold: int = -1):
all_targets = sorted(all_targets, key=lambda x: x["filepath"])
all_targets = sorted(all_targets, key=lambda x: x["scan_type"])
return all_targets


def check_and_replace(new_data, old_data, replaced=False):
if new_data == old_data:
logging.info("Current file data and ARI mutated data are same!")
return True
if new_data['name'] == old_data['name']:
# each_task = new_parsed_data
replaced = True
return new_data, replaced


def update_the_yaml_target(file_path, line_number, new_content):
input_line_number = line_number.lstrip("L").split("-")
logging.info("Target file path: %s", file_path)
logging.info("Target line number: %s", input_line_number)
logging.info("Target new content %s", new_content)

# Read the original YAML file
with open(file_path, 'r') as file:
data = file.read()

yaml = FormattedYAML(
# Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2)
)
# Parse the YAML content with preserved formatting
parsed_data = yaml.load(data)
if not isinstance(parsed_data, CommentedMap | CommentedSeq):
# This is an empty vars file or similar which loads as None.
# It is not safe to write this file or data-loss is likely.
# Only maps and sequences can preserve comments. Skip it.
print(
"Ignored reformatting %s because current implementation in ruamel.yaml would drop comments. See https://sourceforge.net/p/ruamel-yaml/tickets/460/",
file,
)
new_parsed_data = yaml.load(new_content)
if new_parsed_data == parsed_data:
logging.info("Current data and ARI mutated data are same!")
return
if not new_parsed_data:
return
new_parsed_data = new_parsed_data[0]
# variable to keep a check if there's a change in mutated and existing data
no_change = False
# parsed_data = parsed_data[0]
try:
if isinstance(parsed_data, list):
if parsed_data[0].get('tasks'):
tasks = [each_task for each_task in parsed_data[0]['tasks']]
for i in reversed(range(len(tasks))):
each_task = tasks[i]
output = check_and_replace(new_parsed_data, each_task)
if output:
if isinstance(output, tuple):
parsed_data[0]['tasks'][i] = output[0]
break
no_change = True
break
else:
for i in reversed(range(len(parsed_data))):
output = check_and_replace(new_parsed_data, parsed_data[i])
if output:
if isinstance(output, tuple) and len(output) > 1:
parsed_data[i] = output[0]
break
no_change = True
break

if not no_change:
with open(file_path, 'w') as file:
yaml.dump(parsed_data, file)
except Exception as ex:
print("ARI fix functionality failed with: %s", ex)
logging.warning("ARI fix functionality failed with: %s", ex)
return

0 comments on commit a4d6ac9

Please sign in to comment.