diff --git a/CHANGELOG.md b/CHANGELOG.md index 5541b6c6d..996809686 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ - GET [Layers](doc/rest.md#get-layers)/[Maps](doc/rest.md#get-maps)/[Publications](doc/rest.md#get-publications) - DELETE Workspace [Layers](doc/rest.md#delete-workspace-layers)/[Maps](doc/rest.md#delete-workspace-maps) - POST Workspace [Layers](doc/rest.md#post-workspace-layers)/[Maps](doc/rest.md#post-workspace-maps) respects roles in [GRANT_CREATE_PUBLIC_WORKSPACE](doc/env-settings.md#grant_create_public_workspace) and [GRANT_PUBLISH_IN_PUBLIC_WORKSPACE](doc/env-settings.md#grant_publish_in_public_workspace) + - requests to [WMS](doc/endpoints.md#web-map-service) and [WFS](doc/endpoints.md#web-feature-service) endpoints - [#165](https://github.com/LayerManager/layman/issues/165) Many endpoints return previously associated [role names](doc/models.md#role) in `access_rights.read` and `access_rights.write` keys: - [GET](doc/rest.md#get-workspace-layer)/[PATCH](doc/rest.md#patch-workspace-layer) Workspace Layer - [GET](doc/rest.md#get-workspace-map)/[PATCH](doc/rest.md#patch-workspace-map) Workspace Map diff --git a/tests/asserts/final/publication/geoserver_proxy.py b/tests/asserts/final/publication/geoserver_proxy.py index d02244922..ba35ee831 100644 --- a/tests/asserts/final/publication/geoserver_proxy.py +++ b/tests/asserts/final/publication/geoserver_proxy.py @@ -15,12 +15,24 @@ def is_complete_in_workspace_wms(workspace, publ_type, name, *, version, headers geoserver_util.is_complete_in_workspace_wms_instance(wms_inst, name, validate_metadata_url=validate_metadata_url) -def is_complete_in_workspace_wms_1_3_0(workspace, publ_type, name, headers): +def is_complete_in_workspace_wms_1_3_0(workspace, publ_type, name, headers=None, *, actor_name=None): + headers = headers or {} + assert headers is not None or actor_name is not None + if actor_name: + assert process_client.TOKEN_HEADER not in headers + if actor_name and actor_name != settings.ANONYM_USER: + headers.update(process_client.get_authz_headers(actor_name)) assert publ_type == process_client.LAYER_TYPE is_complete_in_workspace_wms(workspace, publ_type, name, version='1.3.0', headers=headers) -def workspace_wfs_2_0_0_capabilities_available_if_vector(workspace, publ_type, name, headers): +def workspace_wfs_2_0_0_capabilities_available_if_vector(workspace, publ_type, name, headers=None, *, actor_name=None): + headers = headers or {} + assert headers is not None or actor_name is not None + if actor_name: + assert process_client.TOKEN_HEADER not in headers + if actor_name and actor_name != settings.ANONYM_USER: + headers.update(process_client.get_authz_headers(actor_name)) with app.app_context(): internal_wfs_url = test_util.url_for('geoserver_proxy_bp.proxy', subpath=workspace + '/wfs') diff --git a/tests/dynamic_data/publications/access_rights/test_access_rights_application.py b/tests/dynamic_data/publications/access_rights/test_access_rights_application.py index e46e97279..1704a85db 100644 --- a/tests/dynamic_data/publications/access_rights/test_access_rights_application.py +++ b/tests/dynamic_data/publications/access_rights/test_access_rights_application.py @@ -5,6 +5,7 @@ from layman import settings, LaymanError from test_tools import process_client, role_service as role_service_util from tests import Publication, EnumTestTypes +from tests.asserts.final.publication import geoserver_proxy from tests.dynamic_data import base_test ENDPOINTS_TO_TEST = { @@ -31,6 +32,11 @@ (process_client.delete_workspace_publication, {}), ] +GEOSERVER_METHODS_TO_TEST = [ + (geoserver_proxy.is_complete_in_workspace_wms_1_3_0, {}), + (geoserver_proxy.workspace_wfs_2_0_0_capabilities_available_if_vector, {}), +] + def pytest_generate_tests(metafunc): # https://docs.pytest.org/en/6.2.x/parametrize.html#pytest-generate-tests @@ -73,8 +79,9 @@ def add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_ 'layer': publication.name, 'actor_name': user, 'publication_type': publication.type, + 'publ_type': publication.type, } - for method, args in endpoints_to_test[publication.type]: + for method, args in endpoints_to_test: pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}{("__" + next(iter(args.keys()))) if args else ""}' method_args = inspect.getfullargspec(method).args + inspect.getfullargspec(method).kwonlyargs @@ -93,7 +100,7 @@ def generate_positive_test_cases(publications_user_can_read): tc_list = [] for user, publications in publications_user_can_read.items(): for publication in publications: - add_publication_test_cases_to_list(tc_list, publication, user, ENDPOINTS_TO_TEST) + add_publication_test_cases_to_list(tc_list, publication, user, ENDPOINTS_TO_TEST[publication.type]) return tc_list @@ -104,7 +111,7 @@ def generate_negative_test_cases(publications_user_can_read, publication_all): if publication in available_publications: continue endpoints_to_test = {publ_type: endpoints + ENDPOINTS_TO_TEST_NEGATIVE_ONLY for publ_type, endpoints in ENDPOINTS_TO_TEST.items()} - add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_test) + add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_test[publication.type]) return tc_list @@ -139,6 +146,24 @@ def generate_multiendpoint_test_cases(publications_user_can_read, workspace, ): return tc_list +def generate_positive_geoserver_test_cases(publications_user_can_read): + tc_list = [] + for user, publications in publications_user_can_read.items(): + for publication in publications: + if publication.type == process_client.LAYER_TYPE: + add_publication_test_cases_to_list(tc_list, publication, user, GEOSERVER_METHODS_TO_TEST) + return tc_list + + +def generate_geoserver_negative_test_cases(publications_user_can_read, publication_all): + tc_list = [] + for user, available_publications in publications_user_can_read.items(): + for publication in publication_all: + if publication not in available_publications and publication.type == process_client.LAYER_TYPE: + add_publication_test_cases_to_list(tc_list, publication, user, GEOSERVER_METHODS_TO_TEST) + return tc_list + + @pytest.mark.timeout(60) @pytest.mark.usefixtures('ensure_layman_module', 'oauth2_provider_mock') class TestAccessRights: @@ -197,9 +222,10 @@ class TestAccessRights: } test_cases = { - 'test_single_positive': generate_positive_test_cases(PUBLICATIONS_BY_USER), + 'test_single_positive': generate_positive_test_cases(PUBLICATIONS_BY_USER) + generate_positive_geoserver_test_cases(PUBLICATIONS_BY_USER), 'test_single_negative': generate_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS), 'test_multiendpoint': generate_multiendpoint_test_cases(PUBLICATIONS_BY_USER, OWNER), + 'test_geoserver_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS), } @pytest.fixture(scope='class', autouse=True) @@ -238,3 +264,7 @@ def test_multiendpoint(self, rest_method, rest_args, params): result = rest_method(**rest_args) result_publications = [(publ['workspace'], f"layman.{publ['publication_type']}", publ['name']) for publ in result] assert result_publications == params['exp_publications'] + + def test_geoserver_negative(self, rest_method, rest_args, ): + with pytest.raises(AssertionError): + rest_method(**rest_args)