From cd913f320e116a717bc99b4a3c95af51ebe391f2 Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Tue, 19 Sep 2023 22:14:40 +0100 Subject: [PATCH 01/10] Refactor base clases - split redacter and anonymiser --- README.md | 2 +- anonymiser/decorators.py | 6 +- anonymiser/models.py | 237 ++++++++++++++++++++++++++++++++++----- anonymiser/registry.py | 12 +- tests/anonymisers.py | 8 +- tests/test_models.py | 48 ++++---- 6 files changed, 248 insertions(+), 65 deletions(-) diff --git a/README.md b/README.md index 6cb21f4..a2c09f2 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ a new anonymiser that splits out each field: ```python # anonymisers.py @register_anonymiser -class UserAnonymiser(BaseAnonymiser): +class UserAnonymiser(ModelAnonymiser): model = User def anonymise_first_name(self, obj: User) -> None: diff --git a/anonymiser/decorators.py b/anonymiser/decorators.py index ae3405b..74de760 100644 --- a/anonymiser/decorators.py +++ b/anonymiser/decorators.py @@ -1,8 +1,10 @@ -from .models import BaseAnonymiser +from .models import AnonymiserBase, RedacterBase from .registry import register -def register_anonymiser(anonymiser: type[BaseAnonymiser]) -> type[BaseAnonymiser]: +def register_anonymiser( + anonymiser: type[AnonymiserBase | RedacterBase], +) -> type[AnonymiserBase | RedacterBase]: """Add {model: Anonymiser} to the global registry.""" register(anonymiser) return anonymiser diff --git a/anonymiser/models.py b/anonymiser/models.py index fd87a5c..fe26282 100644 --- a/anonymiser/models.py +++ b/anonymiser/models.py @@ -45,36 +45,30 @@ def field_type(self) -> str: def get_field_summary_data( - field: models.Field, anonymiser: BaseAnonymiser | None + field: models.Field, anonymiser: ModelAnonymiser | None ) -> FieldSummaryData: if anonymiser: return FieldSummaryData(field, anonymiser.is_field_anonymisable(field.name)) return FieldSummaryData(field, False) -class BaseAnonymiser: - """ - Base class for anonymisation functions. - - You can instantiate this class and call the anonymise_object method - for any model as a "noop" anonymiser. It will not do anything, but - it can be used to summarise field information in a consistent manner - for models that do not need to be anonymised. - - """ - +class _ModelBase: # Override with the model to be anonymised model: type[models.Model] - # Set to False to disable auto-redaction of text fields - auto_redact: bool = True + def get_model_fields(self) -> list[models.Field]: + """Return a list of fields on the model.""" + if not self.model: + raise NotImplementedError("model must be set") + return [ + f + for f in self.model._meta.get_fields() + if not isinstance(f, models.ForeignObjectRel) + ] - # List of field names to exclude from auto-redaction - auto_redact_exclude: list[str] = [] - # field_name: redaction_value. redaction_value can be a static value - # or a db function, e.g. F("field_name") or Value("static value"). - custom_field_redactions: dict[str, Any] = {} +class AnonymiserBase(_ModelBase): + """Base class for anonymisation functions.""" def __setattr__(self, __name: str, __value: Any) -> None: """ @@ -97,16 +91,6 @@ def __setattr__(self, __name: str, __value: Any) -> None: ) super().__setattr__(__name, __value) - def get_model_fields(self) -> list[models.Field]: - """Return a list of fields on the model.""" - if not self.model: - raise NotImplementedError("model must be set") - return [ - f - for f in self.model._meta.get_fields() - if not isinstance(f, models.ForeignObjectRel) - ] - def get_model_field_summary(self) -> list[FieldSummaryData]: """Return a list of all model fiels and whether they are anonymisable.""" return [ @@ -163,6 +147,20 @@ def post_anonymise_object( """ pass + +class RedacterBase(_ModelBase): + """Base class for redaction functions.""" + + # Set to False to disable auto-redaction of text fields + auto_redact: bool = True + + # List of field names to exclude from auto-redaction + auto_redact_exclude: list[str] = [] + + # field_name: redaction_value. redaction_value can be a static value + # or a db function, e.g. F("field_name") or Value("static value"). + custom_field_redactions: dict[str, Any] = {} + def is_field_auto_redactable(self, field: models.Field) -> bool: """ Return True if the field should be auto-redacted. @@ -231,3 +229,184 @@ def redact_queryset( redactions.update(self.custom_field_redactions) redactions.update(field_overrides) return queryset.update(**redactions) + + +class ModelAnonymiser(AnonymiserBase, RedacterBase): + """ + Base class for anonymisation functions. + + You can instantiate this class and call the anonymise_object method + for any model as a "noop" anonymiser. It will not do anything, but + it can be used to summarise field information in a consistent manner + for models that do not need to be anonymised. + + """ + + # # Override with the model to be anonymised + # model: type[models.Model] + + # # Set to False to disable auto-redaction of text fields + # auto_redact: bool = True + + # # List of field names to exclude from auto-redaction + # auto_redact_exclude: list[str] = [] + + # # field_name: redaction_value. redaction_value can be a static value + # # or a db function, e.g. F("field_name") or Value("static value"). + # custom_field_redactions: dict[str, Any] = {} + + # def __setattr__(self, __name: str, __value: Any) -> None: + # """ + # Prevent setting of attribute on the anonymiser itself. + + # This is a common mistake when writing anonymiser functions - + # inside the `anonymise_FOO` method you call `self.FOO = "bar"` + # instead of `obj.FOO = "bar"`, because that's the natural way to + # write it. + + # This will raise an AttributeError if you try to set an attribute + # that looks like it maps to an anonymiser method. + + # """ + # if hasattr(self, f"anonymise_{__name}"): + # raise AttributeError( + # "Cannot set anonymiser attributes directly - did you mean to " + # "use 'obj' instead of 'self' in method " + # f"`{self.__class__.__name__}.anonymise_{__name}`?" + # ) + # super().__setattr__(__name, __value) + + # def get_model_fields(self) -> list[models.Field]: + # """Return a list of fields on the model.""" + # if not self.model: + # raise NotImplementedError("model must be set") + # return [ + # f + # for f in self.model._meta.get_fields() + # if not isinstance(f, models.ForeignObjectRel) + # ] + + # def get_model_field_summary(self) -> list[FieldSummaryData]: + # """Return a list of all model fiels and whether they are anonymisable.""" + # return [ + # FieldSummaryData(f, self.is_field_anonymisable(f.name)) + # for f in self.get_model_fields() + # ] + + # def is_field_anonymisable(self, field_name: str) -> bool: + # return hasattr(self, f"anonymise_{field_name}") + + # def get_anonymisable_fields(self) -> list[models.Field]: + # """Return a list of fields on the model that are anonymisable.""" + # return [ + # f for f in self.get_model_fields() if self.is_field_anonymisable(f.name) + # ] + + # def anonymise_field( + # self, obj: models.Model, field_name: str + # ) -> AnonymisationResult: + # """Anonymise a single field on the model instance.""" + # if not (anon_func := getattr(self, f"anonymise_{field_name}", None)): + # raise NotImplementedError( + # f"Anonymiser function 'anonymise_{field_name}' not implemented" + # ) + # old_value = getattr(obj, field_name) + # anon_func(obj) + # new_value = getattr(obj, field_name) + # return old_value, new_value + + # def anonymise_object(self, obj: models.Model) -> None: + # """Anonymise the model instance (NOT THREAD SAFE).""" + # output = {} + # for field in self.get_anonymisable_fields(): + # output[field.name] = self.anonymise_field(obj, field.name) + # self.post_anonymise_object(obj, **output) + + # def anonymise_queryset(self, queryset: Iterator[models.Model]) -> int: + # """Anonymise all objects in the queryset (and SAVE).""" + # count = 0 + # for obj in queryset: + # self.anonymise_object(obj) + # obj.save() + # count += 1 + # return count + + # def post_anonymise_object( + # self, obj: models.Model, **updates: AnonymisationResult + # ) -> None: + # """ + # Post-process the model instance after anonymisation. + + # The updates param is a dict of field names to (old_value, new_value) tuples. + + # """ + # pass + + # def is_field_auto_redactable(self, field: models.Field) -> bool: + # """ + # Return True if the field should be auto-redacted. + + # Currently this includes text fields that are not choices, primary + # keys, unique fields, or in the auto_redact_exclude list. + + # """ + # return ( + # isinstance(field, (models.CharField, models.TextField)) + # and not field.choices + # and not field.primary_key + # and not getattr(field, "unique", False) + # and field.name not in self.auto_redact_exclude + # ) + + # def auto_field_redactions(self) -> dict[str, str]: + # """ + # Return a dict of redaction_values for all text fields. + + # This is used to "auto-redact" all char/text fields with "X" - if + # the field does not use choices, and is not a primary key or + # unique field. + + # """ + + # def _max_length(f: models.Field) -> int: + # if isinstance(f, models.CharField): + # return f.max_length + # if isinstance(f, models.TextField): + # return 400 + # raise ValueError("Field must be CharField or TextField") + + # return { + # f.name: _max_length(f) * "X" + # for f in self.get_model_fields() + # if self.is_field_auto_redactable(f) + # } + + # def redact_queryset( + # self, + # queryset: models.QuerySet[models.Model], + # auto_redact: bool = auto_redact, + # **field_overrides: Any, + # ) -> int: + # """ + # Redact a queryset (and SAVE). + + # The `auto_redact` parameter will automatically redact all text + # fields with "X" if they are not already covered in the + # field_redactions dict. + + # The `field_overrides` parameter allows you to pass in a dict of + # field_name: redaction_value to override any other redactions. + + # The redactions cascade in the following order: + + # - auto_redactions (all non-choice text fields) + # - field_redactions (static values set on the anonymiser) + # - field_overrides (values passed in to method) + + # """ + # redactions: dict[str, Any] = {} + # if auto_redact: + # redactions.update(self.auto_field_redactions()) + # redactions.update(self.custom_field_redactions) + # redactions.update(field_overrides) + # return queryset.update(**redactions) diff --git a/anonymiser/registry.py b/anonymiser/registry.py index 0691ed8..2559fcf 100644 --- a/anonymiser/registry.py +++ b/anonymiser/registry.py @@ -3,7 +3,7 @@ from django.db import models -from .models import BaseAnonymiser +from .models import AnonymiserBase, RedacterBase lock = threading.Lock() logger = logging.getLogger(__name__) @@ -17,7 +17,7 @@ class Registry(dict): _registry = Registry() -def _register(anonymiser: type[BaseAnonymiser]) -> None: +def _register(anonymiser: type[AnonymiserBase | RedacterBase]) -> None: if not (model := anonymiser.model): raise ValueError("Anonymiser must have a model attribute set.") if model in _registry: @@ -26,7 +26,7 @@ def _register(anonymiser: type[BaseAnonymiser]) -> None: _registry[model] = anonymiser -def register(anonymiser: type[BaseAnonymiser]) -> None: +def register(anonymiser: type[AnonymiserBase | RedacterBase]) -> None: """Add {model: Anonymiser} to the global registry.""" with lock: _register(anonymiser) @@ -36,11 +36,13 @@ def anonymisable_models() -> list[type[models.Model]]: return list(_registry.keys()) -def anonymisers() -> list[type[BaseAnonymiser]]: +def anonymisers() -> list[type[AnonymiserBase | RedacterBase]]: return list(_registry.values()) -def get_model_anonymiser(model: type[models.Model]) -> BaseAnonymiser | None: +def get_model_anonymiser( + model: type[models.Model], +) -> AnonymiserBase | RedacterBase | None: """Return newly instantiated anonymiser for model.""" if anonymiser := _registry.get(model): return anonymiser() diff --git a/tests/anonymisers.py b/tests/anonymisers.py index 1dbc619..58d76a2 100644 --- a/tests/anonymisers.py +++ b/tests/anonymisers.py @@ -2,20 +2,20 @@ from django.db.models.functions import Concat from anonymiser.decorators import register_anonymiser -from anonymiser.models import BaseAnonymiser +from anonymiser.models import AnonymiserBase, RedacterBase from .models import User @register_anonymiser -class UserAnonymiser(BaseAnonymiser): +class UserAnonymiser(AnonymiserBase): model = User def anonymise_first_name(self, obj: User) -> None: obj.first_name = "Anonymous" -class BadUserAnonymiser(BaseAnonymiser): +class BadUserAnonymiser(AnonymiserBase): model = User def anonymise_first_name(self, obj: User) -> None: @@ -23,7 +23,7 @@ def anonymise_first_name(self, obj: User) -> None: self.first_name = "Anonymous" -class UserRedacter(BaseAnonymiser): +class UserRedacter(RedacterBase): model = User custom_field_redactions = { diff --git a/tests/test_models.py b/tests/test_models.py index 5236e3d..accdfa2 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -92,30 +92,6 @@ def test_anonymise_queryset( assert user_anonymiser.anonymise_queryset(User.objects.none()) == 0 assert user_anonymiser.anonymise_queryset(User.objects.all()) == 1 - @mock.patch.object(UserAnonymiser, "get_model_fields") - def test_auto_redact( - self, mock_get_fields: mock.Mock, user_anonymiser: UserAnonymiser - ) -> None: - mock_get_fields.return_value = [ - # redact to 255 chars - models.CharField(name="char_field", max_length=255), - # redact to 400 chars - models.TextField(name="text_field"), - # don't redact (choices) - models.CharField(name="choices", max_length=255, choices=[("a", "A")]), - # don't redact (unique) - models.CharField(name="unique", max_length=255, unique=True), - # don't redact (primary key) - models.CharField(name="primary_key", max_length=255, primary_key=True), - # don't redact (IntegerField, DateField, etc) - models.IntegerField(name="integer_field"), - models.DateField(name="date_field"), - ] - assert user_anonymiser.auto_field_redactions() == { - "char_field": 255 * "X", - "text_field": 400 * "X", - } - def test_bad_anonymiser() -> None: with pytest.raises(AttributeError): @@ -187,3 +163,27 @@ def test_redact_queryset__field_overrides__postgres( user_redacter.redact_queryset(User.objects.all(), uuid=GenerateUuid4()) user.refresh_from_db() assert user.uuid != uuid + + @mock.patch.object(UserRedacter, "get_model_fields") + def test_auto_redact( + self, mock_get_fields: mock.Mock, user_redacter: UserRedacter + ) -> None: + mock_get_fields.return_value = [ + # redact to 255 chars + models.CharField(name="char_field", max_length=255), + # redact to 400 chars + models.TextField(name="text_field"), + # don't redact (choices) + models.CharField(name="choices", max_length=255, choices=[("a", "A")]), + # don't redact (unique) + models.CharField(name="unique", max_length=255, unique=True), + # don't redact (primary key) + models.CharField(name="primary_key", max_length=255, primary_key=True), + # don't redact (IntegerField, DateField, etc) + models.IntegerField(name="integer_field"), + models.DateField(name="date_field"), + ] + assert user_redacter.auto_field_redactions() == { + "char_field": 255 * "X", + "text_field": 400 * "X", + } From fdeebc955150c8926427b08b1fcf23d74ba4f151 Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Wed, 20 Sep 2023 13:31:19 +0100 Subject: [PATCH 02/10] Update README --- README.md | 24 ++++++++++++++++++++++++ tests/anonymisers.py | 5 +++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a2c09f2..0ba0af9 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,30 @@ up-to-date. The anonymisation itself doesn't change - it's just shifting the code around. +## Redaction vs. Anonymisation + +This library contains two flavours of anonymisation - Redaction, and +Anonymisation. The two differ in how the data is overwritten: + +Type | Implementation | Performance | Data +--- | --- | --- | --- +Redaction | SQL | Fast | Table level +Anonymisation | Python | Slow | Row level + +### Redaction + +Redaction is implemented as a single SQL `update` statement that wipes +an entire table in one go. It's very fast, but it's limited in the sense +that it cannot produce realistic data. In fact it may well render your +application unusable. It is recommended as the first step in data +anonymisation. + +### Anonymisation + +Anonymisation is an row-level operation that iterates over a +queryset and updates each object in turn. The main advantage is that +post-anonymisation you will have realistic, usable, data. + ## Usage As an example - this is a hypothetical User model's anonymisation today: diff --git a/tests/anonymisers.py b/tests/anonymisers.py index 58d76a2..ada174f 100644 --- a/tests/anonymisers.py +++ b/tests/anonymisers.py @@ -2,14 +2,15 @@ from django.db.models.functions import Concat from anonymiser.decorators import register_anonymiser -from anonymiser.models import AnonymiserBase, RedacterBase +from anonymiser.models import AnonymiserBase, ModelAnonymiser, RedacterBase from .models import User @register_anonymiser -class UserAnonymiser(AnonymiserBase): +class UserAnonymiser(ModelAnonymiser): model = User + auto_redact = True def anonymise_first_name(self, obj: User) -> None: obj.first_name = "Anonymous" From 2db5ac22af13e006bb116e6f5d432ad3e154ef70 Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Sat, 23 Sep 2023 12:08:29 +0100 Subject: [PATCH 03/10] Recombine redacter and anonymiser --- .github/workflows/tox.yml | 4 +- anonymiser/decorators.py | 6 +- .../commands/display_model_anonymisation.py | 30 +- anonymiser/models.py | 270 +++++------------- anonymiser/registry.py | 10 +- .../templates/display_model_anonymisation.md | 6 +- pyproject.toml | 3 +- tests/anonymisers.py | 4 + tests/test_models.py | 47 +-- tox.ini | 4 +- 10 files changed, 121 insertions(+), 263 deletions(-) diff --git a/.github/workflows/tox.yml b/.github/workflows/tox.yml index f6be1a4..471af9c 100644 --- a/.github/workflows/tox.yml +++ b/.github/workflows/tox.yml @@ -38,8 +38,8 @@ jobs: strategy: matrix: - python: ["3.10", "3.11"] - django: ["32", "40", "41", "42", "main"] + python: ["3.11"] + django: ["32", "40", "41", "42", "50", "main"] env: TOXENV: py${{ matrix.python }}-django${{ matrix.django }} diff --git a/anonymiser/decorators.py b/anonymiser/decorators.py index 74de760..dc777f3 100644 --- a/anonymiser/decorators.py +++ b/anonymiser/decorators.py @@ -1,10 +1,10 @@ -from .models import AnonymiserBase, RedacterBase +from .models import ModelAnonymiser from .registry import register def register_anonymiser( - anonymiser: type[AnonymiserBase | RedacterBase], -) -> type[AnonymiserBase | RedacterBase]: + anonymiser: type[ModelAnonymiser], +) -> type[ModelAnonymiser]: """Add {model: Anonymiser} to the global registry.""" register(anonymiser) return anonymiser diff --git a/anonymiser/management/commands/display_model_anonymisation.py b/anonymiser/management/commands/display_model_anonymisation.py index c546495..888ec77 100644 --- a/anonymiser/management/commands/display_model_anonymisation.py +++ b/anonymiser/management/commands/display_model_anonymisation.py @@ -2,10 +2,9 @@ from django.apps import apps from django.core.management.base import BaseCommand -from django.db.models import ForeignObjectRel, Model +from django.db.models import Model from django.template.loader import render_to_string -from anonymiser.models import FieldSummaryData from anonymiser.registry import get_model_anonymiser @@ -14,33 +13,12 @@ def get_models(self) -> list[type[Model]]: """Force alphabetical order of models.""" return sorted(apps.get_models(), key=lambda m: m._meta.label) - def get_fields(self, model: type[Model]) -> list: - """Get model fields ordered by type and then name.""" - return sorted( - [ - f - for f in model._meta.get_fields() - if not isinstance(f, ForeignObjectRel) - ], - key=lambda f: f.__class__.__name__ + f.name, - ) - def handle(self, *args: Any, **options: Any) -> None: - model_fields: list[FieldSummaryData] = [] - model_anonymisers: dict[str, str] = {} for model in self.get_models(): - model_name = model._meta.label - anonymiser = get_model_anonymiser(model) - anonymiser_name = anonymiser.__class__.__name__ if anonymiser else "" - model_anonymisers[model_name] = anonymiser_name - for f in self.get_fields(model): - is_anonymisable = False - if anonymiser: - is_anonymisable = anonymiser.is_field_anonymisable(f.name) - field_data = FieldSummaryData(f, is_anonymisable) - model_fields.append(field_data) + if anonymiser := get_model_anonymiser(model): + data = anonymiser.get_model_field_summary() out = render_to_string( "display_model_anonymisation.md", - {"model_anonymisers": model_anonymisers, "model_fields": model_fields}, + {"anonymised_models": data}, ) self.stdout.write(out) diff --git a/anonymiser/models.py b/anonymiser/models.py index fe26282..c6cfdee 100644 --- a/anonymiser/models.py +++ b/anonymiser/models.py @@ -3,6 +3,7 @@ import logging from collections import namedtuple from dataclasses import dataclass +from enum import StrEnum from typing import Any, Iterator, TypeAlias from django.db import models @@ -19,37 +20,57 @@ @dataclass -class FieldSummaryData: - field: models.Field - is_anonymisable: bool +class ModelFieldSummary: + """ + Store info about the field and whether it is anonymisable. + + This is used to generate a summary of the fields on a model, and how + they are anonymised / redacted - used to generate the documentation. + + """ + + # python rejects "model" as a field name, so we use "app_model" + app_model: models.Model + model_field: models.Field + anonymiser: ModelAnonymiser | None @property def model_label(self) -> str: - return self.field.model._meta.label + return self.app_model._meta.label @property def app(self) -> str: - return self.field.model._meta.app_label + return self.app_model._meta.app_label @property def model(self) -> str: - return self.field.model._meta.object_name or "" + return self.app_model._meta.object_name or "" @property def field_name(self) -> str: - return self.field.name + return self.model_field.name @property def field_type(self) -> str: - return self.field.__class__.__name__ + return self.model_field.__class__.__name__ + + @property + def is_anonymised(self) -> bool: + if self.anonymiser: + return self.anonymiser.is_field_anonymised(self.model_field) + return False + @property + def is_redacted(self) -> bool: + if self.anonymiser: + return self.anonymiser.is_field_redacted(self.model_field) + return False -def get_field_summary_data( - field: models.Field, anonymiser: ModelAnonymiser | None -) -> FieldSummaryData: - if anonymiser: - return FieldSummaryData(field, anonymiser.is_field_anonymisable(field.name)) - return FieldSummaryData(field, False) + @property + def redaction_strategy(self) -> RedacterBase.FieldRedactionStratgy: + if self.anonymiser: + return self.anonymiser.field_redaction_strategy(self.model_field) + return RedacterBase.FieldRedactionStratgy.NONE class _ModelBase: @@ -91,26 +112,18 @@ def __setattr__(self, __name: str, __value: Any) -> None: ) super().__setattr__(__name, __value) - def get_model_field_summary(self) -> list[FieldSummaryData]: - """Return a list of all model fiels and whether they are anonymisable.""" - return [ - FieldSummaryData(f, self.is_field_anonymisable(f.name)) - for f in self.get_model_fields() - ] - - def is_field_anonymisable(self, field_name: str) -> bool: - return hasattr(self, f"anonymise_{field_name}") + def is_field_anonymised(self, field: models.Field) -> bool: + return hasattr(self, f"anonymise_{field.name}") def get_anonymisable_fields(self) -> list[models.Field]: """Return a list of fields on the model that are anonymisable.""" - return [ - f for f in self.get_model_fields() if self.is_field_anonymisable(f.name) - ] + return [f for f in self.get_model_fields() if self.is_field_anonymised(f)] def anonymise_field( - self, obj: models.Model, field_name: str + self, obj: models.Model, field: models.Field ) -> AnonymisationResult: """Anonymise a single field on the model instance.""" + field_name = field.name if not (anon_func := getattr(self, f"anonymise_{field_name}", None)): raise NotImplementedError( f"Anonymiser function 'anonymise_{field_name}' not implemented" @@ -124,7 +137,7 @@ def anonymise_object(self, obj: models.Model) -> None: """Anonymise the model instance (NOT THREAD SAFE).""" output = {} for field in self.get_anonymisable_fields(): - output[field.name] = self.anonymise_field(obj, field.name) + output[field.name] = self.anonymise_field(obj, field) self.post_anonymise_object(obj, **output) def anonymise_queryset(self, queryset: Iterator[models.Model]) -> int: @@ -161,7 +174,12 @@ class RedacterBase(_ModelBase): # or a db function, e.g. F("field_name") or Value("static value"). custom_field_redactions: dict[str, Any] = {} - def is_field_auto_redactable(self, field: models.Field) -> bool: + class FieldRedactionStratgy(StrEnum): + AUTO = "auto" + CUSTOM = "custom" + NONE = "" + + def is_field_redaction_auto(self, field: models.Field) -> bool: """ Return True if the field should be auto-redacted. @@ -177,6 +195,16 @@ def is_field_auto_redactable(self, field: models.Field) -> bool: and field.name not in self.auto_redact_exclude ) + def is_field_redaction_custom(self, field: models.Field) -> bool: + """Return True if the field has custom redaction.""" + return field.name in self.custom_field_redactions + + def is_field_redacted(self, field: models.Field) -> bool: + """Return True if the field is redacted.""" + return self.is_field_redaction_auto(field) or self.is_field_redaction_custom( + field + ) + def auto_field_redactions(self) -> dict[str, str]: """ Return a dict of redaction_values for all text fields. @@ -197,9 +225,17 @@ def _max_length(f: models.Field) -> int: return { f.name: _max_length(f) * "X" for f in self.get_model_fields() - if self.is_field_auto_redactable(f) + if self.is_field_redaction_auto(f) } + def field_redaction_strategy(self, field: models.Field) -> FieldRedactionStratgy: + """Return the FieldRedaction value for a field.""" + if self.is_field_redaction_custom(field): + return self.FieldRedactionStratgy.CUSTOM + if self.is_field_redaction_auto(field): + return self.FieldRedactionStratgy.AUTO + return self.FieldRedactionStratgy.NONE + def redact_queryset( self, queryset: models.QuerySet[models.Model], @@ -242,171 +278,9 @@ class ModelAnonymiser(AnonymiserBase, RedacterBase): """ - # # Override with the model to be anonymised - # model: type[models.Model] - - # # Set to False to disable auto-redaction of text fields - # auto_redact: bool = True - - # # List of field names to exclude from auto-redaction - # auto_redact_exclude: list[str] = [] - - # # field_name: redaction_value. redaction_value can be a static value - # # or a db function, e.g. F("field_name") or Value("static value"). - # custom_field_redactions: dict[str, Any] = {} - - # def __setattr__(self, __name: str, __value: Any) -> None: - # """ - # Prevent setting of attribute on the anonymiser itself. - - # This is a common mistake when writing anonymiser functions - - # inside the `anonymise_FOO` method you call `self.FOO = "bar"` - # instead of `obj.FOO = "bar"`, because that's the natural way to - # write it. - - # This will raise an AttributeError if you try to set an attribute - # that looks like it maps to an anonymiser method. - - # """ - # if hasattr(self, f"anonymise_{__name}"): - # raise AttributeError( - # "Cannot set anonymiser attributes directly - did you mean to " - # "use 'obj' instead of 'self' in method " - # f"`{self.__class__.__name__}.anonymise_{__name}`?" - # ) - # super().__setattr__(__name, __value) - - # def get_model_fields(self) -> list[models.Field]: - # """Return a list of fields on the model.""" - # if not self.model: - # raise NotImplementedError("model must be set") - # return [ - # f - # for f in self.model._meta.get_fields() - # if not isinstance(f, models.ForeignObjectRel) - # ] - - # def get_model_field_summary(self) -> list[FieldSummaryData]: - # """Return a list of all model fiels and whether they are anonymisable.""" - # return [ - # FieldSummaryData(f, self.is_field_anonymisable(f.name)) - # for f in self.get_model_fields() - # ] - - # def is_field_anonymisable(self, field_name: str) -> bool: - # return hasattr(self, f"anonymise_{field_name}") - - # def get_anonymisable_fields(self) -> list[models.Field]: - # """Return a list of fields on the model that are anonymisable.""" - # return [ - # f for f in self.get_model_fields() if self.is_field_anonymisable(f.name) - # ] - - # def anonymise_field( - # self, obj: models.Model, field_name: str - # ) -> AnonymisationResult: - # """Anonymise a single field on the model instance.""" - # if not (anon_func := getattr(self, f"anonymise_{field_name}", None)): - # raise NotImplementedError( - # f"Anonymiser function 'anonymise_{field_name}' not implemented" - # ) - # old_value = getattr(obj, field_name) - # anon_func(obj) - # new_value = getattr(obj, field_name) - # return old_value, new_value - - # def anonymise_object(self, obj: models.Model) -> None: - # """Anonymise the model instance (NOT THREAD SAFE).""" - # output = {} - # for field in self.get_anonymisable_fields(): - # output[field.name] = self.anonymise_field(obj, field.name) - # self.post_anonymise_object(obj, **output) - - # def anonymise_queryset(self, queryset: Iterator[models.Model]) -> int: - # """Anonymise all objects in the queryset (and SAVE).""" - # count = 0 - # for obj in queryset: - # self.anonymise_object(obj) - # obj.save() - # count += 1 - # return count - - # def post_anonymise_object( - # self, obj: models.Model, **updates: AnonymisationResult - # ) -> None: - # """ - # Post-process the model instance after anonymisation. - - # The updates param is a dict of field names to (old_value, new_value) tuples. - - # """ - # pass - - # def is_field_auto_redactable(self, field: models.Field) -> bool: - # """ - # Return True if the field should be auto-redacted. - - # Currently this includes text fields that are not choices, primary - # keys, unique fields, or in the auto_redact_exclude list. - - # """ - # return ( - # isinstance(field, (models.CharField, models.TextField)) - # and not field.choices - # and not field.primary_key - # and not getattr(field, "unique", False) - # and field.name not in self.auto_redact_exclude - # ) - - # def auto_field_redactions(self) -> dict[str, str]: - # """ - # Return a dict of redaction_values for all text fields. - - # This is used to "auto-redact" all char/text fields with "X" - if - # the field does not use choices, and is not a primary key or - # unique field. - - # """ - - # def _max_length(f: models.Field) -> int: - # if isinstance(f, models.CharField): - # return f.max_length - # if isinstance(f, models.TextField): - # return 400 - # raise ValueError("Field must be CharField or TextField") - - # return { - # f.name: _max_length(f) * "X" - # for f in self.get_model_fields() - # if self.is_field_auto_redactable(f) - # } - - # def redact_queryset( - # self, - # queryset: models.QuerySet[models.Model], - # auto_redact: bool = auto_redact, - # **field_overrides: Any, - # ) -> int: - # """ - # Redact a queryset (and SAVE). - - # The `auto_redact` parameter will automatically redact all text - # fields with "X" if they are not already covered in the - # field_redactions dict. - - # The `field_overrides` parameter allows you to pass in a dict of - # field_name: redaction_value to override any other redactions. - - # The redactions cascade in the following order: - - # - auto_redactions (all non-choice text fields) - # - field_redactions (static values set on the anonymiser) - # - field_overrides (values passed in to method) - - # """ - # redactions: dict[str, Any] = {} - # if auto_redact: - # redactions.update(self.auto_field_redactions()) - # redactions.update(self.custom_field_redactions) - # redactions.update(field_overrides) - # return queryset.update(**redactions) + def get_model_field_summary(self) -> list[ModelFieldSummary]: + """Return a list of ModelFieldSummary objects for the model.""" + return [ + ModelFieldSummary(app_model=self.model, model_field=f, anonymiser=self) + for f in self.get_model_fields() + ] diff --git a/anonymiser/registry.py b/anonymiser/registry.py index 2559fcf..304fc06 100644 --- a/anonymiser/registry.py +++ b/anonymiser/registry.py @@ -3,7 +3,7 @@ from django.db import models -from .models import AnonymiserBase, RedacterBase +from .models import ModelAnonymiser lock = threading.Lock() logger = logging.getLogger(__name__) @@ -17,7 +17,7 @@ class Registry(dict): _registry = Registry() -def _register(anonymiser: type[AnonymiserBase | RedacterBase]) -> None: +def _register(anonymiser: type[ModelAnonymiser]) -> None: if not (model := anonymiser.model): raise ValueError("Anonymiser must have a model attribute set.") if model in _registry: @@ -26,7 +26,7 @@ def _register(anonymiser: type[AnonymiserBase | RedacterBase]) -> None: _registry[model] = anonymiser -def register(anonymiser: type[AnonymiserBase | RedacterBase]) -> None: +def register(anonymiser: type[ModelAnonymiser]) -> None: """Add {model: Anonymiser} to the global registry.""" with lock: _register(anonymiser) @@ -36,13 +36,13 @@ def anonymisable_models() -> list[type[models.Model]]: return list(_registry.keys()) -def anonymisers() -> list[type[AnonymiserBase | RedacterBase]]: +def anonymisers() -> list[type[ModelAnonymiser]]: return list(_registry.values()) def get_model_anonymiser( model: type[models.Model], -) -> AnonymiserBase | RedacterBase | None: +) -> ModelAnonymiser | None: """Return newly instantiated anonymiser for model.""" if anonymiser := _registry.get(model): return anonymiser() diff --git a/anonymiser/templates/display_model_anonymisation.md b/anonymiser/templates/display_model_anonymisation.md index 8fd82a6..614e332 100644 --- a/anonymiser/templates/display_model_anonymisation.md +++ b/anonymiser/templates/display_model_anonymisation.md @@ -6,6 +6,6 @@ Model | Anonymiser {{ model }} | {{ anonymiser|default:"-" }} {% endfor %} ## Model field anonymisation -App | Model | Field | Type | Anonymised ---- | --- | --- | --- | ---{% for field in model_fields %} -{{ field.app }} | {{ field.model }} | {{ field.field_name }} | {{ field.field_type }} | {% if field.is_anonymisable %}X{% else %}-{% endif %}{% endfor %} +App | Model | Field | Type | Anonymise | Redacte +--- | --- | --- | --- | --- | ---{% for field in anonymised_models %} +{{ field.app }} | {{ field.model }} | {{ field.field_name }} | {{ field.field_type }} | {% if field.is_anonymised %}X{% else %}-{% endif %} | {{ field.redaction_strategy|default:"-" }}{% endfor %} diff --git a/pyproject.toml b/pyproject.toml index b7ec85d..971cde5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,13 +20,12 @@ classifiers = [ "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", ] packages = [{ include = "anonymiser" }] [tool.poetry.dependencies] -python = "^3.10" +python = "^3.11" django = "^3.2 || ^4.0 || ^5.0" # optional - used for testing with Postgres psycopg2-binary = { version = "*", optional = true } diff --git a/tests/anonymisers.py b/tests/anonymisers.py index ada174f..41dd852 100644 --- a/tests/anonymisers.py +++ b/tests/anonymisers.py @@ -12,6 +12,10 @@ class UserAnonymiser(ModelAnonymiser): model = User auto_redact = True + custom_field_redactions = { + "first_name": "FIRST_NAME", + } + def anonymise_first_name(self, obj: User) -> None: obj.first_name = "Anonymous" diff --git a/tests/test_models.py b/tests/test_models.py index accdfa2..505ad0a 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -4,41 +4,44 @@ from django.db import models from anonymiser.db.functions import GenerateUuid4 -from anonymiser.models import FieldSummaryData +from anonymiser.models import ModelFieldSummary from .anonymisers import BadUserAnonymiser, UserAnonymiser, UserRedacter from .models import User def test_model_fields_summary(user_anonymiser: UserAnonymiser) -> None: + f = lambda field_name: User._meta.get_field(field_name) assert user_anonymiser.get_model_field_summary() == [ - FieldSummaryData(User._meta.get_field("id"), False), - FieldSummaryData(User._meta.get_field("password"), False), - FieldSummaryData(User._meta.get_field("last_login"), False), - FieldSummaryData(User._meta.get_field("is_superuser"), False), - FieldSummaryData(User._meta.get_field("username"), False), - FieldSummaryData(User._meta.get_field("first_name"), True), - FieldSummaryData(User._meta.get_field("last_name"), False), - FieldSummaryData(User._meta.get_field("email"), False), - FieldSummaryData(User._meta.get_field("is_staff"), False), - FieldSummaryData(User._meta.get_field("is_active"), False), - FieldSummaryData(User._meta.get_field("date_joined"), False), - FieldSummaryData(User._meta.get_field("uuid"), False), - FieldSummaryData(User._meta.get_field("location"), False), - FieldSummaryData(User._meta.get_field("biography"), False), - FieldSummaryData(User._meta.get_field("date_of_birth"), False), - FieldSummaryData(User._meta.get_field("groups"), False), - FieldSummaryData(User._meta.get_field("user_permissions"), False), + ModelFieldSummary(User, f("id"), user_anonymiser), + ModelFieldSummary(User, f("password"), user_anonymiser), + ModelFieldSummary(User, f("last_login"), user_anonymiser), + ModelFieldSummary(User, f("is_superuser"), user_anonymiser), + ModelFieldSummary(User, f("username"), user_anonymiser), + ModelFieldSummary(User, f("first_name"), user_anonymiser), + ModelFieldSummary(User, f("last_name"), user_anonymiser), + ModelFieldSummary(User, f("email"), user_anonymiser), + ModelFieldSummary(User, f("is_staff"), user_anonymiser), + ModelFieldSummary(User, f("is_active"), user_anonymiser), + ModelFieldSummary(User, f("date_joined"), user_anonymiser), + ModelFieldSummary(User, f("uuid"), user_anonymiser), + ModelFieldSummary(User, f("location"), user_anonymiser), + ModelFieldSummary(User, f("biography"), user_anonymiser), + ModelFieldSummary(User, f("date_of_birth"), user_anonymiser), + ModelFieldSummary(User, f("groups"), user_anonymiser), + ModelFieldSummary(User, f("user_permissions"), user_anonymiser), ] def test_model_fields_data(user_anonymiser: UserAnonymiser) -> None: - fsd = FieldSummaryData(User._meta.get_field("first_name"), True) + fsd = ModelFieldSummary(User, User._meta.get_field("first_name"), user_anonymiser) assert fsd.app == "tests" assert fsd.model == "User" assert fsd.field_name == "first_name" assert fsd.field_type == "CharField" - assert fsd.is_anonymisable is True + assert fsd.is_anonymised is True + assert fsd.is_redacted is True + assert fsd.redaction_strategy == user_anonymiser.FieldRedactionStratgy.AUTO @pytest.mark.django_db @@ -47,13 +50,13 @@ def test_anonymise_not_implemented( self, user: User, user_anonymiser: UserAnonymiser ) -> None: with pytest.raises(NotImplementedError): - user_anonymiser.anonymise_field(user, "last_name") + user_anonymiser.anonymise_field(user, User._meta.get_field("last_name")) def test_anonymise_first_name_field( self, user: User, user_anonymiser: UserAnonymiser ) -> None: assert user.first_name == "fred" - user_anonymiser.anonymise_field(user, "first_name") + user_anonymiser.anonymise_field(user, User._meta.get_field("first_name")) assert user.first_name == "Anonymous" def test_anonymise(self, user: User, user_anonymiser: UserAnonymiser) -> None: diff --git a/tox.ini b/tox.ini index c889264..290042d 100644 --- a/tox.ini +++ b/tox.ini @@ -3,8 +3,7 @@ isolated_build = True envlist = fmt, lint, mypy, django-checks, - postgres, - py{310,311}-django{32,40,41,42,main} + py{311}-django{32,40,41,42,50,main} [testenv] deps = @@ -17,6 +16,7 @@ deps = django40: Django>=4.0,<4.1 django41: Django>=4.1,<4.2 django42: Django>=4.2,<4.3 + django50: https://github.com/django/django/archive/stable/5.0.x.tar.gz djangomain: https://github.com/django/django/archive/main.tar.gz commands = From 327d210ded980e703bf1c7ef1a603ab339349665 Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Sun, 24 Sep 2023 10:44:30 +0100 Subject: [PATCH 04/10] Move some methods into registry.py --- .../commands/display_model_anonymisation.py | 14 ++------- anonymiser/models.py | 4 +-- anonymiser/registry.py | 29 ++++++++++++++++++- .../templates/display_model_anonymisation.md | 11 ++----- tests/test_models.py | 16 +++++----- 5 files changed, 43 insertions(+), 31 deletions(-) diff --git a/anonymiser/management/commands/display_model_anonymisation.py b/anonymiser/management/commands/display_model_anonymisation.py index 888ec77..e53d8de 100644 --- a/anonymiser/management/commands/display_model_anonymisation.py +++ b/anonymiser/management/commands/display_model_anonymisation.py @@ -1,24 +1,16 @@ from typing import Any -from django.apps import apps from django.core.management.base import BaseCommand -from django.db.models import Model from django.template.loader import render_to_string -from anonymiser.registry import get_model_anonymiser +from anonymiser import registry class Command(BaseCommand): - def get_models(self) -> list[type[Model]]: - """Force alphabetical order of models.""" - return sorted(apps.get_models(), key=lambda m: m._meta.label) - def handle(self, *args: Any, **options: Any) -> None: - for model in self.get_models(): - if anonymiser := get_model_anonymiser(model): - data = anonymiser.get_model_field_summary() + model_fields = registry.get_all_model_fields() out = render_to_string( "display_model_anonymisation.md", - {"anonymised_models": data}, + {"model_fields": model_fields}, ) self.stdout.write(out) diff --git a/anonymiser/models.py b/anonymiser/models.py index c6cfdee..b42cfbf 100644 --- a/anonymiser/models.py +++ b/anonymiser/models.py @@ -175,8 +175,8 @@ class RedacterBase(_ModelBase): custom_field_redactions: dict[str, Any] = {} class FieldRedactionStratgy(StrEnum): - AUTO = "auto" - CUSTOM = "custom" + AUTO = "AUTO" + CUSTOM = "CUSTOM" NONE = "" def is_field_redaction_auto(self, field: models.Field) -> bool: diff --git a/anonymiser/registry.py b/anonymiser/registry.py index 304fc06..bed077d 100644 --- a/anonymiser/registry.py +++ b/anonymiser/registry.py @@ -1,9 +1,11 @@ import logging import threading +from collections import defaultdict +from django.apps import apps from django.db import models -from .models import ModelAnonymiser +from .models import ModelAnonymiser, ModelFieldSummary lock = threading.Lock() logger = logging.getLogger(__name__) @@ -47,3 +49,28 @@ def get_model_anonymiser( if anonymiser := _registry.get(model): return anonymiser() return None + + +def get_all_model_fields( + anonymised_only: bool = False, +) -> dict[str, list[ModelFieldSummary]]: + """ + Return all models and their fields as ModelFieldSummary. + + The return dict uses the `app.Model` string format as the dict key, + with a list of all fields as the value. This method includes all + models by default unless the `anonymised_only` + param is True. + + """ + models = sorted(apps.get_models(), key=lambda m: m._meta.label) + output = defaultdict(list) + for m in models: + anonymiser = get_model_anonymiser(m) + if anonymised_only and not anonymiser: + continue + for f in m._meta.get_fields(): + output[m._meta.label].append(ModelFieldSummary(m, f, anonymiser)) + # sort fields by type then name - easier to scan. + output[m._meta.label].sort(key=lambda d: f"{d.field_type}.{d.field_name}") + return dict(output) diff --git a/anonymiser/templates/display_model_anonymisation.md b/anonymiser/templates/display_model_anonymisation.md index 614e332..fd1cd30 100644 --- a/anonymiser/templates/display_model_anonymisation.md +++ b/anonymiser/templates/display_model_anonymisation.md @@ -1,11 +1,4 @@ -# Model Anonymisation Snapshot - -## Registered model anonymisers -Model | Anonymiser ---- | ---{% for model,anonymiser in model_anonymisers.items %} -{{ model }} | {{ anonymiser|default:"-" }} {% endfor %} - ## Model field anonymisation App | Model | Field | Type | Anonymise | Redacte ---- | --- | --- | --- | --- | ---{% for field in anonymised_models %} -{{ field.app }} | {{ field.model }} | {{ field.field_name }} | {{ field.field_type }} | {% if field.is_anonymised %}X{% else %}-{% endif %} | {{ field.redaction_strategy|default:"-" }}{% endfor %} +--- | --- | --- | --- | --- | ---{% for model,fields in anonymised_models.items %}{% for field in fields %} +{{ field.app }} | {{ field.model }} | {{ field.field_name }} | {{ field.field_type }} | {{ field.is_anonymised|default:"-" }} | {{ field.redaction_strategy|default:"-"|upper }}{% endfor %}{% endfor %} diff --git a/tests/test_models.py b/tests/test_models.py index 505ad0a..a821cac 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -34,14 +34,14 @@ def test_model_fields_summary(user_anonymiser: UserAnonymiser) -> None: def test_model_fields_data(user_anonymiser: UserAnonymiser) -> None: - fsd = ModelFieldSummary(User, User._meta.get_field("first_name"), user_anonymiser) - assert fsd.app == "tests" - assert fsd.model == "User" - assert fsd.field_name == "first_name" - assert fsd.field_type == "CharField" - assert fsd.is_anonymised is True - assert fsd.is_redacted is True - assert fsd.redaction_strategy == user_anonymiser.FieldRedactionStratgy.AUTO + mfs = ModelFieldSummary(User, User._meta.get_field("first_name"), user_anonymiser) + assert mfs.app == "tests" + assert mfs.model == "User" + assert mfs.field_name == "first_name" + assert mfs.field_type == "CharField" + assert mfs.is_anonymised is True + assert mfs.is_redacted is True + assert mfs.redaction_strategy == user_anonymiser.FieldRedactionStratgy.CUSTOM @pytest.mark.django_db From 8441a68f7a37593e54ef0c150a4f481adf03b53f Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Sun, 24 Sep 2023 10:53:07 +0100 Subject: [PATCH 05/10] Add more tests --- tests/test_models.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/test_models.py b/tests/test_models.py index a821cac..4dd2e0c 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -44,6 +44,27 @@ def test_model_fields_data(user_anonymiser: UserAnonymiser) -> None: assert mfs.redaction_strategy == user_anonymiser.FieldRedactionStratgy.CUSTOM +@pytest.mark.parametrize( + "field_name,strategy", + [ + ("first_name", UserAnonymiser.FieldRedactionStratgy.CUSTOM), + # non-custom redactions of char fields + ("last_name", UserAnonymiser.FieldRedactionStratgy.AUTO), + ("biography", UserAnonymiser.FieldRedactionStratgy.AUTO), + ("location", UserAnonymiser.FieldRedactionStratgy.AUTO), + # date / UUID not redacted automatically + ("date_of_birth", UserAnonymiser.FieldRedactionStratgy.NONE), + ("uuid", UserAnonymiser.FieldRedactionStratgy.NONE), + ], +) +def test_model_fields_redaction_strategy( + field_name: str, strategy: str, user_anonymiser: UserAnonymiser +) -> None: + field = User._meta.get_field(field_name) + mfs = ModelFieldSummary(User, field, user_anonymiser) + assert mfs.redaction_strategy == strategy + + @pytest.mark.django_db class TestAnonymisableUserModel: def test_anonymise_not_implemented( From 789257a78745767110746da81099fcadc45a1340 Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Sun, 24 Sep 2023 10:55:49 +0100 Subject: [PATCH 06/10] Update management command output --- .../templates/display_model_anonymisation.md | 5 +- tests/model_anonymisation.md | 96 +++++++++---------- 2 files changed, 50 insertions(+), 51 deletions(-) diff --git a/anonymiser/templates/display_model_anonymisation.md b/anonymiser/templates/display_model_anonymisation.md index fd1cd30..d069cf3 100644 --- a/anonymiser/templates/display_model_anonymisation.md +++ b/anonymiser/templates/display_model_anonymisation.md @@ -1,4 +1,5 @@ +**DEMO PURPOSES ONLY** ## Model field anonymisation -App | Model | Field | Type | Anonymise | Redacte ---- | --- | --- | --- | --- | ---{% for model,fields in anonymised_models.items %}{% for field in fields %} +App | Model | Field | Type | Anonymise | Redact +--- | --- | --- | --- | --- | ---{% for model,fields in model_fields.items %}{% for field in fields %} {{ field.app }} | {{ field.model }} | {{ field.field_name }} | {{ field.field_type }} | {{ field.is_anonymised|default:"-" }} | {{ field.redaction_strategy|default:"-"|upper }}{% endfor %}{% endfor %} diff --git a/tests/model_anonymisation.md b/tests/model_anonymisation.md index d55cc02..27735d8 100644 --- a/tests/model_anonymisation.md +++ b/tests/model_anonymisation.md @@ -1,50 +1,48 @@ -# Model Anonymisation Snapshot - -## Registered model anonymisers -Model | Anonymiser ---- | --- -admin.LogEntry | - -auth.Group | - -auth.Permission | - -contenttypes.ContentType | - -sessions.Session | - -tests.User | UserAnonymiser - +**DEMO PURPOSES ONLY** ## Model field anonymisation -App | Model | Field | Type | Anonymised ---- | --- | --- | --- | --- -admin | LogEntry | id | AutoField | - -admin | LogEntry | object_repr | CharField | - -admin | LogEntry | action_time | DateTimeField | - -admin | LogEntry | content_type | ForeignKey | - -admin | LogEntry | user | ForeignKey | - -admin | LogEntry | action_flag | PositiveSmallIntegerField | - -admin | LogEntry | change_message | TextField | - -admin | LogEntry | object_id | TextField | - -auth | Group | id | AutoField | - -auth | Group | name | CharField | - -auth | Group | permissions | ManyToManyField | - -auth | Permission | id | AutoField | - -auth | Permission | codename | CharField | - -auth | Permission | name | CharField | - -auth | Permission | content_type | ForeignKey | - -contenttypes | ContentType | id | AutoField | - -contenttypes | ContentType | app_label | CharField | - -contenttypes | ContentType | model | CharField | - -sessions | Session | session_key | CharField | - -sessions | Session | expire_date | DateTimeField | - -sessions | Session | session_data | TextField | - -tests | User | id | AutoField | - -tests | User | is_active | BooleanField | - -tests | User | is_staff | BooleanField | - -tests | User | is_superuser | BooleanField | - -tests | User | first_name | CharField | X -tests | User | last_name | CharField | - -tests | User | password | CharField | - -tests | User | username | CharField | - -tests | User | date_joined | DateTimeField | - -tests | User | last_login | DateTimeField | - -tests | User | email | EmailField | - -tests | User | groups | ManyToManyField | - -tests | User | user_permissions | ManyToManyField | - -``` +App | Model | Field | Type | Anonymise | Redact +--- | --- | --- | --- | --- | --- +admin | LogEntry | id | AutoField | - | - +admin | LogEntry | object_repr | CharField | - | - +admin | LogEntry | action_time | DateTimeField | - | - +admin | LogEntry | content_type | ForeignKey | - | - +admin | LogEntry | user | ForeignKey | - | - +admin | LogEntry | action_flag | PositiveSmallIntegerField | - | - +admin | LogEntry | change_message | TextField | - | - +admin | LogEntry | object_id | TextField | - | - +auth | Group | id | AutoField | - | - +auth | Group | name | CharField | - | - +auth | Group | permissions | ManyToManyField | - | - +auth | Group | user | ManyToManyRel | - | - +auth | Permission | id | AutoField | - | - +auth | Permission | codename | CharField | - | - +auth | Permission | name | CharField | - | - +auth | Permission | content_type | ForeignKey | - | - +auth | Permission | group | ManyToManyRel | - | - +auth | Permission | user | ManyToManyRel | - | - +contenttypes | ContentType | id | AutoField | - | - +contenttypes | ContentType | app_label | CharField | - | - +contenttypes | ContentType | model | CharField | - | - +contenttypes | ContentType | logentry | ManyToOneRel | - | - +contenttypes | ContentType | permission | ManyToOneRel | - | - +sessions | Session | session_key | CharField | - | - +sessions | Session | expire_date | DateTimeField | - | - +sessions | Session | session_data | TextField | - | - +tests | User | id | AutoField | - | - +tests | User | is_active | BooleanField | - | - +tests | User | is_staff | BooleanField | - | - +tests | User | is_superuser | BooleanField | - | - +tests | User | first_name | CharField | True | CUSTOM +tests | User | last_name | CharField | - | AUTO +tests | User | location | CharField | - | AUTO +tests | User | password | CharField | - | AUTO +tests | User | username | CharField | - | - +tests | User | date_of_birth | DateField | - | - +tests | User | date_joined | DateTimeField | - | - +tests | User | last_login | DateTimeField | - | - +tests | User | email | EmailField | - | AUTO +tests | User | groups | ManyToManyField | - | - +tests | User | user_permissions | ManyToManyField | - | - +tests | User | logentry | ManyToOneRel | - | - +tests | User | biography | TextField | - | AUTO +tests | User | uuid | UUIDField | - | - From 4b3baa5308d2959a33c88d7dbb4c259e9910050c Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Sun, 24 Sep 2023 18:19:29 +0300 Subject: [PATCH 07/10] Refactor model field summary data --- anonymiser/models.py | 78 +++------------- anonymiser/registry.py | 80 +++++++++++++++-- .../templates/display_model_anonymisation.md | 4 +- tests/model_anonymisation.md | 90 +++++++++---------- tests/test_models.py | 47 +++------- tests/test_registry.py | 21 ++++- 6 files changed, 164 insertions(+), 156 deletions(-) diff --git a/anonymiser/models.py b/anonymiser/models.py index b42cfbf..561e914 100644 --- a/anonymiser/models.py +++ b/anonymiser/models.py @@ -1,9 +1,7 @@ from __future__ import annotations import logging -from collections import namedtuple -from dataclasses import dataclass -from enum import StrEnum +from enum import StrEnum # 3.11 only from typing import Any, Iterator, TypeAlias from django.db import models @@ -11,66 +9,21 @@ # (old_value, new_value) tuple AnonymisationResult: TypeAlias = tuple[Any, Any] -# Store info about the field and whether it is anonymisable -FieldSummaryTuple = namedtuple( - "FieldSummaryTuple", ("app", "model", "field", "type", "is_anonymisable") -) - logger = logging.getLogger(__name__) -@dataclass -class ModelFieldSummary: +def get_model_fields(model: type[models.Model]) -> list[models.Field]: """ - Store info about the field and whether it is anonymisable. + Return a list of fields on the model. - This is used to generate a summary of the fields on a model, and how - they are anonymised / redacted - used to generate the documentation. + Removes any related_name fields. """ - - # python rejects "model" as a field name, so we use "app_model" - app_model: models.Model - model_field: models.Field - anonymiser: ModelAnonymiser | None - - @property - def model_label(self) -> str: - return self.app_model._meta.label - - @property - def app(self) -> str: - return self.app_model._meta.app_label - - @property - def model(self) -> str: - return self.app_model._meta.object_name or "" - - @property - def field_name(self) -> str: - return self.model_field.name - - @property - def field_type(self) -> str: - return self.model_field.__class__.__name__ - - @property - def is_anonymised(self) -> bool: - if self.anonymiser: - return self.anonymiser.is_field_anonymised(self.model_field) - return False - - @property - def is_redacted(self) -> bool: - if self.anonymiser: - return self.anonymiser.is_field_redacted(self.model_field) - return False - - @property - def redaction_strategy(self) -> RedacterBase.FieldRedactionStratgy: - if self.anonymiser: - return self.anonymiser.field_redaction_strategy(self.model_field) - return RedacterBase.FieldRedactionStratgy.NONE + return [ + f + for f in model._meta.get_fields() + if not isinstance(f, models.ForeignObjectRel) + ] class _ModelBase: @@ -81,11 +34,7 @@ def get_model_fields(self) -> list[models.Field]: """Return a list of fields on the model.""" if not self.model: raise NotImplementedError("model must be set") - return [ - f - for f in self.model._meta.get_fields() - if not isinstance(f, models.ForeignObjectRel) - ] + return get_model_fields(self.model) class AnonymiserBase(_ModelBase): @@ -277,10 +226,3 @@ class ModelAnonymiser(AnonymiserBase, RedacterBase): for models that do not need to be anonymised. """ - - def get_model_field_summary(self) -> list[ModelFieldSummary]: - """Return a list of ModelFieldSummary objects for the model.""" - return [ - ModelFieldSummary(app_model=self.model, model_field=f, anonymiser=self) - for f in self.get_model_fields() - ] diff --git a/anonymiser/registry.py b/anonymiser/registry.py index bed077d..fdb75f6 100644 --- a/anonymiser/registry.py +++ b/anonymiser/registry.py @@ -1,3 +1,4 @@ +import dataclasses import logging import threading from collections import defaultdict @@ -5,7 +6,7 @@ from django.apps import apps from django.db import models -from .models import ModelAnonymiser, ModelFieldSummary +from .models import ModelAnonymiser lock = threading.Lock() logger = logging.getLogger(__name__) @@ -35,22 +36,89 @@ def register(anonymiser: type[ModelAnonymiser]) -> None: def anonymisable_models() -> list[type[models.Model]]: - return list(_registry.keys()) + with lock: + return list(_registry.keys()) + + +def not_anonymisable_models() -> list[type[models.Model]]: + with lock: + return [m for m in apps.get_models() if m not in _registry] def anonymisers() -> list[type[ModelAnonymiser]]: - return list(_registry.values()) + with lock: + return list(_registry.values()) def get_model_anonymiser( model: type[models.Model], ) -> ModelAnonymiser | None: """Return newly instantiated anonymiser for model.""" - if anonymiser := _registry.get(model): - return anonymiser() + with lock: + if anonymiser := _registry.get(model): + return anonymiser() return None +@dataclasses.dataclass +class ModelFieldSummary: + """ + Store info about the field and whether it is anonymisable. + + This is used to generate a summary of the fields on a model, and how + they are anonymised / redacted - used to generate the documentation. + + """ + + field: models.Field + anonymiser: ModelAnonymiser | None = dataclasses.field(init=False) + + def __post_init__(self) -> None: + self.anonymiser = get_model_anonymiser(self.model) + + @property + def model(self) -> type[models.Model]: + return self.field.model + + @property + def app_name(self) -> str: + return self.model._meta.app_label + + @property + def model_name(self) -> str: + return self.model._meta.model_name + + @property + def model_label(self) -> str: + return self.model._meta.label + + @property + def field_name(self) -> str: + return self.field.name + + @property + def field_type(self) -> str: + return self.field.__class__.__name__ + + @property + def is_anonymised(self) -> bool: + if self.anonymiser: + return self.anonymiser.is_field_anonymised(self.field) + return False + + @property + def is_redacted(self) -> bool: + if self.anonymiser: + return self.anonymiser.is_field_redacted(self.field) + return False + + @property + def redaction_strategy(self) -> ModelAnonymiser.FieldRedactionStratgy: + if self.anonymiser: + return self.anonymiser.field_redaction_strategy(self.field) + return ModelAnonymiser.FieldRedactionStratgy.NONE + + def get_all_model_fields( anonymised_only: bool = False, ) -> dict[str, list[ModelFieldSummary]]: @@ -70,7 +138,7 @@ def get_all_model_fields( if anonymised_only and not anonymiser: continue for f in m._meta.get_fields(): - output[m._meta.label].append(ModelFieldSummary(m, f, anonymiser)) + output[m._meta.label].append(ModelFieldSummary(f)) # sort fields by type then name - easier to scan. output[m._meta.label].sort(key=lambda d: f"{d.field_type}.{d.field_name}") return dict(output) diff --git a/anonymiser/templates/display_model_anonymisation.md b/anonymiser/templates/display_model_anonymisation.md index d069cf3..052d7d4 100644 --- a/anonymiser/templates/display_model_anonymisation.md +++ b/anonymiser/templates/display_model_anonymisation.md @@ -1,5 +1,5 @@ **DEMO PURPOSES ONLY** ## Model field anonymisation App | Model | Field | Type | Anonymise | Redact ---- | --- | --- | --- | --- | ---{% for model,fields in model_fields.items %}{% for field in fields %} -{{ field.app }} | {{ field.model }} | {{ field.field_name }} | {{ field.field_type }} | {{ field.is_anonymised|default:"-" }} | {{ field.redaction_strategy|default:"-"|upper }}{% endfor %}{% endfor %} +--- | --- | --- | --- | --- | ---{% for model,fields in model_fields.items %}{% for field in fields %} +{{ field.app_name }} | {{ field.model_name }} | {{ field.field_name }} | {{ field.field_type }} | {{ field.is_anonymised|default:"-" }} | {{ field.redaction_strategy|default:"-"|upper }}{% endfor %}{% endfor %} diff --git a/tests/model_anonymisation.md b/tests/model_anonymisation.md index 27735d8..b26fd8c 100644 --- a/tests/model_anonymisation.md +++ b/tests/model_anonymisation.md @@ -1,48 +1,48 @@ **DEMO PURPOSES ONLY** ## Model field anonymisation App | Model | Field | Type | Anonymise | Redact ---- | --- | --- | --- | --- | --- -admin | LogEntry | id | AutoField | - | - -admin | LogEntry | object_repr | CharField | - | - -admin | LogEntry | action_time | DateTimeField | - | - -admin | LogEntry | content_type | ForeignKey | - | - -admin | LogEntry | user | ForeignKey | - | - -admin | LogEntry | action_flag | PositiveSmallIntegerField | - | - -admin | LogEntry | change_message | TextField | - | - -admin | LogEntry | object_id | TextField | - | - -auth | Group | id | AutoField | - | - -auth | Group | name | CharField | - | - -auth | Group | permissions | ManyToManyField | - | - -auth | Group | user | ManyToManyRel | - | - -auth | Permission | id | AutoField | - | - -auth | Permission | codename | CharField | - | - -auth | Permission | name | CharField | - | - -auth | Permission | content_type | ForeignKey | - | - -auth | Permission | group | ManyToManyRel | - | - -auth | Permission | user | ManyToManyRel | - | - -contenttypes | ContentType | id | AutoField | - | - -contenttypes | ContentType | app_label | CharField | - | - -contenttypes | ContentType | model | CharField | - | - -contenttypes | ContentType | logentry | ManyToOneRel | - | - -contenttypes | ContentType | permission | ManyToOneRel | - | - -sessions | Session | session_key | CharField | - | - -sessions | Session | expire_date | DateTimeField | - | - -sessions | Session | session_data | TextField | - | - -tests | User | id | AutoField | - | - -tests | User | is_active | BooleanField | - | - -tests | User | is_staff | BooleanField | - | - -tests | User | is_superuser | BooleanField | - | - -tests | User | first_name | CharField | True | CUSTOM -tests | User | last_name | CharField | - | AUTO -tests | User | location | CharField | - | AUTO -tests | User | password | CharField | - | AUTO -tests | User | username | CharField | - | - -tests | User | date_of_birth | DateField | - | - -tests | User | date_joined | DateTimeField | - | - -tests | User | last_login | DateTimeField | - | - -tests | User | email | EmailField | - | AUTO -tests | User | groups | ManyToManyField | - | - -tests | User | user_permissions | ManyToManyField | - | - -tests | User | logentry | ManyToOneRel | - | - -tests | User | biography | TextField | - | AUTO -tests | User | uuid | UUIDField | - | - +--- | --- | --- | --- | --- | --- +admin | logentry | id | AutoField | - | - +admin | logentry | object_repr | CharField | - | - +admin | logentry | action_time | DateTimeField | - | - +admin | logentry | content_type | ForeignKey | - | - +admin | logentry | user | ForeignKey | - | - +admin | logentry | action_flag | PositiveSmallIntegerField | - | - +admin | logentry | change_message | TextField | - | - +admin | logentry | object_id | TextField | - | - +auth | group | id | AutoField | - | - +auth | group | name | CharField | - | - +auth | group | permissions | ManyToManyField | - | - +auth | group | user | ManyToManyRel | - | - +auth | permission | id | AutoField | - | - +auth | permission | codename | CharField | - | - +auth | permission | name | CharField | - | - +auth | permission | content_type | ForeignKey | - | - +auth | permission | group | ManyToManyRel | - | - +auth | permission | user | ManyToManyRel | - | - +contenttypes | contenttype | id | AutoField | - | - +contenttypes | contenttype | app_label | CharField | - | - +contenttypes | contenttype | model | CharField | - | - +contenttypes | contenttype | logentry | ManyToOneRel | - | - +contenttypes | contenttype | permission | ManyToOneRel | - | - +sessions | session | session_key | CharField | - | - +sessions | session | expire_date | DateTimeField | - | - +sessions | session | session_data | TextField | - | - +tests | user | id | AutoField | - | - +tests | user | is_active | BooleanField | - | - +tests | user | is_staff | BooleanField | - | - +tests | user | is_superuser | BooleanField | - | - +tests | user | first_name | CharField | True | CUSTOM +tests | user | last_name | CharField | - | AUTO +tests | user | location | CharField | - | AUTO +tests | user | password | CharField | - | AUTO +tests | user | username | CharField | - | - +tests | user | date_of_birth | DateField | - | - +tests | user | date_joined | DateTimeField | - | - +tests | user | last_login | DateTimeField | - | - +tests | user | email | EmailField | - | AUTO +tests | user | groups | ManyToManyField | - | - +tests | user | user_permissions | ManyToManyField | - | - +tests | user | logentry | ManyToOneRel | - | - +tests | user | biography | TextField | - | AUTO +tests | user | uuid | UUIDField | - | - diff --git a/tests/test_models.py b/tests/test_models.py index 4dd2e0c..37e48f3 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -4,44 +4,23 @@ from django.db import models from anonymiser.db.functions import GenerateUuid4 -from anonymiser.models import ModelFieldSummary +from anonymiser.registry import ModelFieldSummary from .anonymisers import BadUserAnonymiser, UserAnonymiser, UserRedacter from .models import User -def test_model_fields_summary(user_anonymiser: UserAnonymiser) -> None: - f = lambda field_name: User._meta.get_field(field_name) - assert user_anonymiser.get_model_field_summary() == [ - ModelFieldSummary(User, f("id"), user_anonymiser), - ModelFieldSummary(User, f("password"), user_anonymiser), - ModelFieldSummary(User, f("last_login"), user_anonymiser), - ModelFieldSummary(User, f("is_superuser"), user_anonymiser), - ModelFieldSummary(User, f("username"), user_anonymiser), - ModelFieldSummary(User, f("first_name"), user_anonymiser), - ModelFieldSummary(User, f("last_name"), user_anonymiser), - ModelFieldSummary(User, f("email"), user_anonymiser), - ModelFieldSummary(User, f("is_staff"), user_anonymiser), - ModelFieldSummary(User, f("is_active"), user_anonymiser), - ModelFieldSummary(User, f("date_joined"), user_anonymiser), - ModelFieldSummary(User, f("uuid"), user_anonymiser), - ModelFieldSummary(User, f("location"), user_anonymiser), - ModelFieldSummary(User, f("biography"), user_anonymiser), - ModelFieldSummary(User, f("date_of_birth"), user_anonymiser), - ModelFieldSummary(User, f("groups"), user_anonymiser), - ModelFieldSummary(User, f("user_permissions"), user_anonymiser), - ] - - -def test_model_fields_data(user_anonymiser: UserAnonymiser) -> None: - mfs = ModelFieldSummary(User, User._meta.get_field("first_name"), user_anonymiser) - assert mfs.app == "tests" - assert mfs.model == "User" - assert mfs.field_name == "first_name" - assert mfs.field_type == "CharField" - assert mfs.is_anonymised is True - assert mfs.is_redacted is True - assert mfs.redaction_strategy == user_anonymiser.FieldRedactionStratgy.CUSTOM +# def test_model_fields_data() -> None: +# mfs = ModelFieldSummary(User._meta.get_field("first_name")) +# assert mfs.app_label == "tests" +# assert mfs.model == User +# assert mfs.model_label == "tests.User" +# assert mfs.field_name == "first_name" +# assert mfs.field_type == "CharField" +# assert mfs.anonymiser.__class__ == UserAnonymiser +# assert mfs.is_anonymised is True +# assert mfs.is_redacted is True +# assert mfs.redaction_strategy == UserAnonymiser.FieldRedactionStratgy.CUSTOM @pytest.mark.parametrize( @@ -61,7 +40,7 @@ def test_model_fields_redaction_strategy( field_name: str, strategy: str, user_anonymiser: UserAnonymiser ) -> None: field = User._meta.get_field(field_name) - mfs = ModelFieldSummary(User, field, user_anonymiser) + mfs = ModelFieldSummary(field) assert mfs.redaction_strategy == strategy diff --git a/tests/test_registry.py b/tests/test_registry.py index 0546693..4786175 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,5 +1,11 @@ +from __future__ import annotations + from anonymiser.decorators import register_anonymiser -from anonymiser.registry import _registry, anonymisable_models +from anonymiser.registry import ( + ModelFieldSummary, + _registry, + anonymisable_models, +) from .anonymisers import UserAnonymiser from .models import User @@ -14,3 +20,16 @@ def test_register_anonymiser() -> None: assert anonymisable_models() == [] assert register_anonymiser(UserAnonymiser) == UserAnonymiser assert anonymisable_models() == [User] + + +def test_model_fields_data() -> None: + mfs = ModelFieldSummary(User._meta.get_field("first_name")) + assert mfs.app_name == "tests" + assert mfs.model == User + assert mfs.model_label == "tests.User" + assert mfs.field_name == "first_name" + assert mfs.field_type == "CharField" + assert isinstance(mfs.anonymiser, UserAnonymiser) + assert mfs.is_anonymised is True + assert mfs.is_redacted is True + assert mfs.redaction_strategy == UserAnonymiser.FieldRedactionStratgy.CUSTOM From 5678f64613351789dd360d59438e860ccbd5e718 Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Tue, 26 Sep 2023 13:46:05 +0300 Subject: [PATCH 08/10] Refactor Registry model --- anonymiser/decorators.py | 8 ++--- anonymiser/models.py | 10 +++---- anonymiser/registry.py | 65 +++++++++++++++++----------------------- tests/test_models.py | 25 ++++------------ tests/test_registry.py | 16 ++++------ 5 files changed, 47 insertions(+), 77 deletions(-) diff --git a/anonymiser/decorators.py b/anonymiser/decorators.py index dc777f3..bc32218 100644 --- a/anonymiser/decorators.py +++ b/anonymiser/decorators.py @@ -1,10 +1,8 @@ from .models import ModelAnonymiser -from .registry import register +from .registry import register_model_anonoymiser -def register_anonymiser( - anonymiser: type[ModelAnonymiser], -) -> type[ModelAnonymiser]: +def register_anonymiser(anonymiser: type[ModelAnonymiser]) -> type[ModelAnonymiser]: """Add {model: Anonymiser} to the global registry.""" - register(anonymiser) + register_model_anonoymiser(anonymiser) return anonymiser diff --git a/anonymiser/models.py b/anonymiser/models.py index 561e914..7d11d2f 100644 --- a/anonymiser/models.py +++ b/anonymiser/models.py @@ -123,7 +123,7 @@ class RedacterBase(_ModelBase): # or a db function, e.g. F("field_name") or Value("static value"). custom_field_redactions: dict[str, Any] = {} - class FieldRedactionStratgy(StrEnum): + class FieldRedactionStrategy(StrEnum): AUTO = "AUTO" CUSTOM = "CUSTOM" NONE = "" @@ -177,13 +177,13 @@ def _max_length(f: models.Field) -> int: if self.is_field_redaction_auto(f) } - def field_redaction_strategy(self, field: models.Field) -> FieldRedactionStratgy: + def field_redaction_strategy(self, field: models.Field) -> FieldRedactionStrategy: """Return the FieldRedaction value for a field.""" if self.is_field_redaction_custom(field): - return self.FieldRedactionStratgy.CUSTOM + return self.FieldRedactionStrategy.CUSTOM if self.is_field_redaction_auto(field): - return self.FieldRedactionStratgy.AUTO - return self.FieldRedactionStratgy.NONE + return self.FieldRedactionStrategy.AUTO + return self.FieldRedactionStrategy.NONE def redact_queryset( self, diff --git a/anonymiser/registry.py b/anonymiser/registry.py index fdb75f6..4288336 100644 --- a/anonymiser/registry.py +++ b/anonymiser/registry.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import dataclasses import logging import threading @@ -13,50 +15,33 @@ class Registry(dict): - pass - - -# global registry -_registry = Registry() - - -def _register(anonymiser: type[ModelAnonymiser]) -> None: - if not (model := anonymiser.model): - raise ValueError("Anonymiser must have a model attribute set.") - if model in _registry: - raise ValueError(f"Anonymiser for {model} already registered") - logging.debug("Adding anonymiser for %s to registry", model._meta.label) - _registry[model] = anonymiser + def anonymisable_models(self) -> list[type[models.Model]]: + return [m for m in self.keys() if self[m]] + def non_anonymisable_models(self) -> list[type[models.Model]]: + return [m for m in self.keys() if self[m] is None] -def register(anonymiser: type[ModelAnonymiser]) -> None: - """Add {model: Anonymiser} to the global registry.""" - with lock: - _register(anonymiser) + def is_model_anonymisable(self, model: type[models.Model]) -> bool: + return bool(self[model]) + def register_anonymiser(self, anonymiser: type[ModelAnonymiser]) -> None: + with lock: + if not (model := anonymiser.model): + raise ValueError("Anonymiser must have a model attribute set.") + if model in self: + raise ValueError(f"Anonymiser for {model} already registered") + logging.debug("Adding anonymiser for %s to registry", model._meta.label) + self[model] = anonymiser -def anonymisable_models() -> list[type[models.Model]]: - with lock: - return list(_registry.keys()) +def register_model_anonoymiser(anonymiser: type[ModelAnonymiser]) -> None: + _registry.register_anonymiser(anonymiser) -def not_anonymisable_models() -> list[type[models.Model]]: - with lock: - return [m for m in apps.get_models() if m not in _registry] - -def anonymisers() -> list[type[ModelAnonymiser]]: - with lock: - return list(_registry.values()) - - -def get_model_anonymiser( - model: type[models.Model], -) -> ModelAnonymiser | None: +def get_model_anonymiser(model: type[models.Model]) -> ModelAnonymiser | None: """Return newly instantiated anonymiser for model.""" - with lock: - if anonymiser := _registry.get(model): - return anonymiser() + if anonymiser := _registry.get(model): + return anonymiser() return None @@ -113,10 +98,10 @@ def is_redacted(self) -> bool: return False @property - def redaction_strategy(self) -> ModelAnonymiser.FieldRedactionStratgy: + def redaction_strategy(self) -> ModelAnonymiser.FieldRedactionStrategy: if self.anonymiser: return self.anonymiser.field_redaction_strategy(self.field) - return ModelAnonymiser.FieldRedactionStratgy.NONE + return ModelAnonymiser.FieldRedactionStrategy.NONE def get_all_model_fields( @@ -142,3 +127,7 @@ def get_all_model_fields( # sort fields by type then name - easier to scan. output[m._meta.label].sort(key=lambda d: f"{d.field_type}.{d.field_name}") return dict(output) + + +# Registry object - initialised in init_registry() +_registry: Registry = Registry() diff --git a/tests/test_models.py b/tests/test_models.py index 37e48f3..4865134 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -10,30 +10,17 @@ from .models import User -# def test_model_fields_data() -> None: -# mfs = ModelFieldSummary(User._meta.get_field("first_name")) -# assert mfs.app_label == "tests" -# assert mfs.model == User -# assert mfs.model_label == "tests.User" -# assert mfs.field_name == "first_name" -# assert mfs.field_type == "CharField" -# assert mfs.anonymiser.__class__ == UserAnonymiser -# assert mfs.is_anonymised is True -# assert mfs.is_redacted is True -# assert mfs.redaction_strategy == UserAnonymiser.FieldRedactionStratgy.CUSTOM - - @pytest.mark.parametrize( "field_name,strategy", [ - ("first_name", UserAnonymiser.FieldRedactionStratgy.CUSTOM), + ("first_name", UserAnonymiser.FieldRedactionStrategy.CUSTOM), # non-custom redactions of char fields - ("last_name", UserAnonymiser.FieldRedactionStratgy.AUTO), - ("biography", UserAnonymiser.FieldRedactionStratgy.AUTO), - ("location", UserAnonymiser.FieldRedactionStratgy.AUTO), + ("last_name", UserAnonymiser.FieldRedactionStrategy.AUTO), + ("biography", UserAnonymiser.FieldRedactionStrategy.AUTO), + ("location", UserAnonymiser.FieldRedactionStrategy.AUTO), # date / UUID not redacted automatically - ("date_of_birth", UserAnonymiser.FieldRedactionStratgy.NONE), - ("uuid", UserAnonymiser.FieldRedactionStratgy.NONE), + ("date_of_birth", UserAnonymiser.FieldRedactionStrategy.NONE), + ("uuid", UserAnonymiser.FieldRedactionStrategy.NONE), ], ) def test_model_fields_redaction_strategy( diff --git a/tests/test_registry.py b/tests/test_registry.py index 4786175..ce571e9 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,25 +1,21 @@ from __future__ import annotations from anonymiser.decorators import register_anonymiser -from anonymiser.registry import ( - ModelFieldSummary, - _registry, - anonymisable_models, -) +from anonymiser.registry import ModelFieldSummary, _registry from .anonymisers import UserAnonymiser from .models import User def test_registry() -> None: - assert anonymisable_models() == [User] + assert list(_registry.keys()) == [User] def test_register_anonymiser() -> None: _registry.clear() - assert anonymisable_models() == [] - assert register_anonymiser(UserAnonymiser) == UserAnonymiser - assert anonymisable_models() == [User] + assert _registry == {} + register_anonymiser(UserAnonymiser) + assert _registry == {User: UserAnonymiser} def test_model_fields_data() -> None: @@ -32,4 +28,4 @@ def test_model_fields_data() -> None: assert isinstance(mfs.anonymiser, UserAnonymiser) assert mfs.is_anonymised is True assert mfs.is_redacted is True - assert mfs.redaction_strategy == UserAnonymiser.FieldRedactionStratgy.CUSTOM + assert mfs.redaction_strategy == UserAnonymiser.FieldRedactionStrategy.CUSTOM From ef1b7fd0ff2927b4e0ddc1dcb52e88f9ef176d17 Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Tue, 26 Sep 2023 14:13:17 +0300 Subject: [PATCH 09/10] Update report format --- anonymiser/models.py | 63 +++++++++++++ anonymiser/registry.py | 62 +------------ .../templates/display_model_anonymisation.md | 2 +- tests/model_anonymisation.md | 88 +++++++++---------- tests/test_models.py | 14 +++ tests/test_registry.py | 15 +--- 6 files changed, 124 insertions(+), 120 deletions(-) diff --git a/anonymiser/models.py b/anonymiser/models.py index 7d11d2f..cd295f6 100644 --- a/anonymiser/models.py +++ b/anonymiser/models.py @@ -1,5 +1,6 @@ from __future__ import annotations +import dataclasses import logging from enum import StrEnum # 3.11 only from typing import Any, Iterator, TypeAlias @@ -226,3 +227,65 @@ class ModelAnonymiser(AnonymiserBase, RedacterBase): for models that do not need to be anonymised. """ + + +@dataclasses.dataclass +class ModelFieldSummary: + """ + Store info about the field and whether it is anonymisable. + + This is used to generate a summary of the fields on a model, and how + they are anonymised / redacted - used to generate the documentation. + + """ + + field: models.Field + anonymiser: ModelAnonymiser | None = dataclasses.field(init=False) + + def __post_init__(self) -> None: + # circ import + from .registry import get_model_anonymiser + + self.anonymiser = get_model_anonymiser(self.model) + + @property + def model(self) -> type[models.Model]: + return self.field.model + + @property + def app_label(self) -> str: + return self.model._meta.app_label + + @property + def model_name(self) -> str: + return self.label.split(".")[-1] + + @property + def label(self) -> str: + return self.model._meta.label + + @property + def field_name(self) -> str: + return self.field.name + + @property + def field_type(self) -> str: + return self.field.__class__.__name__ + + @property + def is_anonymised(self) -> bool: + if self.anonymiser: + return self.anonymiser.is_field_anonymised(self.field) + return False + + @property + def is_redacted(self) -> bool: + if self.anonymiser: + return self.anonymiser.is_field_redacted(self.field) + return False + + @property + def redaction_strategy(self) -> ModelAnonymiser.FieldRedactionStrategy: + if self.anonymiser: + return self.anonymiser.field_redaction_strategy(self.field) + return ModelAnonymiser.FieldRedactionStrategy.NONE diff --git a/anonymiser/registry.py b/anonymiser/registry.py index 4288336..57767b2 100644 --- a/anonymiser/registry.py +++ b/anonymiser/registry.py @@ -1,6 +1,5 @@ from __future__ import annotations -import dataclasses import logging import threading from collections import defaultdict @@ -8,7 +7,7 @@ from django.apps import apps from django.db import models -from .models import ModelAnonymiser +from .models import ModelAnonymiser, ModelFieldSummary lock = threading.Lock() logger = logging.getLogger(__name__) @@ -45,65 +44,6 @@ def get_model_anonymiser(model: type[models.Model]) -> ModelAnonymiser | None: return None -@dataclasses.dataclass -class ModelFieldSummary: - """ - Store info about the field and whether it is anonymisable. - - This is used to generate a summary of the fields on a model, and how - they are anonymised / redacted - used to generate the documentation. - - """ - - field: models.Field - anonymiser: ModelAnonymiser | None = dataclasses.field(init=False) - - def __post_init__(self) -> None: - self.anonymiser = get_model_anonymiser(self.model) - - @property - def model(self) -> type[models.Model]: - return self.field.model - - @property - def app_name(self) -> str: - return self.model._meta.app_label - - @property - def model_name(self) -> str: - return self.model._meta.model_name - - @property - def model_label(self) -> str: - return self.model._meta.label - - @property - def field_name(self) -> str: - return self.field.name - - @property - def field_type(self) -> str: - return self.field.__class__.__name__ - - @property - def is_anonymised(self) -> bool: - if self.anonymiser: - return self.anonymiser.is_field_anonymised(self.field) - return False - - @property - def is_redacted(self) -> bool: - if self.anonymiser: - return self.anonymiser.is_field_redacted(self.field) - return False - - @property - def redaction_strategy(self) -> ModelAnonymiser.FieldRedactionStrategy: - if self.anonymiser: - return self.anonymiser.field_redaction_strategy(self.field) - return ModelAnonymiser.FieldRedactionStrategy.NONE - - def get_all_model_fields( anonymised_only: bool = False, ) -> dict[str, list[ModelFieldSummary]]: diff --git a/anonymiser/templates/display_model_anonymisation.md b/anonymiser/templates/display_model_anonymisation.md index 052d7d4..3760ac2 100644 --- a/anonymiser/templates/display_model_anonymisation.md +++ b/anonymiser/templates/display_model_anonymisation.md @@ -2,4 +2,4 @@ ## Model field anonymisation App | Model | Field | Type | Anonymise | Redact --- | --- | --- | --- | --- | ---{% for model,fields in model_fields.items %}{% for field in fields %} -{{ field.app_name }} | {{ field.model_name }} | {{ field.field_name }} | {{ field.field_type }} | {{ field.is_anonymised|default:"-" }} | {{ field.redaction_strategy|default:"-"|upper }}{% endfor %}{% endfor %} +{{ field.app_label }} | {{ field.model_name }} | {{ field.field_name }} | {{ field.field_type }} | {% if field.is_anonymised %}X{% else %}-{% endif %} | {{ field.redaction_strategy|default:"-"|upper }}{% endfor %}{% endfor %} diff --git a/tests/model_anonymisation.md b/tests/model_anonymisation.md index b26fd8c..40695e9 100644 --- a/tests/model_anonymisation.md +++ b/tests/model_anonymisation.md @@ -2,47 +2,47 @@ ## Model field anonymisation App | Model | Field | Type | Anonymise | Redact --- | --- | --- | --- | --- | --- -admin | logentry | id | AutoField | - | - -admin | logentry | object_repr | CharField | - | - -admin | logentry | action_time | DateTimeField | - | - -admin | logentry | content_type | ForeignKey | - | - -admin | logentry | user | ForeignKey | - | - -admin | logentry | action_flag | PositiveSmallIntegerField | - | - -admin | logentry | change_message | TextField | - | - -admin | logentry | object_id | TextField | - | - -auth | group | id | AutoField | - | - -auth | group | name | CharField | - | - -auth | group | permissions | ManyToManyField | - | - -auth | group | user | ManyToManyRel | - | - -auth | permission | id | AutoField | - | - -auth | permission | codename | CharField | - | - -auth | permission | name | CharField | - | - -auth | permission | content_type | ForeignKey | - | - -auth | permission | group | ManyToManyRel | - | - -auth | permission | user | ManyToManyRel | - | - -contenttypes | contenttype | id | AutoField | - | - -contenttypes | contenttype | app_label | CharField | - | - -contenttypes | contenttype | model | CharField | - | - -contenttypes | contenttype | logentry | ManyToOneRel | - | - -contenttypes | contenttype | permission | ManyToOneRel | - | - -sessions | session | session_key | CharField | - | - -sessions | session | expire_date | DateTimeField | - | - -sessions | session | session_data | TextField | - | - -tests | user | id | AutoField | - | - -tests | user | is_active | BooleanField | - | - -tests | user | is_staff | BooleanField | - | - -tests | user | is_superuser | BooleanField | - | - -tests | user | first_name | CharField | True | CUSTOM -tests | user | last_name | CharField | - | AUTO -tests | user | location | CharField | - | AUTO -tests | user | password | CharField | - | AUTO -tests | user | username | CharField | - | - -tests | user | date_of_birth | DateField | - | - -tests | user | date_joined | DateTimeField | - | - -tests | user | last_login | DateTimeField | - | - -tests | user | email | EmailField | - | AUTO -tests | user | groups | ManyToManyField | - | - -tests | user | user_permissions | ManyToManyField | - | - -tests | user | logentry | ManyToOneRel | - | - -tests | user | biography | TextField | - | AUTO -tests | user | uuid | UUIDField | - | - +admin | LogEntry | id | AutoField | - | - +admin | LogEntry | object_repr | CharField | - | - +admin | LogEntry | action_time | DateTimeField | - | - +admin | LogEntry | content_type | ForeignKey | - | - +admin | LogEntry | user | ForeignKey | - | - +admin | LogEntry | action_flag | PositiveSmallIntegerField | - | - +admin | LogEntry | change_message | TextField | - | - +admin | LogEntry | object_id | TextField | - | - +auth | Group | id | AutoField | - | - +auth | Group | name | CharField | - | - +auth | Group | permissions | ManyToManyField | - | - +auth | Group | user | ManyToManyRel | - | - +auth | Permission | id | AutoField | - | - +auth | Permission | codename | CharField | - | - +auth | Permission | name | CharField | - | - +auth | Permission | content_type | ForeignKey | - | - +auth | Permission | group | ManyToManyRel | - | - +auth | Permission | user | ManyToManyRel | - | - +contenttypes | ContentType | id | AutoField | - | - +contenttypes | ContentType | app_label | CharField | - | - +contenttypes | ContentType | model | CharField | - | - +contenttypes | ContentType | logentry | ManyToOneRel | - | - +contenttypes | ContentType | permission | ManyToOneRel | - | - +sessions | Session | session_key | CharField | - | - +sessions | Session | expire_date | DateTimeField | - | - +sessions | Session | session_data | TextField | - | - +tests | User | id | AutoField | - | - +tests | User | is_active | BooleanField | - | - +tests | User | is_staff | BooleanField | - | - +tests | User | is_superuser | BooleanField | - | - +tests | User | first_name | CharField | X | CUSTOM +tests | User | last_name | CharField | - | AUTO +tests | User | location | CharField | - | AUTO +tests | User | password | CharField | - | AUTO +tests | User | username | CharField | - | - +tests | User | date_of_birth | DateField | - | - +tests | User | date_joined | DateTimeField | - | - +tests | User | last_login | DateTimeField | - | - +tests | User | email | EmailField | - | AUTO +tests | User | groups | ManyToManyField | - | - +tests | User | user_permissions | ManyToManyField | - | - +tests | User | logentry | ManyToOneRel | - | - +tests | User | biography | TextField | - | AUTO +tests | User | uuid | UUIDField | - | - diff --git a/tests/test_models.py b/tests/test_models.py index 4865134..0df971f 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -177,3 +177,17 @@ def test_auto_redact( "char_field": 255 * "X", "text_field": 400 * "X", } + + +def test_model_fields_data() -> None: + mfs = ModelFieldSummary(User._meta.get_field("first_name")) + assert mfs.app_label == "tests" + assert mfs.model == User + assert mfs.label == "tests.User" + assert mfs.model_name == "User" + assert mfs.field_name == "first_name" + assert mfs.field_type == "CharField" + assert isinstance(mfs.anonymiser, UserAnonymiser) + assert mfs.is_anonymised is True + assert mfs.is_redacted is True + assert mfs.redaction_strategy == UserAnonymiser.FieldRedactionStrategy.CUSTOM diff --git a/tests/test_registry.py b/tests/test_registry.py index ce571e9..9d79a75 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,7 +1,7 @@ from __future__ import annotations from anonymiser.decorators import register_anonymiser -from anonymiser.registry import ModelFieldSummary, _registry +from anonymiser.registry import _registry from .anonymisers import UserAnonymiser from .models import User @@ -16,16 +16,3 @@ def test_register_anonymiser() -> None: assert _registry == {} register_anonymiser(UserAnonymiser) assert _registry == {User: UserAnonymiser} - - -def test_model_fields_data() -> None: - mfs = ModelFieldSummary(User._meta.get_field("first_name")) - assert mfs.app_name == "tests" - assert mfs.model == User - assert mfs.model_label == "tests.User" - assert mfs.field_name == "first_name" - assert mfs.field_type == "CharField" - assert isinstance(mfs.anonymiser, UserAnonymiser) - assert mfs.is_anonymised is True - assert mfs.is_redacted is True - assert mfs.redaction_strategy == UserAnonymiser.FieldRedactionStrategy.CUSTOM From c36db7c75ec55f67fbea92eb89e4f14b6f14cda6 Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Tue, 26 Sep 2023 14:17:28 +0300 Subject: [PATCH 10/10] Fix spelling error --- anonymiser/decorators.py | 4 ++-- anonymiser/registry.py | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/anonymiser/decorators.py b/anonymiser/decorators.py index bc32218..adc312b 100644 --- a/anonymiser/decorators.py +++ b/anonymiser/decorators.py @@ -1,8 +1,8 @@ from .models import ModelAnonymiser -from .registry import register_model_anonoymiser +from .registry import register_model_anonymiser def register_anonymiser(anonymiser: type[ModelAnonymiser]) -> type[ModelAnonymiser]: """Add {model: Anonymiser} to the global registry.""" - register_model_anonoymiser(anonymiser) + register_model_anonymiser(anonymiser) return anonymiser diff --git a/anonymiser/registry.py b/anonymiser/registry.py index 57767b2..5f4fb6a 100644 --- a/anonymiser/registry.py +++ b/anonymiser/registry.py @@ -33,7 +33,7 @@ def register_anonymiser(self, anonymiser: type[ModelAnonymiser]) -> None: self[model] = anonymiser -def register_model_anonoymiser(anonymiser: type[ModelAnonymiser]) -> None: +def register_model_anonymiser(anonymiser: type[ModelAnonymiser]) -> None: _registry.register_anonymiser(anonymiser) @@ -44,6 +44,11 @@ def get_model_anonymiser(model: type[models.Model]) -> ModelAnonymiser | None: return None +def get_anonymisable_models() -> list[type[models.Model]]: + """Return list of all models that have an anonymiser.""" + return _registry.anonymisable_models() + + def get_all_model_fields( anonymised_only: bool = False, ) -> dict[str, list[ModelFieldSummary]]: