Skip to content

Commit

Permalink
Import risk assessment through the CLI (#1110)
Browse files Browse the repository at this point in the history
  • Loading branch information
ab-smith authored Dec 1, 2024
1 parent b444ad8 commit 6f40593
Show file tree
Hide file tree
Showing 5 changed files with 337 additions and 25 deletions.
123 changes: 115 additions & 8 deletions backend/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,22 @@ def quality_check_detail(self, request, pk):
else:
return Response(status=HTTP_403_FORBIDDEN)

@action(detail=False, methods=["get"])
def ids(self, request):
my_map = dict()

(viewable_items, _, _) = RoleAssignment.get_accessible_object_ids(
folder=Folder.get_root_folder(),
user=request.user,
object_type=Project,
)
for item in Project.objects.filter(id__in=viewable_items):
if my_map.get(item.folder.name) is None:
my_map[item.folder.name] = {}
my_map[item.folder.name].update({item.name: item.id})

return Response(my_map)


class ThreatViewSet(BaseModelViewSet):
"""
Expand All @@ -318,6 +334,21 @@ def retrieve(self, request, *args, **kwargs):
def threats_count(self, request):
return Response({"results": threats_count_per_name(request.user)})

@action(detail=False, methods=["get"])
def ids(self, request):
my_map = dict()

(viewable_items, _, _) = RoleAssignment.get_accessible_object_ids(
folder=Folder.get_root_folder(),
user=request.user,
object_type=Threat,
)
for item in Threat.objects.filter(id__in=viewable_items):
if my_map.get(item.folder.name) is None:
my_map[item.folder.name] = {}
my_map[item.folder.name].update({item.name: item.id})
return Response(my_map)


class AssetViewSet(BaseModelViewSet):
"""
Expand Down Expand Up @@ -395,6 +426,21 @@ def graph(self, request):
{"nodes": nodes, "links": links, "categories": categories, "meta": meta}
)

@action(detail=False, methods=["get"])
def ids(self, request):
my_map = dict()

(viewable_items, _, _) = RoleAssignment.get_accessible_object_ids(
folder=Folder.get_root_folder(),
user=request.user,
object_type=Asset,
)
for item in Asset.objects.filter(id__in=viewable_items):
if my_map.get(item.folder.name) is None:
my_map[item.folder.name] = {}
my_map[item.folder.name].update({item.name: item.id})
return Response(my_map)

@action(detail=False, name="Get security objectives")
def security_objectives(self, request):
return Response({"results": Asset.DEFAULT_SECURITY_OBJECTIVES})
Expand Down Expand Up @@ -459,6 +505,22 @@ def used(self, request):
)
return Response({"results": used_matrices})

@action(detail=False, methods=["get"])
def ids(self, request):
my_map = dict()

(viewable_items, _, _) = RoleAssignment.get_accessible_object_ids(
folder=Folder.get_root_folder(),
user=request.user,
object_type=RiskMatrix,
)
for item in RiskMatrix.objects.filter(id__in=viewable_items):
if my_map.get(item.folder.name) is None:
my_map[item.folder.name] = {}
my_map[item.folder.name].update({item.name: item.id})

return Response(my_map)


class VulnerabilityViewSet(BaseModelViewSet):
"""
Expand Down Expand Up @@ -649,30 +711,46 @@ def risk_assessment_csv(self, request, pk):
writer = csv.writer(response, delimiter=";")
columns = [
"ref_id",
"assets",
"threats",
"name",
"description",
"existing_controls",
"current_level",
"applied_controls",
"residual_level",
"current_impact",
"current_proba",
"current_risk",
"additional_controls",
"residual_impact",
"residual_proba",
"residual_risk",
"treatment",
]
writer.writerow(columns)

for scenario in risk_assessment.risk_scenarios.all().order_by("created_at"):
applied_controls = ",".join(
[m.csv_value for m in scenario.applied_controls.all()]
for scenario in risk_assessment.risk_scenarios.all().order_by("ref_id"):
additional_controls = ",".join(
[m.name for m in scenario.applied_controls.all()]
)
existing_controls = ",".join(
[m.name for m in scenario.existing_applied_controls.all()]
)

threats = ",".join([t.name for t in scenario.threats.all()])
assets = ",".join([t.name for t in scenario.assets.all()])

row = [
scenario.ref_id,
assets,
threats,
scenario.name,
scenario.description,
scenario.existing_controls,
existing_controls,
scenario.get_current_impact()["name"],
scenario.get_current_proba()["name"],
scenario.get_current_risk()["name"],
applied_controls,
additional_controls,
scenario.get_residual_impact()["name"],
scenario.get_residual_proba()["name"],
scenario.get_residual_risk()["name"],
scenario.treatment,
]
Expand Down Expand Up @@ -1070,6 +1148,22 @@ def get_timeline_info(self, request):
colorMap[domain.name] = next(color_cycle)
return Response({"entries": entries, "colorMap": colorMap})

@action(detail=False, methods=["get"])
def ids(self, request):
my_map = dict()

(viewable_items, _, _) = RoleAssignment.get_accessible_object_ids(
folder=Folder.get_root_folder(),
user=request.user,
object_type=AppliedControl,
)
for item in AppliedControl.objects.filter(id__in=viewable_items):
if my_map.get(item.folder.name) is None:
my_map[item.folder.name] = {}
my_map[item.folder.name].update({item.name: item.id})

return Response(my_map)


class PolicyViewSet(AppliedControlViewSet):
model = Policy
Expand Down Expand Up @@ -1501,6 +1595,19 @@ def org_tree(self, request):

return Response(tree)

@action(detail=False, methods=["get"])
def ids(self, request):
my_map = dict()

(viewable_items, _, _) = RoleAssignment.get_accessible_object_ids(
folder=Folder.get_root_folder(),
user=request.user,
object_type=Folder,
)
for item in Folder.objects.filter(id__in=viewable_items):
my_map[item.name] = item.id
return Response(my_map)

@action(detail=False, methods=["get"])
def my_assignments(self, request):
risk_assessments = RiskAssessment.objects.filter(
Expand Down
4 changes: 4 additions & 0 deletions cli/RA_sample.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ref_id;assets;threats;name;description;existing_controls;current_impact;current_proba;current_risk;additional_controls;residual_impact;residual_proba;residual_risk;treatment
R.1;dsafa;Data Encrypted for Impact;Ransomware;;ISMS Scope document,Statement of Applicabilty document;Significant;Likely;Low;Risk management policy,Organization overview document,Main policy,Competency matrix;Critical;Very likely;High;open
R.2;;System Shutdown/Reboot;Unavailability;;Information security awareness and traning policy;Significant;Very likely;Medium;Management review plan document,Main policy,ISMS Scope document,Responsibility matrix;Important;Unlikely;Medium;open
R.3;dsasfad;Scheduled Task,Cloud Administration Command;Insider threats;;;Important;Likely;Medium;;--;--;--;open
6 changes: 6 additions & 0 deletions cli/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

## Examples

```sh
./clica.py import-risk-assessment --file RA_sample.csv --folder "BU 1" --project "Orion" --matrix "4x4 risk matrix from EBIOS-RM" --name example
```
Loading

0 comments on commit 6f40593

Please sign in to comment.