Skip to content

Commit

Permalink
fix issue 257
Browse files Browse the repository at this point in the history
Signed-off-by: Sumit Jaiswal <[email protected]>
  • Loading branch information
justjais committed Sep 17, 2024
1 parent 664f2c1 commit a3ec701
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 52 deletions.
8 changes: 4 additions & 4 deletions ansible_risk_insight/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def run(self):
fpath_from_root = target_info["path_from_root"]
scan_type = target_info["scan_type"]
count_in_type = len(file_list[scan_type])
print(f"\r[{i+1}/{total}] {scan_type} {fpath_from_root} ", end="")
print(f"\r[{i + 1}/{total}] {scan_type} {fpath_from_root} ", end="")
out_dir = os.path.join(args.out_dir, f"{scan_type}s", str(count_in_type))
c.evaluate(
type=scan_type,
Expand Down Expand Up @@ -223,8 +223,8 @@ def run(self):
with open(list_file_path, "w") as file:
json.dump(index_data, file)
if args.fix:
for each in index_data.keys():
ari_suggestion_file_path = os.path.join(args.out_dir, f"{scan_type}s", str(each), "rule_result.json")
for key, val in index_data.items():
ari_suggestion_file_path = os.path.join(args.out_dir, f"{scan_type}s", str(key), "rule_result.json")
logger.debug("ARI suggestion file path: %s", ari_suggestion_file_path)
with open(ari_suggestion_file_path) as f:
ari_suggestion_data = json.load(f)
Expand All @@ -246,7 +246,7 @@ def run(self):
mutated_yaml = w007_rule["detail"]["mutated_yaml"]
if mutated_yaml == "":
break
temp_data = index_data[each]
temp_data = index_data[key]
if w007_rule["file"][0] not in temp_data:
target_file_path = os.path.join(args.target_name, temp_data, w007_rule["file"][0])
if temp_file_path != "" and target_file_path != temp_file_path:
Expand Down
118 changes: 79 additions & 39 deletions ansible_risk_insight/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -970,13 +970,18 @@ def check_diff_and_copy_olddata_to_newdata(line_number_list, lines, new_data):
return new_data


def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy):
def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy, line_number):
"""
Function to get the leading space for the new ARI mutated line, with
its equivaltent old line with space similar to the old line
"""
if "-" in old_line and "-" not in new_line and len(new_line.split(".")) >= 3:
data_copy.append(new_line)
data_copy.append("\n")
return ""
line_with_adjusted_space = update_line_with_space(new_line, old_line, leading_spaces)
data_copy.append(line_with_adjusted_space)
# data_copy.append(line_with_adjusted_space)
data_copy[line_number] = line_with_adjusted_space
return ""


Expand All @@ -986,35 +991,50 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
with open(file_path, "r") as file:
data = file.read()
yaml = FormattedYAML(
version=(1, 1)
# Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2)
)
data_copy = populate_new_data_list(data, line_number_list)
data_copy = data.splitlines(keepends=True)
stop_line_number = 0
new_lines = []
for iter in range(len(line_number_list)):
line_number = line_number_list[iter]
count = 0
line_number = 0
# for iter in range(len(line_number_list)):
lines = data.splitlines(keepends=True)
new_task = False
for iter, line in enumerate(line_number_list):
ari_line_number = line
new_content = new_content_list[iter]
input_line_number = line_number.lstrip("L").split("-")
lines = data.splitlines(keepends=True)
input_line_number = ari_line_number.lstrip("L").split("-")

if new_lines:
if count > 1 and new_task:
line_number = line_number - (count - 1)
for i in range(len(new_lines)):
try:
data_copy.append(new_lines.pop(i))
leading_spaces = len(lines[stop_line_number + 1]) - len(lines[stop_line_number + 1].lstrip())
new_line_content = update_and_append_new_line(
new_lines.pop(i),
lines[stop_line_number + 1],
leading_spaces, data_copy,
line_number
)
count -= 1
except IndexError:
break
new_lines = new_content.splitlines(keepends=True)
# Update the specific line with new content
start_line_number = int(input_line_number[0])
if stop_line_number > 0 and (start_line_number - stop_line_number) > 1:
check_and_add_diff_lines(stop_line_number, start_line_number, lines, data_copy)
stop_line_number = int(input_line_number[1])
diff_in_lines = stop_line_number - start_line_number
temp_content = []
start = start_line_number - 1
end = stop_line_number - 1
data_copy.append("\n")
end = stop_line_number
not_replaced = False
for i in range(start, end):
line_number = i
if data_copy[line_number] == '\n':
continue
if len(lines) == i:
break
try:
Expand All @@ -1031,54 +1051,74 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip())
temp_content.append(new_line_content)
new_line_content = new_line_content.lstrip(" ")
lines[line_number] = " " * leading_spaces + new_line_content
data_copy.append(lines[line_number])
data_copy[line_number] = " " * leading_spaces + new_line_content
else:
new_line_key = new_line_content.split(":")
new_key = new_line_key[0].strip(" ")
for k in range(start, end):
if count > 1 and new_task:
line_number = line_number - (count - 1)
for k in range(start, end + 1):
if k < len(lines):
old_line_key = lines[k].split(":")
if "---" in old_line_key[0]:
continue
old_key = old_line_key[0].strip(" ")
if old_key == '\n':
continue
if "-" in old_line_key[0] and ":" not in lines[k] and "-" in new_key:
# diff_in_lines = len(lines) - len(new_lines)
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
if diff_in_lines > len(lines):
for i in range(k, k + diff_in_lines):
if lines[i] == "\n":
lines.pop(i - 1)
break
elif i < len(lines) and ":" not in lines[i]:
lines.pop(i)
else:
break
new_line_content = update_and_append_new_line(new_line_content, lines[k], leading_spaces, data_copy)
for i in range(k, k + diff_in_lines):
if data_copy[i] == "\n":
data_copy.pop(i - 1)
lines.pop(i - 1)
break
elif old_key == new_key:
break
elif i < len(data_copy) and ":" not in data_copy[i]:
data_copy.pop(i)
lines.pop(i)
else:
break
count += 1
new_line_content = update_and_append_new_line(new_line_content, lines[k], leading_spaces, data_copy, k)
new_task = False
break
elif old_key == new_key:
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k)
break
elif old_key.rstrip("\n") == new_key:
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k)
break
elif old_key.rstrip("\n") in new_key.split("."):
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
elif "." in new_key and old_key.rstrip("\n") in new_key.split("."):
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k)
break
if new_line_content: # if there wasn't a match with old line, so this seems updated by ARI and added to w/o any change
data_copy.append(new_line_content)
else:
return IndexError("Line number out of range.")
# check for diff b/w new content and old contents,
# and copy the old content that's not updated by ARI mutation
data_copy = check_diff_and_copy_olddata_to_newdata(line_number_list, lines, data_copy)
# Join the lines back to a single string
elif "." in new_key and new_key.split(".")[-1] in old_key.rstrip("\n"):
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy, k)
break
if new_line_content and not_replaced and len(new_lines) > 0:
leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip())
new_line_content = update_and_append_new_line(new_line_content, lines[i], leading_spaces, data_copy, line_number)
new_line_content = ""
if new_line_content and len(new_lines) > 0:
try:
k += 1
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
new_line_content = update_and_append_new_line(new_line_content, lines[i], leading_spaces, data_copy, k)
for each in new_lines:
k += 1
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
new_line_content = update_and_append_new_line(each, lines[i], leading_spaces, data_copy, k)
new_lines = []
except IndexError:
break
new_task = True
updated_data = "".join(data_copy)
# Parse the updated YAML content to ensure it is valid
updated_parsed_data = yaml.load(updated_data)
# Write the updated YAML content back to the file
if updated_parsed_data:
file_content = yaml.dumps(updated_parsed_data)
with open(file_path, "w") as file:
yaml.dump(updated_parsed_data, file)
file.write(file_content)
except Exception as ex:
logger.warning("YAML LINES: ARI fix update yaml by lines failed for file: '%s', with error: '%s'", file_path, ex)
4 changes: 2 additions & 2 deletions ansible_risk_insight/ram_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def scan(self, i, num, type, name):
elapsed = round(time.time() - self.start, 2)
start_of_this_scan = time.time()
thread_id = threading.get_native_id()
print(f"[{i+1}/{num}] start {type} {name} ({elapsed} sec. elapsed) (thread: {thread_id})")
print(f"[{i + 1}/{num}] start {type} {name} ({elapsed} sec. elapsed) (thread: {thread_id})")
use_src_cache = True

if self.skip_scan(type, name):
Expand Down Expand Up @@ -133,7 +133,7 @@ def scan(self, i, num, type, name):

elapsed_for_this_scan = round(time.time() - start_of_this_scan, 2)
if elapsed_for_this_scan > 60:
print(f"WARNING: It took {elapsed_for_this_scan} sec. to process [{i+1}/{num}] {type} {name}")
print(f"WARNING: It took {elapsed_for_this_scan} sec. to process [{i + 1}/{num}] {type} {name}")

def save_ram_log(self, type, name, fail):
out_dir = os.path.join(self._scanner.root_dir, "log", type, name)
Expand Down
Loading

0 comments on commit a3ec701

Please sign in to comment.