diff --git a/src/mavedb/routers/authorization.py b/src/mavedb/routers/authorization.py new file mode 100644 index 00000000..583101e4 --- /dev/null +++ b/src/mavedb/routers/authorization.py @@ -0,0 +1,71 @@ +import logging +from enum import Enum + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session + +from mavedb import deps +from mavedb.lib.authentication import get_current_user, UserData +from mavedb.lib.permissions import has_permission, Action +from mavedb.lib.logging import LoggedRoute +from mavedb.lib.logging.context import logging_context, save_to_logging_context +from mavedb.models.experiment import Experiment +from mavedb.models.experiment_set import ExperimentSet +from mavedb.models.score_set import ScoreSet + +router = APIRouter( + prefix="/api/v1", + tags=["authorizations"], + responses={404: {"description": "Not found"}}, + route_class=LoggedRoute, +) + +logger = logging.getLogger(__name__) + + +class ModelName(str, Enum): + experiment = "experiment" + experiment_set = "experiment-set" + score_set = "score-set" + + +@router.get( + "/user-is-authorized/{model_name}/{urn}/{action}", + status_code=200, + response_model=bool +) +async def check_authorization( + *, + model_name: str, + urn: str, + action: str, + db: Session = Depends(deps.get_db), + user_data: UserData = Depends(get_current_user), +) -> bool: + """ + Check whether users have authorizations in adding/editing/deleting/publishing experiment or score set. + """ + save_to_logging_context({"requested_resource": urn}) + if model_name == ModelName.experiment_set: + item = db.query(ExperimentSet).filter(ExperimentSet.urn == urn).one_or_none() + elif model_name == ModelName.experiment: + item = db.query(Experiment).filter(Experiment.urn == urn).one_or_none() + elif model_name == ModelName.score_set: + item = db.query(ScoreSet).filter(ScoreSet.urn == urn).one_or_none() + else: + item = None + + if item: + if user_data: + try: + action_enum = Action[action.upper()] + permission = has_permission(user_data, item, action_enum).permitted + return permission + except KeyError: + raise HTTPException(status_code=400, detail=f"Invalid action: {action}") + else: + logger.debug(msg="Miss user data", extra=logging_context()) + raise HTTPException(status_code=404, detail=f"User not found") + else: + logger.debug(msg="The requested resources does not exist.", extra=logging_context()) + raise HTTPException(status_code=404, detail=f"{model_name} with URN '{urn}' not found") diff --git a/src/mavedb/routers/experiment_sets.py b/src/mavedb/routers/experiment_sets.py index eb235619..eeaaf34c 100644 --- a/src/mavedb/routers/experiment_sets.py +++ b/src/mavedb/routers/experiment_sets.py @@ -4,14 +4,12 @@ from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session -from sqlalchemy import or_ from mavedb import deps from mavedb.lib.authentication import get_current_user, UserData from mavedb.lib.permissions import has_permission, Action from mavedb.lib.logging import LoggedRoute from mavedb.lib.logging.context import logging_context, save_to_logging_context -from mavedb.models.contributor import Contributor from mavedb.models.experiment_set import ExperimentSet from mavedb.view_models import experiment_set @@ -25,40 +23,6 @@ logger = logging.getLogger(__name__) -@router.get( - "/check-authorizations/{urn}", - status_code=200, - response_model=bool -) -async def check_experiment_set_authorization( - *, - urn: str, - db: Session = Depends(deps.get_db), - user_data: UserData = Depends(get_current_user), -) -> bool: - """ - Check whether users have authorizations in this experiment set. - """ - query = db.query(ExperimentSet).filter(ExperimentSet.urn == urn) - - if user_data is not None: - query = query.filter( - or_( - ExperimentSet.created_by_id == user_data.user.id, - ExperimentSet.contributors.any(Contributor.orcid_id == user_data.user.username), - ) - ) - else: - return False - - save_to_logging_context({"Experiment set requested resource": urn}) - item = query.first() - if item: - return True - else: - return False - - @router.get( "/{urn}", status_code=200, diff --git a/src/mavedb/routers/experiments.py b/src/mavedb/routers/experiments.py index e8a846b3..6ca47430 100644 --- a/src/mavedb/routers/experiments.py +++ b/src/mavedb/routers/experiments.py @@ -5,7 +5,6 @@ from fastapi import APIRouter, Depends, HTTPException from fastapi.encoders import jsonable_encoder import pydantic -from sqlalchemy import or_, and_ from sqlalchemy.orm import Session from mavedb import deps @@ -45,41 +44,6 @@ ) -@router.get( - "/experiments/check-authorizations/{urn}", - status_code=200, - response_model=bool -) -async def check_experiment_authorization( - *, - urn: str, - db: Session = Depends(deps.get_db), - user_data: UserData = Depends(get_current_user), -) -> bool: - """ - Check whether users have authorizations in this experiment. - """ - query = db.query(Experiment).filter(Experiment.urn == urn) - - if user_data is not None: - query = query.filter( - or_( - Experiment.created_by_id == user_data.user.id, - Experiment.contributors.any(Contributor.orcid_id == user_data.user.username), - ) - ) - else: - return False - - save_to_logging_context({"Experiment requested resource": urn}) - item = query.first() - - if item: - return True - else: - return False - - # TODO: Rewrite this function. @router.get( "/experiments/", diff --git a/src/mavedb/routers/score_sets.py b/src/mavedb/routers/score_sets.py index dcf74f45..44fd2a30 100644 --- a/src/mavedb/routers/score_sets.py +++ b/src/mavedb/routers/score_sets.py @@ -113,40 +113,6 @@ async def fetch_score_set_by_urn( ) -@router.get( - "/score-sets/check-authorizations/{urn}", - status_code=200, - response_model=bool -) -async def check_score_set_authorization( - *, - urn: str, - db: Session = Depends(deps.get_db), - user_data: UserData = Depends(get_current_user), -) -> bool: - """ - Check whether users have authorizations in this score set. - """ - query = db.query(ScoreSet).filter(ScoreSet.urn == urn) - - if user_data is not None: - query = query.filter( - or_( - ScoreSet.created_by_id == user_data.user.id, - ScoreSet.contributors.any(Contributor.orcid_id == user_data.user.username), - ) - ) - else: - return False - - save_to_logging_context({"Score set requested resource": urn}) - item = query.first() - if item: - return True - else: - return False - - @router.post("/score-sets/search", status_code=200, response_model=list[score_set.ShortScoreSet]) def search_score_sets(search: ScoreSetsSearch, db: Session = Depends(deps.get_db)) -> Any: # = Body(..., embed=True), """ diff --git a/src/mavedb/server_main.py b/src/mavedb/server_main.py index 8514fcae..597528f5 100644 --- a/src/mavedb/server_main.py +++ b/src/mavedb/server_main.py @@ -26,6 +26,7 @@ from mavedb.lib.logging.canonical import log_request from mavedb.routers import ( access_keys, + authorization, api_information, controlled_keywords, doi_identifiers, @@ -73,6 +74,7 @@ ) app.include_router(access_keys.router) app.include_router(api_information.router) +app.include_router(authorization.router) app.include_router(controlled_keywords.router) app.include_router(doi_identifiers.router) app.include_router(experiment_sets.router) diff --git a/tests/routers/test_authorization.py b/tests/routers/test_authorization.py new file mode 100644 index 00000000..52fd0893 --- /dev/null +++ b/tests/routers/test_authorization.py @@ -0,0 +1,331 @@ +from tests.helpers.constants import TEST_USER +from tests.helpers.util import ( + add_contributor, + change_ownership, + create_experiment, + create_seq_score_set, +) +from mavedb.models.experiment import Experiment as ExperimentDbModel +from mavedb.models.experiment_set import ExperimentSet as ExperimentSetDbModel +from mavedb.models.score_set import ScoreSet as ScoreSetDbModel + + +# Test check_authorization function +# Experiment set tests +def test_get_true_authorization_from_own_experiment_set_add_experiment_check(client, setup_router_db): + experiment = create_experiment(client) + response = client.get(f"/api/v1/user-is-authorized/experiment-set/{experiment['experimentSetUrn']}/add_experiment") + + assert response.status_code == 200 + assert response.json() == True + + +def test_contributor_gets_true_authorization_from_others_experiment_set_add_experiment_check(session, client, setup_router_db): + experiment = create_experiment(client) + change_ownership(session, experiment["urn"], ExperimentDbModel) + change_ownership(session, experiment["experimentSetUrn"], ExperimentSetDbModel) + add_contributor( + session, + experiment["experimentSetUrn"], + ExperimentSetDbModel, + TEST_USER["username"], + TEST_USER["first_name"], + TEST_USER["last_name"], + ) + response = client.get(f"/api/v1/user-is-authorized/experiment-set/{experiment['experimentSetUrn']}/add_experiment") + + assert response.status_code == 200 + assert response.json() == True + + +def test_get_false_authorization_from_other_users_experiment_set_add_experiment_check(session, client, setup_router_db): + experiment = create_experiment(client) + change_ownership(session, experiment["urn"], ExperimentDbModel) + change_ownership(session, experiment["experimentSetUrn"], ExperimentSetDbModel) + + response = client.get(f"/api/v1/user-is-authorized/experiment-set/{experiment['experimentSetUrn']}/add_experiment") + + assert response.status_code == 200 + assert response.json() == False + + +def test_cannot_get_authorization_with_wrong_action_in_experiment_set(client, setup_router_db): + experiment = create_experiment(client) + response = client.get(f"/api/v1/user-is-authorized/experiment-set/{experiment['experimentSetUrn']}/edit") + + assert response.status_code == 400 + response_data = response.json() + assert response_data["detail"] == "Invalid action: edit" + + +def test_cannot_get_authorization_with_non_existing_experiment_set(client, setup_router_db): + response = client.get(f"/api/v1/user-is-authorized/experiment-set/invalidUrn/update") + + assert response.status_code == 404 + response_data = response.json() + assert response_data["detail"] == "experiment-set with URN 'invalidUrn' not found" + + +# Experiment tests +def test_get_true_authorization_from_own_experiment_update_check(client, setup_router_db): + experiment = create_experiment(client) + response = client.get(f"/api/v1/user-is-authorized/experiment/{experiment['urn']}/update") + + assert response.status_code == 200 + assert response.json() == True + + +def test_get_true_authorization_from_own_experiment_delete_check(client, setup_router_db): + experiment = create_experiment(client) + response = client.get(f"/api/v1/user-is-authorized/experiment/{experiment['urn']}/delete") + + assert response.status_code == 200 + assert response.json() == True + + +def test_get_true_authorization_from_own_experiment_add_score_set_check(client, setup_router_db): + experiment = create_experiment(client) + response = client.get(f"/api/v1/user-is-authorized/experiment/{experiment['urn']}/add_score_set") + + assert response.status_code == 200 + assert response.json() == True + + +def test_contributor_gets_true_authorization_from_others_experiment_update_check(session, client, setup_router_db): + experiment = create_experiment(client) + change_ownership(session, experiment["urn"], ExperimentDbModel) + add_contributor( + session, + experiment["urn"], + ExperimentDbModel, + TEST_USER["username"], + TEST_USER["first_name"], + TEST_USER["last_name"], + ) + response = client.get(f"/api/v1/user-is-authorized/experiment/{experiment['urn']}/update") + + assert response.status_code == 200 + assert response.json() == True + + +def test_contributor_gets_true_authorization_from_others_experiment_delete_check(session, client, setup_router_db): + experiment = create_experiment(client) + change_ownership(session, experiment["urn"], ExperimentDbModel) + add_contributor( + session, + experiment["urn"], + ExperimentDbModel, + TEST_USER["username"], + TEST_USER["first_name"], + TEST_USER["last_name"], + ) + response = client.get(f"/api/v1/user-is-authorized/experiment/{experiment['urn']}/delete") + + assert response.status_code == 200 + assert response.json() == True + + +def test_contributor_gets_true_authorization_from_others_experiment_add_score_set_check(session, client, setup_router_db): + experiment = create_experiment(client) + change_ownership(session, experiment["urn"], ExperimentDbModel) + add_contributor( + session, + experiment["urn"], + ExperimentDbModel, + TEST_USER["username"], + TEST_USER["first_name"], + TEST_USER["last_name"], + ) + response = client.get(f"/api/v1/user-is-authorized/experiment/{experiment['urn']}/add_score_set") + + assert response.status_code == 200 + assert response.json() == True + + +def test_get_false_authorization_from_other_users_experiment_add_score_set_check(session, client, setup_router_db): + experiment = create_experiment(client) + change_ownership(session, experiment["urn"], ExperimentDbModel) + + response = client.get(f"/api/v1/user-is-authorized/experiment/{experiment['urn']}/add_score_set") + + assert response.status_code == 200 + assert response.json() == False + + +def test_get_false_authorization_from_other_users_experiment_update_check(session, client, setup_router_db): + experiment = create_experiment(client) + change_ownership(session, experiment["urn"], ExperimentDbModel) + + response = client.get(f"/api/v1/user-is-authorized/experiment/{experiment['urn']}/update") + + assert response.status_code == 200 + assert response.json() == False + + +def test_get_false_authorization_from_other_users_experiment_delete_check(session, client, setup_router_db): + experiment = create_experiment(client) + change_ownership(session, experiment["urn"], ExperimentDbModel) + + response = client.get(f"/api/v1/user-is-authorized/experiment/{experiment['urn']}/delete") + + assert response.status_code == 200 + assert response.json() == False + + +def test_cannot_get_authorization_with_wrong_action_in_experiment(client, setup_router_db): + experiment = create_experiment(client) + response = client.get(f"/api/v1/user-is-authorized/experiment/{experiment['urn']}/invalidAction") + + assert response.status_code == 400 + response_data = response.json() + assert response_data["detail"] == "Invalid action: invalidAction" + + +def test_cannot_get_authorization_with_non_existing_experiment(client, setup_router_db): + response = client.get(f"/api/v1/user-is-authorized/experiment/invalidUrn/update") + + assert response.status_code == 404 + response_data = response.json() + assert response_data["detail"] == "experiment with URN 'invalidUrn' not found" + + +# Score set tests +def test_get_true_authorization_from_own_score_set_update_check(client, setup_router_db): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + response = client.get(f"/api/v1/user-is-authorized/score-set/{score_set['urn']}/update") + + assert response.status_code == 200 + assert response.json() == True + + +def test_get_true_authorization_from_own_score_set_delete_check(client, setup_router_db): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + response = client.get(f"/api/v1/user-is-authorized/score-set/{score_set['urn']}/delete") + + assert response.status_code == 200 + assert response.json() == True + + +def test_get_true_authorization_from_own_score_set_publish_check(client, setup_router_db): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + response = client.get(f"/api/v1/user-is-authorized/score-set/{score_set['urn']}/publish") + + assert response.status_code == 200 + assert response.json() == True + + +def test_contributor_gets_true_authorization_from_others_score_set_update_check(session, client, setup_router_db): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + change_ownership(session, score_set["urn"], ScoreSetDbModel) + add_contributor( + session, + score_set["urn"], + ScoreSetDbModel, + TEST_USER["username"], + TEST_USER["first_name"], + TEST_USER["last_name"], + ) + response = client.get(f"/api/v1/user-is-authorized/score-set/{score_set['urn']}/update") + + assert response.status_code == 200 + assert response.json() == True + + +def test_contributor_gets_true_authorization_from_others_score_set_delete_check(session, client, setup_router_db): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + change_ownership(session, score_set["urn"], ScoreSetDbModel) + add_contributor( + session, + score_set["urn"], + ScoreSetDbModel, + TEST_USER["username"], + TEST_USER["first_name"], + TEST_USER["last_name"], + ) + response = client.get(f"/api/v1/user-is-authorized/score-set/{score_set['urn']}/delete") + + assert response.status_code == 200 + assert response.json() == True + + +def test_contributor_gets_true_authorization_from_others_score_set_publish_check(session, client, setup_router_db): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + change_ownership(session, score_set["urn"], ScoreSetDbModel) + add_contributor( + session, + score_set["urn"], + ScoreSetDbModel, + TEST_USER["username"], + TEST_USER["first_name"], + TEST_USER["last_name"], + ) + response = client.get(f"/api/v1/user-is-authorized/score-set/{score_set['urn']}/publish") + + assert response.status_code == 200 + assert response.json() == True + + +def test_get_false_authorization_from_other_users_score_set_delete_check(session, client, setup_router_db): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + change_ownership(session, score_set["urn"], ScoreSetDbModel) + + response = client.get(f"/api/v1/user-is-authorized/score-set/{score_set['urn']}/delete") + + assert response.status_code == 200 + assert response.json() == False + + +def test_get_false_authorization_from_other_users_score_set_update_check(session, client, setup_router_db): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + change_ownership(session, score_set["urn"], ScoreSetDbModel) + + response = client.get(f"/api/v1/user-is-authorized/score-set/{score_set['urn']}/update") + + assert response.status_code == 200 + assert response.json() == False + + +def test_get_false_authorization_from_other_users_score_set_publish_check(session, client, setup_router_db): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + change_ownership(session, score_set["urn"], ScoreSetDbModel) + + response = client.get(f"/api/v1/user-is-authorized/score-set/{score_set['urn']}/publish") + + assert response.status_code == 200 + assert response.json() == False + + +def test_cannot_get_authorization_with_wrong_action_in_score_set(client, setup_router_db): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + response = client.get(f"/api/v1/user-is-authorized/score-set/{score_set['urn']}/invalidAction") + + assert response.status_code == 400 + response_data = response.json() + assert response_data["detail"] == "Invalid action: invalidAction" + + +def test_cannot_get_authorization_with_non_existing_experiment(client, setup_router_db): + response = client.get(f"/api/v1/user-is-authorized/score-set/invalidUrn/update") + + assert response.status_code == 404 + response_data = response.json() + assert response_data["detail"] == "score-set with URN 'invalidUrn' not found" + + +# Common invalid test +def test_cannot_get_authorization_with_non_existing_item(client, setup_router_db): + response = client.get(f"/api/v1/user-is-authorized/invalidModel/invalidUrn/update") + + assert response.status_code == 404 + response_data = response.json() + assert response_data["detail"] == "invalidModel with URN 'invalidUrn' not found" diff --git a/tests/routers/test_experiment_set.py b/tests/routers/test_experiment_set.py deleted file mode 100644 index 734e42ec..00000000 --- a/tests/routers/test_experiment_set.py +++ /dev/null @@ -1,45 +0,0 @@ -from tests.helpers.constants import TEST_USER -from tests.helpers.util import ( - add_contributor, - change_ownership, - create_experiment, -) -from mavedb.models.experiment import Experiment as ExperimentDbModel -from mavedb.models.experiment_set import ExperimentSet as ExperimentSetDbModel - - -def test_get_true_authorization_from_own_experiment_set_check(client, setup_router_db): - experiment = create_experiment(client) - response = client.get(f"/api/v1/experiment-sets/check-authorizations/{experiment['experimentSetUrn']}") - - assert response.status_code == 200 - assert response.json() == True - - -def test_contributor_gets_true_authorization_from_others_experiment_set_check(session, client, setup_router_db): - experiment = create_experiment(client) - change_ownership(session, experiment["urn"], ExperimentDbModel) - change_ownership(session, experiment["experimentSetUrn"], ExperimentSetDbModel) - add_contributor( - session, - experiment["experimentSetUrn"], - ExperimentSetDbModel, - TEST_USER["username"], - TEST_USER["first_name"], - TEST_USER["last_name"], - ) - response = client.get(f"/api/v1/experiment-sets/check-authorizations/{experiment['experimentSetUrn']}") - - assert response.status_code == 200 - assert response.json() == True - - -def test_get_false_authorization_from_other_users_experiment_set_check(session, client, setup_router_db): - experiment = create_experiment(client) - change_ownership(session, experiment["urn"], ExperimentDbModel) - change_ownership(session, experiment["experimentSetUrn"], ExperimentSetDbModel) - - response = client.get(f"/api/v1/experiment-sets/check-authorizations/{experiment['experimentSetUrn']}") - - assert response.status_code == 200 - assert response.json() == False \ No newline at end of file diff --git a/tests/routers/test_experiments.py b/tests/routers/test_experiments.py index 6241ed3a..51c29073 100644 --- a/tests/routers/test_experiments.py +++ b/tests/routers/test_experiments.py @@ -562,41 +562,6 @@ def test_admin_can_update_other_users_public_experiment_set( assert response_data["title"] == "Second Experiment" -def test_get_true_authorization_from_own_experiment_check(client, setup_router_db): - experiment = create_experiment(client) - response = client.get(f"/api/v1/experiments/check-authorizations/{experiment['urn']}") - - assert response.status_code == 200 - assert response.json() == True - - -def test_contributor_gets_true_authorization_from_others_experiment_check(session, client, setup_router_db): - experiment = create_experiment(client) - change_ownership(session, experiment["urn"], ExperimentDbModel) - add_contributor( - session, - experiment["urn"], - ExperimentDbModel, - TEST_USER["username"], - TEST_USER["first_name"], - TEST_USER["last_name"], - ) - response = client.get(f"/api/v1/experiments/check-authorizations/{experiment['urn']}") - - assert response.status_code == 200 - assert response.json() == True - - -def test_get_false_authorization_from_other_users_experiment_check(session, client, setup_router_db): - experiment = create_experiment(client) - change_ownership(session, experiment["urn"], ExperimentDbModel) - - response = client.get(f"/api/v1/experiments/check-authorizations/{experiment['urn']}") - - assert response.status_code == 200 - assert response.json() == False - - def test_edit_preserves_optional_metadata(client, setup_router_db): pass diff --git a/tests/routers/test_score_set.py b/tests/routers/test_score_set.py index 4599b84e..b9cbc5b5 100644 --- a/tests/routers/test_score_set.py +++ b/tests/routers/test_score_set.py @@ -496,44 +496,6 @@ def test_admin_can_add_scores_and_counts_to_other_user_score_set(session, client assert score_set == response_data -def test_get_true_authorization_from_own_score_set_check(client, setup_router_db): - experiment = create_experiment(client) - score_set = create_seq_score_set(client, experiment["urn"]) - response = client.get(f"/api/v1/score-sets/check-authorizations/{score_set['urn']}") - - assert response.status_code == 200 - assert response.json() == True - - -def test_contributor_gets_true_authorization_from_others_score_set_check(session, client, setup_router_db): - experiment = create_experiment(client) - score_set = create_seq_score_set(client, experiment["urn"]) - change_ownership(session, score_set["urn"], ScoreSetDbModel) - add_contributor( - session, - score_set["urn"], - ScoreSetDbModel, - TEST_USER["username"], - TEST_USER["first_name"], - TEST_USER["last_name"], - ) - response = client.get(f"/api/v1/score-sets/check-authorizations/{score_set['urn']}") - - assert response.status_code == 200 - assert response.json() == True - - -def test_get_false_authorization_from_other_users_score_set_check(session, client, setup_router_db): - experiment = create_experiment(client) - score_set = create_seq_score_set(client, experiment["urn"]) - change_ownership(session, score_set["urn"], ScoreSetDbModel) - - response = client.get(f"/api/v1/score-sets/check-authorizations/{score_set['urn']}") - - assert response.status_code == 200 - assert response.json() == False - - def test_publish_score_set(session, data_provider, client, setup_router_db, data_files): experiment = create_experiment(client) score_set = create_seq_score_set_with_variants(