Skip to content

Commit

Permalink
PR update inline replace '--fix' implementation logic (#245)
Browse files Browse the repository at this point in the history
* update fix logic

Signed-off-by: Sumit Jaiswal <[email protected]>

* fix flake8 issues

Signed-off-by: Sumit Jaiswal <[email protected]>

---------

Signed-off-by: Sumit Jaiswal <[email protected]>
  • Loading branch information
justjais authored Jun 13, 2024
1 parent b372778 commit 8961249
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 67 deletions.
17 changes: 13 additions & 4 deletions ansible_risk_insight/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,24 +234,33 @@ def run(self):
for i in reversed(range(len(targets))):
logger.debug("Nodes dir number: %s", i)
nodes = targets[i]['nodes']
for j in reversed(range(1, len(nodes))):
line_number_list = []
mutated_yaml_list = []
target_file_path = ''
for j in range(1, len(nodes)):
node_rules = nodes[j]['rules']
for k in reversed(range(len(node_rules))):
for k in range(len(node_rules)):
w007_rule = node_rules[k]
if (w007_rule['rule']['rule_id']).lower() == 'w007':
if not w007_rule.get('verdict') and w007_rule:
break
mutated_yaml = w007_rule['detail']['mutated_yaml']
if mutated_yaml == '':
break
mutated_yaml_list.append(mutated_yaml)
if w007_rule['file'][0] not in index_data[each]:
target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0])
else:
target_file_path = os.path.join(args.target_name, index_data[each])
target_file_path = os.path.join(args.target_name, index_data[each], w007_rule['file'][0])
line_number = w007_rule['file'][1]
update_the_yaml_target(target_file_path, line_number, mutated_yaml)
line_number_list.append(line_number)
break # w007 rule with mutated yaml is processed, breaking out of iteration
try:
if target_file_path == '' or not mutated_yaml_list or not line_number_list:
continue
update_the_yaml_target(target_file_path, line_number_list, mutated_yaml_list)
except Exception as ex:
logger.warning("ARI inline replace mutation failed with exception: %s", ex)
else:
if not silent and not pretty:
print("Start preparing dependencies")
Expand Down
182 changes: 119 additions & 63 deletions ansible_risk_insight/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import yaml
import traceback
from ansible_risk_insight.yaml_utils import FormattedYAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

try:
# if `libyaml` is available, use C based loader for performance
Expand Down Expand Up @@ -740,74 +739,131 @@ def list_scan_target(root_dir: str, task_num_threshold: int = -1):
return all_targets


def check_and_replace(new_data, old_data, replaced=False):
if new_data == old_data:
logger.info("Current file data and ARI mutated data are same!")
return True
if new_data['name'] == old_data['name']:
replaced = True
return new_data, replaced


def update_the_yaml_target(file_path, line_number, new_content):
input_line_number = line_number.lstrip("L").split("-")
logger.debug("Target file path: %s", file_path)
logger.debug("Target line number: %s", input_line_number)
logger.debug("Target new content %s", new_content)
def update_line_logic(new_line_content, old_line_content, leading_spaces=0):
"""
Returns the line of the input lines with mutation, having spaces
exactly same as input yaml lines
"""
new_line_content = new_line_content.lstrip(' ')
if not leading_spaces:
leading_spaces = len(old_line_content) - len(old_line_content.lstrip())
return ' ' * leading_spaces + new_line_content


def populate_new_data_list(data, line_number_list):
"""
Function to check diff in line between the
first mutated results, and then copy the in
between lines to mutated data lines
"""
input_line_number = 0
for each in line_number_list:
input_line_number = int(each.lstrip("L").split("-")[0])
break
temp_data = data.splitlines(keepends=True)
return temp_data[0:input_line_number - 1]


def check_and_add_diff_lines(start_line, stop_line, lines, data_copy):
"""
Function to check diff in line between the mutated results,
and then copy the in between lines to mutated data lines
"""
diff_in_line = stop_line - start_line
data_copy.append('\n')
for i in range(start_line, (start_line + diff_in_line) - 1):
line = lines[i]
data_copy.append(line)


def update_the_yaml_target(file_path, line_number_list, new_content_list):
try:
# Read the original YAML file
with open(file_path, 'r') as file:
data = file.read()

yaml = FormattedYAML(
# Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2)
)
# Parse the YAML content with preserved formatting
parsed_data = yaml.load(data)
if not isinstance(parsed_data, CommentedMap | CommentedSeq):
# This is an empty vars file or similar which loads as None.
# It is not safe to write this file or data-loss is likely.
# Only maps and sequences can preserve comments. Skip it.
print(
"Ignored reformatting %s because current implementation in ruamel.yaml would drop comments."
+ " See https://sourceforge.net/p/ruamel-yaml/tickets/460/",
file,
)
new_parsed_data = yaml.load(new_content)
if new_parsed_data == parsed_data:
logger.info("Current data and ARI mutated data are same!")
return
if not new_parsed_data:
return
new_parsed_data = new_parsed_data[0]
# variable to keep a check if there's a change in mutated and existing data
no_change = False

if isinstance(parsed_data, list):
if parsed_data[0].get('tasks'):
tasks = [each_task for each_task in parsed_data[0]['tasks']]
for i in reversed(range(len(tasks))):
each_task = tasks[i]
output = check_and_replace(new_parsed_data, each_task)
if output:
if isinstance(output, tuple):
parsed_data[0]['tasks'][i] = output[0]
break
no_change = True
break
else:
for i in reversed(range(len(parsed_data))):
output = check_and_replace(new_parsed_data, parsed_data[i])
if output:
if isinstance(output, tuple) and len(output) > 1:
parsed_data[i] = output[0]
break
no_change = True
break

if not no_change:
data_copy = populate_new_data_list(data, line_number_list)
stop_line_number = 0
for iter in range(len(line_number_list)):
line_number = line_number_list[iter]
new_content = new_content_list[iter]
input_line_number = line_number.lstrip("L").split("-")
lines = data.splitlines(keepends=True)
new_lines = new_content.splitlines(keepends=True)
# Update the specific line with new content
j = 0
start_line_number = int(input_line_number[0])
if stop_line_number > 0 and (start_line_number - stop_line_number) > 1:
check_and_add_diff_lines(stop_line_number, start_line_number, lines, data_copy)
stop_line_number = int(input_line_number[1])
diff_in_lines = stop_line_number - start_line_number
temp_content = []
data_copy.append('\n')
for i in range(start_line_number - 1, stop_line_number - 1):
line_number = i
if len(lines) == i or j >= len(new_lines):
break
new_line_content = new_lines[j]
if 0 <= line_number < len(lines):
# Preserve the original indentation
old_line_content = lines[line_number]
if '---' in old_line_content:
continue
if new_line_content in old_line_content:
leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip())
temp_content.append(new_line_content)
new_line_content = new_line_content.lstrip(' ')
lines[line_number] = ' ' * leading_spaces + new_line_content
data_copy.append(lines[line_number])
else:
new_line_key = new_line_content.split(':')
for k in range(start_line_number - 1, stop_line_number - 1):
if k < len(lines):
old_line_key = lines[k].split(':')
if '---' in old_line_key[0]:
continue
old_key = old_line_key[0].strip(' ')
new_key = new_line_key[0].strip(' ')
if '-' in old_line_key[0] and ':' not in lines[k]:
# diff_in_lines = len(lines) - len(new_lines)
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
if diff_in_lines > len(lines):
for i in range(k, k + diff_in_lines):
if lines[i] == '\n':
lines.pop(i - 1)
break
elif i < len(lines) and ':' not in lines[i]:
lines.pop(i)
else:
break
lines[k] = update_line_logic(new_line_content, lines[k], leading_spaces)
data_copy.append(lines[k])
break
elif old_key == new_key:
lines[k] = update_line_logic(new_line_content, lines[k])
data_copy.append(lines[k])
break
elif old_key.rstrip('\n') == new_key:
lines[k] = update_line_logic(new_line_content, lines[k])
data_copy.append(lines[k])
break
elif old_key.rstrip('\n') in new_key.split('.'):
lines[k] = update_line_logic(new_line_content, lines[k])
data_copy.append(lines[k])
break
else:
return IndexError("Line number out of range.")
j += 1
# Join the lines back to a single string
updated_data = ''.join(data_copy)
# Parse the updated YAML content to ensure it is valid
updated_parsed_data = yaml.load(updated_data)
# Write the updated YAML content back to the file
if updated_parsed_data:
with open(file_path, 'w') as file:
yaml.dump(parsed_data, file)
yaml.dump(updated_parsed_data, file)
except Exception as ex:
logger.warning("ARI yaml update fix functionality failed with: %s for file: %s", ex, file_path)
return
logger.warning("YAML LINES: ARI fix update yaml by lines failed for file: '%s', with error: '%s'", file_path, ex)
raise ex

0 comments on commit 8961249

Please sign in to comment.