diff --git a/backend/core/views.py b/backend/core/views.py index 4e9161878..13e07a6f1 100644 --- a/backend/core/views.py +++ b/backend/core/views.py @@ -443,8 +443,7 @@ def graph(self, request): ) nodes_idx[asset.name] = N links.append( - {"source": nodes_idx[asset.folder.name], - "target": N, "value": "scope"} + {"source": nodes_idx[asset.folder.name], "target": N, "value": "scope"} ) N += 1 for asset in Asset.objects.filter(id__in=viewable_assets): @@ -603,8 +602,7 @@ def perform_create(self, serializer): instance: RiskAssessment = serializer.save() if instance.ebios_rm_study: instance.risk_matrix = instance.ebios_rm_study.risk_matrix - ebios_rm_study = EbiosRMStudy.objects.get( - id=instance.ebios_rm_study.id) + ebios_rm_study = EbiosRMStudy.objects.get(id=instance.ebios_rm_study.id) for operational_scenario in [ operational_scenario for operational_scenario in ebios_rm_study.operational_scenarios.all() @@ -658,8 +656,7 @@ def quality_check(self, request): user=request.user, object_type=RiskAssessment, ) - risk_assessments = RiskAssessment.objects.filter( - id__in=viewable_objects) + risk_assessments = RiskAssessment.objects.filter(id__in=viewable_objects) res = [ {"id": a.id, "name": a.name, "quality_check": a.quality_check()} for a in risk_assessments ] @@ -692,8 +689,7 @@ def plan(self, request, pk): if UUID(pk) in viewable_objects: risk_assessment_object = self.get_object() risk_scenarios_objects = risk_assessment_object.risk_scenarios.all() - risk_assessment = RiskAssessmentReadSerializer( - risk_assessment_object).data + risk_assessment = RiskAssessmentReadSerializer(risk_assessment_object).data risk_scenarios = RiskScenarioReadSerializer( risk_scenarios_objects, many=True ).data @@ -909,10 +905,8 @@ def duplicate(self, request, pk): status=risk_assessment.status, ) - duplicate_risk_assessment.authors.set( - risk_assessment.authors.all()) - duplicate_risk_assessment.reviewers.set( - risk_assessment.reviewers.all()) + duplicate_risk_assessment.authors.set(risk_assessment.authors.all()) + duplicate_risk_assessment.reviewers.set(risk_assessment.reviewers.all()) for scenario in risk_assessment.risk_scenarios.all(): duplicate_scenario = RiskScenario.objects.create( @@ -982,8 +976,7 @@ class AppliedControlViewSet(BaseModelViewSet): "requirement_assessments", "evidences", ] - search_fields = ["name", "description", - "risk_scenarios", "requirement_assessments"] + search_fields = ["name", "description", "risk_scenarios", "requirement_assessments"] @method_decorator(cache_page(60 * LONG_CACHE_TTL)) @action(detail=False, name="Get status choices") @@ -1052,8 +1045,7 @@ def todo(self, request): for key in ["created_at","updated_at","eta"] : measures[i][key] = str(measures[i][key])""" - ranking_scores = {str(mtg.id): mtg.get_ranking_score() - for mtg in measures} + ranking_scores = {str(mtg.id): mtg.get_ranking_score() for mtg in measures} measures = [AppliedControlReadSerializer(mtg).data for mtg in measures] @@ -1129,8 +1121,7 @@ def get_controls_info(self, request): requirement_assessments__applied_controls=ac ).distinct(): audit_coverage = ( - RequirementAssessment.objects.filter( - compliance_assessment=ca) + RequirementAssessment.objects.filter(compliance_assessment=ca) .filter(applied_controls=ac) .count() ) @@ -1346,8 +1337,7 @@ def duplicate(self, request, pk): duplicate_applied_control.save() return Response( - {"results": AppliedControlReadSerializer( - duplicate_applied_control).data} + {"results": AppliedControlReadSerializer(duplicate_applied_control).data} ) @action(detail=False, methods=["get"]) @@ -1422,11 +1412,9 @@ def impact_graph(self, request): ) indexes[audit.id] = idx_cnt idx_cnt += 1 - links.append( - {"source": indexes[audit.id], "target": indexes[req.id]}) + links.append({"source": indexes[audit.id], "target": indexes[req.id]}) - links.append( - {"source": indexes[ac.id], "target": indexes[req.id]}) + links.append({"source": indexes[ac.id], "target": indexes[req.id]}) for sc in RiskScenario.objects.filter(applied_controls__id=ac.id): nodes.append( { @@ -1451,11 +1439,9 @@ def impact_graph(self, request): ) indexes[ra.id] = idx_cnt idx_cnt += 1 - links.append( - {"source": indexes[ra.id], "target": indexes[sc.id]}) + links.append({"source": indexes[ra.id], "target": indexes[sc.id]}) - links.append( - {"source": indexes[ac.id], "target": indexes[sc.id]}) + links.append({"source": indexes[ac.id], "target": indexes[sc.id]}) return Response({"nodes": nodes, "categories": categories, "links": links}) @@ -1472,8 +1458,7 @@ class PolicyViewSet(AppliedControlViewSet): "requirement_assessments", "evidences", ] - search_fields = ["name", "description", - "risk_scenarios", "requirement_assessments"] + search_fields = ["name", "description", "risk_scenarios", "requirement_assessments"] @method_decorator(cache_page(60 * LONG_CACHE_TTL)) @action(detail=False, name="Get csf_function choices") @@ -1889,8 +1874,7 @@ def org_tree(self, request): .filter(id__in=viewable_objects, parent_folder=Folder.get_root_folder()) .distinct() ): - entry = {"name": folder.name, - "children": get_folder_content(folder)} + entry = {"name": folder.name, "children": get_folder_content(folder)} folders_list.append(entry) tree.update({"children": folders_list}) @@ -1937,8 +1921,7 @@ def my_assignments(self, request): .distinct() ) non_active_controls = controls.exclude(status="active") - risk_scenarios = RiskScenario.objects.filter( - owner=request.user).distinct() + risk_scenarios = RiskScenario.objects.filter(owner=request.user).distinct() controls_progress = 0 evidences_progress = 0 tot_ac = controls.count() @@ -1952,15 +1935,12 @@ def my_assignments(self, request): evidences_progress = int((with_evidences / tot_ac) * 100) - RA_serializer = RiskAssessmentReadSerializer( - risk_assessments[:10], many=True) - CA_serializer = ComplianceAssessmentReadSerializer( - audits[:6], many=True) + RA_serializer = RiskAssessmentReadSerializer(risk_assessments[:10], many=True) + CA_serializer = ComplianceAssessmentReadSerializer(audits[:6], many=True) AC_serializer = AppliedControlReadSerializer( non_active_controls[:10], many=True ) - RS_serializer = RiskScenarioReadSerializer( - risk_scenarios[:10], many=True) + RS_serializer = RiskScenarioReadSerializer(risk_scenarios[:10], many=True) return Response( { @@ -2034,8 +2014,7 @@ def get_agg_data(request): Folder.get_root_folder(), request.user, RiskAssessment )[0] data = risk_status( - request.user, RiskAssessment.objects.filter( - id__in=viewable_risk_assessments) + request.user, RiskAssessment.objects.filter(id__in=viewable_risk_assessments) ) return Response({"results": data}) @@ -2089,8 +2068,7 @@ def get_composer_data(request): data = compile_risk_assessment_for_composer(request.user, risk_assessments) for _data in data["risk_assessment_objects"]: - quality_check = serialize_nested( - _data["risk_assessment"].quality_check()) + quality_check = serialize_nested(_data["risk_assessment"].quality_check()) _data["risk_assessment"] = RiskAssessmentReadSerializer( _data["risk_assessment"] ).data @@ -2161,8 +2139,7 @@ def used(self, request): used_frameworks = _used_frameworks.values("id", "name") for i in range(len(used_frameworks)): used_frameworks[i]["compliance_assessments_count"] = ( - ComplianceAssessment.objects.filter( - framework=_used_frameworks[i].id) + ComplianceAssessment.objects.filter(framework=_used_frameworks[i].id) .filter(id__in=viewable_assessments) .count() ) @@ -2172,11 +2149,9 @@ def mappings(self, request, pk): framework = self.get_object() available_target_frameworks_objects = [framework] - mappings = RequirementMappingSet.objects.filter( - source_framework=framework) + mappings = RequirementMappingSet.objects.filter(source_framework=framework) for mapping in mappings: - available_target_frameworks_objects.append( - mapping.target_framework) + available_target_frameworks_objects.append(mapping.target_framework) available_target_frameworks = FrameworkReadSerializer( available_target_frameworks_objects, many=True ).data @@ -2215,8 +2190,7 @@ class EvidenceViewSet(BaseModelViewSet): """ model = Evidence - filterset_fields = ["folder", "applied_controls", - "requirement_assessments", "name"] + filterset_fields = ["folder", "applied_controls", "requirement_assessments", "name"] search_fields = ["name"] ordering_fields = ["name", "description"] @@ -2650,16 +2624,14 @@ def export(self, request, pk): with tempfile.NamedTemporaryFile(delete=True) as tmp: # Download the attachment to the temporary file if default_storage.exists(evidence.attachment.name): - file = default_storage.open( - evidence.attachment.name) + file = default_storage.open(evidence.attachment.name) tmp.write(file.read()) tmp.flush() zipf.write( tmp.name, os.path.join( "evidences", - os.path.basename( - evidence.attachment.name), + os.path.basename(evidence.attachment.name), ), ) zipf.writestr("index.html", index_content) @@ -2775,8 +2747,7 @@ def todo(self, request): for key in ["created_at","updated_at","eta"] : measures[i][key] = str(measures[i][key])""" - ranking_scores = {str(mtg.id): mtg.get_ranking_score() - for mtg in measures} + ranking_scores = {str(mtg.id): mtg.get_ranking_score() for mtg in measures} measures = [AppliedControlReadSerializer(mtg).data for mtg in measures] @@ -2833,8 +2804,7 @@ class RequirementMappingSetViewSet(BaseModelViewSet): @action(detail=True, methods=["get"], url_path="graph_data") def graph_data(self, request, pk=None): mapping_set_id = pk - mapping_set = get_object_or_404( - RequirementMappingSet, id=mapping_set_id) + mapping_set = get_object_or_404(RequirementMappingSet, id=mapping_set_id) nodes = [] links = [] @@ -2874,8 +2844,7 @@ def graph_data(self, request, pk=None): ) tnodes_idx[req.ref_id] = N N += 1 - req_mappings = RequirementMapping.objects.filter( - mapping_set=mapping_set_id) + req_mappings = RequirementMapping.objects.filter(mapping_set=mapping_set_id) for item in req_mappings: if ( item.source_requirement.assessable @@ -2973,10 +2942,8 @@ def generate_html( graph = filter_graph_by_implementation_groups(graph, implementation_groups) flattened_graph = flatten_dict(graph) - requirement_nodes = requirement_nodes.filter( - urn__in=flattened_graph.values()) - assessments = assessments.filter( - requirement__urn__in=flattened_graph.values()) + requirement_nodes = requirement_nodes.filter(urn__in=flattened_graph.values()) + assessments = assessments.filter(requirement__urn__in=flattened_graph.values()) node_per_urn = {r.urn: r for r in requirement_nodes} ancestors = {} @@ -3018,10 +2985,8 @@ def generate_data_rec(requirement_node: RequirementNode): node_data["assessments"] = assessment node_data["result"] = assessment.get_result_display() node_data["status"] = assessment.get_status_display() - node_data["result_color_class"] = color_css_class( - assessment.result) - node_data["status_color_class"] = color_css_class( - assessment.status) + node_data["result_color_class"] = color_css_class(assessment.result) + node_data["status_color_class"] = color_css_class(assessment.status) direct_evidences = assessment.evidences.all() if direct_evidences: selected_evidences += direct_evidences