Skip to content

Commit

Permalink
chore: ruff format
Browse files Browse the repository at this point in the history
  • Loading branch information
nas-tabchiche committed Jan 2, 2025
1 parent 9a8a90a commit 43bc8e4
Showing 1 changed file with 35 additions and 70 deletions.
105 changes: 35 additions & 70 deletions backend/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,7 @@ def graph(self, request):
)
nodes_idx[asset.name] = N
links.append(
{"source": nodes_idx[asset.folder.name],
"target": N, "value": "scope"}
{"source": nodes_idx[asset.folder.name], "target": N, "value": "scope"}
)
N += 1
for asset in Asset.objects.filter(id__in=viewable_assets):
Expand Down Expand Up @@ -603,8 +602,7 @@ def perform_create(self, serializer):
instance: RiskAssessment = serializer.save()
if instance.ebios_rm_study:
instance.risk_matrix = instance.ebios_rm_study.risk_matrix
ebios_rm_study = EbiosRMStudy.objects.get(
id=instance.ebios_rm_study.id)
ebios_rm_study = EbiosRMStudy.objects.get(id=instance.ebios_rm_study.id)
for operational_scenario in [
operational_scenario
for operational_scenario in ebios_rm_study.operational_scenarios.all()
Expand Down Expand Up @@ -658,8 +656,7 @@ def quality_check(self, request):
user=request.user,
object_type=RiskAssessment,
)
risk_assessments = RiskAssessment.objects.filter(
id__in=viewable_objects)
risk_assessments = RiskAssessment.objects.filter(id__in=viewable_objects)
res = [
{"id": a.id, "name": a.name, "quality_check": a.quality_check()}
for a in risk_assessments
Expand Down Expand Up @@ -692,8 +689,7 @@ def plan(self, request, pk):
if UUID(pk) in viewable_objects:
risk_assessment_object = self.get_object()
risk_scenarios_objects = risk_assessment_object.risk_scenarios.all()
risk_assessment = RiskAssessmentReadSerializer(
risk_assessment_object).data
risk_assessment = RiskAssessmentReadSerializer(risk_assessment_object).data
risk_scenarios = RiskScenarioReadSerializer(
risk_scenarios_objects, many=True
).data
Expand Down Expand Up @@ -909,10 +905,8 @@ def duplicate(self, request, pk):
status=risk_assessment.status,
)

duplicate_risk_assessment.authors.set(
risk_assessment.authors.all())
duplicate_risk_assessment.reviewers.set(
risk_assessment.reviewers.all())
duplicate_risk_assessment.authors.set(risk_assessment.authors.all())
duplicate_risk_assessment.reviewers.set(risk_assessment.reviewers.all())

for scenario in risk_assessment.risk_scenarios.all():
duplicate_scenario = RiskScenario.objects.create(
Expand Down Expand Up @@ -982,8 +976,7 @@ class AppliedControlViewSet(BaseModelViewSet):
"requirement_assessments",
"evidences",
]
search_fields = ["name", "description",
"risk_scenarios", "requirement_assessments"]
search_fields = ["name", "description", "risk_scenarios", "requirement_assessments"]

@method_decorator(cache_page(60 * LONG_CACHE_TTL))
@action(detail=False, name="Get status choices")
Expand Down Expand Up @@ -1052,8 +1045,7 @@ def todo(self, request):
for key in ["created_at","updated_at","eta"] :
measures[i][key] = str(measures[i][key])"""

ranking_scores = {str(mtg.id): mtg.get_ranking_score()
for mtg in measures}
ranking_scores = {str(mtg.id): mtg.get_ranking_score() for mtg in measures}

measures = [AppliedControlReadSerializer(mtg).data for mtg in measures]

Expand Down Expand Up @@ -1129,8 +1121,7 @@ def get_controls_info(self, request):
requirement_assessments__applied_controls=ac
).distinct():
audit_coverage = (
RequirementAssessment.objects.filter(
compliance_assessment=ca)
RequirementAssessment.objects.filter(compliance_assessment=ca)
.filter(applied_controls=ac)
.count()
)
Expand Down Expand Up @@ -1346,8 +1337,7 @@ def duplicate(self, request, pk):
duplicate_applied_control.save()

return Response(
{"results": AppliedControlReadSerializer(
duplicate_applied_control).data}
{"results": AppliedControlReadSerializer(duplicate_applied_control).data}
)

@action(detail=False, methods=["get"])
Expand Down Expand Up @@ -1422,11 +1412,9 @@ def impact_graph(self, request):
)
indexes[audit.id] = idx_cnt
idx_cnt += 1
links.append(
{"source": indexes[audit.id], "target": indexes[req.id]})
links.append({"source": indexes[audit.id], "target": indexes[req.id]})

links.append(
{"source": indexes[ac.id], "target": indexes[req.id]})
links.append({"source": indexes[ac.id], "target": indexes[req.id]})
for sc in RiskScenario.objects.filter(applied_controls__id=ac.id):
nodes.append(
{
Expand All @@ -1451,11 +1439,9 @@ def impact_graph(self, request):
)
indexes[ra.id] = idx_cnt
idx_cnt += 1
links.append(
{"source": indexes[ra.id], "target": indexes[sc.id]})
links.append({"source": indexes[ra.id], "target": indexes[sc.id]})

links.append(
{"source": indexes[ac.id], "target": indexes[sc.id]})
links.append({"source": indexes[ac.id], "target": indexes[sc.id]})

return Response({"nodes": nodes, "categories": categories, "links": links})

Expand All @@ -1472,8 +1458,7 @@ class PolicyViewSet(AppliedControlViewSet):
"requirement_assessments",
"evidences",
]
search_fields = ["name", "description",
"risk_scenarios", "requirement_assessments"]
search_fields = ["name", "description", "risk_scenarios", "requirement_assessments"]

@method_decorator(cache_page(60 * LONG_CACHE_TTL))
@action(detail=False, name="Get csf_function choices")
Expand Down Expand Up @@ -1889,8 +1874,7 @@ def org_tree(self, request):
.filter(id__in=viewable_objects, parent_folder=Folder.get_root_folder())
.distinct()
):
entry = {"name": folder.name,
"children": get_folder_content(folder)}
entry = {"name": folder.name, "children": get_folder_content(folder)}
folders_list.append(entry)
tree.update({"children": folders_list})

Expand Down Expand Up @@ -1937,8 +1921,7 @@ def my_assignments(self, request):
.distinct()
)
non_active_controls = controls.exclude(status="active")
risk_scenarios = RiskScenario.objects.filter(
owner=request.user).distinct()
risk_scenarios = RiskScenario.objects.filter(owner=request.user).distinct()
controls_progress = 0
evidences_progress = 0
tot_ac = controls.count()
Expand All @@ -1952,15 +1935,12 @@ def my_assignments(self, request):

evidences_progress = int((with_evidences / tot_ac) * 100)

RA_serializer = RiskAssessmentReadSerializer(
risk_assessments[:10], many=True)
CA_serializer = ComplianceAssessmentReadSerializer(
audits[:6], many=True)
RA_serializer = RiskAssessmentReadSerializer(risk_assessments[:10], many=True)
CA_serializer = ComplianceAssessmentReadSerializer(audits[:6], many=True)
AC_serializer = AppliedControlReadSerializer(
non_active_controls[:10], many=True
)
RS_serializer = RiskScenarioReadSerializer(
risk_scenarios[:10], many=True)
RS_serializer = RiskScenarioReadSerializer(risk_scenarios[:10], many=True)

return Response(
{
Expand Down Expand Up @@ -2034,8 +2014,7 @@ def get_agg_data(request):
Folder.get_root_folder(), request.user, RiskAssessment
)[0]
data = risk_status(
request.user, RiskAssessment.objects.filter(
id__in=viewable_risk_assessments)
request.user, RiskAssessment.objects.filter(id__in=viewable_risk_assessments)
)

return Response({"results": data})
Expand Down Expand Up @@ -2089,8 +2068,7 @@ def get_composer_data(request):

data = compile_risk_assessment_for_composer(request.user, risk_assessments)
for _data in data["risk_assessment_objects"]:
quality_check = serialize_nested(
_data["risk_assessment"].quality_check())
quality_check = serialize_nested(_data["risk_assessment"].quality_check())
_data["risk_assessment"] = RiskAssessmentReadSerializer(
_data["risk_assessment"]
).data
Expand Down Expand Up @@ -2161,8 +2139,7 @@ def used(self, request):
used_frameworks = _used_frameworks.values("id", "name")
for i in range(len(used_frameworks)):
used_frameworks[i]["compliance_assessments_count"] = (
ComplianceAssessment.objects.filter(
framework=_used_frameworks[i].id)
ComplianceAssessment.objects.filter(framework=_used_frameworks[i].id)
.filter(id__in=viewable_assessments)
.count()
)
Expand All @@ -2172,11 +2149,9 @@ def used(self, request):
def mappings(self, request, pk):
framework = self.get_object()
available_target_frameworks_objects = [framework]
mappings = RequirementMappingSet.objects.filter(
source_framework=framework)
mappings = RequirementMappingSet.objects.filter(source_framework=framework)
for mapping in mappings:
available_target_frameworks_objects.append(
mapping.target_framework)
available_target_frameworks_objects.append(mapping.target_framework)
available_target_frameworks = FrameworkReadSerializer(
available_target_frameworks_objects, many=True
).data
Expand Down Expand Up @@ -2215,8 +2190,7 @@ class EvidenceViewSet(BaseModelViewSet):
"""

model = Evidence
filterset_fields = ["folder", "applied_controls",
"requirement_assessments", "name"]
filterset_fields = ["folder", "applied_controls", "requirement_assessments", "name"]
search_fields = ["name"]
ordering_fields = ["name", "description"]

Expand Down Expand Up @@ -2650,16 +2624,14 @@ def export(self, request, pk):
with tempfile.NamedTemporaryFile(delete=True) as tmp:
# Download the attachment to the temporary file
if default_storage.exists(evidence.attachment.name):
file = default_storage.open(
evidence.attachment.name)
file = default_storage.open(evidence.attachment.name)
tmp.write(file.read())
tmp.flush()
zipf.write(
tmp.name,
os.path.join(
"evidences",
os.path.basename(
evidence.attachment.name),
os.path.basename(evidence.attachment.name),
),
)
zipf.writestr("index.html", index_content)
Expand Down Expand Up @@ -2775,8 +2747,7 @@ def todo(self, request):
for key in ["created_at","updated_at","eta"] :
measures[i][key] = str(measures[i][key])"""

ranking_scores = {str(mtg.id): mtg.get_ranking_score()
for mtg in measures}
ranking_scores = {str(mtg.id): mtg.get_ranking_score() for mtg in measures}

measures = [AppliedControlReadSerializer(mtg).data for mtg in measures]

Expand Down Expand Up @@ -2833,8 +2804,7 @@ class RequirementMappingSetViewSet(BaseModelViewSet):
@action(detail=True, methods=["get"], url_path="graph_data")
def graph_data(self, request, pk=None):
mapping_set_id = pk
mapping_set = get_object_or_404(
RequirementMappingSet, id=mapping_set_id)
mapping_set = get_object_or_404(RequirementMappingSet, id=mapping_set_id)

nodes = []
links = []
Expand Down Expand Up @@ -2874,8 +2844,7 @@ def graph_data(self, request, pk=None):
)
tnodes_idx[req.ref_id] = N
N += 1
req_mappings = RequirementMapping.objects.filter(
mapping_set=mapping_set_id)
req_mappings = RequirementMapping.objects.filter(mapping_set=mapping_set_id)
for item in req_mappings:
if (
item.source_requirement.assessable
Expand Down Expand Up @@ -2973,10 +2942,8 @@ def generate_html(
graph = filter_graph_by_implementation_groups(graph, implementation_groups)
flattened_graph = flatten_dict(graph)

requirement_nodes = requirement_nodes.filter(
urn__in=flattened_graph.values())
assessments = assessments.filter(
requirement__urn__in=flattened_graph.values())
requirement_nodes = requirement_nodes.filter(urn__in=flattened_graph.values())
assessments = assessments.filter(requirement__urn__in=flattened_graph.values())

node_per_urn = {r.urn: r for r in requirement_nodes}
ancestors = {}
Expand Down Expand Up @@ -3018,10 +2985,8 @@ def generate_data_rec(requirement_node: RequirementNode):
node_data["assessments"] = assessment
node_data["result"] = assessment.get_result_display()
node_data["status"] = assessment.get_status_display()
node_data["result_color_class"] = color_css_class(
assessment.result)
node_data["status_color_class"] = color_css_class(
assessment.status)
node_data["result_color_class"] = color_css_class(assessment.result)
node_data["status_color_class"] = color_css_class(assessment.status)
direct_evidences = assessment.evidences.all()
if direct_evidences:
selected_evidences += direct_evidences
Expand Down

0 comments on commit 43bc8e4

Please sign in to comment.