Skip to content

Commit

Permalink
Refactoring and adding documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Nov 21, 2023
1 parent a704d20 commit 7c796e7
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 16 deletions.
87 changes: 87 additions & 0 deletions docs/source/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,90 @@ As a result the GPUs available on the two compute nodes are reported:
>>> ('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn139')]
```
In this case each compute node `cn138` and `cn139` is equipped with one `Tesla V100S-PCIE-32GB`.

## External Executables
While `pympipool` was initially designed for up-scaling python functions for HPC, the same functionality can be leveraged
to up-scale any executable independent of the programming language it is developed in. This approach follows the design
of the `flux.job.FluxExecutor` included in the [flux framework](https://flux-framework.org). In `pympipool` this approach
is extended to support any kind of subprocess, so it is no longer limited to the [flux framework](https://flux-framework.org).

### Subprocess
Following the [`subprocess.check_output()`](https://docs.python.org/3/library/subprocess.html) interface of the standard
python libraries, any kind of command can be submitted to the `pympipool.SubprocessExecutor`. The command can either be
specified as a list `["echo", "test"]` in which the first entry is typically the executable followed by the corresponding
parameters or the command can be specified as a string `"echo test"` with the additional parameter `shell=True`.
```
from pympipool import SubprocessExecutor
with SubprocessExecutor(max_workers=2) as exe:
future = exe.submit(["echo", "test"], universal_newlines=True)
print(future.done(), future.result(), future.done())
>>> (False, "test", True)
```
In analogy to the previous examples the `SubprocessExecutor` class is directly imported from the `pympipool` module and
the maximum number of workers is set to two `max_workers=2`. In contrast to the `pympipool.Executor` class no other
settings to assign specific hardware to the command via the python interface are available in the `SubprocessExecutor`
class. To specify the hardware requirements for the individual commands, the user has to manually assign the resources
using the commands of the resource schedulers like `srun`, `flux run` or `mpiexec`.

The `concurrent.futures.Future` object returned after submitting a command to the `pymipool.SubprocessExecutor` behaves
just like any other future object. It provides a `done()` function to check if the execution completed as well as a
`result()` function to return the output of the submitted command.

In comparison to the `flux.job.FluxExecutor` included in the [flux framework](https://flux-framework.org) the
`pymipool.SubprocessExecutor` differs in two ways. One the `pymipool.SubprocessExecutor` does not provide any option for
resource assignment and two the `pymipool.SubprocessExecutor` returns the output of the command rather than just
returning the exit status when calling `result()`.

### Interactive Shell
Beyond external executables which are called once with a set of input parameters and or input files and return one set
of outputs, there are some executables which allow the user to interact with the executable during the execution. The
challenge of interfacing a python process with such an interactive executable is to identify when the executable is ready
to receive the next input. A very basis example for an interactive executable is a script which counts to the number
input by the user. This can be written in python as `count.py`:
```
def count(iterations):
for i in range(int(iterations)):
print(i)
print("done")
if __name__ == "__main__":
while True:
user_input = input()
if "shutdown" in user_input:
break
else:
count(iterations=int(user_input))
```
This example is challenging in terms of interfacing it with a python process as the length of the output changes depending
on the input. The first option that the `pympipool.ShellExecutor` provides is specifying the number of lines to read for
each call submitted to the executable using the `lines_to_read` parameter. In comparison to the `SubprocessExecutor`
defined above the `ShellExecutor` only supports the execution of a single executable at a time, correspondingly the input
parameters for calling the executable are provided at the time of initialization of the `ShellExecutor` and the inputs
are submitted using the `submit()` function:
```
from pympipool import ShellExecutor
with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe:
future_lines = exe.submit(string_input="4", lines_to_read=5)
print(future_lines.done(), future_lines.result(), future_lines.done())
>>> (False, "0\n1\n2\n3\ndone\n", True)
```
The response for a given set of input is again returned as `concurrent.futures.Future` object, this allows the user to
execute other steps on the python side while waiting for the completion of the external executable. In this case the
example counts the numbers from `0` to `3` and prints each of them in one line followed by `done` to notify the user its
waiting for new inputs. This results in `n+1` lines of output for the input of `n`. Still predicting the number of lines
for a given input can be challenging, so the `pympipool.ShellExecutor` class also provides the option to wait until a
specific pattern is found in the output using the `stop_read_pattern`:
```
from pympipool import ShellExecutor
with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe:
future_pattern = exe.submit(string_input="4", stop_read_pattern="done")
print(future_pattern.done(), future_pattern.result(), future_pattern.done())
>>> (False, "0\n1\n2\n3\ndone\n", True)
```
In this example the pattern simply searches for the string `done` in the output of the program and returns all the output
gathered from the executable since the last input as the result of the `concurrent.futures.Future` object returned after
the submission of the interactive command.
4 changes: 2 additions & 2 deletions pympipool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from ._version import get_versions
from pympipool.mpi.executor import PyMPIExecutor
from pympipool.shared.interface import SLURM_COMMAND
from pympipool.shell.executor import ShellExecutor
from pympipool.shell.interactive import ShellInteractiveExecutor
from pympipool.shell.executor import SubprocessExecutor
from pympipool.shell.interactive import ShellExecutor
from pympipool.slurm.executor import PySlurmExecutor

try: # The PyFluxExecutor requires flux-core to be installed.
Expand Down
4 changes: 2 additions & 2 deletions pympipool/shell/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from pympipool.shell.executor import ShellExecutor
from pympipool.shell.interactive import ShellInteractiveExecutor
from pympipool.shell.executor import SubprocessExecutor
from pympipool.shell.interactive import ShellExecutor
41 changes: 38 additions & 3 deletions pympipool/shell/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@


def execute_single_task(future_queue):
"""
Process items received via the queue.
Args:
future_queue (queue.Queue):
"""
while True:
task_dict = future_queue.get()
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
Expand All @@ -31,7 +37,10 @@ def execute_single_task(future_queue):
raise KeyError(task_dict)


class ShellStaticExecutor(ExecutorBase):
class SubprocessSingleExecutor(ExecutorBase):
"""
The pympipool.shell.SubprocessSingleExecutor is the internal worker for the pympipool.shell.SubprocessExecutor.
"""
def __init__(self):
super().__init__()
self._process = RaisingThread(
Expand All @@ -48,7 +57,26 @@ def submit(self, *args, **kwargs):
return f


class ShellExecutor(ExecutorBase):
class SubprocessExecutor(ExecutorBase):
"""
The pympipool.shell.SubprocessExecutor enables the submission of command line calls via the subprocess.check_output()
interface of the python standard library. It is based on the concurrent.futures.Executor class and returns a
concurrent.futures.Future object for every submitted command line call. Still it does not provide any option to
interact with the external executable during the execution.
Args:
max_workers (int): defines the number workers which can execute functions in parallel
sleep_interval (float): synchronization interval - default 0.1
Examples:
>>> from pympipool import SubprocessExecutor
>>> with SubprocessExecutor(max_workers=2) as exe:
>>> future = exe.submit(["echo", "test"], universal_newlines=True)
>>> print(future.done(), future.result(), future.done())
(False, "test", True)
"""
def __init__(
self,
max_workers=1,
Expand All @@ -62,12 +90,19 @@ def __init__(
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": ShellStaticExecutor,
"executor_class": SubprocessSingleExecutor,
},
)
self._process.start()

def submit(self, *args, **kwargs):
"""
Submit a command line call to be executed. The given arguments are provided to subprocess.Popen() as additional
inputs to control the execution.
Returns:
A Future representing the given call.
"""
f = Future()
self._future_queue.put({"future": f, "args": args, "kwargs": kwargs})
return f
62 changes: 61 additions & 1 deletion pympipool/shell/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,24 @@


def wait_for_process_to_stop(process, sleep_interval=10e-10):
"""
Wait for the subprocess.Popen() process to stop executing
Args:
process (subprocess.Popen): process object
sleep_interval (float): interval to sleep during poll() calls
"""
while process.poll() is None:
sleep(sleep_interval)


def execute_single_task(future_queue):
"""
Process items received via the queue.
Args:
future_queue (queue.Queue):
"""
process = None
while True:
task_dict = future_queue.get()
Expand Down Expand Up @@ -70,7 +83,27 @@ def execute_single_task(future_queue):
raise ValueError("process exited")


class ShellInteractiveExecutor(ExecutorBase):
class ShellExecutor(ExecutorBase):
"""
In contrast to the other pympipool.shell.SubprocessExecutor and the pympipool.Executor the pympipool.shell.ShellExecutor
can only execute a single process at a given time. Still it adds the capability to interact with this process during
its execution. The initialization of the pympipool.shell.ShellExecutor takes the same input arguments as the
subprocess.Popen() call for the standard library to start a subprocess.
Examples
>>> from pympipool import ShellExecutor
>>> with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe:
>>> future_lines = exe.submit(string_input="4", lines_to_read=5)
>>> print(future_lines.done(), future_lines.result(), future_lines.done())
(False, "0\n1\n2\n3\ndone\n", True)
>>> from pympipool import ShellExecutor
>>> with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe:
>>> future_pattern = exe.submit(string_input="4", stop_read_pattern="done")
>>> print(future_pattern.done(), future_pattern.result(), future_pattern.done())
(False, "0\n1\n2\n3\ndone\n", True)
"""
def __init__(self, *args, **kwargs):
super().__init__()
self._process = RaisingThread(
Expand All @@ -83,6 +116,20 @@ def __init__(self, *args, **kwargs):
self._future_queue.put({"init": True, "args": args, "kwargs": kwargs})

def submit(self, string_input, lines_to_read=None, stop_read_pattern=None):
"""
Submit the input as a string to the executable. In addition to the input the ShellExecutor also needs a measure
to identify the completion of the execution. This can either be provided based on the number of lines to read
using the `lines_to_read` parameter or by providing a string pattern using the `stop_read_pattern` to stop
reading new lines. One of these two stopping criteria has to be defined.
Args:
string_input (str): Input to be communicated to the underlying executable
lines_to_read (None/int): integer number of lines to read from the command line (optional)
stop_read_pattern (None/str): string pattern to indicate the command line output is completed (optional)
Returns:
A Future representing the given call.
"""
if lines_to_read is None and stop_read_pattern is None:
raise ValueError(
"Either the number of lines_to_read (int) or the stop_read_pattern (str) has to be defined."
Expand All @@ -101,6 +148,19 @@ def submit(self, string_input, lines_to_read=None, stop_read_pattern=None):
return f

def shutdown(self, wait=True, *, cancel_futures=False):
"""Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other
methods can be called after this one.
Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
parallel_executors have been reclaimed.
cancel_futures: If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be
cancelled.
"""
if cancel_futures:
cancel_items_in_queue(que=self._future_queue)
self._future_queue.put({"shutdown": True, "wait": wait})
Expand Down
4 changes: 2 additions & 2 deletions tests/test_shell_interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from unittest import TestCase

from pympipool.shell.interactive import ShellInteractiveExecutor, execute_single_task
from pympipool.shell.interactive import ShellExecutor, execute_single_task


class ShellInteractiveExecutorTest(TestCase):
Expand All @@ -28,7 +28,7 @@ def test_execute_single_task(self):
self.assertEqual("0\n1\n2\n3\ndone\n", future_pattern.result())

def test_shell_interactive_executor(self):
with ShellInteractiveExecutor(["python", self.executable_path], universal_newlines=True) as exe:
with ShellExecutor(["python", self.executable_path], universal_newlines=True) as exe:
future_lines = exe.submit(string_input="4", lines_to_read=5, stop_read_pattern=None)
future_pattern = exe.submit(string_input="4", lines_to_read=None, stop_read_pattern="done")
self.assertFalse(future_lines.done())
Expand Down
26 changes: 20 additions & 6 deletions tests/test_shell.py → tests/test_subprocess_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

from unittest import TestCase

from pympipool.shell.executor import ShellStaticExecutor, ShellExecutor, execute_single_task
from pympipool.shell.executor import SubprocessSingleExecutor, SubprocessExecutor, execute_single_task


class ShellExecutorTest(TestCase):
class SubprocessExecutorTest(TestCase):
def test_execute_single_task(self):
test_queue = queue.Queue()
f = Future()
Expand All @@ -17,15 +17,29 @@ def test_execute_single_task(self):
self.assertTrue(f.done())
self.assertEqual("test\n", f.result())

def test_shell_static_executor(self):
with ShellStaticExecutor() as exe:
future = exe.submit(["echo", "test"], universal_newlines=True)
def test_shell_static_executor_args(self):
with SubprocessSingleExecutor() as exe:
future = exe.submit(["echo", "test"], universal_newlines=True, shell=False)
self.assertFalse(future.done())
self.assertEqual("test\n", future.result())
self.assertTrue(future.done())

def test_shell_static_executor_binary(self):
with SubprocessSingleExecutor() as exe:
future = exe.submit(["echo", "test"], universal_newlines=False, shell=False)
self.assertFalse(future.done())
self.assertEqual(b"test\n", future.result())
self.assertTrue(future.done())

def test_shell_static_executor_shell(self):
with SubprocessSingleExecutor() as exe:
future = exe.submit("echo test", universal_newlines=True, shell=True)
self.assertFalse(future.done())
self.assertEqual("test\n", future.result())
self.assertTrue(future.done())

def test_shell_executor(self):
with ShellExecutor(max_workers=2) as exe:
with SubprocessExecutor(max_workers=2) as exe:
f_1 = exe.submit(["echo", "test_1"], universal_newlines=True)
f_2 = exe.submit(["echo", "test_2"], universal_newlines=True)
f_3 = exe.submit(["echo", "test_3"], universal_newlines=True)
Expand Down

0 comments on commit 7c796e7

Please sign in to comment.