Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into helpers-for-async-req…
Browse files Browse the repository at this point in the history
…uests
  • Loading branch information
Ducica committed Nov 25, 2024
2 parents f15b371 + 84c3779 commit 3e1c880
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 2 deletions.
5 changes: 5 additions & 0 deletions oarepo_ui/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from flask import Response, current_app
from importlib_metadata import entry_points
from invenio_base.utils import obj_or_import_string
from flask_login import user_logged_in, user_logged_out
from .utils import clear_view_deposit_page_permission_from_session


import oarepo_ui.cli # noqa
from oarepo_ui.resources.templating.catalog import OarepoCatalog as Catalog
Expand Down Expand Up @@ -90,6 +93,8 @@ def __init__(self, app=None):
def init_app(self, app):
self.init_config(app)
app.extensions["oarepo_ui"] = OARepoUIState(app)
user_logged_in.connect(clear_view_deposit_page_permission_from_session)
user_logged_out.connect(clear_view_deposit_page_permission_from_session)

def init_config(self, app):
"""Initialize configuration."""
Expand Down
105 changes: 103 additions & 2 deletions oarepo_ui/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,58 @@
"""Oarepo ui utils module."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Optional, overload

from flask import current_app, g, session
from flask_login import current_user
from invenio_base.utils import obj_or_import_string
from marshmallow import Schema, fields
from marshmallow.schema import SchemaMeta
from marshmallow_utils.fields import NestedAttribute

if TYPE_CHECKING:
from invenio_records_resources.services.records import RecordService


@overload
def dump_empty(schema_or_field: Schema) -> dict: ...


@overload
def dump_empty(schema_or_field: SchemaMeta) -> dict: ...


@overload
def dump_empty(schema_or_field: fields.List) -> list: ...


@overload
def dump_empty(schema_or_field: fields.Nested | NestedAttribute) -> dict: ...


@overload
def dump_empty(schema_or_field: fields.Str) -> str: ...


@overload
def dump_empty(schema_or_field: fields.Dict) -> dict: ...


def dump_empty(schema_or_field):
@overload
def dump_empty(schema_or_field: object) -> None: ...


def dump_empty(
schema_or_field: Schema
| SchemaMeta
| fields.List
| fields.Nested
| NestedAttribute
| fields.Str
| fields.Dict
| object,
) -> dict | list | str | None:
"""Return a full json-compatible dict of schema representation with empty values."""
if isinstance(schema_or_field, (Schema,)):
schema = schema_or_field
Expand All @@ -15,7 +64,6 @@ def dump_empty(schema_or_field):
schema = schema_or_field()
return {k: dump_empty(v) for (k, v) in schema.fields.items()}
if isinstance(schema_or_field, fields.List):
# return [dump_empty(schema_or_field.inner)]
return []
if isinstance(schema_or_field, (NestedAttribute, fields.Nested)):
field = schema_or_field
Expand All @@ -28,3 +76,56 @@ def dump_empty(schema_or_field):
if isinstance(schema_or_field, fields.Dict):
return {}
return None


view_deposit_page_permission_key: str = "view_deposit_page_permission"


def can_view_deposit_page() -> bool:
"""Check if the current user can view the deposit page."""
permission_to_deposit: bool = False

if not current_user.is_authenticated:
return False

if view_deposit_page_permission_key in session:
return bool(session[view_deposit_page_permission_key])

repository_search_resources: list[dict[str, str]] = current_app.config.get(
"GLOBAL_SEARCH_MODELS", []
)

if not repository_search_resources:
return False

for search_resource in repository_search_resources:
search_resource_service: Optional[str] = search_resource.get(
"model_service", None
)
search_resource_config: Optional[str] = search_resource.get(
"service_config", None
)

if search_resource_service and search_resource_config:
try:
service_def: Any = obj_or_import_string(search_resource_service)
service_cfg: Any = obj_or_import_string(search_resource_config)

# Instantiate service and check permission
service: RecordService = service_def(service_cfg())
permission_to_deposit = service.check_permission(
g.identity, "view_deposit_page", record=None
)
if permission_to_deposit:
break
except ImportError:
continue

# Cache permission result in session
session[view_deposit_page_permission_key] = permission_to_deposit
return permission_to_deposit


def clear_view_deposit_page_permission_from_session(*args: Any, **kwargs: Any) -> None:
"""Clear the cached permission for viewing the deposit page from the session."""
session.pop(view_deposit_page_permission_key, None)

0 comments on commit 3e1c880

Please sign in to comment.