Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

165 - Validate role names #981

Merged
merged 12 commits into from
Dec 15, 2023
2 changes: 1 addition & 1 deletion doc/models.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
## Role
- Role is any group of users. One user can be assigned to multiple roles.
- Each role is identified by name that is unique among all roles.
- The name is upper-case (in contrast with [username](#username)).
- The name is upper-case (in contrast with [username](#username)), maximum length is 64 characters.
- Roles can be used for assigning access rights.

## Workspace
Expand Down
3 changes: 2 additions & 1 deletion src/layman/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@
set_after_restart()

logger.info(f'Recreate Role Service admin role views')
from .authz.role_service import ensure_admin_roles
from .authz.internal_role_service import ensure_admin_roles

ensure_admin_roles()

pipe.multi()
Expand Down
6 changes: 5 additions & 1 deletion src/layman/authz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ def complete_access_rights(access_rights_to_complete, full_access_rights):
return access_rights_to_complete


def is_user(user_or_role_name):
return any(letter.islower() for letter in user_or_role_name)


def split_user_and_role_names(user_and_role_names):
user_names = [name for name in user_and_role_names if any(letter.islower() for letter in name)]
user_names = [name for name in user_and_role_names if is_user(name)]
role_names = [name for name in user_and_role_names if name not in user_names]
return user_names, role_names
1 change: 1 addition & 0 deletions src/layman/authz/authz_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def test_authorize_publications_decorator_accepts_path(request_path):
pytest.param(['ROLE1', 'EVERYONE'], [], ['ROLE1', 'EVERYONE'], id='only-roles'),
pytest.param(['ROLE2', 'user1', 'EVERYONE', 'user2'], ['user1', 'user2'], ['ROLE2', 'EVERYONE'],
id='more-users-and-roles'),
pytest.param(['mIxEd'], ['mIxEd'], [], id='mixed-case'),
])
def test_split_user_and_role_names(roles_and_users, exp_users, exp_roles):
user_names, role_names = split_user_and_role_names(roles_and_users)
Expand Down
24 changes: 24 additions & 0 deletions src/layman/authz/internal_role_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from layman import settings
from db import util as db_util

ROLE_SERVICE_SCHEMA = settings.LAYMAN_INTERNAL_ROLE_SERVICE_SCHEMA


def ensure_admin_roles():
create_admin_roles_view = f"""CREATE OR REPLACE view {ROLE_SERVICE_SCHEMA}.admin_roles
as
select 'ADMIN' as name
UNION ALL
select 'GROUP_ADMIN'
UNION ALL
select %s
;"""
db_util.run_statement(create_admin_roles_view, (settings.LAYMAN_GS_ROLE, ))

create_admin_user_roles_view = f"""CREATE OR REPLACE view {ROLE_SERVICE_SCHEMA}.admin_user_roles
as
select %s as username, %s as rolename
UNION ALL
select %s, 'ADMIN'
;"""
db_util.run_statement(create_admin_user_roles_view, (settings.LAYMAN_GS_USER, settings.LAYMAN_GS_ROLE, settings.LAYMAN_GS_USER))
31 changes: 9 additions & 22 deletions src/layman/authz/role_service.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,7 @@
from db import util as db_util
from layman import settings

ROLE_NAME_PATTERN = r'^[A-Z][A-Z0-9]*(?:_[A-Z0-9]+)*$'
ROLE_SERVICE_SCHEMA = settings.LAYMAN_INTERNAL_ROLE_SERVICE_SCHEMA


def ensure_admin_roles():
create_admin_roles_view = f"""CREATE OR REPLACE view {ROLE_SERVICE_SCHEMA}.admin_roles
as
select 'ADMIN' as name
UNION ALL
select 'GROUP_ADMIN'
UNION ALL
select %s
;"""
db_util.run_statement(create_admin_roles_view, (settings.LAYMAN_GS_ROLE, ))

create_admin_user_roles_view = f"""CREATE OR REPLACE view {ROLE_SERVICE_SCHEMA}.admin_user_roles
as
select %s as username, %s as rolename
UNION ALL
select %s, 'ADMIN'
;"""
db_util.run_statement(create_admin_user_roles_view, (settings.LAYMAN_GS_USER, settings.LAYMAN_GS_ROLE, settings.LAYMAN_GS_USER))
ROLE_NAME_PATTERN = r'^(?!.{65,})[A-Z][A-Z0-9]*(?:_[A-Z0-9]+)*$'


def get_user_roles(username):
Expand All @@ -35,3 +14,11 @@ def get_user_roles(username):
"""
roles = db_util.run_query(query, (username, 'ADMIN', 'GROUP_ADMIN', settings.LAYMAN_GS_ROLE, ROLE_NAME_PATTERN), uri_str=settings.LAYMAN_ROLE_SERVICE_URI)
return {role[0] for role in roles}


def get_existent_roles(roles_to_check):
query = f"""
select name from {settings.LAYMAN_ROLE_SERVICE_SCHEMA}.roles where name = ANY(%s)
"""
rows = db_util.run_query(query, (list(roles_to_check),), uri_str=settings.LAYMAN_ROLE_SERVICE_URI)
return {row[0] for row in rows}
35 changes: 24 additions & 11 deletions src/layman/common/prime_db_schema/publications.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import re
from dataclasses import dataclass
import logging
import psycopg2.extras

import crs as crs_def
from db import util as db_util, TableUri
from geoserver.util import RESERVED_ROLE_NAMES
from layman import settings, LaymanError
from layman.authn import is_user_with_name
from layman.authz import split_user_and_role_names, role_service
from layman.common import get_publications_consts as consts, bbox as bbox_util
from . import workspaces, users, rights

Expand Down Expand Up @@ -357,7 +360,27 @@ def only_valid_user_names(users_list):

# pylint: disable=unused-argument
def only_valid_role_names(roles_list):
pass
roles_list = set(roles_list) - {settings.RIGHTS_EVERYONE_ROLE}
if not roles_list:
return

not_matching_roles = [r for r in roles_list if not re.match(role_service.ROLE_NAME_PATTERN, r)]
if not_matching_roles:
raise LaymanError(43, f'Found roles not matching to regex {role_service.ROLE_NAME_PATTERN} '
f': {not_matching_roles}')

internal_user_roles = [r for r in roles_list if r.startswith('USER_')]
if internal_user_roles:
raise LaymanError(43, f'Internal user roles found: {internal_user_roles}')

reserved_admin_roles = RESERVED_ROLE_NAMES + ['ADMIN', 'ADMIN_GROUP', settings.LAYMAN_GS_ROLE]
admin_roles = [r for r in roles_list if r in reserved_admin_roles]
if admin_roles:
raise LaymanError(43, f'Admin roles found: {admin_roles}')

non_existent_roles = roles_list - role_service.get_existent_roles(roles_list)
if non_existent_roles:
raise LaymanError(43, f'Non-existent roles found: {non_existent_roles}')


def at_least_one_can_write(user_names, role_names):
Expand All @@ -382,16 +405,6 @@ def owner_can_still_write(owner,
raise LaymanError(43, f'Owner of the personal workspace have to keep write right.')


def is_user(user_or_role_name):
return any(letter.islower() for letter in user_or_role_name)


def split_user_and_role_names(user_and_role_names):
user_names = [name for name in user_and_role_names if is_user(name)]
role_names = [name for name in user_and_role_names if name not in user_names]
return user_names, role_names


def check_rights_axioms(can_read,
can_write,
actor_name,
Expand Down
126 changes: 85 additions & 41 deletions src/layman/common/prime_db_schema/publications_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from layman import settings, app as app, LaymanError
from layman.map import MAP_TYPE
from test_tools.role_service import ensure_role, delete_role
from . import publications, workspaces, users

DB_SCHEMA = settings.LAYMAN_PRIME_SCHEMA
Expand All @@ -18,35 +19,88 @@
}


def test_only_valid_names():
def ensure_user(username, sub):
id_workspace_user = workspaces.ensure_workspace(username)
userinfo = userinfo_baseline.copy()
userinfo['sub'] = sub
users.ensure_user(id_workspace_user, userinfo)


class TestOnlyValidUserNames:
workspace_name = 'test_only_valid_names_workspace'
username = 'test_only_valid_names_user'

with app.app_context():
workspaces.ensure_workspace(workspace_name)
id_workspace_user = workspaces.ensure_workspace(username)
userinfo = userinfo_baseline.copy()
userinfo['sub'] = '10'
users.ensure_user(id_workspace_user, userinfo)
@pytest.fixture(scope="class", autouse=True)
def provide_data(self, request):
ensure_user(self.username, '10')
workspaces.ensure_workspace(self.workspace_name)
yield
if request.node.session.testsfailed == 0:
workspaces.delete_workspace(self.workspace_name)
users.delete_user(self.username)

@classmethod
@pytest.mark.parametrize("names", [
pytest.param({username, workspace_name}, id='username-and-workspace-name'),
pytest.param({workspace_name, username}, id='workspace-name-and-username'),
pytest.param({'skaljgdalskfglshfgd'}, id='non-existent-username'),
pytest.param({'mIxEd'}, id='mixed-case'),
])
def test_raises(cls, names):
with app.app_context():
with pytest.raises(LaymanError) as exc_info:
publications.only_valid_user_names(names)
assert exc_info.value.code == 43

publications.only_valid_user_names(set())
publications.only_valid_user_names({username, })

with pytest.raises(LaymanError) as exc_info:
publications.only_valid_user_names({username, workspace_name})
assert exc_info.value.code == 43

with pytest.raises(LaymanError) as exc_info:
publications.only_valid_user_names({workspace_name, username})
assert exc_info.value.code == 43
class TestOnlyValidRoleNames:
role1 = 'TEST_ONLY_VALID_ROLE_NAMES_ROLE1'
role2 = 'TEST_ONLY_VALID_ROLE_NAMES_ROLE2'
role64 = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ_ABCDEFGHIJKLMNOPQRSTUVWXYZ_ABCDEFGHIJ'
user = 'TEST_ONLY_VALID_ROLE_NAMES_USER'
non_existent_role = 'TEST_ONLY_VALID_ROLE_NAMES_NON_EXISTENT_ROLE'

@pytest.fixture(scope="class", autouse=True)
def provide_data(self, request):
roles = [self.role1, self.role2, self.role64]
for role in roles:
ensure_role(role)
ensure_user(self.user, '11')
yield
if request.node.session.testsfailed == 0:
users.delete_user(self.user)
for role in roles:
delete_role(role)

@pytest.mark.parametrize("roles", [
pytest.param({role1}, id='one-existing-role'),
pytest.param({role1, role2}, id='two-existing-roles'),
pytest.param({'EVERYONE'}, id='everyone-role'),
pytest.param({role64}, id='64-characters'),
])
def test_ok(self, roles):
publications.only_valid_role_names(roles)

@pytest.mark.parametrize("roles", [
pytest.param({non_existent_role}, id='non-existent-role'),
pytest.param({role1, non_existent_role}, id='non-existent-role-of-two-roles'),
pytest.param({f'USER_{user}'}, id='internal-user-role'),
pytest.param({f'INVALID__ROLE'}, id='invalid-role-two-underscores'),
pytest.param({f'0INVALID_ROLE'}, id='invalid-role-starts-with-number'),
pytest.param({f'ROLE_ADMINISTRATOR'}, id='ROLE_ADMINISTRATOR'),
pytest.param({f'ROLE_GROUP_ADMIN'}, id='ROLE_GROUP_ADMIN'),
pytest.param({f'ROLE_AUTHENTICATED'}, id='ROLE_AUTHENTICATED'),
pytest.param({f'ROLE_ANONYMOUS'}, id='ROLE_ANONYMOUS'),
pytest.param({f'ADMIN'}, id='ADMIN'),
pytest.param({f'ADMIN_GROUP'}, id='ADMIN_GROUP'),
pytest.param({settings.LAYMAN_GS_ROLE}, id='value-of-LAYMAN_GS_ROLE'),
pytest.param({'ABCDEFGHIJKLMNOPQRSTUVWXYZ_ABCDEFGHIJKLMNOPQRSTUVWXYZ_ABCDEFGHIJK'}, id='65-characters'),
])
def test_raises(self, roles):
with pytest.raises(LaymanError) as exc_info:
publications.only_valid_user_names({'skaljgdalskfglshfgd', })
publications.only_valid_role_names(roles)
assert exc_info.value.code == 43

users.delete_user(username)
workspaces.delete_workspace(workspace_name)


def test_at_least_one_can_write():
workspace_name = 'test_at_least_one_can_write_workspace'
Expand Down Expand Up @@ -179,10 +233,7 @@ def test_get_user_and_role_names_for_db():

with app.app_context():
workspaces.ensure_workspace(workspace_name)
id_workspace_user = workspaces.ensure_workspace(username)
userinfo = userinfo_baseline.copy()
userinfo['sub'] = '20'
users.ensure_user(id_workspace_user, userinfo)
ensure_user(username, '20')

user_names, role_names = publications.get_user_and_role_names_for_db({username, }, workspace_name)
assert user_names == {username}
Expand Down Expand Up @@ -246,17 +297,13 @@ class TestInsertRights:
def provide_data(self, request):
with app.app_context():
workspaces.ensure_workspace(self.workspace_name)
id_workspace_user = workspaces.ensure_workspace(self.username)
userinfo = userinfo_baseline.copy()
userinfo['sub'] = '30'
users.ensure_user(id_workspace_user, userinfo)
id_workspace_user2 = workspaces.ensure_workspace(self.username2)
userinfo = userinfo_baseline.copy()
userinfo['sub'] = '40'
users.ensure_user(id_workspace_user2, userinfo)
ensure_user(self.username, '30')
ensure_user(self.username2, '40')
ensure_role(self.role1)
yield
if request.node.session.testsfailed == 0:
with app.app_context():
delete_role(self.role1)
users.delete_user(self.username)
users.delete_user(self.username2)
workspaces.delete_workspace(self.workspace_name)
Expand Down Expand Up @@ -374,21 +421,18 @@ class TestUpdateRights:
def provide_data(self, request):
with app.app_context():
workspaces.ensure_workspace(self.workspace_name)
id_workspace_user = workspaces.ensure_workspace(self.username)
userinfo = userinfo_baseline.copy()
userinfo['sub'] = '50'
users.ensure_user(id_workspace_user, userinfo)
id_workspace_user2 = workspaces.ensure_workspace(self.username2)
userinfo = userinfo_baseline.copy()
userinfo['sub'] = '60'
users.ensure_user(id_workspace_user2, userinfo)

ensure_user(self.username, '50')
ensure_user(self.username2, '60')
ensure_role(self.role1)
ensure_role(self.role2)
publications.insert_publication(self.username, self.publication_insert_info)
yield
if request.node.session.testsfailed == 0:
with app.app_context():
publications.delete_publication(self.username, self.publication_insert_info["publ_type_name"],
self.publication_insert_info["name"])
delete_role(self.role1)
delete_role(self.role2)
users.delete_user(self.username)
users.delete_user(self.username2)
workspaces.delete_workspace(self.workspace_name)
Expand Down
Loading