Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR to push implementation Inline replace functionality from ARI #238

Merged
merged 9 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Remote Attach",
"type": "debugpy",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
},
"justMyCode": false
},
]
}
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,25 @@ git clone [email protected]:ansible/ansible-risk-insight.git
cd ansible-risk-insight
pip install -e .
```

## Debugging ARI over VSCode (for development)

ARI can be debugged using VSCode. Steps to start debugging:

Step 1: Please add below line to file that needs to be debugged:
```
import debugpy
debugpy.listen(5678)
debugpy.wait_for_client()
```
Step 2: Fire the ARI command via cli command to run the ARI, ref as:
```
(.env) ➜ ari project <GH repository> --out-dir /tmp/CS --save-only-rule-result --scan-per-target --task-num-threshold 100 --fix
0.00s - Debugger warning: It seems that frozen modules are being used, which may
0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1 to disable this validation.
```
Note: If you want to disable the validation warning, please set `PYDEVD_DISABLE_FILE_VALIDATION=1` under your enviornment.

Step 3: From VSCode, click `Run->Start Debugging`, debugger should stop at the breakpoints placed inside the ARI code.
39 changes: 37 additions & 2 deletions ansible_risk_insight/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
get_role_metadata,
split_name_and_version,
)
from ..finder import list_scan_target
from ..finder import list_scan_target, update_the_yaml_target
import ansible_risk_insight.logger as logger


class ARICLI:
Expand Down Expand Up @@ -70,6 +71,11 @@ def __init__(self):
action="store_true",
help="if true, do scanning per playbook, role or taskfile (this reduces memory usage while scanning)",
)
parser.add_argument(
"--fix",
action="store_true",
help="if true, fix the scanned playbook after performing the inpline replace with ARI suggestions"
)
parser.add_argument(
"--task-num-threshold",
default="100",
Expand All @@ -85,6 +91,7 @@ def __init__(self):

def run(self):
args = self.args
print("ARI args: ", args.target_name)
target_name = args.target_name
target_version = ""
if args.target_type in ["collection", "role"]:
Expand Down Expand Up @@ -214,9 +221,37 @@ def run(self):
for i, fpath in enumerate(list_per_type):
index_data[i] = fpath
list_file_path = os.path.join(args.out_dir, f"{scan_type}s", "index.json")
logger.debug("list_file_path: ", list_file_path)
with open(list_file_path, "w") as file:
json.dump(index_data, file)

if args.fix:
for each in index_data.keys():
ari_suggestion_file_path = os.path.join(args.out_dir, f"{scan_type}s", str(each), "rule_result.json")
logger.debug("ARI suggestion file path: %s", ari_suggestion_file_path)
with open(ari_suggestion_file_path) as f:
ari_suggestion_data = json.load(f)
targets = ari_suggestion_data['targets']
for i in reversed(range(len(targets))):
logger.debug("Nodes dir number: %s", i)
nodes = targets[i]['nodes']
for j in reversed(range(1, len(nodes))):
node_rules = nodes[j]['rules']
for k in reversed(range(len(node_rules))):
w007_rule = node_rules[k]
if (w007_rule['rule']['rule_id']).lower() == 'w007':
if not w007_rule.get('verdict') and w007_rule:
break
mutated_yaml = w007_rule['detail']['mutated_yaml']
if mutated_yaml == '':
break
if w007_rule['file'][0] not in index_data[each]:
target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0])
else:
target_file_path = os.path.join(args.target_name, index_data[each])
target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0])
line_number = w007_rule['file'][1]
update_the_yaml_target(target_file_path, line_number, mutated_yaml)
break # w007 rule with mutated yaml is processed, breaking out of iteration
else:
if not silent and not pretty:
print("Start preparing dependencies")
Expand Down
75 changes: 75 additions & 0 deletions ansible_risk_insight/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import json
import yaml
import traceback
from ansible_risk_insight.yaml_utils import FormattedYAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

try:
# if `libyaml` is available, use C based loader for performance
Expand Down Expand Up @@ -731,3 +733,76 @@ def list_scan_target(root_dir: str, task_num_threshold: int = -1):
all_targets = sorted(all_targets, key=lambda x: x["filepath"])
all_targets = sorted(all_targets, key=lambda x: x["scan_type"])
return all_targets


def check_and_replace(new_data, old_data, replaced=False):
if new_data == old_data:
logger.info("Current file data and ARI mutated data are same!")
return True
if new_data['name'] == old_data['name']:
replaced = True
return new_data, replaced


def update_the_yaml_target(file_path, line_number, new_content):
input_line_number = line_number.lstrip("L").split("-")
logger.debug("Target file path: %s", file_path)
logger.debug("Target line number: %s", input_line_number)
logger.debug("Target new content %s", new_content)
try:
# Read the original YAML file
with open(file_path, 'r') as file:
data = file.read()

yaml = FormattedYAML(
# Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2)
)
# Parse the YAML content with preserved formatting
parsed_data = yaml.load(data)
if not isinstance(parsed_data, CommentedMap | CommentedSeq):
# This is an empty vars file or similar which loads as None.
# It is not safe to write this file or data-loss is likely.
# Only maps and sequences can preserve comments. Skip it.
print(
"Ignored reformatting %s because current implementation in ruamel.yaml would drop comments."
+ " See https://sourceforge.net/p/ruamel-yaml/tickets/460/",
file,
)
new_parsed_data = yaml.load(new_content)
if new_parsed_data == parsed_data:
logger.info("Current data and ARI mutated data are same!")
return
if not new_parsed_data:
return
new_parsed_data = new_parsed_data[0]
# variable to keep a check if there's a change in mutated and existing data
no_change = False

if isinstance(parsed_data, list):
if parsed_data[0].get('tasks'):
tasks = [each_task for each_task in parsed_data[0]['tasks']]
for i in reversed(range(len(tasks))):
each_task = tasks[i]
output = check_and_replace(new_parsed_data, each_task)
if output:
if isinstance(output, tuple):
parsed_data[0]['tasks'][i] = output[0]
break
no_change = True
break
else:
for i in reversed(range(len(parsed_data))):
output = check_and_replace(new_parsed_data, parsed_data[i])
if output:
if isinstance(output, tuple) and len(output) > 1:
parsed_data[i] = output[0]
break
no_change = True
break

if not no_change:
with open(file_path, 'w') as file:
yaml.dump(parsed_data, file)
except Exception as ex:
logger.warning("ARI yaml update fix functionality failed with: %s for file: %s", ex, file_path)
return
Loading
Loading