Skip to content

Commit

Permalink
Refactor RedacterBase
Browse files Browse the repository at this point in the history
  • Loading branch information
hugorodgerbrown committed Oct 2, 2023
1 parent f6cbc52 commit a45225f
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 116 deletions.
144 changes: 57 additions & 87 deletions anonymiser/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import dataclasses
import logging
from enum import StrEnum # 3.11 only
from typing import Any, Iterator, TypeAlias
from typing import Any, Callable, TypeAlias

from django.db import models

from .settings import AUTO_REDACT_FIELD_FUNCS
from .redacters import get_default_field_redacter

# (old_value, new_value) tuple
AnonymisationResult: TypeAlias = tuple[Any, Any]
Expand All @@ -29,12 +29,6 @@ def get_model_fields(model: type[models.Model]) -> list[models.Field]:
]


def auto_redact(field: type[models.Field]) -> Any:
    """
    Return the auto-redaction value for a field, or None.

    The field's exact class is looked up in AUTO_REDACT_FIELD_FUNCS; when a
    truthy entry is found it is called with the field and its result is
    returned. Otherwise None is returned.
    """
    redacter = AUTO_REDACT_FIELD_FUNCS.get(field.__class__)
    if not redacter:
        return None
    return redacter(field)


class _ModelBase:
# Override with the model to be anonymised
model: type[models.Model]
Expand All @@ -45,19 +39,6 @@ def get_model_fields(self) -> list[models.Field]:
raise NotImplementedError("model must be set")
return get_model_fields(self.model)

def exclude_from_anonymisation(self, queryset: models.QuerySet) -> models.QuerySet:
    """
    Hook for removing objects from the set that will be anonymised.

    Subclasses override this to filter the queryset - the canonical
    example being a UserAnonymiser that excludes e.g. is_staff=True
    users.

    The default implementation is a noop: the queryset is returned
    unchanged.
    """
    return queryset


class AnonymiserBase(_ModelBase):
"""Base class for anonymisation functions."""
Expand Down Expand Up @@ -111,15 +92,6 @@ def anonymise_object(self, obj: models.Model) -> None:
output[field.name] = self.anonymise_field(obj, field)
self.post_anonymise_object(obj, **output)

def anonymise_queryset(self, queryset: Iterator[models.Model]) -> int:
    """
    Anonymise and SAVE every object in the queryset.

    Objects are first passed through exclude_from_anonymisation; each
    remaining object is anonymised and then saved. Returns the number
    of objects processed.
    """
    processed = 0
    included = self.exclude_from_anonymisation(queryset)
    # enumerate from 1 so the last index seen is the running total
    for processed, instance in enumerate(included, start=1):
        self.anonymise_object(instance)
        instance.save()
    return processed

def post_anonymise_object(
self, obj: models.Model, **updates: AnonymisationResult
) -> None:
Expand Down Expand Up @@ -150,70 +122,80 @@ class FieldRedactionStrategy(StrEnum):
CUSTOM = "CUSTOM"
NONE = ""

def is_field_redaction_auto(self, field: models.Field) -> bool:
def is_field_redactable(self, field: models.Field) -> bool:
"""
Return True if the field should be auto-redacted.
Return True if the field can be redacted.
Return False if the class-level auto_redact attr is False.
Currently this includes text fields that are not choices, primary
keys, unique fields, or in the auto_redact_exclude list.
By default primary keys, relations, and choice fields cannot be
redacted. Override this method to change this behaviour.
"""
if not self.auto_redact:
return False
if field.name in self.auto_redact_exclude:
return False
if field.is_relation:
return False
if getattr(field, "primary_key", False):
return False
if getattr(field, "choices", []):
if getattr(field, "choices", None):
return False
if isinstance(field, models.UUIDField):
return self.auto_redact
return isinstance(field, tuple(AUTO_REDACT_FIELD_FUNCS.keys())) and not getattr(
field, "unique", False
)

def is_field_redaction_custom(self, field: models.Field) -> bool:
"""Return True if the field has custom redaction."""
field.choices
return field.name in self.custom_field_redactions

def is_field_redacted(self, field: models.Field) -> bool:
"""Return True if the field is redacted."""
return self.is_field_redaction_auto(field) or self.is_field_redaction_custom(
field
)

def auto_field_redactions(self) -> dict[str, object | None]:
"""
Return a dict of redaction_values for all text fields.
This is used to "auto-redact" all char/text fields with "X" - if
the field does not use choices, and is not a primary key or
unique field.
if getattr(field, "unique", None):
return False
return True

"""
return {
f.name: auto_redact(f)
for f in self.get_model_fields()
if self.is_field_redaction_auto(f)
}
def get_redactable_fields(self) -> list[models.Field]:
    """Return the model fields for which is_field_redactable is truthy."""
    redactable = []
    for model_field in self.get_model_fields():
        if self.is_field_redactable(model_field):
            redactable.append(model_field)
    return redactable

def field_redaction_strategy(self, field: models.Field) -> FieldRedactionStrategy:
"""Return the FieldRedaction value for a field."""
if self.is_field_redaction_custom(field):
if field.name in self.custom_field_redactions:
return self.FieldRedactionStrategy.CUSTOM
if self.is_field_redaction_auto(field):
if self.get_field_auto_redacter(field):
return self.FieldRedactionStrategy.AUTO
return self.FieldRedactionStrategy.NONE

def get_field_auto_redacter(
    self, field: models.Field
) -> Callable[[models.Field], Any] | None:
    """
    Return the function used to auto-redact a field, or None.

    None is returned when auto-redaction is switched off for the class
    (auto_redact is falsy) or when the field is named in
    auto_redact_exclude. Otherwise the lookup is delegated to
    get_default_field_redacter.

    Override this to provide global auto-redaction functions for
    your models.
    """
    auto_disabled = not self.auto_redact
    if auto_disabled or field.name in self.auto_redact_exclude:
        return None
    # the default lookup itself returns None for field types that it
    # does not handle.
    return get_default_field_redacter(field)

def get_auto_redaction_values(self) -> dict[str, Any]:
    """
    Return a {field name: redaction value} dict for auto-redacted fields.

    Only redactable fields that have an auto redacter function appear in
    the result.
    """
    # None is a valid redaction *value*, so "nothing to do" has to be
    # detected on the redacter function rather than on its result. Hence
    # two passes: collect the (possibly None) redacter for every field,
    # then call only the redacters that exist.
    redacters_by_field = {}
    for candidate in self.get_redactable_fields():
        redacters_by_field[candidate] = self.get_field_auto_redacter(candidate)

    values = {}
    for candidate, redacter in redacters_by_field.items():
        if redacter:
            values[candidate.name] = redacter(candidate)
    return values

def get_field_redaction_values(self) -> dict[str, Any]:
    """
    Return the redaction values for every redacted field.

    The lookup cascades: the auto-redaction values form the base, and
    any entry in custom_field_redactions replaces the auto value for
    the same field name.
    """
    redactions = self.get_auto_redaction_values()
    # in-place merge: custom values win over auto values
    redactions |= self.custom_field_redactions
    return redactions

def redact_queryset(
self,
queryset: models.QuerySet[models.Model],
auto_redact_override: bool | None = None,
**field_overrides: Any,
) -> int:
"""
Expand All @@ -233,13 +215,7 @@ def redact_queryset(
- field_overrides (values passed in to method)
"""
redactions: dict[str, Any] = {}
auto = (
self.auto_redact if auto_redact_override is None else auto_redact_override
)
if auto:
redactions.update(self.auto_field_redactions())
redactions.update(self.custom_field_redactions)
redactions = self.get_field_redaction_values()
redactions.update(field_overrides)
return queryset.update(**redactions)

Expand Down Expand Up @@ -305,12 +281,6 @@ def is_anonymised(self) -> bool:
return self.anonymiser.is_field_anonymised(self.field)
return False

@property
def is_redacted(self) -> bool:
    """Return the anonymiser's is_field_redacted result; False without an anonymiser."""
    if not self.anonymiser:
        return False
    return self.anonymiser.is_field_redacted(self.field)

@property
def redaction_strategy(self) -> ModelAnonymiser.FieldRedactionStrategy:
if self.anonymiser:
Expand Down
51 changes: 51 additions & 0 deletions anonymiser/redacters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from __future__ import annotations

from typing import Any, Callable

from django.db import models
from django.utils import timezone

from anonymiser.db.functions import GenerateUuid4


def default_redact_charfield(field: models.CharField) -> str:
    """
    Return a string of "X" characters that fills the field.

    The string is field.max_length characters long. A CharField may be
    declared without a max_length (it is then None), in which case
    multiplying by it would raise TypeError; the fixed length used by
    default_redact_textfield (400) is used instead.
    """
    length = field.max_length
    if length is None:
        # unbounded column: fall back to the text-field redaction length
        length = 400
    return "X" * length


def default_redact_textfield(field: models.TextField) -> str:
    """Return a fixed 400-character string of "X"; the field itself is not inspected."""
    placeholder_length = 400
    return placeholder_length * "X"


def default_redact_datefield(field: models.DateField) -> str:
    """Return the date part of timezone.now() as an ISO-format string."""
    today = timezone.now().date()
    return today.isoformat()


def default_redact_datetimefield(field: models.DateTimeField) -> str:
    """Return timezone.now() as an ISO-format string."""
    current = timezone.now()
    return current.isoformat()


def default_redact_jsonfield(field: models.JSONField) -> dict[str, Any]:
    """Return a new empty dict; the field itself is not inspected."""
    redacted: dict[str, Any] = {}
    return redacted


def default_redact_uuidfield(field: models.UUIDField) -> GenerateUuid4:
    """
    Return a new GenerateUuid4 instance as the redaction value.

    The return annotation was previously `str`, but the value returned
    is a GenerateUuid4 object, not a string.
    """
    # NOTE(review): GenerateUuid4 is presumably a database function
    # expression evaluated per row, so every row gets its own uuid -
    # confirm against anonymiser.db.functions.
    return GenerateUuid4()


def get_default_field_redacter(
    field: models.Field,
) -> Callable[[models.Field], Any] | None:
    """
    Return the default redacter for basic Django field types.

    The field is matched with isinstance, so subclasses of the listed
    field types are handled too. Returns None when the field matches
    none of the supported types.
    """
    if isinstance(field, models.CharField):
        return default_redact_charfield
    if isinstance(field, models.TextField):
        return default_redact_textfield
    # DateTimeField is a subclass of DateField, so it must be tested
    # first - otherwise the DateField branch also captures datetime
    # fields and this branch can never be reached.
    if isinstance(field, models.DateTimeField):
        return default_redact_datetimefield
    if isinstance(field, models.DateField):
        return default_redact_datefield
    if isinstance(field, models.JSONField):
        return default_redact_jsonfield
    if isinstance(field, models.UUIDField):
        return default_redact_uuidfield
    return None
11 changes: 11 additions & 0 deletions tests/anonymisers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from typing import Any, Callable

from django.db import models
from django.db.models import F, Value
from django.db.models.functions import Concat

Expand Down Expand Up @@ -36,3 +39,11 @@ class UserRedacter(RedacterBase):
"last_name": "LAST_NAME",
"email": Concat(Value("user_"), F("id"), Value("@example.com")),
}

def get_field_auto_redacter(
    self, field: models.Field
) -> Callable[[models.Field], Any] | None:
    """Return a fixed-value redacter for JSON fields; defer to the parent otherwise."""
    # Totally contrived example used for testing purposes only
    if not isinstance(field, models.JSONField):
        return super().get_field_auto_redacter(field)
    return lambda _field: {"foo": "bar"}
17 changes: 17 additions & 0 deletions tests/migrations/0004_user_extra_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 4.2.5 on 2023-10-02 15:40

from django.db import migrations, models


# Adds the `extra_info` JSON column to the test app's `user` model.
class Migration(migrations.Migration):
# Must run after migration 0003 of the "tests" app.
dependencies = [
("tests", "0003_user_biography_user_date_of_birth_user_location"),
]

operations = [
# New field; `default=dict` passes the dict callable as the default.
migrations.AddField(
model_name="user",
name="extra_info",
field=models.JSONField(default=dict),
),
]
1 change: 1 addition & 0 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ class User(AbstractUser):
location = models.CharField(max_length=255, blank=True)
biography = models.TextField(blank=True)
date_of_birth = models.DateField(blank=True, null=True)
extra_info = models.JSONField(default=dict)
31 changes: 2 additions & 29 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,6 @@ def test_get_anonymisable_fields(self, user_anonymiser: UserAnonymiser) -> None:
User._meta.get_field("first_name")
]

def test_anonymise_queryset(
self, user: User, user_anonymiser: UserAnonymiser
) -> None:
assert user_anonymiser.anonymise_queryset(User.objects.none()) == 0
assert user_anonymiser.anonymise_queryset(User.objects.all()) == 1


def test_bad_anonymiser() -> None:
with pytest.raises(AttributeError):
Expand All @@ -102,6 +96,7 @@ def test_redact_queryset_one(self, user: User, user_redacter: UserRedacter) -> N
assert user.first_name == "FIRST_NAME"
assert user.last_name == "LAST_NAME"
assert user.email == f"user_{user.id}@example.com"
assert user.extra_info == {"foo": "bar"}

def test_redact_queryset_two(
self,
Expand All @@ -115,27 +110,6 @@ def test_redact_queryset_two(
# confirm that we haven't reused the same uuid for all objects
assert user.uuid != user2.uuid

@pytest.mark.parametrize(
"override,location,biography",
[
(True, 255 * "X", 400 * "X"),
(False, "London", "I am a test user"),
],
)
def test_redact_queryset__auto_redact_with_override(
self,
user: User,
user_redacter: UserRedacter,
override: bool,
location: str,
biography: str,
) -> None:
user_redacter.redact_queryset(User.objects.all(), auto_redact_override=override)
user.refresh_from_db()
# auto-redacted fields
assert user.location == location
assert user.biography == biography

def test_redact_queryset__field_overrides(
self,
user: User,
Expand Down Expand Up @@ -177,7 +151,7 @@ def test_auto_redact(
models.IntegerField(name="integer_field"),
models.DateField(name="date_field"),
]
assert user_redacter.auto_field_redactions() == {
assert user_redacter.get_auto_redaction_values() == {
"char_field": 255 * "X",
"text_field": 400 * "X",
"date_field": "2021-01-01",
Expand All @@ -194,5 +168,4 @@ def test_model_fields_data() -> None:
assert mfs.field_type == "CharField"
assert isinstance(mfs.anonymiser, UserAnonymiser)
assert mfs.is_anonymised is True
assert mfs.is_redacted is True
assert mfs.redaction_strategy == UserAnonymiser.FieldRedactionStrategy.CUSTOM

0 comments on commit a45225f

Please sign in to comment.