From 7c796e7ac9a95e75e4498a06e54b838098a0dfde Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Tue, 21 Nov 2023 13:04:13 +0100 Subject: [PATCH] Refactoring and adding documentation --- docs/source/examples.md | 87 +++++++++++++++++++ pympipool/__init__.py | 4 +- pympipool/shell/__init__.py | 4 +- pympipool/shell/executor.py | 41 ++++++++- pympipool/shell/interactive.py | 62 ++++++++++++- tests/test_shell_interactive.py | 4 +- ...t_shell.py => test_subprocess_executor.py} | 26 ++++-- 7 files changed, 212 insertions(+), 16 deletions(-) rename tests/{test_shell.py => test_subprocess_executor.py} (64%) diff --git a/docs/source/examples.md b/docs/source/examples.md index b63f8e18..3e69e749 100644 --- a/docs/source/examples.md +++ b/docs/source/examples.md @@ -304,3 +304,90 @@ As a result the GPUs available on the two compute nodes are reported: >>> ('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn139')] ``` In this case each compute node `cn138` and `cn139` is equipped with one `Tesla V100S-PCIE-32GB`. + +## External Executables +While `pympipool` was initially designed for up-scaling python functions for HPC, the same functionality can be leveraged +to up-scale any executable independent of the programming language it is developed in. This approach follows the design +of the `flux.job.FluxExecutor` included in the [flux framework](https://flux-framework.org). In `pympipool` this approach +is extended to support any kind of subprocess, so it is no longer limited to the [flux framework](https://flux-framework.org). + +### Subprocess +Following the [`subprocess.check_output()`](https://docs.python.org/3/library/subprocess.html) interface of the standard +python libraries, any kind of command can be submitted to the `pympipool.SubprocessExecutor`. The command can either be +specified as a list `["echo", "test"]` in which the first entry is typically the executable followed by the corresponding +parameters or the command can be specified as a string `"echo test"` with the additional parameter `shell=True`. +``` +from pympipool import SubprocessExecutor + +with SubprocessExecutor(max_workers=2) as exe: + future = exe.submit(["echo", "test"], universal_newlines=True) + print(future.done(), future.result(), future.done()) +>>> (False, "test", True) +``` +In analogy to the previous examples the `SubprocessExecutor` class is directly imported from the `pympipool` module and +the maximum number of workers is set to two `max_workers=2`. In contrast to the `pympipool.Executor` class no other +settings to assign specific hardware to the command via the python interface are available in the `SubprocessExecutor` +class. To specify the hardware requirements for the individual commands, the user has to manually assign the resources +using the commands of the resource schedulers like `srun`, `flux run` or `mpiexec`. + +The `concurrent.futures.Future` object returned after submitting a command to the `pymipool.SubprocessExecutor` behaves +just like any other future object. It provides a `done()` function to check if the execution completed as well as a +`result()` function to return the output of the submitted command. + +In comparison to the `flux.job.FluxExecutor` included in the [flux framework](https://flux-framework.org) the +`pymipool.SubprocessExecutor` differs in two ways. One the `pymipool.SubprocessExecutor` does not provide any option for +resource assignment and two the `pymipool.SubprocessExecutor` returns the output of the command rather than just +returning the exit status when calling `result()`. + +### Interactive Shell +Beyond external executables which are called once with a set of input parameters and or input files and return one set +of outputs, there are some executables which allow the user to interact with the executable during the execution. The +challenge of interfacing a python process with such an interactive executable is to identify when the executable is ready +to receive the next input. A very basis example for an interactive executable is a script which counts to the number +input by the user. This can be written in python as `count.py`: +``` +def count(iterations): + for i in range(int(iterations)): + print(i) + print("done") + + +if __name__ == "__main__": + while True: + user_input = input() + if "shutdown" in user_input: + break + else: + count(iterations=int(user_input)) +``` +This example is challenging in terms of interfacing it with a python process as the length of the output changes depending +on the input. The first option that the `pympipool.ShellExecutor` provides is specifying the number of lines to read for +each call submitted to the executable using the `lines_to_read` parameter. In comparison to the `SubprocessExecutor` +defined above the `ShellExecutor` only supports the execution of a single executable at a time, correspondingly the input +parameters for calling the executable are provided at the time of initialization of the `ShellExecutor` and the inputs +are submitted using the `submit()` function: +``` +from pympipool import ShellExecutor + +with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe: + future_lines = exe.submit(string_input="4", lines_to_read=5) + print(future_lines.done(), future_lines.result(), future_lines.done()) +>>> (False, "0\n1\n2\n3\ndone\n", True) +``` +The response for a given set of input is again returned as `concurrent.futures.Future` object, this allows the user to +execute other steps on the python side while waiting for the completion of the external executable. In this case the +example counts the numbers from `0` to `3` and prints each of them in one line followed by `done` to notify the user its +waiting for new inputs. This results in `n+1` lines of output for the input of `n`. Still predicting the number of lines +for a given input can be challenging, so the `pympipool.ShellExecutor` class also provides the option to wait until a +specific pattern is found in the output using the `stop_read_pattern`: +``` +from pympipool import ShellExecutor + +with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe: + future_pattern = exe.submit(string_input="4", stop_read_pattern="done") + print(future_pattern.done(), future_pattern.result(), future_pattern.done()) +>>> (False, "0\n1\n2\n3\ndone\n", True) +``` +In this example the pattern simply searches for the string `done` in the output of the program and returns all the output +gathered from the executable since the last input as the result of the `concurrent.futures.Future` object returned after +the submission of the interactive command. \ No newline at end of file diff --git a/pympipool/__init__.py b/pympipool/__init__.py index dfd18e95..dc0bbf68 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -3,8 +3,8 @@ from ._version import get_versions from pympipool.mpi.executor import PyMPIExecutor from pympipool.shared.interface import SLURM_COMMAND -from pympipool.shell.executor import ShellExecutor -from pympipool.shell.interactive import ShellInteractiveExecutor +from pympipool.shell.executor import SubprocessExecutor +from pympipool.shell.interactive import ShellExecutor from pympipool.slurm.executor import PySlurmExecutor try: # The PyFluxExecutor requires flux-core to be installed. diff --git a/pympipool/shell/__init__.py b/pympipool/shell/__init__.py index 09f4b1cc..3086c26f 100644 --- a/pympipool/shell/__init__.py +++ b/pympipool/shell/__init__.py @@ -1,2 +1,2 @@ -from pympipool.shell.executor import ShellExecutor -from pympipool.shell.interactive import ShellInteractiveExecutor +from pympipool.shell.executor import SubprocessExecutor +from pympipool.shell.interactive import ShellExecutor diff --git a/pympipool/shell/executor.py b/pympipool/shell/executor.py index 4667e195..5b72d1ed 100644 --- a/pympipool/shell/executor.py +++ b/pympipool/shell/executor.py @@ -6,6 +6,12 @@ def execute_single_task(future_queue): + """ + Process items received via the queue. + + Args: + future_queue (queue.Queue): + """ while True: task_dict = future_queue.get() if "shutdown" in task_dict.keys() and task_dict["shutdown"]: @@ -31,7 +37,10 @@ def execute_single_task(future_queue): raise KeyError(task_dict) -class ShellStaticExecutor(ExecutorBase): +class SubprocessSingleExecutor(ExecutorBase): + """ + The pympipool.shell.SubprocessSingleExecutor is the internal worker for the pympipool.shell.SubprocessExecutor. + """ def __init__(self): super().__init__() self._process = RaisingThread( @@ -48,7 +57,26 @@ def submit(self, *args, **kwargs): return f -class ShellExecutor(ExecutorBase): +class SubprocessExecutor(ExecutorBase): + """ + The pympipool.shell.SubprocessExecutor enables the submission of command line calls via the subprocess.check_output() + interface of the python standard library. It is based on the concurrent.futures.Executor class and returns a + concurrent.futures.Future object for every submitted command line call. Still it does not provide any option to + interact with the external executable during the execution. + + Args: + max_workers (int): defines the number workers which can execute functions in parallel + sleep_interval (float): synchronization interval - default 0.1 + + Examples: + + >>> from pympipool import SubprocessExecutor + >>> with SubprocessExecutor(max_workers=2) as exe: + >>> future = exe.submit(["echo", "test"], universal_newlines=True) + >>> print(future.done(), future.result(), future.done()) + (False, "test", True) + + """ def __init__( self, max_workers=1, @@ -62,12 +90,19 @@ def __init__( "future_queue": self._future_queue, "max_workers": max_workers, "sleep_interval": sleep_interval, - "executor_class": ShellStaticExecutor, + "executor_class": SubprocessSingleExecutor, }, ) self._process.start() def submit(self, *args, **kwargs): + """ + Submit a command line call to be executed. The given arguments are provided to subprocess.Popen() as additional + inputs to control the execution. + + Returns: + A Future representing the given call. + """ f = Future() self._future_queue.put({"future": f, "args": args, "kwargs": kwargs}) return f diff --git a/pympipool/shell/interactive.py b/pympipool/shell/interactive.py index acf7e295..f990c8e3 100644 --- a/pympipool/shell/interactive.py +++ b/pympipool/shell/interactive.py @@ -7,11 +7,24 @@ def wait_for_process_to_stop(process, sleep_interval=10e-10): + """ + Wait for the subprocess.Popen() process to stop executing + + Args: + process (subprocess.Popen): process object + sleep_interval (float): interval to sleep during poll() calls + """ while process.poll() is None: sleep(sleep_interval) def execute_single_task(future_queue): + """ + Process items received via the queue. + + Args: + future_queue (queue.Queue): + """ process = None while True: task_dict = future_queue.get() @@ -70,7 +83,27 @@ def execute_single_task(future_queue): raise ValueError("process exited") -class ShellInteractiveExecutor(ExecutorBase): +class ShellExecutor(ExecutorBase): + """ + In contrast to the other pympipool.shell.SubprocessExecutor and the pympipool.Executor the pympipool.shell.ShellExecutor + can only execute a single process at a given time. Still it adds the capability to interact with this process during + its execution. The initialization of the pympipool.shell.ShellExecutor takes the same input arguments as the + subprocess.Popen() call for the standard library to start a subprocess. + + Examples + + >>> from pympipool import ShellExecutor + >>> with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe: + >>> future_lines = exe.submit(string_input="4", lines_to_read=5) + >>> print(future_lines.done(), future_lines.result(), future_lines.done()) + (False, "0\n1\n2\n3\ndone\n", True) + + >>> from pympipool import ShellExecutor + >>> with ShellExecutor(["python", "count.py"], universal_newlines=True) as exe: + >>> future_pattern = exe.submit(string_input="4", stop_read_pattern="done") + >>> print(future_pattern.done(), future_pattern.result(), future_pattern.done()) + (False, "0\n1\n2\n3\ndone\n", True) + """ def __init__(self, *args, **kwargs): super().__init__() self._process = RaisingThread( @@ -83,6 +116,20 @@ def __init__(self, *args, **kwargs): self._future_queue.put({"init": True, "args": args, "kwargs": kwargs}) def submit(self, string_input, lines_to_read=None, stop_read_pattern=None): + """ + Submit the input as a string to the executable. In addition to the input the ShellExecutor also needs a measure + to identify the completion of the execution. This can either be provided based on the number of lines to read + using the `lines_to_read` parameter or by providing a string pattern using the `stop_read_pattern` to stop + reading new lines. One of these two stopping criteria has to be defined. + + Args: + string_input (str): Input to be communicated to the underlying executable + lines_to_read (None/int): integer number of lines to read from the command line (optional) + stop_read_pattern (None/str): string pattern to indicate the command line output is completed (optional) + + Returns: + A Future representing the given call. + """ if lines_to_read is None and stop_read_pattern is None: raise ValueError( "Either the number of lines_to_read (int) or the stop_read_pattern (str) has to be defined." @@ -101,6 +148,19 @@ def submit(self, string_input, lines_to_read=None, stop_read_pattern=None): return f def shutdown(self, wait=True, *, cancel_futures=False): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + parallel_executors have been reclaimed. + cancel_futures: If True then shutdown will cancel all pending + futures. Futures that are completed or running will not be + cancelled. + """ if cancel_futures: cancel_items_in_queue(que=self._future_queue) self._future_queue.put({"shutdown": True, "wait": wait}) diff --git a/tests/test_shell_interactive.py b/tests/test_shell_interactive.py index 2f0406fe..3c98cd6a 100644 --- a/tests/test_shell_interactive.py +++ b/tests/test_shell_interactive.py @@ -4,7 +4,7 @@ from unittest import TestCase -from pympipool.shell.interactive import ShellInteractiveExecutor, execute_single_task +from pympipool.shell.interactive import ShellExecutor, execute_single_task class ShellInteractiveExecutorTest(TestCase): @@ -28,7 +28,7 @@ def test_execute_single_task(self): self.assertEqual("0\n1\n2\n3\ndone\n", future_pattern.result()) def test_shell_interactive_executor(self): - with ShellInteractiveExecutor(["python", self.executable_path], universal_newlines=True) as exe: + with ShellExecutor(["python", self.executable_path], universal_newlines=True) as exe: future_lines = exe.submit(string_input="4", lines_to_read=5, stop_read_pattern=None) future_pattern = exe.submit(string_input="4", lines_to_read=None, stop_read_pattern="done") self.assertFalse(future_lines.done()) diff --git a/tests/test_shell.py b/tests/test_subprocess_executor.py similarity index 64% rename from tests/test_shell.py rename to tests/test_subprocess_executor.py index ed9658d9..209dffb2 100644 --- a/tests/test_shell.py +++ b/tests/test_subprocess_executor.py @@ -3,10 +3,10 @@ from unittest import TestCase -from pympipool.shell.executor import ShellStaticExecutor, ShellExecutor, execute_single_task +from pympipool.shell.executor import SubprocessSingleExecutor, SubprocessExecutor, execute_single_task -class ShellExecutorTest(TestCase): +class SubprocessExecutorTest(TestCase): def test_execute_single_task(self): test_queue = queue.Queue() f = Future() @@ -17,15 +17,29 @@ def test_execute_single_task(self): self.assertTrue(f.done()) self.assertEqual("test\n", f.result()) - def test_shell_static_executor(self): - with ShellStaticExecutor() as exe: - future = exe.submit(["echo", "test"], universal_newlines=True) + def test_shell_static_executor_args(self): + with SubprocessSingleExecutor() as exe: + future = exe.submit(["echo", "test"], universal_newlines=True, shell=False) + self.assertFalse(future.done()) + self.assertEqual("test\n", future.result()) + self.assertTrue(future.done()) + + def test_shell_static_executor_binary(self): + with SubprocessSingleExecutor() as exe: + future = exe.submit(["echo", "test"], universal_newlines=False, shell=False) + self.assertFalse(future.done()) + self.assertEqual(b"test\n", future.result()) + self.assertTrue(future.done()) + + def test_shell_static_executor_shell(self): + with SubprocessSingleExecutor() as exe: + future = exe.submit("echo test", universal_newlines=True, shell=True) self.assertFalse(future.done()) self.assertEqual("test\n", future.result()) self.assertTrue(future.done()) def test_shell_executor(self): - with ShellExecutor(max_workers=2) as exe: + with SubprocessExecutor(max_workers=2) as exe: f_1 = exe.submit(["echo", "test_1"], universal_newlines=True) f_2 = exe.submit(["echo", "test_2"], universal_newlines=True) f_3 = exe.submit(["echo", "test_3"], universal_newlines=True)