Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test access right roles in GeoServer #983

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- GET [Layers](doc/rest.md#get-layers)/[Maps](doc/rest.md#get-maps)/[Publications](doc/rest.md#get-publications)
- DELETE Workspace [Layers](doc/rest.md#delete-workspace-layers)/[Maps](doc/rest.md#delete-workspace-maps)
- POST Workspace [Layers](doc/rest.md#post-workspace-layers)/[Maps](doc/rest.md#post-workspace-maps) respects roles in [GRANT_CREATE_PUBLIC_WORKSPACE](doc/env-settings.md#grant_create_public_workspace) and [GRANT_PUBLISH_IN_PUBLIC_WORKSPACE](doc/env-settings.md#grant_publish_in_public_workspace)
- requests to [WMS](doc/endpoints.md#web-map-service) and [WFS](doc/endpoints.md#web-feature-service) endpoints
- [#165](https://github.com/LayerManager/layman/issues/165) Many endpoints return previously associated [role names](doc/models.md#role) in `access_rights.read` and `access_rights.write` keys:
- [GET](doc/rest.md#get-workspace-layer)/[PATCH](doc/rest.md#patch-workspace-layer) Workspace Layer
- [GET](doc/rest.md#get-workspace-map)/[PATCH](doc/rest.md#patch-workspace-map) Workspace Map
Expand Down
9 changes: 6 additions & 3 deletions src/layman/common/geoserver/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from geoserver.util import username_to_rolename
from layman import settings
from layman.authz import is_user


def layman_users_to_geoserver_roles(layman_users):
def layman_users_and_roles_to_geoserver_roles(layman_users_and_roles):
geoserver_roles = set()
for layman_user in layman_users:
for layman_user in layman_users_and_roles:
if layman_user == settings.RIGHTS_EVERYONE_ROLE:
geoserver_roles.add('ROLE_ANONYMOUS')
geoserver_roles.add('ROLE_AUTHENTICATED')
else:
elif is_user(layman_user):
geoserver_roles.add(username_to_rolename(layman_user))
else:
geoserver_roles.add(layman_user)
return geoserver_roles
15 changes: 15 additions & 0 deletions src/layman/common/geoserver/geoserver_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import pytest

from layman import settings
from . import layman_users_and_roles_to_geoserver_roles


@pytest.mark.parametrize('layman_users_and_roles, exp_geoserver_roles', [
pytest.param({'username'}, {'USER_USERNAME'}, id='username'),
pytest.param({'ROLE'}, {'ROLE'}, id='rolename'),
pytest.param({settings.RIGHTS_EVERYONE_ROLE}, {'ROLE_ANONYMOUS', 'ROLE_AUTHENTICATED'}, id='everyone-role'),
pytest.param({f'username2', 'ROLE2', settings.RIGHTS_EVERYONE_ROLE}, {'USER_USERNAME2', 'ROLE2', 'ROLE_ANONYMOUS', 'ROLE_AUTHENTICATED'}, id='everything'),
])
def test_layman_users_and_roles_to_geoserver_roles(layman_users_and_roles, exp_geoserver_roles):
result = layman_users_and_roles_to_geoserver_roles(layman_users_and_roles)
assert result == exp_geoserver_roles
4 changes: 2 additions & 2 deletions src/layman/layer/geoserver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ def set_security_rules(workspace, layer, access_rights, auth, geoserver_workspac
read_roles = access_rights.get('read') if access_rights and access_rights.get('read') else layer_info['access_rights']['read']
write_roles = access_rights.get('write') if access_rights and access_rights.get('write') else layer_info['access_rights']['write']

security_read_roles = gs_common.layman_users_to_geoserver_roles(read_roles)
security_read_roles = gs_common.layman_users_and_roles_to_geoserver_roles(read_roles)
gs_util.ensure_layer_security_roles(geoserver_workspace, layer, security_read_roles, 'r', auth)

security_write_roles = gs_common.layman_users_to_geoserver_roles(write_roles)
security_write_roles = gs_common.layman_users_and_roles_to_geoserver_roles(write_roles)
gs_util.ensure_layer_security_roles(geoserver_workspace, layer, security_write_roles, 'w', auth)


Expand Down
4 changes: 2 additions & 2 deletions src/layman/layer/geoserver/wfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ def patch_layer(workspace, layername, title, description, original_data_source,
clear_cache(workspace)

if access_rights and access_rights.get('read'):
security_read_roles = gs_common.layman_users_to_geoserver_roles(access_rights['read'])
security_read_roles = gs_common.layman_users_and_roles_to_geoserver_roles(access_rights['read'])
gs_util.ensure_layer_security_roles(workspace, layername, security_read_roles, 'r', settings.LAYMAN_GS_AUTH)

if access_rights and access_rights.get('write'):
security_write_roles = gs_common.layman_users_to_geoserver_roles(access_rights['write'])
security_write_roles = gs_common.layman_users_and_roles_to_geoserver_roles(access_rights['write'])
gs_util.ensure_layer_security_roles(workspace, layername, security_write_roles, 'w', settings.LAYMAN_GS_AUTH)


Expand Down
4 changes: 2 additions & 2 deletions src/layman/layer/geoserver/wms.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ def patch_layer(workspace, layername, original_data_source, title, description,
clear_cache(workspace)

if access_rights and access_rights.get('read'):
security_read_roles = gs_common.layman_users_to_geoserver_roles(access_rights['read'])
security_read_roles = gs_common.layman_users_and_roles_to_geoserver_roles(access_rights['read'])
gs_util.ensure_layer_security_roles(geoserver_workspace, layername, security_read_roles, 'r', settings.LAYMAN_GS_AUTH)

if access_rights and access_rights.get('write'):
security_write_roles = gs_common.layman_users_to_geoserver_roles(access_rights['write'])
security_write_roles = gs_common.layman_users_and_roles_to_geoserver_roles(access_rights['write'])
gs_util.ensure_layer_security_roles(geoserver_workspace, layername, security_write_roles, 'w', settings.LAYMAN_GS_AUTH)


Expand Down
18 changes: 15 additions & 3 deletions tests/asserts/final/publication/geoserver_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,24 @@ def is_complete_in_workspace_wms(workspace, publ_type, name, *, version, headers
geoserver_util.is_complete_in_workspace_wms_instance(wms_inst, name, validate_metadata_url=validate_metadata_url)


def is_complete_in_workspace_wms_1_3_0(workspace, publ_type, name, headers):
def is_complete_in_workspace_wms_1_3_0(workspace, publ_type, name, headers=None, *, actor_name=None):
headers = headers or {}
assert headers is not None or actor_name is not None
if actor_name:
assert process_client.TOKEN_HEADER not in headers
if actor_name and actor_name != settings.ANONYM_USER:
headers.update(process_client.get_authz_headers(actor_name))
assert publ_type == process_client.LAYER_TYPE
is_complete_in_workspace_wms(workspace, publ_type, name, version='1.3.0', headers=headers)


def workspace_wfs_2_0_0_capabilities_available_if_vector(workspace, publ_type, name, headers):
def workspace_wfs_2_0_0_capabilities_available_if_vector(workspace, publ_type, name, headers=None, *, actor_name=None):
headers = headers or {}
assert headers is not None or actor_name is not None
if actor_name:
assert process_client.TOKEN_HEADER not in headers
if actor_name and actor_name != settings.ANONYM_USER:
headers.update(process_client.get_authz_headers(actor_name))
with app.app_context():
internal_wfs_url = test_util.url_for('geoserver_proxy_bp.proxy', subpath=workspace + '/wfs')

Expand All @@ -32,7 +44,7 @@ def workspace_wfs_2_0_0_capabilities_available_if_vector(workspace, publ_type, n

assert wfs_inst.contents
wfs_name = f'{workspace}:{name}'
assert wfs_name in wfs_inst.contents
assert wfs_name in wfs_inst.contents, "Layer not found in Capabilities."
wfs_layer = wfs_inst.contents[wfs_name]
assert len(wfs_layer.metadataUrls) == 1
assert wfs_layer.metadataUrls[0]['url'].startswith('http://localhost:3080/record/xml/m-')
Expand Down
2 changes: 1 addition & 1 deletion tests/asserts/final/publication/geoserver_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
def is_complete_in_workspace_wms_instance(wms_instance, name, *, validate_metadata_url):
assert wms_instance.contents
assert name in wms_instance.contents
assert name in wms_instance.contents, "Layer not found in Capabilities."
wms_layer = wms_instance.contents[name]
for style_name, style_values in wms_layer.styles.items():
assert 'legend' in style_values, f'style_name={style_name}, style_values={style_values}'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from layman import settings, LaymanError
from test_tools import process_client, role_service as role_service_util
from tests import Publication, EnumTestTypes
from tests.asserts.final.publication import geoserver_proxy
from tests.dynamic_data import base_test

ENDPOINTS_TO_TEST = {
Expand All @@ -31,6 +32,11 @@
(process_client.delete_workspace_publication, {}),
]

GEOSERVER_METHODS_TO_TEST = [
(geoserver_proxy.is_complete_in_workspace_wms_1_3_0, {}),
(geoserver_proxy.workspace_wfs_2_0_0_capabilities_available_if_vector, {}),
]


def pytest_generate_tests(metafunc):
# https://docs.pytest.org/en/6.2.x/parametrize.html#pytest-generate-tests
Expand Down Expand Up @@ -73,8 +79,9 @@ def add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_
'layer': publication.name,
'actor_name': user,
'publication_type': publication.type,
'publ_type': publication.type,
}
for method, args in endpoints_to_test[publication.type]:
for method, args in endpoints_to_test:
pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}{("__" + next(iter(args.keys()))) if args else ""}'
method_args = inspect.getfullargspec(method).args + inspect.getfullargspec(method).kwonlyargs

Expand All @@ -93,7 +100,7 @@ def generate_positive_test_cases(publications_user_can_read):
tc_list = []
for user, publications in publications_user_can_read.items():
for publication in publications:
add_publication_test_cases_to_list(tc_list, publication, user, ENDPOINTS_TO_TEST)
add_publication_test_cases_to_list(tc_list, publication, user, ENDPOINTS_TO_TEST[publication.type])
return tc_list


Expand All @@ -104,7 +111,7 @@ def generate_negative_test_cases(publications_user_can_read, publication_all):
if publication in available_publications:
continue
endpoints_to_test = {publ_type: endpoints + ENDPOINTS_TO_TEST_NEGATIVE_ONLY for publ_type, endpoints in ENDPOINTS_TO_TEST.items()}
add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_test)
add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_test[publication.type])
return tc_list


Expand Down Expand Up @@ -139,6 +146,24 @@ def generate_multiendpoint_test_cases(publications_user_can_read, workspace, ):
return tc_list


def generate_positive_geoserver_test_cases(publications_user_can_read):
tc_list = []
for user, publications in publications_user_can_read.items():
for publication in publications:
if publication.type == process_client.LAYER_TYPE:
add_publication_test_cases_to_list(tc_list, publication, user, GEOSERVER_METHODS_TO_TEST)
return tc_list


def generate_geoserver_negative_test_cases(publications_user_can_read, publication_all):
tc_list = []
for user, available_publications in publications_user_can_read.items():
for publication in publication_all:
if publication not in available_publications and publication.type == process_client.LAYER_TYPE:
add_publication_test_cases_to_list(tc_list, publication, user, GEOSERVER_METHODS_TO_TEST)
return tc_list


@pytest.mark.timeout(60)
@pytest.mark.usefixtures('ensure_layman_module', 'oauth2_provider_mock')
class TestAccessRights:
Expand Down Expand Up @@ -197,9 +222,10 @@ class TestAccessRights:
}

test_cases = {
'test_single_positive': generate_positive_test_cases(PUBLICATIONS_BY_USER),
'test_single_positive': generate_positive_test_cases(PUBLICATIONS_BY_USER) + generate_positive_geoserver_test_cases(PUBLICATIONS_BY_USER),
'test_single_negative': generate_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS),
'test_multiendpoint': generate_multiendpoint_test_cases(PUBLICATIONS_BY_USER, OWNER),
'test_geoserver_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS),
}

@pytest.fixture(scope='class', autouse=True)
Expand Down Expand Up @@ -238,3 +264,8 @@ def test_multiendpoint(self, rest_method, rest_args, params):
result = rest_method(**rest_args)
result_publications = [(publ['workspace'], f"layman.{publ['publication_type']}", publ['name']) for publ in result]
assert result_publications == params['exp_publications']

def test_geoserver_negative(self, rest_method, rest_args, ):
with pytest.raises(AssertionError) as exc_info:
rest_method(**rest_args)
assert exc_info.value.args[0].startswith('Layer not found in Capabilities.')
32 changes: 23 additions & 9 deletions tests/dynamic_data/publications/access_rights/test_role.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import pytest

from geoserver import util as gs_util
from layman import app, settings, util as layman_util
from layman.common import geoserver as gs_common
from test_tools import process_client, role_service
from tests import EnumTestTypes, Publication
from tests.asserts.final.publication import util as assert_util
from tests.dynamic_data import base_test, base_test_classes
from tests.dynamic_data.publications import common_publications

pytest_generate_tests = base_test.pytest_generate_tests


class PublicationTypes(base_test_classes.PublicationByDefinitionBase):
LAYER = (common_publications.LAYER_VECTOR_SLD, 'layer')
MAP = (common_publications.MAP_EMPTY, 'map')


USERNAME = 'test_access_rights_role_user1'
USER_ROLE1_ROLE3_EVERYONE = {USERNAME, 'ROLE1', 'ROLE3', 'EVERYONE'}
USER_ROLE1 = {USERNAME, 'ROLE1'}
Expand All @@ -27,14 +24,16 @@ class TestPublication(base_test.TestSingleRestPublication):
publication_type = None

rest_parametrization = [
PublicationTypes,
base_test.PublicationByUsedServers,
index-git marked this conversation as resolved.
Show resolved Hide resolved
base_test_classes.RestMethod
]

usernames_to_reserve = [
USERNAME,
]

external_tables_to_create = base_test_classes.EXTERNAL_TABLE_FOR_LAYERS_BY_USED_SERVERS

def before_class(self):
for role in ROLES:
role_service.ensure_role(role)
Expand Down Expand Up @@ -74,5 +73,20 @@ def test_publication(self, publication, rest_method, rest_args):

info = process_client.get_workspace_publication(publication.type, publication.workspace, publication.name,
actor_name=USERNAME)
assert set(info['access_rights']['read']) == USER_ROLE1_ROLE2
assert set(info['access_rights']['write']) == USER_ROLE1
for right, exp_rights in [('read', USER_ROLE1_ROLE2),
index-git marked this conversation as resolved.
Show resolved Hide resolved
('write', USER_ROLE1),
]:
assert set(info['access_rights'][right]) == exp_rights

if publication.type == process_client.LAYER_TYPE:
with app.app_context():
internal_info = layman_util.get_publication_info(publication.workspace, publication.type, publication.name, {'keys': ['geodata_type', 'wms']})

geodata_type = internal_info['geodata_type']
gs_workspace = internal_info['_wms']['workspace']
workspaces = [publication.workspace, gs_workspace] if geodata_type != settings.GEODATA_TYPE_RASTER else [gs_workspace]
for wspace in workspaces:
gs_expected_roles = gs_common.layman_users_and_roles_to_geoserver_roles(exp_rights)
rule = f'{wspace}.{publication.name}.{right[0]}'
gs_roles = gs_util.get_security_roles(rule, settings.LAYMAN_GS_AUTH)
assert gs_expected_roles == gs_roles, f'gs_expected_roles={gs_expected_roles}, gs_roles={gs_roles}, wspace={wspace}, rule={rule}'
26 changes: 1 addition & 25 deletions tests/static_data/single_publication/layers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import crs as crs_def
from geoserver import GS_REST_WORKSPACES, GS_REST, GS_AUTH, util as gs_util
from layman import settings, app, util as layman_util
from layman.common import bbox as bbox_util, geoserver as gs_common
from layman.common import bbox as bbox_util
from layman.common.micka import util as micka_common_util
from layman.layer import util as layer_util, db as layer_db, get_layer_info_keys
from layman.layer.geoserver.wms import DEFAULT_WMS_QGIS_STORE_PREFIX, VERSION
Expand Down Expand Up @@ -254,30 +254,6 @@ def test_fill_project_template(workspace, publ_type, publication):
assert excinfo.value.response.status_code == 500


@pytest.mark.parametrize('workspace, publ_type, publication', data.LIST_LAYERS)
@pytest.mark.usefixtures('oauth2_provider_mock', 'ensure_layman')
def test_gs_data_security(workspace, publ_type, publication):
ensure_publication(workspace, publ_type, publication)

auth = settings.LAYMAN_GS_AUTH
is_personal_workspace = workspace in data.USERS
owner_and_everyone_roles = gs_common.layman_users_to_geoserver_roles({workspace, settings.RIGHTS_EVERYONE_ROLE})
owner_role_set = gs_common.layman_users_to_geoserver_roles({workspace})
with app.app_context():
info = layman_util.get_publication_info(workspace, publ_type, publication, context={'keys': ['access_rights', 'wms']})
expected_roles = info['access_rights']
gs_workspace = info['_wms']['workspace']
geodata_type = data.PUBLICATIONS[(workspace, publ_type, publication)][data.TEST_DATA].get('geodata_type')
workspaces = [workspace, gs_workspace] if geodata_type != settings.GEODATA_TYPE_RASTER else [gs_workspace]
for right_type in ['read', 'write']:
for wspace in workspaces:
gs_expected_roles = gs_common.layman_users_to_geoserver_roles(expected_roles[right_type])
gs_roles = gs_util.get_security_roles(f'{wspace}.{publication}.{right_type[0]}', auth)
assert gs_expected_roles == gs_roles\
or (is_personal_workspace
and gs_expected_roles == owner_and_everyone_roles == gs_roles.union(owner_role_set)), f'gs_expected_roles={gs_expected_roles}, gs_roles={gs_roles}, wspace={wspace}, is_personal_workspace={is_personal_workspace}'


@pytest.mark.parametrize('workspace, publ_type, publication', [(wspace, ptype, pub)
for wspace, ptype, pub in data.LIST_LAYERS
if data.PUBLICATIONS[(wspace, ptype, pub)][data.TEST_DATA].get('micka_xml')])
Expand Down
Loading