diff --git a/open_city_profile/decorators.py b/open_city_profile/decorators.py index 5d317ed6..8e08543d 100644 --- a/open_city_profile/decorators.py +++ b/open_city_profile/decorators.py @@ -8,6 +8,20 @@ def _use_context_tests(*test_funcs): + """ + Decorator for running context tests before the decorated function. + + E.g. to create a decorator that checks that the user is authenticated:: + + def _require_authenticated(context): + # Check that the user is authenticated + ... + + @_use_context_tests(_require_authenticated) + def login_required(): + pass + """ + def decorator(decorator_function): @wraps(decorator_function) def wrapper(function): @@ -61,12 +75,12 @@ def permission_checker(context): @_use_context_tests(_require_authenticated) -def login_required(): +def login_required(*_, **__): """Decorator for checking that the user is logged in""" @_use_context_tests(_require_authenticated, _require_service) -def login_and_service_required(): +def login_and_service_required(*_, **__): """Decorator for checking that the user is logged in and service is known""" diff --git a/open_city_profile/tests/snapshots/snap_test_graphql_api_schema.py b/open_city_profile/tests/snapshots/snap_test_graphql_api_schema.py index 99c8402b..bb74fe34 100644 --- a/open_city_profile/tests/snapshots/snap_test_graphql_api_schema.py +++ b/open_city_profile/tests/snapshots/snap_test_graphql_api_schema.py @@ -122,6 +122,7 @@ phones(offset: Int, before: String, after: String, first: Int, last: Int): PhoneNodeConnection addresses(offset: Int, before: String, after: String, first: Int, last: Int): AddressNodeConnection contactMethod: ContactMethod + loginMethods: [LoginMethodType] sensitivedata: SensitiveDataNode serviceConnections(offset: Int, before: String, after: String, first: Int, last: Int): ServiceConnectionTypeConnection verifiedPersonalInformation: VerifiedPersonalInformationNode @@ -215,6 +216,12 @@ SMS } +enum LoginMethodType { + PASSWORD + OTP + SUOMI_FI +} + type SensitiveDataNode implements Node { id: ID! ssn: String! diff --git a/profiles/enums.py b/profiles/enums.py index 5de3faa6..e5f44d96 100644 --- a/profiles/enums.py +++ b/profiles/enums.py @@ -41,3 +41,14 @@ class Labels: WORK = _("Work address") HOME = _("Home address") OTHER = _("Other address") + + +class LoginMethodType(Enum): + PASSWORD = "password" + OTP = "otp" + SUOMI_FI = "suomi_fi" + + class Labels: + PASSWORD = _("Password") + OTP = _("One-time password") + SUOMI_FI = _("Suomi.fi") diff --git a/profiles/keycloak_integration.py b/profiles/keycloak_integration.py index c4876ed2..24e9d2a2 100644 --- a/profiles/keycloak_integration.py +++ b/profiles/keycloak_integration.py @@ -8,7 +8,7 @@ ) from utils import keycloak -_keycloak_admin_client = None +_keycloak_admin_client: keycloak.KeycloakAdminClient | None = None def _setup_keycloak_client(): @@ -103,3 +103,29 @@ def send_profile_changes_to_keycloak(instance): _keycloak_admin_client.send_verify_email(user_id) except Exception: pass + + +def get_user_identity_providers(user_id) -> set[str]: + if not _keycloak_admin_client: + return set() + + try: + user_data = _keycloak_admin_client.get_user_federated_identities(user_id) + return {ip["identityProvider"] for ip in user_data} + except keycloak.UserNotFoundError: + return set() + + +def get_user_credential_types(user_id) -> set[str]: + if not _keycloak_admin_client: + return set() + + try: + user_data = _keycloak_admin_client.get_user_credentials(user_id) + return {cred["type"] for cred in user_data} + except keycloak.UserNotFoundError: + return set() + + +def get_user_login_methods(user_id) -> set[str]: + return get_user_identity_providers(user_id) | get_user_credential_types(user_id) diff --git a/profiles/models.py b/profiles/models.py index ba0f55ac..32a67204 100644 --- a/profiles/models.py +++ b/profiles/models.py @@ -119,6 +119,7 @@ class Meta: "verified_personal_information", "language", "contact_method", + "login_methods", ] def resolve_profile(self): diff --git a/profiles/schema.py b/profiles/schema.py index d85dc49f..52e6167f 100644 --- a/profiles/schema.py +++ b/profiles/schema.py @@ -1,3 +1,4 @@ +import logging from itertools import chain import django.dispatch @@ -55,8 +56,8 @@ delete_connected_service_data, download_connected_service_data, ) -from .enums import AddressType, EmailType, PhoneType -from .keycloak_integration import delete_profile_from_keycloak +from .enums import AddressType, EmailType, LoginMethodType, PhoneType +from .keycloak_integration import delete_profile_from_keycloak, get_user_login_methods from .models import ( Address, ClaimToken, @@ -72,10 +73,14 @@ VerifiedPersonalInformationTemporaryAddress, ) from .utils import ( + enum_values, + force_list, requester_can_view_verified_personal_information, requester_has_sufficient_loa_to_perform_gdpr_request, ) +logger = logging.getLogger(__name__) + User = get_user_model() AllowedEmailType = graphene.Enum.from_enum( @@ -514,6 +519,10 @@ def validate_ssn(value, info, **input): class RestrictedProfileNode(DjangoObjectType): + """ + Profile node with a restricted set of data. This does not contain any sensitive data. + """ + class Meta: model = Profile fields = ("first_name", "last_name", "nickname", "language") @@ -575,6 +584,13 @@ class Meta: connection_class = ProfilesConnection filterset_class = ProfileFilter + login_methods = graphene.List( + graphene.Enum.from_enum( + LoginMethodType, description=lambda e: e.label if e else "" + ), + description="List of login methods that the profile has used to authenticate. " + "Only visible to the user themselves.", + ) sensitivedata = graphene.Field( SensitiveDataNode, description="Data that is consider to be sensitive e.g. social security number", @@ -589,6 +605,36 @@ class Meta: "privileges to access this information.", ) + def resolve_login_methods(self: Profile, info, **kwargs): + if info.context.user != self.user: + raise PermissionDenied( + "No permission to read login methods of another user." + ) + + amr = set(force_list(info.context.user_auth.data.get("amr"))) + + # For future software archeologists: + # This field was added to the API to support the front-end's need to know + # which login methods the user has used. It's only needed for profiles + # with helsinki-tunnus or Suomi.fi, so for other cases, save a couple + # API calls and return an empty list. There's no other reasoning for the + # logic here. + # Can remove this after Tunnistamo is no longer in use. Related ticket: HP-2495 + if amr.intersection({"helsinki_tunnus", "heltunnistussuomifi", "suomi_fi"}): + login_methods = get_user_login_methods(self.user.uuid) + login_methods_in_enum = { + val for val in login_methods if val in enum_values(LoginMethodType) + } + if unknown_login_methods := login_methods - login_methods_in_enum: + logger.warning( + "Found login methods which are not part of the LoginMethodType enum: %s", + unknown_login_methods, + ) + + return login_methods_in_enum + + return [] + def resolve_service_connections(self: Profile, info, **kwargs): return self.effective_service_connections_qs() diff --git a/profiles/tests/test_gql_my_profile_query.py b/profiles/tests/test_gql_my_profile_query.py index cf1fcf50..3a6218f2 100644 --- a/profiles/tests/test_gql_my_profile_query.py +++ b/profiles/tests/test_gql_my_profile_query.py @@ -576,3 +576,91 @@ def test_my_profile_checks_allowed_data_fields_for_multiple_queries( assert executed["data"]["myProfile"]["lastName"] == profile.last_name assert executed["data"]["myProfile"]["sensitivedata"] is None assert executed["data"]["services"] is None + + +@pytest.mark.parametrize( + "amr_claim_value", ["suomi_fi", "helsinki_tunnus", "heltunnistussuomifi"] +) +def test_user_can_see_own_login_methods_with_correct_amr_claim( + user_gql_client, profile, group, service, monkeypatch, amr_claim_value +): + def mock_return(*_, **__): + return {"suomi_fi", "password"} + + monkeypatch.setattr( + "profiles.keycloak_integration.get_user_identity_providers", mock_return + ) + + profile = ProfileFactory(user=user_gql_client.user) + ServiceConnectionFactory(profile=profile, service=service) + + query = """ + { + myProfile { + loginMethods + } + } + """ + executed = user_gql_client.execute( + query, auth_token_payload={"amr": amr_claim_value}, service=service + ) + assert "errors" not in executed + assert set(executed["data"]["myProfile"]["loginMethods"]) == { + "SUOMI_FI", + "PASSWORD", + } + + +@pytest.mark.parametrize("amr_claim_value", [None, "helsinkiad"]) +def test_user_cannot_see_own_login_methods_with_other_amr_claims( + user_gql_client, profile, group, service, monkeypatch, amr_claim_value +): + def mock_return(*_, **__): + return {"this should not show up"} + + monkeypatch.setattr( + "profiles.keycloak_integration.get_user_identity_providers", mock_return + ) + + profile = ProfileFactory(user=user_gql_client.user) + ServiceConnectionFactory(profile=profile, service=service) + + query = """ + { + myProfile { + loginMethods + } + } + """ + executed = user_gql_client.execute( + query, auth_token_payload={"amr": amr_claim_value}, service=service + ) + assert "errors" not in executed + assert executed["data"]["myProfile"]["loginMethods"] == [] + + +def test_user_does_not_see_non_enum_login_methods( + user_gql_client, profile, group, service, monkeypatch +): + def mock_return(*_, **__): + return {"password", "this should not show up"} + + monkeypatch.setattr( + "profiles.keycloak_integration.get_user_identity_providers", mock_return + ) + + profile = ProfileFactory(user=user_gql_client.user) + ServiceConnectionFactory(profile=profile, service=service) + + query = """ + { + myProfile { + loginMethods + } + } + """ + executed = user_gql_client.execute( + query, auth_token_payload={"amr": "helsinki_tunnus"}, service=service + ) + assert "errors" not in executed + assert set(executed["data"]["myProfile"]["loginMethods"]) == {"PASSWORD"} diff --git a/profiles/tests/test_gql_profiles_query.py b/profiles/tests/test_gql_profiles_query.py index 4ebc956c..8197435d 100644 --- a/profiles/tests/test_gql_profiles_query.py +++ b/profiles/tests/test_gql_profiles_query.py @@ -798,3 +798,29 @@ def test_not_specifying_requesters_service_results_in_permission_denied_error( """ executed = user_gql_client.execute(query) assert_match_error_code(executed, "PERMISSION_DENIED_ERROR") + + +def test_staff_user_can_not_query_login_methods_of_other_users( + user_gql_client, group, service +): + profile = ProfileFactory() + ServiceConnectionFactory(profile=profile, service=service) + user = user_gql_client.user + user.groups.add(group) + assign_perm("can_view_profiles", group, service) + + query = """ + { + profiles { + edges { + node { + loginMethods + } + } + } + } + """ + + executed = user_gql_client.execute(query, service=service) + assert "errors" in executed + assert_match_error_code(executed, "PERMISSION_DENIED_ERROR") diff --git a/profiles/tests/test_keycloak_integration.py b/profiles/tests/test_keycloak_integration.py new file mode 100644 index 00000000..8924e477 --- /dev/null +++ b/profiles/tests/test_keycloak_integration.py @@ -0,0 +1,120 @@ +from unittest.mock import MagicMock + +import pytest + +from profiles.keycloak_integration import ( + get_user_credential_types, + get_user_identity_providers, + get_user_login_methods, +) +from utils.keycloak import UserNotFoundError + + +@pytest.fixture +def mock_keycloak_admin_client(monkeypatch): + mock = MagicMock() + monkeypatch.setattr("profiles.keycloak_integration._keycloak_admin_client", mock) + return mock + + +# Tests for get_user_identity_providers +def test_get_user_identity_providers_no_client(monkeypatch): + """Test the function when _keycloak_admin_client is None.""" + monkeypatch.setattr("profiles.keycloak_integration._keycloak_admin_client", None) + + assert get_user_identity_providers("dummy_user_id") == set() + + +@pytest.mark.parametrize( + "mock_return_value, expected_result", + [ + ([], set()), + ([{"identityProvider": "helsinkiad"}], {"helsinkiad"}), + ( + [{"identityProvider": "helsinkiad"}, {"identityProvider": "suomi_fi"}], + {"helsinkiad", "suomi_fi"}, + ), + ], +) +def test_get_user_identity_providers_with_data( + mock_keycloak_admin_client, mock_return_value, expected_result +): + mock_keycloak_admin_client.get_user_federated_identities.return_value = ( + mock_return_value + ) + + assert get_user_identity_providers("dummy_user_id") == expected_result + + +def test_get_user_identity_providers_user_not_found(mock_keycloak_admin_client): + """Test the function when the user is not found.""" + mock_keycloak_admin_client.get_user_federated_identities.side_effect = ( + UserNotFoundError + ) + + assert get_user_identity_providers("dummy_user_id") == set() + + +def test_get_user_federated_identities_no_identities(mock_keycloak_admin_client): + """Test the function when the user has no federated identities.""" + mock_keycloak_admin_client.get_user_federated_identities.return_value = set() + + assert get_user_identity_providers("dummy_user_id") == set() + + +def test_get_user_identity_providers_exception(mock_keycloak_admin_client): + """Test the function when an exception is raised.""" + mock_keycloak_admin_client.get_user_federated_identities.side_effect = Exception + + with pytest.raises(Exception): + get_user_identity_providers("dummy_user_id") + + +# Tests for get_user_credential_types +def test_get_user_credential_types_no_client(monkeypatch): + monkeypatch.setattr("profiles.keycloak_integration._keycloak_admin_client", None) + + assert get_user_credential_types("dummy_user_id") == set() + + +@pytest.mark.parametrize( + "mock_return_value, expected_result", + [ + ([], set()), + ([{"type": "password"}], {"password"}), + ([{"type": "password"}, {"type": "otp"}], {"password", "otp"}), + ], +) +def test_get_user_credential_types_with_data( + mock_keycloak_admin_client, mock_return_value, expected_result +): + mock_keycloak_admin_client.get_user_credentials.return_value = mock_return_value + + assert get_user_credential_types("dummy_user_id") == expected_result + + +def test_get_user_credential_types_user_not_found(mock_keycloak_admin_client): + mock_keycloak_admin_client.get_user_credentials.side_effect = UserNotFoundError + + assert get_user_credential_types("dummy_user_id") == set() + + +def test_get_user_credential_types_exception(mock_keycloak_admin_client): + mock_keycloak_admin_client.get_user_credentials.side_effect = Exception + + with pytest.raises(Exception): + get_user_credential_types("dummy_user_id") + + +# Tests for get_user_login_methods +def test_get_user_login_methods(monkeypatch): + monkeypatch.setattr( + "profiles.keycloak_integration.get_user_credential_types", + lambda _: {"password"}, + ) + monkeypatch.setattr( + "profiles.keycloak_integration.get_user_identity_providers", + lambda _: {"provider1"}, + ) + + assert get_user_login_methods("dummy_user_id") == {"password", "provider1"} diff --git a/profiles/tests/test_utils.py b/profiles/tests/test_utils.py new file mode 100644 index 00000000..56e28231 --- /dev/null +++ b/profiles/tests/test_utils.py @@ -0,0 +1,27 @@ +from enum import Enum + +import pytest + +from profiles.utils import enum_values, force_list + + +@pytest.mark.parametrize( + "input_value,expected", + [ + ([1, 2, 3], [1, 2, 3]), + ([], []), + (None, []), + ("foo", ["foo"]), + ((1, 2), [(1, 2)]), # tuples are treated as single values + ], +) +def test_force_list(input_value, expected): + assert force_list(input_value) == expected + + +def test_enum_values(): + class TestEnum(Enum): + FOO = "foo" + BAR = "bar" + + assert enum_values(TestEnum) == ["foo", "bar"] diff --git a/profiles/utils.py b/profiles/utils.py index cad1192c..813ce547 100644 --- a/profiles/utils.py +++ b/profiles/utils.py @@ -1,5 +1,11 @@ +import functools +from enum import Enum +from typing import Type, TypeVar + from django.conf import settings +_EnumType = TypeVar("_EnumType", bound=Type[Enum]) + def requester_has_service_permission(request, permission): service = getattr(request, "service", None) @@ -41,3 +47,22 @@ def requester_has_sufficient_loa_to_perform_gdpr_request(request): "substantial", "high", ] + + +def force_list(value) -> list: + """ + Ensure that the given value is a list. If the value is None, return an empty list. + """ + if value is None: + return [] + if isinstance(value, list): + return value + return [value] + + +@functools.cache +def enum_values(enum: _EnumType) -> list: + """ + Return a list of values from the given enum. + """ + return [e.value for e in enum] diff --git a/utils/keycloak.py b/utils/keycloak.py index 2ce2d5f3..d9fa5ffc 100644 --- a/utils/keycloak.py +++ b/utils/keycloak.py @@ -24,6 +24,19 @@ class ConflictError(KeycloakError): """A conflict occured in Keycloak.""" +def _validate_users_response(response): + if response.status_code == 404: + raise UserNotFoundError("User not found in Keycloak") + + if response.status_code == 409: + raise ConflictError("Keycloak reported a conflict") + + if not response.ok: + raise CommunicationError( + f"Failed communicating with Keycloak (status code {response.status_code})" + ) + + class KeycloakAdminClient: def __init__(self, server_url, realm_name, client_id, client_secret): self._server_url = server_url @@ -89,8 +102,10 @@ def _get_auth(self, force_renew=False): return self._auth - def _single_user_url(self, user_id): - return f"{self._server_url}/admin/realms/{self._realm_name}/users/{user_id}" + def _single_user_url(self, user_id, action: str = ""): + if action and not action.startswith("/"): + action = f"/{action}" + return f"{self._server_url}/admin/realms/{self._realm_name}/users/{user_id}{action}" def _handle_request_with_auth(self, requester): def reauth_requester(): @@ -101,56 +116,57 @@ def reauth_requester(): return self._handle_request_common_errors(reauth_requester) - def _handle_user_request(self, requester): - response = self._handle_request_with_auth(requester) - - if response.status_code == 404: - raise UserNotFoundError("User not found in Keycloak") + def request(self, method, url, validator, **kwargs) -> requests.Response: + kwargs.setdefault("timeout", self._timeout) - if response.status_code == 409: - raise ConflictError("Keycloak reported a conflict") + response = self._handle_request_with_auth( + lambda auth: self._session.request(method, url, auth=auth, **kwargs) + ) - if not response.ok: - raise CommunicationError( - f"Failed communicating with Keycloak (status code {response.status_code}" - ) + if validator: + validator(response) return response - def get_user(self, user_id): - url = self._single_user_url(user_id) + def get(self, url, *args, **kwargs) -> requests.Response: + return self.request("GET", url, *args, **kwargs) - response = self._handle_user_request( - lambda auth: self._session.get(url, auth=auth, timeout=self._timeout) - ) + def put(self, url, *args, **kwargs) -> requests.Response: + return self.request("PUT", url, *args, **kwargs) + + def delete(self, url, *args, **kwargs) -> requests.Response: + return self.request("DELETE", url, *args, **kwargs) + def get_user(self, user_id): + response = self.get( + self._single_user_url(user_id), validator=_validate_users_response + ) return response.json() def update_user(self, user_id, update_data: dict): - url = self._single_user_url(user_id) - - self._handle_user_request( - lambda auth: self._session.put( - url, auth=auth, json=update_data, timeout=self._timeout - ) + self.put( + self._single_user_url(user_id), + validator=_validate_users_response, + json=update_data, ) def delete_user(self, user_id): - url = self._single_user_url(user_id) - - self._handle_user_request( - lambda auth: self._session.delete(url, auth=auth, timeout=self._timeout) - ) + self.delete(self._single_user_url(user_id), validator=_validate_users_response) def send_verify_email(self, user_id): - url = self._single_user_url(user_id) - url += "/send-verify-email" - - return self._handle_user_request( - lambda auth: self._session.put( - url, - auth=auth, - timeout=self._timeout, - params={"client_id": self._client_id}, - ) + url = self._single_user_url(user_id, "send-verify-email") + return self.put( + url, + params={"client_id": self._client_id}, + validator=_validate_users_response, ) + + def get_user_federated_identities(self, user_id): + url = self._single_user_url(user_id, "federated-identity") + response = self.get(url, validator=_validate_users_response) + return response.json() + + def get_user_credentials(self, user_id): + url = self._single_user_url(user_id, "credentials") + response = self.get(url, validator=_validate_users_response) + return response.json() diff --git a/utils/tests/test_keycloak.py b/utils/tests/test_keycloak.py index 51fc4d86..dd0ddd3b 100644 --- a/utils/tests/test_keycloak.py +++ b/utils/tests/test_keycloak.py @@ -2,6 +2,7 @@ import pytest import requests +from requests_mock import Mocker from utils import keycloak @@ -15,7 +16,7 @@ access_token = "test-access-token" unaccepted_access_token = "unaccepted-access-token" -req_mock = None +req_mock: Mocker = None user_id = "test-user-id" user_data = { @@ -41,7 +42,8 @@ def keycloak_client(): ) -def build_mock_kwargs(response, json={}): +def build_mock_kwargs(response, json: dict = None): + json = json or {} if isinstance(response, int): mock_kwargs = { "status_code": response, @@ -367,3 +369,46 @@ def test_renew_access_token_when_old_one_is_not_accepted_with_verify_email_sendi keycloak_client.send_verify_email(user_id) assert success_mock.call_count == 1 + + +def test_get_user_federated_identities(keycloak_client): + setup_well_known() + setup_client_credentials() + federated_identities = [ + { + "identityProvider": "provider", + "userId": "id", + "userName": "username@example.test", + }, + ] + req_mock.get( + f"{server_url}/admin/realms/{realm_name}/users/{user_id}/federated-identity", + request_headers={"Authorization": f"Bearer {access_token}"}, + json=federated_identities, + ) + + result = keycloak_client.get_user_federated_identities(user_id) + + assert result == federated_identities + + +def test_get_user_credentials(keycloak_client): + setup_well_known() + setup_client_credentials() + credentials = [ + { + "id": "uuid", + "type": "password", + "createdDate": 1721043502463, + "credentialData": '{"hashIterations": 27500,"algorithm": "pbkdf2-sha256","additionalParameters": {}}', + }, + ] + req_mock.get( + f"{server_url}/admin/realms/{realm_name}/users/{user_id}/credentials", + request_headers={"Authorization": f"Bearer {access_token}"}, + json=credentials, + ) + + result = keycloak_client.get_user_credentials(user_id) + + assert result == credentials