Skip to content

Commit

Permalink
Test roles in GeoServer ACL rules
Browse files Browse the repository at this point in the history
  • Loading branch information
index-git committed Dec 20, 2023
1 parent 13dc04e commit 9456f2c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 34 deletions.
32 changes: 23 additions & 9 deletions tests/dynamic_data/publications/access_rights/test_role.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import pytest

from geoserver import util as gs_util
from layman import app, settings, util as layman_util
from layman.common import geoserver as gs_common
from test_tools import process_client, role_service
from tests import EnumTestTypes, Publication
from tests.asserts.final.publication import util as assert_util
from tests.dynamic_data import base_test, base_test_classes
from tests.dynamic_data.publications import common_publications

pytest_generate_tests = base_test.pytest_generate_tests


class PublicationTypes(base_test_classes.PublicationByDefinitionBase):
LAYER = (common_publications.LAYER_VECTOR_SLD, 'layer')
MAP = (common_publications.MAP_EMPTY, 'map')


USERNAME = 'test_access_rights_role_user1'
USER_ROLE1_ROLE3_EVERYONE = {USERNAME, 'ROLE1', 'ROLE3', 'EVERYONE'}
USER_ROLE1 = {USERNAME, 'ROLE1'}
Expand All @@ -27,14 +24,16 @@ class TestPublication(base_test.TestSingleRestPublication):
publication_type = None

rest_parametrization = [
PublicationTypes,
base_test.PublicationByUsedServers,
base_test_classes.RestMethod
]

usernames_to_reserve = [
USERNAME,
]

external_tables_to_create = base_test_classes.EXTERNAL_TABLE_FOR_LAYERS_BY_USED_SERVERS

def before_class(self):
for role in ROLES:
role_service.ensure_role(role)
Expand Down Expand Up @@ -74,5 +73,20 @@ def test_publication(self, publication, rest_method, rest_args):

info = process_client.get_workspace_publication(publication.type, publication.workspace, publication.name,
actor_name=USERNAME)
assert set(info['access_rights']['read']) == USER_ROLE1_ROLE2
assert set(info['access_rights']['write']) == USER_ROLE1
for right, exp_rights in [('read', USER_ROLE1_ROLE2),
('write', USER_ROLE1),
]:
assert set(info['access_rights'][right]) == exp_rights

if publication.type == process_client.LAYER_TYPE:
with app.app_context():
internal_info = layman_util.get_publication_info(publication.workspace, publication.type, publication.name, {'keys': ['geodata_type', 'wms']})

geodata_type = internal_info['geodata_type']
gs_workspace = internal_info['_wms']['workspace']
workspaces = [publication.workspace, gs_workspace] if geodata_type != settings.GEODATA_TYPE_RASTER else [gs_workspace]
for wspace in workspaces:
gs_expected_roles = gs_common.layman_users_and_roles_to_geoserver_roles(exp_rights)
rule = f'{wspace}.{publication.name}.{right[0]}'
gs_roles = gs_util.get_security_roles(rule, settings.LAYMAN_GS_AUTH)
assert gs_expected_roles == gs_roles, f'gs_expected_roles={gs_expected_roles}, gs_roles={gs_roles}, wspace={wspace}, rule={rule}'
26 changes: 1 addition & 25 deletions tests/static_data/single_publication/layers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import crs as crs_def
from geoserver import GS_REST_WORKSPACES, GS_REST, GS_AUTH, util as gs_util
from layman import settings, app, util as layman_util
from layman.common import bbox as bbox_util, geoserver as gs_common
from layman.common import bbox as bbox_util
from layman.common.micka import util as micka_common_util
from layman.layer import util as layer_util, db as layer_db, get_layer_info_keys
from layman.layer.geoserver.wms import DEFAULT_WMS_QGIS_STORE_PREFIX, VERSION
Expand Down Expand Up @@ -254,30 +254,6 @@ def test_fill_project_template(workspace, publ_type, publication):
assert excinfo.value.response.status_code == 500


@pytest.mark.parametrize('workspace, publ_type, publication', data.LIST_LAYERS)
@pytest.mark.usefixtures('oauth2_provider_mock', 'ensure_layman')
def test_gs_data_security(workspace, publ_type, publication):
ensure_publication(workspace, publ_type, publication)

auth = settings.LAYMAN_GS_AUTH
is_personal_workspace = workspace in data.USERS
owner_and_everyone_roles = gs_common.layman_users_and_roles_to_geoserver_roles({workspace, settings.RIGHTS_EVERYONE_ROLE})
owner_role_set = gs_common.layman_users_and_roles_to_geoserver_roles({workspace})
with app.app_context():
info = layman_util.get_publication_info(workspace, publ_type, publication, context={'keys': ['access_rights', 'wms']})
expected_roles = info['access_rights']
gs_workspace = info['_wms']['workspace']
geodata_type = data.PUBLICATIONS[(workspace, publ_type, publication)][data.TEST_DATA].get('geodata_type')
workspaces = [workspace, gs_workspace] if geodata_type != settings.GEODATA_TYPE_RASTER else [gs_workspace]
for right_type in ['read', 'write']:
for wspace in workspaces:
gs_expected_roles = gs_common.layman_users_and_roles_to_geoserver_roles(expected_roles[right_type])
gs_roles = gs_util.get_security_roles(f'{wspace}.{publication}.{right_type[0]}', auth)
assert gs_expected_roles == gs_roles\
or (is_personal_workspace
and gs_expected_roles == owner_and_everyone_roles == gs_roles.union(owner_role_set)), f'gs_expected_roles={gs_expected_roles}, gs_roles={gs_roles}, wspace={wspace}, is_personal_workspace={is_personal_workspace}'


@pytest.mark.parametrize('workspace, publ_type, publication', [(wspace, ptype, pub)
for wspace, ptype, pub in data.LIST_LAYERS
if data.PUBLICATIONS[(wspace, ptype, pub)][data.TEST_DATA].get('micka_xml')])
Expand Down

0 comments on commit 9456f2c

Please sign in to comment.