Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge authz_test and test_role_application #969

Merged
merged 4 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 0 additions & 109 deletions src/layman/authz/authz_test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import requests
from flask import g
import pytest

from layman import app, settings, LaymanError
from test_tools import process_client
from test_tools.util import url_for
from . import authorize_workspace_publications_decorator, split_user_and_role_names


Expand Down Expand Up @@ -61,112 +58,6 @@ def test_authorize_publications_decorator_accepts_path(request_path):
assert isinstance(exc_info.value, LaymanError), exc_info.traceback


class TestRestApiClass:
layername = 'test_authorize_decorator_layer'
mapname = 'test_authorize_decorator_map'
username = 'test_authorize_decorator_user'
authz_headers = process_client.get_authz_headers(username)

@pytest.fixture(scope="class")
def provide_publications(self):
username = self.username
authz_headers = self.authz_headers
layername = self.layername
mapname = self.mapname
process_client.ensure_reserved_username(username, headers=authz_headers)
process_client.publish_workspace_layer(username, layername, headers=authz_headers)
process_client.publish_workspace_map(username, mapname, headers=authz_headers)
yield
process_client.delete_workspace_layer(username, layername, headers=authz_headers)
process_client.delete_workspace_map(username, mapname, headers=authz_headers)

@staticmethod
def assert_response(response, exp_status_code, exp_data):
assert response.status_code == exp_status_code, response.text
if exp_status_code == 200 and exp_data is not None:
resp_json = response.json()
if callable(exp_data):
assert exp_data(resp_json), f"resp_json={resp_json}, exp_data={exp_data}"
else:
assert resp_json == exp_data
elif exp_status_code != 200 and exp_data is not None:
resp_json = response.json()
assert resp_json['code'] == exp_data, f"resp_json={resp_json}, exp_data={exp_data}"

@staticmethod
def has_single_layer(r_json):
return {li['name'] for li in r_json} == {TestRestApiClass.layername}

@staticmethod
def has_single_map(r_json):
return {li['name'] for li in r_json} == {TestRestApiClass.mapname}

@staticmethod
def has_no_publication(r_json):
return {li['name'] for li in r_json} == set()

@pytest.mark.parametrize(
"rest_action, url_for_params, authz_status_code, authz_response, unauthz_status_code, unauthz_response",
[
('rest_workspace_layers.get', {}, 200, has_single_layer.__func__, 200, has_no_publication.__func__),
('rest_workspace_layer.get', {'layername': layername}, 200, None, 404, 15),
('rest_workspace_layer_metadata_comparison.get', {'layername': layername}, 200, None, 404, 15),
('rest_workspace_layer_style.get', {'layername': layername}, 200, None, 404, 15),
('rest_workspace_layer_thumbnail.get', {'layername': layername}, 200, None, 404, 15),
('rest_workspace_layer_chunk.get', {'layername': layername}, 400, 20, 404, 15),
('rest_workspace_maps.get', {}, 200, has_single_map.__func__, 200, has_no_publication.__func__),
('rest_workspace_map.get', {'mapname': mapname}, 200, None, 404, 26),
('rest_workspace_map_file.get', {'mapname': mapname}, 200, None, 404, 26),
('rest_workspace_map_metadata_comparison.get', {'mapname': mapname}, 200, None, 404, 26),
('rest_workspace_map_thumbnail.get', {'mapname': mapname}, 200, None, 404, 26),
],
)
@pytest.mark.usefixtures('oauth2_provider_mock', 'ensure_layman', 'provide_publications')
def test_authorize_publications_decorator_on_rest_api(
self,
rest_action,
url_for_params,
authz_status_code,
authz_response,
unauthz_status_code,
unauthz_response,
):
username = self.username
authz_headers = self.authz_headers
patch_method = None
publ_name = None
if '_layer' in rest_action:
patch_method = process_client.patch_workspace_layer
publ_name = self.layername
elif '_map' in rest_action:
patch_method = process_client.patch_workspace_map
publ_name = self.mapname
assert publ_name

url_for_params['workspace'] = username

with app.app_context():
rest_url = url_for(rest_action, **url_for_params)

patch_method(username, publ_name, headers=authz_headers, access_rights={
'read': username,
'write': username,
})
response = requests.get(rest_url, headers=authz_headers, timeout=settings.DEFAULT_CONNECTION_TIMEOUT)
self.assert_response(response, authz_status_code, authz_response)
response = requests.get(rest_url, timeout=settings.DEFAULT_CONNECTION_TIMEOUT)
self.assert_response(response, unauthz_status_code, unauthz_response)

patch_method(username, publ_name, headers=authz_headers, access_rights={
'read': settings.RIGHTS_EVERYONE_ROLE,
'write': settings.RIGHTS_EVERYONE_ROLE,
})
response = requests.get(rest_url, headers=authz_headers, timeout=settings.DEFAULT_CONNECTION_TIMEOUT)
self.assert_response(response, authz_status_code, authz_response)
response = requests.get(rest_url, timeout=settings.DEFAULT_CONNECTION_TIMEOUT)
self.assert_response(response, authz_status_code, authz_response)


@pytest.mark.parametrize('roles_and_users, exp_users, exp_roles', [
pytest.param([], [], [], id='no-names'),
pytest.param(['user1', 'user2'], ['user1', 'user2'], [], id='only-users'),
Expand Down
3 changes: 3 additions & 0 deletions test_tools/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ def oauth2_provider_mock():
'test_access_rights_role_user1': None,
'test_role_application_user': None,
'test_role_application_role_user': None,
'test_access_rights_application_owner': None,
'test_access_rights_application_reader': None,
'test_access_rights_application_other_user': None,
},
},
'host': '0.0.0.0',
Expand Down
41 changes: 38 additions & 3 deletions test_tools/process_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
'patch_workspace_publication_url',
'get_workspace_publications_url',
'get_workspace_publication_url',
'get_workspace_publication_thumbnail_url',
'delete_workspace_publication_url',
'delete_workspace_publications_url',
'keys_to_check',
Expand All @@ -59,6 +60,7 @@
'rest_workspace_map.patch',
'rest_workspace_maps.get',
'rest_workspace_map.get',
'rest_workspace_map_thumbnail.get',
'rest_workspace_map.delete_map',
'rest_workspace_maps.delete',
map_keys_to_check,
Expand All @@ -72,6 +74,7 @@
'rest_workspace_layer.patch',
'rest_workspace_layers.get',
'rest_workspace_layer.get',
'rest_workspace_layer_thumbnail.get',
'rest_workspace_layer.delete_layer',
'rest_workspace_layers.delete',
layer_keys_to_check,
Expand All @@ -91,6 +94,7 @@
None,
None,
None,
None,
),
}

Expand Down Expand Up @@ -505,7 +509,12 @@ def get_publications_response(publication_type, *, workspace=None, headers=None,
return response


def get_publications(publication_type, *, workspace=None, headers=None, query_params=None):
def get_publications(publication_type, *, workspace=None, headers=None, query_params=None, actor_name=None):
headers = headers or {}
if actor_name:
assert TOKEN_HEADER not in headers
if actor_name and actor_name != settings.ANONYM_USER:
headers.update(get_authz_headers(actor_name))
return get_publications_response(publication_type, workspace=workspace, headers=headers, query_params=query_params).json()


Expand Down Expand Up @@ -535,7 +544,12 @@ def get_workspace_publication(publication_type, workspace, name, headers=None, *
get_workspace_layer = partial(get_workspace_publication, LAYER_TYPE)


def get_workspace_layer_style(workspace, layer, headers=None):
def get_workspace_layer_style(workspace, layer, headers=None, *, actor_name=None, ):
headers = headers or {}
if actor_name:
assert TOKEN_HEADER not in headers
if actor_name and actor_name != settings.ANONYM_USER:
headers.update(get_authz_headers(actor_name))
with app.app_context():
r_url = url_for('rest_workspace_layer_style.get',
workspace=workspace,
Expand All @@ -554,8 +568,13 @@ def finish_delete(workspace, url, headers, skip_404=False, ):
return response.json()


def delete_workspace_publication(publication_type, workspace, name, *, headers=None, skip_404=False, ):
def delete_workspace_publication(publication_type, workspace, name, *, headers=None, skip_404=False, actor_name=None, ):
headers = headers or {}
if actor_name:
assert TOKEN_HEADER not in headers
if actor_name and actor_name != settings.ANONYM_USER:
headers.update(get_authz_headers(actor_name))

publication_type_def = PUBLICATION_TYPES_DEF[publication_type]

with app.app_context():
Expand Down Expand Up @@ -723,3 +742,19 @@ def get_workspace_map_file(publication_type, workspace, name, headers=None, acto
response = requests.get(r_url, headers=headers, timeout=HTTP_TIMEOUT)
raise_layman_error(response)
return response.json()


def get_workspace_publication_thumbnail(publication_type, workspace, name, *, actor_name=None):
headers = {}
publication_type_def = PUBLICATION_TYPES_DEF[publication_type]
if actor_name:
assert TOKEN_HEADER not in headers

if actor_name and actor_name != settings.ANONYM_USER:
headers.update(get_authz_headers(actor_name))

with app.app_context():
r_url = url_for(publication_type_def.get_workspace_publication_thumbnail_url, **{publication_type_def.url_param_name: name}, workspace=workspace)
response = requests.get(r_url, headers=headers, timeout=HTTP_TIMEOUT)
raise_layman_error(response)
return response.content
Loading
Loading