diff --git a/backend/core/models.py b/backend/core/models.py index 3e0da4a68..c4bf00678 100644 --- a/backend/core/models.py +++ b/backend/core/models.py @@ -266,23 +266,30 @@ def store_library_content( """ hash_checksum = sha256(library_content) if hash_checksum in StoredLibrary.HASH_CHECKSUM_SET: - return None # We do not store the library if its hash checksum is in the database. + return None # We do not store the library if its hash checksum is in the database. try: library_data = yaml.safe_load(library_content) if not isinstance(library_data, dict): - return ("LibraryErrorBadType", { - "value": repr(library_data)[:100] - "type": type(library_data).__name__, - "expected_type": "dict" - }, []) + return ( + "LibraryErrorBadType", + { + "value": repr(library_data)[:100], + "type": type(library_data).__name__, + "expected_type": "dict", + }, + [], + ) except yaml.YAMLError as e: logger.error("Error while loading library content", error=e) return ("LibraryErrorInvalidYamlFormat", {}, []) - from library.utils import LibraryFormatChecker # Should this really be imported within the function ? + from library.utils import ( + LibraryFormatChecker, + ) # Should this really be imported within the function ? + library_format_checker = LibraryFormatChecker(library_data) error_msg = library_format_checker.run() - if error_msg is not None : + if error_msg is not None: return error_msg urn = library_data["urn"].lower() @@ -342,7 +349,9 @@ def store_library_file( return StoredLibrary.store_library_content(library_content, builtin) def load(self) -> Union[str, None]: - from library.utils import LibraryImporter # Should this really be imported within the function ? + from library.utils import ( + LibraryImporter, + ) # Should this really be imported within the function ? if LoadedLibrary.objects.filter(urn=self.urn, locale=self.locale): return "This library has already been loaded." diff --git a/backend/library/utils.py b/backend/library/utils.py index 934351dfc..5a4bcb57c 100644 --- a/backend/library/utils.py +++ b/backend/library/utils.py @@ -661,7 +661,7 @@ def _import_library(self): def import_library(self) -> Union[str, None]: """Main method to import a library.""" if (error_message := self.init()) is not None: - return error_message # This error check should be done when storing the Library but no after. + return error_message # This error check should be done when storing the Library but no after. print("::: Getting Dependencies :::") error_msg = self.check_and_import_dependencies() print("::: Dependencies are ok :::") @@ -682,29 +682,36 @@ def import_library(self) -> Union[str, None]: logger.error("Library import error", error=e, library=self._library) raise e -def merge_location(error: Tuple[str, dict, List[str]], location: List[str]) -> Tuple[str, dict, List[str]] : - for field in reversed(location) : + +def merge_location( + error: Tuple[str, dict, List[str]], location: List[str] +) -> Tuple[str, dict, List[str]]: + for field in reversed(location): error[2].insert(0, field) return error + def get_object_ident(obj: dict, index: int) -> str: - for field in ["urn", "name"] : - if field in obj : + for field in ["urn", "name"]: + if field in obj: return f'{field}="{obj[field]}"' return str(index) + # This function must be used to avoid overly long output when passing a value representation in the error message. def repr_value(value: Any) -> str: """This function will raise an exception if the __repr__ method is not implemented for the value""" stringified_value = repr(value) - if len(stringified_value) > 100 : + if len(stringified_value) > 100: stringified_value = f"{stringified_value[:100]}..." return stringified_value class LibraryFormatUtils: @staticmethod - def generic_check_from_model_field(value: Any, model: Type[models.Model], field_name: str) -> Union[None,Tuple[str, dict]] : + def generic_check_from_model_field( + value: Any, model: Type[models.Model], field_name: str + ) -> Union[None, Tuple[str, dict]]: """ This function only supports checking CharField, TextField and IntegerField. The shape of a JsonField is too complex for a simple helper function like it to verify its validity. @@ -712,187 +719,239 @@ def generic_check_from_model_field(value: Any, model: Type[models.Model], field_ deferred_attribute = getattr(model, field_name) field = deferred_attribute.field - if value is None : + if value is None: if field.null or field.default is not fields.NOT_PROVIDED: return None return ("libraryErrorNonNull", {}) - if isinstance(field, (fields.CharField, fields.TextField)) : - if not isinstance(value, str) : - return ("libraryErrorBadType", { - "value": repr_value(value), - "type": type(value).__name__, - "expected_type": "str" - }) + if isinstance(field, (fields.CharField, fields.TextField)): + if not isinstance(value, str): + return ( + "libraryErrorBadType", + { + "value": repr_value(value), + "type": type(value).__name__, + "expected_type": "str", + }, + ) - if not value : - if field.blank : + if not value: + if field.blank: return None return ("libraryErrorNonBlank", {}) - if field.choices is not None : + if field.choices is not None: choices = field.choices - if not isinstance(choices[0], str) : + if not isinstance(choices[0], str): choices = [choice[0] for choice in choices] - if value in choices : + if value in choices: return None - return ("libraryErrorBadChoice", { - "value": repr_value(value), - "choices": repr_value(choices) - }) + return ( + "libraryErrorBadChoice", + {"value": repr_value(value), "choices": repr_value(choices)}, + ) max_length = field.max_length - if max_length and len(value) > max_length : - return ("libraryErrorTooLong", { - "value": repr_value(value), - "length": repr_value(len(value)), - "max_length": repr_value(max_length) - }) + if max_length and len(value) > max_length: + return ( + "libraryErrorTooLong", + { + "value": repr_value(value), + "length": repr_value(len(value)), + "max_length": repr_value(max_length), + }, + ) - elif isinstance(field, fields.IntegerField) : - if not isinstance(value, int) : - return ("libraryErrorBadType", { - "value": repr_value(value), - "type": type(value).__name__, - "expected_type": "int" - }) + elif isinstance(field, fields.IntegerField): + if not isinstance(value, int): + return ( + "libraryErrorBadType", + { + "value": repr_value(value), + "type": type(value).__name__, + "expected_type": "int", + }, + ) - elif isinstance(field, fields.BooleanField) : - if not isinstance(value, bool) : - return ("libraryErrorBadType", { - "value": repr_value(value), - "type": type(value).__name__, - "expected_type": "bool" - }) + elif isinstance(field, fields.BooleanField): + if not isinstance(value, bool): + return ( + "libraryErrorBadType", + { + "value": repr_value(value), + "type": type(value).__name__, + "expected_type": "bool", + }, + ) @staticmethod - def check_version(version: int, obj=None) -> Union[None, Tuple[str, dict]] : + def check_version(version: int, obj=None) -> Union[None, Tuple[str, dict]]: minimum_version = 1 if obj and (urn := obj.get("urn")): - try : + try: current_library = StoredLibrary.objects.get(urn=urn) minimum_version = str(current_library.version + 1) - except : + except: pass - if version < minimum_version : - return ("libraryErrorOutOfRange", { - "value": repr_value(version), - "minimum": str(minimum_version), - "maximum": "infinity" - }) + if version < minimum_version: + return ( + "libraryErrorOutOfRange", + { + "value": repr_value(version), + "minimum": str(minimum_version), + "maximum": "infinity", + }, + ) @staticmethod - def check_urn(urn: str, obj=None) -> Union[None, Tuple[str, dict]] : - if not re.match(URN_REGEX, urn) : - return ("libraryErrorInvalidString", { - "regex_pattern": URN_REGEX, - "urn": repr_value(urn) - }) + def check_urn(urn: str, obj=None) -> Union[None, Tuple[str, dict]]: + if not re.match(URN_REGEX, urn): + return ( + "libraryErrorInvalidString", + {"regex_pattern": URN_REGEX, "urn": repr_value(urn)}, + ) @staticmethod - def check_locale(locale: str, obj=None) -> Union[None, Tuple[str, dict]] : + def check_locale(locale: str, obj=None) -> Union[None, Tuple[str, dict]]: available_locales = [lang[0] for lang in settings.LANGUAGES] - if locale not in available_locales : - return ("libraryErrorBadChoice", { - "value": repr_value(locale), - "choices": "[{}]".format(", ".join(available_locales)) - }) + if locale not in available_locales: + return ( + "libraryErrorBadChoice", + { + "value": repr_value(locale), + "choices": "[{}]".format(", ".join(available_locales)), + }, + ) @staticmethod - def check_dependencies(dependencies: List[Any]) -> Union[None, Tuple[str, dict]] : - for urn in dependencies : - if not isinstance(urn, str) : - return ("libraryErrorBadType", { - "value": repr_value(urn), - "type": type(urn).__name__, - "expected_type": "str" - }) + def check_dependencies(dependencies: List[Any]) -> Union[None, Tuple[str, dict]]: + for urn in dependencies: + if not isinstance(urn, str): + return ( + "libraryErrorBadType", + { + "value": repr_value(urn), + "type": type(urn).__name__, + "expected_type": "str", + }, + ) # We do this because there is no generic checks used for the dependencies field LibraryFormatUtils.generic_check_from_model_field(urn, LoadedLibrary, "urn") # We do not verify if the URN correspond to any loaded library because the dependencies are lazy loaded. - if (result := LibraryFormatUtils.check_urn(urn)) is not None : + if (result := LibraryFormatUtils.check_urn(urn)) is not None: return result @staticmethod - def check_translations(translations: Dict[str, Dict[str, str]], obj: dict) -> Union[None, Tuple[str, dict]] : - for locale, translation_dict in translations.items() : - if (error := LibraryFormatUtils.check_locale(locale)) : + def check_translations( + translations: Dict[str, Dict[str, str]], obj: dict + ) -> Union[None, Tuple[str, dict]]: + for locale, translation_dict in translations.items(): + if error := LibraryFormatUtils.check_locale(locale): return error - for translation_text in translation_dict.values() : + for translation_text in translation_dict.values(): # It would be better to also display the "translation key" to the client, but it's would require making the signature of this function more complex and the value is already quite good to easy locate the non-string value. - if not isinstance(translation_text, str) : - return ("libraryErrorBadType", { - "value": repr_value(translation_text), - "type": type(translation_text).__name__, - "expected_type": "str" - }) + if not isinstance(translation_text, str): + return ( + "libraryErrorBadType", + { + "value": repr_value(translation_text), + "type": type(translation_text).__name__, + "expected_type": "str", + }, + ) translated_fields = set(translation_dict) obj_fields = set(obj) - if (diff := translated_fields - obj_fields) : - return ("libraryErrorUnknownFieldTranslation", { - "fields": repr_value(sorted(obj_fields)), - "bad_fields": repr_value(sorted(diff)) - }) + if diff := translated_fields - obj_fields: + return ( + "libraryErrorUnknownFieldTranslation", + { + "fields": repr_value(sorted(obj_fields)), + "bad_fields": repr_value(sorted(diff)), + }, + ) @staticmethod - def check_grid(grid: Any, risk_ids: Set[int], probabilities: List[dict], impacts: List[dict]) -> Union[None, Tuple[str, dict]] : - if not isinstance(grid, list) : - - return ("libraryErrorBadType", { - "value": repr_value(grid), - "type": type(grid).__name__, - "expected_type": "list" - }) + def check_grid( + grid: Any, risk_ids: Set[int], probabilities: List[dict], impacts: List[dict] + ) -> Union[None, Tuple[str, dict]]: + if not isinstance(grid, list): + return ( + "libraryErrorBadType", + { + "value": repr_value(grid), + "type": type(grid).__name__, + "expected_type": "list", + }, + ) for row in grid: - if not isinstance(row, list) : - return ("libraryErrorBadType", { - "value": repr_value(row), - "type": type(row).__name__, - "expected_type": "list" - }) - for risk_id in row : + if not isinstance(row, list): + return ( + "libraryErrorBadType", + { + "value": repr_value(row), + "type": type(row).__name__, + "expected_type": "list", + }, + ) + for risk_id in row: if not isinstance(risk_id, int): - return ("libraryErrorBadType", { - "value": repr_value(risk_id), - "type": type(risk_id).__name__, - "expected_type": "int" - }) - if risk_id not in risk_ids : - return ("libraryErrorInvalidMatrix", { - "reason": "libraryErrorUndefinedRiskId", - # The "description" field must disappear if we ever translate the error messages. - "description": f"One of the risk have the ID {risk_id} but this ID doesn't exist among the existing risk IDs: {sorted(risk_ids)}.", - }) - - if len(grid) != len(probabilities) : - return ("libraryErrorInvalidMatrix", { - "reason": "invalidProbabilityNumber", - # The "description" field must disappear if we ever translate the error messages. - "description": f"The number of defined probabilities ({len(probabilities)}) must be equal to the number of rows ({len(grid)}) in the grid matrix." - }) + return ( + "libraryErrorBadType", + { + "value": repr_value(risk_id), + "type": type(risk_id).__name__, + "expected_type": "int", + }, + ) + if risk_id not in risk_ids: + return ( + "libraryErrorInvalidMatrix", + { + "reason": "libraryErrorUndefinedRiskId", + # The "description" field must disappear if we ever translate the error messages. + "description": f"One of the risk have the ID {risk_id} but this ID doesn't exist among the existing risk IDs: {sorted(risk_ids)}.", + }, + ) + + if len(grid) != len(probabilities): + return ( + "libraryErrorInvalidMatrix", + { + "reason": "invalidProbabilityNumber", + # The "description" field must disappear if we ever translate the error messages. + "description": f"The number of defined probabilities ({len(probabilities)}) must be equal to the number of rows ({len(grid)}) in the grid matrix.", + }, + ) invalid_row_lengths = [] valid_row_length = len(impacts) - for index, row in enumerate(grid) : - if len(row) != valid_row_length : + for index, row in enumerate(grid): + if len(row) != valid_row_length: invalid_row_lengths.append((index, len(row))) - if invalid_row_lengths : - invalid_rows_string = ", ".join(f"row[{index}].length={length}" for index, length in invalid_row_lengths) - return ("libraryErrorInvalidMatrix", { - "reason": "invalidImpactNumber", - # The "description" field must disappear if we ever translate the error messages. - "description": f"Some rows have an invalid length (expected_length={valid_row_length}) invalid rows: {invalid_rows_string}" - }) + if invalid_row_lengths: + invalid_rows_string = ", ".join( + f"row[{index}].length={length}" for index, length in invalid_row_lengths + ) + return ( + "libraryErrorInvalidMatrix", + { + "reason": "invalidImpactNumber", + # The "description" field must disappear if we ever translate the error messages. + "description": f"Some rows have an invalid length (expected_length={valid_row_length}) invalid rows: {invalid_rows_string}", + }, + ) @staticmethod - def check_risk_matrix(risk_matrix: Any, location: List[str]) -> Union[None, Tuple[str, dict, List[str]]] : - if (error := RiskMatrixChecker.check(risk_matrix)) : + def check_risk_matrix( + risk_matrix: Any, location: List[str] + ) -> Union[None, Tuple[str, dict, List[str]]]: + if error := RiskMatrixChecker.check(risk_matrix): return merge_location(error, location) probabilities = risk_matrix["probability"] @@ -902,98 +961,145 @@ def check_risk_matrix(risk_matrix: Any, location: List[str]) -> Union[None, Tupl for field_name, matrix_field_data in [ ("probability", probabilities), - ("impact", impacts), - ("risk", risks) - ] : - for index, obj in enumerate(matrix_field_data) : + ("impact", impacts), + ("risk", risks), + ]: + for index, obj in enumerate(matrix_field_data): ident = get_object_ident(obj, index) - if not isinstance(obj, dict) : - return ("libraryErrorBadType", { - "value": repr_value(obj), - "type": type(obj).__name__, - "expected_type": "dict" - }, [*location, field_name]) - if (error := RiskMatrixProbabilityChecker.check(obj)) is not None : + if not isinstance(obj, dict): + return ( + "libraryErrorBadType", + { + "value": repr_value(obj), + "type": type(obj).__name__, + "expected_type": "dict", + }, + [*location, field_name], + ) + if (error := RiskMatrixProbabilityChecker.check(obj)) is not None: return merge_location(error, [*location, field_name, ident]) - if all("id" in risk for risk in risks) : + if all("id" in risk for risk in risks): risk_ids = {risk.get["id"] for risk in risks} - elif not any("id" in risk for risk in risks) : + elif not any("id" in risk for risk in risks): risk_ids = set(range(len(risks))) - else : + else: # If some risks have id but others don't this is invalid. - return ("libraryErrorInvalidMatrix", { - "reason": "inconsistentRiskIds", - # The "description" field must disappear if we ever translate the error messages. - "description": "You can't have risks with an explicit id while other risks don't." - }, [*location, "risk", "...id"]) # "The ..." notation is cryptic but it makes it easier to understand the origin of the error. + return ( + "libraryErrorInvalidMatrix", + { + "reason": "inconsistentRiskIds", + # The "description" field must disappear if we ever translate the error messages. + "description": "You can't have risks with an explicit id while other risks don't.", + }, + [*location, "risk", "...id"], + ) # "The ..." notation is cryptic but it makes it easier to understand the origin of the error. - if (error := LibraryFormatUtils.check_grid(grid, risk_ids, probabilities, impacts)) is not None : + if ( + error := LibraryFormatUtils.check_grid( + grid, risk_ids, probabilities, impacts + ) + ) is not None: return (*error, [*location, "grid"]) @staticmethod - def check_str(value: Any, null: bool=False, blank: bool=False, max_length: Union[int, None]=None, regex_pattern: Union[str, None]=None, obj=None) -> Union[None, Tuple[str, dict]] : + def check_str( + value: Any, + null: bool = False, + blank: bool = False, + max_length: Union[int, None] = None, + regex_pattern: Union[str, None] = None, + obj=None, + ) -> Union[None, Tuple[str, dict]]: if null and value is None: return None - if not isinstance(value, str) : - return ("libraryErrorBadType", { - "value": repr_value(value), - "type": type(value).__name__, - "expected_type": "str" - }) + if not isinstance(value, str): + return ( + "libraryErrorBadType", + { + "value": repr_value(value), + "type": type(value).__name__, + "expected_type": "str", + }, + ) - if blank and not value : + if blank and not value: return None - if max_length is not None and len(value) > max_length : - return ("libraryErrorTooLong", { - "value": repr_value(value), - "length": repr_value(len(value)), - "max_length": repr_value(max_length) - }) + if max_length is not None and len(value) > max_length: + return ( + "libraryErrorTooLong", + { + "value": repr_value(value), + "length": repr_value(len(value)), + "max_length": repr_value(max_length), + }, + ) - if regex_pattern is not None : - if not re.fullmatch(regex_pattern, value) : - return ("libraryErrorInvalidString", { - "regex_pattern": regex_pattern, - "string": repr_value(value) - }) + if regex_pattern is not None: + if not re.fullmatch(regex_pattern, value): + return ( + "libraryErrorInvalidString", + {"regex_pattern": regex_pattern, "string": repr_value(value)}, + ) @staticmethod - def check_int(value: Any, null: bool=False, min_value: Union[int, None]=None, max_value: Union[int, None]=None, obj=None) -> Union[None, Tuple[str, dict]] : - if null and value is None : + def check_int( + value: Any, + null: bool = False, + min_value: Union[int, None] = None, + max_value: Union[int, None] = None, + obj=None, + ) -> Union[None, Tuple[str, dict]]: + if null and value is None: return None - if not isinstance(value, int) : - return ("libraryErrorBadType", { - "value": repr_value(value), - "type": type(value).__name__, - "expected_type": "int" - }) + if not isinstance(value, int): + return ( + "libraryErrorBadType", + { + "value": repr_value(value), + "type": type(value).__name__, + "expected_type": "int", + }, + ) if min_value is not None and value < min_value: - return ("libraryErrorOutOfRange", { - "value": repr_value(value), - "minimum": repr_value(min_value), - "maximum": "infinity" if max_value is None else repr_value(max_value) - }) - - if max_value is not None and value > max_value : - return ("libraryErrorOutOfRange", { - "value": repr_value(value), - "minimum": "-infinity" if min_value is None else repr_value(min_value), - "maximum": repr_value(max_value) - }) + return ( + "libraryErrorOutOfRange", + { + "value": repr_value(value), + "minimum": repr_value(min_value), + "maximum": "infinity" + if max_value is None + else repr_value(max_value), + }, + ) + + if max_value is not None and value > max_value: + return ( + "libraryErrorOutOfRange", + { + "value": repr_value(value), + "minimum": "-infinity" + if min_value is None + else repr_value(min_value), + "maximum": repr_value(max_value), + }, + ) @staticmethod - def check_bool(value: Any, obj=None) -> Union[None, Tuple[str, dict]] : - if not isinstance(value, bool) : - return ("libraryErrorBadType", { - "value": repr_value(value), - "type": type(value).__name__, - "expected_type": "bool" - }) + def check_bool(value: Any, obj=None) -> Union[None, Tuple[str, dict]]: + if not isinstance(value, bool): + return ( + "libraryErrorBadType", + { + "value": repr_value(value), + "type": type(value).__name__, + "expected_type": "bool", + }, + ) @staticmethod def make_str_checker(**kwargs) -> Callable[[Any], Union[None, Tuple[str, dict]]]: @@ -1007,6 +1113,7 @@ def make_int_checker(**kwargs) -> Callable[[Any], Union[None, Tuple[str, dict]]] def make_bool_checker() -> Callable[[Any], Union[None, Tuple[str, dict]]]: return lambda value, **kw: LibraryFormatUtils.check_bool(value, **kw) + class ObjectFieldsChecker: # A set of fields that must be present in the object REQUIRED_FIELDS: Set[str] @@ -1016,32 +1123,40 @@ class ObjectFieldsChecker: # The generic checks the validity of the value by using the information provided by the field defined in the model. # Example (name = models.CharField(max_length=200)) will check if the name if the name is a non blank and non null string with a length <= to 200. # The validators are not checked in the the generic check - GENERIC_CHECKED_FIELDS: List[str] # Dict[str, bool] # This should be a list right ? + GENERIC_CHECKED_FIELDS: List[str] # Dict[str, bool] # This should be a list right ? # A dictionary where a field can be linked to an extra check required when the generic check isn't enough to verify the validity of the value in this field. EXTRA_CHECKS: Dict[str, Callable[[dict], Union[None, Tuple[str, dict]]]] # The model used as the source for field information in the generic checker. MODEL: models.Model # Reperesent the root location of the error e.g. ["threats"], ["threats", "translations"] - def __init_subclass__(cls) : + def __init_subclass__(cls): """This function verifies that all static attributes requred to make this class work as expected are defined. It also checks some mandatory subset relations between different attributes.""" REQUIRED_STATIC_ATTRS = set(ObjectFieldsChecker.__annotations__) missing_fields = REQUIRED_STATIC_ATTRS - set(dir(cls)) - if missing_fields : - raise AttributeError(f"Some fields are missing in the ObjectFieldsChecker subclass '{cls}'. Missing fields: {missing_fields}") + if missing_fields: + raise AttributeError( + f"Some fields are missing in the ObjectFieldsChecker subclass '{cls}'. Missing fields: {missing_fields}" + ) - if (diff := cls.REQUIRED_FIELDS - cls.ALL_FIELDS) : - raise ValueError(f"The REQUIRED_FIELDS attribute must be a subset of the ALL_FIELDS attribute for the ObjectFieldsChecker subclass '{cls}' invalid values: {diff}") + if diff := cls.REQUIRED_FIELDS - cls.ALL_FIELDS: + raise ValueError( + f"The REQUIRED_FIELDS attribute must be a subset of the ALL_FIELDS attribute for the ObjectFieldsChecker subclass '{cls}' invalid values: {diff}" + ) - if (diff := set(cls.GENERIC_CHECKED_FIELDS) - cls.ALL_FIELDS) : - raise ValueError(f"The GENERIC_CHECKED_FIELDS attribute must be a subset of the ALL_FIELDS attribute for the ObjectFieldsChecker subclass '{cls}' invalid values: {diff}") + if diff := set(cls.GENERIC_CHECKED_FIELDS) - cls.ALL_FIELDS: + raise ValueError( + f"The GENERIC_CHECKED_FIELDS attribute must be a subset of the ALL_FIELDS attribute for the ObjectFieldsChecker subclass '{cls}' invalid values: {diff}" + ) - if (diff := (set(cls.EXTRA_CHECKS) - cls.ALL_FIELDS)) : - raise ValueError(f"The EXTRA_CHECKS attribute dictionary keys must form a subset of the ALL_FIELDS attribute for the ObjectFieldsChecker subclass '{cls}' invalid keys: {diff}") + if diff := (set(cls.EXTRA_CHECKS) - cls.ALL_FIELDS): + raise ValueError( + f"The EXTRA_CHECKS attribute dictionary keys must form a subset of the ALL_FIELDS attribute for the ObjectFieldsChecker subclass '{cls}' invalid keys: {diff}" + ) @classmethod - def check(cls, obj: dict, strict=True) -> Union[None, Tuple[str, dict, List[str]]] : + def check(cls, obj: dict, strict=True) -> Union[None, Tuple[str, dict, List[str]]]: """ Check that all fields are valid in an object. @@ -1074,44 +1189,52 @@ def check(cls, obj: dict, strict=True) -> Union[None, Tuple[str, dict, List[str] """ object_fields = set(obj) missing_fields = cls.REQUIRED_FIELDS - object_fields - if missing_fields : - return ("libraryErrorMissingField", { - "fields": repr_value(sorted(missing_fields)) - }, []) + if missing_fields: + return ( + "libraryErrorMissingField", + {"fields": repr_value(sorted(missing_fields))}, + [], + ) - if strict : + if strict: invalid_fields = object_fields - cls.ALL_FIELDS - if invalid_fields : - return ("libraryErrorInvalidFields", { - "fields": repr_value(sorted(invalid_fields)) - }, []) + if invalid_fields: + return ( + "libraryErrorInvalidFields", + {"fields": repr_value(sorted(invalid_fields))}, + [], + ) - for field, value in obj.items() : - if field in cls.GENERIC_CHECKED_FIELDS : + for field, value in obj.items(): + if field in cls.GENERIC_CHECKED_FIELDS: value = obj[field] - if (error := LibraryFormatUtils.generic_check_from_model_field( - value, cls.MODEL, field - )) is not None : + if ( + error := LibraryFormatUtils.generic_check_from_model_field( + value, cls.MODEL, field + ) + ) is not None: return (*error, [field]) - if field in cls.EXTRA_CHECKS : + if field in cls.EXTRA_CHECKS: extra_check = cls.EXTRA_CHECKS[field] - if (error := extra_check(value, obj=obj)) : + if error := extra_check(value, obj=obj): return (*error, [field]) + class ThreatFieldsChecker(ObjectFieldsChecker): REQUIRED_FIELDS = {"urn", "ref_id", "name"} # Should we accept the "annotation" field for threat ? # It exists in the Threat model after all. - ALL_FIELDS = {"urn", "ref_id", "name", "description", "translations"} - GENERIC_CHECKED_FIELDS = ["urn", "ref_id", "name", "description"] + ALL_FIELDS = {"urn", "ref_id", "name", "description", "translations"} + GENERIC_CHECKED_FIELDS = ["urn", "ref_id", "name", "description"] EXTRA_CHECKS = { "urn": LibraryFormatUtils.check_urn, - "translations": LibraryFormatUtils.check_translations + "translations": LibraryFormatUtils.check_translations, } MODEL = Threat DEFAULT_LOCATION = ["threats"] + class ReferenceControlChecker(ObjectFieldsChecker): """ urn: urn:intuitem:risk:reference_control:asf-baseline-v2:asf-rec-12 @@ -1121,58 +1244,116 @@ class ReferenceControlChecker(ObjectFieldsChecker): csf_function: detect annotation: 'PDPDPPDPD' """ + REQUIRED_FIELDS = {"urn", "ref_id", "name"} - ALL_FIELDS = {"urn", "ref_id", "name", "description", "translations", "annotation", "category", "csf_function"} + ALL_FIELDS = { + "urn", + "ref_id", + "name", + "description", + "translations", + "annotation", + "category", + "csf_function", + } # We must test that the choice fields like category and csf_function are correctly checked by the generic checker. - GENERIC_CHECKED_FIELDS = ["urn", "ref_id", "name", "description", "annotation", "category", "csf_function"] + GENERIC_CHECKED_FIELDS = [ + "urn", + "ref_id", + "name", + "description", + "annotation", + "category", + "csf_function", + ] EXTRA_CHECKS = { "urn": LibraryFormatUtils.check_urn, - "translations": LibraryFormatUtils.check_translations + "translations": LibraryFormatUtils.check_translations, } MODEL = ReferenceControl + class RiskMatrixChecker(ObjectFieldsChecker): REQUIRED_FIELDS = {"urn", "ref_id", "name", "impact", "probability", "risk", "grid"} # Should we accept the "annotation" field for threat ? # It exists in the RiskMatrix model after all. - ALL_FIELDS = {"urn", "ref_id", "name", "description", "translations", "impact", "probability", "risk", "grid"} + ALL_FIELDS = { + "urn", + "ref_id", + "name", + "description", + "translations", + "impact", + "probability", + "risk", + "grid", + } # "strength_of_knowledge" is in MATRIX_FIELDS, how is it used though ? # Should i put it in the verifier, i really think so GENERIC_CHECKED_FIELDS = ["urn", "ref_id", "name", "description"] EXTRA_CHECKS = { "urn": LibraryFormatUtils.check_urn, - "translations": LibraryFormatUtils.check_translations + "translations": LibraryFormatUtils.check_translations, } MODEL = RiskMatrix + class RiskMatrixProbabilityChecker(ObjectFieldsChecker): REQUIRED_FIELDS = {"abbreviation", "name", "description"} - ALL_FIELDS = {"abbreviation", "name", "description", "hexcolor", "translations", "id"} - GENERIC_CHECKED_FIELDS = [] # This must be an empty list if MODEL is None + ALL_FIELDS = { + "abbreviation", + "name", + "description", + "hexcolor", + "translations", + "id", + } + GENERIC_CHECKED_FIELDS = [] # This must be an empty list if MODEL is None EXTRA_CHECKS = { "abbreviation": LibraryFormatUtils.make_str_checker(), "name": LibraryFormatUtils.make_str_checker(), "description": LibraryFormatUtils.make_str_checker(), - "hexcolor": LibraryFormatUtils.make_str_checker(regex_pattern=r"#[0-9A-Fa-f]{6}"), + "hexcolor": LibraryFormatUtils.make_str_checker( + regex_pattern=r"#[0-9A-Fa-f]{6}" + ), "translations": LibraryFormatUtils.check_translations, - "id": LibraryFormatUtils.make_int_checker(min_value=0) + "id": LibraryFormatUtils.make_int_checker(min_value=0), } MODEL = None + class FrameworkChecker(ObjectFieldsChecker): REQUIRED_FIELDS = {"urn", "ref_id", "name"} - ALL_FIELDS = {"urn", "ref_id", "name", "description", "translations", "requirement_nodes"} + ALL_FIELDS = { + "urn", + "ref_id", + "name", + "description", + "translations", + "requirement_nodes", + } GENERIC_CHECKED_FIELDS = ["urn", "ref_id", "name", "description"] EXTRA_CHECKS = { "urn": LibraryFormatUtils.check_urn, - "translations": LibraryFormatUtils.check_translations + "translations": LibraryFormatUtils.check_translations, } MODEL = Framework + class RequirementNodeChecker(ObjectFieldsChecker): REQUIRED_FIELDS = {"urn", "ref_id", "name"} # Remove the "depth" field later on (since it's useless) - ALL_FIELDS = {"urn", "ref_id", "name", "description", "parent_urn", "translations", "depth", "assessable", "reference_controls"} + ALL_FIELDS = { + "urn", + "ref_id", + "name", + "description", + "parent_urn", + "translations", + "depth", + "assessable", + "reference_controls", + } GENERIC_CHECKED_FIELDS = ["urn", "ref_id", "name", "description", "assessable"] """ parent_urn: urn @@ -1189,33 +1370,57 @@ class RequirementNodeChecker(ObjectFieldsChecker): } MODEL = RequirementNode + class LibraryFieldsChecker(ObjectFieldsChecker): REQUIRED_FIELDS = {"urn", "name", "version", "objects", "locale", "ref_id"} - ALL_FIELDS = {"urn", "name", "description", "copyright", "locale", "ref_id", "dependencies", "version", "provider", "packager", "translations", "annotation", "objects"} - GENERIC_CHECKED_FIELDS = ["urn", "name", "description", "copyright", "locale", "ref_id", "version", "provider", "packager", "annotation"] + ALL_FIELDS = { + "urn", + "name", + "description", + "copyright", + "locale", + "ref_id", + "dependencies", + "version", + "provider", + "packager", + "translations", + "annotation", + "objects", + } + GENERIC_CHECKED_FIELDS = [ + "urn", + "name", + "description", + "copyright", + "locale", + "ref_id", + "version", + "provider", + "packager", + "annotation", + ] EXTRA_CHECKS = { "urn": LibraryFormatUtils.check_urn, "locale": LibraryFormatUtils.check_locale, "version": LibraryFormatUtils.check_version, "translations": LibraryFormatUtils.check_translations, - "dependencies": LibraryFormatUtils.check_dependencies + "dependencies": LibraryFormatUtils.check_dependencies, } MODEL = StoredLibrary + class LibraryFormatChecker: - OBJECT_FIELDS = { # List of valid keys for the library["objects"] dictionary. + OBJECT_FIELDS = { # List of valid keys for the library["objects"] dictionary. "threats", "reference_controls", "risk_matrix", "framework", "requirement_mapping_set", } - NON_UNIQUE_OBJECTS = [ - "threats", - "reference_controls", - "risk_matrix" - ] + NON_UNIQUE_OBJECTS = ["threats", "reference_controls", "risk_matrix"] UNIQUE_OBJECTS = list(OBJECT_FIELDS - set(NON_UNIQUE_OBJECTS)) + def __init__(self, library_data: dict): self._library = library_data self._objects = {} @@ -1231,88 +1436,107 @@ def check_metadata_validity(self) -> Union[None, Tuple[str, dict, List[str]]]: The metadatas are all the fields contained directly within the dictionary of the library. """ - if (error := LibraryFieldsChecker.check(self._library)) : + if error := LibraryFieldsChecker.check(self._library): return error def check_objects_emptyness(self) -> Union[None, Tuple[str, dict, List[str]]]: - if not self._objects : + if not self._objects: return ("libraryErrorObjectsEmpty", {}, ["objects"]) object_fields = set(self._objects.keys()) invalid_object_fields = object_fields - LibraryFormatChecker.OBJECT_FIELDS - if invalid_object_fields : - return ("libraryErrorInvalidFields", { - "fields": repr_value(sorted(invalid_object_fields)) - }, ["objects"]) + if invalid_object_fields: + return ( + "libraryErrorInvalidFields", + {"fields": repr_value(sorted(invalid_object_fields))}, + ["objects"], + ) - for object_typename in LibraryFormatChecker.NON_UNIQUE_OBJECTS : + for object_typename in LibraryFormatChecker.NON_UNIQUE_OBJECTS: object_data = self._objects.get(object_typename, NotImplemented) - if object_data is not NotImplemented and not isinstance(object_data, list) : - return ("libraryErrorBadType", { - "value": repr_value(object_data), - "type": type(object_data).__name__, - "expected_type": "list" - }, ["objects", object_typename]) - - for object_typename in LibraryFormatChecker.UNIQUE_OBJECTS : + if object_data is not NotImplemented and not isinstance(object_data, list): + return ( + "libraryErrorBadType", + { + "value": repr_value(object_data), + "type": type(object_data).__name__, + "expected_type": "list", + }, + ["objects", object_typename], + ) + + for object_typename in LibraryFormatChecker.UNIQUE_OBJECTS: object_data = self._objects.get(object_typename, NotImplemented) - if object_data is not NotImplemented and not isinstance(object_data, dict) : - return ("libraryErrorBadType", { - "value": repr_value(object_data), - "type": type(object_data).__name__, - "expected_type": "dict" - }, ["objects", object_typename]) + if object_data is not NotImplemented and not isinstance(object_data, dict): + return ( + "libraryErrorBadType", + { + "value": repr_value(object_data), + "type": type(object_data).__name__, + "expected_type": "dict", + }, + ["objects", object_typename], + ) # Returning a list of errors to the client instead of the first detected error would be even better. # But it wouldn't be good to put a lot of text in a toast message so it would require creating a dedicated feature for library validity checking/custom library creation. - def check_threats(self) -> Union[None, Tuple[str, dict, List[str]]] : + def check_threats(self) -> Union[None, Tuple[str, dict, List[str]]]: threats = self._objects.get("threats", []) - for index, threat in enumerate(threats) : + for index, threat in enumerate(threats): ident = get_object_ident(threat, index) - if (error := ThreatFieldsChecker.check(threat)) is not None : + if (error := ThreatFieldsChecker.check(threat)) is not None: return merge_location(error, ["objects", "threats", ident]) - def check_reference_controls(self) -> Union[None, Tuple[str, dict, List[str]]] : + def check_reference_controls(self) -> Union[None, Tuple[str, dict, List[str]]]: reference_controls = self._objects.get("reference_controls", []) - for index, reference_control in enumerate(reference_controls) : + for index, reference_control in enumerate(reference_controls): ident = get_object_ident(reference_control, index) - if (error := ReferenceControlChecker.check(reference_control)) is not None : + if (error := ReferenceControlChecker.check(reference_control)) is not None: return merge_location(error, ["objects", "reference_controls", ident]) - def check_risk_matrices(self) -> Union[None, Tuple[str, dict, List[str]]] : + def check_risk_matrices(self) -> Union[None, Tuple[str, dict, List[str]]]: matrices = self._objects.get("risk_matrix", []) - for index, matrix in enumerate(matrices) : + for index, matrix in enumerate(matrices): ident = get_object_ident(matrix, index) - if (error := LibraryFormatUtils.check_risk_matrix(matrix, ["objects", "risk_matrix", ident])) is not None : + if ( + error := LibraryFormatUtils.check_risk_matrix( + matrix, ["objects", "risk_matrix", ident] + ) + ) is not None: return error - def check_framework(self) -> Union[None, Tuple[str, dict, List[str]]] : + def check_framework(self) -> Union[None, Tuple[str, dict, List[str]]]: framework = self._objects.get("framework") - if framework is None : + if framework is None: return None requirement_nodes = framework.get("requirement_nodes", []) - if not isinstance(requirement_nodes, list) : - return ("libraryErrorBadType", { - "value": "[...]", - "type": type(requirement_nodes).__name__, - "expected_type": "list" - }, ["objects", "framework", "requirement_nodes"]) - - for requirement_node in requirement_nodes : - if (error := RequirementNodeChecker.check(requirement_node)) is not None : + if not isinstance(requirement_nodes, list): + return ( + "libraryErrorBadType", + { + "value": "[...]", + "type": type(requirement_nodes).__name__, + "expected_type": "list", + }, + ["objects", "framework", "requirement_nodes"], + ) + + for requirement_node in requirement_nodes: + if (error := RequirementNodeChecker.check(requirement_node)) is not None: return merge_location(error, ["objects", "framework"]) - for requirement_node in requirement_nodes : + for requirement_node in requirement_nodes: urn = requirement_node["urn"] urn_chain = [urn] - while (parent_urn := requirement_node.get("parent_urn")) is not None : + while (parent_urn := requirement_node.get("parent_urn")) is not None: urn_chain.append(parent_urn) - if urn == parent_urn : - return ("libraryErrorCircularRequirementNode", { - "urns": "===>".join(urn_chain) - }, ["objects", "framework", "requirement_nodes"]) - + if urn == parent_urn: + return ( + "libraryErrorCircularRequirementNode", + {"urns": "===>".join(urn_chain)}, + ["objects", "framework", "requirement_nodes"], + ) def run(self) -> Union[None, Tuple[str, dict, List[str]]]: """ @@ -1332,7 +1556,7 @@ def run(self) -> Union[None, Tuple[str, dict, List[str]]]: - Check if the value is valid compared to other values which validity has already been verified in previous tests """ - if (error := self.check_metadata_validity()) is not None : + if (error := self.check_metadata_validity()) is not None: return error self._objects = self._library["objects"] @@ -1345,10 +1569,10 @@ def run(self) -> Union[None, Tuple[str, dict, List[str]]]: self.check_risk_matrices, self.check_framework, # self.check_requirement_mapping_set - ] : + ]: # The pattern for the error location will generally be : # [object][urn=... OR {nth}][...fields] - if (error := check_function()) is not None : + if (error := check_function()) is not None: return error return ("libraryErrorNotAnError", {}, []) diff --git a/backend/library/views.py b/backend/library/views.py index 443f1ba36..0b5a7104f 100644 --- a/backend/library/views.py +++ b/backend/library/views.py @@ -164,11 +164,11 @@ def upload_library(self, request): validate_file_extension(attachment) # Use safe_load to prevent arbitrary code execution. - content = attachment.read() # Should we read it chunck by chunck or ensure that the file size of the library content is reasonnable before reading ? + content = attachment.read() # Should we read it chunck by chunck or ensure that the file size of the library content is reasonnable before reading ? try: result = StoredLibrary.store_library_content(content) - if isinstance(result, tuple) : + if isinstance(result, tuple): return HttpResponse( json.dumps({"error": result}), status=HTTP_400_BAD_REQUEST, @@ -186,7 +186,7 @@ def upload_library(self, request): json.dumps({"error": "libraryAlreadyLoadedError"}), status=HTTP_400_BAD_REQUEST, ) - except Exception as e : + except Exception as e: return HttpResponse( json.dumps({"error": "invalidLibraryFileError"}), status=HTTP_400_BAD_REQUEST, diff --git a/frontend/src/routes/(app)/(internal)/libraries/+page.server.ts b/frontend/src/routes/(app)/(internal)/libraries/+page.server.ts index b83169b5c..742c6d7a3 100644 --- a/frontend/src/routes/(app)/(internal)/libraries/+page.server.ts +++ b/frontend/src/routes/(app)/(internal)/libraries/+page.server.ts @@ -95,12 +95,12 @@ export const actions: Actions = { const response = await req.json(); const errorData = response.error; - const errorTraceback = errorData[2].map((field) => `[${field}]`).join(""); + const errorTraceback = errorData[2].map((field) => `[${field}]`).join(''); let errorMessage = unsafeTranslate(errorData[0], errorData[1]); if (errorMessage === undefined) { - errorMessage = `${errorData[0]} ${JSON.stringify(errorData[1])}` + errorMessage = `${errorData[0]} ${JSON.stringify(errorData[1])}`; } - errorMessage = `${errorTraceback} ${errorMessage}` + errorMessage = `${errorTraceback} ${errorMessage}`; setFlash({ type: 'error', message: errorMessage }, event); delete form.data['file']; // This removes a warning: Cannot stringify arbitrary non-POJOs (data..form.data.file)