diff --git a/docs/docs/userguide.rst b/docs/docs/userguide.rst index 60b02de..542a33c 100644 --- a/docs/docs/userguide.rst +++ b/docs/docs/userguide.rst @@ -63,7 +63,7 @@ We can again specify additional options that change the way the node is set up a max_loops=5 ) -If not explicitly built to do so, the :meth:`~maize.core.node.Node.run` method of the node, containing all user code, will only run once. That means it will typically wait to receive some data, process it, send it onwards, and then shutdown. If you want to keep it running and continuously accept input, you can set ``loop`` to ``True`` (or construct the :meth:`~maize.core.node.Node.run` method in a looping way, see `looped nodes <#looped-execution>`_). If you want to limit the maximum number of iterations, use ``max_loops``, by default the node will loop until it receives a shutdown signal or detects neighbouring nodes shutting down. +If not explicitly told to do so, the :meth:`~maize.core.node.Node.run` method of the node, containing all user code, will only run once. That means it will typically wait to receive some data, process it, send it onwards, and then shut down. If you want to keep it running and continuously accept input, you can set ``loop`` to ``True``. If you want to limit the maximum number of iterations, use ``max_loops``; by default, the node will loop until it receives a shutdown signal or detects neighbouring nodes shutting down. .. tip:: The ``max_loops`` argument can be useful when testing the behaviour of continuously running nodes. For some examples, see the test suites for :mod:`~maize.steps.plumbing`. @@ -81,6 +81,8 @@ You won't be able to directly override any parameters or specify additional keyw .. tip:: When defining a new node, it will be automatically added to an internal registry of node types. The node class (not instance) can then be retrieved from the name only using the :meth:`~maize.core.component.Component.get_node_class` function.
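The registry mentioned in the tip can be pictured as an ordinary Python class registry built on ``__init_subclass__``. The following self-contained sketch illustrates the idea only; it is not maize's actual implementation, and the class names are simplified stand-ins:

```python
class Component:
    """Illustrative stand-in for the maize component base class."""

    # Maps class names to node classes; filled automatically on subclassing
    _registry: dict[str, type["Component"]] = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Every subclass registers itself under its own name at definition time
        Component._registry[cls.__name__] = cls

    @classmethod
    def get_node_class(cls, name: str) -> type["Component"]:
        # Retrieve the class (not an instance) from its name alone
        return cls._registry[name]


class Delay(Component):
    pass


assert Component.get_node_class("Delay") is Delay
```

Because registration happens in ``__init_subclass__``, merely defining (or importing) a node class is enough to make it discoverable by name.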
+Setting parameters +^^^^^^^^^^^^^^^^^^ Configuration of nodes is performed using :term:`parameters `. These are typically node settings that are unique to that node and the wrapped software, and would make little sense to change during execution of a workflow. These are things like configuration files or other set-and-forget options. Parameters can be set at node instantiation with :meth:`~maize.core.workflow.Workflow.add` as mentioned above, or on the nodes themselves using the :meth:`~maize.core.interface.Parameter.set` method: .. code-block:: python @@ -94,7 +96,6 @@ Alternatively, :term:`inputs ` can also act as :term:`parameters `_) or one specified using the ``MAIZE_CONFIG`` environment variable. If you're confused about what to add to your config for a particular workflow, you can use :meth:`~maize.core.workflow.Workflow.generate_config_template` to create a TOML template that you can populate with the correct paths. Note that only one of ``scripts`` and ``commands`` needs to be specified for a given command. +Here, :class:`~maize.utilities.io.NodeConfig` is a node-level configuration class allowing the specification of paths to any required software. + +By default :class:`~maize.utilities.io.Config` will look for a configuration file named ``maize.toml`` in ``$XDG_CONFIG_HOME`` (usually at ``~/.config/``, see `here for more information on the XDG standard `_) or one specified using the ``MAIZE_CONFIG`` environment variable. If you're confused about what to add to your config for a particular workflow, you can use :meth:`~maize.core.workflow.Workflow.generate_config_template` to create a TOML template that you can populate with the correct paths. Note that only one of ``scripts`` and ``commands`` needs to be specified for a given command. .. _custom-graphs: @@ -317,16 +320,16 @@ This is where :term:`subgraphs ` can be helpful. 
To define them, creat delay = self.add(Delay, parameters=dict(delay=2)) self.connect(node.out, delay.inp) self.out = self.map_port(delay.out) - self.delay = self.map(delay.delay) + self.map(delay.delay) -A key difference between a :term:`subgraph` and a workflow is that the former will always have exposed ports. We however have to clarify which port should be exposed how, by using the :meth:`~maize.core.graph.Graph.map_port` method and specifying a reference to the original port of a contained node and optionally a new name. We can again use the :meth:`~maize.core.graph.Graph.map` convenience method to automatically expose parameters. Note that to get the benefits of type-checking you should in those cases declare all interfaces as class attributes. To group multiple parameters together, we can again use :meth:`~maize.core.graph.Graph.combine_parameters`. ``SubGraph`` will now behave just like any other :term:`component` in the workflow: +A key difference between a :term:`subgraph` and a workflow is that the former will always have exposed ports. However, we have to specify how each port should be exposed, using the :meth:`~maize.core.graph.Graph.map_port` method with a reference to the original port of a contained node and, optionally, a new name. We can again use the :meth:`~maize.core.graph.Graph.map` convenience method to automatically expose parameters. Note that to get the benefits of type-checking, you should in those cases declare all interfaces in the class body. To group multiple parameters together, we can again use :meth:`~maize.core.graph.Graph.combine_parameters`. ``SubGraph`` will now behave just like any other :term:`component` in the workflow: .. code-block:: python sg = flow.add(SubGraph, parameters=dict(delay=3)) flow.connect(sg.out, another.inp) -At execution, the whole workflow is flattened and each node executed normally, irrespective of nesting.
The :term:`subgraph` paradigm is therefore mostly a conceptual aid for complex workflows. +A common situation is running all contained nodes in a loop; in this case, you can pass ``loop=True`` just like for a normal node. At execution, the whole workflow is flattened and each node executed normally, irrespective of nesting. The :term:`subgraph` paradigm is therefore mostly a conceptual aid for complex workflows. .. _custom-nodes: @@ -378,7 +381,7 @@ Behind the scenes, maize lets the receiving node know that one or more files are Looped execution ^^^^^^^^^^^^^^^^ -The above example represents a case of a single execution. We may however be interested in performing some form of continuous repeating computation. This can be achieved by not returning from :meth:`~maize.core.node.Node.run`, but maize also provides a convenience :meth:`~maize.core.node.Node.loop` generator method. Here is our example continuously sending the same string over and over (akin to the Unix ``yes`` command): +The above example represents a case of a single execution. We may, however, be interested in performing some form of continuous repeating computation. This can be accomplished by passing ``loop=True`` to the node or subgraph when adding it to the workflow. The following node, when used with looping, will continuously send the same value, akin to the Unix ``yes`` command: .. code-block:: python @@ -390,15 +393,19 @@ The above example represents a case of a single execution. We may however be int data: Parameter[str] = Parameter(default="hello") def run(self) -> None: - for _ in self.loop(tick=2.0): - self.out.send(self.data.value) + self.out.send(self.data.value) + +However, in some cases you might want to keep state over multiple loops. In that situation, you can set up any data structures you need in the :meth:`~maize.core.node.Node.prepare` method (making sure to call the base method using `super`). The :class:`~maize.steps.plumbing.RoundRobin` node is a good example of this: + +.. 
literalinclude:: ../../maize/steps/plumbing.py + :pyobject: RoundRobin -We have given :meth:`~maize.core.node.Node.loop` the ``tick`` argument to make it wait ``2`` seconds between sends. +Here, we call the base :meth:`~maize.core.node.Node.prepare` method, create an iterator over all outputs, and initialize the first output by calling `next`. In :meth:`~maize.core.node.Node.run`, we can use this output as normal and increment the iterator. .. caution:: - Patterns like this can be dangerous, as they have no explicit exit condition. In many cases however downstream nodes that finish computation will signal a port shutdown and consequently cause the sending port to exit. + Patterns using continuous loops like this always have the potential to cause deadlocks, as they have no explicit exit condition. In many cases, however, downstream nodes that finish computation will signal a port shutdown and consequently cause the sending port to exit. -A common pattern with looped nodes is an optional receive, i.e. we will want to receive one or multiple values (see :class:`~maize.core.interface.MultiPort`) only if they are available and then continue. This can be accomplished by using optional ports, and querying them before attempting to receive: +A common pattern with looped nodes is an optional receive, i.e. we will want to receive one or multiple values (see :class:`~maize.core.interface.MultiPort`) only if they are available and then continue. This can be accomplished by using optional ports and querying them with :meth:`~maize.core.interface.Input.ready` before attempting to receive: .. code-block:: python @@ -410,16 +417,15 @@ A common pattern with looped nodes is an optional receive, i.e. 
we will want to out: Output[str] = Output() def run(self) -> None: - for _ in self.loop(tick=2.0): - concat = "" - for inp in self.inp: - if inp.ready(): - concat += inp.receive() - self.out.send(concat) + concat = "" + for inp in self.inp: + if inp.ready(): + concat += inp.receive() + self.out.send(concat) -This node will always send a value every ``2`` seconds, no matter if data is available or not. The optional flag will also ensure it can shutdown correctly when neighbouring nodes stop. Alternatively you can use the :meth:`~maize.core.interface.Input.receive_optional` method to unconditionally receive a value, with the possibility of receiving ``None``. +This node will always send a value every iteration, whether or not data is available. The optional flag will also ensure it can shut down correctly when neighbouring nodes stop. Alternatively, you can use the :meth:`~maize.core.interface.Input.receive_optional` method to unconditionally receive a value, with the possibility of receiving ``None``. -Another option is to allow an input to cache a previously received value by adding the ``cached`` flag to the constructor: +Another useful option is to allow an input to cache a previously received value by adding the ``cached`` flag to the constructor: .. code-block:: python @@ -431,9 +437,8 @@ Another option is to allow an input to cache a previously received value by addi out: Output[str] = Output() def run(self) -> None: - for _ in self.loop(tick=2.0): - data = self.inp.receive() - self.out.send(data + "-bar") + data = self.inp.receive() + self.out.send(data + "-bar") In this case, if the node received the string ``"foo"`` the previous iteration, but hasn't been sent a new value this iteration, it will still receive ``"foo"``. This is particularly useful for setting up parameters at the beginning of a workflow and then keeping them unchanged over various internal loops.
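The behaviour of a ``cached`` input can be pictured in plain Python: if no new value has arrived, a receive falls back to the last value seen. The following is an illustrative sketch of that pattern, not maize's actual channel implementation:

```python
import queue


class CachedInput:
    """Sketch of a cached input: receive() returns the newest queued
    value, or falls back to the last value it has seen."""

    def __init__(self) -> None:
        self._queue: queue.Queue[str] = queue.Queue()
        self._cache = None

    def send(self, value: str) -> None:
        self._queue.put(value)

    def receive(self) -> str:
        # A fresh value replaces the cache; otherwise the cache is reused
        if not self._queue.empty():
            self._cache = self._queue.get()
        if self._cache is None:
            raise RuntimeError("no value has ever been received")
        return self._cache


inp = CachedInput()
inp.send("foo")
assert inp.receive() == "foo"  # fresh value
assert inp.receive() == "foo"  # nothing new sent, cached value returned
```

This mirrors the behaviour described above: a parameter-like value sent once at workflow startup keeps being available on every subsequent loop iteration.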
@@ -473,8 +478,10 @@ This way we get static typing support throughout our workflow, minimizing errors Running commands ^^^^^^^^^^^^^^^^ -There are two main ways of running commands: locally or using a resource manager (such as `SLURM `_). Both can be used through :meth:`~maize.core.node.Node.run_command`: by default any command will simply be run locally (with optional validation) and return a :class:`subprocess.CompletedProcess` instance. +There are two main ways of running commands: locally or using a resource manager (such as `SLURM `_). Both can be used through :meth:`~maize.core.node.Node.run_command` and :meth:`~maize.core.node.Node.run_multi`: by default any command will simply be run locally (with optional validation) and return a :class:`subprocess.CompletedProcess` instance containing the return code and any output generated on *standard output* or *standard error*. +Software dependencies +""""""""""""""""""""" A common issue is that many programs will require some environment preparation that is often heavily system-dependent. To accommodate this, any node definitions should include a :attr:`~maize.core.node.Node.required_callables` definition listing the commands or software necessary to run, and/or a :attr:`~maize.core.node.Node.required_packages` attribute listing Python packages required in the environment. They can then be specified in the `global config <#configuring-workflows>`_ using the ``modules``, ``scripts``, and ``commands`` parameters or using the corresponding pre-defined parameters (see `handling software <#handling-external-software>`_). For example, if the node ``MyNode`` requires an executable named ``executable``, it will first load any modules under the ``MyNode`` heading, followed by looking for an entry including ``executable`` in the ``commands`` and ``scripts`` sections.
Any discovered matching commands will be placed in the :attr:`~maize.core.node.Node.runnable` dictionary, which can be used with any command invocation: .. code-block:: python @@ -490,10 +497,10 @@ A common issue is that many programs will require some environment preparation t def run(self) -> None: import my_package data = self.inp.receive() - self.run_command(f"{self.runnable['executable']} --data {data}") - self.out.send(data) + res = self.run_command(f"{self.runnable['executable']} --data {data}") + self.out.send(float(res.stdout)) -The associated configuration section might look something like this: +Here, we are running a command that takes some floating-point value as input and outputs a result to *standard output*. We convert this output to a float and send it on. In practice, you will probably need more sophisticated parsing of command outputs. The associated configuration section might look something like this: .. code-block:: toml @@ -501,9 +508,42 @@ The associated configuration section might look something like this: python = "/path/to/python/interpreter" # must contain 'my_package' commands.executable = "/path/to/executable" +If ``executable`` is a script and requires a preceding interpreter to run, your configuration might look like this instead: + +.. code-block:: toml + + [example] + python = "/path/to/python/interpreter" # must contain 'my_package' + scripts.executable.interpreter = "/path/to/interpreter" + scripts.executable.location = "/path/to/script" + If your node requires more customized environment setups, you can override the :meth:`~maize.core.node.Node.prepare` method with your own initialization logic (making sure to call the original method using `super`).
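The ``prepare`` override pattern amounts to calling the base method before your own setup. The following plain-Python sketch illustrates only the calling convention; the base class, attribute names, and environment values are illustrative stand-ins, not maize's actual code:

```python
class Node:
    """Illustrative stand-in for the maize node base class."""

    def prepare(self) -> None:
        # The base class performs its standard setup here (environment
        # preparation, software discovery, ...)
        self.base_prepared = True


class CustomNode(Node):
    def prepare(self) -> None:
        # Always call the original method first, so the standard
        # setup still takes place
        super().prepare()
        # ...then perform any custom initialization logic
        self.env = {"MY_TOOL_HOME": "/opt/my_tool"}  # illustrative value


node = CustomNode()
node.prepare()
assert node.base_prepared
```

Forgetting the ``super().prepare()`` call would silently skip the standard setup, which is why the guide stresses it.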
-To make use of batch processing systems common in HPC environments, such as SLURM, set the ``batch`` argument of :meth:`~maize.core.node.Node.run_command` to ``True`` and additionally pass in execution options (:class:`~maize.utilities.execution.JobResourceConfig`) if needed: +Running in parallel +""""""""""""""""""" +You can also run multiple commands in parallel using :meth:`~maize.core.node.Node.run_multi`. It takes a list of commands to run and runs them in batches according to the ``n_jobs`` parameter. This can be useful when processing potentially large batches of data with software that does not have its own internal parallelization. Each command can optionally be run in a separate working directory and otherwise accepts the same parameters as :meth:`~maize.core.node.Node.run_command`: + +.. code-block:: python + + class Example(Node): + + required_callables = ["executable"] + + inp: Input[list[float]] = Input() + out: Output[list[float]] = Output() + + def run(self) -> None: + data = self.inp.receive() + commands = [f"{self.runnable['executable']} --data {d}" for d in data] + results = self.run_multi(commands, n_jobs=4) + output = [float(res.stdout) for res in results] + self.out.send(output) + +Here we do the same as above, but receive and send lists of floats and run the executable in parallel using 4 jobs. + +Job submission +"""""""""""""" +To make use of batch processing systems common in HPC environments, pass execution options (:class:`~maize.utilities.execution.JobResourceConfig`) to :meth:`~maize.core.node.Node.run_command`: .. 
code-block:: python @@ -515,7 +555,7 @@ To make use of batch processing systems common in HPC environments, such as SLUR def run(self) -> None: data = self.inp.receive() options = JobResourceConfig(nodes=2) - self.run_command(f"echo {data}", batch=True, batch_options=options) + self.run_command(f"echo {data}", batch_options=options) self.out.send(data) Batch system settings are handled in the maize configuration (see `configuring workflows <#config-workflow>`_) using :class:`~maize.utilities.execution.ResourceManagerConfig`, for example: @@ -527,6 +567,10 @@ Batch system settings are handled in the maize configuration (see `configuring w queue = "core" walltime = "00:05:00" +Running batch commands in parallel can be done using :meth:`~maize.core.node.Node.run_multi` in the same way, i.e. passing a :class:`~maize.utilities.execution.JobResourceConfig` object to ``batch_options``. In this case, ``n_jobs`` refers to the maximum number of jobs to submit at once. A common pattern of use is to first prepare the required directory structure and corresponding commands, and then send all commands for execution at once. + +Resource management +""""""""""""""""""" Because all nodes run simultaneously on a single machine with limited resources, maize features some simple management tools to reserve computational resources: .. code-block:: python @@ -542,6 +586,20 @@ Because all nodes run simultaneously on a single machine with limited resources, data = do_something_heavy(data) self.out.send(data) -You can also reserve GPUs (using :attr:`~maize.core.node.Node.gpus`) using the same syntax, or create your own management system using :class:`~maize.utilities.resources.Resources`. - -Running commands in parallel can be done by simply spawning multiple jobs with *Slurm* (:meth:`~maize.core.node.Node.submit_job`), but also using :meth:`~maize.core.node.Node.run_multi`. It takes a list of commands as input and makes sure that all cores are in use by utilising a queuing system. 
A common pattern of use is to first prepare the required directory structure and corresponding commands, and then send all commands for execution at once. +You can also reserve GPUs (using :attr:`~maize.core.node.Node.gpus`) using the same syntax. + +Advanced options +"""""""""""""""" +There are multiple additional options for :meth:`~maize.core.node.Node.run_command` and :meth:`~maize.core.node.Node.run_multi` that are worth knowing about: + +===================== ============================== +Option Information +===================== ============================== +``validators`` A list of :class:`~maize.utilities.validation.Validator` objects, allowing output files or *standard output / error* to be checked for content indicating success or failure. +``verbose`` If ``True``, will also log command output to the Maize log. +``raise_on_failure`` If ``True``, will raise an exception if something goes wrong; otherwise, it will just log a warning. This can be useful when handling batches of data in which some datapoints might be expected to fail. +``command_input`` Can be used to send data to *standard input*. This can be used for commands that might normally require manual user input or interactivity on the command line. +``pre_execution`` Any command to run just before the main command. Note that if you need to load modules or set environment variables, you should use the options in the configuration system instead (see `handling software <#handling-external-software>`_). Not only does this allow full decoupling of system and workflow configuration, but it is also more efficient as a module will only be loaded once. +``timeout`` Maximum runtime for a command in seconds. +``working_dirs`` The working directories to run each command in, instead of the node working directory (:meth:`~maize.core.node.Node.run_multi` only). 
+===================== ============================== diff --git a/maize/core/component.py b/maize/core/component.py index 53a288f..71e825c 100644 --- a/maize/core/component.py +++ b/maize/core/component.py @@ -10,6 +10,8 @@ `Node` and represent atomic workflow steps. Nodes with branches are (Sub-)`Graph`s, as they contain multiple nodes, but expose the same interface that a `Node` would: +.. code-block:: text + Workflow / \ Subgraph Node diff --git a/maize/core/graph.py b/maize/core/graph.py index f50edf5..22b52ee 100644 --- a/maize/core/graph.py +++ b/maize/core/graph.py @@ -91,8 +91,6 @@ class Graph(Component, register=False): Dictionary of nodes or subgraphs part of the `Graph` channels Dictionary of channels part of the `Graph` - logger - Instance of `Logger` used for logging the graph building stage Raises ------