Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Library update #467

Merged
merged 15 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 298 additions & 16 deletions backend/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from django.apps import apps
from django.forms.models import model_to_dict
from django.contrib.auth import get_user_model
from django.db import models
from django.db import models, transaction
from django.utils.translation import gettext_lazy as _
from django.db.models import Q

Expand All @@ -18,7 +18,7 @@

from django.urls import reverse
from datetime import date, datetime
from typing import Union, Self
from typing import Union, Dict, Set, List, Tuple, Type, Self
from django.utils.html import format_html

from structlog import get_logger
Expand Down Expand Up @@ -148,20 +148,22 @@ def store_library_content(
logger.error("Error while loading library content", error=err)
raise ValueError(err)

urn = library_data["urn"]
urn = library_data["urn"].lower()
if not match_urn(urn):
raise ValueError("Library URN is badly formatted")
locale = library_data.get("locale", "en")
version = int(library_data["version"])

if StoredLibrary.objects.filter(
urn=urn, locale=locale, version=version
).exists():
return None # We do not store the library if it is same content

is_loaded = LoadedLibrary.objects.filter(
urn=urn, locale=locale, version=version
is_loaded = LoadedLibrary.objects.filter( # We consider the library as loaded even if the loaded version is different
urn=urn, locale=locale
).exists()
if StoredLibrary.objects.filter(urn=urn, locale=locale, version__gte=version):
return None # We do not accept to store outdated libraries

# This code allows adding outdated libraries in the library store but they will be erased if a greater version of this library is stored.
for outdated_library in StoredLibrary.objects.filter(
urn=urn, locale=locale, version__lt=version
):
outdated_library.delete()

objects_meta = {
key: (1 if key == "framework" else len(value))
Expand Down Expand Up @@ -205,6 +207,9 @@ def store_library_file(
def load(self) -> Union[str, None]:
from library.utils import LibraryImporter

if LoadedLibrary.objects.filter(urn=self.urn, locale=self.locale):
return "This library has already been loaded."

library_importer = LibraryImporter(self)
error_msg = library_importer.import_library()
if error_msg is None:
Expand All @@ -213,11 +218,290 @@ def load(self) -> Union[str, None]:
return error_msg


class LibraryUpdater:
def __init__(self, old_library: Type["LoadedLibrary"], new_library: StoredLibrary):
self.old_library = old_library
self.old_objects = [
*old_library.threats.all(),
*old_library.reference_controls.all(),
*old_library.threats.all(),
*old_library.risk_matrices.all(),
]
self.new_library = new_library
library_content = json.loads(self.new_library.content)
self.dependencies = self.new_library.dependencies
if self.dependencies is None:
self.dependencies = []
self.new_framework = library_content.get("framework")
self.new_matrices = library_content.get("risk_matrix")
self.threats = library_content.get("threats", [])
self.reference_controls = library_content.get("reference_controls", [])
self.new_objects = {obj["urn"].lower(): obj for obj in self.threats}
self.new_objects.update(
{obj["urn"].lower(): obj for obj in self.reference_controls}
)
if self.new_framework:
self.new_objects[self.new_framework["urn"].lower()] = self.new_framework
if self.new_matrices:
for matrix in self.new_matrices:
self.new_objects[matrix["urn"].lower()] = matrix

def update_dependencies(self) -> Union[str, None]:
for dependency_urn in self.dependencies:
possible_dependencies = [*LoadedLibrary.objects.filter(urn=dependency_urn)]
if (
not possible_dependencies
): # This part of the code hasn't been tested yet
stored_dependencies = [
*StoredLibrary.objects.filter(urn=dependency_urn)
]
if not stored_dependencies:
return "dependencyNotFound"
dependency = stored_dependencies[0]
for i in range(1, len(stored_dependencies)):
stored_dependency = stored_dependencies[i]
if stored_dependency.locale == self.old_library.locale:
dependency = stored_dependency
if err_msg := dependency.load():
return err_msg
continue

dependency = possible_dependencies[0]
for i in range(1, len(possible_dependencies)):
possible_dependency = possible_dependencies[i]
if possible_dependency.locale == self.old_library.locale:
dependency = possible_dependency

if (err_msg := dependency.update()) not in [None, "libraryHasNoUpdate"]:
return err_msg

# We should create a LibraryVerifier class in the future that check if the library is valid and use it for a better error handling.
def update_library(self) -> Union[str, None]:
if (error_msg := self.update_dependencies()) is not None:
return error_msg

old_dependencies_urn = {
dependency.urn for dependency in self.old_library.dependencies.all()
}
dependencies_urn = set(self.dependencies)
new_dependencies_urn = dependencies_urn - old_dependencies_urn

if not set(dependencies_urn).issuperset(old_dependencies_urn):
return "invalidLibraryUpdate"

new_dependencies = []
for new_dependency_urn in new_dependencies_urn:
try:
new_dependency = LoadedLibrary.objects.filter(
urn=new_dependency_urn
).first() # The locale is not handled by this code
except:
return "dependencyNotFound"
new_dependencies.append(new_dependency)

for key, value in [
("name", self.new_library.name),
("version", self.new_library.version),
("provider", self.new_library.provider),
(
"packager",
self.new_library.packager,
), # A user can fake a builtin library in this case because he can update a builtin library by adding its own library with the same URN as a builtin library.
("ref_id", self.new_library.ref_id), # Should we even update the ref_id ?
("description", self.new_library.description),
("annotation", self.new_library.annotation),
("copyright", self.new_library.copyright),
("objects_meta", self.new_library.objects_meta),
]:
setattr(self.old_library, key, value)
self.old_library.save()

for new_dependency in new_dependencies:
self.old_library.dependencies.add(new_dependency)

referential_object_dict = {
"locale": self.old_library.locale,
"default_locale": self.old_library.default_locale,
"provider": self.new_library.provider,
"is_published": True,
}

for threat in self.threats:
Threat.objects.update_or_create(
urn=threat["urn"].lower(),
defaults=threat,
create_defaults={
**referential_object_dict,
**threat,
"library": self.old_library,
},
)

for reference_control in self.reference_controls:
ReferenceControl.objects.update_or_create(
urn=reference_control["urn"].lower(),
defaults=reference_control,
create_defaults={
**referential_object_dict,
**reference_control,
"library": self.old_library,
},
)

if self.new_framework is not None:
framework_dict = {**self.new_framework}
del framework_dict["requirement_nodes"]

new_framework, _ = Framework.objects.update_or_create(
urn=self.new_framework["urn"],
defaults=framework_dict,
create_defaults={
**referential_object_dict,
**framework_dict,
"library": self.old_library,
},
)

requirement_node_urns = set(
rc.urn for rc in RequirementNode.objects.filter(framework=new_framework)
)
new_requirement_node_urns = set(
rc["urn"].lower() for rc in self.new_framework["requirement_nodes"]
)
deleted_requirement_node_urns = (
requirement_node_urns - new_requirement_node_urns
)

for requirement_node_urn in deleted_requirement_node_urns:
requirement_node = RequirementNode.objects.filter(
urn=requirement_node_urn
).first() # locale is not used, so if there are more than one requirement node with this URN only the first fetched requirement node will be deleted.
if requirement_node is not None:
requirement_node.delete()

requirement_nodes = self.new_framework["requirement_nodes"]
involved_library_urns = [*self.dependencies, self.old_library.urn]
involved_libraries = set(
LoadedLibrary.objects.filter(urn__in=involved_library_urns)
)
objects_tracked = {}

for threat in Threat.objects.filter(library__in=involved_libraries):
objects_tracked[threat.urn] = threat

for rc in ReferenceControl.objects.filter(library__in=involved_libraries):
objects_tracked[rc.urn] = rc

compliance_assessments = [
*ComplianceAssessment.objects.filter(framework=new_framework)
]

order_id = 0
for requirement_node in requirement_nodes:
requirement_node_dict = {**requirement_node}
for key in ["maturity", "depth", "reference_controls", "threats"]:
requirement_node_dict.pop(key, None)
requirement_node_dict["order_id"] = order_id
order_id += 1

new_requirement_node, created = (
RequirementNode.objects.update_or_create(
urn=requirement_node["urn"].lower(),
defaults=requirement_node_dict,
create_defaults={
**referential_object_dict,
**requirement_node_dict,
"framework": new_framework,
},
)
)

if created:
for compliance_assessment in compliance_assessments:
ra = RequirementAssessment.objects.create(
compliance_assessment=compliance_assessment,
requirement=new_requirement_node,
folder=compliance_assessment.project.folder,
)

for threat_urn in requirement_node_dict.get("threats", []):
thread_to_add = objects_tracked.get(threat_urn)
if thread_to_add is None: # I am not 100% this condition is usefull
thread_to_add = Threat.objects.filter(
urn=threat_urn
).first() # No locale support
if thread_to_add is not None:
new_requirement_node.threats.add(thread_to_add)

for reference_control_urn in requirement_node.get(
"reference_controls", []
):
reference_control_to_add = objects_tracked.get(
reference_control_urn
)
if (
reference_control_to_add is None
): # I am not 100% this condition is usefull
reference_control_to_add = ReferenceControl.objects.filter(
urn=reference_control_urn.lower()
).first() # No locale support

if reference_control_to_add is not None:
new_requirement_node.reference_controls.add(
reference_control_to_add
)

if self.new_matrices is not None:
for matrix in self.new_matrices:
json_definition_keys = {
"grid",
"probability",
"impact",
"risk",
} # Store this as a constant somewhere (as a static attribute of the class)
other_keys = set(matrix.keys()) - json_definition_keys
matrix_dict = {key: matrix[key] for key in other_keys}
matrix_dict["json_definition"] = {}
for key in json_definition_keys:
if (
key in matrix
): # If all keys are mandatory this condition is useless
matrix_dict["json_definition"][key] = matrix[key]
matrix_dict["json_definition"] = json.dumps(
matrix_dict["json_definition"]
)

RiskMatrix.objects.update_or_create(
urn=matrix["urn"].lower(),
defaults=matrix_dict,
create_defaults={
**referential_object_dict,
**matrix_dict,
"library": self.old_library,
},
)


class LoadedLibrary(LibraryMixin):
dependencies = models.ManyToManyField(
"self", blank=True, verbose_name=_("Dependencies"), symmetrical=False
)

@transaction.atomic
def update(self):
new_libraries = [
*StoredLibrary.objects.filter(
urn=self.urn, locale=self.locale, version__gt=self.version
)
]

if not new_libraries:
return "libraryHasNoUpdate"

new_library = max(new_libraries, key=lambda lib: lib.version)
library_updater = LibraryUpdater(self, new_library)
return library_updater.update_library()

@property
def _objects(self):
res = {}
Expand Down Expand Up @@ -277,11 +561,9 @@ def delete(self, *args, **kwargs):
f"This library is a dependency of {dependent_libraries.count()} other libraries"
)
super(LoadedLibrary, self).delete(*args, **kwargs)
stored_library = StoredLibrary.objects.get(
urn=self.urn, locale=self.locale, version=self.version
) # I don't if it works yet
stored_library.is_loaded = False
stored_library.save()
StoredLibrary.objects.filter(urn=self.urn, locale=self.locale).update(
is_loaded=False
)


class Threat(ReferentialObjectMixin, PublishInRootFolderMixin):
Expand Down
4 changes: 3 additions & 1 deletion backend/library/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ def import_requirement_node(self, framework_object: Framework):
)

for threat in self.requirement_data.get("threats", []):
requirement_node.threats.add(Threat.objects.get(urn=threat.lower()))
requirement_node.threats.add(
Threat.objects.get(urn=threat.lower())
) # URN are not case insensitive in the whole codebase yet, we should fix that and make sure URNs are always transformed into lowercase before being used.

for reference_control in self.requirement_data.get("reference_controls", []):
requirement_node.reference_controls.add(
Expand Down
Loading
Loading