diff --git a/notebooks/1-local.ipynb b/notebooks/1-local.ipynb index 7cb72eab..f4418597 100644 --- a/notebooks/1-local.ipynb +++ b/notebooks/1-local.ipynb @@ -49,8 +49,8 @@ "output_type": "stream", "text": [ "2\n", - "CPU times: user 73.1 ms, sys: 49.3 ms, total: 122 ms\n", - "Wall time: 804 ms\n" + "CPU times: user 100 ms, sys: 70.7 ms, total: 171 ms\n", + "Wall time: 1.94 s\n" ] } ], @@ -80,8 +80,8 @@ "output_type": "stream", "text": [ "[4, 6, 8]\n", - "CPU times: user 44.3 ms, sys: 27 ms, total: 71.3 ms\n", - "Wall time: 1.35 s\n" + "CPU times: user 49.4 ms, sys: 29.2 ms, total: 78.7 ms\n", + "Wall time: 1.75 s\n" ] } ], @@ -111,8 +111,8 @@ "output_type": "stream", "text": [ "[10, 12, 14]\n", - "CPU times: user 32.9 ms, sys: 25.2 ms, total: 58.1 ms\n", - "Wall time: 968 ms\n" + "CPU times: user 40.5 ms, sys: 28.1 ms, total: 68.6 ms\n", + "Wall time: 1.09 s\n" ] } ], @@ -204,14 +204,6 @@ "Another option is to set the resource dictionary parameter `resource_dict` during the initialization of the `Executor`. In this case it is internally set for every call of the `submit()` function, without the need to specify it again." ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "dcc06777-ae51-469b-bead-63ce402efe04", - "metadata": {}, - "outputs": [], - "source": [] - }, { "cell_type": "code", "execution_count": 7, @@ -351,7 +343,7 @@ "metadata": {}, "source": [ "### Block Allocation\n", - "By default each submitted Python function is executed in a dedicated process. This gurantees that the execution of the submitted Python function starts with a fresh process. Still the initialization of the Python process takes time. Especially when the call of the Python function requires only limited computational resources it makes sense to reuse the existing Python process for the execution of multiple Python functions. In executorlib this functionality is enabled by setting the `block_allocation` parameter to `Ture`." + "By default each submitted Python function is executed in a dedicated process. This guarantees that the execution of the submitted Python function starts with a fresh process. Still, the initialization of the Python process takes time. Especially when the call of the Python function requires only limited computational resources, it makes sense to reuse the existing Python process for the execution of multiple Python functions. In executorlib this functionality is enabled by setting the `block_allocation` parameter to `True`. To limit the number of parallel Python processes when using block allocation, it is recommended to set the `max_workers` parameter to restrict the number of available computing resources. 
" ] }, { @@ -365,14 +357,14 @@ "output_type": "stream", "text": [ "2\n", - "CPU times: user 34.5 ms, sys: 32.1 ms, total: 66.6 ms\n", - "Wall time: 1.26 s\n" + "CPU times: user 37.1 ms, sys: 21.8 ms, total: 58.9 ms\n", + "Wall time: 1.09 s\n" ] } ], "source": [ "%%time\n", - "with Executor(backend=\"local\", block_allocation=True) as exe:\n", + "with Executor(max_workers=2, backend=\"local\", block_allocation=True) as exe:\n", " future = exe.submit(sum, [1, 1])\n", " print(future.result())" ] @@ -431,16 +423,41 @@ "experience performance degradation.\n", "\n", " Local host: MacBook-Pro.local\n", - " System call: unlink(2) /var/folders/z7/3vhrmssx60v240x_ndq448h80000gn/T//ompi.MacBook-Pro.501/pid.14134/1/vader_segment.MacBook-Pro.501.765b0001.1\n", + " System call: unlink(2) /var/folders/z7/3vhrmssx60v240x_ndq448h80000gn/T//ompi.MacBook-Pro.501/pid.22031/1/vader_segment.MacBook-Pro.501.17620001.1\n", + " Error: No such file or directory (errno 2)\n", + "--------------------------------------------------------------------------\n", + "--------------------------------------------------------------------------\n", + "A system call failed during shared memory initialization that should\n", + "not have. It is likely that your MPI job will now either abort or\n", + "experience performance degradation.\n", + "\n", + " Local host: MacBook-Pro.local\n", + " System call: unlink(2) /var/folders/z7/3vhrmssx60v240x_ndq448h80000gn/T//ompi.MacBook-Pro.501/pid.22028/1/vader_segment.MacBook-Pro.501.17610001.1\n", + " Error: No such file or directory (errno 2)\n", + "--------------------------------------------------------------------------\n", + "--------------------------------------------------------------------------\n", + "A system call failed during shared memory initialization that should\n", + "not have. It is likely that your MPI job will now either abort or\n", + "experience performance degradation.\n", + "\n", + " Local host: MacBook-Pro.local\n", + " System call: unlink(2) /var/folders/z7/3vhrmssx60v240x_ndq448h80000gn/T//ompi.MacBook-Pro.501/pid.22030/1/vader_segment.MacBook-Pro.501.17630001.1\n", + " Error: No such file or directory (errno 2)\n", + "--------------------------------------------------------------------------\n", + "--------------------------------------------------------------------------\n", + "A system call failed during shared memory initialization that should\n", + "not have. 
It is likely that your MPI job will now either abort or\n", "experience performance degradation.\n", "\n", " Local host: MacBook-Pro.local\n", " System call: unlink(2) /var/folders/z7/3vhrmssx60v240x_ndq448h80000gn/T//ompi.MacBook-Pro.501/pid.22029/1/vader_segment.MacBook-Pro.501.17600001.1\n", " Error: No such file or directory (errno 2)\n", "--------------------------------------------------------------------------\n" ] } ], "source": [ - "with Executor(\n", - " backend=\"local\", resource_dict={\"cores\": 2}, block_allocation=True\n", - ") as exe:\n", + "with Executor(backend=\"local\", resource_dict={\"cores\": 2}, block_allocation=True) as exe:\n", " fs = exe.submit(calc_mpi, 3)\n", " print(fs.result())" ] }, @@ -535,7 +552,7 @@ "output_type": "stream", "text": [ "[2, 4, 6]\n", - "CPU times: user 526 ms, sys: 134 ms, total: 661 ms\n", + "CPU times: user 547 ms, sys: 161 ms, total: 708 ms\n", "Wall time: 1.33 s\n" ] } ], @@ -568,8 +585,8 @@ "output_type": "stream", "text": [ "[2, 4, 6]\n", - "CPU times: user 41.4 ms, sys: 31.7 ms, total: 73.1 ms\n", - "Wall time: 989 ms\n" + "CPU times: user 52.1 ms, sys: 41.1 ms, total: 93.2 ms\n", + "Wall time: 1.13 s\n" ] } ], diff --git a/notebooks/2-hpc-submission.ipynb b/notebooks/2-hpc-submission.ipynb index e0b77224..94988196 100644 --- a/notebooks/2-hpc-submission.ipynb +++ b/notebooks/2-hpc-submission.ipynb @@ -5,56 +5,123 @@ "id": "ddf66f38-dc4a-4306-8b1c-b923fdb76922", "metadata": {}, "source": [ - "# HPC Submission" + "# HPC Submission Mode\n", + "In contrast to the [local mode] and the [HPC allocation mode], the HPC submission mode does not communicate via the [zero message queue](https://zeromq.org) but instead stores the Python functions on the file system and uses the job scheduler to handle the dependencies of the Python functions. Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in HPC submission mode. At the same time it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache. \n", + "\n", + "Internally the HPC submission mode uses the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependencies of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package is covered in the installation section. " ] }, { "cell_type": "markdown", "id": "d56862a6-8279-421d-a090-7ca2a3c4d416", "metadata": {}, "source": [ "## SLURM\n", "The [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. In the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command; this is in contrast to the [HPC allocation mode], which internally uses the [srun](https://slurm.schedmd.com/srun.html) command. \n", "\n", "The connection to the job scheduler is based on the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io). 
It provides a default configuration for most commonly used job schedulers including SLURM. In addition, it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail in the [pysqa documentation](https://pysqa.readthedocs.io)." ] }, { - "cell_type": "markdown", - "id": "27f26e48-d4b6-4703-9fc5-b38a18877a05", + "cell_type": "code", + "execution_count": 1, + "id": "2205bc59-d943-4556-8b28-3cc22922fc06", "metadata": {}, + "outputs": [], "source": [ - "### Basic" + "from executorlib import Executor" ] }, { - "cell_type": "code", - "execution_count": 1, - "id": "cc7f6ada-3152-4c59-af89-7d8f538779fe", + "cell_type": "markdown", + "id": "b20913f3-59e4-418c-a399-866124f8e497", "metadata": {}, - "outputs": [], "source": [ - "from executorlib import Executor" + "In comparison to the [Local Mode](), the only two parameters which are changed are the specification of the backend as `backend=\"slurm_submission\"` and the requirement to specify the cache directory using `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows. " ] }, { - "cell_type": "code", - "execution_count": 2, - "id": "123fd1cb-320d-477c-b087-248ca9c5701f", + "cell_type": "markdown", + "id": "0b8f3b77-6199-4736-9f28-3058c5230777", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "[2, 4, 6]\n", - "CPU times: user 1.38 s, sys: 514 ms, total: 1.89 s\n", - "Wall time: 4.57 s\n" - ] - } - ], "source": [ - "%%time\n", - "with Executor(backend=\"flux_submission\", cache_directory=\"./cache\") as exe:\n", + "```python\n", + "with Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n", " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", - " print([f.result() for f in future_lst])" + " print([f.result() for f in future_lst])\n", + "```" ] }, { "cell_type": "markdown", "id": "37bef7ac-ce3e-4d8a-b848-b1474c370bca", "metadata": {}, "source": [ "Specific parameters for HPC submission mode like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. Again it is possible to specify the resource dictionary `resource_dict` either for each function in the `submit()` function or during the initialization of the `Executor`. 
" + ] + }, + { + "cell_type": "markdown", + "id": "658781de-f222-4235-8c26-b0f77a0831b3", + "metadata": {}, + "source": [ + "```python\n", + "submission_template = \"\"\"\\\n", + "#!/bin/bash\n", + "#SBATCH --output=time.out\n", + "#SBATCH --job-name={{job_name}}\n", + "#SBATCH --chdir={{working_directory}}\n", + "#SBATCH --get-user-env=L\n", + "#SBATCH --partition={{partition}}\n", + "{%- if run_time_max %}\n", + "#SBATCH --time={{ [1, run_time_max // 60]|max }}\n", + "{%- endif %}\n", + "{%- if dependency %}\n", + "#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n", + "{%- endif %}\n", + "{%- if memory_max %}\n", + "#SBATCH --mem={{memory_max}}G\n", + "{%- endif %}\n", + "#SBATCH --cpus-per-task={{cores}}\n", + "\n", + "{{command}}\n", + "\"\"\"\n", + "\n", + "with Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n", + " future = exe.submit(\n", + " sum, [4, 4], \n", + " resource_dict={\n", + " \"submission_template\": submission_template, \n", + " \"run_time_max\": 180, # in seconds \n", + " })\n", + " print([f.result() for f in future_lst])\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "f7ad9c97-7743-4f87-9344-4299b2b31a56", + "metadata": {}, + "source": [ + "With these options executorlib in combination with the SLURM job scheduler provides a lot flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. " + ] + }, + { + "cell_type": "markdown", + "id": "2a814efb-2fbc-41ba-98df-cf121d19ea66", + "metadata": {}, + "source": [ + "## Flux\n", + "While most HPC job schedulers require extensive configuration before they can be tested, the [flux framework](http://flux-framework.org) can be installed with the conda package manager, as explained in the [installation section](). This simple installation makes the flux framework especially suitable for demonstrations, testing and continous integration. So below a number of features for the HPC submission mode are demonstrated based on the example of the [flux framework](http://flux-framework.org) still the same applies to other job schedulers like SLURM introduced above." + ] + }, + { + "cell_type": "markdown", + "id": "29d7aa18-357e-416e-805c-1322b59abec1", + "metadata": {}, + "source": [ + "### Dependencies\n", + "As already demonstrated for the [Local Mode]() the `Executor` class from executorlib is capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. For the case of the HPC submission these dependencies are communicated to the job scheduler, which allows to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter." 
] }, { @@ -84,19 +151,27 @@ ], "source": [ "with Executor(backend=\"flux_submission\", cache_directory=\"./cache\") as exe:\n", - " future = None\n", + " future = 0\n", " for i in range(4, 8):\n", - " if future is None:\n", - " future = exe.submit(add_funct, i, i)\n", - " else:\n", - " future = exe.submit(add_funct, i, future)\n", + " future = exe.submit(add_funct, i, future)\n", " print(future.result())" ] }, { "cell_type": "markdown", "id": "ca75cb6c-c50f-4bee-9b09-d8d29d6c263b", "metadata": {}, "source": [ "### Resource Assignment\n", "In analogy to the [Local Mode](), the resource assignment for the HPC submission mode is handled by either including the resource dictionary parameter `resource_dict` in the initialization of the `Executor` class or in every call of the `submit()` function. \n", "\n", "Below this is demonstrated once for the assignment of multiple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package. " ] }, { "cell_type": "code", - "execution_count": 5, - "id": "cf3a2ad4-ce95-4b89-9297-db1a21a157c4", + "execution_count": null, + "id": "cc46ac38-796d-49c6-ba40-c425fb717e84", "metadata": {}, "outputs": [], "source": [ @@ -128,6 +203,44 @@ " print(fs.result())" ] }, { "cell_type": "markdown", "id": "d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8", "metadata": {}, "source": [ "Beyond CPU cores and threads which were previously also introduced for the [Local Mode](), the HPC submission mode also provides the option to select the available accelerator cards or GPUs, by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. For demonstration we create a Python function which reads the GPU device IDs and submit it to the `Executor` class:\n", "```python\n", "def get_available_gpus():\n", " import socket\n", " from tensorflow.python.client import device_lib\n", " local_device_protos = device_lib.list_local_devices()\n", " return [\n", " (x.name, x.physical_device_desc, socket.gethostname()) \n", " for x in local_device_protos if x.device_type == 'GPU'\n", " ]\n", "```\n", "\n", "```python\n", "with Executor(\n", " backend=\"flux_submission\",\n", " cache_directory=\"./cache\",\n", " resource_dict={\"gpus_per_core\": 1}\n", ") as exe:\n", " fs_1 = exe.submit(get_available_gpus)\n", " fs_2 = exe.submit(get_available_gpus)\n", " print(fs_1.result(), fs_2.result())\n", "```" ] }, { "cell_type": "markdown", "id": "3f47fd34-04d1-42a7-bb06-6821dc99a648", "metadata": {}, "source": [ "### Cleaning Cache\n", "Finally, as the HPC Submission Mode leverages the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library for serialization. This format is designed for caching but not for long-term storage. The user is responsible for the long-term storage of their data.\n",
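"\n",
"A minimal sketch of one way to do this with the Python standard library, assuming the cache was written to the `./cache` directory used in the examples above:\n",
"```python\n",
"import shutil\n",
"\n",
"# remove the cache directory including all serialized Python functions and results\n",
"shutil.rmtree(\"./cache\", ignore_errors=True)\n",
"```"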
+ ] + }, { "cell_type": "code", "execution_count": 7, @@ -155,26 +268,6 @@ " pass" ] }, - { - "cell_type": "markdown", - "id": "7840db10-7555-4849-b57d-6d7d1c28f897", - "metadata": {}, - "source": [ - "### Resource Assignment \n", - "* Threads\n", - "* MPI\n", - "* GPU" - ] - }, - { - "cell_type": "markdown", - "id": "8177f422-cec1-49cb-871b-53b7d06fe9ec", - "metadata": {}, - "source": [ - "### Advanced Configuration\n", - "* explain config directory and submission templates" - ] - }, { "cell_type": "code", "execution_count": null, @@ -200,7 +293,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.10" + "version": "3.12.5" } }, "nbformat": 4,