Skip to content

Commit

Permalink
feat: add org_admin role to role deps
Browse files Browse the repository at this point in the history
  • Loading branch information
spwoodcock committed Jan 16, 2024
1 parent 77a5555 commit e7d8b74
Showing 1 changed file with 56 additions and 6 deletions.
62 changes: 56 additions & 6 deletions src/backend/app/auth/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@

from app.auth.osm import AuthUser, login_required
from app.db.database import get_db
from app.db.db_models import DbUser, DbUserRoles
from app.models.enums import ProjectRole, UserRole
from app.db.db_models import DbProject, DbUser, DbUserRoles
from app.models.enums import HTTPStatus, ProjectRole, UserRole
from app.projects.project_deps import get_project_by_id


async def get_uid(user_data: AuthUser) -> int:
Expand All @@ -38,29 +39,75 @@ async def get_uid(user_data: AuthUser) -> int:
return user_id
else:
log.error(f"Failed to get user id from auth object: {user_data}")
raise HTTPException(status_code=401, detail="Auth failed. No user id present")
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail="Auth failed. No user id present",
)


async def super_admin(
db: Session = Depends(get_db),
user_data: AuthUser = Depends(login_required),
) -> AuthUser:
"""Super admin role, with access to all endpoints."""
user_id = await get_uid(user_data)

match = db.query(DbUser).filter_by(id=user_id, role=UserRole.ADMIN).first()

if not match:
log.error(f"User ID {user_id} requested an admin endpoint, but is not admin")
raise HTTPException(status_code=403, detail="User must be an administrator")
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN, detail="User must be an administrator"
)

return user_data


async def org_admin(
project: DbProject = Depends(get_project_by_id),
org_id: int = None,
db: Session = Depends(get_db),
user_data: AuthUser = Depends(login_required),
) -> AuthUser:
"""Organization admin with full permission for projects in an organization."""
user_id = await get_uid(user_data)

org_admin = (
db.query(DbUserRoles)
.filter_by(user_id=user_id, role=ProjectRole.ORGANIZATION_ADMIN)
.first()
)

if not org_admin:
log.error(f"User ID {user_id} is not an admin for any organization")
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="User must be an organization admin",
)

matched_project = db.query(DbProject).filter_by(id=org_admin.project_id).first()
matched_org_id = matched_project.organisation_id

if (
org_id
and matched_org_id == org_id
or project
and matched_org_id == project.organisation_id
):
return user_data

log.error(f"User ID {user_id} is not an organization admin for id {org_id}")
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN, detail="User is not an organization admin"
)


async def validator(
project_id: int,
db: Session = Depends(get_db),
user_data: AuthUser = Depends(login_required),
) -> AuthUser:
"""A validator for a specific project."""
user_id = await get_uid(user_data)

match = (
Expand All @@ -69,15 +116,18 @@ async def validator(

if not match:
log.error(f"User ID {user_id} has no access to project ID {project_id}")
raise HTTPException(status_code=403, detail="User has no access to project")
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN, detail="User has no access to project"
)

if match.role.value < ProjectRole.VALIDATOR.value:
log.error(
f"User ID {user_id} does not have validator permission"
f"for project ID {project_id}"
)
raise HTTPException(
status_code=403, detail="User is not a validator for this project"
status_code=HTTPStatus.FORBIDDEN,
detail="User is not a validator for this project",
)

return user_data

0 comments on commit e7d8b74

Please sign in to comment.