From e7d8b744c3140458a37def860ecdc692916d435c Mon Sep 17 00:00:00 2001 From: spwoodcock Date: Thu, 11 Jan 2024 13:00:06 +0000 Subject: [PATCH] feat: add org_admin role to role deps --- src/backend/app/auth/roles.py | 62 +++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 6 deletions(-) diff --git a/src/backend/app/auth/roles.py b/src/backend/app/auth/roles.py index 82c456864b..a5fccc927f 100644 --- a/src/backend/app/auth/roles.py +++ b/src/backend/app/auth/roles.py @@ -28,8 +28,9 @@ from app.auth.osm import AuthUser, login_required from app.db.database import get_db -from app.db.db_models import DbUser, DbUserRoles -from app.models.enums import ProjectRole, UserRole +from app.db.db_models import DbProject, DbUser, DbUserRoles +from app.models.enums import HTTPStatus, ProjectRole, UserRole +from app.projects.project_deps import get_project_by_id async def get_uid(user_data: AuthUser) -> int: @@ -38,29 +39,75 @@ async def get_uid(user_data: AuthUser) -> int: return user_id else: log.error(f"Failed to get user id from auth object: {user_data}") - raise HTTPException(status_code=401, detail="Auth failed. No user id present") + raise HTTPException( + status_code=HTTPStatus.UNAUTHORIZED, + detail="Auth failed. No user id present", + ) async def super_admin( db: Session = Depends(get_db), user_data: AuthUser = Depends(login_required), ) -> AuthUser: + """Super admin role, with access to all endpoints.""" user_id = await get_uid(user_data) match = db.query(DbUser).filter_by(id=user_id, role=UserRole.ADMIN).first() if not match: log.error(f"User ID {user_id} requested an admin endpoint, but is not admin") - raise HTTPException(status_code=403, detail="User must be an administrator") + raise HTTPException( + status_code=HTTPStatus.FORBIDDEN, detail="User must be an administrator" + ) return user_data +async def org_admin( + project: DbProject = Depends(get_project_by_id), + org_id: int = None, + db: Session = Depends(get_db), + user_data: AuthUser = Depends(login_required), +) -> AuthUser: + """Organization admin with full permission for projects in an organization.""" + user_id = await get_uid(user_data) + + org_admin = ( + db.query(DbUserRoles) + .filter_by(user_id=user_id, role=ProjectRole.ORGANIZATION_ADMIN) + .first() + ) + + if not org_admin: + log.error(f"User ID {user_id} is not an admin for any organization") + raise HTTPException( + status_code=HTTPStatus.FORBIDDEN, + detail="User must be an organization admin", + ) + + matched_project = db.query(DbProject).filter_by(id=org_admin.project_id).first() + matched_org_id = matched_project.organisation_id + + if ( + org_id + and matched_org_id == org_id + or project + and matched_org_id == project.organisation_id + ): + return user_data + + log.error(f"User ID {user_id} is not an organization admin for id {org_id}") + raise HTTPException( + status_code=HTTPStatus.FORBIDDEN, detail="User is not an organization admin" + ) + + async def validator( project_id: int, db: Session = Depends(get_db), user_data: AuthUser = Depends(login_required), ) -> AuthUser: + """A validator for a specific project.""" user_id = await get_uid(user_data) match = ( @@ -69,7 +116,9 @@ async def validator( if not match: log.error(f"User ID {user_id} has no access to project ID {project_id}") - raise HTTPException(status_code=403, detail="User has no access to project") + raise HTTPException( + status_code=HTTPStatus.FORBIDDEN, detail="User has no access to project" + ) if match.role.value < ProjectRole.VALIDATOR.value: log.error( @@ -77,7 +126,8 @@ async def validator( f"for project ID {project_id}" ) raise HTTPException( - status_code=403, detail="User is not a validator for this project" + status_code=HTTPStatus.FORBIDDEN, + detail="User is not a validator for this project", ) return user_data