Dask aurora #583

Open · wants to merge 4 commits into `main`

docs/aurora/data-science/frameworks/dask.md (229 additions, 0 deletions)

@@ -0,0 +1,229 @@
# Dask

[Dask](https://www.dask.org/) is a Python library for parallel and distributed computing.
A Dask cluster consists of one scheduler that coordinates the work of many workers, which can have access to CPU or GPU resources.
Here we show how to install Dask in a conda environment on Aurora, how to start a cluster with CPU or GPU workers, and how to run a simple example script.


## Install Dask on Aurora

From one of Aurora's login nodes, use the following commands to create a conda environment and install Dask.
This will also install other libraries needed to run an example script, and create a Jupyter kernel that allows you to work interactively from a notebook.

```bash
module load frameworks
conda create -y -n dask -c conda-forge python=3.11 pip dask ipykernel jupyterlab
conda activate dask
# install additional libraries
conda install -y -c https://software.repos.intel.com/python/conda/ -c conda-forge dpnp
# create the jupyter kernel
python -m ipykernel install --prefix=${CONDA_PREFIX} --name dask
```
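
To verify the installation, you can run a quick import check from the activated environment. This is a minimal sketch and only confirms that the packages are importable (it assumes the `frameworks` module is still loaded, which `dpnp` needs for its runtime libraries); the file name `check_env.py` is just an example:

```python
# check_env.py -- hypothetical sanity check, not required for the rest of this guide.
# Run with: python check_env.py
import dask
import dask.distributed
import dpnp

print("dask", dask.__version__)
print("dpnp", dpnp.__version__)
```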


## Start a Dask cluster

Copy the following script into a file called `start_dask_aurora.sh` and make it executable with

```bash
chmod a+x ./start_dask_aurora.sh
```

### `start_dask_aurora.sh`

```bash
#!/bin/bash

# start_dask_aurora.sh
# Usage:
#   mpiexec -n $(( NNODES * NUM_WORKERS_PER_NODE )) --ppn NUM_WORKERS_PER_NODE ./start_dask_aurora.sh WORKER_TYPE NUM_WORKERS_PER_NODE
# Examples on two nodes:
#   mpiexec -n 12 --ppn 6 ./start_dask_aurora.sh gpu 6
#   mpiexec -n 208 --ppn 104 ./start_dask_aurora.sh cpu 104

WORKER_TYPE=$1
NUM_WORKERS_PER_NODE=$2

# If using 12 GPU workers per node, assign one worker per tile; otherwise use one worker per GPU (2 tiles)
if [ "$NUM_WORKERS_PER_NODE" = 12 ] && [ "$WORKER_TYPE" = 'gpu' ]; then
    export ZE_FLAT_DEVICE_HIERARCHY=FLAT
    export ZE_ENABLE_PCI_ID_DEVICE_ORDER=1
else
    export ZE_FLAT_DEVICE_HIERARCHY=COMPOSITE
fi

# Number of threads per worker (208 CPU threads per node divided by the number of workers)
NTHREADS=$(( 208 / NUM_WORKERS_PER_NODE ))              # 208 / 12 ≈ 17
# Memory limit per worker (1100 GB RAM per node divided by the number of workers)
MEMORY_PER_WORKER=$(( 1100 / NUM_WORKERS_PER_NODE ))GB  # 1100 GB / 12 ≈ 91 GB
LOCAL_DIRECTORY=~/dask-local-directory
DASK_DASHBOARD_PORT=${DASK_DASHBOARD_PORT:-8787}
DASK_SCHEDULER_PORT=${DASK_SCHEDULER_PORT:-8786}

# Start the Dask scheduler on rank 0
if [ "$PALS_RANKID" = 0 ]; then
    # Purge the Dask worker, log, and config directories
    rm -rf ${LOCAL_DIRECTORY}/* /tmp/dask-workers/* ~/.config/dask
    mkdir -p ${LOCAL_DIRECTORY}/logs /tmp/dask-workers
    # Start the scheduler
    nohup dask scheduler --port ${DASK_SCHEDULER_PORT} --dashboard-address ${DASK_DASHBOARD_PORT} \
        --scheduler-file ${LOCAL_DIRECTORY}/scheduler.json > ${LOCAL_DIRECTORY}/logs/$HOSTNAME-scheduler.log 2>&1 &
fi
sleep 10

# Start the workers
if [ "$WORKER_TYPE" = 'gpu' ]; then
    ZE_AFFINITY_MASK=$PALS_LOCAL_RANKID dask worker \
        --resources "GPU=1" --memory-limit ${MEMORY_PER_WORKER} \
        --nthreads ${NTHREADS} --local-directory /tmp/dask-workers \
        --scheduler-file ${LOCAL_DIRECTORY}/scheduler.json >> ${LOCAL_DIRECTORY}/logs/$HOSTNAME-worker.log 2>&1
else
    dask worker \
        --nthreads ${NTHREADS} --local-directory /tmp/dask-workers \
        --scheduler-file ${LOCAL_DIRECTORY}/scheduler.json >> ${LOCAL_DIRECTORY}/logs/$HOSTNAME-worker.log 2>&1
fi
```

### Start a cluster with CPU workers

Run the following commands from a compute node on Aurora to start a Dask cluster with 104 CPU workers per node:

```bash
module load frameworks
conda activate dask
NNODES=`wc -l < $PBS_NODEFILE`
mpiexec -n $(( $NNODES * 104 )) --ppn 104 ./start_dask_aurora.sh cpu 104
```

### Start a cluster with GPU workers

Run the following commands from a compute node on Aurora to start a Dask cluster with 6 GPU workers per node:

```bash
module load frameworks
conda activate dask
NNODES=`wc -l < $PBS_NODEFILE`
mpiexec -n $(( $NNODES * 6 )) --ppn 6 ./start_dask_aurora.sh gpu 6
```
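
Once the cluster is up (CPU or GPU), you can check that all workers have registered with the scheduler. The snippet below is a minimal sketch that reads the scheduler file written by `start_dask_aurora.sh`; the file name `verify_cluster.py` is just an example:

```python
# verify_cluster.py -- hypothetical helper to confirm the cluster is ready.
# It connects through the scheduler file and lists the registered workers.
import json
import pathlib

from dask.distributed import Client

scheduler_file = pathlib.Path.home() / "dask-local-directory" / "scheduler.json"
with open(scheduler_file) as f:
    scheduler = json.load(f)

client = Client(scheduler["address"])
workers = client.scheduler_info()["workers"]
print(f"{len(workers)} workers connected to {scheduler['address']}")
client.close()  # close this connection without shutting down the cluster
```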


## Example

In this example, we will [estimate Pi using a Monte Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods).

Paste the following Python script into a file called `pi_dask_gpu.py`.
Here is a breakdown of what the script does:

1. It connects to the Dask cluster (which you should start beforehand) and prints some information, including the number of workers and the available memory.
1. It divides the total number of points to sample between the workers, and each worker uses its GPU to
    - generate random points uniformly inside the unit square
    - return the number of points that fall inside the unit circle
1. When the results from the workers are ready, they are aggregated to compute Pi.
1. A total of 5 Pi calculations are performed and timed (the very first iterations will incur initialization and warmup costs).

1. At the end, the Dask cluster is shut down.

### `pi_dask_gpu.py`

```python
import json
import pathlib
from dask.distributed import Client


fname = f'{pathlib.Path.home().as_posix()}/dask-local-directory/scheduler.json'
with open(fname, 'r') as f:
    scheduler = json.load(f)
client = Client(scheduler['address'])
print(client.scheduler_info)


import time
import dpnp as np


def count_points_inside_circle(N):
    x = np.random.uniform(low=-1.0, high=1.0, size=(N, 2))
    inside_circle = ((x * x).sum(axis=1) < 1.).sum()
    return int(inside_circle)


def compute_pi(inside_circle, N):
    return 4 * inside_circle / N


def run():
    start = time.time()
    num_workers = len(client.scheduler_info()['workers'])
    N = 10_400_000_004

    # number of points per worker
    Neach_section, extras = divmod(N, num_workers)
    points_per_worker = [Neach_section for _ in range(num_workers)]
    points_per_worker[-1] += extras

    futures = client.map(count_points_inside_circle, points_per_worker)
    inside_circle = client.submit(sum, futures).result()
    pi = compute_pi(inside_circle, N)
    end = time.time()
    return f"Num samples: {N:.2E}\t\tEstimate: {pi:.9f}\t\tTime taken: {end - start:.3f} s"


def main(runs=5):
    for i in range(runs):
        print(f"Run {i}\t\t{run()}")


main()
client.shutdown()
```

### Run the `pi_dask_gpu.py` example

- First, request an interactive job on 1 node.
- Then, [start a Dask cluster with 6 GPU workers](#start-a-cluster-with-gpu-workers) and wait about 10 seconds for the cluster to start.
- Press `Ctrl+Z` and then run `bg` to send the process to the background, or open a new shell and SSH into the compute node.
- Run the example script:
```bash
module load frameworks
conda activate dask
python pi_dask_gpu.py
```
- Output:
```bash
<bound method Client.scheduler_info of <Client: 'tcp://10.168.0.10:8786' processes=6 threads=204, memory=1.00 TiB>>
Run 0 Num samples: 1.04E+10 Estimate: 3.141653798 Time taken: 1.596 s
Run 1 Num samples: 1.04E+10 Estimate: 3.141570887 Time taken: 1.354 s
Run 2 Num samples: 1.04E+10 Estimate: 3.141651954 Time taken: 1.451 s
Run 3 Num samples: 1.04E+10 Estimate: 3.141636617 Time taken: 0.518 s
Run 4 Num samples: 1.04E+10 Estimate: 3.141650108 Time taken: 0.511 s
```
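
The same workflow also runs on a cluster started with CPU workers; the only change to `pi_dask_gpu.py` is to generate the random points with NumPy (pulled in as a Dask dependency) instead of `dpnp`. A minimal sketch of the modified worker function:

```python
# CPU variant of count_points_inside_circle: same logic as in pi_dask_gpu.py,
# but the points are sampled with NumPy on the worker's CPU threads instead of dpnp on a GPU.
import numpy as np


def count_points_inside_circle(N):
    x = np.random.uniform(low=-1.0, high=1.0, size=(N, 2))
    inside_circle = ((x * x).sum(axis=1) < 1.0).sum()
    return int(inside_circle)
```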

## Connect to a Dask cluster from JupyterLab

Here are the steps to start a Dask cluster and connect to it interactively from a Jupyter notebook:

- First, request an interactive job on 1 node. Print the compute node's hostname (which you can get with the command `hostname`); it will be used later.
- Then, [start a Dask cluster](#start-a-cluster-with-gpu-workers) and wait about 10 seconds for the cluster to start.
- **On your local machine**, open an SSH tunnel to the compute node (`COMPUTE_NODE` is the compute node's hostname and `YOUR_ALCF_USERNAME` is your ALCF username):
```bash
ssh -t -L 23456:localhost:23456 -L 8787:localhost:8787 YOUR_ALCF_USERNAME@bastion.alcf.anl.gov \
    ssh -t -L 23456:localhost:23456 -L 8787:localhost:8787 login.aurora.alcf.anl.gov \
    ssh -t -L 23456:localhost:23456 -L 8787:localhost:8787 COMPUTE_NODE
```

  > **Reviewer note:** multi-hop SSH tunnels like this one have several pitfalls, as seen during the scikit-learn hands-on exercise in October (argonne-lcf/ALCF_Hands_on_HPC_Workshop#56). Are any of those pitfalls possible here, i.e. should we suggest installing the SSH keypair in `~/.ssh/authorized_keys` for the compute-node jump?

- **On the compute node** where you land with the above SSH command, start JupyterLab:
```bash
module load frameworks
conda activate dask
jupyter lab --no-browser --port=23456
```
- Copy the line starting with `http://localhost:23456/lab?token=<TOKEN>` at the end of the Jupyter command's output.
- **On your local machine**, open a browser window and go to that URL.
- **On the JupyterLab page**, select the `dask` kernel and use this script to connect to the Dask cluster:
```python
import json
import pathlib
from dask.distributed import Client

fname = f'{pathlib.Path.home().as_posix()}/dask-local-directory/scheduler.json'
with open(fname, 'r') as f:
    scheduler = json.load(f)
client = Client(scheduler['address'])
client
```
- The [Dask dashboard](https://docs.dask.org/en/latest/dashboard.html) will be available at <http://localhost:8787>
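
Once the `client` above is connected, you can submit a quick test computation from the notebook and watch the tasks appear on the dashboard. A minimal sketch (the `square` function and the input range are placeholders for illustration):

```python
# Hypothetical smoke test: run a trivial function on the workers and gather the results.
def square(x):
    return x * x

futures = client.map(square, range(10))
print(client.gather(futures))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```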

mkdocs.yml (1 addition, 0 deletions)

@@ -85,6 +85,7 @@ nav:
- LibTorch: aurora/data-science/frameworks/libtorch.md
- OpenVINO: aurora/data-science/frameworks/openvino.md
- oneCCL: aurora/data-science/frameworks/oneCCL.md
- Dask: aurora/data-science/frameworks/dask.md
- Programming Models:
- Kokkos: aurora/programming-models/kokkos-aurora.md
- Level Zero: aurora/programming-models/level-0.md