From a45225f1d7f8742ff5359890f3efce3c098844bd Mon Sep 17 00:00:00 2001 From: Hugo Rodger-Brown Date: Mon, 2 Oct 2023 16:43:20 +0100 Subject: [PATCH] Refactor RedacterBase --- anonymiser/models.py | 144 +++++++++-------------- anonymiser/redacters.py | 51 ++++++++ tests/anonymisers.py | 11 ++ tests/migrations/0004_user_extra_info.py | 17 +++ tests/models.py | 1 + tests/test_models.py | 31 +---- 6 files changed, 139 insertions(+), 116 deletions(-) create mode 100644 anonymiser/redacters.py create mode 100644 tests/migrations/0004_user_extra_info.py diff --git a/anonymiser/models.py b/anonymiser/models.py index 2ec410a..9d6e444 100644 --- a/anonymiser/models.py +++ b/anonymiser/models.py @@ -3,11 +3,11 @@ import dataclasses import logging from enum import StrEnum # 3.11 only -from typing import Any, Iterator, TypeAlias +from typing import Any, Callable, TypeAlias from django.db import models -from .settings import AUTO_REDACT_FIELD_FUNCS +from .redacters import get_default_field_redacter # (old_value, new_value) tuple AnonymisationResult: TypeAlias = tuple[Any, Any] @@ -29,12 +29,6 @@ def get_model_fields(model: type[models.Model]) -> list[models.Field]: ] -def auto_redact(field: type[models.Field]) -> Any: - if func := AUTO_REDACT_FIELD_FUNCS.get(field.__class__): - return func(field) - return None - - class _ModelBase: # Override with the model to be anonymised model: type[models.Model] @@ -45,19 +39,6 @@ def get_model_fields(self) -> list[models.Field]: raise NotImplementedError("model must be set") return get_model_fields(self.model) - def exclude_from_anonymisation(self, queryset: models.QuerySet) -> models.QuerySet: - """ - Override in subclasses to exclude any objects from anonymisation. - - Canonical example is to exclude certain users from anonymisation - - in this case the UserAnonymiser would override this method to - exclude e.g. is_staff=True users. - - Default is a noop. 
- - """ - return queryset - class AnonymiserBase(_ModelBase): """Base class for anonymisation functions.""" @@ -111,15 +92,6 @@ def anonymise_object(self, obj: models.Model) -> None: output[field.name] = self.anonymise_field(obj, field) self.post_anonymise_object(obj, **output) - def anonymise_queryset(self, queryset: Iterator[models.Model]) -> int: - """Anonymise all objects in the queryset (and SAVE).""" - count = 0 - for obj in self.exclude_from_anonymisation(queryset): - self.anonymise_object(obj) - obj.save() - count += 1 - return count - def post_anonymise_object( self, obj: models.Model, **updates: AnonymisationResult ) -> None: @@ -150,70 +122,80 @@ class FieldRedactionStrategy(StrEnum): CUSTOM = "CUSTOM" NONE = "" - def is_field_redaction_auto(self, field: models.Field) -> bool: + def is_field_redactable(self, field: models.Field) -> bool: """ - Return True if the field should be auto-redacted. + Return True if the field can be redacted. - Return False if the class-level auto_redact attr is False. - - Currently this includes text fields that are not choices, primary - keys, unique fields, or in the auto_redact_exclude list. + By default primary keys, relations, and choice fields cannot be + redacted. Override this method to change this behaviour. 
""" - if not self.auto_redact: - return False - if field.name in self.auto_redact_exclude: - return False if field.is_relation: return False if getattr(field, "primary_key", False): return False - if getattr(field, "choices", []): + if getattr(field, "choices", None): return False - if isinstance(field, models.UUIDField): - return self.auto_redact - return isinstance(field, tuple(AUTO_REDACT_FIELD_FUNCS.keys())) and not getattr( - field, "unique", False - ) - - def is_field_redaction_custom(self, field: models.Field) -> bool: - """Return True if the field has custom redaction.""" - field.choices - return field.name in self.custom_field_redactions - - def is_field_redacted(self, field: models.Field) -> bool: - """Return True if the field is redacted.""" - return self.is_field_redaction_auto(field) or self.is_field_redaction_custom( - field - ) - - def auto_field_redactions(self) -> dict[str, object | None]: - """ - Return a dict of redaction_values for all text fields. - - This is used to "auto-redact" all char/text fields with "X" - if - the field does not use choices, and is not a primary key or - unique field. 
+ if getattr(field, "unique", None): + return False + return True - """ - return { - f.name: auto_redact(f) - for f in self.get_model_fields() - if self.is_field_redaction_auto(f) - } + def get_redactable_fields(self) -> list[models.Field]: + """Return a list of fields on the model that are redactable.""" + return [f for f in self.get_model_fields() if self.is_field_redactable(f)] def field_redaction_strategy(self, field: models.Field) -> FieldRedactionStrategy: """Return the FieldRedaction value for a field.""" - if self.is_field_redaction_custom(field): + if field.name in self.custom_field_redactions: return self.FieldRedactionStrategy.CUSTOM - if self.is_field_redaction_auto(field): + if self.get_field_auto_redacter(field): return self.FieldRedactionStrategy.AUTO return self.FieldRedactionStrategy.NONE + def get_field_auto_redacter( + self, field: models.Field + ) -> Callable[[models.Field], Any] | None: + """ + Return the auto redacter function for a field. + + Override this to provide global auto-redaction functions for + your models. + + """ + if not self.auto_redact: + return None + if field.name in self.auto_redact_exclude: + return None + # will return None if the field isn't already handled by the + # default redacters. + return get_default_field_redacter(field) + + def get_auto_redaction_values(self) -> dict[str, Any]: + """Return field:value dict for all auto-redactable fields.""" + # because None is a valid redaction value, we need to do this in + # two passes - first get the redacter function, which _can_ be None, + # then filter out the None values and call the redacter function + # on the field. + auto_redactors = { + f: self.get_field_auto_redacter(f) for f in self.get_redactable_fields() + } + return {f.name: func(f) for f, func in auto_redactors.items() if func} + + def get_field_redaction_values(self) -> dict[str, Any]: + """ + Return the redaction values for all field, custom or auto. 
+ + This is a cascading lookup - start with all the auto-redaction + values, then overwrite with the custom values. + + """ + vals = self.get_auto_redaction_values() + vals.update(self.custom_field_redactions) + return vals + def redact_queryset( self, queryset: models.QuerySet[models.Model], - auto_redact_override: bool | None = None, **field_overrides: Any, ) -> int: """ @@ -233,13 +215,7 @@ def redact_queryset( - field_overrides (values passed in to method) """ - redactions: dict[str, Any] = {} - auto = ( - self.auto_redact if auto_redact_override is None else auto_redact_override - ) - if auto: - redactions.update(self.auto_field_redactions()) - redactions.update(self.custom_field_redactions) + redactions = self.get_field_redaction_values() redactions.update(field_overrides) return queryset.update(**redactions) @@ -305,12 +281,6 @@ def is_anonymised(self) -> bool: return self.anonymiser.is_field_anonymised(self.field) return False - @property - def is_redacted(self) -> bool: - if self.anonymiser: - return self.anonymiser.is_field_redacted(self.field) - return False - @property def redaction_strategy(self) -> ModelAnonymiser.FieldRedactionStrategy: if self.anonymiser: diff --git a/anonymiser/redacters.py b/anonymiser/redacters.py new file mode 100644 index 0000000..3f221c3 --- /dev/null +++ b/anonymiser/redacters.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from typing import Any, Callable + +from django.db import models +from django.utils import timezone + +from anonymiser.db.functions import GenerateUuid4 + + +def default_redact_charfield(field: models.CharField) -> str: + return "X" * field.max_length + + +def default_redact_textfield(field: models.TextField) -> str: + return "X" * 400 + + +def default_redact_datefield(field: models.DateField) -> str: + return timezone.now().date().isoformat() + + +def default_redact_datetimefield(field: models.DateTimeField) -> str: + return timezone.now().isoformat() + + +def default_redact_jsonfield(field: 
models.JSONField) -> dict[str, Any]: + return {} + + +def default_redact_uuidfield(field: models.UUIDField) -> str: + return GenerateUuid4() + + +def get_default_field_redacter( + field: models.Field, +) -> Callable[[models.Field], Any] | None: + """Return default redacter for basic Django field types.""" + if isinstance(field, models.CharField): + return default_redact_charfield + if isinstance(field, models.TextField): + return default_redact_textfield + if isinstance(field, models.DateTimeField): # DateField subclass, so test first + return default_redact_datetimefield + if isinstance(field, models.DateField): + return default_redact_datefield + if isinstance(field, models.JSONField): + return default_redact_jsonfield + if isinstance(field, models.UUIDField): + return default_redact_uuidfield + return None diff --git a/tests/anonymisers.py b/tests/anonymisers.py index 41dd852..df7a9a4 100644 --- a/tests/anonymisers.py +++ b/tests/anonymisers.py @@ -1,3 +1,6 @@ +from typing import Any, Callable + +from django.db import models from django.db.models import F, Value from django.db.models.functions import Concat @@ -36,3 +39,11 @@ class UserRedacter(RedacterBase): "last_name": "LAST_NAME", "email": Concat(Value("user_"), F("id"), Value("@example.com")), } + + def get_field_auto_redacter( + self, field: models.Field + ) -> Callable[[models.Field], Any] | None: + # Totally contrived example used for testing purposes only + if isinstance(field, models.JSONField): + return lambda f: {"foo": "bar"} + return super().get_field_auto_redacter(field) diff --git a/tests/migrations/0004_user_extra_info.py b/tests/migrations/0004_user_extra_info.py new file mode 100644 index 0000000..1d566a6 --- /dev/null +++ b/tests/migrations/0004_user_extra_info.py @@ -0,0 +1,17 @@ +# Generated by Django 4.2.5 on 2023-10-02 15:40 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("tests", "0003_user_biography_user_date_of_birth_user_location"), + ] + + operations = [ + 
migrations.AddField( + model_name="user", + name="extra_info", + field=models.JSONField(default=dict), + ), + ] diff --git a/tests/models.py b/tests/models.py index 6beae49..64fefd3 100644 --- a/tests/models.py +++ b/tests/models.py @@ -9,3 +9,4 @@ class User(AbstractUser): location = models.CharField(max_length=255, blank=True) biography = models.TextField(blank=True) date_of_birth = models.DateField(blank=True, null=True) + extra_info = models.JSONField(default=dict) diff --git a/tests/test_models.py b/tests/test_models.py index 6fba8db..596d525 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -77,12 +77,6 @@ def test_get_anonymisable_fields(self, user_anonymiser: UserAnonymiser) -> None: User._meta.get_field("first_name") ] - def test_anonymise_queryset( - self, user: User, user_anonymiser: UserAnonymiser - ) -> None: - assert user_anonymiser.anonymise_queryset(User.objects.none()) == 0 - assert user_anonymiser.anonymise_queryset(User.objects.all()) == 1 - def test_bad_anonymiser() -> None: with pytest.raises(AttributeError): @@ -102,6 +96,7 @@ def test_redact_queryset_one(self, user: User, user_redacter: UserRedacter) -> N assert user.first_name == "FIRST_NAME" assert user.last_name == "LAST_NAME" assert user.email == f"user_{user.id}@example.com" + assert user.extra_info == {"foo": "bar"} def test_redact_queryset_two( self, @@ -115,27 +110,6 @@ def test_redact_queryset_two( # confirm that we haven't reused the same uuid for all objects assert user.uuid != user2.uuid - @pytest.mark.parametrize( - "override,location,biography", - [ - (True, 255 * "X", 400 * "X"), - (False, "London", "I am a test user"), - ], - ) - def test_redact_queryset__auto_redact_with_override( - self, - user: User, - user_redacter: UserRedacter, - override: bool, - location: str, - biography: str, - ) -> None: - user_redacter.redact_queryset(User.objects.all(), auto_redact_override=override) - user.refresh_from_db() - # auto-redacted fields - assert user.location == location 
- assert user.biography == biography - def test_redact_queryset__field_overrides( self, user: User, @@ -177,7 +151,7 @@ def test_auto_redact( models.IntegerField(name="integer_field"), models.DateField(name="date_field"), ] - assert user_redacter.auto_field_redactions() == { + assert user_redacter.get_auto_redaction_values() == { "char_field": 255 * "X", "text_field": 400 * "X", "date_field": "2021-01-01", @@ -194,5 +168,4 @@ def test_model_fields_data() -> None: assert mfs.field_type == "CharField" assert isinstance(mfs.anonymiser, UserAnonymiser) assert mfs.is_anonymised is True - assert mfs.is_redacted is True assert mfs.redaction_strategy == UserAnonymiser.FieldRedactionStrategy.CUSTOM