diff --git a/backend/app_tests/api/test_api_policies.py b/backend/app_tests/api/test_api_policies.py new file mode 100644 index 000000000..94d3fa026 --- /dev/null +++ b/backend/app_tests/api/test_api_policies.py @@ -0,0 +1,206 @@ +import pytest +from rest_framework.test import APIClient +from core.models import SecurityFunction, Policy +from iam.models import Folder + +from test_api import EndpointTestsQueries + +# Generic policy data for tests +POLICY_NAME = "Test Policy" +POLICY_DESCRIPTION = "Test Description" +POLICY_STATUS = ("planned", "Planned") +POLICY_STATUS2 = ("active", "Active") +POLICY_EFFORT = ("L", "Large") +POLICY_EFFORT2 = ("M", "Medium") +POLICY_LINK = "https://example.com" +POLICY_ETA = "2024-01-01" + + +@pytest.mark.django_db +class TestPolicysUnauthenticated: + """Perform tests on policies API endpoint without authentication""" + + client = APIClient() + + def test_get_security_measures(self): + """test to get policies from the API without authentication""" + + EndpointTestsQueries.get_object( + self.client, + "policies", + Policy, + { + "name": POLICY_NAME, + "description": POLICY_DESCRIPTION, + "folder": Folder.objects.create(name="test"), + }, + ) + + def test_create_security_measures(self): + """test to create policies with the API without authentication""" + + EndpointTestsQueries.create_object( + self.client, + "policies", + Policy, + { + "name": POLICY_NAME, + "description": POLICY_DESCRIPTION, + "folder": Folder.objects.create(name="test").id, + }, + ) + + def test_update_security_measures(self): + """test to update policies with the API without authentication""" + + EndpointTestsQueries.update_object( + self.client, + "policies", + Policy, + { + "name": POLICY_NAME, + "description": POLICY_DESCRIPTION, + "folder": Folder.objects.create(name="test"), + }, + { + "name": "new " + POLICY_NAME, + "description": "new " + POLICY_DESCRIPTION, + "folder": Folder.objects.create(name="test2").id, + }, + ) + + def test_delete_security_measures(self): + """test to delete policies with the API without authentication""" + + EndpointTestsQueries.delete_object( + self.client, + "policies", + Policy, + { + "name": POLICY_NAME, + "folder": Folder.objects.create(name="test"), + }, + ) + + +@pytest.mark.django_db +class TestPolicysAuthenticated: + """Perform tests on policies API endpoint with authentication""" + + def test_get_security_measures(self, authenticated_client): + """test to get policies from the API with authentication""" + + EndpointTestsQueries.Auth.get_object( + authenticated_client, + "policies", + Policy, + { + "name": POLICY_NAME, + "description": POLICY_DESCRIPTION, + "status": POLICY_STATUS[0], + "link": POLICY_LINK, + "eta": POLICY_ETA, + "effort": POLICY_EFFORT[0], + "folder": Folder.get_root_folder(), + }, + { + "folder": {"str": Folder.get_root_folder().name}, + "security_function": None, + "status": POLICY_STATUS[1], + "effort": POLICY_EFFORT[1], + }, + ) + + def test_create_security_measures(self, authenticated_client): + """test to create policies with the API with authentication""" + + security_function = SecurityFunction.objects.create( + name="test", typical_evidence={}, folder=Folder.objects.create(name="test") + ) + + EndpointTestsQueries.Auth.create_object( + authenticated_client, + "policies", + Policy, + { + "name": POLICY_NAME, + "description": POLICY_DESCRIPTION, + "status": POLICY_STATUS[0], + "link": POLICY_LINK, + "eta": POLICY_ETA, + "effort": POLICY_EFFORT[0], + "folder": str(Folder.get_root_folder().id), + }, + { + "folder": {"str": Folder.get_root_folder().name}, + "status": POLICY_STATUS[1], + "effort": POLICY_EFFORT[1], + }, + ) + + def test_update_security_measures(self, authenticated_client): + """test to update policies with the API with authentication""" + + folder = Folder.objects.create(name="test") + security_function = SecurityFunction.objects.create( + name="test", typical_evidence={}, folder=folder + ) + + EndpointTestsQueries.Auth.update_object( + authenticated_client, + "policies", + Policy, + { + "name": POLICY_NAME, + "description": POLICY_DESCRIPTION, + "status": POLICY_STATUS[0], + "link": POLICY_LINK, + "eta": POLICY_ETA, + "effort": POLICY_EFFORT[0], + "folder": Folder.get_root_folder(), + }, + { + "name": "new " + POLICY_NAME, + "description": "new " + POLICY_DESCRIPTION, + "status": POLICY_STATUS2[0], + "link": "new " + POLICY_LINK, + "eta": "2025-01-01", + "effort": POLICY_EFFORT2[0], + "folder": str(folder.id), + }, + { + "folder": {"str": Folder.get_root_folder().name}, + "status": POLICY_STATUS[1], + "effort": POLICY_EFFORT[1], + }, + ) + + def test_delete_security_measures(self, authenticated_client): + """test to delete policies with the API with authentication""" + + EndpointTestsQueries.Auth.delete_object( + authenticated_client, + "policies", + Policy, + { + "name": POLICY_NAME, + "folder": Folder.objects.create(name="test"), + }, + ) + + def test_get_effort_choices(self, authenticated_client): + """test to get policies effort choices from the API with authentication""" + + EndpointTestsQueries.Auth.get_object_options( + authenticated_client, "policies", "effort", Policy.EFFORT + ) + + def test_get_status_choices(self, authenticated_client): + """test to get policies status choices from the API with authentication""" + + EndpointTestsQueries.Auth.get_object_options( + authenticated_client, + "policies", + "status", + Policy.Status.choices, + ) diff --git a/backend/app_tests/test_vars.py b/backend/app_tests/test_vars.py index e0703cc35..a2bd354ca 100644 --- a/backend/app_tests/test_vars.py +++ b/backend/app_tests/test_vars.py @@ -16,6 +16,7 @@ RISK_SCENARIOS_ENDPOINT = "risk-scenarios-list" SECURITY_FUNCTIONS_ENDPOINT = "security-functions-list" SECURITY_MEASURES_ENDPOINT = "security-measures-list" +POLICIES_ENDPOINT = "policies-list" THREATS_ENDPOINT = "threats-list" USERS_ENDPOINT = "users-list" diff --git a/backend/core/apps.py b/backend/core/apps.py index 0cee6422f..c7b067b95 100644 --- a/backend/core/apps.py +++ b/backend/core/apps.py @@ -1,19 +1,21 @@ from django.apps import AppConfig + def startup(): """Implement CISO Assistant 1.0 default Roles and User Groups""" - from iam.models import Folder - from iam.models import UserGroup, Role, RoleAssignment + from ciso_assistant.settings import ( + CISO_ASSISTANT_SUPERUSER_EMAIL, + ) from django.contrib.auth.models import Permission - from iam.models import User - from ciso_assistant.settings import CISO_ASSISTANT_SUPERUSER_EMAIL, EMAIL_HOST, EMAIL_HOST_RESCUE + from iam.models import Folder, Role, RoleAssignment, User, UserGroup auditor_permissions = Permission.objects.filter( codename__in=[ "view_project", "view_riskassessment", "view_securitymeasure", + "view_policy", "view_riskscenario", "view_riskacceptance", "view_asset", @@ -34,6 +36,7 @@ def startup(): "view_project", "view_riskassessment", "view_securitymeasure", + "view_policy", "view_riskscenario", "view_riskacceptance", "approve_riskacceptance", @@ -60,10 +63,15 @@ def startup(): "add_riskassessment", "view_riskassessment", "change_riskassessment", - "delete_riskassessment" "add_securitymeasure", + "delete_riskassessment", + "add_securitymeasure", "view_securitymeasure", "change_securitymeasure", "delete_securitymeasure", + "add_policy", + "view_policy", + "change_policy", + "delete_policy", "add_riskscenario", "view_riskscenario", "change_riskscenario", @@ -109,6 +117,10 @@ def startup(): "view_securitymeasure", "change_securitymeasure", "delete_securitymeasure", + "add_policy", + "view_policy", + "change_policy", + "delete_policy", "add_riskscenario", "view_riskscenario", "change_riskscenario", @@ -183,6 +195,10 @@ def startup(): "view_securitymeasure", "change_securitymeasure", "delete_securitymeasure", + "add_policy", + "view_policy", + "change_policy", + "delete_policy", "add_riskscenario", "view_riskscenario", "change_riskscenario", diff --git a/backend/core/migrations/0004_policy.py b/backend/core/migrations/0004_policy.py new file mode 100644 index 000000000..c1f6dec29 --- /dev/null +++ b/backend/core/migrations/0004_policy.py @@ -0,0 +1,26 @@ +# Generated by Django 5.0.2 on 2024-02-14 15:40 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0003_library_dependencies_and_more'), + ] + + operations = [ + migrations.CreateModel( + name='Policy', + fields=[ + ], + options={ + 'verbose_name': 'Policy', + 'verbose_name_plural': 'Policies', + 'proxy': True, + 'indexes': [], + 'constraints': [], + }, + bases=('core.securitymeasure',), + ), + ] diff --git a/backend/core/models.py b/backend/core/models.py index 87c54f7c2..3999bc642 100644 --- a/backend/core/models.py +++ b/backend/core/models.py @@ -828,6 +828,25 @@ def get_linked_requirements_count(self): ).count() +class PolicyManager(models.Manager): + def create(self, *args, **kwargs): + kwargs["category"] = "policy" # Ensure category is always "policy" + return super().create(*args, **kwargs) + + +class Policy(SecurityMeasure): + class Meta: + proxy = True + verbose_name = _("Policy") + verbose_name_plural = _("Policies") + + objects = PolicyManager() # Use the custom manager + + def save(self, *args, **kwargs): + self.category = "policy" + super(Policy, self).save(*args, **kwargs) + + class RiskScenario(AbstractBaseModel, NameDescriptionMixin): TREATMENT_OPTIONS = [ ("open", _("Open")), diff --git a/backend/core/serializers.py b/backend/core/serializers.py index 1ef5af2d7..35caf8fff 100644 --- a/backend/core/serializers.py +++ b/backend/core/serializers.py @@ -195,6 +195,18 @@ class SecurityMeasureReadSerializer(SecurityMeasureWriteSerializer): effort = serializers.CharField(source="get_effort_display") +class PolicyWriteSerializer(SecurityMeasureWriteSerializer): + class Meta: + model = Policy + fields = "__all__" + + +class PolicyReadSerializer(SecurityMeasureReadSerializer): + class Meta: + model = Policy + fields = "__all__" + + class UserReadSerializer(BaseModelSerializer): user_groups = FieldsRelatedField(many=True) @@ -234,7 +246,10 @@ def create(self, validated_data): user = User.objects.create_user(**validated_data) except Exception as e: print(e) - if User.objects.filter(email=validated_data["email"]).exists() and send_mail: + if ( + User.objects.filter(email=validated_data["email"]).exists() + and send_mail + ): raise serializers.ValidationError( { "warning": [ @@ -244,11 +259,7 @@ def create(self, validated_data): ) else: raise serializers.ValidationError( - { - "error": [ - "An error occurred while creating the user" - ] - } + {"error": ["An error occurred while creating the user"]} ) return user @@ -364,8 +375,13 @@ class Meta: class EvidenceWriteSerializer(BaseModelSerializer): - security_measures = serializers.PrimaryKeyRelatedField(many=True, queryset=SecurityMeasure.objects.all()) - requirement_assessments = serializers.PrimaryKeyRelatedField(many=True, queryset=RequirementAssessment.objects.all()) + security_measures = serializers.PrimaryKeyRelatedField( + many=True, queryset=SecurityMeasure.objects.all() + ) + requirement_assessments = serializers.PrimaryKeyRelatedField( + many=True, queryset=RequirementAssessment.objects.all() + ) + class Meta: model = Evidence fields = "__all__" @@ -397,7 +413,6 @@ class RequirementAssessmentReadSerializer(BaseModelSerializer): name = serializers.CharField(source="__str__") compliance_assessment = FieldsRelatedField() - class Meta: model = RequirementAssessment fields = "__all__" diff --git a/backend/core/tests/test_models.py b/backend/core/tests/test_models.py index ce9f9931a..62a031a36 100644 --- a/backend/core/tests/test_models.py +++ b/backend/core/tests/test_models.py @@ -4,6 +4,7 @@ import pytest from ciso_assistant.settings import BASE_DIR from core.models import ( + Policy, Project, RiskAssessment, ComplianceAssessment, @@ -678,6 +679,51 @@ def test_measure_category_inherited_from_function(self): assert measure.category == "technical" +@pytest.mark.django_db +class TestPolicy: + pytestmark = pytest.mark.django_db + + @pytest.mark.usefixtures("root_folder_fixture") + def test_policy_creation(self): + root_folder = Folder.objects.get(content_type=Folder.ContentType.ROOT) + policy = Policy.objects.create(name="Policy", folder=root_folder) + assert Policy.objects.count() == 1 + assert SecurityMeasure.objects.count() == 1 + assert policy.name == "Policy" + assert policy.folder == root_folder + assert policy.category == "policy" + + @pytest.mark.usefixtures("root_folder_fixture") + def test_policy_does_not_inherit_category_from_security_function(self): + root_folder = Folder.objects.get(content_type=Folder.ContentType.ROOT) + folder = Folder.objects.create(name="Parent", folder=root_folder) + function = SecurityFunction.objects.create( + name="Function", folder=root_folder, category="technical" + ) + policy = Policy.objects.create( + name="Policy", folder=folder, security_function=function + ) + assert policy.category == "policy" + + @pytest.mark.usefixtures("root_folder_fixture") + def test_policy_creation_same_name(self): + root_folder = Folder.objects.get(content_type=Folder.ContentType.ROOT) + Policy.objects.create(name="Policy", folder=root_folder) + with pytest.raises(ValidationError): + Policy.objects.create(name="Policy", folder=root_folder) + + @pytest.mark.usefixtures("root_folder_fixture") + def test_policy_creation_same_name_different_folder(self): + root_folder = Folder.objects.get(content_type=Folder.ContentType.ROOT) + folder = Folder.objects.create(name="Parent", folder=root_folder) + policy1 = Policy.objects.create(name="Policy", folder=root_folder) + policy2 = Policy.objects.create(name="Policy", folder=folder) + assert policy1.name == "Policy" + assert policy2.name == "Policy" + assert policy1.folder == root_folder + assert policy2.folder == folder + + @pytest.mark.django_db class TestRiskAcceptance: pytestmark = pytest.mark.django_db diff --git a/backend/core/urls.py b/backend/core/urls.py index 0975fed3c..e890c002f 100644 --- a/backend/core/urls.py +++ b/backend/core/urls.py @@ -18,6 +18,7 @@ router.register( r"security-measures", SecurityMeasureViewSet, basename="security-measures" ) +router.register(r"policies", PolicyViewSet, basename="policies") router.register(r"risk-acceptances", RiskAcceptanceViewSet, basename="risk-acceptances") router.register( r"security-functions", SecurityFunctionViewSet, basename="security-functions" diff --git a/backend/core/views.py b/backend/core/views.py index cf1128710..13bcd243d 100644 --- a/backend/core/views.py +++ b/backend/core/views.py @@ -278,7 +278,7 @@ class SecurityFunctionViewSet(BaseModelViewSet): """ model = SecurityFunction - filterset_fields = ["folder"] + filterset_fields = ["folder", "category"] search_fields = ["name", "description", "provider"] @action(detail=False, name="Get category choices") @@ -502,7 +502,7 @@ class SecurityMeasureViewSet(BaseModelViewSet): "effort", "risk_scenarios", "requirement_assessments", - "evidences" + "evidences", ] search_fields = ["name", "description", "risk_scenarios", "requirement_assessments"] @@ -590,6 +590,20 @@ def to_review(self, request): return Response({"results": measures}) +class PolicyViewSet(SecurityMeasureViewSet): + model = Policy + filterset_fields = [ + "folder", + "status", + "security_function", + "effort", + "risk_scenarios", + "requirement_assessments", + "evidences", + ] + search_fields = ["name", "description", "risk_scenarios", "requirement_assessments"] + + class RiskScenarioViewSet(BaseModelViewSet): """ API endpoint that allows risk scenarios to be viewed or edited. @@ -1010,7 +1024,7 @@ def attachment(self, request, pk): status=status.HTTP_200_OK, ) return response - + @action(methods=["post"], detail=True) def delete_attachment(self, request, pk): ( @@ -1266,35 +1280,43 @@ class FirstConnexionPasswordConfirmView(PasswordResetConfirmView): form_class = FirstConnexionConfirmForm -def generate_html(compliance_assessment: ComplianceAssessment) -> Tuple[str, list[Evidence]]: +def generate_html( + compliance_assessment: ComplianceAssessment, +) -> Tuple[str, list[Evidence]]: selected_evidences = [] requirement_nodes = RequirementNode.objects.filter( framework=compliance_assessment.framework ) - assessments = RequirementAssessment.objects.filter(compliance_assessment=compliance_assessment).all() + assessments = RequirementAssessment.objects.filter( + compliance_assessment=compliance_assessment + ).all() - node_per_urn = {r.urn:r for r in requirement_nodes} - ancestors ={} + node_per_urn = {r.urn: r for r in requirement_nodes} + ancestors = {} for a in assessments: ancestors[a] = set() - req=a.requirement - while(req): + req = a.requirement + while req: ancestors[a].add(req) - p=req.parent_urn - req = None if not(p) else node_per_urn[p] - + p = req.parent_urn + req = None if not (p) else node_per_urn[p] + def bar_graph(node: RequirementNode): - """ return bar graph filtered by top node (Null for all)""" + """return bar graph filtered by top node (Null for all)""" content = "" compliance_assessments_status = [] - candidates = [c for c in assessments if not(node) or c==node or node in ancestors[c]] + candidates = [ + c for c in assessments if not (node) or c == node or node in ancestors[c] + ] total = len(candidates) for st in RequirementAssessment.Status: count = len([c for c in candidates if c.status == st]) compliance_assessments_status.append((st, round(count * 100 / total))) - content += '