Skip to content

Commit

Permalink
feat: add middleware for checking allowed data fields
Browse files Browse the repository at this point in the history
In profile and myProfile query there should be checked the service for
allowed fields. This adds middleware and mixin class for nodes for
checking the queried field exists in service's  allowed data fields

Refs HP-2319
  • Loading branch information
nicobav committed May 10, 2024
1 parent d499cfd commit 6498814
Show file tree
Hide file tree
Showing 9 changed files with 495 additions and 56 deletions.
30 changes: 30 additions & 0 deletions open_city_profile/graphene.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from django.conf import settings
from django.forms import MultipleChoiceField
from django_filters import MultipleChoiceFilter
from graphene.utils.str_converters import to_snake_case
from graphene_django import DjangoObjectType
from graphene_django.forms.converter import convert_form_field
from graphene_django.types import ALL_FIELDS
from graphql_sync_dataloaders import SyncDataLoader
from parler.models import TranslatableModel

from open_city_profile.exceptions import FieldNotAllowedError, ServiceNotIdentifiedError
from profiles.loaders import (
addresses_by_profile_id_loader,
emails_by_profile_id_loader,
Expand Down Expand Up @@ -178,3 +180,31 @@ def __init_subclass_with_meta__(
_meta=_meta,
**options,
)


class AllowedDataFieldsMiddleware:

def _is_profile_query(self, info):
return any(
selection.name.value in ("myProfile", "profile", "profiles")
for selection in info.operation.selection_set.selections
)

def resolve(self, next, root, info, **kwargs):
try:
is_profile_query = self._is_profile_query(info)
node = info.parent_type.graphene_type
except AttributeError:
is_profile_query = False
node = None

if is_profile_query and getattr(node, "check_allowed_data_fields", False):
field_name = to_snake_case(getattr(info, "field_name", ""))

if not getattr(info.context, "service", False):
raise ServiceNotIdentifiedError("Service not identified")

if not node.is_field_allowed_for_service(field_name, info.context.service):
raise FieldNotAllowedError("Field is not allowed for service.")

return next(root, info, **kwargs)
1 change: 1 addition & 0 deletions open_city_profile/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@
"SCHEMA": "open_city_profile.schema.schema",
"MIDDLEWARE": [
# NOTE: Graphene runs its middlewares in reverse order!
"open_city_profile.graphene.AllowedDataFieldsMiddleware",
"open_city_profile.graphene.JWTMiddleware",
"open_city_profile.graphene.GQLDataLoaders",
],
Expand Down
20 changes: 19 additions & 1 deletion open_city_profile/tests/graphql_test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@
from django.utils.crypto import get_random_string
from jose import jwt

from services.tests.factories import ServiceClientIdFactory
from services.tests.factories import (
AllowedDataFieldFactory,
ServiceClientIdFactory,
ServiceConnectionFactory,
)

from .conftest import get_unix_timestamp_now
from .keys import rsa_key

AUDIENCE = getattr(settings, "OIDC_API_TOKEN_AUTH")["AUDIENCE"]
ISSUER = getattr(settings, "OIDC_API_TOKEN_AUTH")["ISSUER"]
if isinstance(ISSUER, list):
ISSUER = ISSUER[0]

CONFIG_URL = f"{ISSUER}/.well-known/openid-configuration"
JWKS_URL = f"{ISSUER}/jwks"
Expand Down Expand Up @@ -129,6 +135,18 @@ def do_graphql_call_as_user(
service_client_id = ServiceClientIdFactory(
service__service_type=None, service__is_profile_service=True
)
if getattr(user, "profile", None):
ServiceConnectionFactory(
profile=user.profile, service=service_client_id.service
)
service_client_id.service.allowed_data_fields.add(
AllowedDataFieldFactory(field_name="name"),
AllowedDataFieldFactory(field_name="address"),
AllowedDataFieldFactory(field_name="email"),
AllowedDataFieldFactory(field_name="phone"),
AllowedDataFieldFactory(field_name="personalidentitycode"),
)

elif service:
service_client_id = service.client_ids.first()

Expand Down
72 changes: 70 additions & 2 deletions profiles/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
ProfileMustHavePrimaryEmailError,
ServiceConnectionDoesNotExist,
ServiceDoesNotExist,
ServiceNotIdentifiedError,
TokenExpiredError,
)
from open_city_profile.graphene import UUIDMultipleChoiceFilter
Expand Down Expand Up @@ -370,6 +371,46 @@ def filter_by_nin_exact(self, queryset, name, value):
return queryset.none()


class AllowedDataFieldsMixin:
"""
Mixin class for checking allowed data fields per service.
`allowed_data_fields_map` is a dictionary where the key is the `field_name` of the allowed data field
`allowed_data_fields.json` and the value is an iterable of django model's field names that the `field_name`
describes. For example, if the `field_name` is `name`, the value could be `("first_name", "last_name")`.
e.g:
allowed_data_fields_map = {
"name": ("first_name", "last_name", "nickname"),
"personalidentitycode": ("national_identification_number",),
"address": ("address", "postal_code", "city", "country_code")
}
`always_allow_fields`: Since connections are not defined in `allowed_data_fields.json` they should be
defined here. If the field is connection and the node does not inherit this mixin the data will be available
to all services.
"""

allowed_data_fields_map = {}
always_allow_fields = ["id", "service_connections"]
check_allowed_data_fields = True

@classmethod
def is_field_allowed_for_service(cls, field_name: str, service: Service):
if not service:
raise ServiceNotIdentifiedError("No service identified")

if field_name in cls.always_allow_fields:
return True

allowed_data_fields = service.allowed_data_fields.values_list(
"field_name", flat=True
)
return any(
field_name in cls.allowed_data_fields_map.get(allowed_data_field, [])
for allowed_data_field in allowed_data_fields
)


class ContactNode(DjangoObjectType):
class Meta:
model = Contact
Expand Down Expand Up @@ -447,7 +488,7 @@ class Meta:
fields = ("street_address", "additional_address", "country_code")


class VerifiedPersonalInformationNode(DjangoObjectType):
class VerifiedPersonalInformationNode(DjangoObjectType, AllowedDataFieldsMixin):
class Meta:
model = VerifiedPersonalInformation
fields = (
Expand All @@ -459,6 +500,18 @@ class Meta:
"municipality_of_residence_number",
)

allowed_data_fields_map = {
"name": ("first_name", "last_name", "given_name"),
"personalidentitycode": ("national_identification_number",),
"address": (
"municipality_of_residence",
"municipality_of_residence_number",
"permanent_address",
"temporary_address",
"permanent_foreign_address",
),
}

# Need to set the national_identification_number field explicitly as non-null
# because django-searchable-encrypted-fields SearchFields are always nullable
# and you can't change it.
Expand Down Expand Up @@ -567,14 +620,29 @@ def resolve_addresses(self: Profile, info, **kwargs):


@key(fields="id")
class ProfileNode(RestrictedProfileNode):
class ProfileNode(RestrictedProfileNode, AllowedDataFieldsMixin):
class Meta:
model = Profile
fields = ("first_name", "last_name", "nickname", "language")
interfaces = (relay.Node,)
connection_class = ProfilesConnection
filterset_class = ProfileFilter

allowed_data_fields_map = {
"name": (
"first_name",
"last_name",
"nickname",
),
"email": ("emails", "primary_email"),
"phone": ("phones", "primary_phone"),
"address": ("addresses", "primary_address"),
"personalidentitycode": ("sensitivedata",),
}
always_allow_fields = AllowedDataFieldsMixin.always_allow_fields + [
"verified_personal_information"
]

sensitivedata = graphene.Field(
SensitiveDataNode,
description="Data that is consider to be sensitive e.g. social security number",
Expand Down
Loading

0 comments on commit 6498814

Please sign in to comment.