Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add nodes/full_types_count/user endpoint #79

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 283 additions & 1 deletion aiida_restapi/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@

"""

from typing import Any
from __future__ import annotations

from collections.abc import MutableMapping
from typing import TYPE_CHECKING, Any, Iterator, Optional, Union

from aiida.common.escaping import escape_for_sql_like

if TYPE_CHECKING:
from aiida.engine.processes.ports import PortNamespace

FULL_TYPE_CONCATENATOR = '|'
LIKE_OPERATOR_CHARACTER = '%'
DEFAULT_NAMESPACE_LABEL = '~no-entry-point~'
Expand Down Expand Up @@ -168,3 +174,279 @@ def load_entry_point_from_full_type(full_type: str) -> Any:
# Which means it is most likely a full module path (the fallback option) and we cannot necessarily load the
# class from this. We could try with `importlib` but not sure that we should
raise EntryPointError('entry point of the given full type cannot be loaded')


class Namespace(MutableMapping):
"""Namespace that can be used to map the node class hierarchy."""

namespace_separator = '.'

# Very ugly ad-hoc mapping of `path` to `label` for the non-leaf entries in the nested `Namespace` mapping:
mapping_path_to_label: dict[Union[str, None], str] = {
'node': 'Node',
'node.data': 'Data',
'node.process': 'Process',
'node.process.calculation': 'Calculation',
'node.process.calculation.calcjob': 'Calculation job',
'node.process.calculation.calcfunction': 'Calculation function',
'node.process.workflow': 'Workflow',
'node.process.workflow.workchain': 'Work chain',
'node.process.workflow.workfunction': 'Work function',
}

# This is a hard-coded mapping to generate the correct full types for process node namespaces of external
# plugins. The `node_type` in that case is fixed and the `process_type` should start with the entry point group
# followed by the plugin name and the wildcard.

process_full_type_mapping = {
'process.calculation.calcjob.': 'process.calculation.calcjob.CalcJobNode.|aiida.calculations:{plugin_name}.%',
'process.calculation.calcfunction.': 'process.calculation.calcfunction.CalcFunctionNode.|aiida.calculations:{plugin_name}.%', # noqa: E501
'process.workflow.workfunction.': 'process.workflow.workfunction.WorkFunctionNode.|aiida.workflows:{plugin_name}.%', # noqa: E501
'process.workflow.workchain.': 'process.workflow.workchain.WorkChainNode.|aiida.workflows:{plugin_name}.%',
}

process_full_type_mapping_unplugged = {
'process.calculation.calcjob.': 'process.calculation.calcjob.CalcJobNode.|{plugin_name}.%',
'process.calculation.calcfunction.': 'process.calculation.calcfunction.CalcFunctionNode.|{plugin_name}.%',
'process.workflow.workfunction.': 'process.workflow.workfunction.WorkFunctionNode.|{plugin_name}.%',
'process.workflow.workchain.': 'process.workflow.workchain.WorkChainNode.|{plugin_name}.%',
}

def __str__(self) -> str:
import json

return json.dumps(self.get_description(), sort_keys=True, indent=4)

def __init__(
self,
namespace: str,
path: Optional[str] = None,
label: Optional[str] = None,
full_type: Optional[str] = None,
counter: Optional[int] = None,
is_leaf: bool = True,
):
"""Construct a new node class namespace."""
self._namespace = namespace
self._path = path if path is not None else namespace
self._full_type = self._infer_full_type(full_type)
self._subspaces: dict[str, PortNamespace] = {}
self._is_leaf = is_leaf
self._counter = counter

self._label: str
if label is not None:
self._label = label
else:
self._label = self.mapping_path_to_label.get(path, self._path.rpartition('.')[-1])

# Manual override for process subspaces that contain entries corresponding to nodes with "unregistered" process
# types. In this case, the label should become `Unregistered` and the full type set to `None` because we cannot
# query for all nodes that fall under this category.
if namespace == DEFAULT_NAMESPACE_LABEL:
self._label = 'Unregistered'
self._full_type = None

def _infer_full_type(self, full_type: Union[str, None]) -> Union[str, None]:
"""Infer the full type based on the current namespace path and the given full type of the leaf."""
from aiida.common.utils import strip_prefix

if full_type or self._path is None:
return full_type

full_type_ = strip_prefix(self._path, 'node.')

if full_type_.startswith('process.'):
for basepath, full_type_template in self.process_full_type_mapping.items():
if full_type_.startswith(basepath):
plugin_name = strip_prefix(full_type_, basepath)
if plugin_name.startswith(DEFAULT_NAMESPACE_LABEL):
temp_type_template = self.process_full_type_mapping_unplugged[basepath]
plugin_name = strip_prefix(plugin_name, DEFAULT_NAMESPACE_LABEL + '.')
full_type_ = temp_type_template.format(plugin_name=plugin_name)
else:
full_type_ = full_type_template.format(plugin_name=plugin_name)
return full_type_

full_type_ += f'.{LIKE_OPERATOR_CHARACTER}{FULL_TYPE_CONCATENATOR}'

if full_type_.startswith('process.'):
full_type_ += LIKE_OPERATOR_CHARACTER

return full_type_

def __iter__(self) -> Iterator[PortNamespace]:
return self._subspaces.__iter__()

def __len__(self) -> int:
return len(self._subspaces)

def __delitem__(self, key: str) -> None:
del self._subspaces[key]

def __getitem__(self, key: str) -> PortNamespace:
return self._subspaces[key]

def __setitem__(self, key: str, port: PortNamespace) -> None:
self._subspaces[key] = port

@property
def is_leaf(self) -> bool:
return self._is_leaf

def get_description(self) -> dict:
"""Return a dictionary with a description of the ports this namespace contains.

Nested PortNamespaces will be properly recursed and Ports will print their properties in a list

:returns: a dictionary of descriptions of the Ports contained within this PortNamespace
"""
result: dict[str, Any] = {
'namespace': self._namespace,
'full_type': self._full_type,
'label': self._label,
'path': self._path,
'subspaces': [],
}

for _, port in self._subspaces.items():
subspace_result = port.get_description()
result['subspaces'].append(subspace_result)
if 'counter' in subspace_result:
if self._counter is None:
self._counter = 0
self._counter = self._counter + subspace_result['counter']

if self._counter is not None:
result['counter'] = self._counter

return result

def create_namespace(self, name: str, **kwargs: Any) -> 'Namespace':
"""Create and return a new `Namespace` in this `Namespace`.

If the name is namespaced, the sub `Namespaces` will be created recursively, except if one of the namespaces is
already occupied at any level by a Port in which case a ValueError will be thrown

:param name: name (potentially namespaced) of the port to create and return
:param kwargs: constructor arguments that will be used *only* for the construction of the terminal Namespace
:returns: Namespace
:raises: ValueError if any sub namespace is occupied by a non-Namespace port
"""
if not isinstance(name, str):
raise ValueError(f'name has to be a string type, not {type(name)}')

if not name:
raise ValueError('name cannot be an empty string')

namespace = name.split(self.namespace_separator)
port_name = namespace.pop(0)

path = f'{self._path}{self.namespace_separator}{port_name}'

# If this is True, the (sub) port namespace does not yet exist, so we create it
if port_name not in self:
# If there still is a `namespace`, we create a sub namespace, *without* the constructor arguments
if namespace:
self[port_name] = self.__class__(port_name, path=path, is_leaf=False)

# Otherwise it is the terminal port and we construct *with* the keyword arugments
else:
kwargs['is_leaf'] = True
self[port_name] = self.__class__(port_name, path=path, **kwargs)

# The port does already exist: if it is a leaf and `namespace` is not empty, then the current leaf node is
# also a namespace itself, so create a namespace with the same name and put the leaf within itself
elif self[port_name].is_leaf and namespace:
clone = self[port_name]
self[port_name] = self.__class__(port_name, path=path, is_leaf=False)
self[port_name][port_name] = clone

# If the current existing port is not a leaf and we do not have remaining namespace, that means the current
# namespace is the "concrete" version of the namespace, so we add the leaf version to the namespace.
elif not self[port_name].is_leaf and not namespace:
kwargs['is_leaf'] = True
self[port_name][port_name] = self.__class__(port_name, path=f'{path}.{port_name}', **kwargs)

# If there is still `namespace` left, we create the next namespace
if namespace:
kwargs['is_leaf'] = True
return self[port_name].create_namespace(self.namespace_separator.join(namespace), **kwargs)

return self[port_name]


def get_node_namespace(user_pk: Optional[int] = None, count_nodes: Optional[int] = False) -> 'Namespace':
"""Return the full namespace of all available nodes in the current database.

:return: complete node `Namespace`
"""
from aiida import orm
from aiida.plugins.entry_point import is_valid_entry_point_string, parse_entry_point_string

filters = {}
if user_pk is not None:
filters['user_id'] = user_pk

builder = orm.QueryBuilder().append(orm.Node, filters=filters, project=['node_type', 'process_type']).distinct()

# All None instances of process_type are turned into ''
unique_types = {(node_type, process_type if process_type else '') for node_type, process_type in builder.all()}

# First we create a flat list of all "leaf" node types.
namespaces = []

for node_type, process_type in unique_types:
label = None
counter = None
namespace = None

if process_type:
# Only process nodes
parts = node_type.rsplit('.', 2)
if is_valid_entry_point_string(process_type):
_, entry_point_name = parse_entry_point_string(process_type)
label = entry_point_name.rpartition('.')[-1]
namespace = '.'.join(parts[:-2] + [entry_point_name])
else:
label = process_type.rsplit('.', 1)[-1]
namespace = '.'.join(parts[:-2] + [DEFAULT_NAMESPACE_LABEL, process_type])

else:
# Data nodes and process nodes without process type (='' or =None)
parts = node_type.rsplit('.', 2)
try:
label = parts[-2]
namespace = '.'.join(parts[:-2])
except IndexError:
continue

if count_nodes:
builder = orm.QueryBuilder()
concat_filters = [{'node_type': {'==': node_type}}]

if node_type.startswith('process.'):
if process_type:
concat_filters.append({'process_type': {'==': process_type}})
else:
concat_filters.append({'process_type': {'or': [{'==': ''}, {'==': None}]}})

if user_pk:
concat_filters.append({'user_id': {'==': user_pk}})

if len(concat_filters) == 1:
builder.append(orm.Node, filters=concat_filters[0])
else:
builder.append(orm.Node, filters={'and': concat_filters})

counter = builder.count()

full_type = construct_full_type(node_type, process_type)
namespaces.append((namespace, label, full_type, counter))

node_namespace = Namespace('node')

for namespace, label, full_type, counter in sorted(namespaces, key=lambda x: x[0], reverse=False):
node_namespace.create_namespace(namespace, label=label, full_type=full_type, counter=counter)

return node_namespace
15 changes: 15 additions & 0 deletions aiida_restapi/routers/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pydantic import ValidationError

from aiida_restapi import models, resources
from aiida_restapi.identifiers import get_node_namespace

from .auth import get_current_active_user

Expand Down Expand Up @@ -41,6 +42,20 @@ async def get_nodes_download_formats() -> dict[str, Any]:
return resources.get_all_download_formats()


@router.get('/nodes/full_types', response_model=dict[str, Any])
@with_dbenv()
async def get_full_types() -> dict[str, Any]:
"""Return full_types of the nodes"""
return get_node_namespace(user_pk=None, count_nodes=False).get_description()


@router.get('/nodes/full_types_count', response_model=dict[str, Any])
@with_dbenv()
async def get_full_types_count(user: Optional[int] = None) -> dict[str, Any]:
"""Return full_types_count of the nodes"""
return get_node_namespace(user_pk=user, count_nodes=True).get_description()


@router.get('/nodes/{nodes_id}', response_model=models.Node)
@with_dbenv()
async def read_node(nodes_id: int) -> Optional[models.Node]:
Expand Down
4 changes: 1 addition & 3 deletions docs/source/user_guide/graphql.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,7 @@ http://localhost:5000/api/v4/nodes?attributes=true&attributes_filter=pbc1
http://localhost:5000/api/v4/nodes/full_types
```

NOT YET SPECIFICALLY IMPLEMENTED
(although this needs further investigation, because full types is basically not documented anywhere)

Not implemented for GraphQL, please use the REST API for this use case.

```html
http://localhost:5000/api/v4/nodes/download_formats
Expand Down
Loading
Loading