Skip to content

Commit

Permalink
Ca 546 enrich assets to support security objectives and rto rpo mtd (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
eric-intuitem authored Nov 22, 2024
2 parents e13010e + fd83da1 commit 984c07f
Show file tree
Hide file tree
Showing 33 changed files with 1,245 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Generated by Django 5.1.1 on 2024-11-20 18:29

import core.validators
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("core", "0036_asset_owner"),
]

operations = [
migrations.AddField(
model_name="asset",
name="disaster_recovery_objectives",
field=models.JSONField(
blank=True,
default=dict,
help_text="The disaster recovery objectives of the asset",
validators=[
core.validators.JSONSchemaInstanceValidator(
{
"$id": "https://ciso-assistant.com/schemas/assets/security_objectives.schema.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"description": "The security objectives of the asset",
"properties": {
"objectives": {
"patternProperties": {
"^[a-z_]+$": {
"properties": {
"value": {
"minimum": 0,
"type": "integer",
}
},
"type": "object",
}
},
"type": "object",
}
},
"title": "Security objectives",
"type": "object",
}
)
],
verbose_name="Disaster recovery objectives",
),
),
migrations.AddField(
model_name="asset",
name="reference_link",
field=models.URLField(
blank=True,
help_text="External url for action follow-up (eg. Jira ticket)",
max_length=2048,
null=True,
verbose_name="Link",
),
),
migrations.AddField(
model_name="asset",
name="security_objectives",
field=models.JSONField(
blank=True,
default=dict,
help_text="The security objectives of the asset",
validators=[
core.validators.JSONSchemaInstanceValidator(
{
"$id": "https://ciso-assistant.com/schemas/assets/security_objectives.schema.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"description": "The security objectives of the asset",
"properties": {
"objectives": {
"patternProperties": {
"^[a-z_]+$": {
"properties": {
"is_enabled": {"type": "boolean"},
"value": {
"minimum": 0,
"type": "integer",
},
},
"type": "object",
}
},
"type": "object",
}
},
"title": "Security objectives",
"type": "object",
}
)
],
verbose_name="Security objectives",
),
),
]
201 changes: 198 additions & 3 deletions backend/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@
update_translations_as_string,
update_translations_in_object,
)
from global_settings.models import GlobalSettings

from .base_models import AbstractBaseModel, ETADueDateMixin, NameDescriptionMixin
from .utils import camel_case, sha256
from .validators import validate_file_name, validate_file_size
from .validators import (
validate_file_name,
validate_file_size,
JSONSchemaInstanceValidator,
)

logger = get_logger(__name__)

Expand Down Expand Up @@ -1213,6 +1218,75 @@ class Type(models.TextChoices):
PRIMARY = "PR", _("Primary")
SUPPORT = "SP", _("Support")

DEFAULT_SECURITY_OBJECTIVES = (
"confidentiality",
"integrity",
"availability",
"proof",
"authenticity",
"privacy",
"safety",
)

SECURITY_OBJECTIVES_JSONSCHEMA = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://ciso-assistant.com/schemas/assets/security_objectives.schema.json",
"title": "Security objectives",
"description": "The security objectives of the asset",
"type": "object",
"properties": {
"objectives": {
"type": "object",
"patternProperties": {
"^[a-z_]+$": {
"type": "object",
"properties": {
"value": {
"type": "integer",
"minimum": 0,
},
"is_enabled": {
"type": "boolean",
},
},
},
},
}
},
}

DEFAULT_DISASTER_RECOVERY_OBJECTIVES = ("rto", "rpo", "mtd")

DISASTER_RECOVERY_OBJECTIVES_JSONSCHEMA = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://ciso-assistant.com/schemas/assets/security_objectives.schema.json",
"title": "Security objectives",
"description": "The security objectives of the asset",
"type": "object",
"properties": {
"objectives": {
"type": "object",
"patternProperties": {
"^[a-z_]+$": {
"type": "object",
"properties": {
"value": {
"type": "integer",
"minimum": 0,
},
},
},
},
}
},
}

SECURITY_OBJECTIVES_SCALES = {
"1-4": range(1, 5),
"0-3": range(0, 4),
"FIPS-199": ["low", "moderate", "moderate", "high"],
}

business_value = models.CharField(
max_length=200, blank=True, verbose_name=_("business value")
)
Expand All @@ -1222,6 +1296,29 @@ class Type(models.TextChoices):
parent_assets = models.ManyToManyField(
"self", blank=True, verbose_name=_("parent assets"), symmetrical=False
)
reference_link = models.URLField(
null=True,
blank=True,
max_length=2048,
help_text=_("External url for action follow-up (eg. Jira ticket)"),
verbose_name=_("Link"),
)
security_objectives = models.JSONField(
default=dict,
blank=True,
verbose_name=_("Security objectives"),
help_text=_("The security objectives of the asset"),
validators=[JSONSchemaInstanceValidator(SECURITY_OBJECTIVES_JSONSCHEMA)],
)
disaster_recovery_objectives = models.JSONField(
default=dict,
blank=True,
verbose_name=_("Disaster recovery objectives"),
help_text=_("The disaster recovery objectives of the asset"),
validators=[
JSONSchemaInstanceValidator(DISASTER_RECOVERY_OBJECTIVES_JSONSCHEMA)
],
)
owner = models.ManyToManyField(
User,
blank=True,
Expand All @@ -1237,23 +1334,121 @@ class Meta:
def __str__(self) -> str:
return str(self.name)

@property
def is_primary(self) -> bool:
"""
Returns True if the asset is a primary asset.
"""
return self.type == Asset.Type.PRIMARY

@property
def is_support(self) -> bool:
"""
Returns True if the asset is a support asset.
"""
return self.type == Asset.Type.SUPPORT

def ancestors_plus_self(self) -> list[Self]:
def ancestors_plus_self(self) -> set[Self]:
result = {self}
for x in self.parent_assets.all():
result.update(x.ancestors_plus_self())
return list(result)
return set(result)

def get_security_objectives(self) -> dict[str, dict[str, dict[str, int | bool]]]:
"""
Gets the security objectives of a given asset.
If the asset is a primary asset, the security objectives are directly stored in the asset.
If the asset is a supporting asset, the security objectives are the union of the security objectives of all the primary assets it supports.
If multiple ancestors share the same security objective, its value in the result is its highest value among the ancestors.
"""
if self.is_primary:
return self.security_objectives

ancestors = self.ancestors_plus_self()
primary_assets = {asset for asset in ancestors if asset.is_primary}
if not primary_assets:
return {}

security_objectives = {}
for asset in primary_assets:
for key, content in asset.security_objectives.get("objectives", {}).items():
if not content.get("is_enabled", False):
continue
if key not in security_objectives:
security_objectives[key] = content
else:
security_objectives[key]["value"] = max(
security_objectives[key]["value"], content.get("value", 0)
)
return {"objectives": security_objectives}

def get_disaster_recovery_objectives(self) -> dict[str, dict[str, dict[str, int]]]:
"""
Gets the disaster recovery objectives of a given asset.
If the asset is a primary asset, the disaster recovery objectives are directly stored in the asset.
If the asset is a supporting asset, the disaster recovery objectives are the union of the disaster recovery objectives of all the primary assets it supports.
If multiple ancestors share the same disaster recovery objective, its value in the result is its lowest value among the ancestors.
"""
if self.is_primary:
return self.disaster_recovery_objectives

ancestors = self.ancestors_plus_self()
primary_assets = {asset for asset in ancestors if asset.is_primary}
if not primary_assets:
return {}

disaster_recovery_objectives = {}
for asset in primary_assets:
for key, content in asset.disaster_recovery_objectives.get(
"objectives", {}
).items():
if key not in disaster_recovery_objectives:
disaster_recovery_objectives[key] = content
else:
disaster_recovery_objectives[key] = min(
disaster_recovery_objectives[key], content.get("value", 0)
)

return {"objectives": disaster_recovery_objectives}

def get_security_objectives_display(self) -> list[dict[str, str]]:
"""
Gets the security objectives of a given asset as strings.
"""
security_objectives = self.get_security_objectives()
if len(security_objectives) == 0:
return []
general_settings = GlobalSettings.objects.filter(name="general").first()
scale = (
general_settings.value.get("security_objective_scale", "1-4")
if general_settings
else "1-4"
)
return [
{
"str": f"{key}: {self.SECURITY_OBJECTIVES_SCALES[scale][content.get('value', 0)]}",
}
for key, content in security_objectives.get("objectives", {}).items()
if content.get("is_enabled", False)
and content.get("value", -1) in range(0, 5)
]

def get_disaster_recovery_objectives_display(self) -> list[dict[str, str]]:
"""
Gets the disaster recovery objectives of a given asset as strings.
"""
disaster_recovery_objectives = self.get_disaster_recovery_objectives()
return [
{"str": f"{key}: {content.get('value', 0)}"}
for key, content in disaster_recovery_objectives.get(
"objectives", {}
).items()
if content.get("value", 0)
]

def save(self, *args, **kwargs) -> None:
self.full_clean()
return super().save(*args, **kwargs)


class Evidence(NameDescriptionMixin, FolderMixin, PublishInRootFolderMixin):
Expand Down
6 changes: 6 additions & 0 deletions backend/core/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ class AssetReadSerializer(AssetWriteSerializer):
folder = FieldsRelatedField()
parent_assets = FieldsRelatedField(many=True)
owner = FieldsRelatedField(many=True)
security_objectives = serializers.JSONField(
source="get_security_objectives_display"
)
disaster_recovery_objectives = serializers.JSONField(
source="get_disaster_recovery_objectives_display"
)

type = serializers.CharField(source="get_type_display")

Expand Down
Loading

0 comments on commit 984c07f

Please sign in to comment.