From 5fe04f33158da9c22dfeae03438138a50bfb4769 Mon Sep 17 00:00:00 2001 From: Matt Holder Date: Fri, 3 Jan 2025 10:34:01 -0500 Subject: [PATCH 1/7] validate operation/value pairings --- rbac/management/role/serializer.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/rbac/management/role/serializer.py b/rbac/management/role/serializer.py index cb4bf515a..f1b15847e 100644 --- a/rbac/management/role/serializer.py +++ b/rbac/management/role/serializer.py @@ -49,6 +49,18 @@ def validate_attributeFilter(self, value): message = f"attributeFilter operation must be one of {ALLOWED_OPERATIONS}" error = {key: [_(message)]} raise serializers.ValidationError(error) + else: + values = value.get("value") + if type(values) is str and op == "in": + key = "format" + message = "attributeFilter operation 'in' expects a List value" + error = {key: [_(message)]} + raise serializers.ValidationError(error) + elif type(values) is list and op == "equals": + key = "format" + message = "attributeFilter operation 'equals' expects a String value" + error = {key: [_(message)]} + raise serializers.ValidationError(error) return value class Meta: From 6292dd586c7383c9b9fd360afde898157c184005 Mon Sep 17 00:00:00 2001 From: Matt Holder Date: Fri, 3 Jan 2025 10:34:17 -0500 Subject: [PATCH 2/7] test new validations --- tests/management/role/test_view.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/management/role/test_view.py b/tests/management/role/test_view.py index c0c10c1ea..011b4eea9 100644 --- a/tests/management/role/test_view.py +++ b/tests/management/role/test_view.py @@ -1773,6 +1773,33 @@ def test_create_duplicate_role_fail(self): self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertEqual(response.data.get("errors")[0].get("detail"), f"Role '{name}' already exists for a tenant.") + def test_create_role_with_invalid_equals_operation(self): + """Test that we cannot create a role when a String value is paired with the 'in' operation.""" + role_name = "roleFail" + access_data = [ + { + "permission": "someApp:*:*", + "resourceDefinitions": [ + {"attributeFilter": {"key": "keyA.id", "operation": "equals", "value": ["value1", "value2"]}} + ], + } + ] + response = self.create_role(role_name, in_access_data=access_data) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_create_role_with_invalid_in_operation(self): + """Test that we cannot create a role when a String value is paired with the 'in' operation.""" + role_name = "roleFail" + access_data = [ + { + "permission": "someApp:*:*", + "resourceDefinitions": [ + {"attributeFilter": {"key": "keyA.id", "operation": "in", "value": "valueA"}} + ], + } + ] + response = self.create_role(role_name, in_access_data=access_data) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) class RoleViewNonAdminTests(IdentityRequest): """Test the role view for nonadmin user.""" From de6b35379fa71a130c3c5491a82bbf04ae331900 Mon Sep 17 00:00:00 2001 From: Matt Holder Date: Fri, 3 Jan 2025 11:34:03 -0500 Subject: [PATCH 3/7] linting --- tests/management/role/test_view.py | 677 +++++++++++++++++++++++------ 1 file changed, 547 insertions(+), 130 deletions(-) diff --git a/tests/management/role/test_view.py b/tests/management/role/test_view.py index 011b4eea9..e8721c471 100644 --- a/tests/management/role/test_view.py +++ b/tests/management/role/test_view.py @@ -50,7 +50,12 @@ def normalize_and_sort(json_obj): for key, value in json_obj.items(): if isinstance(value, list): - sorted_list = sorted([json.dumps(item, sort_keys=True, cls=DjangoJSONEncoder) for item in value]) + sorted_list = sorted( + [ + json.dumps(item, sort_keys=True, cls=DjangoJSONEncoder) + for item in value + ] + ) json_obj[key] = [json.loads(item) for item in sorted_list] return json_obj @@ -59,7 +64,9 @@ def normalize_and_sort(json_obj): def replication_event_for_v1_role(v1_role_uuid, default_workspace_id): """Create a replication event for a v1 role.""" return { - "relations_to_add": relation_api_tuples_for_v1_role(v1_role_uuid, default_workspace_id), + "relations_to_add": relation_api_tuples_for_v1_role( + v1_role_uuid, default_workspace_id + ), "relations_to_remove": [], } @@ -70,24 +77,41 @@ def relation_api_tuples_for_v1_role(v1_role_uuid, default_workspace_id): mappings = BindingMapping.objects.filter(role=role_id).all() relations = [] for role_binding in [m.get_role_binding() for m in mappings]: - relation_tuple = relation_api_tuple("role_binding", role_binding.id, "role", "role", role_binding.role.id) + relation_tuple = relation_api_tuple( + "role_binding", role_binding.id, "role", "role", role_binding.role.id + ) relations.append(relation_tuple) for permission in role_binding.role.permissions: - relation_tuple = relation_api_tuple("role", role_binding.role.id, permission, "principal", "*") + relation_tuple = relation_api_tuple( + "role", role_binding.role.id, permission, "principal", "*" + ) relations.append(relation_tuple) if "app_all_read" in role_binding.role.permissions: relation_tuple = relation_api_tuple( - "workspace", default_workspace_id, "binding", "role_binding", role_binding.id + "workspace", + default_workspace_id, + "binding", + "role_binding", + role_binding.id, ) relations.append(relation_tuple) else: - relation_tuple = relation_api_tuple("keya/id", "valueA", "binding", "role_binding", role_binding.id) + relation_tuple = relation_api_tuple( + "keya/id", "valueA", "binding", "role_binding", role_binding.id + ) relations.append(relation_tuple) return relations -def relation_api_tuple(resource_type, resource_id, relation, subject_type, subject_id, subject_relation=None): +def relation_api_tuple( + resource_type, + resource_id, + relation, + subject_type, + subject_id, + subject_relation=None, +): """Helper function for creating a relation tuple in json.""" return { "resource": relation_api_resource(resource_type, resource_id), @@ -122,11 +146,23 @@ class RoleViewsetTests(IdentityRequest): def setUp(self): """Set up the role viewset tests.""" super().setUp() - sys_role_config = {"name": "system_role", "display_name": "system_display", "system": True} + sys_role_config = { + "name": "system_role", + "display_name": "system_display", + "system": True, + } - sys_pub_role_config = {"name": "system_public_role", "display_name": "system_public_display", "system": True} + sys_pub_role_config = { + "name": "system_public_role", + "display_name": "system_public_display", + "system": True, + } - def_role_config = {"name": "default_role", "display_name": "default_display", "platform_default": True} + def_role_config = { + "name": "default_role", + "display_name": "default_display", + "platform_default": True, + } admin_def_role_config = { "name": "admin_default_role", @@ -160,17 +196,23 @@ def setUp(self): "external_tenant", } - self.principal = Principal(username=self.user_data["username"], tenant=self.tenant) + self.principal = Principal( + username=self.user_data["username"], tenant=self.tenant + ) self.principal.save() self.policy = Policy.objects.create(name="policyA", tenant=self.tenant) - self.group = Group(name="groupA", description="groupA description", tenant=self.tenant) + self.group = Group( + name="groupA", description="groupA description", tenant=self.tenant + ) self.group.save() self.group.principals.add(self.principal) self.group.policies.add(self.policy) self.group.save() self.policyTwo = Policy.objects.create(name="policyB", tenant=self.tenant) - self.groupTwo = Group(name="groupB", description="groupB description", tenant=self.tenant) + self.groupTwo = Group( + name="groupB", description="groupB description", tenant=self.tenant + ) self.groupTwo.save() self.groupTwo.principals.add(self.principal) self.groupTwo.policies.add(self.policyTwo) @@ -179,7 +221,9 @@ def setUp(self): self.adminRole = Role(**admin_def_role_config, tenant=self.tenant) self.adminRole.save() - self.platformAdminRole = Role(**platform_admin_def_role_config, tenant=self.tenant) + self.platformAdminRole = Role( + **platform_admin_def_role_config, tenant=self.tenant + ) self.platformAdminRole.save() self.public_tenant = Tenant.objects.get(tenant_name="public") @@ -193,22 +237,42 @@ def setUp(self): self.defRole.save() self.ext_tenant = ExtTenant.objects.create(name="foo") - self.ext_role_relation = ExtRoleRelation.objects.create(role=self.defRole, ext_tenant=self.ext_tenant) + self.ext_role_relation = ExtRoleRelation.objects.create( + role=self.defRole, ext_tenant=self.ext_tenant + ) - self.policy.roles.add(self.defRole, self.sysRole, self.adminRole, self.platformAdminRole, self.sysPubRole) + self.policy.roles.add( + self.defRole, + self.sysRole, + self.adminRole, + self.platformAdminRole, + self.sysPubRole, + ) self.policy.save() self.policyTwo.roles.add(self.platformAdminRole) self.policyTwo.save() - self.permission = Permission.objects.create(permission="app:*:*", tenant=self.tenant) - self.permission2 = Permission.objects.create(permission="app2:*:*", tenant=self.tenant) - self.permission3 = Permission.objects.create(permission="app:*:read", tenant=self.tenant) + self.permission = Permission.objects.create( + permission="app:*:*", tenant=self.tenant + ) + self.permission2 = Permission.objects.create( + permission="app2:*:*", tenant=self.tenant + ) + self.permission3 = Permission.objects.create( + permission="app:*:read", tenant=self.tenant + ) self.permission.permissions.add(self.permission3) - self.access = Access.objects.create(permission=self.permission, role=self.defRole, tenant=self.tenant) - self.access2 = Access.objects.create(permission=self.permission2, role=self.defRole, tenant=self.tenant) + self.access = Access.objects.create( + permission=self.permission, role=self.defRole, tenant=self.tenant + ) + self.access2 = Access.objects.create( + permission=self.permission2, role=self.defRole, tenant=self.tenant + ) - self.access3 = Access.objects.create(permission=self.permission2, role=self.sysRole, tenant=self.tenant) + self.access3 = Access.objects.create( + permission=self.permission2, role=self.sysRole, tenant=self.tenant + ) Permission.objects.create(permission="cost-management:*:*", tenant=self.tenant) self.root_workspace = Workspace.objects.create( name="root", @@ -247,14 +311,24 @@ def create_role(self, role_name, role_display="", in_access_data=None): { "permission": "app:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "key1.id", "operation": "equal", "value": "value1"}} + { + "attributeFilter": { + "key": "key1.id", + "operation": "equal", + "value": "value1", + } + } ], }, {"permission": "app:*:read", "resourceDefinitions": []}, ] if in_access_data: access_data = in_access_data - test_data = {"name": role_name, "display_name": role_display, "access": access_data} + test_data = { + "name": role_name, + "display_name": role_display, + "access": access_data, + } # create a role client = APIClient() @@ -272,7 +346,9 @@ def create_group(self, group_name): def create_policy(self, policy_name, group, roles): """Create a policy.""" # create a policy - policy = Policy.objects.create(name=policy_name, tenant=self.tenant, system=True) + policy = Policy.objects.create( + name=policy_name, tenant=self.tenant, system=True + ) for role in Role.objects.filter(uuid__in=roles): policy.roles.add(role) policy.group = Group.objects.get(uuid=group) @@ -295,7 +371,13 @@ def test_create_role_success(self, send_kafka_message): { "permission": "app:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], }, {"permission": "app:*:read", "resourceDefinitions": []}, @@ -322,7 +404,9 @@ def test_create_role_success(self, send_kafka_message): self.assertEqual(al_dict_action, "create") # test that we can retrieve the role - url = reverse("v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")}) + url = reverse( + "v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")} + ) client = APIClient() response = client.get(url, **self.headers) uuid = response.data.get("uuid") @@ -365,7 +449,9 @@ def test_create_role_success(self, send_kafka_message): ) @override_settings(V2_MIGRATION_RESOURCE_EXCLUDE_LIST=["rbac:workspace"]) - @patch("management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event") + @patch( + "management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event" + ) def test_role_replication_exluded_resource(self, mock_method): """Test that excluded resources do not replicate via dual write.""" # Set up @@ -374,7 +460,13 @@ def test_role_replication_exluded_resource(self, mock_method): { "permission": "app:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "group.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "group.id", + "operation": "equal", + "value": "valueA", + } + } ], }, {"permission": "app:*:read", "resourceDefinitions": []}, @@ -388,12 +480,18 @@ def test_role_replication_exluded_resource(self, mock_method): to_add = actual_sorted["relations_to_add"] self.assertEqual([], actual_sorted["relations_to_remove"]) - self.assertEqual(3, len(to_add), "too many relations (should not add relations for excluded resource)") + self.assertEqual( + 3, + len(to_add), + "too many relations (should not add relations for excluded resource)", + ) - role_binding = find_in_list(to_add, lambda r: r["resource"]["type"]["name"] == "role_binding")["resource"][ - "id" - ] - workspace = find_in_list(to_add, lambda r: r["resource"]["type"]["name"] == "workspace") + role_binding = find_in_list( + to_add, lambda r: r["resource"]["type"]["name"] == "role_binding" + )["resource"]["id"] + workspace = find_in_list( + to_add, lambda r: r["resource"]["type"]["name"] == "workspace" + ) self.assertEquals( role_binding, @@ -403,9 +501,13 @@ def test_role_replication_exluded_resource(self, mock_method): role = find_in_list(to_add, lambda r: r["resource"]["type"]["name"] == "role") - self.assertEquals(role["relation"], "app_all_read", "expected workspace permission") + self.assertEquals( + role["relation"], "app_all_read", "expected workspace permission" + ) - @patch("management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event") + @patch( + "management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event" + ) def test_create_role_with_display_success(self, mock_method): """Test that we can create a role.""" role_name = "roleD" @@ -414,15 +516,25 @@ def test_create_role_with_display_success(self, mock_method): { "permission": "app:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], }, {"permission": "app:*:read", "resourceDefinitions": []}, ] - response = self.create_role(role_name, role_display=role_display, in_access_data=access_data) + response = self.create_role( + role_name, role_display=role_display, in_access_data=access_data + ) self.assertEqual(response.status_code, status.HTTP_201_CREATED) - replication_event = replication_event_for_v1_role(response.data.get("uuid"), str(self.default_workspace.id)) + replication_event = replication_event_for_v1_role( + response.data.get("uuid"), str(self.default_workspace.id) + ) mock_method.assert_called_once() actual_call_arg = mock_method.call_args[0][0] @@ -431,7 +543,9 @@ def test_create_role_with_display_success(self, mock_method): self.assertEqual(expected_sorted, actual_sorted) # test that we can retrieve the role - url = reverse("v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")}) + url = reverse( + "v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")} + ) client = APIClient() response = client.get(url, **self.headers) @@ -450,7 +564,13 @@ def test_create_role_without_required_permission(self): { "permission": self.permission.permission, "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], } ] @@ -470,14 +590,20 @@ def test_create_role_invalid(self): def test_create_role_invalid_permission(self): """Test that creating a role with invalid access permission returns an error.""" - test_data = {"name": "role1", "access": [{"permission": "foo:bar", "resourceDefinitions": []}]} + test_data = { + "name": "role1", + "access": [{"permission": "foo:bar", "resourceDefinitions": []}], + } client = APIClient() response = client.post(URL, test_data, format="json", **self.headers) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) def test_create_role_empty_application_in_permission(self): """Test that creating a role with empty application in access permission returns an error.""" - test_data = {"name": "role1", "access": [{"permission": ":foo:bar", "resourceDefinitions": []}]} + test_data = { + "name": "role1", + "access": [{"permission": ":foo:bar", "resourceDefinitions": []}], + } client = APIClient() response = client.post(URL, test_data, format="json", **self.headers) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) @@ -489,7 +615,13 @@ def test_create_role_allow_list(self): { "permission": "cost-management:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], } ] @@ -497,7 +629,9 @@ def test_create_role_allow_list(self): self.assertEqual(response.status_code, status.HTTP_201_CREATED) # test that we can retrieve the role - url = reverse("v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")}) + url = reverse( + "v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")} + ) client = APIClient() response = client.get(url, **self.headers) @@ -514,7 +648,13 @@ def test_create_role_allow_list_fail(self): { "permission": "someApp:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], } ] @@ -527,7 +667,13 @@ def test_create_role_appfilter_structure_fail(self): access_data = [ { "permission": "cost-management:*:*", - "resourceDefinitions": {"attributeFilter": {"key": "keyA.id", "operation": "in", "foo": "valueA"}}, + "resourceDefinitions": { + "attributeFilter": { + "key": "keyA.id", + "operation": "in", + "foo": "valueA", + } + }, } ] response = self.create_role(role_name, in_access_data=access_data) @@ -540,7 +686,15 @@ def test_create_role_appfilter_fields_fail(self): access_data = [ { "permission": "cost-management:*:*", - "resourceDefinitions": [{"attributeFilter": {"key": "keyA.id", "operation": "in", "foo": "valueA"}}], + "resourceDefinitions": [ + { + "attributeFilter": { + "key": "keyA.id", + "operation": "in", + "foo": "valueA", + } + } + ], } ] response = self.create_role(role_name, in_access_data=access_data) @@ -553,7 +707,13 @@ def test_create_role_appfilter_operation_fail(self): { "permission": "cost-management:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "boop", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "boop", + "value": "valueA", + } + } ], } ] @@ -568,13 +728,22 @@ def test_create_role_permission_does_not_exist_fail(self): { "permission": permission, "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], } ] response = self.create_role(role_name, in_access_data=access_data) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertEqual(response.data.get("errors")[0].get("detail"), f"Permission does not exist: {permission}") + self.assertEqual( + response.data.get("errors")[0].get("detail"), + f"Permission does not exist: {permission}", + ) def test_create_role_fail_with_access_not_list(self): """Test that we cannot create a role for a non-allow_listed app.""" @@ -803,7 +972,9 @@ def test_get_role_by_display_name_invalid_criteria(self): def test_get_role_by_exact_display_name_match(self): """Test that getting roles by display_name returns exact match.""" - url = "{}?display_name={}&name_match={}".format(URL, self.sysRole.display_name, "exact") + url = "{}?display_name={}&name_match={}".format( + URL, self.sysRole.display_name, "exact" + ) client = APIClient() response = client.get(url, **self.headers) self.assertEqual(response.data.get("meta").get("count"), 1) @@ -818,7 +989,9 @@ def test_get_role_by_exact_display_name_no_match(self): self.assertEqual(response.data.get("meta").get("count"), 0) @patch("management.principal.proxy.PrincipalProxy.request_filtered_principals") - def test_list_role_with_groups_in_fields_with_username_param_for_non_org_admin(self, mock_request): + def test_list_role_with_groups_in_fields_with_username_param_for_non_org_admin( + self, mock_request + ): """ Test that we can read a list of roles and the groups_in fields is set correctly for a request with 'username' param for non org admin principal. @@ -870,8 +1043,12 @@ def test_list_role_with_groups_in_fields_with_username_param_for_non_org_admin(s } # add principal to the created group - principal_response = self.add_principal_to_group(custom_group_uuid, john.username) - self.assertEqual(principal_response.status_code, status.HTTP_200_OK, principal_response) + principal_response = self.add_principal_to_group( + custom_group_uuid, john.username + ) + self.assertEqual( + principal_response.status_code, status.HTTP_200_OK, principal_response + ) # add groups_in and groups_in_count fields into display fields groups_in_count = "groups_in_count" @@ -900,7 +1077,14 @@ def test_list_role_with_groups_in_fields_with_username_param_for_non_org_admin(s self.assertIsNotNone(iterRole.get(groups_in)[0]["description"]) # make sure created role exists in result set and has correct values - created_role = next((iterRole for iterRole in response_data if iterRole["name"] == custom_role_name), None) + created_role = next( + ( + iterRole + for iterRole in response_data + if iterRole["name"] == custom_role_name + ), + None, + ) self.assertIsNotNone(created_role) self.assertEqual(created_role[groups_in_count], 1) self.assertEqual(created_role[groups_in][0]["name"], custom_group_name) @@ -914,7 +1098,9 @@ def test_list_role_with_groups_in_fields_with_username_param_for_non_org_admin(s self.assertIn(group["name"], groups) @patch("management.principal.proxy.PrincipalProxy.request_filtered_principals") - def test_list_role_with_groups_in_fields_with_username_param_for_org_admin(self, mock_request): + def test_list_role_with_groups_in_fields_with_username_param_for_org_admin( + self, mock_request + ): """ Test that we can read a list of roles and the groups_in fields is set correctly for a request with 'username' param for org admin principal. @@ -966,8 +1152,12 @@ def test_list_role_with_groups_in_fields_with_username_param_for_org_admin(self, } # add principal to the created group - principal_response = self.add_principal_to_group(custom_group_uuid, mary.username) - self.assertEqual(principal_response.status_code, status.HTTP_200_OK, principal_response) + principal_response = self.add_principal_to_group( + custom_group_uuid, mary.username + ) + self.assertEqual( + principal_response.status_code, status.HTTP_200_OK, principal_response + ) # add groups_in and groups_in_count fields into display fields groups_in_count = "groups_in_count" @@ -996,7 +1186,14 @@ def test_list_role_with_groups_in_fields_with_username_param_for_org_admin(self, self.assertIsNotNone(iterRole.get(groups_in)[0]["description"]) # make sure created role exists in result set and has correct values - created_role = next((iterRole for iterRole in response_data if iterRole["name"] == custom_role_name), None) + created_role = next( + ( + iterRole + for iterRole in response_data + if iterRole["name"] == custom_role_name + ), + None, + ) self.assertIsNotNone(created_role) self.assertEqual(created_role[groups_in_count], 1) self.assertEqual(created_role[groups_in][0]["name"], custom_group_name) @@ -1005,13 +1202,19 @@ def test_list_role_with_groups_in_fields_with_username_param_for_org_admin(self, # * custom group 'NewGroupForJohn' or # * 'Default access' group # * 'Default admin access' group - groups = [default_access_group_name, default_admin_access_group_name, custom_group_name] + groups = [ + default_access_group_name, + default_admin_access_group_name, + custom_group_name, + ] for role in response_data: for group in role[groups_in]: self.assertIn(group["name"], groups) @patch("management.principal.proxy.PrincipalProxy.request_filtered_principals") - def test_list_role_with_groups_in_fields_for_principal_scope_success(self, mock_request): + def test_list_role_with_groups_in_fields_for_principal_scope_success( + self, mock_request + ): """ Test that we can read a list of roles and the groups_in fields is set correctly for a principal scoped request. @@ -1048,8 +1251,12 @@ def test_list_role_with_groups_in_fields_for_principal_scope_success(self, mock_ self.create_policy(policy_name, group_uuid, [role_uuid]) # add user principal to the created group - principal_response = self.add_principal_to_group(group_uuid, self.principal.username) - self.assertEqual(principal_response.status_code, status.HTTP_200_OK, principal_response) + principal_response = self.add_principal_to_group( + group_uuid, self.principal.username + ) + self.assertEqual( + principal_response.status_code, status.HTTP_200_OK, principal_response + ) # hit /roles?groups_in, group should appear in groups_in field_1 = "groups_in_count" @@ -1058,7 +1265,9 @@ def test_list_role_with_groups_in_fields_for_principal_scope_success(self, mock_ new_display_fields.add(field_1) new_display_fields.add(field_2) - url = "{}?add_fields={},{}&username={}".format(URL, field_1, field_2, self.principal.username) + url = "{}?add_fields={},{}&username={}".format( + URL, field_1, field_2, self.principal.username + ) client = APIClient() response = client.get(url, **self.headers) @@ -1078,13 +1287,23 @@ def test_list_role_with_groups_in_fields_for_principal_scope_success(self, mock_ self.assertIsNotNone(iterRole.get("groups_in")[0]["description"]) # make sure created role exists in result set and has correct values - created_role = next((iterRole for iterRole in response_data if iterRole["name"] == role_name), None) + created_role = next( + (iterRole for iterRole in response_data if iterRole["name"] == role_name), + None, + ) self.assertIsNotNone(created_role) self.assertEqual(created_role["groups_in_count"], 1) self.assertEqual(created_role["groups_in"][0]["name"], group_name) # make sure a default role exists in result set and has correct values - default_role = next((iterRole for iterRole in response_data if iterRole["name"] == self.defRole.name), None) + default_role = next( + ( + iterRole + for iterRole in response_data + if iterRole["name"] == self.defRole.name + ), + None, + ) self.assertIsNotNone(default_role) self.assertEqual(default_role["groups_in_count"], 1) self.assertEqual(default_role["groups_in"][0]["name"], self.group.name) @@ -1119,13 +1338,27 @@ def test_list_role_with_groups_in_fields_for_admin_scope_success(self): self.assertIsNotNone(iterRole.get("groups_in")[0]["description"]) # make sure a default role exists in result set and has correct values - default_role = next((iterRole for iterRole in response_data if iterRole["name"] == self.defRole.name), None) + default_role = next( + ( + iterRole + for iterRole in response_data + if iterRole["name"] == self.defRole.name + ), + None, + ) self.assertIsNotNone(default_role) self.assertEqual(default_role["groups_in_count"], 1) self.assertEqual(default_role["groups_in"][0]["name"], self.group.name) # make sure an admin role exists in result set and has correct values - admin_role = next((iterRole for iterRole in response_data if iterRole["name"] == self.adminRole.name), None) + admin_role = next( + ( + iterRole + for iterRole in response_data + if iterRole["name"] == self.adminRole.name + ), + None, + ) self.assertIsNotNone(admin_role) self.assertEqual(admin_role["groups_in_count"], 1) self.assertEqual(admin_role["groups_in"][0]["name"], self.group.name) @@ -1180,7 +1413,9 @@ def test_list_role_with_additional_fields_username_success(self, mock_request): ], } - url = "{}?add_fields={},{}&username={}".format(URL, field_1, field_2, self.principal.username) + url = "{}?add_fields={},{}&username={}".format( + URL, field_1, field_2, self.principal.username + ) client = APIClient() response = client.get(url, **self.headers) @@ -1265,7 +1500,11 @@ def test_patch_role_success(self): client = APIClient() response = client.patch( url, - {"name": updated_name, "display_name": updated_name, "description": updated_description}, + { + "name": updated_name, + "display_name": updated_name, + "description": updated_description, + }, format="json", **self.headers, ) @@ -1305,7 +1544,12 @@ def test_patch_role_failure(self): client = APIClient() response = client.patch( url, - {"name": updated_name, "display_name": updated_name, "description": updated_description, "foo": "bar"}, + { + "name": updated_name, + "display_name": updated_name, + "description": updated_description, + "foo": "bar", + }, format="json", **self.headers, ) @@ -1350,7 +1594,9 @@ def test_update_role_success(self, send_kafka_message): self.assertIsNotNone(response.data.get("uuid")) self.assertEqual(updated_name, response.data.get("name")) - self.assertEqual("cost-management:*:*", response.data.get("access")[0]["permission"]) + self.assertEqual( + "cost-management:*:*", response.data.get("access")[0]["permission"] + ) # test whether newly updated (post) role is added correctly within audit log database al_url = "/api/rbac/v1/auditlogs/" @@ -1398,7 +1644,11 @@ def test_update_role_invalid(self): client = APIClient() response = client.put( url, - {"name": "updated_name", "display_name": "updated_name", "description": "updated_description"}, + { + "name": "updated_name", + "display_name": "updated_name", + "description": "updated_description", + }, format="json", **self.headers, ) @@ -1412,7 +1662,13 @@ def test_update_role_invalid_permission(self): { "permission": "cost-management:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], } ] @@ -1429,7 +1685,9 @@ def test_update_role_invalid_permission(self): response = client.put(url, test_data, format="json", **self.headers) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - @patch("management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event") + @patch( + "management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event" + ) def test_update_role(self, mock_method): """Test that updating a role with an invalid permission returns an error.""" # Set up @@ -1438,7 +1696,13 @@ def test_update_role(self, mock_method): { "permission": "app:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], }, {"permission": "app:*:read", "resourceDefinitions": []}, @@ -1448,7 +1712,13 @@ def test_update_role(self, mock_method): { "permission": "app:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], }, {"permission": "app:*:read", "resourceDefinitions": []}, @@ -1460,10 +1730,14 @@ def test_update_role(self, mock_method): test_data["access"] = new_access_data url = reverse("v1_management:role-detail", kwargs={"uuid": role_uuid}) client = APIClient() - current_relations = relation_api_tuples_for_v1_role(role_uuid, str(self.default_workspace.id)) + current_relations = relation_api_tuples_for_v1_role( + role_uuid, str(self.default_workspace.id) + ) response = client.put(url, test_data, format="json", **self.headers) - replication_event = replication_event_for_v1_role(response.data.get("uuid"), str(self.default_workspace.id)) + replication_event = replication_event_for_v1_role( + response.data.get("uuid"), str(self.default_workspace.id) + ) replication_event["relations_to_remove"] = current_relations actual_call_arg = mock_method.call_args[0][0] expected_sorted = normalize_and_sort(replication_event) @@ -1480,7 +1754,13 @@ def test_update_role_invalid_resource_defs_structure(self): { "permission": "cost-management:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], } ] @@ -1489,7 +1769,11 @@ def test_update_role_invalid_resource_defs_structure(self): role_uuid = response.data.get("uuid") test_data = response.data test_data.get("access")[0]["resourceDefinitions"] = { - "attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"} + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } } # Test update failure @@ -1506,7 +1790,13 @@ def test_update_role_appfilter_operation_fail(self): { "permission": "cost-management:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], } ] @@ -1514,7 +1804,9 @@ def test_update_role_appfilter_operation_fail(self): self.assertEqual(response.status_code, status.HTTP_201_CREATED) role_uuid = response.data.get("uuid") test_data = response.data - test_data.get("access")[0]["resourceDefinitions"][0].get("attributeFilter")["operation"] = "foo" + test_data.get("access")[0]["resourceDefinitions"][0].get("attributeFilter")[ + "operation" + ] = "foo" # Test update failure url = reverse("v1_management:role-detail", kwargs={"uuid": role_uuid}) @@ -1522,7 +1814,8 @@ def test_update_role_appfilter_operation_fail(self): response = client.put(url, test_data, format="json", **self.headers) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertEqual( - str(response.data["errors"][0]["detail"]), "attributeFilter operation must be one of ['in', 'equal']" + str(response.data["errors"][0]["detail"]), + "attributeFilter operation must be one of ['in', 'equal']", ) def test_update_role_permission_does_not_exist_fail(self): @@ -1534,7 +1827,13 @@ def test_update_role_permission_does_not_exist_fail(self): { "permission": "cost-management:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], } ] @@ -1550,9 +1849,14 @@ def test_update_role_permission_does_not_exist_fail(self): client = APIClient() response = client.put(url, test_data, format="json", **self.headers) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertEqual(response.data.get("errors")[0].get("detail"), f"Permission does not exist: {permission}") + self.assertEqual( + response.data.get("errors")[0].get("detail"), + f"Permission does not exist: {permission}", + ) - @patch("management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event") + @patch( + "management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event" + ) def test_delete_role(self, mock_method): """Test that we can delete an existing role.""" role_name = "roleA" @@ -1560,7 +1864,13 @@ def test_delete_role(self, mock_method): { "permission": "app:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equal", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equal", + "value": "valueA", + } + } ], }, {"permission": "app:*:read", "resourceDefinitions": []}, @@ -1571,7 +1881,9 @@ def test_delete_role(self, mock_method): url = reverse("v1_management:role-detail", kwargs={"uuid": role_uuid}) client = APIClient() replication_event = {"relations_to_add": [], "relations_to_remove": []} - current_relations = relation_api_tuples_for_v1_role(role_uuid, str(self.default_workspace.id)) + current_relations = relation_api_tuples_for_v1_role( + role_uuid, str(self.default_workspace.id) + ) replication_event["relations_to_remove"] = current_relations response = client.delete(url, **self.headers) actual_call_arg = mock_method.call_args[0][0] @@ -1652,11 +1964,17 @@ def test_delete_system_role(self): def test_delete_system_role_in_public_tenant(self): """Test that system roles in public tenant are protected from deletion""" - url = reverse("v1_management:role-detail", kwargs={"uuid": self.sysPubRole.uuid}) + url = reverse( + "v1_management:role-detail", kwargs={"uuid": self.sysPubRole.uuid} + ) client = APIClient() - existing_role = Role.objects.filter(uuid=self.sysPubRole.uuid, tenant=self.public_tenant).first() - self.assertIsNotNone(existing_role, "Public system role should exist before deletion test.") + existing_role = Role.objects.filter( + uuid=self.sysPubRole.uuid, tenant=self.public_tenant + ).first() + self.assertIsNotNone( + existing_role, "Public system role should exist before deletion test." + ) response = client.delete(url, **self.headers) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) @@ -1675,13 +1993,23 @@ def test_update_admin_default_role(self): "admin_default": True, "permission": "app:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "key1.id", "operation": "equal", "value": "value1"}} + { + "attributeFilter": { + "key": "key1.id", + "operation": "equal", + "value": "value1", + } + } ], }, {"permission": "app:*:read", "resourceDefinitions": []}, ] - test_data = {"name": "role_name", "display_name": "role_display", "access": access_data} + test_data = { + "name": "role_name", + "display_name": "role_display", + "access": access_data, + } response = client.put(url, test_data, format="json", **self.headers) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) @@ -1771,7 +2099,10 @@ def test_create_duplicate_role_fail(self): # Try to create the same role again response = client.post(URL, test_data, format="json", **self.headers) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertEqual(response.data.get("errors")[0].get("detail"), f"Role '{name}' already exists for a tenant.") + self.assertEqual( + response.data.get("errors")[0].get("detail"), + f"Role '{name}' already exists for a tenant.", + ) def test_create_role_with_invalid_equals_operation(self): """Test that we cannot create a role when a String value is paired with the 'in' operation.""" @@ -1780,7 +2111,13 @@ def test_create_role_with_invalid_equals_operation(self): { "permission": "someApp:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "equals", "value": ["value1", "value2"]}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "equals", + "value": ["value1", "value2"], + } + } ], } ] @@ -1794,13 +2131,20 @@ def test_create_role_with_invalid_in_operation(self): { "permission": "someApp:*:*", "resourceDefinitions": [ - {"attributeFilter": {"key": "keyA.id", "operation": "in", "value": "valueA"}} + { + "attributeFilter": { + "key": "keyA.id", + "operation": "in", + "value": "valueA", + } + } ], } ] response = self.create_role(role_name, in_access_data=access_data) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + class RoleViewNonAdminTests(IdentityRequest): """Test the role view for nonadmin user.""" @@ -1815,7 +2159,9 @@ def setUp(self): "platform_default": True, "admin_default": False, } - self.platform_default_role = Role.objects.create(**platform_default_role_config, tenant=self.tenant) + self.platform_default_role = Role.objects.create( + **platform_default_role_config, tenant=self.tenant + ) admin_default_role_config = { "name": "admin_default_role", @@ -1824,7 +2170,9 @@ def setUp(self): "platform_default": False, "admin_default": True, } - self.admin_default_role = Role.objects.create(**admin_default_role_config, tenant=self.tenant) + self.admin_default_role = Role.objects.create( + **admin_default_role_config, tenant=self.tenant + ) not_system_role_config = { "name": "not_system_role", @@ -1834,7 +2182,9 @@ def setUp(self): "admin_default": False, } - self.not_system_role = Role.objects.create(**not_system_role_config, tenant=self.tenant) + self.not_system_role = Role.objects.create( + **not_system_role_config, tenant=self.tenant + ) self.system_roles_count = 2 self.non_system_roles_count = 1 @@ -1858,7 +2208,9 @@ def setUp(self): # Create 2 non org admin principals and 1 org admin # 1. user based principal - self.user_based_principal = Principal(username="user_based_principal", tenant=self.tenant) + self.user_based_principal = Principal( + username="user_based_principal", tenant=self.tenant + ) self.user_based_principal.save() customer_data = { @@ -1869,10 +2221,15 @@ def setUp(self): request_context_user_based_principal = self._create_request_context( customer_data=customer_data, - user_data={"username": self.user_based_principal.username, "email": "test@email.com"}, + user_data={ + "username": self.user_based_principal.username, + "email": "test@email.com", + }, is_org_admin=False, ) - self.headers_user_based_principal = request_context_user_based_principal["request"].META + self.headers_user_based_principal = request_context_user_based_principal[ + "request" + ].META # 2. service account based principal service_account_data = self._create_service_account_data() @@ -1889,7 +2246,9 @@ def setUp(self): service_account_data=service_account_data, is_org_admin=False, ) - self.headers_service_account_principal = request_context_service_account_principal["request"].META + self.headers_service_account_principal = ( + request_context_service_account_principal["request"].META + ) # 3 org admin principal in the tenant self.org_admin = Principal(username="org_admin", tenant=self.tenant) @@ -1903,7 +2262,9 @@ def setUp(self): self.headers_org_admin = request_context_org_admin["request"].META # Error messages - self.no_permission_err_message = "You do not have permission to perform this action." + self.no_permission_err_message = ( + "You do not have permission to perform this action." + ) def tearDown(self): """Tear down role viewset nonadmin tests.""" @@ -1921,7 +2282,11 @@ def _create_group_with_user_access_admin_role(tenant): """Create a group with a 'User Access administrator' role.""" # Create a group with 'User Access administrator' role rbac_admin_permission = Permission.objects.create( - application="rbac", permission="rbac:*:*", resource_type="*", verb="*", tenant=tenant + application="rbac", + permission="rbac:*:*", + resource_type="*", + verb="*", + tenant=tenant, ) user_access_administrator_role = Role.objects.create( admin_default=True, @@ -1931,7 +2296,11 @@ def _create_group_with_user_access_admin_role(tenant): system=True, tenant=tenant, ) - Access.objects.create(permission=rbac_admin_permission, role=user_access_administrator_role, tenant=tenant) + Access.objects.create( + permission=rbac_admin_permission, + role=user_access_administrator_role, + tenant=tenant, + ) rbac_admin_group = Group.objects.create( admin_default=False, description="A group with the 'User Access administrator' role", @@ -1941,7 +2310,10 @@ def _create_group_with_user_access_admin_role(tenant): tenant=tenant, ) policy_for_rbac_admin_group = Policy.objects.create( - group=rbac_admin_group, name="Policy for rbac_admin_group", system=True, tenant=tenant + group=rbac_admin_group, + name="Policy for rbac_admin_group", + system=True, + tenant=tenant, ) policy_for_rbac_admin_group.roles.add(user_access_administrator_role) return rbac_admin_group @@ -1955,11 +2327,15 @@ def test_list_roles_without_User_Access_Admin_fail(self): response = client.get(url, **self.headers_user_based_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + self.assertEqual( + response.data.get("errors")[0].get("detail"), self.no_permission_err_message + ) response = client.get(url, **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + self.assertEqual( + response.data.get("errors")[0].get("detail"), self.no_permission_err_message + ) # Org Admin can list the roles response = client.get(url, **self.headers_org_admin) @@ -1972,8 +2348,12 @@ def test_list_roles_with_User_Access_Admin_success(self): Test that principal with 'User Access administrator' role can read a list of roles. """ # Create a group with 'User Access administrator' role and add principals we use in headers - group_with_UA_admin = self._create_group_with_user_access_admin_role(self.tenant) - group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + group_with_UA_admin = self._create_group_with_user_access_admin_role( + self.tenant + ) + group_with_UA_admin.principals.add( + self.user_based_principal, self.service_account_principal + ) client = APIClient() url = reverse("v1_management:role-list") @@ -2010,8 +2390,12 @@ def test_list_roles_with_User_Access_Admin_system_true_success(self): with '?system=true' in the request. """ # Create a group with 'User Access administrator' role and add principals we use in headers - group_with_UA_admin = self._create_group_with_user_access_admin_role(self.tenant) - group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + group_with_UA_admin = self._create_group_with_user_access_admin_role( + self.tenant + ) + group_with_UA_admin.principals.add( + self.user_based_principal, self.service_account_principal + ) client = APIClient() url = reverse("v1_management:role-list") + "?system=true" @@ -2035,11 +2419,15 @@ def test_list_roles_without_User_Access_Admin_system_false_fail(self): response = client.get(url, **self.headers_user_based_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + self.assertEqual( + response.data.get("errors")[0].get("detail"), self.no_permission_err_message + ) response = client.get(url, **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + self.assertEqual( + response.data.get("errors")[0].get("detail"), self.no_permission_err_message + ) # Org Admin can list the roles response = client.get(url, **self.headers_org_admin) @@ -2052,8 +2440,12 @@ def test_list_roles_with_User_Access_Admin_system_false_success(self): with '?system=false' in the request. """ # Create a group with 'User Access administrator' role and add principals we use in headers - group_with_UA_admin = self._create_group_with_user_access_admin_role(self.tenant) - group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + group_with_UA_admin = self._create_group_with_user_access_admin_role( + self.tenant + ) + group_with_UA_admin.principals.add( + self.user_based_principal, self.service_account_principal + ) client = APIClient() url = reverse("v1_management:role-list") + "?system=false" @@ -2076,11 +2468,15 @@ def test_list_roles_without_User_Access_Admin_system_foo_fail(self): response = client.get(url, **self.headers_user_based_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + self.assertEqual( + response.data.get("errors")[0].get("detail"), self.no_permission_err_message + ) response = client.get(url, **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + self.assertEqual( + response.data.get("errors")[0].get("detail"), self.no_permission_err_message + ) # Org Admin can list the roles response = client.get(url, **self.headers_org_admin) @@ -2094,8 +2490,12 @@ def test_list_roles_with_User_Access_Admin_system_foo_success(self): so the 'system' param is ignored in this case). """ # Create a group with 'User Access administrator' role and add principals we use in headers - group_with_UA_admin = self._create_group_with_user_access_admin_role(self.tenant) - group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + group_with_UA_admin = self._create_group_with_user_access_admin_role( + self.tenant + ) + group_with_UA_admin.principals.add( + self.user_based_principal, self.service_account_principal + ) client = APIClient() url = reverse("v1_management:role-list") @@ -2120,8 +2520,12 @@ def test_create_role_with_rbac_permission_fail(self): Test that it is not possible to create a custom role with RBAC permission. """ # Create a group with 'User Access administrator' role and add principals we use in headers - group_with_UA_admin = self._create_group_with_user_access_admin_role(self.tenant) - group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + group_with_UA_admin = self._create_group_with_user_access_admin_role( + self.tenant + ) + group_with_UA_admin.principals.add( + self.user_based_principal, self.service_account_principal + ) Permission.objects.create(permission="rbac:principal:read", tenant=self.tenant) permissions_count = len(Permission.objects.all()) @@ -2140,14 +2544,27 @@ def test_create_role_with_rbac_permission_fail(self): access_data = [{"permission": "rbac:*:*", "resourceDefinitions": []}] test_data = {"name": role_name, "access": access_data} - response = client.post(url, test_data, format="json", **self.headers_user_based_principal) + response = client.post( + url, test_data, format="json", **self.headers_user_based_principal + ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertEqual(response.data.get("errors")[0].get("detail"), "Custom roles cannot be created for rbac") + self.assertEqual( + response.data.get("errors")[0].get("detail"), + "Custom roles cannot be created for rbac", + ) - response = client.post(url, test_data, format="json", **self.headers_service_account_principal) + response = client.post( + url, test_data, format="json", **self.headers_service_account_principal + ) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertEqual(response.data.get("errors")[0].get("detail"), "Custom roles cannot be created for rbac") + self.assertEqual( + response.data.get("errors")[0].get("detail"), + "Custom roles cannot be created for rbac", + ) response = client.post(url, test_data, format="json", **self.headers_org_admin) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertEqual(response.data.get("errors")[0].get("detail"), "Custom roles cannot be created for rbac") + self.assertEqual( + response.data.get("errors")[0].get("detail"), + "Custom roles cannot be created for rbac", + ) From 8549be3cb444ac795f138d8e907e7698027f2620 Mon Sep 17 00:00:00 2001 From: Matt Holder Date: Mon, 6 Jan 2025 07:10:35 -0500 Subject: [PATCH 4/7] linting --- rbac/management/role/serializer.py | 2 +- tests/management/role/test_view.py | 331 +++++++---------------------- 2 files changed, 82 insertions(+), 251 deletions(-) diff --git a/rbac/management/role/serializer.py b/rbac/management/role/serializer.py index f1b15847e..dd42e9711 100644 --- a/rbac/management/role/serializer.py +++ b/rbac/management/role/serializer.py @@ -55,11 +55,11 @@ def validate_attributeFilter(self, value): key = "format" message = "attributeFilter operation 'in' expects a List value" error = {key: [_(message)]} - raise serializers.ValidationError(error) elif type(values) is list and op == "equals": key = "format" message = "attributeFilter operation 'equals' expects a String value" error = {key: [_(message)]} + if error: raise serializers.ValidationError(error) return value diff --git a/tests/management/role/test_view.py b/tests/management/role/test_view.py index e8721c471..9067c36f0 100644 --- a/tests/management/role/test_view.py +++ b/tests/management/role/test_view.py @@ -50,12 +50,7 @@ def normalize_and_sort(json_obj): for key, value in json_obj.items(): if isinstance(value, list): - sorted_list = sorted( - [ - json.dumps(item, sort_keys=True, cls=DjangoJSONEncoder) - for item in value - ] - ) + sorted_list = sorted([json.dumps(item, sort_keys=True, cls=DjangoJSONEncoder) for item in value]) json_obj[key] = [json.loads(item) for item in sorted_list] return json_obj @@ -64,9 +59,7 @@ def normalize_and_sort(json_obj): def replication_event_for_v1_role(v1_role_uuid, default_workspace_id): """Create a replication event for a v1 role.""" return { - "relations_to_add": relation_api_tuples_for_v1_role( - v1_role_uuid, default_workspace_id - ), + "relations_to_add": relation_api_tuples_for_v1_role(v1_role_uuid, default_workspace_id), "relations_to_remove": [], } @@ -77,15 +70,11 @@ def relation_api_tuples_for_v1_role(v1_role_uuid, default_workspace_id): mappings = BindingMapping.objects.filter(role=role_id).all() relations = [] for role_binding in [m.get_role_binding() for m in mappings]: - relation_tuple = relation_api_tuple( - "role_binding", role_binding.id, "role", "role", role_binding.role.id - ) + relation_tuple = relation_api_tuple("role_binding", role_binding.id, "role", "role", role_binding.role.id) relations.append(relation_tuple) for permission in role_binding.role.permissions: - relation_tuple = relation_api_tuple( - "role", role_binding.role.id, permission, "principal", "*" - ) + relation_tuple = relation_api_tuple("role", role_binding.role.id, permission, "principal", "*") relations.append(relation_tuple) if "app_all_read" in role_binding.role.permissions: relation_tuple = relation_api_tuple( @@ -97,9 +86,7 @@ def relation_api_tuples_for_v1_role(v1_role_uuid, default_workspace_id): ) relations.append(relation_tuple) else: - relation_tuple = relation_api_tuple( - "keya/id", "valueA", "binding", "role_binding", role_binding.id - ) + relation_tuple = relation_api_tuple("keya/id", "valueA", "binding", "role_binding", role_binding.id) relations.append(relation_tuple) return relations @@ -196,23 +183,17 @@ def setUp(self): "external_tenant", } - self.principal = Principal( - username=self.user_data["username"], tenant=self.tenant - ) + self.principal = Principal(username=self.user_data["username"], tenant=self.tenant) self.principal.save() self.policy = Policy.objects.create(name="policyA", tenant=self.tenant) - self.group = Group( - name="groupA", description="groupA description", tenant=self.tenant - ) + self.group = Group(name="groupA", description="groupA description", tenant=self.tenant) self.group.save() self.group.principals.add(self.principal) self.group.policies.add(self.policy) self.group.save() self.policyTwo = Policy.objects.create(name="policyB", tenant=self.tenant) - self.groupTwo = Group( - name="groupB", description="groupB description", tenant=self.tenant - ) + self.groupTwo = Group(name="groupB", description="groupB description", tenant=self.tenant) self.groupTwo.save() self.groupTwo.principals.add(self.principal) self.groupTwo.policies.add(self.policyTwo) @@ -221,9 +202,7 @@ def setUp(self): self.adminRole = Role(**admin_def_role_config, tenant=self.tenant) self.adminRole.save() - self.platformAdminRole = Role( - **platform_admin_def_role_config, tenant=self.tenant - ) + self.platformAdminRole = Role(**platform_admin_def_role_config, tenant=self.tenant) self.platformAdminRole.save() self.public_tenant = Tenant.objects.get(tenant_name="public") @@ -237,9 +216,7 @@ def setUp(self): self.defRole.save() self.ext_tenant = ExtTenant.objects.create(name="foo") - self.ext_role_relation = ExtRoleRelation.objects.create( - role=self.defRole, ext_tenant=self.ext_tenant - ) + self.ext_role_relation = ExtRoleRelation.objects.create(role=self.defRole, ext_tenant=self.ext_tenant) self.policy.roles.add( self.defRole, @@ -253,26 +230,14 @@ def setUp(self): self.policyTwo.roles.add(self.platformAdminRole) self.policyTwo.save() - self.permission = Permission.objects.create( - permission="app:*:*", tenant=self.tenant - ) - self.permission2 = Permission.objects.create( - permission="app2:*:*", tenant=self.tenant - ) - self.permission3 = Permission.objects.create( - permission="app:*:read", tenant=self.tenant - ) + self.permission = Permission.objects.create(permission="app:*:*", tenant=self.tenant) + self.permission2 = Permission.objects.create(permission="app2:*:*", tenant=self.tenant) + self.permission3 = Permission.objects.create(permission="app:*:read", tenant=self.tenant) self.permission.permissions.add(self.permission3) - self.access = Access.objects.create( - permission=self.permission, role=self.defRole, tenant=self.tenant - ) - self.access2 = Access.objects.create( - permission=self.permission2, role=self.defRole, tenant=self.tenant - ) + self.access = Access.objects.create(permission=self.permission, role=self.defRole, tenant=self.tenant) + self.access2 = Access.objects.create(permission=self.permission2, role=self.defRole, tenant=self.tenant) - self.access3 = Access.objects.create( - permission=self.permission2, role=self.sysRole, tenant=self.tenant - ) + self.access3 = Access.objects.create(permission=self.permission2, role=self.sysRole, tenant=self.tenant) Permission.objects.create(permission="cost-management:*:*", tenant=self.tenant) self.root_workspace = Workspace.objects.create( name="root", @@ -346,9 +311,7 @@ def create_group(self, group_name): def create_policy(self, policy_name, group, roles): """Create a policy.""" # create a policy - policy = Policy.objects.create( - name=policy_name, tenant=self.tenant, system=True - ) + policy = Policy.objects.create(name=policy_name, tenant=self.tenant, system=True) for role in Role.objects.filter(uuid__in=roles): policy.roles.add(role) policy.group = Group.objects.get(uuid=group) @@ -404,9 +367,7 @@ def test_create_role_success(self, send_kafka_message): self.assertEqual(al_dict_action, "create") # test that we can retrieve the role - url = reverse( - "v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")} - ) + url = reverse("v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")}) client = APIClient() response = client.get(url, **self.headers) uuid = response.data.get("uuid") @@ -449,9 +410,7 @@ def test_create_role_success(self, send_kafka_message): ) @override_settings(V2_MIGRATION_RESOURCE_EXCLUDE_LIST=["rbac:workspace"]) - @patch( - "management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event" - ) + @patch("management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event") def test_role_replication_exluded_resource(self, mock_method): """Test that excluded resources do not replicate via dual write.""" # Set up @@ -486,12 +445,10 @@ def test_role_replication_exluded_resource(self, mock_method): "too many relations (should not add relations for excluded resource)", ) - role_binding = find_in_list( - to_add, lambda r: r["resource"]["type"]["name"] == "role_binding" - )["resource"]["id"] - workspace = find_in_list( - to_add, lambda r: r["resource"]["type"]["name"] == "workspace" - ) + role_binding = find_in_list(to_add, lambda r: r["resource"]["type"]["name"] == "role_binding")["resource"][ + "id" + ] + workspace = find_in_list(to_add, lambda r: r["resource"]["type"]["name"] == "workspace") self.assertEquals( role_binding, @@ -501,13 +458,9 @@ def test_role_replication_exluded_resource(self, mock_method): role = find_in_list(to_add, lambda r: r["resource"]["type"]["name"] == "role") - self.assertEquals( - role["relation"], "app_all_read", "expected workspace permission" - ) + self.assertEquals(role["relation"], "app_all_read", "expected workspace permission") - @patch( - "management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event" - ) + @patch("management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event") def test_create_role_with_display_success(self, mock_method): """Test that we can create a role.""" role_name = "roleD" @@ -527,14 +480,10 @@ def test_create_role_with_display_success(self, mock_method): }, {"permission": "app:*:read", "resourceDefinitions": []}, ] - response = self.create_role( - role_name, role_display=role_display, in_access_data=access_data - ) + response = self.create_role(role_name, role_display=role_display, in_access_data=access_data) self.assertEqual(response.status_code, status.HTTP_201_CREATED) - replication_event = replication_event_for_v1_role( - response.data.get("uuid"), str(self.default_workspace.id) - ) + replication_event = replication_event_for_v1_role(response.data.get("uuid"), str(self.default_workspace.id)) mock_method.assert_called_once() actual_call_arg = mock_method.call_args[0][0] @@ -543,9 +492,7 @@ def test_create_role_with_display_success(self, mock_method): self.assertEqual(expected_sorted, actual_sorted) # test that we can retrieve the role - url = reverse( - "v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")} - ) + url = reverse("v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")}) client = APIClient() response = client.get(url, **self.headers) @@ -629,9 +576,7 @@ def test_create_role_allow_list(self): self.assertEqual(response.status_code, status.HTTP_201_CREATED) # test that we can retrieve the role - url = reverse( - "v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")} - ) + url = reverse("v1_management:role-detail", kwargs={"uuid": response.data.get("uuid")}) client = APIClient() response = client.get(url, **self.headers) @@ -972,9 +917,7 @@ def test_get_role_by_display_name_invalid_criteria(self): def test_get_role_by_exact_display_name_match(self): """Test that getting roles by display_name returns exact match.""" - url = "{}?display_name={}&name_match={}".format( - URL, self.sysRole.display_name, "exact" - ) + url = "{}?display_name={}&name_match={}".format(URL, self.sysRole.display_name, "exact") client = APIClient() response = client.get(url, **self.headers) self.assertEqual(response.data.get("meta").get("count"), 1) @@ -989,9 +932,7 @@ def test_get_role_by_exact_display_name_no_match(self): self.assertEqual(response.data.get("meta").get("count"), 0) @patch("management.principal.proxy.PrincipalProxy.request_filtered_principals") - def test_list_role_with_groups_in_fields_with_username_param_for_non_org_admin( - self, mock_request - ): + def test_list_role_with_groups_in_fields_with_username_param_for_non_org_admin(self, mock_request): """ Test that we can read a list of roles and the groups_in fields is set correctly for a request with 'username' param for non org admin principal. @@ -1043,12 +984,8 @@ def test_list_role_with_groups_in_fields_with_username_param_for_non_org_admin( } # add principal to the created group - principal_response = self.add_principal_to_group( - custom_group_uuid, john.username - ) - self.assertEqual( - principal_response.status_code, status.HTTP_200_OK, principal_response - ) + principal_response = self.add_principal_to_group(custom_group_uuid, john.username) + self.assertEqual(principal_response.status_code, status.HTTP_200_OK, principal_response) # add groups_in and groups_in_count fields into display fields groups_in_count = "groups_in_count" @@ -1078,11 +1015,7 @@ def test_list_role_with_groups_in_fields_with_username_param_for_non_org_admin( # make sure created role exists in result set and has correct values created_role = next( - ( - iterRole - for iterRole in response_data - if iterRole["name"] == custom_role_name - ), + (iterRole for iterRole in response_data if iterRole["name"] == custom_role_name), None, ) self.assertIsNotNone(created_role) @@ -1098,9 +1031,7 @@ def test_list_role_with_groups_in_fields_with_username_param_for_non_org_admin( self.assertIn(group["name"], groups) @patch("management.principal.proxy.PrincipalProxy.request_filtered_principals") - def test_list_role_with_groups_in_fields_with_username_param_for_org_admin( - self, mock_request - ): + def test_list_role_with_groups_in_fields_with_username_param_for_org_admin(self, mock_request): """ Test that we can read a list of roles and the groups_in fields is set correctly for a request with 'username' param for org admin principal. @@ -1152,12 +1083,8 @@ def test_list_role_with_groups_in_fields_with_username_param_for_org_admin( } # add principal to the created group - principal_response = self.add_principal_to_group( - custom_group_uuid, mary.username - ) - self.assertEqual( - principal_response.status_code, status.HTTP_200_OK, principal_response - ) + principal_response = self.add_principal_to_group(custom_group_uuid, mary.username) + self.assertEqual(principal_response.status_code, status.HTTP_200_OK, principal_response) # add groups_in and groups_in_count fields into display fields groups_in_count = "groups_in_count" @@ -1187,11 +1114,7 @@ def test_list_role_with_groups_in_fields_with_username_param_for_org_admin( # make sure created role exists in result set and has correct values created_role = next( - ( - iterRole - for iterRole in response_data - if iterRole["name"] == custom_role_name - ), + (iterRole for iterRole in response_data if iterRole["name"] == custom_role_name), None, ) self.assertIsNotNone(created_role) @@ -1212,9 +1135,7 @@ def test_list_role_with_groups_in_fields_with_username_param_for_org_admin( self.assertIn(group["name"], groups) @patch("management.principal.proxy.PrincipalProxy.request_filtered_principals") - def test_list_role_with_groups_in_fields_for_principal_scope_success( - self, mock_request - ): + def test_list_role_with_groups_in_fields_for_principal_scope_success(self, mock_request): """ Test that we can read a list of roles and the groups_in fields is set correctly for a principal scoped request. @@ -1251,12 +1172,8 @@ def test_list_role_with_groups_in_fields_for_principal_scope_success( self.create_policy(policy_name, group_uuid, [role_uuid]) # add user principal to the created group - principal_response = self.add_principal_to_group( - group_uuid, self.principal.username - ) - self.assertEqual( - principal_response.status_code, status.HTTP_200_OK, principal_response - ) + principal_response = self.add_principal_to_group(group_uuid, self.principal.username) + self.assertEqual(principal_response.status_code, status.HTTP_200_OK, principal_response) # hit /roles?groups_in, group should appear in groups_in field_1 = "groups_in_count" @@ -1265,9 +1182,7 @@ def test_list_role_with_groups_in_fields_for_principal_scope_success( new_display_fields.add(field_1) new_display_fields.add(field_2) - url = "{}?add_fields={},{}&username={}".format( - URL, field_1, field_2, self.principal.username - ) + url = "{}?add_fields={},{}&username={}".format(URL, field_1, field_2, self.principal.username) client = APIClient() response = client.get(url, **self.headers) @@ -1297,11 +1212,7 @@ def test_list_role_with_groups_in_fields_for_principal_scope_success( # make sure a default role exists in result set and has correct values default_role = next( - ( - iterRole - for iterRole in response_data - if iterRole["name"] == self.defRole.name - ), + (iterRole for iterRole in response_data if iterRole["name"] == self.defRole.name), None, ) self.assertIsNotNone(default_role) @@ -1339,11 +1250,7 @@ def test_list_role_with_groups_in_fields_for_admin_scope_success(self): # make sure a default role exists in result set and has correct values default_role = next( - ( - iterRole - for iterRole in response_data - if iterRole["name"] == self.defRole.name - ), + (iterRole for iterRole in response_data if iterRole["name"] == self.defRole.name), None, ) self.assertIsNotNone(default_role) @@ -1352,11 +1259,7 @@ def test_list_role_with_groups_in_fields_for_admin_scope_success(self): # make sure an admin role exists in result set and has correct values admin_role = next( - ( - iterRole - for iterRole in response_data - if iterRole["name"] == self.adminRole.name - ), + (iterRole for iterRole in response_data if iterRole["name"] == self.adminRole.name), None, ) self.assertIsNotNone(admin_role) @@ -1413,9 +1316,7 @@ def test_list_role_with_additional_fields_username_success(self, mock_request): ], } - url = "{}?add_fields={},{}&username={}".format( - URL, field_1, field_2, self.principal.username - ) + url = "{}?add_fields={},{}&username={}".format(URL, field_1, field_2, self.principal.username) client = APIClient() response = client.get(url, **self.headers) @@ -1594,9 +1495,7 @@ def test_update_role_success(self, send_kafka_message): self.assertIsNotNone(response.data.get("uuid")) self.assertEqual(updated_name, response.data.get("name")) - self.assertEqual( - "cost-management:*:*", response.data.get("access")[0]["permission"] - ) + self.assertEqual("cost-management:*:*", response.data.get("access")[0]["permission"]) # test whether newly updated (post) role is added correctly within audit log database al_url = "/api/rbac/v1/auditlogs/" @@ -1685,9 +1584,7 @@ def test_update_role_invalid_permission(self): response = client.put(url, test_data, format="json", **self.headers) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - @patch( - "management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event" - ) + @patch("management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event") def test_update_role(self, mock_method): """Test that updating a role with an invalid permission returns an error.""" # Set up @@ -1730,14 +1627,10 @@ def test_update_role(self, mock_method): test_data["access"] = new_access_data url = reverse("v1_management:role-detail", kwargs={"uuid": role_uuid}) client = APIClient() - current_relations = relation_api_tuples_for_v1_role( - role_uuid, str(self.default_workspace.id) - ) + current_relations = relation_api_tuples_for_v1_role(role_uuid, str(self.default_workspace.id)) response = client.put(url, test_data, format="json", **self.headers) - replication_event = replication_event_for_v1_role( - response.data.get("uuid"), str(self.default_workspace.id) - ) + replication_event = replication_event_for_v1_role(response.data.get("uuid"), str(self.default_workspace.id)) replication_event["relations_to_remove"] = current_relations actual_call_arg = mock_method.call_args[0][0] expected_sorted = normalize_and_sort(replication_event) @@ -1804,9 +1697,7 @@ def test_update_role_appfilter_operation_fail(self): self.assertEqual(response.status_code, status.HTTP_201_CREATED) role_uuid = response.data.get("uuid") test_data = response.data - test_data.get("access")[0]["resourceDefinitions"][0].get("attributeFilter")[ - "operation" - ] = "foo" + test_data.get("access")[0]["resourceDefinitions"][0].get("attributeFilter")["operation"] = "foo" # Test update failure url = reverse("v1_management:role-detail", kwargs={"uuid": role_uuid}) @@ -1854,9 +1745,7 @@ def test_update_role_permission_does_not_exist_fail(self): f"Permission does not exist: {permission}", ) - @patch( - "management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event" - ) + @patch("management.relation_replicator.outbox_replicator.OutboxReplicator._save_replication_event") def test_delete_role(self, mock_method): """Test that we can delete an existing role.""" role_name = "roleA" @@ -1881,9 +1770,7 @@ def test_delete_role(self, mock_method): url = reverse("v1_management:role-detail", kwargs={"uuid": role_uuid}) client = APIClient() replication_event = {"relations_to_add": [], "relations_to_remove": []} - current_relations = relation_api_tuples_for_v1_role( - role_uuid, str(self.default_workspace.id) - ) + current_relations = relation_api_tuples_for_v1_role(role_uuid, str(self.default_workspace.id)) replication_event["relations_to_remove"] = current_relations response = client.delete(url, **self.headers) actual_call_arg = mock_method.call_args[0][0] @@ -1964,17 +1851,11 @@ def test_delete_system_role(self): def test_delete_system_role_in_public_tenant(self): """Test that system roles in public tenant are protected from deletion""" - url = reverse( - "v1_management:role-detail", kwargs={"uuid": self.sysPubRole.uuid} - ) + url = reverse("v1_management:role-detail", kwargs={"uuid": self.sysPubRole.uuid}) client = APIClient() - existing_role = Role.objects.filter( - uuid=self.sysPubRole.uuid, tenant=self.public_tenant - ).first() - self.assertIsNotNone( - existing_role, "Public system role should exist before deletion test." - ) + existing_role = Role.objects.filter(uuid=self.sysPubRole.uuid, tenant=self.public_tenant).first() + self.assertIsNotNone(existing_role, "Public system role should exist before deletion test.") response = client.delete(url, **self.headers) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) @@ -2159,9 +2040,7 @@ def setUp(self): "platform_default": True, "admin_default": False, } - self.platform_default_role = Role.objects.create( - **platform_default_role_config, tenant=self.tenant - ) + self.platform_default_role = Role.objects.create(**platform_default_role_config, tenant=self.tenant) admin_default_role_config = { "name": "admin_default_role", @@ -2170,9 +2049,7 @@ def setUp(self): "platform_default": False, "admin_default": True, } - self.admin_default_role = Role.objects.create( - **admin_default_role_config, tenant=self.tenant - ) + self.admin_default_role = Role.objects.create(**admin_default_role_config, tenant=self.tenant) not_system_role_config = { "name": "not_system_role", @@ -2182,9 +2059,7 @@ def setUp(self): "admin_default": False, } - self.not_system_role = Role.objects.create( - **not_system_role_config, tenant=self.tenant - ) + self.not_system_role = Role.objects.create(**not_system_role_config, tenant=self.tenant) self.system_roles_count = 2 self.non_system_roles_count = 1 @@ -2208,9 +2083,7 @@ def setUp(self): # Create 2 non org admin principals and 1 org admin # 1. user based principal - self.user_based_principal = Principal( - username="user_based_principal", tenant=self.tenant - ) + self.user_based_principal = Principal(username="user_based_principal", tenant=self.tenant) self.user_based_principal.save() customer_data = { @@ -2227,9 +2100,7 @@ def setUp(self): }, is_org_admin=False, ) - self.headers_user_based_principal = request_context_user_based_principal[ - "request" - ].META + self.headers_user_based_principal = request_context_user_based_principal["request"].META # 2. service account based principal service_account_data = self._create_service_account_data() @@ -2246,9 +2117,7 @@ def setUp(self): service_account_data=service_account_data, is_org_admin=False, ) - self.headers_service_account_principal = ( - request_context_service_account_principal["request"].META - ) + self.headers_service_account_principal = request_context_service_account_principal["request"].META # 3 org admin principal in the tenant self.org_admin = Principal(username="org_admin", tenant=self.tenant) @@ -2262,9 +2131,7 @@ def setUp(self): self.headers_org_admin = request_context_org_admin["request"].META # Error messages - self.no_permission_err_message = ( - "You do not have permission to perform this action." - ) + self.no_permission_err_message = "You do not have permission to perform this action." def tearDown(self): """Tear down role viewset nonadmin tests.""" @@ -2327,15 +2194,11 @@ def test_list_roles_without_User_Access_Admin_fail(self): response = client.get(url, **self.headers_user_based_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual( - response.data.get("errors")[0].get("detail"), self.no_permission_err_message - ) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) response = client.get(url, **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual( - response.data.get("errors")[0].get("detail"), self.no_permission_err_message - ) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) # Org Admin can list the roles response = client.get(url, **self.headers_org_admin) @@ -2348,12 +2211,8 @@ def test_list_roles_with_User_Access_Admin_success(self): Test that principal with 'User Access administrator' role can read a list of roles. """ # Create a group with 'User Access administrator' role and add principals we use in headers - group_with_UA_admin = self._create_group_with_user_access_admin_role( - self.tenant - ) - group_with_UA_admin.principals.add( - self.user_based_principal, self.service_account_principal - ) + group_with_UA_admin = self._create_group_with_user_access_admin_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) client = APIClient() url = reverse("v1_management:role-list") @@ -2390,12 +2249,8 @@ def test_list_roles_with_User_Access_Admin_system_true_success(self): with '?system=true' in the request. """ # Create a group with 'User Access administrator' role and add principals we use in headers - group_with_UA_admin = self._create_group_with_user_access_admin_role( - self.tenant - ) - group_with_UA_admin.principals.add( - self.user_based_principal, self.service_account_principal - ) + group_with_UA_admin = self._create_group_with_user_access_admin_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) client = APIClient() url = reverse("v1_management:role-list") + "?system=true" @@ -2419,15 +2274,11 @@ def test_list_roles_without_User_Access_Admin_system_false_fail(self): response = client.get(url, **self.headers_user_based_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual( - response.data.get("errors")[0].get("detail"), self.no_permission_err_message - ) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) response = client.get(url, **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual( - response.data.get("errors")[0].get("detail"), self.no_permission_err_message - ) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) # Org Admin can list the roles response = client.get(url, **self.headers_org_admin) @@ -2440,12 +2291,8 @@ def test_list_roles_with_User_Access_Admin_system_false_success(self): with '?system=false' in the request. """ # Create a group with 'User Access administrator' role and add principals we use in headers - group_with_UA_admin = self._create_group_with_user_access_admin_role( - self.tenant - ) - group_with_UA_admin.principals.add( - self.user_based_principal, self.service_account_principal - ) + group_with_UA_admin = self._create_group_with_user_access_admin_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) client = APIClient() url = reverse("v1_management:role-list") + "?system=false" @@ -2468,15 +2315,11 @@ def test_list_roles_without_User_Access_Admin_system_foo_fail(self): response = client.get(url, **self.headers_user_based_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual( - response.data.get("errors")[0].get("detail"), self.no_permission_err_message - ) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) response = client.get(url, **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - self.assertEqual( - response.data.get("errors")[0].get("detail"), self.no_permission_err_message - ) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) # Org Admin can list the roles response = client.get(url, **self.headers_org_admin) @@ -2490,12 +2333,8 @@ def test_list_roles_with_User_Access_Admin_system_foo_success(self): so the 'system' param is ignored in this case). """ # Create a group with 'User Access administrator' role and add principals we use in headers - group_with_UA_admin = self._create_group_with_user_access_admin_role( - self.tenant - ) - group_with_UA_admin.principals.add( - self.user_based_principal, self.service_account_principal - ) + group_with_UA_admin = self._create_group_with_user_access_admin_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) client = APIClient() url = reverse("v1_management:role-list") @@ -2520,12 +2359,8 @@ def test_create_role_with_rbac_permission_fail(self): Test that it is not possible to create a custom role with RBAC permission. """ # Create a group with 'User Access administrator' role and add principals we use in headers - group_with_UA_admin = self._create_group_with_user_access_admin_role( - self.tenant - ) - group_with_UA_admin.principals.add( - self.user_based_principal, self.service_account_principal - ) + group_with_UA_admin = self._create_group_with_user_access_admin_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) Permission.objects.create(permission="rbac:principal:read", tenant=self.tenant) permissions_count = len(Permission.objects.all()) @@ -2544,18 +2379,14 @@ def test_create_role_with_rbac_permission_fail(self): access_data = [{"permission": "rbac:*:*", "resourceDefinitions": []}] test_data = {"name": role_name, "access": access_data} - response = client.post( - url, test_data, format="json", **self.headers_user_based_principal - ) + response = client.post(url, test_data, format="json", **self.headers_user_based_principal) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertEqual( response.data.get("errors")[0].get("detail"), "Custom roles cannot be created for rbac", ) - response = client.post( - url, test_data, format="json", **self.headers_service_account_principal - ) + response = client.post(url, test_data, format="json", **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertEqual( response.data.get("errors")[0].get("detail"), From 8f684dd98b0ee247e31b5f1f30cc5b718c3c4533 Mon Sep 17 00:00:00 2001 From: Matt Holder Date: Mon, 6 Jan 2025 07:26:58 -0500 Subject: [PATCH 5/7] check using isinstance --- rbac/management/role/serializer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rbac/management/role/serializer.py b/rbac/management/role/serializer.py index dd42e9711..96c4ac9b4 100644 --- a/rbac/management/role/serializer.py +++ b/rbac/management/role/serializer.py @@ -51,11 +51,12 @@ def validate_attributeFilter(self, value): raise serializers.ValidationError(error) else: values = value.get("value") - if type(values) is str and op == "in": + error = False + if isinstance(values, str) and op == "in": key = "format" message = "attributeFilter operation 'in' expects a List value" error = {key: [_(message)]} - elif type(values) is list and op == "equals": + elif isinstance(values, list) and op == "equals": key = "format" message = "attributeFilter operation 'equals' expects a String value" error = {key: [_(message)]} From d5d3c2a8a09d091cf4aad13019bb6b8c123ab805 Mon Sep 17 00:00:00 2001 From: Matt Holder Date: Mon, 6 Jan 2025 13:15:19 -0500 Subject: [PATCH 6/7] check inverse --- rbac/management/role/serializer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rbac/management/role/serializer.py b/rbac/management/role/serializer.py index 96c4ac9b4..ac929a97d 100644 --- a/rbac/management/role/serializer.py +++ b/rbac/management/role/serializer.py @@ -52,7 +52,7 @@ def validate_attributeFilter(self, value): else: values = value.get("value") error = False - if isinstance(values, str) and op == "in": + if not isinstance(values, list) and op == "in": key = "format" message = "attributeFilter operation 'in' expects a List value" error = {key: [_(message)]} From 8c044ab058708a8f5b10fbd7d8289891813d443b Mon Sep 17 00:00:00 2001 From: Matt Holder Date: Mon, 6 Jan 2025 13:15:50 -0500 Subject: [PATCH 7/7] more precise error assertion --- tests/management/role/test_view.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/management/role/test_view.py b/tests/management/role/test_view.py index 9067c36f0..f5d83f816 100644 --- a/tests/management/role/test_view.py +++ b/tests/management/role/test_view.py @@ -2003,7 +2003,7 @@ def test_create_role_with_invalid_equals_operation(self): } ] response = self.create_role(role_name, in_access_data=access_data) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data["errors"][0]["source"] == "resourceDefinitions.attributeFilter.format") def test_create_role_with_invalid_in_operation(self): """Test that we cannot create a role when a String value is paired with the 'in' operation.""" @@ -2016,14 +2016,14 @@ def test_create_role_with_invalid_in_operation(self): "attributeFilter": { "key": "keyA.id", "operation": "in", - "value": "valueA", + "value": "123456", } } ], } ] response = self.create_role(role_name, in_access_data=access_data) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data["errors"][0]["source"] == "resourceDefinitions.attributeFilter.format") class RoleViewNonAdminTests(IdentityRequest):