From e8c49c6e6b4da4b1ef7456ffced6dfe0a2ac8c0f Mon Sep 17 00:00:00 2001 From: Andrew Walker Date: Fri, 13 Dec 2024 22:34:01 -0600 Subject: [PATCH] NAS-133042 / 25.04 / Lock out plugins when in GPOS STIG mode (#15178) Certain TrueNAS features are incompatibile with requirements for the GPOS STIG. This commit makes changes to our RBAC framework to decrease effective privileges granted to all credentials to prevent configuring non-compliant services and to give UI hints about which features to disable. As a result of these changes, the allowlist column is dropped from the privilege table. Internally our credentials still generate an allowlist in order to perform RBAC checks. The allowlist changes have a functional impact on privilege framework regarding REST API access. After these changes, only credentials with FULL_ADMIN privilege will be allowed REST access and only when server does not have STIG rules applied. --- ...-12-11_20-52_remove-privilege-allowlist.py | 32 ++++ .../middlewared/api/base/decorator.py | 2 +- .../middlewared/api/v25_04_0/privilege.py | 13 +- .../middlewared/plugins/account.py | 11 +- .../middlewared/plugins/account_/privilege.py | 24 +-- .../plugins/account_/privilege_roles.py | 16 +- .../activedirectory_join_mixin.py | 2 +- .../middlewared/plugins/security/update.py | 4 +- .../middlewared/plugins/test/mock.py | 30 ++++ src/middlewared/middlewared/role.py | 63 ++++++-- .../middlewared/service/decorators.py | 9 +- .../test/integration/assets/account.py | 6 +- .../test/integration/assets/product.py | 4 +- .../test/integration/assets/roles.py | 1 - .../middlewared/utils/privilege.py | 3 - src/middlewared/middlewared/utils/security.py | 16 ++ tests/api2/test_014_failover_related.py | 1 - tests/api2/test_030_activedirectory.py | 8 +- tests/api2/test_275_ldap.py | 2 +- tests/api2/test_account_privilege.py | 11 +- .../test_account_privilege_authentication.py | 3 +- tests/api2/test_account_query_roles.py | 8 - tests/api2/test_account_shell_choices.py | 6 +- tests/api2/test_audit_rest.py | 1 - tests/api2/test_auth_token.py | 2 +- tests/api2/test_core_bulk.py | 6 +- tests/api2/test_job_credentials.py | 3 +- tests/api2/test_job_events.py | 2 +- tests/api2/test_job_lock.py | 4 +- tests/api2/test_job_logs.py | 13 +- tests/api2/test_legacy_websocket.py | 1 - tests/api2/test_password_reset.py | 1 - tests/api2/test_rest_api_authentication.py | 45 +----- tests/api2/test_rest_api_download.py | 4 +- tests/api2/test_stig.py | 143 +++++++++++++----- tests/api2/test_twofactor_auth.py | 1 - tests/unit/test_role_manager.py | 73 +++++++++ 37 files changed, 391 insertions(+), 183 deletions(-) create mode 100644 src/middlewared/middlewared/alembic/versions/25.04/2024-12-11_20-52_remove-privilege-allowlist.py create mode 100644 src/middlewared/middlewared/utils/security.py create mode 100644 tests/unit/test_role_manager.py diff --git a/src/middlewared/middlewared/alembic/versions/25.04/2024-12-11_20-52_remove-privilege-allowlist.py b/src/middlewared/middlewared/alembic/versions/25.04/2024-12-11_20-52_remove-privilege-allowlist.py new file mode 100644 index 0000000000000..a65e01d95e0d8 --- /dev/null +++ b/src/middlewared/middlewared/alembic/versions/25.04/2024-12-11_20-52_remove-privilege-allowlist.py @@ -0,0 +1,32 @@ +""" Remove privilege allowlist + +Revision ID: aea6bced4328 +Revises: b44c092bfa30 +Create Date: 2024-12-11 20:52:26.972597+00:00 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'aea6bced4328' +down_revision = 'b44c092bfa30' +branch_labels = None +depends_on = None + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.execute("UPDATE account_privilege SET roles='[\"FULL_ADMIN\"]' WHERE allowlist='[{\"method\": \"*\", \"resource\": \"*\"}]'") + with op.batch_alter_table('account_privilege', schema=None) as batch_op: + batch_op.drop_column('allowlist') + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('account_privilege', schema=None) as batch_op: + batch_op.add_column(sa.Column('allowlist', sa.TEXT(), nullable=False)) + + # ### end Alembic commands ### diff --git a/src/middlewared/middlewared/api/base/decorator.py b/src/middlewared/middlewared/api/base/decorator.py index f703b9f93c16f..b26d7bb276434 100644 --- a/src/middlewared/middlewared/api/base/decorator.py +++ b/src/middlewared/middlewared/api/base/decorator.py @@ -92,7 +92,7 @@ def wrapped(*args): wrapped.audit_callback = audit_callback wrapped.audit_extended = audit_extended wrapped.rate_limit = rate_limit - wrapped.roles = roles or [] + wrapped.roles = roles or ['FULL_ADMIN'] wrapped._private = private wrapped._cli_private = cli_private diff --git a/src/middlewared/middlewared/api/v25_04_0/privilege.py b/src/middlewared/middlewared/api/v25_04_0/privilege.py index 92cc53dad99bf..904d43d49d485 100644 --- a/src/middlewared/middlewared/api/v25_04_0/privilege.py +++ b/src/middlewared/middlewared/api/v25_04_0/privilege.py @@ -1,8 +1,8 @@ from middlewared.api.base import BaseModel, Excluded, excluded_field, ForUpdateMetaclass, NonEmptyString, SID -from .api_key import AllowListItem +from middlewared.utils.security import STIGType from .group import GroupEntry -__all__ = ["PrivilegeEntry", +__all__ = ["PrivilegeEntry", "PrivilegeRoleEntry", "PrivilegeCreateArgs", "PrivilegeCreateResult", "PrivilegeUpdateArgs", "PrivilegeUpdateResult", "PrivilegeDeleteArgs", "PrivilegeDeleteResult"] @@ -14,7 +14,6 @@ class PrivilegeEntry(BaseModel): name: NonEmptyString local_groups: list[GroupEntry] ds_groups: list[GroupEntry] - allowlist: list[AllowListItem] = [] roles: list[str] = [] web_shell: bool @@ -53,3 +52,11 @@ class PrivilegeDeleteArgs(BaseModel): class PrivilegeDeleteResult(BaseModel): result: bool + + +class PrivilegeRoleEntry(BaseModel): + name: NonEmptyString + title: NonEmptyString + includes: list[NonEmptyString] + builtin: bool + stig: STIGType | None diff --git a/src/middlewared/middlewared/plugins/account.py b/src/middlewared/middlewared/plugins/account.py index dc5618e2b7315..2bd95d3f1f08e 100644 --- a/src/middlewared/middlewared/plugins/account.py +++ b/src/middlewared/middlewared/plugins/account.py @@ -1291,6 +1291,13 @@ async def common_validation(self, verrors, data, schema, group_ids, old=None): f'{schema}.password_disabled', 'Password authentication may not be disabled for SMB users.' ) + if combined['smb'] and (await self.middleware.call('system.security.config'))['enable_gpos_stig']: + verrors.add( + f'{schema}.smb', + 'SMB authentication for local user accounts is not permitted when General Purpose OS ' + 'STIG compatibility is enabled.' + ) + password = data.get('password') if not old and not password and not data.get('password_disabled'): verrors.add(f'{schema}.password', 'Password is required') @@ -1593,10 +1600,6 @@ async def group_extend(self, group, ctx): group['users'] = list({u['id'] for u in group['users']} | ctx['primary_memberships'][group['id']]) privilege_mappings = privileges_group_mapping(ctx['privileges'], [group['gid']], 'local_groups') - if privilege_mappings['allowlist']: - privilege_mappings['roles'].append('HAS_ALLOW_LIST') - if {'method': '*', 'resource': '*'} in privilege_mappings['allowlist']: - privilege_mappings['roles'].append('FULL_ADMIN') match group['group']: case 'builtin_administrators': diff --git a/src/middlewared/middlewared/plugins/account_/privilege.py b/src/middlewared/middlewared/plugins/account_/privilege.py index 5cde64293b5e1..a9efac00b4821 100644 --- a/src/middlewared/middlewared/plugins/account_/privilege.py +++ b/src/middlewared/middlewared/plugins/account_/privilege.py @@ -12,6 +12,7 @@ privilege_has_webui_access, privileges_group_mapping ) +from middlewared.utils.security import system_security_config_to_stig_type import middlewared.sqlalchemy as sa @@ -29,7 +30,6 @@ class PrivilegeModel(sa.Model): name = sa.Column(sa.String(200)) local_groups = sa.Column(sa.JSON(list)) ds_groups = sa.Column(sa.JSON(list)) - allowlist = sa.Column(sa.JSON(list)) roles = sa.Column(sa.JSON(list)) web_shell = sa.Column(sa.Boolean()) @@ -45,6 +45,7 @@ class Config: datastore_extend_context = "privilege.item_extend_context" cli_namespace = "auth.privilege" entry = PrivilegeEntry + role_prefix = 'PRIVILEGE' @private async def item_extend_context(self, rows, extra): @@ -70,7 +71,7 @@ async def do_create(self, data): `ds_groups` is list of Directory Service group GIDs that will gain this privilege. - `allowlist` is a list of API endpoints allowed for this privilege. + `roles` is a list of roles to be assigned to the privilege `web_shell` controls whether users with this privilege are allowed to log in to the Web UI. """ @@ -105,7 +106,7 @@ async def do_update(self, audit_callback, id_, data): verrors = ValidationErrors() if new["builtin_name"]: - for k in ["name", "allowlist", "roles"]: + for k in ["name", "roles"]: if new[k] != old[k]: verrors.add(f"privilege_update.{k}", "This field is read-only for built-in privileges") @@ -354,6 +355,9 @@ async def privileges_for_groups(self, groups_key, group_ids): @private async def compose_privilege(self, privileges): + security_config = await self.middleware.call('system.security.config') + enabled_stig = system_security_config_to_stig_type(security_config) + compose = { 'roles': set(), 'allowlist': [], @@ -362,20 +366,16 @@ async def compose_privilege(self, privileges): } for privilege in privileges: for role in privilege['roles']: - compose['roles'] |= self.middleware.role_manager.roles_for_role(role) - - compose['allowlist'].extend(self.middleware.role_manager.allowlist_for_role(role)) + compose['roles'] |= self.middleware.role_manager.roles_for_role(role, enabled_stig) - for item in privilege['allowlist']: - if item == {'method': '*', 'resource': '*'} and 'FULL_ADMIN' not in compose['roles']: - compose['roles'] |= self.middleware.role_manager.roles_for_role('FULL_ADMIN') - compose['webui_access'] = True - - compose['allowlist'].append(item) + compose['allowlist'].extend(self.middleware.role_manager.allowlist_for_role(role, enabled_stig)) compose['web_shell'] |= privilege['web_shell'] compose['webui_access'] |= privilege_has_webui_access(privilege) + if enabled_stig: + compose['web_shell'] = False + return compose @private diff --git a/src/middlewared/middlewared/plugins/account_/privilege_roles.py b/src/middlewared/middlewared/plugins/account_/privilege_roles.py index b2cf7f62428d8..2b4ddadfc7ccb 100644 --- a/src/middlewared/middlewared/plugins/account_/privilege_roles.py +++ b/src/middlewared/middlewared/plugins/account_/privilege_roles.py @@ -1,6 +1,7 @@ +from middlewared.api.current import PrivilegeRoleEntry + from middlewared.role import ROLES -from middlewared.service import Service, filterable, filterable_returns, filter_list, no_authz_required -from middlewared.schema import Bool, Dict, List, Str +from middlewared.service import Service, filterable_api_method, filter_list class PrivilegeService(Service): @@ -9,15 +10,7 @@ class Config: namespace = "privilege" cli_namespace = "auth.privilege" - @no_authz_required - @filterable - @filterable_returns(Dict( - "role", - Str("name"), - Str("title"), - List("includes", items=[Str("name")]), - Bool("builtin") - )) + @filterable_api_method(item=PrivilegeRoleEntry, authorization_required=False) async def roles(self, filters, options): """ Get all available roles. @@ -39,6 +32,7 @@ async def roles(self, filters, options): "title": name, "includes": role.includes, "builtin": role.builtin, + "stig": role.stig, } for name, role in ROLES.items() ] diff --git a/src/middlewared/middlewared/plugins/directoryservices_/activedirectory_join_mixin.py b/src/middlewared/middlewared/plugins/directoryservices_/activedirectory_join_mixin.py index caede7c0e33d7..66f97261dd3a6 100644 --- a/src/middlewared/middlewared/plugins/directoryservices_/activedirectory_join_mixin.py +++ b/src/middlewared/middlewared/plugins/directoryservices_/activedirectory_join_mixin.py @@ -251,7 +251,7 @@ def _ad_grant_privileges(self) -> None: self.middleware.call_sync('privilege.create', { 'name': dom.dns_name.upper(), 'ds_groups': [f'{dom.sid}-512'], - 'allowlist': [{'method': '*', 'resource': '*'}], + 'roles': ['FULL_ADMIN'], 'web_shell': True }) except Exception: diff --git a/src/middlewared/middlewared/plugins/security/update.py b/src/middlewared/middlewared/plugins/security/update.py index 1401c237a27e9..2d0320a0fe9db 100644 --- a/src/middlewared/middlewared/plugins/security/update.py +++ b/src/middlewared/middlewared/plugins/security/update.py @@ -24,7 +24,7 @@ class Config: cli_namespace = 'system.security' datastore = 'system.security' namespace = 'system.security' - role = 'SYSTEM_SECURITY' + role_prefix = 'SYSTEM_SECURITY' entry = SystemSecurityEntry @private @@ -188,6 +188,8 @@ async def do_update(self, job, data): ) await self.configure_security_on_ha(is_ha, job, RebootReason.GPOSSTIG) + await self.configure_stig(new) + return await self.config() diff --git a/src/middlewared/middlewared/plugins/test/mock.py b/src/middlewared/middlewared/plugins/test/mock.py index 5ba0725d7b790..579cb70193858 100644 --- a/src/middlewared/middlewared/plugins/test/mock.py +++ b/src/middlewared/middlewared/plugins/test/mock.py @@ -1,6 +1,17 @@ +from middlewared.alert.base import AlertCategory, AlertClass, AlertLevel, SimpleOneShotAlertClass +from middlewared.role import Role from middlewared.service import CallError, Service +class SystemTestingAlertClass(AlertClass, SimpleOneShotAlertClass): + category = AlertCategory.SYSTEM + level = AlertLevel.CRITICAL + title = "System mocking endpoints used" + text = "System mocking endpoits used on server." + + deleted_automatically = False + + class TestService(Service): class Config: private = True @@ -25,12 +36,31 @@ def method(*args, **kwargs): else: raise CallError("Invalid mock declaration") + await self.set_mock_role() self.middleware.set_mock(name, args, method) async def remove_mock(self, name, args): self.middleware.remove_mock(name, args) + async def set_mock_role(self): + """ + adds a MOCK role to role_manager and grants access to test.test1 and test.test2 + + This allows testing RBAC against mocked endpoint + """ + if 'MOCK' in self.middleware.role_manager.roles: + return + + # There are no STIG requirements specified for MOCK role here because + # we need to be able to mock methods in CI testing while under STIG restrictions + self.middleware.role_manager.roles['MOCK'] = Role() + self.middleware.role_manager.register_method(method_name='test.test1', roles=['MOCK']) + self.middleware.role_manager.register_method(method_name='test.test2', roles=['MOCK']) + + await self.middleware.call('alert.oneshot_create', 'SystemTesting') + # Dummy methods to mock for internal infrastructure testing (i.e. jobs manager) + # When these are mocked over they will be available to users with the "MOCK" role. async def test1(self): pass diff --git a/src/middlewared/middlewared/role.py b/src/middlewared/middlewared/role.py index 111712e6f4e04..3e39665550fa7 100644 --- a/src/middlewared/middlewared/role.py +++ b/src/middlewared/middlewared/role.py @@ -1,5 +1,6 @@ from collections import defaultdict from dataclasses import dataclass, field +from middlewared.utils.security import STIGType import typing @@ -15,6 +16,7 @@ class Role: includes: typing.List[str] = field(default_factory=list) full_admin: bool = False builtin: bool = True + stig: STIGType = STIGType.GPOS # By default roles are available under GPOS STIG ROLES = { @@ -45,6 +47,11 @@ class Role: 'FILESYSTEM_DATA_WRITE': Role(includes=['FILESYSTEM_DATA_READ']), 'FILESYSTEM_FULL_CONTROL': Role(includes=['FILESYSTEM_ATTRS_WRITE', 'FILESYSTEM_DATA_WRITE']), + + # Interact with privilege framework + 'PRIVILEGE_READ': Role(), + 'PRIVILEGE_WRITE': Role(includes=['PRIVILEGE_WRITE']), + 'REPORTING_READ': Role(), 'REPORTING_WRITE': Role(includes=['REPORTING_READ']), @@ -79,9 +86,9 @@ class Role: # VM roles 'VM_READ': Role(), - 'VM_WRITE': Role(includes=['VM_READ']), + 'VM_WRITE': Role(includes=['VM_READ'], stig=None), 'VM_DEVICE_READ': Role(includes=['VM_READ']), - 'VM_DEVICE_WRITE': Role(includes=['VM_WRITE', 'VM_DEVICE_READ']), + 'VM_DEVICE_WRITE': Role(includes=['VM_WRITE', 'VM_DEVICE_READ'], stig=None), # JBOF roles 'JBOF_READ': Role(), @@ -89,7 +96,7 @@ class Role: # Truecommand roles 'TRUECOMMAND_READ': Role(), - 'TRUECOMMAND_WRITE': Role(includes=['TRUECOMMAND_READ']), + 'TRUECOMMAND_WRITE': Role(includes=['TRUECOMMAND_READ'], stig=None), # Crypto roles 'CERTIFICATE_READ': Role(), @@ -99,11 +106,11 @@ class Role: # Apps roles 'CATALOG_READ': Role(), - 'CATALOG_WRITE': Role(includes=['CATALOG_READ']), + 'CATALOG_WRITE': Role(includes=['CATALOG_READ'], stig=None), 'DOCKER_READ': Role(includes=[]), - 'DOCKER_WRITE': Role(includes=['DOCKER_READ']), + 'DOCKER_WRITE': Role(includes=['DOCKER_READ'], stig=None), 'APPS_READ': Role(includes=['CATALOG_READ']), - 'APPS_WRITE': Role(includes=['CATALOG_WRITE', 'APPS_READ']), + 'APPS_WRITE': Role(includes=['CATALOG_WRITE', 'APPS_READ'], stig=None), # FTP roles 'SHARING_FTP_READ': Role(), @@ -201,12 +208,12 @@ class Role: # Virtualization 'VIRT_GLOBAL_READ': Role(), - 'VIRT_GLOBAL_WRITE': Role(includes=['VIRT_GLOBAL_READ']), + 'VIRT_GLOBAL_WRITE': Role(includes=['VIRT_GLOBAL_READ'], stig=None), 'VIRT_INSTANCE_READ': Role(), - 'VIRT_INSTANCE_WRITE': Role(includes=['VIRT_INSTANCE_READ']), - 'VIRT_INSTANCE_DELETE': Role(), + 'VIRT_INSTANCE_WRITE': Role(includes=['VIRT_INSTANCE_READ'], stig=None), + 'VIRT_INSTANCE_DELETE': Role(stig=None), 'VIRT_IMAGE_READ': Role(), - 'VIRT_IMAGE_WRITE': Role(includes=['VIRT_IMAGE_READ']), + 'VIRT_IMAGE_WRITE': Role(includes=['VIRT_IMAGE_READ'], stig=None), } ROLES['READONLY_ADMIN'] = Role(includes=[role for role in ROLES if role.endswith('_READ')], builtin=False) @@ -273,19 +280,45 @@ def add_roles_to_method(self, method_name: str, roles: typing.Iterable[str]): def register_event(self, event_name: str, roles: typing.Iterable[str], *, exist_ok: bool = False): self.events.register_resource(event_name, roles, exist_ok) - def roles_for_role(self, role: str) -> typing.Set[str]: + def role_stig_check(self, role_name: str, enabled_stig: STIGType) -> bool: + role = self.roles[role_name] + if role.stig is None: + return False + + return bool(role.stig & enabled_stig) + + def roles_for_role(self, role: str, enabled_stig: STIGType | None) -> typing.Set[str]: if role not in self.roles: return set() - return set.union({role}, *[self.roles_for_role(included_role) for included_role in self.roles[role].includes]) + if not enabled_stig: + return set.union({role}, *[ + self.roles_for_role(included_role, enabled_stig) for included_role in self.roles[role].includes + ]) - def allowlist_for_role(self, role: str) -> typing.List[dict[str, str]]: + if self.roles[role].full_admin: + # Convert FULL_ADMIN to all stig-allowed roles. + return set([role_name for role_name, role in self.roles.items() if self.role_stig_check(role_name, enabled_stig)]) + + return set.union({role}, *[ + self.roles_for_role(included_role, enabled_stig) + for included_role in self.roles[role].includes if self.role_stig_check(included_role, enabled_stig) + ]) + + def allowlist_for_role(self, role: str, enabled_stig: STIGType) -> typing.List[dict[str, str]]: if role in self.roles and self.roles[role].full_admin: - return [{"method": "CALL", "resource": "*"}, {"method": "SUBSCRIBE", "resource": "*"}] + if enabled_stig: + return sum([ + self.methods.allowlists_for_roles[role] + self.events.allowlists_for_roles[role] + for role in self.roles_for_role(role, enabled_stig) + ], []) + + # Only non-stig FULL_ADMIN privilege can access REST endpoints + return [{"method": "*", "resource": "*"}] return sum([ self.methods.allowlists_for_roles[role] + self.events.allowlists_for_roles[role] - for role in self.roles_for_role(role) + for role in self.roles_for_role(role, enabled_stig) ], []) def roles_for_method(self, method_name: str) -> typing.List[str]: diff --git a/src/middlewared/middlewared/service/decorators.py b/src/middlewared/middlewared/service/decorators.py index 1a2e16d273122..1f1e3bc35f0eb 100644 --- a/src/middlewared/middlewared/service/decorators.py +++ b/src/middlewared/middlewared/service/decorators.py @@ -51,7 +51,9 @@ def filterable_internal(fn): return filterable_internal -def filterable_api_method(fn=None, /, *, roles=None, item=None, private=False, cli_private=False): +def filterable_api_method( + fn=None, /, *, roles=None, item=None, private=False, cli_private=False, authorization_required=True +): def filterable_internal(fn): fn._filterable = True if hasattr(fn, 'wraps'): @@ -65,7 +67,10 @@ def filterable_internal(fn): returns = GenericQueryResult - return api_method(QueryArgs, returns, private=private, roles=roles, cli_private=cli_private)(fn) + return api_method( + QueryArgs, returns, private=private, roles=roles, cli_private=cli_private, + authorization_required=authorization_required + )(fn) # See if we're being called as @filterable or @filterable(). if fn is None: diff --git a/src/middlewared/middlewared/test/integration/assets/account.py b/src/middlewared/middlewared/test/integration/assets/account.py index c3884d50b32bc..e196eec076363 100644 --- a/src/middlewared/middlewared/test/integration/assets/account.py +++ b/src/middlewared/middlewared/test/integration/assets/account.py @@ -43,7 +43,7 @@ def group(data): @contextlib.contextmanager -def unprivileged_user(*, username, group_name, privilege_name, allowlist, web_shell, roles=None): +def unprivileged_user(*, username, group_name, privilege_name, web_shell, roles=None): with group({ "name": group_name, }) as g: @@ -51,7 +51,6 @@ def unprivileged_user(*, username, group_name, privilege_name, allowlist, web_sh "name": privilege_name, "local_groups": [g["gid"]], "ds_groups": [], - "allowlist": allowlist, "roles": roles or [], "web_shell": web_shell, }): @@ -73,13 +72,12 @@ def unprivileged_user(*, username, group_name, privilege_name, allowlist, web_sh @contextlib.contextmanager -def unprivileged_user_client(roles=None, allowlist=None): +def unprivileged_user_client(roles=None): suffix = "".join([random.choice(string.ascii_lowercase + string.digits) for _ in range(8)]) with unprivileged_user( username=f"unprivileged_{suffix}", group_name=f"unprivileged_users_{suffix}", privilege_name=f"Unprivileged users ({suffix})", - allowlist=allowlist or [], roles=roles or [], web_shell=False, ) as t: diff --git a/src/middlewared/middlewared/test/integration/assets/product.py b/src/middlewared/middlewared/test/integration/assets/product.py index b4f51d3c8d66e..267e86de609d4 100644 --- a/src/middlewared/middlewared/test/integration/assets/product.py +++ b/src/middlewared/middlewared/test/integration/assets/product.py @@ -16,7 +16,7 @@ def product_type(product_type='SCALE_ENTERPRISE'): @contextlib.contextmanager -def enable_stig(): +def set_stig_available(): with product_type(): - with mock('system.security.config', return_value={'id': 1, 'enable_fips': True, 'enable_gpos_stig': True}): + with set_fips_available(): yield diff --git a/src/middlewared/middlewared/test/integration/assets/roles.py b/src/middlewared/middlewared/test/integration/assets/roles.py index 74501e51a2753..73853f1d6c510 100644 --- a/src/middlewared/middlewared/test/integration/assets/roles.py +++ b/src/middlewared/middlewared/test/integration/assets/roles.py @@ -21,7 +21,6 @@ def unprivileged_user_fixture(request): username=f'unprivileged_fixture_{suffix}', group_name=group_name, privilege_name=f'Unprivileged users fixture ({suffix})', - allowlist=[], roles=[], web_shell=False, ) as t: diff --git a/src/middlewared/middlewared/utils/privilege.py b/src/middlewared/middlewared/utils/privilege.py index f255440541d83..1816bfccd5bb5 100644 --- a/src/middlewared/middlewared/utils/privilege.py +++ b/src/middlewared/middlewared/utils/privilege.py @@ -77,19 +77,16 @@ def privileges_group_mapping( group_ids: list, groups_key: str, ) -> dict: - allowlist = [] roles = set() privileges_out = [] group_ids = set(group_ids) for privilege in privileges: if set(privilege[groups_key]) & group_ids: - allowlist.extend(privilege['allowlist']) roles |= set(privilege['roles']) privileges_out.append(privilege) return { - 'allowlist': allowlist, 'roles': list(roles), 'privileges': privileges_out } diff --git a/src/middlewared/middlewared/utils/security.py b/src/middlewared/middlewared/utils/security.py new file mode 100644 index 0000000000000..a9a988b85b315 --- /dev/null +++ b/src/middlewared/middlewared/utils/security.py @@ -0,0 +1,16 @@ +import enum + + +class STIGType(enum.IntFlag): + """ + Currently we are only attempting to meet a single STIG (General Purpose + Operating System). This enum is defined so that we have capability + to expand if we decide to apply more specific STIGs to different areas + of our product + """ + # https://www.stigviewer.com/stig/general_purpose_operating_system_srg/ + GPOS = enum.auto() # General Purpose Operating System + + +def system_security_config_to_stig_type(config: dict[str, bool]) -> STIGType: + return STIGType.GPOS if config['enable_gpos_stig'] else 0 diff --git a/tests/api2/test_014_failover_related.py b/tests/api2/test_014_failover_related.py index 9f1c12f758fad..7dc14b425c2da 100644 --- a/tests/api2/test_014_failover_related.py +++ b/tests/api2/test_014_failover_related.py @@ -17,7 +17,6 @@ def readonly_admin(): username='failover_guy', group_name='failover_admins', privilege_name='FAILOVER_PRIV', - allowlist=[], web_shell=False, roles=['READONLY_ADMIN'] ) as acct: diff --git a/tests/api2/test_030_activedirectory.py b/tests/api2/test_030_activedirectory.py index 3f35c9d1ea9a4..e6b24578b452c 100644 --- a/tests/api2/test_030_activedirectory.py +++ b/tests/api2/test_030_activedirectory.py @@ -199,7 +199,7 @@ def test_enable_leave_activedirectory(): res = call('privilege.query', [['name', 'C=', AD_DOMAIN]], {'get': True}) assert res['ds_groups'][0]['name'].endswith('domain admins') assert res['ds_groups'][0]['sid'].endswith('512') - assert res['allowlist'][0] == {'method': '*', 'resource': '*'} + assert res['roles'][0] == 'FULL_ADMIN' assert check_ad_started() is False @@ -345,11 +345,7 @@ def test_account_privilege_authentication(set_product_type): "name": "AD privilege", "local_groups": [], "ds_groups": [f"{domain_sid}-513"], - "allowlist": [ - {"method": "CALL", "resource": "system.info"}, - {"method": "CALL", "resource": "user.query"}, - {"method": "CALL", "resource": "group.query"}, - ], + "roles": ["READONLY_ADMIN"], "web_shell": False, }): with client(auth=(f"limiteduser@{AD_DOMAIN}", ADPASSWORD)) as c: diff --git a/tests/api2/test_275_ldap.py b/tests/api2/test_275_ldap.py index 16ca75add2468..8eb1a51414080 100644 --- a/tests/api2/test_275_ldap.py +++ b/tests/api2/test_275_ldap.py @@ -65,7 +65,7 @@ def test_account_privilege_authentication(do_ldap_connection): "name": "LDAP privilege", "local_groups": [], "ds_groups": [group["pw_gid"]], - "allowlist": [{"method": "CALL", "resource": "system.info"}], + "roles": ["READONLY_ADMIN"], "web_shell": False, }): with client(auth=(LDAPUSER, LDAPPASSWORD)) as c: diff --git a/tests/api2/test_account_privilege.py b/tests/api2/test_account_privilege.py index c172488ad0bb7..17eba1f1d35bc 100644 --- a/tests/api2/test_account_privilege.py +++ b/tests/api2/test_account_privilege.py @@ -23,13 +23,6 @@ def test_change_local_administrator_groups_to_invalid(): assert ve.value.errors[0].attribute == "privilege_update.local_groups" -def test_change_local_administrator_allowlist(): - with pytest.raises(ValidationErrors) as ve: - call("privilege.update", 1, {"allowlist": [{"method": "CALL", "resource": "system.info"}]}) - - assert ve.value.errors[0].attribute == "privilege_update.allowlist" - - def test_change_local_administrator_roles(): with pytest.raises(ValidationErrors) as ve: call("privilege.update", 1, {"roles": ['READONLY_ADMIN']}) @@ -50,7 +43,7 @@ def test_invalid_local_group(): "name": "Test", "local_groups": [1024], # invalid local group ID "ds_groups": [], - "allowlist": [{"method": "CALL", "resource": "system.info"}], + "roles": ["READONLY_ADMIN"], "web_shell": False, }) @@ -83,7 +76,7 @@ def privilege_with_orphan_local_group(): "name": "Test orphan", "local_groups": [gid], "ds_groups": [], - "allowlist": [{"method": "CALL", "resource": "system.info"}], + "roles": ["READONLY_ADMIN"], "web_shell": False, }) call("datastore.delete", "account.bsdgroups", g["id"]) diff --git a/tests/api2/test_account_privilege_authentication.py b/tests/api2/test_account_privilege_authentication.py index 70516a257aa58..bfa793b879b9e 100644 --- a/tests/api2/test_account_privilege_authentication.py +++ b/tests/api2/test_account_privilege_authentication.py @@ -20,7 +20,7 @@ def unprivileged_user(): username="unprivileged", group_name="unprivileged_users", privilege_name="Unprivileged users", - allowlist=[{"method": "CALL", "resource": "system.info"}], + roles=["READONLY_ADMIN"], web_shell=False, ) as t: yield t @@ -38,7 +38,6 @@ def unprivileged_user_with_web_shell(): username="unprivilegedws", group_name="unprivileged_users_ws", privilege_name="Unprivileged users with web shell", - allowlist=[], web_shell=True, ) as t: yield t diff --git a/tests/api2/test_account_query_roles.py b/tests/api2/test_account_query_roles.py index e1321031ae3f8..15525f79b45a8 100644 --- a/tests/api2/test_account_query_roles.py +++ b/tests/api2/test_account_query_roles.py @@ -9,11 +9,3 @@ def test_user_role_in_account(role): this_user = c.call("user.query", [["username", "=", c.username]], {"get": True}) assert this_user['roles'] == [role] - - -def test_user_role_full_admin_map(): - with unprivileged_user_client(allowlist=[{"method": "*", "resource": "*"}]) as c: - this_user = c.call("user.query", [["username", "=", c.username]], {"get": True}) - - assert "FULL_ADMIN" in this_user["roles"] - assert "HAS_ALLOW_LIST" in this_user["roles"] diff --git a/tests/api2/test_account_shell_choices.py b/tests/api2/test_account_shell_choices.py index 29ede61eae80a..cc855832f373c 100644 --- a/tests/api2/test_account_shell_choices.py +++ b/tests/api2/test_account_shell_choices.py @@ -20,7 +20,7 @@ def test_shell_choices_has_privileges(): "name": "Test", "local_groups": [g["gid"]], "ds_groups": [], - "allowlist": [{"method": "CALL", "resource": "system.info"}], + "roles": ["READONLY_ADMIN"], "web_shell": False, }) try: @@ -63,7 +63,7 @@ def test_can_create_user_with_cli_shell_with_privileges(group_payload): "name": "Test", "local_groups": [g["gid"]], "ds_groups": [], - "allowlist": [{"method": "CALL", "resource": "system.info"}], + "roles": ["READONLY_ADMIN"], "web_shell": False, }) try: @@ -116,7 +116,7 @@ def test_can_update_user_with_cli_shell_with_privileges(group_payload): "name": "Test", "local_groups": [g["gid"]], "ds_groups": [], - "allowlist": [{"method": "CALL", "resource": "system.info"}], + "roles": ["READONLY_ADMIN"], "web_shell": False, }) try: diff --git a/tests/api2/test_audit_rest.py b/tests/api2/test_audit_rest.py index 1a33d9dfeb7f5..311fe232d17c8 100644 --- a/tests/api2/test_audit_rest.py +++ b/tests/api2/test_audit_rest.py @@ -137,7 +137,6 @@ def test_unauthorized_call(): username="unprivileged", group_name="unprivileged_users", privilege_name="Unprivileged users", - allowlist=[], roles=[], web_shell=False, ) as u: diff --git a/tests/api2/test_auth_token.py b/tests/api2/test_auth_token.py index a4f89b6ab5235..499ac1e5cf727 100644 --- a/tests/api2/test_auth_token.py +++ b/tests/api2/test_auth_token.py @@ -56,7 +56,7 @@ def unprivileged_user(): username="test", group_name="test", privilege_name="test", - allowlist=[{"method": "CALL", "resource": "system.info"}], + roles=['READONLY_ADMIN'], web_shell=True, ): yield diff --git a/tests/api2/test_core_bulk.py b/tests/api2/test_core_bulk.py index c519d523a3522..db010ada331e8 100644 --- a/tests/api2/test_core_bulk.py +++ b/tests/api2/test_core_bulk.py @@ -33,7 +33,7 @@ def mock(self, job, *args): def test_authorized(): - with unprivileged_user_client(allowlist=[{"method": "CALL", "resource": "test.test1"}]) as c: + with unprivileged_user_client(roles=["MOCK"]) as c: with mock("test.test1", """ from middlewared.service import pass_app @@ -45,7 +45,7 @@ async def mock(self, app): def test_authorized_audit(): - with unprivileged_user_client(allowlist=[{"method": "CALL", "resource": "test.test1"}]) as c: + with unprivileged_user_client(roles=["MOCK"]) as c: with mock("test.test1", """ from middlewared.schema import Int from middlewared.service import accepts @@ -71,7 +71,7 @@ async def mock(self, param): def test_not_authorized(): - with unprivileged_user_client(allowlist=[]) as c: + with unprivileged_user_client(roles=[]) as c: with pytest.raises(ClientException) as ve: c.call("core.bulk", "test.test1", [[]], job=True) diff --git a/tests/api2/test_job_credentials.py b/tests/api2/test_job_credentials.py index 57cb036b9b391..0ee7498a9a56c 100644 --- a/tests/api2/test_job_credentials.py +++ b/tests/api2/test_job_credentials.py @@ -11,7 +11,8 @@ def test_job_credentials(): def mock(self, job, *args): return 42 """): - with unprivileged_user_client(allowlist=[{"method": "CALL", "resource": "test.test1"}]) as c: + with unprivileged_user_client(roles=["FULL_ADMIN"]) as c: + job_id = c.call("test.test1") job = call("core.get_jobs", [["id", "=", job_id]], {"get": True}) diff --git a/tests/api2/test_job_events.py b/tests/api2/test_job_events.py index 726de846543aa..910a1b2a383e5 100644 --- a/tests/api2/test_job_events.py +++ b/tests/api2/test_job_events.py @@ -45,7 +45,7 @@ def test_unprivileged_user_only_sees_its_own_jobs_events(): def mock(self, job, *args): return 42 """): - with unprivileged_user_client(allowlist=[{"method": "CALL", "resource": "test.test1"}]) as c: + with unprivileged_user_client(roles=["MOCK"]) as c: events = [] def callback(type, **message): diff --git a/tests/api2/test_job_lock.py b/tests/api2/test_job_lock.py index 9e476dfeb8a8a..d7cad90d8170f 100644 --- a/tests/api2/test_job_lock.py +++ b/tests/api2/test_job_lock.py @@ -131,7 +131,7 @@ def mock(self, job, *args): @pytest.mark.flaky(reruns=5, reruns_delay=5) def test_lock_queue_unprivileged_user_can_access_own_jobs(): - with unprivileged_user_client(allowlist=[{"method": "CALL", "resource": "test.test1"}]) as c: + with unprivileged_user_client(roles=["MOCK"]) as c: with mock("test.test1", """ from middlewared.service import job @@ -151,7 +151,7 @@ def mock(self, job, *args): @pytest.mark.flaky(reruns=5, reruns_delay=5) def test_lock_queue_unprivileged_user_cant_access_others_jobs(): - with unprivileged_user_client(allowlist=[{"method": "CALL", "resource": "test.test1"}]) as c: + with unprivileged_user_client(roles=["MOCK"]) as c: with mock("test.test1", """ from middlewared.service import job diff --git a/tests/api2/test_job_logs.py b/tests/api2/test_job_logs.py index 044d2e0dbe119..6ad9529f08681 100644 --- a/tests/api2/test_job_logs.py +++ b/tests/api2/test_job_logs.py @@ -10,8 +10,13 @@ @pytest.fixture(scope="module") def c(): - with unprivileged_user_client(roles=["REPLICATION_TASK_READ"], - allowlist=[{"method": "CALL", "resource": "test.test1"}]) as c: + with unprivileged_user_client(roles=["MOCK"]) as c: + yield c + + +@pytest.fixture(scope="module") +def c_replication_read(): + with unprivileged_user_client(roles=["REPLICATION_TASK_READ"]) as c: yield c @@ -59,7 +64,7 @@ def mock(self, job, *args): assert ve.value.errno == errno.EPERM -def test_job_download_logs_unprivileged_downloads_internal_logs_with_read_role(c): +def test_job_download_logs_unprivileged_downloads_internal_logs_with_read_role(c_replication_read): with mock("test.test1", """ from middlewared.service import job @@ -67,6 +72,8 @@ def test_job_download_logs_unprivileged_downloads_internal_logs_with_read_role(c def mock(self, job, *args): job.logs_fd.write(b'Job logs') """): + c = c_replication_read + jid = call("test.test1") c.call("core.job_wait", jid, job=True) diff --git a/tests/api2/test_legacy_websocket.py b/tests/api2/test_legacy_websocket.py index b0218a3381bca..0c7a84205b1c5 100644 --- a/tests/api2/test_legacy_websocket.py +++ b/tests/api2/test_legacy_websocket.py @@ -30,7 +30,6 @@ def unprivileged_client(): username=f"unprivileged_{suffix}", group_name=f"unprivileged_users_{suffix}", privilege_name=f"Unprivileged users ({suffix})", - allowlist=[], roles=["READONLY_ADMIN"], web_shell=False, ) as t: diff --git a/tests/api2/test_password_reset.py b/tests/api2/test_password_reset.py index fc99c9002005f..7141d373ba073 100644 --- a/tests/api2/test_password_reset.py +++ b/tests/api2/test_password_reset.py @@ -26,7 +26,6 @@ def test_restricted_user_set_password(): username=TEST_USERNAME, group_name=TEST_GROUPNAME, privilege_name='TEST_PASSWD_RESET_PRIVILEGE', - allowlist=[], web_shell=False, roles=['READONLY_ADMIN'] ) as acct: diff --git a/tests/api2/test_rest_api_authentication.py b/tests/api2/test_rest_api_authentication.py index 9eb55d1470ad9..e4f290fd298cc 100644 --- a/tests/api2/test_rest_api_authentication.py +++ b/tests/api2/test_rest_api_authentication.py @@ -23,7 +23,7 @@ def api_key_auth(allowlist): username="unprivileged2", group_name="unprivileged_users2", privilege_name="Unprivileged users", - allowlist=allowlist, + roles=allowlist, web_shell=False, ) as t: with api_key(t.username) as key: @@ -36,7 +36,7 @@ def login_password_auth(allowlist): username="unprivileged", group_name="unprivileged_users", privilege_name="Unprivileged users", - allowlist=allowlist, + roles=allowlist, web_shell=False, ) as t: yield dict(auth=(t.username, t.password)) @@ -48,7 +48,7 @@ def token_auth(allowlist): username="unprivileged", group_name="unprivileged_users", privilege_name="Unprivileged users", - allowlist=allowlist, + roles=allowlist, web_shell=False, ) as t: with client(auth=(t.username, t.password)) as c: @@ -61,23 +61,16 @@ def auth(request): return request.param -def test_root_api_key_rest(auth): - """We should be able to call a method with a root credential using REST API.""" - with auth([{"method": "*", "resource": "*"}]) as kwargs: - results = GET('/system/info/', **kwargs) - assert results.status_code == 200, results.text - - def test_allowed_api_key_rest_plain(auth): """We should be able to request an endpoint with a credential that allows that request using REST API.""" - with auth([{"method": "GET", "resource": "/system/info/"}]) as kwargs: + with auth(["FULL_ADMIN"]) as kwargs: results = GET('/system/info/', **kwargs) assert results.status_code == 200, results.text def test_allowed_api_key_rest_dynamic(auth): """We should be able to request a dynamic endpoint with a credential that allows that request using REST API.""" - with auth([{"method": "GET", "resource": "/user/id/{id_}/"}]) as kwargs: + with auth(["FULL_ADMIN"]) as kwargs: results = GET('/user/id/1/', **kwargs) assert results.status_code == 200, results.text @@ -86,7 +79,7 @@ def test_denied_api_key_rest(auth): """ We should not be able to request an endpoint with a credential that does not allow that request using REST API. """ - with auth([{"method": "GET", "resource": "/system/info_/"}]) as kwargs: + with auth(["ACCOUNT_READ"]) as kwargs: results = GET('/system/info/', **kwargs) assert results.status_code == 403 @@ -94,29 +87,7 @@ def test_denied_api_key_rest(auth): def test_root_api_key_upload(auth): """We should be able to call a method with root a credential using file upload endpoint.""" ip = truenas_server.ip - with auth([{"method": "*", "resource": "*"}]) as kwargs: - kwargs.pop("anonymous", None) # This key is only used for our test requests library - r = requests.post( - f"http://{ip}/_upload", - **kwargs, - data={ - "data": json.dumps({ - "method": "filesystem.put", - "params": ["/tmp/upload"], - }) - }, - files={ - "file": io.BytesIO(b"test"), - }, - timeout=10 - ) - r.raise_for_status() - - -def test_allowed_api_key_upload(auth): - """We should be able to call a method with an API that allows that call using file upload endpoint.""" - ip = truenas_server.ip - with auth([{"method": "CALL", "resource": "filesystem.put"}]) as kwargs: + with auth(["FULL_ADMIN"]) as kwargs: kwargs.pop("anonymous", None) # This key is only used for our test requests library r = requests.post( f"http://{ip}/_upload", @@ -140,7 +111,7 @@ def test_denied_api_key_upload(auth): We should not be able to call a method with a credential that does not allow that call using file upload endpoint. """ ip = truenas_server.ip - with auth([{"method": "CALL", "resource": "filesystem.put_"}]) as kwargs: + with auth(["SHARING_ADMIN"]) as kwargs: kwargs.pop("anonymous", None) # This key is only used for our test requests library r = requests.post( f"http://{ip}/_upload", diff --git a/tests/api2/test_rest_api_download.py b/tests/api2/test_rest_api_download.py index 5657380d1338d..5b6f97a4ad677 100644 --- a/tests/api2/test_rest_api_download.py +++ b/tests/api2/test_rest_api_download.py @@ -92,7 +92,7 @@ def test_download_authorization_ok(): username="unprivileged", group_name="unprivileged_users", privilege_name="Unprivileged users", - allowlist=[{"method": "CALL", "resource": "resttest.test_download_slow_pipe"}], + roles=["FULL_ADMIN"], web_shell=False, ) as user: with client(auth=(user.username, user.password)) as c: @@ -104,7 +104,7 @@ def test_download_authorization_fails(): username="unprivileged", group_name="unprivileged_users", privilege_name="Unprivileged users", - allowlist=[], + roles=["READONLY_ADMIN"], web_shell=False, ) as user: with client(auth=(user.username, user.password)) as c: diff --git a/tests/api2/test_stig.py b/tests/api2/test_stig.py index 3d6985c74c4e4..e2947470d9382 100644 --- a/tests/api2/test_stig.py +++ b/tests/api2/test_stig.py @@ -2,7 +2,8 @@ import pytest from middlewared.service_exception import CallError -from middlewared.test.integration.assets.product import enable_stig, product_type, set_fips_available +from middlewared.service_exception import ValidationErrors as Verr +from middlewared.test.integration.assets.product import product_type, set_fips_available from middlewared.test.integration.assets.two_factor_auth import ( enabled_twofactor_auth, get_user_secret, get_2fa_totp_token ) @@ -24,31 +25,13 @@ def community_product(): yield -@pytest.fixture(scope='function') +@pytest.fixture(scope='module') def two_factor_enabled(): with enabled_twofactor_auth() as two_factor_config: yield two_factor_config -@pytest.fixture(scope='function') -def setup_stig(): - # We need authenticated client to undo assurance level - with client() as c: - with enable_stig(): - # Force reconfiguration for STIG - call('system.security.configure_stig', {'enable_gpos_stig': True}) - aal = call('auth.get_authenticator_assurance_level') - assert aal == 'LEVEL_2' - - try: - yield - finally: - # Drop assurance level so that we can remove mock - # reliably - c.call('auth.set_authenticator_assurance_level', 'LEVEL_1') - - -@pytest.fixture(scope='function') +@pytest.fixture(scope='module') def two_factor_non_admin(two_factor_enabled, unprivileged_user_fixture): privilege = call('privilege.query', [['local_groups.0.group', '=', unprivileged_user_fixture.group_name]]) assert len(privilege) > 0, 'Privilege not found' @@ -56,12 +39,14 @@ def two_factor_non_admin(two_factor_enabled, unprivileged_user_fixture): try: call('user.renew_2fa_secret', unprivileged_user_fixture.username, {'interval': 60}) - yield unprivileged_user_fixture + user_obj_id = call('user.query', [['username', '=', unprivileged_user_fixture.username]], {'get': True})['id'] + secret = get_user_secret(user_obj_id) + yield (unprivileged_user_fixture, secret) finally: call('privilege.update', privilege[0]['id'], {'roles': []}) -@pytest.fixture(scope='function') +@pytest.fixture(scope='module') def two_factor_full_admin(two_factor_enabled, unprivileged_user_fixture): privilege = call('privilege.query', [['local_groups.0.group', '=', unprivileged_user_fixture.group_name]]) assert len(privilege) > 0, 'Privilege not found' @@ -76,6 +61,56 @@ def two_factor_full_admin(two_factor_enabled, unprivileged_user_fixture): call('privilege.update', privilege[0]['id'], {'roles': []}) +def do_stig_auth(c, user_obj, secret): + resp = c.call('auth.login_ex', { + 'mechanism': 'PASSWORD_PLAIN', + 'username': user_obj.username, + 'password': user_obj.password + }) + assert resp['response_type'] == 'OTP_REQUIRED' + assert resp['username'] == user_obj.username + + resp = c.call('auth.login_ex', { + 'mechanism': 'OTP_TOKEN', + 'otp_token': get_2fa_totp_token(secret) + }) + + assert resp['response_type'] == 'SUCCESS' + + +@pytest.fixture(scope='module') +def setup_stig(two_factor_full_admin): + """ Configure STIG and yield admin user object and an authenticated session """ + user_obj, secret = two_factor_full_admin + + # Create websocket connection from prior to STIG being enabled to pass to + # test methods. This connection will have unrestricted privileges (due to + # privilege_compose happening before STIG). + # + # Tests validating what can be performed under STIG restrictions should create + # a new websocket session + with product_type('SCALE_ENTERPRISE'): + with set_fips_available(True): + with client(auth=None) as c: + # Do two-factor authentication before enabling STIG support + do_stig_auth(c, user_obj, secret) + c.call('system.security.update', {'enable_fips': True, 'enable_gpos_stig': True}, job=True) + aal = c.call('auth.get_authenticator_assurance_level') + assert aal == 'LEVEL_2' + + try: + yield { + 'connection': c, + 'user_obj': user_obj, + 'secret': secret + } + finally: + c.call('system.security.update', {'enable_fips': False, 'enable_gpos_stig': False}, job=True) + + +# The order of the following tests is significant. We gradually add fixtures that have module scope +# as we finish checking for correct ValidationErrors + def test_nonenterprise_fail(community_product): with pytest.raises(ValidationErrors, match='Please contact iX sales for more information.'): call('system.security.update', {'enable_gpos_stig': True}, job=True) @@ -107,24 +142,12 @@ def test_no_current_cred_no_2fa(enterprise_product, two_factor_full_admin): call('system.security.update', {'enable_fips': True, 'enable_gpos_stig': True}, job=True) -def test_stig_enabled_authenticator_assurance_level(setup_stig, two_factor_full_admin): - # Validate that admin user can authenticate and perform operations - user_obj, secret = two_factor_full_admin - with client(auth=None) as c: - resp = c.call('auth.login_ex', { - 'mechanism': 'PASSWORD_PLAIN', - 'username': user_obj.username, - 'password': user_obj.password - }) - assert resp['response_type'] == 'OTP_REQUIRED' - assert resp['username'] == user_obj.username +# At this point STIG should be enabled on TrueNAS until end of file - resp = c.call('auth.login_ex', { - 'mechanism': 'OTP_TOKEN', - 'otp_token': get_2fa_totp_token(secret) - }) - assert resp['response_type'] == 'SUCCESS' +def test_stig_enabled_authenticator_assurance_level(setup_stig): + # Validate that admin user can authenticate and perform operations + setup_stig['connection'].call('system.info') # Auth for account without 2fa should fail with pytest.raises(CallError) as ce: @@ -132,3 +155,45 @@ def test_stig_enabled_authenticator_assurance_level(setup_stig, two_factor_full_ pass assert ce.value.errno == errno.EOPNOTSUPP + + # We should also be able to create a new websocket connection + # The previous one was created before enabling STIG + with client(auth=None) as c: + do_stig_auth(c, setup_stig['user_obj'], setup_stig['secret']) + c.call('system.info') + + +def test_stig_roles_decrease(setup_stig): + + # We need new websocket connection to verify that privileges + # are appropriately decreased + with client(auth=None) as c: + do_stig_auth(c, setup_stig['user_obj'], setup_stig['secret']) + + me = c.call('auth.me') + for role in c.call('privilege.roles'): + if role['stig'] is not None: + assert role['name'] in me['privilege']['roles'] + else: + assert role['name'] not in me['privilege']['roles'] + + assert me['privilege']['web_shell'] is False + assert me['privilege']['webui_access'] is True + + +def test_stig_smb_auth_disabled(setup_stig): + # We need new websocket connection to verify that privileges + # are appropriately decreased + + smb_user_cnt = setup_stig['connection'].call('user.query', [['smb', '=', True]], {'count': True}) + assert smb_user_cnt == 0 + + # We shouldn't be able to create new SMB users + with pytest.raises(Verr, match='General Purpose OS STIG'): + setup_stig['connection'].call('user.create', { + 'username': 'CANARY', + 'full_name': 'CANARY', + 'password': 'CANARY', + 'group_create': True, + 'smb': True + }) diff --git a/tests/api2/test_twofactor_auth.py b/tests/api2/test_twofactor_auth.py index 7a4773bdefe62..61d374884239c 100644 --- a/tests/api2/test_twofactor_auth.py +++ b/tests/api2/test_twofactor_auth.py @@ -164,7 +164,6 @@ def test_restricted_user_2fa_secret_renewal(clear_ratelimit): username=TEST_USERNAME, group_name='TEST_2FA_GROUP', privilege_name='TEST_2FA_PRIVILEGE', - allowlist=[], web_shell=False, roles=['READONLY_ADMIN'] ) as acct: diff --git a/tests/unit/test_role_manager.py b/tests/unit/test_role_manager.py new file mode 100644 index 0000000000000..36577b3cc5039 --- /dev/null +++ b/tests/unit/test_role_manager.py @@ -0,0 +1,73 @@ +import pytest + +from middlewared.role import RoleManager, ROLES +from middlewared.utils import security + + +FAKE_METHODS = [ + ('fakemethod1', ['READONLY_ADMIN']), + ('fakemethod2', ['FILESYSTEM_DATA_READ']), + ('fakemethod3', ['CATALOG_WRITE']), + ('fakemethod4', ['DOCKER_READ']), + ('fakemethod5', ['ACCOUNT_READ']), + ('fakemethod6', ['ACCOUNT_WRITE']), +] + +FAKE_METHODS_NOSTIG = frozenset(['fakemethod3']) +ALL_METHODS = frozenset([method for method, roles in FAKE_METHODS]) +WRITE_METHODS = frozenset(['fakemethod3', 'fakemethod6']) +READONLY_ADMIN_METHODS = ALL_METHODS - WRITE_METHODS +FULL_ADMIN_STIG = ALL_METHODS - FAKE_METHODS_NOSTIG + + +@pytest.fixture(scope='module') +def nostig_roles(): + # Generate list of expected roles that should be unavailble for STIG mode + PREFIXES = ('VM', 'TRUECOMMAND', 'CATALOG', 'DOCKER', 'APPS', 'VIRT') + yield set([ + role_name for + role_name in list(ROLES.keys()) if role_name.startswith(PREFIXES) and not role_name.endswith('READ') + ]) + + +@pytest.fixture(scope='module') +def role_manager(): + """ A role manager populated with made up methods """ + rm = RoleManager(roles=ROLES) + for method, roles in FAKE_METHODS: + rm.register_method(method_name=method, roles=roles) + + yield rm + + +@pytest.mark.parametrize('role_name', list(ROLES.keys())) +def test__roles_have_correct_stig_assignment(nostig_roles, role_name): + role_to_check = ROLES[role_name] + assert type(role_to_check.stig) in (security.STIGType, type(None)) + + if role_to_check.stig is not None: + assert role_name not in nostig_roles + else: + assert role_name in nostig_roles + + # There should only be one role that grants full admin privileges + if role_name == 'FULL_ADMIN': + assert role_to_check.full_admin is True + else: + assert role_to_check.full_admin is False + + +@pytest.mark.parametrize('role,method,enabled_stig_type,resources', [ + ('READONLY_ADMIN', 'CALL', None, READONLY_ADMIN_METHODS), + ('READONLY_ADMIN', 'CALL', security.STIGType.GPOS, READONLY_ADMIN_METHODS), + ('FULL_ADMIN', '*', None, {'*'}), + ('FULL_ADMIN', 'CALL', security.STIGType.GPOS, FULL_ADMIN_STIG), +]) +def test__roles_have_correct_allowlist(role_manager, role, method, enabled_stig_type, resources): + allowlist = role_manager.allowlist_for_role(role, enabled_stig_type) + allowlist_resources = set() + for entry in allowlist: + assert method == entry['method'] + allowlist_resources.add(entry['resource']) + + assert allowlist_resources == resources