Skip to content

Commit

Permalink
feat: add middleware for checking allowed data fields
Browse files Browse the repository at this point in the history
In myProfile query there should be checked the service for allowed
data fields. This adds middleware and mixin class for nodes for checking
the queried field exists in service's  allowed data fields

Refs HP-2319
  • Loading branch information
nicobav committed Apr 29, 2024
1 parent 2d78114 commit 9f33558
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 53 deletions.
1 change: 1 addition & 0 deletions open_city_profile/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@
"SCHEMA": "open_city_profile.schema.schema",
"MIDDLEWARE": [
# NOTE: Graphene runs its middlewares in reverse order!
"open_city_profile.graphene.AllowedDataFieldsMiddleware",
"open_city_profile.graphene.JWTMiddleware",
"open_city_profile.graphene.GQLDataLoaders",
],
Expand Down
2 changes: 2 additions & 0 deletions open_city_profile/tests/graphql_test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

AUDIENCE = getattr(settings, "OIDC_API_TOKEN_AUTH")["AUDIENCE"]
ISSUER = getattr(settings, "OIDC_API_TOKEN_AUTH")["ISSUER"]
if isinstance(ISSUER, list):
ISSUER = ISSUER[0]

CONFIG_URL = f"{ISSUER}/.well-known/openid-configuration"
JWKS_URL = f"{ISSUER}/jwks"
Expand Down
72 changes: 70 additions & 2 deletions profiles/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
ProfileMustHavePrimaryEmailError,
ServiceConnectionDoesNotExist,
ServiceDoesNotExist,
ServiceNotIdentifiedError,
TokenExpiredError,
)
from open_city_profile.graphene import UUIDMultipleChoiceFilter
Expand Down Expand Up @@ -370,6 +371,46 @@ def filter_by_nin_exact(self, queryset, name, value):
return queryset.none()


class AllowedDataFieldsMixin:
"""
Mixin class for checking allowed data fields per service.
`allowed_data_fields_map` is a dictionary where the key is the `field_name` of the allowed data field
`allowed_data_fields.json` and the value is an iterable of django model's field names that the `field_name`
describes. For example, if the `field_name` is `name`, the value could be `("first_name", "last_name")`.
e.g:
allowed_data_fields_map = {
"name": ("first_name", "last_name", "nickname"),
"personalidentitycode": ("national_identification_number",),
"address": ("address", "postal_code", "city", "country_code")
}
`always_allow_fields`: Since connections are not defined in `allowed_data_fields.json` they should be
defined here. If the field is connection and the node does not inherit this mixin the data will be available
to all services.
"""

allowed_data_fields_map = {}
always_allow_fields = ["id", "service_connections"]
check_allowed_data_fields = True

@classmethod
def is_field_allowed_for_service(cls, field_name: str, service: Service):
if not service:
raise ServiceNotIdentifiedError("No service identified")

if field_name in cls.always_allow_fields:
return True

allowed_data_fields = service.allowed_data_fields.values_list(
"field_name", flat=True
)
return any(
field_name in cls.allowed_data_fields_map.get(allowed_data_field, [])
for allowed_data_field in allowed_data_fields
)


class ContactNode(DjangoObjectType):
class Meta:
model = Contact
Expand Down Expand Up @@ -447,7 +488,7 @@ class Meta:
fields = ("street_address", "additional_address", "country_code")


class VerifiedPersonalInformationNode(DjangoObjectType):
class VerifiedPersonalInformationNode(DjangoObjectType, AllowedDataFieldsMixin):
class Meta:
model = VerifiedPersonalInformation
fields = (
Expand All @@ -459,6 +500,18 @@ class Meta:
"municipality_of_residence_number",
)

allowed_data_fields_map = {
"name": ("first_name", "last_name", "given_name"),
"personalidentitycode": ("national_identification_number",),
"address": (
"municipality_of_residence",
"municipality_of_residence_number",
"permanent_address",
"temporary_address",
"permanent_foreign_address",
),
}

# Need to set the national_identification_number field explicitly as non-null
# because django-searchable-encrypted-fields SearchFields are always nullable
# and you can't change it.
Expand Down Expand Up @@ -567,14 +620,29 @@ def resolve_addresses(self: Profile, info, **kwargs):


@key(fields="id")
class ProfileNode(RestrictedProfileNode):
class ProfileNode(RestrictedProfileNode, AllowedDataFieldsMixin):
class Meta:
model = Profile
fields = ("first_name", "last_name", "nickname", "language")
interfaces = (relay.Node,)
connection_class = ProfilesConnection
filterset_class = ProfileFilter

allowed_data_fields_map = {
"name": (
"first_name",
"last_name",
"nickname",
),
"email": ("emails", "primary_email"),
"phone": ("phones", "primary_phone"),
"address": ("addresses", "primary_address"),
"personalidentitycode": ("sensitivedata",),
}
always_allow_fields = AllowedDataFieldsMixin.always_allow_fields + [
"verified_personal_information"
]

sensitivedata = graphene.Field(
SensitiveDataNode,
description="Data that is consider to be sensitive e.g. social security number",
Expand Down
137 changes: 92 additions & 45 deletions profiles/tests/test_gql_my_profile_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

from open_city_profile.tests import to_graphql_name
from open_city_profile.tests.asserts import assert_match_error_code
from services.tests.factories import ServiceConnectionFactory
from services.tests.factories import (
AllowedDataFieldFactory,
ServiceConnectionFactory,
ServiceFactory,
)

from .conftest import VERIFIED_PERSONAL_INFORMATION_ADDRESS_TYPES
from .factories import (
Expand All @@ -16,7 +20,7 @@
)


def test_normal_user_can_query_emails(user_gql_client):
def test_normal_user_can_query_emails(user_gql_client, service):
profile = ProfileWithPrimaryEmailFactory(user=user_gql_client.user)
email = profile.emails.first()

Expand Down Expand Up @@ -50,11 +54,14 @@ def test_normal_user_can_query_emails(user_gql_client):
}
}
}
executed = user_gql_client.execute(query)
service.allowed_data_fields.add(AllowedDataFieldFactory(field_name="email"))
ServiceConnectionFactory(profile=profile, service=service)

executed = user_gql_client.execute(query, service=service)
assert dict(executed["data"]) == expected_data


def test_normal_user_can_query_phones(user_gql_client):
def test_normal_user_can_query_phones(user_gql_client, service):
profile = ProfileWithPrimaryEmailFactory(user=user_gql_client.user)
phone = PhoneFactory(profile=profile)

Expand Down Expand Up @@ -88,11 +95,14 @@ def test_normal_user_can_query_phones(user_gql_client):
}
}
}
executed = user_gql_client.execute(query)
service.allowed_data_fields.add(AllowedDataFieldFactory(field_name="phone"))
ServiceConnectionFactory(profile=profile, service=service)

executed = user_gql_client.execute(query, service=service)
assert dict(executed["data"]) == expected_data


def test_normal_user_can_query_addresses(user_gql_client):
def test_normal_user_can_query_addresses(user_gql_client, service):
profile = ProfileWithPrimaryEmailFactory(user=user_gql_client.user)
address = AddressFactory(profile=profile)

Expand Down Expand Up @@ -126,12 +136,15 @@ def test_normal_user_can_query_addresses(user_gql_client):
}
}
}
executed = user_gql_client.execute(query)
service.allowed_data_fields.add(AllowedDataFieldFactory(field_name="address"))
ServiceConnectionFactory(profile=profile, service=service)

executed = user_gql_client.execute(query, service=service)
assert dict(executed["data"]) == expected_data


def test_normal_user_can_query_primary_contact_details(
user_gql_client, execution_context_class
user_gql_client, execution_context_class, service
):
profile = ProfileFactory(user=user_gql_client.user)
phone = PhoneFactory(profile=profile, primary=True)
Expand Down Expand Up @@ -181,8 +194,15 @@ def test_normal_user_can_query_primary_contact_details(
},
}
}
service.allowed_data_fields.add(
AllowedDataFieldFactory(field_name="phone"),
AllowedDataFieldFactory(field_name="email"),
AllowedDataFieldFactory(field_name="address"),
)
ServiceConnectionFactory(profile=profile, service=service)

executed = user_gql_client.execute(
query, execution_context_class=execution_context_class
query, execution_context_class=execution_context_class, service=service
)
assert dict(executed["data"]) == expected_data

Expand Down Expand Up @@ -218,31 +238,56 @@ class TestProfileWithVerifiedPersonalInformation:
}
"""

@staticmethod
def _execute_query(gql_client, loa="substantial"):
@pytest.fixture(autouse=True)
def setup_data(self, db, user_gql_client):
self.user = user_gql_client.user
self.client = user_gql_client
self.service = ServiceFactory(is_profile_service=True)
self.profile = ProfileFactory(user=user_gql_client.user)
ServiceConnectionFactory(profile=self.profile, service=self.service)
self._add_allowed_data_fields_to_service(self.service)

def _create_allowed_data_fields(self):
self.allowed_name = AllowedDataFieldFactory(field_name="name")
self.allowed_address = AllowedDataFieldFactory(field_name="address")
self.allowed_personal_identity_code = AllowedDataFieldFactory(
field_name="personalidentitycode"
)

def _add_allowed_data_fields_to_service(self, service):
if not getattr(self, "allowed_name", None):
self._create_allowed_data_fields()

service.allowed_data_fields.add(
self.allowed_name,
self.allowed_address,
self.allowed_personal_identity_code,
)

def _execute_query(self, loa="substantial", service=None):
token_payload = {
"loa": loa,
}

return gql_client.execute(
kwargs = {"service": self.service}
if service:
kwargs["service"] = service

return self.client.execute(
TestProfileWithVerifiedPersonalInformation.QUERY,
auth_token_payload=token_payload,
**kwargs,
)

def test_when_verified_personal_infomation_does_not_exist_returns_null(
self, user_gql_client
):
ProfileFactory(user=user_gql_client.user)

executed = self._execute_query(user_gql_client)
def test_when_verified_personal_information_does_not_exist_returns_null(self):
executed = self._execute_query()

assert "errors" not in executed
assert executed["data"]["myProfile"]["verifiedPersonalInformation"] is None

def test_normal_user_can_query_verified_personal_information(self, user_gql_client):
profile = ProfileFactory(user=user_gql_client.user)
def test_normal_user_can_query_verified_personal_information(self):
verified_personal_information = VerifiedPersonalInformationFactory(
profile=profile
profile=self.profile
)

permanent_address = verified_personal_information.permanent_address
Expand Down Expand Up @@ -279,20 +324,17 @@ def test_normal_user_can_query_verified_personal_information(self, user_gql_clie
}
}

executed = self._execute_query(user_gql_client)
executed = self._execute_query()

assert executed["data"] == expected_data

@pytest.mark.parametrize(
"address_type", VERIFIED_PERSONAL_INFORMATION_ADDRESS_TYPES
)
def test_when_address_does_not_exist_returns_null(
self, address_type, user_gql_client
):
profile = ProfileFactory(user=user_gql_client.user)
VerifiedPersonalInformationFactory(profile=profile, **{address_type: None})
def test_when_address_does_not_exist_returns_null(self, address_type):
VerifiedPersonalInformationFactory(profile=self.profile, **{address_type: None})

executed = self._execute_query(user_gql_client)
executed = self._execute_query()

assert "errors" not in executed

Expand All @@ -305,38 +347,35 @@ def test_when_address_does_not_exist_returns_null(
assert isinstance(received_address, dict)

@pytest.mark.parametrize("loa", ["substantial", "high"])
def test_high_enough_level_of_assurance_gains_access(self, loa, user_gql_client):
profile = ProfileFactory(user=user_gql_client.user)
VerifiedPersonalInformationFactory(profile=profile)
def test_high_enough_level_of_assurance_gains_access(self, loa):
VerifiedPersonalInformationFactory(profile=self.profile)

executed = self._execute_query(user_gql_client, loa)
executed = self._execute_query(loa)

assert not hasattr(executed, "errors")
assert isinstance(
executed["data"]["myProfile"]["verifiedPersonalInformation"], dict
)

@pytest.mark.parametrize("loa", [None, "low", "unknown"])
def test_too_low_level_of_assurance_denies_access(self, loa, user_gql_client):
profile = ProfileFactory(user=user_gql_client.user)
VerifiedPersonalInformationFactory(profile=profile)
def test_too_low_level_of_assurance_denies_access(self, loa):
VerifiedPersonalInformationFactory(profile=self.profile)

executed = self._execute_query(user_gql_client, loa)
executed = self._execute_query(loa)

assert_match_error_code(executed, "PERMISSION_DENIED_ERROR")

assert executed["data"]["myProfile"]["verifiedPersonalInformation"] is None

@pytest.mark.parametrize("with_serviceconnection", (True, False))
def test_service_connection_required(
self, user_gql_client, service, with_serviceconnection
):
profile = ProfileFactory(user=user_gql_client.user)
VerifiedPersonalInformationFactory(profile=profile)
def test_service_connection_required(self, with_serviceconnection):
service = ServiceFactory()
self._add_allowed_data_fields_to_service(service)
VerifiedPersonalInformationFactory(profile=self.profile)
if with_serviceconnection:
ServiceConnectionFactory(profile=profile, service=service)
ServiceConnectionFactory(profile=self.profile, service=service)

executed = user_gql_client.execute(
executed = self.client.execute(
TestProfileWithVerifiedPersonalInformation.QUERY,
auth_token_payload={"loa": "substantial"},
service=service,
Expand Down Expand Up @@ -372,6 +411,7 @@ def test_querying_non_existent_profile_doesnt_return_errors(user_gql_client, ser
def test_normal_user_can_query_their_own_profile(
user_gql_client, service, with_service, with_serviceconnection
):
service.allowed_data_fields.add(AllowedDataFieldFactory(field_name="name"))
profile = ProfileFactory(user=user_gql_client.user)
if with_serviceconnection:
ServiceConnectionFactory(profile=profile, service=service)
Expand Down Expand Up @@ -399,8 +439,15 @@ def test_normal_user_can_query_their_own_profile(
assert executed["data"]["myProfile"] is None


def test_normal_user_can_query_their_own_profiles_sensitivedata(user_gql_client):
def test_normal_user_can_query_their_own_profiles_sensitivedata(
user_gql_client, service
):
service.allowed_data_fields.add(
AllowedDataFieldFactory(field_name="name"),
AllowedDataFieldFactory(field_name="personalidentitycode"),
)
profile = ProfileFactory(user=user_gql_client.user)
ServiceConnectionFactory(profile=profile, service=service)
sensitive_data = SensitiveDataFactory(profile=profile)

query = """
Expand All @@ -419,5 +466,5 @@ def test_normal_user_can_query_their_own_profiles_sensitivedata(user_gql_client)
"sensitivedata": {"ssn": sensitive_data.ssn},
}
}
executed = user_gql_client.execute(query)
executed = user_gql_client.execute(query, service=service)
assert dict(executed["data"]) == expected_data
Loading

0 comments on commit 9f33558

Please sign in to comment.