Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR update inline replace '--fix' implementation logic #245

Merged
merged 2 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions ansible_risk_insight/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,24 +234,33 @@ def run(self):
for i in reversed(range(len(targets))):
logger.debug("Nodes dir number: %s", i)
nodes = targets[i]['nodes']
for j in reversed(range(1, len(nodes))):
line_number_list = []
mutated_yaml_list = []
target_file_path = ''
for j in range(1, len(nodes)):
node_rules = nodes[j]['rules']
for k in reversed(range(len(node_rules))):
for k in range(len(node_rules)):
w007_rule = node_rules[k]
if (w007_rule['rule']['rule_id']).lower() == 'w007':
if not w007_rule.get('verdict') and w007_rule:
break
mutated_yaml = w007_rule['detail']['mutated_yaml']
if mutated_yaml == '':
break
mutated_yaml_list.append(mutated_yaml)
if w007_rule['file'][0] not in index_data[each]:
target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0])
else:
target_file_path = os.path.join(args.target_name, index_data[each])
target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0])
line_number = w007_rule['file'][1]
update_the_yaml_target(target_file_path, line_number, mutated_yaml)
line_number_list.append(line_number)
break # w007 rule with mutated yaml is processed, breaking out of iteration
try:
if target_file_path == '' or not mutated_yaml_list or not line_number_list:
continue
update_the_yaml_target(target_file_path, line_number_list, mutated_yaml_list)
except Exception as ex:
logger.warning("ARI inline replace mutation failed with exception: %s", ex)
else:
if not silent and not pretty:
print("Start preparing dependencies")
Expand Down
182 changes: 119 additions & 63 deletions ansible_risk_insight/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import yaml
import traceback
from ansible_risk_insight.yaml_utils import FormattedYAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

try:
# if `libyaml` is available, use C based loader for performance
Expand Down Expand Up @@ -744,74 +743,131 @@ def list_scan_target(root_dir: str, task_num_threshold: int = -1):
return all_targets


def check_and_replace(new_data, old_data, replaced=False):
if new_data == old_data:
logger.info("Current file data and ARI mutated data are same!")
return True
if new_data['name'] == old_data['name']:
replaced = True
return new_data, replaced


def update_the_yaml_target(file_path, line_number, new_content):
input_line_number = line_number.lstrip("L").split("-")
logger.debug("Target file path: %s", file_path)
logger.debug("Target line number: %s", input_line_number)
logger.debug("Target new content %s", new_content)
def update_line_logic(new_line_content, old_line_content, leading_spaces=0):
"""
Returns the line of the input lines with mutation, having spaces
exactly same as input yaml lines
"""
new_line_content = new_line_content.lstrip(' ')
if not leading_spaces:
leading_spaces = len(old_line_content) - len(old_line_content.lstrip())
return ' ' * leading_spaces + new_line_content


def populate_new_data_list(data, line_number_list):
"""
Function to check diff in line between the
first mutated results, and then copy the in
between lines to mutated data lines
"""
input_line_number = 0
for each in line_number_list:
input_line_number = int(each.lstrip("L").split("-")[0])
break
temp_data = data.splitlines(keepends=True)
return temp_data[0:input_line_number - 1]


def check_and_add_diff_lines(start_line, stop_line, lines, data_copy):
"""
Function to check diff in line between the mutated results,
and then copy the in between lines to mutated data lines
"""
diff_in_line = stop_line - start_line
data_copy.append('\n')
for i in range(start_line, (start_line + diff_in_line) - 1):
line = lines[i]
data_copy.append(line)


def update_the_yaml_target(file_path, line_number_list, new_content_list):
try:
# Read the original YAML file
with open(file_path, 'r') as file:
data = file.read()

yaml = FormattedYAML(
# Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2)
)
# Parse the YAML content with preserved formatting
parsed_data = yaml.load(data)
if not isinstance(parsed_data, CommentedMap | CommentedSeq):
# This is an empty vars file or similar which loads as None.
# It is not safe to write this file or data-loss is likely.
# Only maps and sequences can preserve comments. Skip it.
print(
"Ignored reformatting %s because current implementation in ruamel.yaml would drop comments."
+ " See https://sourceforge.net/p/ruamel-yaml/tickets/460/",
file,
)
new_parsed_data = yaml.load(new_content)
if new_parsed_data == parsed_data:
logger.info("Current data and ARI mutated data are same!")
return
if not new_parsed_data:
return
new_parsed_data = new_parsed_data[0]
# variable to keep a check if there's a change in mutated and existing data
no_change = False

if isinstance(parsed_data, list):
if parsed_data[0].get('tasks'):
tasks = [each_task for each_task in parsed_data[0]['tasks']]
for i in reversed(range(len(tasks))):
each_task = tasks[i]
output = check_and_replace(new_parsed_data, each_task)
if output:
if isinstance(output, tuple):
parsed_data[0]['tasks'][i] = output[0]
break
no_change = True
break
else:
for i in reversed(range(len(parsed_data))):
output = check_and_replace(new_parsed_data, parsed_data[i])
if output:
if isinstance(output, tuple) and len(output) > 1:
parsed_data[i] = output[0]
break
no_change = True
break

if not no_change:
data_copy = populate_new_data_list(data, line_number_list)
stop_line_number = 0
for iter in range(len(line_number_list)):
line_number = line_number_list[iter]
new_content = new_content_list[iter]
input_line_number = line_number.lstrip("L").split("-")
lines = data.splitlines(keepends=True)
new_lines = new_content.splitlines(keepends=True)
# Update the specific line with new content
j = 0
start_line_number = int(input_line_number[0])
if stop_line_number > 0 and (start_line_number - stop_line_number) > 1:
check_and_add_diff_lines(stop_line_number, start_line_number, lines, data_copy)
stop_line_number = int(input_line_number[1])
diff_in_lines = stop_line_number - start_line_number
temp_content = []
data_copy.append('\n')
for i in range(start_line_number - 1, stop_line_number - 1):
line_number = i
if len(lines) == i or j >= len(new_lines):
break
new_line_content = new_lines[j]
if 0 <= line_number < len(lines):
# Preserve the original indentation
old_line_content = lines[line_number]
if '---' in old_line_content:
continue
if new_line_content in old_line_content:
leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip())
temp_content.append(new_line_content)
new_line_content = new_line_content.lstrip(' ')
lines[line_number] = ' ' * leading_spaces + new_line_content
data_copy.append(lines[line_number])
else:
new_line_key = new_line_content.split(':')
for k in range(start_line_number - 1, stop_line_number - 1):
if k < len(lines):
old_line_key = lines[k].split(':')
if '---' in old_line_key[0]:
continue
old_key = old_line_key[0].strip(' ')
new_key = new_line_key[0].strip(' ')
if '-' in old_line_key[0] and ':' not in lines[k]:
# diff_in_lines = len(lines) - len(new_lines)
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
if diff_in_lines > len(lines):
for i in range(k, k + diff_in_lines):
if lines[i] == '\n':
lines.pop(i - 1)
break
elif i < len(lines) and ':' not in lines[i]:
lines.pop(i)
else:
break
lines[k] = update_line_logic(new_line_content, lines[k], leading_spaces)
data_copy.append(lines[k])
break
elif old_key == new_key:
lines[k] = update_line_logic(new_line_content, lines[k])
data_copy.append(lines[k])
break
elif old_key.rstrip('\n') == new_key:
lines[k] = update_line_logic(new_line_content, lines[k])
data_copy.append(lines[k])
break
elif old_key.rstrip('\n') in new_key.split('.'):
lines[k] = update_line_logic(new_line_content, lines[k])
data_copy.append(lines[k])
break
else:
return IndexError("Line number out of range.")
j += 1
# Join the lines back to a single string
updated_data = ''.join(data_copy)
# Parse the updated YAML content to ensure it is valid
updated_parsed_data = yaml.load(updated_data)
# Write the updated YAML content back to the file
if updated_parsed_data:
with open(file_path, 'w') as file:
yaml.dump(parsed_data, file)
yaml.dump(updated_parsed_data, file)
except Exception as ex:
logger.warning("ARI yaml update fix functionality failed with: %s for file: %s", ex, file_path)
return
logger.warning("YAML LINES: ARI fix update yaml by lines failed for file: '%s', with error: '%s'", file_path, ex)
raise ex
Loading