Skip to content

Commit

Permalink
Test access rights application on GeoServer
Browse files Browse the repository at this point in the history
  • Loading branch information
index-git committed Dec 20, 2023
1 parent 9456f2c commit 79fd70d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- GET [Layers](doc/rest.md#get-layers)/[Maps](doc/rest.md#get-maps)/[Publications](doc/rest.md#get-publications)
- DELETE Workspace [Layers](doc/rest.md#delete-workspace-layers)/[Maps](doc/rest.md#delete-workspace-maps)
- POST Workspace [Layers](doc/rest.md#post-workspace-layers)/[Maps](doc/rest.md#post-workspace-maps) respects roles in [GRANT_CREATE_PUBLIC_WORKSPACE](doc/env-settings.md#grant_create_public_workspace) and [GRANT_PUBLISH_IN_PUBLIC_WORKSPACE](doc/env-settings.md#grant_publish_in_public_workspace)
- requests to [WMS](doc/endpoints.md#web-map-service) and [WFS](doc/endpoints.md#web-feature-service) endpoints
- [#165](https://github.com/LayerManager/layman/issues/165) Many endpoints return previously associated [role names](doc/models.md#role) in `access_rights.read` and `access_rights.write` keys:
- [GET](doc/rest.md#get-workspace-layer)/[PATCH](doc/rest.md#patch-workspace-layer) Workspace Layer
- [GET](doc/rest.md#get-workspace-map)/[PATCH](doc/rest.md#patch-workspace-map) Workspace Map
Expand Down
18 changes: 15 additions & 3 deletions tests/asserts/final/publication/geoserver_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,24 @@ def is_complete_in_workspace_wms(workspace, publ_type, name, *, version, headers
geoserver_util.is_complete_in_workspace_wms_instance(wms_inst, name, validate_metadata_url=validate_metadata_url)


def is_complete_in_workspace_wms_1_3_0(workspace, publ_type, name, headers):
def is_complete_in_workspace_wms_1_3_0(workspace, publ_type, name, headers=None, *, actor_name=None):
headers = headers or {}
assert headers is not None or actor_name is not None
if actor_name:
assert process_client.TOKEN_HEADER not in headers
if actor_name and actor_name != settings.ANONYM_USER:
headers.update(process_client.get_authz_headers(actor_name))
assert publ_type == process_client.LAYER_TYPE
is_complete_in_workspace_wms(workspace, publ_type, name, version='1.3.0', headers=headers)


def workspace_wfs_2_0_0_capabilities_available_if_vector(workspace, publ_type, name, headers):
def workspace_wfs_2_0_0_capabilities_available_if_vector(workspace, publ_type, name, headers=None, *, actor_name=None):
headers = headers or {}
assert headers is not None or actor_name is not None
if actor_name:
assert process_client.TOKEN_HEADER not in headers
if actor_name and actor_name != settings.ANONYM_USER:
headers.update(process_client.get_authz_headers(actor_name))
with app.app_context():
internal_wfs_url = test_util.url_for('geoserver_proxy_bp.proxy', subpath=workspace + '/wfs')

Expand All @@ -32,7 +44,7 @@ def workspace_wfs_2_0_0_capabilities_available_if_vector(workspace, publ_type, n

assert wfs_inst.contents
wfs_name = f'{workspace}:{name}'
assert wfs_name in wfs_inst.contents
assert wfs_name in wfs_inst.contents, "Layer not found in Capabilities."
wfs_layer = wfs_inst.contents[wfs_name]
assert len(wfs_layer.metadataUrls) == 1
assert wfs_layer.metadataUrls[0]['url'].startswith('http://localhost:3080/record/xml/m-')
Expand Down
2 changes: 1 addition & 1 deletion tests/asserts/final/publication/geoserver_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
def is_complete_in_workspace_wms_instance(wms_instance, name, *, validate_metadata_url):
assert wms_instance.contents
assert name in wms_instance.contents
assert name in wms_instance.contents, "Layer not found in Capabilities."
wms_layer = wms_instance.contents[name]
for style_name, style_values in wms_layer.styles.items():
assert 'legend' in style_values, f'style_name={style_name}, style_values={style_values}'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from layman import settings, LaymanError
from test_tools import process_client, role_service as role_service_util
from tests import Publication, EnumTestTypes
from tests.asserts.final.publication import geoserver_proxy
from tests.dynamic_data import base_test

ENDPOINTS_TO_TEST = {
Expand All @@ -31,6 +32,11 @@
(process_client.delete_workspace_publication, {}),
]

GEOSERVER_METHODS_TO_TEST = [
(geoserver_proxy.is_complete_in_workspace_wms_1_3_0, {}),
(geoserver_proxy.workspace_wfs_2_0_0_capabilities_available_if_vector, {}),
]


def pytest_generate_tests(metafunc):
# https://docs.pytest.org/en/6.2.x/parametrize.html#pytest-generate-tests
Expand Down Expand Up @@ -73,8 +79,9 @@ def add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_
'layer': publication.name,
'actor_name': user,
'publication_type': publication.type,
'publ_type': publication.type,
}
for method, args in endpoints_to_test[publication.type]:
for method, args in endpoints_to_test:
pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}{("__" + next(iter(args.keys()))) if args else ""}'
method_args = inspect.getfullargspec(method).args + inspect.getfullargspec(method).kwonlyargs

Expand All @@ -93,7 +100,7 @@ def generate_positive_test_cases(publications_user_can_read):
tc_list = []
for user, publications in publications_user_can_read.items():
for publication in publications:
add_publication_test_cases_to_list(tc_list, publication, user, ENDPOINTS_TO_TEST)
add_publication_test_cases_to_list(tc_list, publication, user, ENDPOINTS_TO_TEST[publication.type])
return tc_list


Expand All @@ -104,7 +111,7 @@ def generate_negative_test_cases(publications_user_can_read, publication_all):
if publication in available_publications:
continue
endpoints_to_test = {publ_type: endpoints + ENDPOINTS_TO_TEST_NEGATIVE_ONLY for publ_type, endpoints in ENDPOINTS_TO_TEST.items()}
add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_test)
add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_test[publication.type])
return tc_list


Expand Down Expand Up @@ -139,6 +146,24 @@ def generate_multiendpoint_test_cases(publications_user_can_read, workspace, ):
return tc_list


def generate_positive_geoserver_test_cases(publications_user_can_read):
tc_list = []
for user, publications in publications_user_can_read.items():
for publication in publications:
if publication.type == process_client.LAYER_TYPE:
add_publication_test_cases_to_list(tc_list, publication, user, GEOSERVER_METHODS_TO_TEST)
return tc_list


def generate_geoserver_negative_test_cases(publications_user_can_read, publication_all):
tc_list = []
for user, available_publications in publications_user_can_read.items():
for publication in publication_all:
if publication not in available_publications and publication.type == process_client.LAYER_TYPE:
add_publication_test_cases_to_list(tc_list, publication, user, GEOSERVER_METHODS_TO_TEST)
return tc_list


@pytest.mark.timeout(60)
@pytest.mark.usefixtures('ensure_layman_module', 'oauth2_provider_mock')
class TestAccessRights:
Expand Down Expand Up @@ -197,9 +222,10 @@ class TestAccessRights:
}

test_cases = {
'test_single_positive': generate_positive_test_cases(PUBLICATIONS_BY_USER),
'test_single_positive': generate_positive_test_cases(PUBLICATIONS_BY_USER) + generate_positive_geoserver_test_cases(PUBLICATIONS_BY_USER),
'test_single_negative': generate_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS),
'test_multiendpoint': generate_multiendpoint_test_cases(PUBLICATIONS_BY_USER, OWNER),
'test_geoserver_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS),
}

@pytest.fixture(scope='class', autouse=True)
Expand Down Expand Up @@ -238,3 +264,8 @@ def test_multiendpoint(self, rest_method, rest_args, params):
result = rest_method(**rest_args)
result_publications = [(publ['workspace'], f"layman.{publ['publication_type']}", publ['name']) for publ in result]
assert result_publications == params['exp_publications']

def test_geoserver_negative(self, rest_method, rest_args, ):
with pytest.raises(AssertionError) as exc_info:
rest_method(**rest_args)
assert exc_info.value.args[0].startswith('Layer not found in Capabilities.')

0 comments on commit 79fd70d

Please sign in to comment.