Skip to content

Commit

Permalink
feat(engine_risk): 🚀 Exclude Image Base findings from remediation rat…
Browse files Browse the repository at this point in the history
…e. (#326)
  • Loading branch information
ssantaa9 authored Feb 28, 2025
2 parents ae4f53d + c986292 commit 51cd9ba
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

from collections import Counter
import copy
import sympy as sp
import math


class BreakBuild:
Expand Down Expand Up @@ -53,7 +55,7 @@ def __init__(

def process(self):
new_report_list, applied_exclusions = self._apply_exclusions(self.report_list)
self._tag_blacklist_control(new_report_list)
self._blacklist_control(new_report_list)
self._remediation_rate_control(self.all_report, new_report_list)
self._risk_score_control(new_report_list)
all_exclusions = list(self.vm_exclusions) + list(applied_exclusions)
Expand Down Expand Up @@ -121,17 +123,45 @@ def _breaker(self):
def _remediation_rate_control(
self, all_report: "list[Report]", new_report_list: "list[Report]"
):
mitigated = sum(1 for report in all_report if report.mitigated)
white_list = sum(
sp.init_printing(use_unicode=True)
(
remediation_rate_name,
mitigated_name,
all_findings_name,
new_industry_vulnerabilities,
white_list_name,
base_image_name,
) = sp.symbols(
"RemediationRate Mitigated AllFindings NewIndustryVulnerabilities WhiteList BaseImage"
)
formula = sp.Eq(
remediation_rate_name,
100
* (mitigated_name / (all_findings_name - new_industry_vulnerabilities - white_list_name - base_image_name)),
)
print("\n")
sp.pretty_print(formula)
print("\n")

mitigated_count = sum(1 for report in all_report if report.mitigated)
white_list_count = sum(
1
for report in all_report
if "On Whitelist" in report.risk_status and not report.mitigated
)
base_image_count = sum(
1
for report in all_report
if "white_list" in report.tags and not report.mitigated
if "Image Base" in report.vul_description
and "On Whitelist" not in report.risk_status
and not report.mitigated
)
total = len(all_report) - self.policy_excluded - white_list
all_findings_count = len(all_report)
print(
f"Mitigated count: {mitigated} Total count: {len(all_report)} Policy excluded: {self.policy_excluded + white_list}"
f"Mitigated: {mitigated_count} AllFindings: {all_findings_count} BaseImage: {base_image_count} NewIndustryVulnerabilities: {self.policy_excluded} WhiteList: {white_list_count}\n\n"
)
remediation_rate_value = self._get_percentage(mitigated / total)
total = all_findings_count - self.policy_excluded - white_list_count - base_image_count
remediation_rate_value = self._get_percentage(mitigated_count / total)

risk_threshold = self._get_remediation_rate_threshold(total)
self.remediation_rate = remediation_rate_value
Expand All @@ -152,10 +182,11 @@ def _remediation_rate_control(
)
self.warning_build = True
else:
missing_findings = math.ceil((risk_threshold / 100 * total) - mitigated_count)
print(
self.devops_platform_gateway.message(
"error",
f"Remediation rate {remediation_rate_value}% is less than {risk_threshold}%",
f"Remediation rate {remediation_rate_value}% is less than {risk_threshold}%. Minimum findings to mitigate: {missing_findings}.",
)
)
self.break_build = True
Expand Down Expand Up @@ -230,7 +261,7 @@ def _apply_exclusions(self, report_list: "list[Report]"):

return filtered_reports, applied_exclusions

def _tag_blacklist_control(self, report_list: "list[Report]"):
def _blacklist_control(self, report_list: "list[Report]"):
remote_config = self.remote_config
if report_list:
tag_blacklist = set(remote_config["TAG_BLACKLIST_EXCLUSION_DAYS"].keys())
Expand Down Expand Up @@ -271,13 +302,26 @@ def _tag_blacklist_control(self, report_list: "list[Report]"):

if filtered_reports_above_threshold:
self.break_build = True
self.blacklisted = len(filtered_reports_above_threshold)
self.blacklisted += len(filtered_reports_above_threshold)
self.report_breaker.extend(
copy.deepcopy(
[report for report, _ in filtered_reports_above_threshold]
)
)

for report in report_list:
if "On Blacklist" in report.risk_status:
self.break_build = True
report.reason = "Blacklisted"
self.blacklisted += 1
self.report_breaker.append(copy.deepcopy(report))
print(
self.devops_platform_gateway.message(
"error",
f"Report {report.vm_id} is blacklisted.",
)
)

def _risk_score_control(self, report_list: "list[Report]"):
remote_config = self.remote_config
risk_score_threshold = self.threshold["RISK_SCORE"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,7 @@ def _get_exclusions_new_vuln(self):
).strftime("%d%m%Y")
exclusion_data["reason"] = "New vulnerability in the industry"
exclusions.append(Exclusions(**exclusion_data))
finding.vul_description = finding.vul_description.replace(
"Image Base", ""
)
return exclusions, len(exclusions)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"devsecops_engine_tools.engine_risk.src.domain.usecases.break_build.BreakBuild._apply_exclusions"
)
@patch(
"devsecops_engine_tools.engine_risk.src.domain.usecases.break_build.BreakBuild._tag_blacklist_control"
"devsecops_engine_tools.engine_risk.src.domain.usecases.break_build.BreakBuild._blacklist_control"
)
@patch(
"devsecops_engine_tools.engine_risk.src.domain.usecases.break_build.BreakBuild._risk_score_control"
Expand All @@ -36,7 +36,7 @@ def test_process(
map_applied_exclusion,
print_exclusions,
risk_score_control,
tag_blacklist_control,
blacklist_control,
apply_exclusions,
remediation_rate_control,
):
Expand All @@ -53,7 +53,7 @@ def test_process(

remediation_rate_control.assert_called_once()
apply_exclusions.assert_called_once()
tag_blacklist_control.assert_called_once()
blacklist_control.assert_called_once()
risk_score_control.assert_called_once()
print_exclusions.assert_called_once()
map_applied_exclusion.assert_called_once()
Expand Down Expand Up @@ -174,7 +174,7 @@ def test_remediation_rate_control_less():

devops_platform_gateway.message.assert_called_with(
"error",
f"Remediation rate {remediation_rate_value}% is less than {risk_threshold}%",
f"Remediation rate {remediation_rate_value}% is less than {risk_threshold}%. Minimum findings to mitigate: 1.",
)


Expand Down Expand Up @@ -266,7 +266,7 @@ def test_apply_exclusions_id():
assert result == ([], exclusions)


def test_tag_blacklist_control_error():
def test_blacklist_control_error():
report_list = [
Report(
vuln_id_from_tool="id1",
Expand All @@ -290,15 +290,15 @@ def test_tag_blacklist_control_error():
{"TAG_MAX_AGE": 5},
0,
)
break_build._tag_blacklist_control(report_list)
break_build._blacklist_control(report_list)

mock_devops_platform_gateway.message.assert_called_once_with(
"error",
f"Report {report_list[0].vm_id} with tag '{report_list[0].tags[0]}' is blacklisted and age {report_list[0].age} is above threshold {tag_age_threshold}",
)


def test_tag_blacklist_control_warning():
def test_blacklist_control_warning():
report_list = [
Report(
vuln_id_from_tool="id2",
Expand All @@ -322,7 +322,7 @@ def test_tag_blacklist_control_warning():
{"TAG_MAX_AGE": 5},
0,
)
break_build._tag_blacklist_control(report_list)
break_build._blacklist_control(report_list)

mock_devops_platform_gateway.message.assert_called_once_with(
"warning",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ def test_get_exclusions_new_vuln(mock_datetime, mock_exclusions):
mock_datetime.now.return_value = fixed_date
mock_datetime.strptime.side_effect = lambda s, fmt: datetime.strptime(s, fmt)

finding = SimpleNamespace(publish_date="2023-01-10", id="123")
finding = SimpleNamespace(
publish_date="2023-01-10",
id="123",
vul_description="New vulnerability in the industry",
)

get_exclusions = GetExclusions(
devops_platform_gateway=MagicMock(),
Expand Down
1 change: 1 addition & 0 deletions tools/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ packageurl-python==0.15.6
ruamel.yaml==0.18.6
Authlib==1.3.2
PyJWT==2.9.0
sympy==1.13.3

0 comments on commit 51cd9ba

Please sign in to comment.