Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Allow filtering of Role Assignment lists by ansible_id #379

Draft
wants to merge 2 commits into
base: devel
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 85 additions & 20 deletions ansible_base/rbac/api/serializers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from typing import Optional

from django.apps import apps
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.db.models import Model
from django.db.utils import IntegrityError
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
Expand All @@ -16,6 +19,7 @@
from ansible_base.rbac.permission_registry import permission_registry # careful for circular imports
from ansible_base.rbac.policies import check_content_obj_permission, visible_users
from ansible_base.rbac.validators import validate_permissions_for_model
from ansible_base.resource_registry.models import Resource


class ChoiceLikeMixin(serializers.ChoiceField):
Expand Down Expand Up @@ -176,9 +180,17 @@ class RoleDefinitionDetailSerializer(RoleDefinitionSerializer):
class BaseAssignmentSerializer(CommonModelSerializer):
content_type = ContentTypeField(read_only=True)
object_ansible_id = serializers.UUIDField(
required=False,
help_text=_('Deprecated, use the resource field instead.'),
)
resource = serializers.SlugRelatedField(
slug_field='ansible_id',
queryset=Resource.objects.all(),
allow_null=True,
required=False,
help_text=_('Resource id of the object this role applies to. Alternative to the object_id field.'),
)
object_fields = ['object_id', 'object_ansible_id', 'resource']

def __init__(self, *args, **kwargs):
"""
Expand All @@ -193,11 +205,22 @@ def __init__(self, *args, **kwargs):
qs = self.Meta.model._meta.get_field(self.actor_field).model.objects.all()
self.fields[self.actor_field] = serializers.PrimaryKeyRelatedField(queryset=qs, required=False)

def raise_id_fields_error(self, field1, field2):
msg = _('Provide exactly one of %(actor_field)s or %(actor_field)s_ansible_id') % {'actor_field': self.actor_field}
raise ValidationError({self.actor_field: msg, f'{self.actor_field}_ansible_id': msg})
@property
def actor_fields(self):
return [self.actor_field, f'{self.actor_field}_ansible_id', f'{self.actor_field}_resource']

def raise_id_fields_error(self, field_list: list[str]):
msg = _('Provide exactly one of %(field_list)s') % {'field_list': field_list}
raise ValidationError({field_name: msg for field_name in field_list})

def raise_actor_fields_error(self):
self.raise_id_fields_error(self.actor_fields)

def get_by_ansible_id(self, ansible_id, requesting_user, for_field):
def raise_object_fields_error(self):
self.raise_id_fields_error(self.object_fields)

def get_by_ansible_id(self, ansible_id, requesting_user: Model, for_field: str) -> Model:
"""Uses ansible_id, deprecated"""
try:
resource_cls = apps.get_model('dab_resource_registry', 'Resource')
except LookupError:
Expand All @@ -215,24 +238,46 @@ def get_by_ansible_id(self, ansible_id, requesting_user, for_field):
except ObjectDoesNotExist:
msg = serializers.PrimaryKeyRelatedField.default_error_messages['does_not_exist']
raise ValidationError({for_field: msg.format(pk_value=ansible_id)})
return resource.content_object
return obj

def get_obj_from_resource(self, resource: Resource, requesting_user: Model, for_field: str) -> Model:
"""Return the content_object of the resource and check permissions"""
try:
# Ensure that the request user has permission to view provided data
obj = resource.content_object
if obj._meta.model_name == 'user':
if not visible_users(requesting_user).filter(pk=obj.pk).exists():
raise ObjectDoesNotExist
elif not requesting_user.has_obj_perm(obj, 'view'):
raise ObjectDoesNotExist
except ObjectDoesNotExist:
msg = serializers.PrimaryKeyRelatedField.default_error_messages['does_not_exist']
raise ValidationError({for_field: msg.format(pk_value=resource.ansible_id)})
return obj

def get_actor_from_data(self, validated_data, requesting_user):
def get_actor_from_data(self, validated_data: dict, requesting_user: Model) -> Model:
"""Return the user or team from validated request data, implicitly check read permission for request user"""
actor_aid_field = f'{self.actor_field}_ansible_id'
if validated_data.get(self.actor_field) and validated_data.get(actor_aid_field):
self.raise_id_fields_error(self.actor_field, actor_aid_field)
actor_resource_field = f'{self.actor_field}_resource'
if len([field_name for field_name in self.actor_fields if validated_data.get(field_name)]) > 1:
self.raise_actor_fields_error()
elif validated_data.get(self.actor_field):
actor = validated_data[self.actor_field]
elif ansible_id := validated_data.get(actor_aid_field):
actor = self.get_by_ansible_id(ansible_id, requesting_user, for_field=actor_aid_field)
elif validated_data.get(actor_aid_field) or validated_data.get(actor_resource_field):
if ansible_id := validated_data.get(actor_aid_field):
actor = self.get_by_ansible_id(ansible_id, requesting_user, for_field=actor_aid_field)
else:
resource = validated_data[actor_resource_field]
actor = self.get_obj_from_resource(resource, requesting_user, for_field=actor_resource_field)
else:
self.raise_id_fields_error(self.actor_field, f'{self.actor_field}_ansible_id')
self.raise_actor_fields_error()
return actor

def get_object_from_data(self, validated_data, role_definition, requesting_user):
def get_object_from_data(self, validated_data, role_definition, requesting_user) -> Optional[Model]:
"""Return the content object specified by validated request data, implicitly check read permission for request user"""
obj = None
if validated_data.get('object_id') and validated_data.get('object_ansible_id'):
self.raise_id_fields_error('object_id', 'object_ansible_id')
if len([field_name for field_name in self.object_fields if validated_data.get(field_name)]) > 1:
self.raise_object_fields_error()
elif validated_data.get('object_id'):
if not role_definition.content_type:
raise ValidationError({'object_id': _('System role does not allow for object assignment')})
Expand All @@ -241,12 +286,18 @@ def get_object_from_data(self, validated_data, role_definition, requesting_user)
obj = serializers.PrimaryKeyRelatedField(queryset=model.access_qs(requesting_user)).to_internal_value(validated_data['object_id'])
except ValidationError as exc:
raise ValidationError({'object_id': exc.detail})
elif validated_data.get('object_ansible_id'):
obj = self.get_by_ansible_id(validated_data.get('object_ansible_id'), requesting_user, for_field='object_ansible_id')
elif validated_data.get('object_ansible_id') or validated_data.get('resource'):
if validated_data.get('object_ansible_id'):
field_name = 'object_ansible_id'
obj = self.get_by_ansible_id(validated_data.get('object_ansible_id'), requesting_user, for_field='object_ansible_id')
else:
field_name = 'resource'
resource = validated_data['resource']
obj = self.get_obj_from_resource(resource, requesting_user, for_field='resource')
if permission_registry.content_type_model.objects.get_for_model(obj) != role_definition.content_type:
raise ValidationError(
{
'object_ansible_id': _('Object type of %(model_name)s does not match role type of %(role_definition)s')
field_name: _('Object type of %(model_name)s does not match role type of %(role_definition)s')
% {'model_name': obj._meta.model_name, 'role_definition': role_definition.content_type.model}
}
)
Expand Down Expand Up @@ -300,19 +351,26 @@ def _get_summary_fields(self, obj) -> dict[str, dict]:
return summary_fields


ASSIGNMENT_FIELDS = ImmutableCommonModelSerializer.Meta.fields + ['content_type', 'object_id', 'object_ansible_id', 'role_definition']
ASSIGNMENT_FIELDS = ImmutableCommonModelSerializer.Meta.fields + ['content_type', 'object_id', 'object_ansible_id', 'resource', 'role_definition']


class RoleUserAssignmentSerializer(BaseAssignmentSerializer):
actor_field = 'user'
user_ansible_id = serializers.UUIDField(
required=False,
help_text=_('Deprecated, use the user_resource field instead.'),
)
user_resource = serializers.SlugRelatedField(
slug_field='ansible_id',
queryset=Resource.objects.filter(content_type__model='user'),
allow_null=True,
required=False,
help_text=_('Resource id of the user who will receive permissions from this assignment. Alternative to user field.'),
)

class Meta:
model = RoleUserAssignment
fields = ASSIGNMENT_FIELDS + ['user', 'user_ansible_id']
fields = ASSIGNMENT_FIELDS + ['user', 'user_ansible_id', 'user_resource']

def get_actor_queryset(self, requesting_user):
return visible_users(requesting_user)
Expand All @@ -321,13 +379,20 @@ def get_actor_queryset(self, requesting_user):
class RoleTeamAssignmentSerializer(BaseAssignmentSerializer):
actor_field = 'team'
team_ansible_id = serializers.UUIDField(
required=False,
help_text=_('Deprecated, use the team_resource field instead.'),
)
team_resource = serializers.SlugRelatedField(
slug_field='ansible_id',
queryset=Resource.objects.filter(content_type__model='team'),
allow_null=True,
required=False,
help_text=_('Resource id of the team who will receive permissions from this assignment. Alternative to team field.'),
)

class Meta:
model = RoleTeamAssignment
fields = ASSIGNMENT_FIELDS + ['team', 'team_ansible_id']
fields = ASSIGNMENT_FIELDS + ['team', 'team_ansible_id', 'team_resource']

def get_actor_queryset(self, requesting_user):
return permission_registry.team_model.access_qs(requesting_user)
40 changes: 40 additions & 0 deletions ansible_base/rbac/migrations/0002_resource_link.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Generated by Django 4.2.11 on 2024-05-06 20:27

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('dab_resource_registry', '0004_remove_resourcetype_migrated'),
('dab_rbac', '0001_initial'),
]

operations = [
migrations.AddField(
model_name='roleteamassignment',
name='resource',
field=models.ForeignKey(help_text='A UUID identifier of the object this role assignment applies to, null value may indicate system-wide role or a model that has no ansible_id', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_object_assignments', to='dab_resource_registry.resource'),
),
migrations.AddField(
model_name='roleteamassignment',
name='user_resource',
field=models.ForeignKey(help_text='A UUID identifier of the team given permission defined by this assignment', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='role_team_assignments', to='dab_resource_registry.resource'),
),
migrations.AddField(
model_name='roleuserassignment',
name='resource',
field=models.ForeignKey(help_text='A UUID identifier of the object this role assignment applies to, null value may indicate system-wide role or a model that has no ansible_id', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_object_assignments', to='dab_resource_registry.resource'),
),
migrations.AddField(
model_name='roleuserassignment',
name='user_resource',
field=models.ForeignKey(help_text='A UUID identifier of the user given permission defined by this assignment', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='role_user_assignments', to='dab_resource_registry.resource'),
),
migrations.AddField(
model_name='objectrole',
name='resource',
field=models.ForeignKey(help_text='A UUID identifier of the object this role assignment applies to, null value may indicate system-wide role or a model that has no ansible_id', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_object_assignments', to='dab_resource_registry.resource'),
),
]
Loading
Loading