From ef2f8428b4ed862b5c72b3b448853a69794eacfb Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 8 May 2024 16:45:29 -0400 Subject: [PATCH 1/2] Hook up resource to API filtering --- ansible_base/rbac/api/serializers.py | 105 ++++++++++++--- .../rbac/migrations/0002_resource_link.py | 40 ++++++ ansible_base/rbac/models.py | 43 ++++++ .../resource_registry/models/resource.py | 3 + test_app/tests/rbac/api/test_rbac_views.py | 1 + .../api/test_resource_based_assignments.py | 124 ++++++++++++++++++ 6 files changed, 296 insertions(+), 20 deletions(-) create mode 100644 ansible_base/rbac/migrations/0002_resource_link.py create mode 100644 test_app/tests/rbac/api/test_resource_based_assignments.py diff --git a/ansible_base/rbac/api/serializers.py b/ansible_base/rbac/api/serializers.py index 2c97bd41b..0f7601fe6 100644 --- a/ansible_base/rbac/api/serializers.py +++ b/ansible_base/rbac/api/serializers.py @@ -1,7 +1,10 @@ +from typing import Optional + from django.apps import apps from django.conf import settings from django.core.exceptions import ObjectDoesNotExist from django.db import transaction +from django.db.models import Model from django.db.utils import IntegrityError from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ @@ -16,6 +19,7 @@ from ansible_base.rbac.permission_registry import permission_registry # careful for circular imports from ansible_base.rbac.policies import check_content_obj_permission, visible_users from ansible_base.rbac.validators import validate_permissions_for_model +from ansible_base.resource_registry.models import Resource class ChoiceLikeMixin(serializers.ChoiceField): @@ -176,9 +180,17 @@ class RoleDefinitionDetailSerializer(RoleDefinitionSerializer): class BaseAssignmentSerializer(CommonModelSerializer): content_type = ContentTypeField(read_only=True) object_ansible_id = serializers.UUIDField( + required=False, + help_text=_('Deprecated, use the resource field instead.'), + ) + resource = serializers.SlugRelatedField( + slug_field='ansible_id', + queryset=Resource.objects.all(), + allow_null=True, required=False, help_text=_('Resource id of the object this role applies to. Alternative to the object_id field.'), ) + object_fields = ['object_id', 'object_ansible_id', 'resource'] def __init__(self, *args, **kwargs): """ @@ -193,11 +205,22 @@ def __init__(self, *args, **kwargs): qs = self.Meta.model._meta.get_field(self.actor_field).model.objects.all() self.fields[self.actor_field] = serializers.PrimaryKeyRelatedField(queryset=qs, required=False) - def raise_id_fields_error(self, field1, field2): - msg = _('Provide exactly one of %(actor_field)s or %(actor_field)s_ansible_id') % {'actor_field': self.actor_field} - raise ValidationError({self.actor_field: msg, f'{self.actor_field}_ansible_id': msg}) + @property + def actor_fields(self): + return [self.actor_field, f'{self.actor_field}_ansible_id', f'{self.actor_field}_resource'] + + def raise_id_fields_error(self, field_list: list[str]): + msg = _('Provide exactly one of %(field_list)s') % {'field_list': field_list} + raise ValidationError({field_name: msg for field_name in field_list}) + + def raise_actor_fields_error(self): + self.raise_id_fields_error(self.actor_fields) - def get_by_ansible_id(self, ansible_id, requesting_user, for_field): + def raise_object_fields_error(self): + self.raise_id_fields_error(self.object_fields) + + def get_by_ansible_id(self, ansible_id, requesting_user: Model, for_field: str) -> Model: + """Uses ansible_id, deprecated""" try: resource_cls = apps.get_model('dab_resource_registry', 'Resource') except LookupError: @@ -215,24 +238,46 @@ def get_by_ansible_id(self, ansible_id, requesting_user, for_field): except ObjectDoesNotExist: msg = serializers.PrimaryKeyRelatedField.default_error_messages['does_not_exist'] raise ValidationError({for_field: msg.format(pk_value=ansible_id)}) - return resource.content_object + return obj + + def get_obj_from_resource(self, resource: Resource, requesting_user: Model, for_field: str) -> Model: + """Return the content_object of the resource and check permissions""" + try: + # Ensure that the request user has permission to view provided data + obj = resource.content_object + if obj._meta.model_name == 'user': + if not visible_users(requesting_user).filter(pk=obj.pk).exists(): + raise ObjectDoesNotExist + elif not requesting_user.has_obj_perm(obj, 'view'): + raise ObjectDoesNotExist + except ObjectDoesNotExist: + msg = serializers.PrimaryKeyRelatedField.default_error_messages['does_not_exist'] + raise ValidationError({for_field: msg.format(pk_value=resource.ansible_id)}) + return obj - def get_actor_from_data(self, validated_data, requesting_user): + def get_actor_from_data(self, validated_data: dict, requesting_user: Model) -> Model: + """Return the user or team from validated request data, implicitly check read permission for request user""" actor_aid_field = f'{self.actor_field}_ansible_id' - if validated_data.get(self.actor_field) and validated_data.get(actor_aid_field): - self.raise_id_fields_error(self.actor_field, actor_aid_field) + actor_resource_field = f'{self.actor_field}_resource' + if len([field_name for field_name in self.actor_fields if validated_data.get(field_name)]) > 1: + self.raise_actor_fields_error() elif validated_data.get(self.actor_field): actor = validated_data[self.actor_field] - elif ansible_id := validated_data.get(actor_aid_field): - actor = self.get_by_ansible_id(ansible_id, requesting_user, for_field=actor_aid_field) + elif validated_data.get(actor_aid_field) or validated_data.get(actor_resource_field): + if ansible_id := validated_data.get(actor_aid_field): + actor = self.get_by_ansible_id(ansible_id, requesting_user, for_field=actor_aid_field) + else: + resource = validated_data[actor_resource_field] + actor = self.get_obj_from_resource(resource, requesting_user, for_field=actor_resource_field) else: - self.raise_id_fields_error(self.actor_field, f'{self.actor_field}_ansible_id') + self.raise_actor_fields_error() return actor - def get_object_from_data(self, validated_data, role_definition, requesting_user): + def get_object_from_data(self, validated_data, role_definition, requesting_user) -> Optional[Model]: + """Return the content object specified by validated request data, implicitly check read permission for request user""" obj = None - if validated_data.get('object_id') and validated_data.get('object_ansible_id'): - self.raise_id_fields_error('object_id', 'object_ansible_id') + if len([field_name for field_name in self.object_fields if validated_data.get(field_name)]) > 1: + self.raise_object_fields_error() elif validated_data.get('object_id'): if not role_definition.content_type: raise ValidationError({'object_id': _('System role does not allow for object assignment')}) @@ -241,12 +286,18 @@ def get_object_from_data(self, validated_data, role_definition, requesting_user) obj = serializers.PrimaryKeyRelatedField(queryset=model.access_qs(requesting_user)).to_internal_value(validated_data['object_id']) except ValidationError as exc: raise ValidationError({'object_id': exc.detail}) - elif validated_data.get('object_ansible_id'): - obj = self.get_by_ansible_id(validated_data.get('object_ansible_id'), requesting_user, for_field='object_ansible_id') + elif validated_data.get('object_ansible_id') or validated_data.get('resource'): + if validated_data.get('object_ansible_id'): + field_name = 'object_ansible_id' + obj = self.get_by_ansible_id(validated_data.get('object_ansible_id'), requesting_user, for_field='object_ansible_id') + else: + field_name = 'resource' + resource = validated_data['resource'] + obj = self.get_obj_from_resource(resource, requesting_user, for_field='resource') if permission_registry.content_type_model.objects.get_for_model(obj) != role_definition.content_type: raise ValidationError( { - 'object_ansible_id': _('Object type of %(model_name)s does not match role type of %(role_definition)s') + field_name: _('Object type of %(model_name)s does not match role type of %(role_definition)s') % {'model_name': obj._meta.model_name, 'role_definition': role_definition.content_type.model} } ) @@ -300,19 +351,26 @@ def _get_summary_fields(self, obj) -> dict[str, dict]: return summary_fields -ASSIGNMENT_FIELDS = ImmutableCommonModelSerializer.Meta.fields + ['content_type', 'object_id', 'object_ansible_id', 'role_definition'] +ASSIGNMENT_FIELDS = ImmutableCommonModelSerializer.Meta.fields + ['content_type', 'object_id', 'object_ansible_id', 'resource', 'role_definition'] class RoleUserAssignmentSerializer(BaseAssignmentSerializer): actor_field = 'user' user_ansible_id = serializers.UUIDField( + required=False, + help_text=_('Deprecated, use the user_resource field instead.'), + ) + user_resource = serializers.SlugRelatedField( + slug_field='ansible_id', + queryset=Resource.objects.filter(content_type__model='user'), + allow_null=True, required=False, help_text=_('Resource id of the user who will receive permissions from this assignment. Alternative to user field.'), ) class Meta: model = RoleUserAssignment - fields = ASSIGNMENT_FIELDS + ['user', 'user_ansible_id'] + fields = ASSIGNMENT_FIELDS + ['user', 'user_ansible_id', 'user_resource'] def get_actor_queryset(self, requesting_user): return visible_users(requesting_user) @@ -321,13 +379,20 @@ def get_actor_queryset(self, requesting_user): class RoleTeamAssignmentSerializer(BaseAssignmentSerializer): actor_field = 'team' team_ansible_id = serializers.UUIDField( + required=False, + help_text=_('Deprecated, use the team_resource field instead.'), + ) + team_resource = serializers.SlugRelatedField( + slug_field='ansible_id', + queryset=Resource.objects.filter(content_type__model='team'), + allow_null=True, required=False, help_text=_('Resource id of the team who will receive permissions from this assignment. Alternative to team field.'), ) class Meta: model = RoleTeamAssignment - fields = ASSIGNMENT_FIELDS + ['team', 'team_ansible_id'] + fields = ASSIGNMENT_FIELDS + ['team', 'team_ansible_id', 'team_resource'] def get_actor_queryset(self, requesting_user): return permission_registry.team_model.access_qs(requesting_user) diff --git a/ansible_base/rbac/migrations/0002_resource_link.py b/ansible_base/rbac/migrations/0002_resource_link.py new file mode 100644 index 000000000..6c1bb08c0 --- /dev/null +++ b/ansible_base/rbac/migrations/0002_resource_link.py @@ -0,0 +1,40 @@ +# Generated by Django 4.2.11 on 2024-05-06 20:27 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dab_resource_registry', '0004_remove_resourcetype_migrated'), + ('dab_rbac', '0001_initial'), + ] + + operations = [ + migrations.AddField( + model_name='roleteamassignment', + name='resource', + field=models.ForeignKey(help_text='A UUID identifier of the object this role assignment applies to, null value may indicate system-wide role or a model that has no ansible_id', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_object_assignments', to='dab_resource_registry.resource'), + ), + migrations.AddField( + model_name='roleteamassignment', + name='user_resource', + field=models.ForeignKey(help_text='A UUID identifier of the team given permission defined by this assignment', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='role_team_assignments', to='dab_resource_registry.resource'), + ), + migrations.AddField( + model_name='roleuserassignment', + name='resource', + field=models.ForeignKey(help_text='A UUID identifier of the object this role assignment applies to, null value may indicate system-wide role or a model that has no ansible_id', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_object_assignments', to='dab_resource_registry.resource'), + ), + migrations.AddField( + model_name='roleuserassignment', + name='user_resource', + field=models.ForeignKey(help_text='A UUID identifier of the user given permission defined by this assignment', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='role_user_assignments', to='dab_resource_registry.resource'), + ), + migrations.AddField( + model_name='objectrole', + name='resource', + field=models.ForeignKey(help_text='A UUID identifier of the object this role assignment applies to, null value may indicate system-wide role or a model that has no ansible_id', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_object_assignments', to='dab_resource_registry.resource'), + ), + ] diff --git a/ansible_base/rbac/models.py b/ansible_base/rbac/models.py index 1074d39aa..a946be022 100644 --- a/ansible_base/rbac/models.py +++ b/ansible_base/rbac/models.py @@ -216,6 +216,9 @@ def give_or_remove_permission(self, actor, content_object, giving=True, sync_act # sanitize the object_id to its database version, practically, remove "-" chars from uuids object_id = content_object._meta.pk.get_db_prep_value(content_object.pk, connection) kwargs = dict(role_definition=self, content_type=obj_ct, object_id=object_id) + if hasattr(content_object, 'resource'): + if resource := content_object.resource: + kwargs['resource'] = resource created = False object_role = ObjectRole.objects.filter(**kwargs).first() @@ -336,10 +339,20 @@ class AssignmentBase(ImmutableCommonModel, ObjectRoleFields): """ object_role = models.ForeignKey('dab_rbac.ObjectRole', on_delete=models.CASCADE, editable=False, null=True) + # Resource link fields are duplicated from object_role, for serializer purposes object_id = models.TextField( null=True, blank=True, help_text=_('Primary key of the object this assignment applies to, null value indicates system-wide assignment') ) content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True) + resource = models.ForeignKey( + 'dab_resource_registry.Resource', + on_delete=models.CASCADE, + null=True, + related_name='%(class)s_object_assignments', + help_text=_( + 'A UUID identifier of the object this role assignment applies to, null value may indicate system-wide role or a model that has no ansible_id' + ), + ) # object_role is internal, and not shown in serializer # content_type does not have a link, and ResourceType will be used in lieu sometime @@ -357,6 +370,10 @@ def __init__(self, *args, **kwargs): self.object_id = self.object_role.object_id self.content_type_id = self.object_role.content_type_id self.role_definition_id = self.object_role.role_definition_id + if self.object_role_id and not self.resource_id: + # ObjectRole may not have a linked resource, if not tracked in resource registry + if resource_id := self.object_role.resource_id: + self.resource_id = resource_id class RoleUserAssignment(AssignmentBase): @@ -367,6 +384,14 @@ class RoleUserAssignment(AssignmentBase): related_name='user_assignments', ) user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name='role_assignments') + user_resource = models.ForeignKey( + 'dab_resource_registry.Resource', + on_delete=models.CASCADE, + null=True, + related_name='role_user_assignments', + help_text=_('A UUID identifier of the user given permission defined by this assignment'), + ) + router_basename = 'roleuserassignment' class Meta: @@ -391,6 +416,14 @@ class RoleTeamAssignment(AssignmentBase): related_name='team_assignments', ) team = models.ForeignKey(settings.ANSIBLE_BASE_TEAM_MODEL, on_delete=models.CASCADE, related_name='role_assignments') + user_resource = models.ForeignKey( + 'dab_resource_registry.Resource', + on_delete=models.CASCADE, + null=True, + related_name='role_team_assignments', + help_text=_('A UUID identifier of the team given permission defined by this assignment'), + ) + router_basename = 'roleteamassignment' class Meta: @@ -452,6 +485,16 @@ class Meta: editable=False, help_text=_("Users who have this role obtain member access to these teams, and inherit all their permissions"), ) + # Not really computed, but cached just to have around + resource = models.ForeignKey( + 'dab_resource_registry.Resource', + on_delete=models.CASCADE, + null=True, + related_name='%(class)s_object_assignments', + help_text=_( + 'A UUID identifier of the object this role assignment applies to, null value may indicate system-wide role or a model that has no ansible_id' + ), + ) def __str__(self): return f'ObjectRole(pk={self.id}, {self.content_type.model}={self.object_id})' diff --git a/ansible_base/resource_registry/models/resource.py b/ansible_base/resource_registry/models/resource.py index 779402d8a..8ac115e3e 100644 --- a/ansible_base/resource_registry/models/resource.py +++ b/ansible_base/resource_registry/models/resource.py @@ -58,6 +58,9 @@ class Resource(models.Model): def summary_fields(self): return {"ansible_id": self.ansible_id, "resource_type": self.resource_type} + def __str__(self): + return f'Resource(name={self.name}, object_id={self.object_id}, ansible_id={self.ansible_id})' + @property def resource_type(self): return resource_type_cache(self.content_type.pk).name diff --git a/test_app/tests/rbac/api/test_rbac_views.py b/test_app/tests/rbac/api/test_rbac_views.py index 18066ddd0..34c5fda34 100644 --- a/test_app/tests/rbac/api/test_rbac_views.py +++ b/test_app/tests/rbac/api/test_rbac_views.py @@ -151,6 +151,7 @@ def test_create_user_assignment_immutable(user_api_client, user, rando, task_adm response = user_api_client.post(url, data=request_data) assert response.status_code == 400, response.data + assert 'object_id' in response.data, response.data assert 'object does not exist' in response.data['object_id'][0] task_view_rd.give_permission(user, task) diff --git a/test_app/tests/rbac/api/test_resource_based_assignments.py b/test_app/tests/rbac/api/test_resource_based_assignments.py new file mode 100644 index 000000000..283e98c44 --- /dev/null +++ b/test_app/tests/rbac/api/test_resource_based_assignments.py @@ -0,0 +1,124 @@ +from uuid import uuid4 + +import pytest +from django.contrib.contenttypes.models import ContentType +from django.urls import reverse + +from ansible_base.rbac.models import ObjectRole, RoleEvaluation +from ansible_base.resource_registry.models import Resource + + +@pytest.mark.django_db +def test_assignment_attaches_resource(user, organization, org_admin_rd, admin_api_client): + assignment = org_admin_rd.give_permission(user, organization) + assert assignment.resource == organization.resource + assert assignment.object_role.resource == organization.resource + + url = reverse('roleuserassignment-list') + # Prove that we can filter assignments using ansible_id + response = admin_api_client.get(url, data={'resource__ansible_id': organization.resource.ansible_id}) + assert response.data['count'] == 1 + # Prove the null case for filtering + response = admin_api_client.get(url, data={'resource__ansible_id': str(uuid4())}) + assert response.data['count'] == 0 + + +@pytest.mark.django_db +def test_user_assignment_resource(admin_api_client, inv_rd, rando, inventory): + resource = Resource.objects.get(object_id=rando.pk, content_type=ContentType.objects.get_for_model(rando).pk) + url = reverse('roleuserassignment-list') + data = dict(role_definition=inv_rd.id, content_type='aap.inventory', user_resource=str(resource.ansible_id), object_id=inventory.id) + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code == 201, response.data + assert rando.has_obj_perm(inventory, 'change') + + +@pytest.mark.django_db +def test_team_assignment_resource(admin_api_client, inv_rd, team, inventory, member_rd, rando): + member_rd.give_permission(rando, team) + team_ct = ContentType.objects.get_for_model(team) + resource = Resource.objects.get(object_id=team.pk, content_type=team_ct.pk) + url = reverse('roleteamassignment-list') + data = dict(role_definition=inv_rd.id, content_type='aap.inventory', team_resource=str(resource.ansible_id), object_id=inventory.id) + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code == 201, response.data + + team_role = ObjectRole.objects.get(object_id=team.id, content_type=team_ct, role_definition=member_rd) + assert RoleEvaluation.objects.filter(role=team_role, codename='change_inventory', object_id=inventory.id).count() == 1 + assert rando.has_obj_perm(inventory, 'change') + + +@pytest.mark.django_db +@pytest.mark.parametrize('actor', ['user', 'team']) +def test_assignment_id_validation(admin_api_client, inv_rd, team, inventory, rando, actor): + if actor == 'user': + actor_obj = rando + else: + actor_obj = team + resource = Resource.objects.get(object_id=actor_obj.pk, content_type=ContentType.objects.get_for_model(actor_obj).pk) + url = reverse(f'role{actor}assignment-list') + test_fields = (actor, f'{actor}_resource') + + # Provide too little data + data = dict(role_definition=inv_rd.id, content_type='aap.inventory', object_id=inventory.id) + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code == 400, response.data + for field_name in test_fields: + assert 'Provide exactly one of' in str(response.data[field_name]) + + # Provide too much data + data[f'{actor}_resource'] = str(resource.ansible_id) + data[actor] = actor_obj.id + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code == 400, response.data + for field_name in test_fields: + assert 'Provide exactly one of' in str(response.data[field_name]) + + # And we rolled back or did not take the action, right? + assert not rando.has_obj_perm(inventory, 'change') + + +@pytest.mark.django_db +def test_user_assignment_by_resource(admin_api_client, org_inv_rd, rando, inventory, organization): + resource = Resource.objects.get(object_id=organization.pk, content_type=ContentType.objects.get_for_model(organization).pk) + url = reverse('roleuserassignment-list') + data = dict(role_definition=org_inv_rd.id, user=rando.id, resource=str(resource.ansible_id)) + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code == 201, response.data + assert rando.has_obj_perm(inventory, 'change') + + +@pytest.mark.django_db +def test_missing_object(admin_api_client, inv_rd, rando): + url = reverse('roleuserassignment-list') + data = dict(role_definition=inv_rd.id, user=rando.id) + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code == 400, response.data + assert response.data['object_id'] == 'Object must be specified for this role assignment' + + +@pytest.mark.django_db +def test_invalid_resource(admin_api_client, org_inv_rd, rando): + url = reverse('roleuserassignment-list') + bad_ansible_id = f'{uuid4()}' + data = dict(role_definition=org_inv_rd.id, user=rando.id, resource=bad_ansible_id) + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code == 400, response.data + assert 'does not exist' in str(response.data['resource']) + assert bad_ansible_id in str(response.data['resource']) + + +@pytest.mark.django_db +def test_resource_bad_type(admin_api_client, inv_rd, rando, organization): + """If giving object_id, type is implied from role definition. + + When giving ansible_id that no longer holds true, and user can give any type. + This expects a validation error when the object type does not match the role type. + """ + resource = Resource.objects.get(object_id=organization.pk, content_type=ContentType.objects.get_for_model(organization).pk) + url = reverse('roleuserassignment-list') + data = dict(role_definition=inv_rd.id, user=rando.id, resource=str(resource.ansible_id)) + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code == 400, response.data + assert 'resource' in response.data, response.data + assert 'organization does not match role type of inventory' in str(response.data['resource']) From 6b8d8d16469c07d5c2a67fe06ce626bedd92366f Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 14 May 2024 13:11:57 -0400 Subject: [PATCH 2/2] Properly handle and test object_role lookups --- ansible_base/rbac/models.py | 60 +++++++++++++------ .../rbac/api/test_ansible_id_assignments.py | 38 +++++++++++- 2 files changed, 80 insertions(+), 18 deletions(-) diff --git a/ansible_base/rbac/models.py b/ansible_base/rbac/models.py index a946be022..4effd7ffd 100644 --- a/ansible_base/rbac/models.py +++ b/ansible_base/rbac/models.py @@ -6,6 +6,7 @@ from django.conf import settings from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ObjectDoesNotExist from django.db import connection, models, transaction from django.db.models.functions import Cast from django.db.models.query import QuerySet @@ -189,6 +190,19 @@ def give_permission(self, actor, content_object): def remove_permission(self, actor, content_object): return self.give_or_remove_permission(actor, content_object, giving=False) + def get_object_role(self, **kwargs): + """Downselects kwargs to the lookup_fields we want to use and does lookup""" + lookup_fields = {'object_id', 'content_type', 'role_definition'} + lookup_data = {} + defaults = {} + for field_name, field_value in kwargs.items(): + if field_name in lookup_fields: + lookup_data[field_name] = field_value + else: + defaults[field_name] = field_value + + return ObjectRole.objects.filter(**lookup_data).first() + def get_or_create_object_role(self, **kwargs): """Transaction-safe method to create ObjectRole @@ -197,17 +211,24 @@ def get_or_create_object_role(self, **kwargs): postgres constraints will still be violated by other active transactions which gives us a way to gracefully handle this. """ - if transaction.get_connection().in_atomic_block: - try: - with transaction.atomic(): - object_role = ObjectRole.objects.create(**kwargs) - return (object_role, True) - except IntegrityError: - object_role = ObjectRole.objects.get(**kwargs) - return (object_role, False) + object_role = self.get_object_role(**kwargs) + if object_role: + return (object_role, False) else: - object_role = ObjectRole.objects.create(**kwargs) - return (object_role, True) + if transaction.get_connection().in_atomic_block: + try: + with transaction.atomic(): + object_role = ObjectRole.objects.create(**kwargs) + return (object_role, True) + except IntegrityError: + object_role = ObjectRole.objects.get(**kwargs) + if object_role: + return (object_role, False) + # Should not really happen, but we will give up here + raise ObjectRole.DoesNotExist + else: + object_role = ObjectRole.objects.create(**kwargs) + return (object_role, True) def give_or_remove_permission(self, actor, content_object, giving=True, sync_action=False): "Shortcut method to do whatever needed to give user or team these permissions" @@ -216,15 +237,20 @@ def give_or_remove_permission(self, actor, content_object, giving=True, sync_act # sanitize the object_id to its database version, practically, remove "-" chars from uuids object_id = content_object._meta.pk.get_db_prep_value(content_object.pk, connection) kwargs = dict(role_definition=self, content_type=obj_ct, object_id=object_id) - if hasattr(content_object, 'resource'): - if resource := content_object.resource: - kwargs['resource'] = resource + + try: + if hasattr(content_object, 'resource'): + if resource := content_object.resource: + kwargs['resource'] = resource + except ObjectDoesNotExist: + pass created = False - object_role = ObjectRole.objects.filter(**kwargs).first() - if object_role is None: - if not giving: - return # nothing to do + if not giving: + object_role = self.get_object_role(**kwargs) + if not object_role: + return + else: object_role, created = self.get_or_create_object_role(**kwargs) from ansible_base.rbac.triggers import needed_updates_on_assignment, update_after_assignment diff --git a/test_app/tests/rbac/api/test_ansible_id_assignments.py b/test_app/tests/rbac/api/test_ansible_id_assignments.py index e8553ad71..8159243b9 100644 --- a/test_app/tests/rbac/api/test_ansible_id_assignments.py +++ b/test_app/tests/rbac/api/test_ansible_id_assignments.py @@ -5,6 +5,7 @@ from django.urls import reverse from ansible_base.rbac.models import ObjectRole, RoleEvaluation +from ansible_base.rbac.triggers import update_after_assignment from ansible_base.resource_registry.models import Resource @@ -12,12 +13,47 @@ def test_user_assignment_ansible_id(admin_api_client, inv_rd, rando, inventory): resource = Resource.objects.get(object_id=rando.pk, content_type=ContentType.objects.get_for_model(rando).pk) url = reverse('roleuserassignment-list') - data = dict(role_definition=inv_rd.id, content_type='aap.inventory', user_ansible_id=str(resource.ansible_id), object_id=inventory.id) + data = dict(role_definition=inv_rd.id, user_ansible_id=str(resource.ansible_id), object_id=inventory.id) response = admin_api_client.post(url, data=data, format="json") assert response.status_code == 201, response.data assert rando.has_obj_perm(inventory, 'change') +@pytest.mark.django_db +def test_user_assinment_resource_deleted(admin_api_client, rando, organization, org_admin_rd): + resource = Resource.objects.get(object_id=rando.pk, content_type=ContentType.objects.get_for_model(rando).pk) + url = reverse('roleuserassignment-list') + data = dict(role_definition=org_admin_rd.id, user_ansible_id=str(resource.ansible_id), object_id=organization.id) + + # sanity assertions of initial state + assert hasattr(organization, 'resource') + assert not ObjectRole.objects.filter(object_id=organization.id, content_type=ContentType.objects.get_for_model(organization)).exists() + + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code == 201, response.data + assert rando.has_obj_perm(organization, 'change') + + # test duplicate assignment + organization.resource.delete() + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code in (201, 200), response.data + + +@pytest.mark.django_db +def test_user_assinment_unmigrated_assignment(admin_api_client, rando, organization, org_admin_rd): + # Mock an object from from before assignment link was added + org_ct = ContentType.objects.get_for_model(organization) + object_role = ObjectRole.objects.create(object_id=organization.pk, content_type=org_ct, role_definition=org_admin_rd) + update_after_assignment([], [object_role]) + + url = reverse('roleuserassignment-list') + data = dict(role_definition=org_admin_rd.id, user_ansible_id=str(rando.resource.ansible_id), object_id=organization.id) + + response = admin_api_client.post(url, data=data, format="json") + assert response.status_code == 201, response.data + assert rando.has_obj_perm(organization, 'change') + + @pytest.mark.django_db def test_team_assignment_ansible_id(admin_api_client, inv_rd, team, inventory, member_rd, rando): member_rd.give_permission(rando, team)