Skip to content

Commit

Permalink
Merge branch 'feat/tprm' of https://github.com/intuitem/ciso-assistan…
Browse files Browse the repository at this point in the history
…t-community into feat/tprm
  • Loading branch information
eric-intuitem committed Sep 14, 2024
2 parents 5fa0fd0 + 80fe76c commit 73b45c2
Show file tree
Hide file tree
Showing 11 changed files with 2,487 additions and 4,198 deletions.
2 changes: 2 additions & 0 deletions backend/core/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ class Meta:
"date_joined",
"user_groups",
"is_sso",
"is_third_party",
]


Expand All @@ -357,6 +358,7 @@ class Meta:
"is_active",
"date_joined",
"user_groups",
"is_third_party",
]

def validate_email(self, email):
Expand Down
20 changes: 19 additions & 1 deletion backend/core/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from structlog import get_logger

from ciso_assistant.settings import CISO_ASSISTANT_SUPERUSER_EMAIL
from core.utils import RoleCodename

logger = get_logger(__name__)

Expand Down Expand Up @@ -279,6 +280,16 @@
"delete_entityassessment",
]

THIRD_PARTY_RESPONDENT_PERMISSIONS_LIST = [
"view_complianceassessment",
"view_requirementassessment",
"change_requirementassessment",
"view_evidence",
"add_evidence",
"change_evidence",
"delete_evidence",
]


def startup(sender: AppConfig, **kwargs):
"""
Expand All @@ -290,7 +301,6 @@ def startup(sender: AppConfig, **kwargs):

from iam.models import Folder, Role, RoleAssignment, User, UserGroup
from tprm.models import Entity
from global_settings.models import GlobalSettings

print("startup handler: initialize database")

Expand Down Expand Up @@ -384,6 +394,14 @@ def startup(sender: AppConfig, **kwargs):
)
ra2.perimeter_folders.add(global_approvers.folder)

third_party_respondent_permissions = Permission.objects.filter(
codename__in=THIRD_PARTY_RESPONDENT_PERMISSIONS_LIST
)
third_party_respondent, created = Role.objects.get_or_create(
name=RoleCodename.THIRD_PARTY_RESPONDENT.value, builtin=True
)
third_party_respondent.permissions.set(third_party_respondent_permissions)

# if superuser defined and does not exist, then create it
if (
CISO_ASSISTANT_SUPERUSER_EMAIL
Expand Down
4 changes: 4 additions & 0 deletions backend/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class RoleCodename(Enum):
ANALYST = "BI-RL-ANA"
APPROVER = "BI-RL-APP"
READER = "BI-RL-AUD"
THIRD_PARTY_RESPONDENT = "BI-RL-TPR"

def __str__(self) -> str:
return self.value
Expand All @@ -38,6 +39,7 @@ class UserGroupCodename(Enum):
ANALYST = "BI-UG-ANA"
APPROVER = "BI-UG-APP"
READER = "BI-UG-AUD"
THIRD_PARTY_RESPONDENT = "BI-UG-TPR"

def __str__(self) -> str:
return self.value
Expand All @@ -49,6 +51,7 @@ def __str__(self) -> str:
str(RoleCodename.ANALYST): _("Analyst"),
str(RoleCodename.APPROVER): _("Approver"),
str(RoleCodename.READER): _("Reader"),
str(RoleCodename.THIRD_PARTY_RESPONDENT): _("Third-party respondent"),
}

BUILTIN_USERGROUP_CODENAMES = {
Expand All @@ -59,6 +62,7 @@ def __str__(self) -> str:
str(UserGroupCodename.ANALYST): _("Analyst"),
str(UserGroupCodename.APPROVER): _("Approver"),
str(UserGroupCodename.READER): _("Reader"),
str(UserGroupCodename.THIRD_PARTY_RESPONDENT): _("Third-party respondent"),
}

COUNTRY_FLAGS = {
Expand Down
9 changes: 8 additions & 1 deletion backend/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,14 @@ def filter_approver(self, queryset, name, value):

class Meta:
model = User
fields = ["email", "first_name", "last_name", "is_active", "is_approver"]
fields = [
"email",
"first_name",
"last_name",
"is_active",
"is_approver",
"is_third_party",
]


class UserViewSet(BaseModelViewSet):
Expand Down
12 changes: 6 additions & 6 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
django==5.1
django==5.1.1
weasyprint==62.3
psycopg2-binary==2.9.9
gunicorn==22.0.0
pytest-django==4.8.0
gunicorn==23.0.0
pytest-django==4.9.0
pytest-html==4.1.1
django-filter==24.3
whitenoise==6.7.0
Expand All @@ -18,7 +18,7 @@ structlog==24.4.0
python-dotenv==1.0.1
drf-spectacular==0.27.2
django-rest-knox==5.0.1
django-allauth[socialaccount]==64.0.0
django-allauth[socialaccount]==64.2.1
pre-commit==3.8.0
django-allauth[saml]==64.0.0
django-allauth==64.0.0
django-allauth[saml]==64.2.1
django-allauth==64.2.1
168 changes: 91 additions & 77 deletions backend/tprm/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@

from core.serializer_fields import FieldsRelatedField
from core.serializers import BaseModelSerializer
from iam.models import Folder
from core.utils import RoleCodename, UserGroupCodename
from iam.models import Folder, Role, RoleAssignment, User, UserGroup
from tprm.models import Entity, EntityAssessment, Representative, Solution
from django.utils.translation import gettext_lazy as _
from ciso_assistant.settings import EMAIL_HOST, EMAIL_HOST_RESCUE

import structlog

logger = structlog.get_logger(__name__)


class EntityReadSerializer(BaseModelSerializer):
Expand Down Expand Up @@ -47,92 +53,100 @@ class EntityAssessmentWriteSerializer(BaseModelSerializer):
child=serializers.CharField(), required=False
)

def update(self, instance, validated_data):
for author in validated_data.get("authors", []):
if author not in instance.authors.all():
author.mailing(
email_template_name="tprm/third_party_email.html",
subject=_(
"CISO Assistant: A questionnaire has been assigned to you"
),
object="entity-assessments",
object_id=instance.id,
)
_audit = instance.compliance_assessment
if not _audit:
create_audit = validated_data.pop("create_audit")
_framework = validated_data.pop("framework", None)
_selected_implementation_groups = validated_data.pop(
def _extract_audit_data(self, validated_data):
audit_data = {
"create_audit": validated_data.pop("create_audit", False),
"framework": validated_data.pop("framework", None),
"selected_implementation_groups": validated_data.pop(
"selected_implementation_groups", None
)
instance = super().update(instance, validated_data)
if create_audit:
if not _framework:
raise serializers.ValidationError(
{"framework": [_("Framework required")]}
)
audit = ComplianceAssessment.objects.create(
name=validated_data["name"],
framework=_framework,
project=validated_data["project"],
selected_implementation_groups=_selected_implementation_groups,
)
audit.create_requirement_assessments()
instance.compliance_assessment = audit
instance.save()
return instance

class Meta:
model = EntityAssessment
exclude = ["penetration", "dependency", "maturity", "trust"]
),
}
return audit_data


class EntityAssessmentCreateSerializer(BaseModelSerializer):
create_audit = serializers.BooleanField(default=True)
framework = serializers.PrimaryKeyRelatedField(
queryset=Framework.objects.all(), required=False
)
selected_implementation_groups = serializers.ListField(
child=serializers.CharField(), required=False
)

def create(self, validated_data):
create_audit = validated_data.pop("create_audit")
_framework = validated_data.pop("framework", None)
_selected_implementation_groups = validated_data.pop(
"selected_implementation_groups", None
)
instance = super().create(validated_data)
if create_audit:
if not _framework:
def _create_or_update_audit(self, instance, audit_data):
if audit_data["create_audit"]:
if not audit_data["framework"]:
raise serializers.ValidationError(
{"framework": [_("Framework required")]}
)
enclave = Folder.objects.create(
content_type=Folder.ContentType.ENCLAVE,
name=f"{instance.project.name}/{instance.name}",
parent_folder=instance.folder,
)

audit = ComplianceAssessment.objects.create(
folder=enclave,
name=validated_data["name"],
framework=_framework,
project=validated_data["project"],
selected_implementation_groups=_selected_implementation_groups,
name=instance.name,
framework=audit_data["framework"],
project=instance.project,
selected_implementation_groups=audit_data[
"selected_implementation_groups"
],
)

if not instance.compliance_assessment:
enclave = Folder.objects.create(
content_type=Folder.ContentType.ENCLAVE,
name=f"{instance.project.name}/{instance.name}",
parent_folder=instance.folder,
)
audit.folder = enclave
audit.save()

audit.create_requirement_assessments()
instance.compliance_assessment = audit
instance.save()
if instance.authors:
for author in instance.authors.all():
author.mailing(
email_template_name="tprm/third_party_email.html",
subject=_(
"CISO Assistant: A questionnaire has been assigned to you"
),
object="entity-assessments",
object_id=instance.id,
)

def _assign_third_party_respondents(
self, instance: EntityAssessment, third_party_users: set[User]
):
enclave = instance.compliance_assessment.folder
respondents, _ = UserGroup.objects.get_or_create(
name=UserGroupCodename.THIRD_PARTY_RESPONDENT, folder=enclave, builtin=True
)
role_assignment, _ = RoleAssignment.objects.get_or_create(
user_group=respondents,
role=Role.objects.get(name=RoleCodename.THIRD_PARTY_RESPONDENT),
builtin=True,
folder=enclave,
is_recursive=True,
)
role_assignment.perimeter_folders.add(enclave)
for user in third_party_users:
if not user.is_third_party:
logger.warning("User is not a third-party", user=user)
user.user_groups.add(respondents)

def _send_author_emails(self, instance, authors_to_email: set):
if EMAIL_HOST or EMAIL_HOST_RESCUE:
for author in authors_to_email:
try:
author.mailing(
email_template_name="tprm/third_party_email.html",
subject=_(
"CISO Assistant: A questionnaire has been assigned to you"
),
object="entity-assessments",
object_id=instance.id,
)
except Exception as e:
print(f"Failed to send email to {author}: {e}")

def create(self, validated_data):
audit_data = self._extract_audit_data(validated_data)
instance = super().create(validated_data)
self._create_or_update_audit(instance, audit_data)
self._assign_third_party_respondents(instance, set(instance.authors.all()))
self._send_author_emails(instance, set(instance.authors.all()))
return instance

def update(self, instance: EntityAssessment, validated_data):
audit_data = self._extract_audit_data(validated_data)
new_authors = set(validated_data.get("authors", [])) - set(
instance.authors.all()
)
instance = super().update(instance, validated_data)

if not instance.compliance_assessment:
self._create_or_update_audit(instance, audit_data)

self._assign_third_party_respondents(instance, new_authors)
self._send_author_emails(instance, new_authors)
return instance

class Meta:
Expand Down
6 changes: 0 additions & 6 deletions backend/tprm/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from core.views import BaseModelViewSet as AbstractBaseModelViewSet
from tprm.models import Entity, Representative, Solution, EntityAssessment
from rest_framework.decorators import action
from tprm.serializers import EntityAssessmentCreateSerializer
import structlog

logger = structlog.get_logger(__name__)
Expand All @@ -30,11 +29,6 @@ class EntityAssessmentViewSet(BaseModelViewSet):
model = EntityAssessment
filterset_fields = ["status", "project", "project__folder", "authors", "entity"]

def get_serializer_class(self, **kwargs):
if self.action == "create":
return EntityAssessmentCreateSerializer
return super().get_serializer_class(**kwargs)

def destroy(self, request, *args, **kwargs):
instance = self.get_object()
if instance.compliance_assessment:
Expand Down
Loading

0 comments on commit 73b45c2

Please sign in to comment.