Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR to fix the ARI observed issue, where inline replace wasn't acting as expected #258

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ansible_risk_insight/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def run(self):
fpath_from_root = target_info["path_from_root"]
scan_type = target_info["scan_type"]
count_in_type = len(file_list[scan_type])
print(f"\r[{i+1}/{total}] {scan_type} {fpath_from_root} ", end="")
print(f"\r[{i + 1}/{total}] {scan_type} {fpath_from_root} ", end="")
out_dir = os.path.join(args.out_dir, f"{scan_type}s", str(count_in_type))
c.evaluate(
type=scan_type,
Expand Down Expand Up @@ -223,8 +223,8 @@ def run(self):
with open(list_file_path, "w") as file:
json.dump(index_data, file)
if args.fix:
for each in index_data.keys():
ari_suggestion_file_path = os.path.join(args.out_dir, f"{scan_type}s", str(each), "rule_result.json")
for key, val in index_data.items():
ari_suggestion_file_path = os.path.join(args.out_dir, f"{scan_type}s", str(key), "rule_result.json")
logger.debug("ARI suggestion file path: %s", ari_suggestion_file_path)
with open(ari_suggestion_file_path) as f:
ari_suggestion_data = json.load(f)
Expand All @@ -246,7 +246,7 @@ def run(self):
mutated_yaml = w007_rule["detail"]["mutated_yaml"]
if mutated_yaml == "":
break
temp_data = index_data[each]
temp_data = index_data[key]
if w007_rule["file"][0] not in temp_data:
target_file_path = os.path.join(args.target_name, temp_data, w007_rule["file"][0])
if temp_file_path != "" and target_file_path != temp_file_path:
Expand Down
118 changes: 79 additions & 39 deletions ansible_risk_insight/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -970,13 +970,18 @@ def check_diff_and_copy_olddata_to_newdata(line_number_list, lines, new_data):
return new_data


def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy):
def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy, line_number):
"""
Function to get the leading space for the new ARI mutated line, with
its equivaltent old line with space similar to the old line
"""
if "-" in old_line and "-" not in new_line and len(new_line.split(".")) >= 3:
data_copy.append(new_line)
data_copy.append("\n")
return ""
line_with_adjusted_space = update_line_with_space(new_line, old_line, leading_spaces)
data_copy.append(line_with_adjusted_space)
# data_copy.append(line_with_adjusted_space)
data_copy[line_number] = line_with_adjusted_space
return ""


Expand All @@ -986,35 +991,50 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
with open(file_path, "r") as file:
data = file.read()
yaml = FormattedYAML(
version=(1, 1)
# Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2)
)
data_copy = populate_new_data_list(data, line_number_list)
data_copy = data.splitlines(keepends=True)
stop_line_number = 0
new_lines = []
for iter in range(len(line_number_list)):
line_number = line_number_list[iter]
count = 0
line_number = 0
# for iter in range(len(line_number_list)):
lines = data.splitlines(keepends=True)
new_task = False
for iter, line in enumerate(line_number_list):
ari_line_number = line
new_content = new_content_list[iter]
input_line_number = line_number.lstrip("L").split("-")
lines = data.splitlines(keepends=True)
input_line_number = ari_line_number.lstrip("L").split("-")

if new_lines:
if count > 1 and new_task:
line_number = line_number - (count - 1)
for i in range(len(new_lines)):
try:
data_copy.append(new_lines.pop(i))
leading_spaces = len(lines[stop_line_number + 1]) - len(lines[stop_line_number + 1].lstrip())
new_line_content = update_and_append_new_line(
new_lines.pop(i),
lines[stop_line_number + 1],
leading_spaces, data_copy,
line_number
)
count -= 1
except IndexError:
break
new_lines = new_content.splitlines(keepends=True)
# Update the specific line with new content
start_line_number = int(input_line_number[0])
if stop_line_number > 0 and (start_line_number - stop_line_number) > 1:
check_and_add_diff_lines(stop_line_number, start_line_number, lines, data_copy)
stop_line_number = int(input_line_number[1])
diff_in_lines = stop_line_number - start_line_number
temp_content = []
start = start_line_number - 1
end = stop_line_number - 1
data_copy.append("\n")
end = stop_line_number
not_replaced = False
for i in range(start, end):
line_number = i
if data_copy[line_number] == '\n':
continue
if len(lines) == i:
break
try:
Expand All @@ -1031,54 +1051,74 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip())
temp_content.append(new_line_content)
new_line_content = new_line_content.lstrip(" ")
lines[line_number] = " " * leading_spaces + new_line_content
data_copy.append(lines[line_number])
data_copy[line_number] = " " * leading_spaces + new_line_content
else:
new_line_key = new_line_content.split(":")
new_key = new_line_key[0].strip(" ")
for k in range(start, end):
if count > 1 and new_task:
line_number = line_number - (count - 1)
for k in range(start, end + 1):
if k < len(lines):
old_line_key = lines[k].split(":")
if "---" in old_line_key[0]:
continue
old_key = old_line_key[0].strip(" ")
if old_key == '\n':
continue
if "-" in old_line_key[0] and ":" not in lines[k] and "-" in new_key:
# diff_in_lines = len(lines) - len(new_lines)
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
if diff_in_lines > len(lines):
for i in range(k, k + diff_in_lines):
if lines[i] == "\n":
lines.pop(i - 1)
break
elif i < len(lines) and ":" not in lines[i]:
lines.pop(i)
else:
break
new_line_content = update_and_append_new_line(new_line_content, lines[k], leading_spaces, data_copy)
for i in range(k, k + diff_in_lines):
if data_copy[i] == "\n":
data_copy.pop(i - 1)
lines.pop(i - 1)
break
elif old_key == new_key:
break
elif i < len(data_copy) and ":" not in data_copy[i]:
data_copy.pop(i)
lines.pop(i)
else:
break
count += 1
new_line_content = update_and_append_new_line(new_line_content, lines[k], leading_spaces, data_copy, k)
new_task = False
break
elif old_key == new_key:
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k)
break
elif old_key.rstrip("\n") == new_key:
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k)
break
elif old_key.rstrip("\n") in new_key.split("."):
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
elif "." in new_key and old_key.rstrip("\n") in new_key.split("."):
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k)
break
if new_line_content: # if there wasn't a match with old line, so this seems updated by ARI and added to w/o any change
data_copy.append(new_line_content)
else:
return IndexError("Line number out of range.")
# check for diff b/w new content and old contents,
# and copy the old content that's not updated by ARI mutation
data_copy = check_diff_and_copy_olddata_to_newdata(line_number_list, lines, data_copy)
# Join the lines back to a single string
elif "." in new_key and new_key.split(".")[-1] in old_key.rstrip("\n"):
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k)
break
if new_line_content and not_replaced and len(new_lines) > 0:
leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip())
new_line_content = update_and_append_new_line(new_line_content, lines[i], leading_spaces, data_copy, line_number)
new_line_content = ""
if new_line_content and len(new_lines) > 0:
try:
k += 1
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
new_line_content = update_and_append_new_line(new_line_content, lines[i], leading_spaces, data_copy, k)
for each in new_lines:
k += 1
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
new_line_content = update_and_append_new_line(each, lines[i], leading_spaces, data_copy, k)
new_lines = []
except IndexError:
break
new_task = True
updated_data = "".join(data_copy)
# Parse the updated YAML content to ensure it is valid
updated_parsed_data = yaml.load(updated_data)
# Write the updated YAML content back to the file
if updated_parsed_data:
file_content = yaml.dumps(updated_parsed_data)
with open(file_path, "w") as file:
yaml.dump(updated_parsed_data, file)
file.write(file_content)
except Exception as ex:
logger.warning("YAML LINES: ARI fix update yaml by lines failed for file: '%s', with error: '%s'", file_path, ex)
4 changes: 2 additions & 2 deletions ansible_risk_insight/ram_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def scan(self, i, num, type, name):
elapsed = round(time.time() - self.start, 2)
start_of_this_scan = time.time()
thread_id = threading.get_native_id()
print(f"[{i+1}/{num}] start {type} {name} ({elapsed} sec. elapsed) (thread: {thread_id})")
print(f"[{i + 1}/{num}] start {type} {name} ({elapsed} sec. elapsed) (thread: {thread_id})")
use_src_cache = True

if self.skip_scan(type, name):
Expand Down Expand Up @@ -133,7 +133,7 @@ def scan(self, i, num, type, name):

elapsed_for_this_scan = round(time.time() - start_of_this_scan, 2)
if elapsed_for_this_scan > 60:
print(f"WARNING: It took {elapsed_for_this_scan} sec. to process [{i+1}/{num}] {type} {name}")
print(f"WARNING: It took {elapsed_for_this_scan} sec. to process [{i + 1}/{num}] {type} {name}")

def save_ram_log(self, type, name, fail):
out_dir = os.path.join(self._scanner.root_dir, "log", type, name)
Expand Down
Loading
Loading