diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 23d0af35f..9bfa630e1 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -2,8 +2,9 @@ dask_cuda/ @rapidsai/daskcuda-python-codeowners #build/ops code owners -.github/ @rapidsai/ops-codeowners +.github/ @rapidsai/ops-codeowners ci/ @rapidsai/ops-codeowners conda/ @rapidsai/ops-codeowners **/Dockerfile @rapidsai/ops-codeowners **/.dockerignore @rapidsai/ops-codeowners +dependencies.yaml @rapidsai/ops-codeowners diff --git a/.github/ops-bot.yaml b/.github/ops-bot.yaml index 0a52b6795..2d1444c59 100644 --- a/.github/ops-bot.yaml +++ b/.github/ops-bot.yaml @@ -5,4 +5,5 @@ auto_merger: true branch_checker: true label_checker: true release_drafter: true -external_contributors: false +copy_prs: true +recently_updated: true diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 000000000..bce48ebd8 --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,74 @@ +name: build + +on: + push: + branches: + - "branch-*" + tags: + - v[0-9][0-9].[0-9][0-9].[0-9][0-9] + workflow_dispatch: + inputs: + branch: + required: true + type: string + date: + required: true + type: string + sha: + required: true + type: string + build_type: + type: string + default: nightly + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + conda-python-build: + secrets: inherit + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.02 + with: + build_type: ${{ inputs.build_type || 'branch' }} + branch: ${{ inputs.branch }} + date: ${{ inputs.date }} + sha: ${{ inputs.sha }} + docs-build: + if: ${{ startsWith(github.ref, 'refs/heads/branch-') }} + needs: [conda-python-build] + secrets: inherit + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.02 + with: + build_type: branch + node_type: "gpu-latest-1" + arch: "amd64" + container_image: "rapidsai/ci:latest" + run_script: "ci/build_docs.sh" + upload-conda: + needs: [conda-python-build] + secrets: inherit + uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.02 + with: + build_type: ${{ inputs.build_type || 'branch' }} + branch: ${{ inputs.branch }} + date: ${{ inputs.date }} + sha: ${{ inputs.sha }} + wheel-build: + runs-on: ubuntu-latest + container: + image: rapidsai/ci:latest + defaults: + run: + shell: bash + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Build wheel + run: ci/build_python_pypi.sh + - name: Publish distribution 📦 to PyPI + if: inputs.build_type == 'nightly' + uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.RAPIDSAI_PYPI_TOKEN }} diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml new file mode 100644 index 000000000..3dee7d77f --- /dev/null +++ b/.github/workflows/pr.yaml @@ -0,0 +1,60 @@ +name: pr + +on: + push: + branches: + - "pull-request/[0-9]+" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + pr-builder: + needs: + - checks + - conda-python-build + - conda-python-tests + - docs-build + - wheel-build + secrets: inherit + uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.02 + checks: + secrets: inherit + uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.02 + conda-python-build: + needs: checks + secrets: inherit + uses: 
rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.02 + with: + build_type: pull-request + conda-python-tests: + needs: conda-python-build + secrets: inherit + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.02 + with: + build_type: pull-request + docs-build: + needs: conda-python-build + secrets: inherit + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.02 + with: + build_type: pull-request + node_type: "gpu-latest-1" + arch: "amd64" + container_image: "rapidsai/ci:latest" + run_script: "ci/build_docs.sh" + wheel-build: + needs: checks + runs-on: ubuntu-latest + container: + image: rapidsai/ci:latest + defaults: + run: + shell: bash + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Build wheel + run: ci/build_python_pypi.sh diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 000000000..5c18a0b1c --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,24 @@ +name: test + +on: + workflow_dispatch: + inputs: + branch: + required: true + type: string + date: + required: true + type: string + sha: + required: true + type: string + +jobs: + conda-python-tests: + secrets: inherit + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.02 + with: + build_type: nightly + branch: ${{ inputs.branch }} + date: ${{ inputs.date }} + sha: ${{ inputs.sha }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index bd2190660..030c454b6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,11 @@ repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.3.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer - repo: https://github.com/pycqa/isort - rev: 5.10.1 + rev: 5.12.0 hooks: - id: isort - repo: https://github.com/ambv/black @@ -11,5 +16,23 @@ repos: rev: 3.8.3 hooks: - id: flake8 + - repo: https://github.com/codespell-project/codespell + rev: v2.1.0 + hooks: + - id: codespell + exclude: | + (?x)^( + .*test.*| + ^CHANGELOG.md$| + ^.*versioneer.py$ + ) + - repo: https://github.com/pre-commit/mirrors-mypy + rev: 'v0.991' + hooks: + - id: mypy + additional_dependencies: [types-cachetools] + args: ["--module=dask_cuda", "--ignore-missing-imports"] + pass_filenames: false + default_language_version: python: python3 diff --git a/.readthedocs.yml b/.readthedocs.yml index 0b2ac73c0..fd5ccf688 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -4,4 +4,4 @@ sphinx: configuration: rtd/conf.py formats: - - htmlzip \ No newline at end of file + - htmlzip diff --git a/CHANGELOG.md b/CHANGELOG.md index 680c0d9d7..f82b7e59d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,44 @@ +# dask-cuda 23.02.00 (9 Feb 2023) + +## 🚨 Breaking Changes + +- Pin `dask` and `distributed` for release ([#1106](https://github.com/rapidsai/dask-cuda/pull/1106)) [@galipremsagar](https://github.com/galipremsagar) + +## 🐛 Bug Fixes + +- pre-commit: Update isort version to 5.12.0 ([#1098](https://github.com/rapidsai/dask-cuda/pull/1098)) [@wence-](https://github.com/wence-) +- explicit-comms: don't mix `-` and `_` in config ([#1096](https://github.com/rapidsai/dask-cuda/pull/1096)) [@madsbk](https://github.com/madsbk) +- Update `cudf.Buffer` pointer access method ([#1094](https://github.com/rapidsai/dask-cuda/pull/1094)) [@pentschev](https://github.com/pentschev) +- Update tests for Python 3.10 ([#1086](https://github.com/rapidsai/dask-cuda/pull/1086)) 
[@pentschev](https://github.com/pentschev) +- Use `pkgutil.iter_modules` to get un-imported module for `test_pre_import` ([#1085](https://github.com/rapidsai/dask-cuda/pull/1085)) [@charlesbluca](https://github.com/charlesbluca) +- Make proxy tests with `LocalCUDACluster` asynchronous ([#1084](https://github.com/rapidsai/dask-cuda/pull/1084)) [@pentschev](https://github.com/pentschev) +- Ensure consistent results from `safe_sizeof()` in test ([#1071](https://github.com/rapidsai/dask-cuda/pull/1071)) [@madsbk](https://github.com/madsbk) +- Pass missing argument to groupby benchmark compute ([#1069](https://github.com/rapidsai/dask-cuda/pull/1069)) [@mattf](https://github.com/mattf) +- Reorder channel priority. ([#1067](https://github.com/rapidsai/dask-cuda/pull/1067)) [@bdice](https://github.com/bdice) +- Fix owner check when the owner is a cupy array ([#1061](https://github.com/rapidsai/dask-cuda/pull/1061)) [@wence-](https://github.com/wence-) + +## 🛠️ Improvements + +- Pin `dask` and `distributed` for release ([#1106](https://github.com/rapidsai/dask-cuda/pull/1106)) [@galipremsagar](https://github.com/galipremsagar) +- Update shared workflow branches ([#1105](https://github.com/rapidsai/dask-cuda/pull/1105)) [@ajschmidt8](https://github.com/ajschmidt8) +- Proxify: make duplicate check optional ([#1101](https://github.com/rapidsai/dask-cuda/pull/1101)) [@madsbk](https://github.com/madsbk) +- Fix whitespace & add URLs in `pyproject.toml` ([#1092](https://github.com/rapidsai/dask-cuda/pull/1092)) [@jakirkham](https://github.com/jakirkham) +- pre-commit: spell, whitespace, and mypy check ([#1091](https://github.com/rapidsai/dask-cuda/pull/1091)) [@madsbk](https://github.com/madsbk) +- shuffle: use cuDF's `partition_by_hash()` when available ([#1090](https://github.com/rapidsai/dask-cuda/pull/1090)) [@madsbk](https://github.com/madsbk) +- add initial docs build ([#1089](https://github.com/rapidsai/dask-cuda/pull/1089)) [@AjayThorve](https://github.com/AjayThorve) +- Remove `--get-cluster-configuration` option, check for scheduler in `dask cuda config` ([#1088](https://github.com/rapidsai/dask-cuda/pull/1088)) [@charlesbluca](https://github.com/charlesbluca) +- Add timeout to `pytest` command ([#1082](https://github.com/rapidsai/dask-cuda/pull/1082)) [@ajschmidt8](https://github.com/ajschmidt8) +- shuffle-benchmark: add `--partition-distribution` ([#1081](https://github.com/rapidsai/dask-cuda/pull/1081)) [@madsbk](https://github.com/madsbk) +- Ensure tests run for Python `3.10` ([#1080](https://github.com/rapidsai/dask-cuda/pull/1080)) [@ajschmidt8](https://github.com/ajschmidt8) +- Use TrackingResourceAdaptor to get better debug info ([#1079](https://github.com/rapidsai/dask-cuda/pull/1079)) [@madsbk](https://github.com/madsbk) +- Improve shuffle-benchmark ([#1074](https://github.com/rapidsai/dask-cuda/pull/1074)) [@madsbk](https://github.com/madsbk) +- Update builds for CUDA `11.8` and Python `310` ([#1072](https://github.com/rapidsai/dask-cuda/pull/1072)) [@ajschmidt8](https://github.com/ajschmidt8) +- Shuffle by partition to reduce memory usage significantly ([#1068](https://github.com/rapidsai/dask-cuda/pull/1068)) [@madsbk](https://github.com/madsbk) +- Enable copy_prs. 
([#1063](https://github.com/rapidsai/dask-cuda/pull/1063)) [@bdice](https://github.com/bdice) +- Add GitHub Actions Workflows ([#1062](https://github.com/rapidsai/dask-cuda/pull/1062)) [@bdice](https://github.com/bdice) +- Unpin `dask` and `distributed` for development ([#1060](https://github.com/rapidsai/dask-cuda/pull/1060)) [@galipremsagar](https://github.com/galipremsagar) +- Switch to the new dask CLI ([#981](https://github.com/rapidsai/dask-cuda/pull/981)) [@jacobtomlinson](https://github.com/jacobtomlinson) + # dask-cuda 22.12.00 (8 Dec 2022) ## 🚨 Breaking Changes diff --git a/ci/build_docs.sh b/ci/build_docs.sh new file mode 100755 index 000000000..338ff974c --- /dev/null +++ b/ci/build_docs.sh @@ -0,0 +1,38 @@ +#!/bin/bash + +set -euo pipefail + +rapids-logger "Create test conda environment" +. /opt/conda/etc/profile.d/conda.sh + +rapids-dependency-file-generator \ + --output conda \ + --file_key docs \ + --matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION}" | tee env.yaml + +rapids-mamba-retry env create --force -f env.yaml -n docs +conda activate docs + +rapids-print-env + +rapids-logger "Downloading artifacts from previous jobs" + +PYTHON_CHANNEL=$(rapids-download-conda-from-s3 python) +VERSION_NUMBER=$(rapids-get-rapids-version-from-git) + +rapids-mamba-retry install \ + --channel "${PYTHON_CHANNEL}" \ + dask-cuda + +# Build Python docs +rapids-logger "Build Python docs" +pushd docs +sphinx-build -b dirhtml ./source _html +sphinx-build -b text ./source _text +popd + +if [[ "${RAPIDS_BUILD_TYPE}" == "branch" ]]; then + rapids-logger "Upload Docs to S3" + aws s3 sync --no-progress --delete docs/_html "s3://rapidsai-docs/dask-cuda/${VERSION_NUMBER}/html" + aws s3 sync --no-progress --delete docs/_text "s3://rapidsai-docs/dask-cuda/${VERSION_NUMBER}/txt" +fi diff --git a/ci/build_python.sh b/ci/build_python.sh new file mode 100755 index 000000000..4124a4c5a --- /dev/null +++ b/ci/build_python.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Copyright (c) 2022, NVIDIA CORPORATION. + +set -euo pipefail + +source rapids-env-update + +export CMAKE_GENERATOR=Ninja + +rapids-print-env + +rapids-logger "Begin py build" + +rapids-mamba-retry mambabuild \ + conda/recipes/dask-cuda + +rapids-upload-conda-to-s3 python diff --git a/ci/build_python_pypi.sh b/ci/build_python_pypi.sh new file mode 100755 index 000000000..5fea926cd --- /dev/null +++ b/ci/build_python_pypi.sh @@ -0,0 +1,18 @@ +#!/bin/bash + + +python -m pip install build --user + +# While conda provides these during conda-build, they are also necessary during +# the setup.py build for PyPI +export GIT_DESCRIBE_TAG=$(git describe --abbrev=0 --tags) +export GIT_DESCRIBE_NUMBER=$(git rev-list ${GIT_DESCRIBE_TAG}..HEAD --count) + +# Compute/export VERSION_SUFFIX +source rapids-env-update + +python -m build \ + --sdist \ + --wheel \ + --outdir dist/ \ + . diff --git a/ci/check_style.sh b/ci/check_style.sh new file mode 100755 index 000000000..be3ac3f4b --- /dev/null +++ b/ci/check_style.sh @@ -0,0 +1,18 @@ +#!/bin/bash +# Copyright (c) 2020-2022, NVIDIA CORPORATION. + +set -euo pipefail + +rapids-logger "Create checks conda environment" +. 
/opt/conda/etc/profile.d/conda.sh + +rapids-dependency-file-generator \ + --output conda \ + --file_key checks \ + --matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION}" | tee env.yaml + +rapids-mamba-retry env create --force -f env.yaml -n checks +conda activate checks + +# Run pre-commit checks +pre-commit run --hook-stage manual --all-files --show-diff-on-failure diff --git a/ci/cpu/build.sh b/ci/cpu/build.sh index 5ed0a3221..b1b279641 100755 --- a/ci/cpu/build.sh +++ b/ci/cpu/build.sh @@ -24,7 +24,7 @@ export GPUCI_CONDA_RETRY_SLEEP=30 export INSTALL_DASK_MAIN=0 # Dask version to install when `INSTALL_DASK_MAIN=0` -export DASK_STABLE_VERSION="2022.11.1" +export DASK_STABLE_VERSION="2023.1.1" # Switch to project root; also root of repo checkout cd "$WORKSPACE" diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 1e2479b76..2d6f35f10 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -26,7 +26,7 @@ cd "$WORKSPACE" export GIT_DESCRIBE_TAG=`git describe --tags` export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'` export UCX_PATH=$CONDA_PREFIX -export UCXPY_VERSION=0.29.* +export UCXPY_VERSION=0.30 unset GIT_DESCRIBE_TAG # Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x, @@ -38,7 +38,7 @@ export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 export INSTALL_DASK_MAIN=0 # Dask version to install when `INSTALL_DASK_MAIN=0` -export DASK_STABLE_VERSION="2022.11.1" +export DASK_STABLE_VERSION="2023.1.1" # Temporary workaround for Jupyter errors. # See https://github.com/rapidsai/dask-cuda/issues/1040 diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index afd907b53..cab06b0ad 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -22,7 +22,7 @@ CURRENT_SHORT_TAG=${CURRENT_MAJOR}.${CURRENT_MINOR} NEXT_MAJOR=$(echo $NEXT_FULL_TAG | awk '{split($0, a, "."); print a[1]}') NEXT_MINOR=$(echo $NEXT_FULL_TAG | awk '{split($0, a, "."); print a[2]}') NEXT_SHORT_TAG=${NEXT_MAJOR}.${NEXT_MINOR} -NEXT_UCXPY_VERSION="$(curl -s https://version.gpuci.io/rapids/${NEXT_SHORT_TAG}).*" +NEXT_UCXPY_VERSION="$(curl -s https://version.gpuci.io/rapids/${NEXT_SHORT_TAG})" echo "Preparing release $CURRENT_TAG => $NEXT_FULL_TAG" @@ -33,3 +33,13 @@ function sed_runner() { # Update UCX-Py version sed_runner "s/export UCXPY_VERSION=.*/export UCXPY_VERSION="${NEXT_UCXPY_VERSION}"/g" ci/gpu/build.sh + +# Bump cudf and dask-cudf testing dependencies +sed_runner "s/cudf=.*/cudf=${NEXT_SHORT_TAG}/g" dependencies.yaml +sed_runner "s/dask-cudf=.*/dask-cudf=${NEXT_SHORT_TAG}/g" dependencies.yaml +sed_runner "s/cucim=.*/cucim=${NEXT_SHORT_TAG}/g" dependencies.yaml +sed_runner "s/ucx-py=.*/ucx-py=${NEXT_UCXPY_VERSION}/g" dependencies.yaml + +for FILE in .github/workflows/*.yaml; do + sed_runner "/shared-action-workflows/ s/@.*/@branch-${NEXT_SHORT_TAG}/g" "${FILE}" +done diff --git a/ci/test_python.sh b/ci/test_python.sh new file mode 100755 index 000000000..bf221f498 --- /dev/null +++ b/ci/test_python.sh @@ -0,0 +1,89 @@ +#!/bin/bash +# Copyright (c) 2022, NVIDIA CORPORATION. + +set -euo pipefail + +. /opt/conda/etc/profile.d/conda.sh + +rapids-logger "Generate Python testing dependencies" +rapids-dependency-file-generator \ + --output conda \ + --file_key test_python \ + --matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION}" | tee env.yaml + +rapids-mamba-retry env create --force -f env.yaml -n test + +# Temporarily allow unbound variables for conda activation. 
+set +u +conda activate test +set -u + +rapids-logger "Downloading artifacts from previous jobs" +PYTHON_CHANNEL=$(rapids-download-conda-from-s3 python) + +RAPIDS_TESTS_DIR=${RAPIDS_TESTS_DIR:-"${PWD}/test-results"} +RAPIDS_COVERAGE_DIR=${RAPIDS_COVERAGE_DIR:-"${PWD}/coverage-results"} +mkdir -p "${RAPIDS_TESTS_DIR}" "${RAPIDS_COVERAGE_DIR}" +SUITEERROR=0 + +rapids-print-env + +rapids-mamba-retry install \ + -c "${PYTHON_CHANNEL}" \ + dask-cuda + +rapids-logger "Check GPU usage" +nvidia-smi + +set +e + +rapids-logger "pytest dask-cuda" +pushd dask_cuda +DASK_CUDA_TEST_SINGLE_GPU=1 \ +UCXPY_IFNAME=eth0 \ +UCX_WARN_UNUSED_ENV_VARS=n \ +UCX_MEMTYPE_CACHE=n \ +timeout 30m pytest \ + -vv \ + --capture=no \ + --cache-clear \ + --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda.xml" \ + --cov-config=../pyproject.toml \ + --cov=dask_cuda \ + --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage.xml" \ + --cov-report=term \ + tests +exitcode=$? + +if (( ${exitcode} != 0 )); then + SUITEERROR=${exitcode} + echo "FAILED: 1 or more tests in dask-cuda" +fi +popd + +rapids-logger "Run local benchmark" +python dask_cuda/benchmarks/local_cudf_shuffle.py \ + --partition-size="1 KiB" \ + -d 0 \ + --runs 1 \ + --backend dask +exitcode=$? + +if (( ${exitcode} != 0 )); then + SUITEERROR=${exitcode} + echo "FAILED: Local benchmark with dask comms" +fi + +python dask_cuda/benchmarks/local_cudf_shuffle.py \ + --partition-size="1 KiB" \ + -d 0 \ + --runs 1 \ + --backend explicit-comms +exitcode=$? + +if (( ${exitcode} != 0 )); then + SUITEERROR=${exitcode} + echo "FAILED: Local benchmark with explicit comms" +fi + +exit ${SUITEERROR} diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index a31628b23..cc26426d6 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. +# Copyright (c) 2019-2022, NVIDIA CORPORATION. # Usage: # conda build -c conda-forge . @@ -6,7 +6,7 @@ {% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} {% set number = environ.get('GIT_DESCRIBE_NUMBER', 0) %} -{% set py_version = environ.get('CONDA_PY', 36) %} +{% set py_version = environ['CONDA_PY'] %} {% set git_hash = environ.get('GIT_DESCRIBE_HASH', '') %} package: @@ -41,10 +41,15 @@ requirements: test: imports: - dask_cuda - + commands: + - dask cuda --help + {% for e in data.get("project", {}).get("scripts", {}).keys() %} + - {{ e }} --help + - {{ e|replace("-", " ") }} --help + {% endfor %} about: - home: http://rapids.ai/ + home: https://rapids.ai/ license: Apache-2.0 license_file: ../../../LICENSE summary: dask-cuda library diff --git a/dask_cuda/benchmarks/common.py b/dask_cuda/benchmarks/common.py index 7c489d000..e734f882c 100644 --- a/dask_cuda/benchmarks/common.py +++ b/dask_cuda/benchmarks/common.py @@ -85,7 +85,8 @@ class Config(NamedTuple): def run_benchmark(client: Client, args: Namespace, config: Config): """Run a benchmark a specified number of times - If ``args.profile`` is set, the final run is profiled.""" + If ``args.profile`` is set, the final run is profiled. 
+ """ results = [] for _ in range(max(1, args.runs) - 1): res = config.bench_once(client, args, write_profile=None) @@ -110,14 +111,18 @@ def gather_bench_results(client: Client, args: Namespace, config: Config): def run(client: Client, args: Namespace, config: Config): """Run the full benchmark on the cluster - Waits for the cluster, sets up memory pools, prints and saves results""" + Waits for the cluster, sets up memory pools, prints and saves results + """ + wait_for_cluster(client, shutdown_on_failure=True) + assert len(client.scheduler_info()["workers"]) > 0 setup_memory_pools( client, args.type == "gpu", args.rmm_pool_size, args.disable_rmm_pool, args.rmm_log_directory, + args.enable_rmm_statistics, ) address_to_index, results, message_data = gather_bench_results(client, args, config) p2p_bw = peer_to_peer_bandwidths(message_data, address_to_index) @@ -156,7 +161,8 @@ def run_client_from_existing_scheduler(args: Namespace, config: Config): def run_create_client(args: Namespace, config: Config): """Create a client + cluster and run - Shuts down the cluster at the end of the benchmark""" + Shuts down the cluster at the end of the benchmark + """ cluster_options = get_cluster_options(args) Cluster = cluster_options["class"] cluster_args = cluster_options["args"] diff --git a/dask_cuda/benchmarks/local_cudf_groupby.py b/dask_cuda/benchmarks/local_cudf_groupby.py index 0a142698a..4e9dea94e 100644 --- a/dask_cuda/benchmarks/local_cudf_groupby.py +++ b/dask_cuda/benchmarks/local_cudf_groupby.py @@ -107,6 +107,7 @@ def bench_once(client, args, write_profile=None): t1 = clock() agg = apply_groupby( df, + backend=args.backend, sort=args.sort, split_out=args.split_out, split_every=args.split_every, diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py index 7ff099cca..51ba48f93 100644 --- a/dask_cuda/benchmarks/local_cudf_shuffle.py +++ b/dask_cuda/benchmarks/local_cudf_shuffle.py @@ -1,13 +1,16 @@ import contextlib from collections import ChainMap from time import perf_counter +from typing import Tuple +import numpy as np import pandas as pd import dask -from dask import array as da +import dask.dataframe +from dask.dataframe.core import new_dd_object from dask.dataframe.shuffle import shuffle -from dask.distributed import performance_report, wait +from dask.distributed import Client, performance_report, wait from dask.utils import format_bytes, parse_bytes import dask_cuda.explicit_comms.dataframe.shuffle @@ -20,42 +23,99 @@ print_throughput_bandwidth, ) +try: + import cupy -def shuffle_dask(df, *, noop=False): - result = shuffle(df, index="data", shuffle="tasks") - if noop: + import cudf +except ImportError: + cupy = None + cudf = None + + +def shuffle_dask(df, args): + result = shuffle(df, index="data", shuffle="tasks", ignore_index=args.ignore_index) + if args.backend == "dask-noop": result = as_noop(result) t1 = perf_counter() wait(result.persist()) return perf_counter() - t1 -def shuffle_explicit_comms(df): +def shuffle_explicit_comms(df, args): t1 = perf_counter() wait( dask_cuda.explicit_comms.dataframe.shuffle.shuffle( - df, column_names="data" + df, column_names=["data"], ignore_index=args.ignore_index ).persist() ) return perf_counter() - t1 -def bench_once(client, args, write_profile=None): - # Generate random Dask dataframe - chunksize = args.partition_size // 8 # Convert bytes to float64 - nchunks = args.in_parts - totalsize = chunksize * nchunks - x = da.random.random((totalsize,), chunks=(chunksize,)) - df = 
dask.dataframe.from_dask_array(x, columns="data").to_frame() +def create_df(nelem, df_type): + if df_type == "cpu": + return pd.DataFrame({"data": np.random.random(nelem)}) + elif df_type == "gpu": + if cudf is None or cupy is None: + raise RuntimeError("`--type=gpu` requires cudf and cupy ") + return cudf.DataFrame({"data": cupy.random.random(nelem)}) + else: + raise ValueError(f"Unknown type {df_type}") + + +def create_data( + client: Client, args, name="balanced-df" +) -> Tuple[int, dask.dataframe.DataFrame]: + """Create an evenly distributed dask dataframe + + The partitions are perfectly distributed across workers, if the number of + requested partitions is evenly divisible by the number of workers. + """ + chunksize = args.partition_size // np.float64().nbytes + + workers = list(client.scheduler_info()["workers"].keys()) + assert len(workers) > 0 + + dist = args.partition_distribution + if dist is None: + # By default, we create a balanced distribution + dist = [args.in_parts // len(workers)] * len(workers) + for i in range(args.in_parts % len(workers)): + dist[i] += 1 + + if len(dist) != len(workers): + raise ValueError( + f"The length of `--devs`({len(dist)}) and " + f"`--partition-distribution`({len(workers)}) doesn't match" + ) + if sum(dist) != args.in_parts: + raise ValueError( + f"The sum of `--partition-distribution`({sum(dist)}) must match " + f"the number of input partitions `--in-parts={args.in_parts}`" + ) + + # Create partition based to the specified partition distribution + dsk = {} + for i, part_size in enumerate(dist): + for _ in range(part_size): + # We use `client.submit` to control placement of the partition. + dsk[(name, len(dsk))] = client.submit( + create_df, chunksize, args.type, workers=[workers[i]], pure=False + ) + wait(dsk.values()) + + df_meta = create_df(0, args.type) + divs = [None] * (len(dsk) + 1) + ret = new_dd_object(dsk, name, df_meta, divs).persist() + wait(ret) - if args.type == "gpu": - import cudf + data_processed = args.in_parts * args.partition_size + if not args.ignore_index: + data_processed += args.in_parts * chunksize * df_meta.index.dtype.itemsize + return data_processed, ret - df = df.map_partitions(cudf.from_pandas) - df = df.persist() - wait(df) - data_processed = len(df) * sum([t.itemsize for t in df.dtypes]) +def bench_once(client, args, write_profile=None): + data_processed, df = create_data(client, args) if write_profile is None: ctx = contextlib.nullcontext() @@ -64,9 +124,9 @@ def bench_once(client, args, write_profile=None): with ctx: if args.backend in {"dask", "dask-noop"}: - duration = shuffle_dask(df, noop=args.backend == "dask-noop") + duration = shuffle_dask(df, args) else: - duration = shuffle_explicit_comms(df) + duration = shuffle_explicit_comms(df, args) return (data_processed, duration) @@ -177,6 +237,20 @@ def parse_args(): "type": int, "help": "Number of runs", }, + { + "name": "--ignore-index", + "action": "store_true", + "help": "When shuffle, ignore the index", + }, + { + "name": "--partition-distribution", + "default": None, + "metavar": "PARTITION_SIZE_LIST", + "type": lambda x: [int(y) for y in x.split(",")], + "help": "Comma separated list defining the size of each partition, " + "which must have the same length as `--devs`. 
" + "If not set, a balanced distribution is used.", + }, ] return parse_benchmark_args( diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 8a8419cd3..1d07df30c 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -105,6 +105,13 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] help="Directory to write worker and scheduler RMM log files to. " "Logging is only enabled if RMM memory pool is enabled.", ) + cluster_args.add_argument( + "--enable-rmm-statistics", + action="store_true", + help="Use RMM's StatisticsResourceAdaptor to gather allocation statistics. " + "This enables spilling implementations such as JIT-Unspill to provides more " + "information on out-of-memory errors", + ) cluster_args.add_argument( "--enable-tcp-over-ucx", default=None, @@ -340,6 +347,7 @@ def setup_memory_pool( pool_size=None, disable_pool=False, log_directory=None, + statistics=False, ): import cupy @@ -358,9 +366,15 @@ def setup_memory_pool( log_file_name=get_rmm_log_file_name(dask_worker, logging, log_directory), ) cupy.cuda.set_allocator(rmm.rmm_cupy_allocator) + if statistics: + rmm.mr.set_current_device_resource( + rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource()) + ) -def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory): +def setup_memory_pools( + client, is_gpu, pool_size, disable_pool, log_directory, statistics +): if not is_gpu: return client.run( @@ -368,6 +382,7 @@ def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory): pool_size=pool_size, disable_pool=disable_pool, log_directory=log_directory, + statistics=statistics, ) # Create an RMM pool on the scheduler due to occasional deserialization # of CUDA objects. May cause issues with InfiniBand otherwise. 
@@ -376,6 +391,7 @@ def setup_memory_pools(client, is_gpu, pool_size, disable_pool, log_directory): pool_size=1e9, disable_pool=disable_pool, log_directory=log_directory, + statistics=statistics, ) @@ -632,7 +648,7 @@ def bandwidth_statistics( logs: the ``dask_worker.incoming_transfer_log`` object ignore_size: int (optional) - ignore messsages whose total byte count is smaller than this + ignore messages whose total byte count is smaller than this value (if provided) Returns diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli.py old mode 100755 new mode 100644 similarity index 81% rename from dask_cuda/cli/dask_cuda_worker.py rename to dask_cuda/cli.py index 62faeddb6..b7069d632 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli.py @@ -5,25 +5,62 @@ import click from tornado.ioloop import IOLoop, TimeoutError -from dask import config +from dask import config as dask_config +from distributed import Client from distributed.cli.utils import install_signal_handlers from distributed.preloading import validate_preload_argv from distributed.security import Security from distributed.utils import import_term -from ..cuda_worker import CUDAWorker +from .cuda_worker import CUDAWorker +from .utils import print_cluster_config logger = logging.getLogger(__name__) pem_file_option_type = click.Path(exists=True, resolve_path=True) - - -@click.command(context_settings=dict(ignore_unknown_options=True)) -@click.argument("scheduler", type=str, required=False) -@click.argument( +scheduler = click.argument("scheduler", type=str, required=False) +preload_argv = click.argument( "preload_argv", nargs=-1, type=click.UNPROCESSED, callback=validate_preload_argv ) +scheduler_file = click.option( + "--scheduler-file", + type=str, + default=None, + help="""Filename to JSON encoded scheduler information. To be used in conjunction + with the equivalent ``dask scheduler`` option.""", +) +tls_ca_file = click.option( + "--tls-ca-file", + type=pem_file_option_type, + default=None, + help="""CA certificate(s) file for TLS (in PEM format). Can be a string (like + ``"path/to/certs"``), or ``None`` for no certificate(s).""", +) +tls_cert = click.option( + "--tls-cert", + type=pem_file_option_type, + default=None, + help="""Certificate file for TLS (in PEM format). Can be a string (like + ``"path/to/certs"``), or ``None`` for no certificate(s).""", +) +tls_key = click.option( + "--tls-key", + type=pem_file_option_type, + default=None, + help="""Private key file for TLS (in PEM format). Can be a string (like + ``"path/to/certs"``), or ``None`` for no private key.""", +) + + +@click.group +def cuda(): + """Subcommands to launch or query distributed workers with GPUs.""" + + +@cuda.command(name="worker", context_settings=dict(ignore_unknown_options=True)) +@scheduler +@preload_argv @click.option( "--host", type=str, @@ -100,7 +137,7 @@ "--rmm-async/--no-rmm-async", default=False, show_default=True, - help="""Initialize each worker withh RMM and set it to use RMM's asynchronous + help="""Initialize each worker with RMM and set it to use RMM's asynchronous allocator. See ``rmm.mr.CudaAsyncMemoryResource`` for more info. .. warning:: @@ -174,13 +211,7 @@ specified by `"jit-unspill-shared-fs"`. Notice, a shared filesystem must support the `os.link()` operation.""", ) -@click.option( - "--scheduler-file", - type=str, - default=None, - help="""Filename to JSON encoded scheduler information. 
To be used in conjunction - with the equivalent ``dask-scheduler`` option.""", -) +@scheduler_file @click.option( "--protocol", type=str, default=None, help="Protocol like tcp, tls, or ucx" ) @@ -208,27 +239,9 @@ help="""Prefix for the dashboard. Can be a string (like ...) or ``None`` for no prefix.""", ) -@click.option( - "--tls-ca-file", - type=pem_file_option_type, - default=None, - help="""CA certificate(s) file for TLS (in PEM format). Can be a string (like - ``"path/to/certs"``), or ``None`` for no certificate(s).""", -) -@click.option( - "--tls-cert", - type=pem_file_option_type, - default=None, - help="""Certificate file for TLS (in PEM format). Can be a string (like - ``"path/to/certs"``), or ``None`` for no certificate(s).""", -) -@click.option( - "--tls-key", - type=pem_file_option_type, - default=None, - help="""Private key file for TLS (in PEM format). Can be a string (like - ``"path/to/certs"``), or ``None`` for no private key.""", -) +@tls_ca_file +@tls_cert +@tls_key @click.option( "--enable-tcp-over-ucx/--disable-tcp-over-ucx", default=None, @@ -288,7 +301,7 @@ type=click.Choice(["spawn", "fork", "forkserver"]), help="""Method used to start new processes with multiprocessing""", ) -def main( +def worker( scheduler, host, nthreads, @@ -324,6 +337,15 @@ def main( multiprocessing_method, **kwargs, ): + """Launch a distributed worker with GPUs attached to an existing scheduler. + + A scheduler can be specified either through a URI passed through the ``SCHEDULER`` + argument or a scheduler file passed through the ``--scheduler-file`` option. + + See + https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html#dask-cuda-worker + for info. + """ if multiprocessing_method == "forkserver": import multiprocessing.forkserver as f @@ -347,7 +369,7 @@ def main( if worker_class is not None: worker_class = import_term(worker_class) - with config.set( + with dask_config.set( {"distributed.worker.multiprocessing-method": multiprocessing_method} ): worker = CUDAWorker( @@ -404,9 +426,57 @@ async def run(): logger.info("End worker") -def go(): - main() +@cuda.command(name="config", context_settings=dict(ignore_unknown_options=True)) +@scheduler +@preload_argv +@scheduler_file +@tls_ca_file +@tls_cert +@tls_key +def config( + scheduler, + scheduler_file, + tls_ca_file, + tls_cert, + tls_key, + **kwargs, +): + """Query an existing GPU cluster's configuration. + + A cluster can be specified either through a URI passed through the ``SCHEDULER`` + argument or a scheduler file passed through the ``--scheduler-file`` option. + """ + if ( + scheduler is None + and scheduler_file is None + and dask_config.get("scheduler-address", None) is None + ): + raise ValueError( + "No scheduler specified. A scheduler can be specified by " + "passing an address through the SCHEDULER argument or " + "'dask.scheduler-address' config option, or by passing the " + "location of a scheduler file through the --scheduler-file " + "option" + ) + + if isinstance(scheduler, str) and scheduler.startswith("-"): + raise ValueError( + "The scheduler address can't start with '-'. Please check " + "your command line arguments, you probably attempted to use " + "unsupported one. 
Scheduler address: %s" % scheduler + ) + if tls_ca_file and tls_cert and tls_key: + security = Security( + tls_ca_file=tls_ca_file, + tls_worker_cert=tls_cert, + tls_worker_key=tls_key, + ) + else: + security = None -if __name__ == "__main__": - go() + if scheduler_file is not None: + client = Client(scheduler_file=scheduler_file, security=security) + else: + client = Client(scheduler, security=security) + print_cluster_config(client) diff --git a/dask_cuda/cli/__init__.py b/dask_cuda/cli/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/dask_cuda/cli/dask_config.py b/dask_cuda/cli/dask_config.py deleted file mode 100755 index 51c9aa2bc..000000000 --- a/dask_cuda/cli/dask_config.py +++ /dev/null @@ -1,95 +0,0 @@ -from __future__ import absolute_import, division, print_function - -import logging - -import click - -from distributed import Client -from distributed.preloading import validate_preload_argv -from distributed.security import Security - -from ..utils import print_cluster_config - -logger = logging.getLogger(__name__) - - -pem_file_option_type = click.Path(exists=True, resolve_path=True) - - -@click.command(context_settings=dict(ignore_unknown_options=True)) -@click.argument("scheduler", type=str, required=False) -@click.argument( - "preload_argv", nargs=-1, type=click.UNPROCESSED, callback=validate_preload_argv -) -@click.option( - "--scheduler-file", - type=str, - default=None, - help="""Filename to JSON encoded scheduler information. To be used in conjunction - with the equivalent ``dask-scheduler`` option.""", -) -@click.option( - "--get-cluster-configuration", - "get_cluster_conf", - default=False, - is_flag=True, - required=False, - show_default=True, - help="""Print a table of the current cluster configuration""", -) -@click.option( - "--tls-ca-file", - type=pem_file_option_type, - default=None, - help="""CA certificate(s) file for TLS (in PEM format). Can be a string (like - ``"path/to/certs"``), or ``None`` for no certificate(s).""", -) -@click.option( - "--tls-cert", - type=pem_file_option_type, - default=None, - help="""Certificate file for TLS (in PEM format). Can be a string (like - ``"path/to/certs"``), or ``None`` for no certificate(s).""", -) -@click.option( - "--tls-key", - type=pem_file_option_type, - default=None, - help="""Private key file for TLS (in PEM format). Can be a string (like - ``"path/to/certs"``), or ``None`` for no private key.""", -) -def main( - scheduler, - scheduler_file, - get_cluster_conf, - tls_ca_file, - tls_cert, - tls_key, - **kwargs, -): - if tls_ca_file and tls_cert and tls_key: - security = Security( - tls_ca_file=tls_ca_file, - tls_worker_cert=tls_cert, - tls_worker_key=tls_key, - ) - else: - security = None - - if isinstance(scheduler, str) and scheduler.startswith("-"): - raise ValueError( - "The scheduler address can't start with '-'. Please check " - "your command line arguments, you probably attempted to use " - "unsupported one. 
Scheduler address: %s" % scheduler - ) - - if get_cluster_conf: - if scheduler_file is not None: - client = Client(scheduler_file=scheduler_file, security=security) - else: - client = Client(scheduler, security=security) - print_cluster_config(client) - - -if __name__ == "__main__": - main() diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 5e14aba8d..03b16b529 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -16,7 +16,6 @@ enable_proctitle_on_children, enable_proctitle_on_current, ) -from distributed.utils import has_arg from distributed.worker_memory import parse_memory_limit from .device_host_file import DeviceHostFile @@ -86,16 +85,10 @@ def __init__( raise ValueError("nthreads must be higher than 0.") # Set nthreads=1 when parsing mem_limit since it only depends on nprocs - if has_arg(parse_memory_limit, "logger"): - # TODO: Remove has_arg check after 2022.11.1 support is dropped - logger = logging.getLogger(__name__) - memory_limit = parse_memory_limit( - memory_limit=memory_limit, nthreads=1, total_cores=nprocs, logger=logger - ) - else: - memory_limit = parse_memory_limit( - memory_limit=memory_limit, nthreads=1, total_cores=nprocs - ) + logger = logging.getLogger(__name__) + memory_limit = parse_memory_limit( + memory_limit=memory_limit, nthreads=1, total_cores=nprocs, logger=logger + ) if pid_file: with open(pid_file, "w") as f: @@ -118,13 +111,16 @@ def del_pid_file(): kwargs = {"worker_port": None, "listen_address": None, **kwargs} if ( - not scheduler - and not scheduler_file + scheduler is None + and scheduler_file is None and dask.config.get("scheduler-address", None) is None ): raise ValueError( - "Need to provide scheduler address like\n" - "dask-worker SCHEDULER_ADDRESS:8786" + "No scheduler specified. 
A scheduler can be specified by " + "passing an address through the SCHEDULER argument or " + "'dask.scheduler-address' config option, or by passing the " + "location of a scheduler file through the --scheduler-file " + "option" ) if isinstance(scheduler, Cluster): diff --git a/dask_cuda/disk_io.py b/dask_cuda/disk_io.py index 7ccda0f3f..0427b77f0 100644 --- a/dask_cuda/disk_io.py +++ b/dask_cuda/disk_io.py @@ -96,8 +96,8 @@ class SpillToDiskProperties: def __init__( self, root_dir: Union[str, os.PathLike], - shared_filesystem: bool = None, - gds: bool = None, + shared_filesystem: Optional[bool] = None, + gds: Optional[bool] = None, ): """ Parameters diff --git a/dask_cuda/explicit_comms/comms.py b/dask_cuda/explicit_comms/comms.py index 0ebd7f0ce..05dbc9619 100644 --- a/dask_cuda/explicit_comms/comms.py +++ b/dask_cuda/explicit_comms/comms.py @@ -180,7 +180,7 @@ def __init__(self, client: Optional[Client] = None): self.sessionId = uuid.uuid4().int # Get address of all workers (not Nanny addresses) - self.worker_addresses = list(self.client.run(lambda: 42).keys()) + self.worker_addresses = list(self.client.scheduler_info()["workers"].keys()) # Make all workers listen and get all listen addresses self.worker_direct_addresses = [] diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 6099025dd..4b240d2f1 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -4,11 +4,10 @@ import functools import inspect from collections import defaultdict +from math import ceil from operator import getitem from typing import Any, Callable, Dict, List, Optional, Set, TypeVar -import numpy - import dask import dask.dataframe from dask.base import tokenize @@ -23,9 +22,73 @@ T = TypeVar("T") +Proxify = Callable[[T], T] + + +def get_proxify(worker: Worker) -> Proxify: + """Get function to proxify objects""" + from dask_cuda.proxify_host_file import ProxifyHostFile + + if isinstance(worker.data, ProxifyHostFile): + # Notice, we know that we never call proxify() on the same proxied + # object thus we can speedup the call by setting `duplicate_check=False` + return lambda x: worker.data.manager.proxify(x, duplicate_check=False)[0] + return lambda x: x # no-op + + +def get_no_comm_postprocess( + stage: Dict[str, Any], num_rounds: int, batchsize: int, proxify: Proxify +) -> Callable[[DataFrame], DataFrame]: + """Get function for post-processing partitions not communicated + + In cuDF, the `group_split_dispatch` uses `scatter_by_map` to create + the partitions, which is implemented by splitting a single base dataframe + into multiple partitions. This means that memory are not freed until + ALL partitions are deleted. + + In order to free memory ASAP, we can deep copy partitions NOT being + communicated. We do this when `num_rounds != batchsize`. + + Parameters + ---------- + stage + The staged input dataframes. + num_rounds + Number of rounds of dataframe partitioning and all-to-all communication. + batchsize + Number of partitions each worker will handle in each round. + proxify + Function to proxify object. + + Returns + ------- + Function to be called on partitions not communicated. 
+ + """ + if num_rounds == batchsize: + return lambda x: x + + # Check that we are shuffling a cudf dataframe + try: + import cudf + except ImportError: + return lambda x: x + if not stage or not isinstance(next(iter(stage.values())), cudf.DataFrame): + return lambda x: x + + # Deep copying a cuDF dataframe doesn't deep copy its index hence + # we have to do it explicitly. + return lambda x: proxify( + x._from_data( + x._data.copy(deep=True), + x._index.copy(deep=True), + ) + ) + + async def send( eps, - myrank, + myrank: int, rank_to_out_part_ids: Dict[int, Set[int]], out_part_id_to_dataframe: Dict[int, DataFrame], ) -> None: @@ -43,10 +106,10 @@ async def send( async def recv( eps, - myrank, + myrank: int, rank_to_out_part_ids: Dict[int, Set[int]], out_part_id_to_dataframe_list: Dict[int, List[DataFrame]], - proxify, + proxify: Proxify, ) -> None: """Notice, received items are appended to `out_parts_list`""" @@ -60,17 +123,9 @@ async def read_msg(rank: int) -> None: ) -def get_proxify(worker: Worker) -> Callable[[T], T]: - """Get function to proxify objects""" - from dask_cuda.proxify_host_file import ProxifyHostFile - - if isinstance(worker.data, ProxifyHostFile): - data = worker.data - return lambda x: data.manager.proxify(x)[0] - return lambda x: x # no-op - - -def compute_map_index(df: Any, column_names, npartitions) -> Series: +def compute_map_index( + df: DataFrame, column_names: List[str], npartitions: int +) -> Series: """Return a Series that maps each row `df` to a partition ID The partitions are determined by hashing the columns given by column_names @@ -79,17 +134,17 @@ def compute_map_index(df: Any, column_names, npartitions) -> Series: Parameters ---------- - df: DataFrame - column_names: list of strings + df + The dataframe. + column_names List of column names on which we want to split. - npartitions: int or None + npartitions The desired number of output partitions. Returns ------- - out: Dict[int, DataFrame] - A dictionary mapping integers in {0..k} to dataframes such that the - hash values of `df[col]` are well partitioned. + Series + Series that maps each row `df` to a partition ID """ if column_names[0] == "_partitions": @@ -98,61 +153,90 @@ def compute_map_index(df: Any, column_names, npartitions) -> Series: ind = hash_object_dispatch( df[column_names] if column_names else df, index=False ) - typ = numpy.min_scalar_type(npartitions * 2) - return (ind % npartitions).astype(typ, copy=False) + return ind % npartitions -def single_shuffle_group( - df: DataFrame, column_names, npartitions, ignore_index +def partition_dataframe( + df: DataFrame, column_names: List[str], npartitions: int, ignore_index: bool ) -> Dict[int, DataFrame]: - """Split dataframe based on the indexes returned by `compute_map_index`""" + """Partition dataframe to a dict of dataframes + + The partitions are determined by hashing the columns given by column_names + unless `column_names[0] == "_partitions"`, in which case the values of + `column_names[0]` are used as index. + + Parameters + ---------- + df + The dataframe to partition + column_names + List of column names on which we want to partition. + npartitions + The desired number of output partitions. + ignore_index + Ignore index during shuffle. If True, performance may improve, + but index values will not be preserved. 
+ + Returns + ------- + partitions + Dict of dataframe-partitions, mapping partition-ID to dataframe + """ + if column_names[0] != "_partitions" and hasattr(df, "partition_by_hash"): + return dict( + zip( + range(npartitions), + df.partition_by_hash( + column_names, npartitions, keep_index=not ignore_index + ), + ) + ) map_index = compute_map_index(df, column_names, npartitions) return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index) -def multi_shuffle_group( - df_meta: DataFrame, - dfs: Dict[str, DataFrame], - column_names, - npartitions, - ignore_index, - proxify, +def create_partitions( + stage: Dict[str, Any], + batchsize: int, + column_names: List[str], + npartitions: int, + ignore_index: bool, + proxify: Proxify, ) -> Dict[int, DataFrame]: - """Split multiple dataframes such that each partition hashes to the same - - Since we concatenate dataframes belonging to the same partition, each - partition ID maps to exactly one dataframe. + """Create partitions from one or more staged dataframes Parameters ---------- - df_meta: DataFrame - An empty dataframe matching the expected output - dfs: dict of dataframes - The dataframes to split given as a map of stage keys to dataframes - column_names: list of strings + stage + The staged input dataframes + column_names List of column names on which we want to split. - npartitions: int or None + npartitions The desired number of output partitions. - ignore_index: bool + ignore_index Ignore index during shuffle. If True, performance may improve, but index values will not be preserved. - proxify: callable + proxify Function to proxify object. Returns ------- - dict of DataFrames - Mapping from partition ID to dataframe. + partitions: list of DataFrames + List of dataframe-partitions """ + if not stage: + return {} + batchsize = min(len(stage), batchsize) + # Grouping each input dataframe, one part for each partition ID. dfs_grouped: List[Dict[int, DataFrame]] = [] - while dfs: + for _ in range(batchsize): dfs_grouped.append( proxify( - single_shuffle_group( + partition_dataframe( # pop dataframe in any order, to free staged memory ASAP - dfs.popitem()[1], + stage.popitem()[1], column_names, npartitions, ignore_index, @@ -165,24 +249,82 @@ def multi_shuffle_group( ret: Dict[int, DataFrame] = {} for i in range(npartitions): # Iterate over all possible output partition IDs t = [df_grouped[i] for df_grouped in dfs_grouped] + assert len(t) > 0 if len(t) == 1: ret[i] = t[0] elif len(t) > 1: ret[i] = proxify(dd_concat(t, ignore_index=ignore_index)) - else: - ret[i] = df_meta # Empty dataframe return ret +async def send_recv_partitions( + eps: dict, + myrank: int, + rank_to_out_part_ids: Dict[int, Set[int]], + out_part_id_to_dataframe: Dict[int, DataFrame], + no_comm_postprocess: Callable[[DataFrame], DataFrame], + proxify: Proxify, + out_part_id_to_dataframe_list: Dict[int, List[DataFrame]], +) -> None: + """Send and receive (all-to-all) partitions between all workers + + Parameters + ---------- + eps + Communication endpoints to the other workers. + myrank + The rank of this worker. + rank_to_out_part_ids + dict that for each worker rank specifies a set of output partition IDs. + If the worker shouldn't return any partitions, it is excluded from the + dict. Partition IDs are global integers `0..npartitions` and corresponds + to the dict keys returned by `group_split_dispatch`. + out_part_id_to_dataframe + Mapping from partition ID to dataframe. This dict is cleared on return. 
+ no_comm_postprocess + Function to post-process partitions not communicated. + See `get_no_comm_postprocess` + proxify + Function to proxify object. + out_part_id_to_dataframe_list + The **output** of this function, which is a dict of the partitions owned by + this worker. + """ + await asyncio.gather( + recv( + eps, + myrank, + rank_to_out_part_ids, + out_part_id_to_dataframe_list, + proxify, + ), + send(eps, myrank, rank_to_out_part_ids, out_part_id_to_dataframe), + ) + + # At this point `send()` should have pop'ed all output partitions + # beside the partitions owned be `myrank` (if any). + assert ( + rank_to_out_part_ids[myrank] == out_part_id_to_dataframe.keys() + or not out_part_id_to_dataframe + ) + # We can now add them to the output dataframes. + for out_part_id, dataframe in out_part_id_to_dataframe.items(): + out_part_id_to_dataframe_list[out_part_id].append( + no_comm_postprocess(dataframe) + ) + out_part_id_to_dataframe.clear() + + async def shuffle_task( s, - stage_name, - df_meta, + stage_name: str, rank_to_inkeys: Dict[int, set], rank_to_out_part_ids: Dict[int, Set[int]], - column_names, - npartitions, - ignore_index, + column_names: List[str], + npartitions: int, + ignore_index: bool, + num_rounds: int, + batchsize: int, ) -> List[DataFrame]: """Explicit-comms shuffle task @@ -195,19 +337,23 @@ async def shuffle_task( stage_name: str Name of the stage to retrieve the input keys from. rank_to_inkeys: dict - dict that for each worker rank specifices the set of staged input keys. + dict that for each worker rank specifies the set of staged input keys. rank_to_out_part_ids: dict - dict that for each worker rank specifices a set of output partition IDs. + dict that for each worker rank specifies a set of output partition IDs. If the worker shouldn't return any partitions, it is excluded from the dict. Partition IDs are global integers `0..npartitions` and corresponds to the dict keys returned by `group_split_dispatch`. column_names: list of strings List of column names on which we want to split. - npartitions: int or None + npartitions: int The desired number of output partitions. ignore_index: bool Ignore index during shuffle. If True, performance may improve, but index values will not be preserved. + num_rounds: int + Number of rounds of dataframe partitioning and all-to-all communication. + batchsize: int + Number of partitions each worker will handle in each round. Returns ------- @@ -216,42 +362,42 @@ async def shuffle_task( """ proxify = get_proxify(s["worker"]) - myrank = s["rank"] eps = s["eps"] + myrank: int = s["rank"] stage = comms.pop_staging_area(s, stage_name) assert stage.keys() == rank_to_inkeys[myrank] + no_comm_postprocess = get_no_comm_postprocess(stage, num_rounds, batchsize, proxify) - out_part_id_to_dataframe = multi_shuffle_group( - df_meta=df_meta, - dfs=stage, - column_names=column_names, - npartitions=npartitions, - ignore_index=ignore_index, - proxify=proxify, - ) - - # Communicate all the dataframe-partitions all-to-all. The result is - # `out_part_id_to_dataframe_list` that for each output partition maps - # a list of dataframes received. out_part_id_to_dataframe_list: Dict[int, List[DataFrame]] = defaultdict(list) - await asyncio.gather( - recv(eps, myrank, rank_to_out_part_ids, out_part_id_to_dataframe_list, proxify), - send(eps, myrank, rank_to_out_part_ids, out_part_id_to_dataframe), - ) - - # At this point `send()` should have pop'ed all output partitions - # beside the partitions owned be `myrank`. 
- assert rank_to_out_part_ids[myrank] == out_part_id_to_dataframe.keys() - # We can now add them to the output dataframes. - for out_part_id, dataframe in out_part_id_to_dataframe.items(): - out_part_id_to_dataframe_list[out_part_id].append(dataframe) - del out_part_id_to_dataframe + for _ in range(num_rounds): + partitions = create_partitions( + stage, batchsize, column_names, npartitions, ignore_index, proxify + ) + await send_recv_partitions( + eps, + myrank, + rank_to_out_part_ids, + partitions, + no_comm_postprocess, + proxify, + out_part_id_to_dataframe_list, + ) # Finally, we concatenate the output dataframes into the final output partitions - return [ - proxify(dd_concat(dfs, ignore_index=ignore_index)) - for dfs in out_part_id_to_dataframe_list.values() - ] + ret = [] + while out_part_id_to_dataframe_list: + ret.append( + proxify( + dd_concat( + out_part_id_to_dataframe_list.popitem()[1], + ignore_index=ignore_index, + ) + ) + ) + # For robustness, we yield this task to give Dask a chance to do bookkeeping + # such as letting the Worker answer heartbeat requests + await asyncio.sleep(0) + return ret def shuffle( @@ -259,6 +405,7 @@ def shuffle( column_names: List[str], npartitions: Optional[int] = None, ignore_index: bool = False, + batchsize: Optional[int] = None, ) -> DataFrame: """Order divisions of DataFrame so that all values within column(s) align @@ -283,6 +430,15 @@ def shuffle( ignore_index: bool Ignore index during shuffle. If True, performance may improve, but index values will not be preserved. + batchsize: int + A shuffle consist of multiple rounds where each worker partitions and + then all-to-all communicates a number of its dataframe partitions. The batch + size is the number of partitions each worker will handle in each round. + If -1, each worker will handle all its partitions in a single round and + all techniques to reduce memory usage are disabled, which might be faster + when memory pressure isn't an issue. + If None, the value of `DASK_EXPLICIT_COMMS_BATCHSIZE` is used or 1 if not + set thus by default, we prioritize robustness over performance. Returns ------- @@ -324,6 +480,20 @@ def shuffle( rank_to_inkeys = c.stage_keys(name=name, keys=df.__dask_keys__()) c.client.cancel(df) + # Get batchsize + max_num_inkeys = max(len(k) for k in rank_to_inkeys.values()) + batchsize = batchsize or dask.config.get("explicit-comms-batchsize", 1) + if batchsize == -1: + batchsize = max_num_inkeys + if not isinstance(batchsize, int) or batchsize < 0: + raise ValueError( + "explicit-comms-batchsize must be a " + f"positive integer or -1 (was '{batchsize}')" + ) + + # Get number of rounds of dataframe partitioning and all-to-all communication. 
+ num_rounds = ceil(max_num_inkeys / batchsize) + # Find the output partition IDs for each worker div = npartitions // len(ranks) rank_to_out_part_ids: Dict[int, Set[int]] = {} # rank -> set of partition id @@ -332,19 +502,20 @@ def shuffle( for rank, i in zip(ranks, range(div * len(ranks), npartitions)): rank_to_out_part_ids[rank].add(i) - # Run `_shuffle()` on each worker + # Run a shuffle task on each worker shuffle_result = {} for rank in ranks: shuffle_result[rank] = c.submit( c.worker_addresses[rank], shuffle_task, name, - df_meta, rank_to_inkeys, rank_to_out_part_ids, column_names, npartitions, ignore_index, + num_rounds, + batchsize, ) wait(list(shuffle_result.values())) diff --git a/dask_cuda/get_device_memory_objects.py b/dask_cuda/get_device_memory_objects.py index 44dc433ff..c5746c862 100644 --- a/dask_cuda/get_device_memory_objects.py +++ b/dask_cuda/get_device_memory_objects.py @@ -51,8 +51,8 @@ def get_device_memory_objects_default(obj): return dispatch(obj._pxy_get().obj) if hasattr(obj, "data"): return dispatch(obj.data) - owner = getattr(obj, "owner", None) or getattr(obj, "_owner", None) - if owner: + owner = getattr(obj, "owner", getattr(obj, "_owner", None)) + if owner is not None: return dispatch(owner) if hasattr(obj, "__cuda_array_interface__"): return [obj] diff --git a/dask_cuda/initialize.py b/dask_cuda/initialize.py index f03f99ec5..0b9c92a59 100644 --- a/dask_cuda/initialize.py +++ b/dask_cuda/initialize.py @@ -30,7 +30,7 @@ def _create_cuda_context(): try: distributed.comm.ucx.init_once() except ModuleNotFoundError: - # UCX intialization has to be delegated to Distributed, it will take care + # UCX initialization has to be delegated to Distributed, it will take care # of setting correct environment variables and importing `ucp` after that. # Therefore if ``import ucp`` fails we can just continue here. pass @@ -73,7 +73,7 @@ def initialize( To ensure UCX works correctly, it is important to ensure it is initialized with the correct options. This is especially important for the client, which cannot be configured to use UCX with arguments like ``LocalCUDACluster`` and - ``dask-cuda-worker``. This function will ensure that they are provided a UCX + ``dask cuda worker``. This function will ensure that they are provided a UCX configuration based on the flags and options passed by the user. This function can also be used within a worker preload script for UCX configuration diff --git a/dask_cuda/is_spillable_object.py b/dask_cuda/is_spillable_object.py index 9e337aa82..cb85248e5 100644 --- a/dask_cuda/is_spillable_object.py +++ b/dask_cuda/is_spillable_object.py @@ -40,7 +40,7 @@ def is_device_object_cudf_index(s): def cudf_spilling_status() -> Optional[bool]: - """Check the status of cudf's build-in spilling + """Check the status of cudf's built-in spilling Returns: - True if cudf's internal spilling is enabled, or diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index ff93532d3..fa532b5f0 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -5,7 +5,6 @@ import dask from distributed import LocalCluster, Nanny, Worker -from distributed.utils import has_arg from distributed.worker_memory import parse_memory_limit from .device_host_file import DeviceHostFile @@ -125,7 +124,7 @@ class LocalCUDACluster(LocalCluster): Managed memory is currently incompatible with NVLink. Trying to enable both will result in an exception. 
rmm_async: bool, default False - Initialize each worker withh RMM and set it to use RMM's asynchronous allocator. + Initialize each worker with RMM and set it to use RMM's asynchronous allocator. See ``rmm.mr.CudaAsyncMemoryResource`` for more info. .. warning:: @@ -233,19 +232,13 @@ def __init__( if n_workers < 1: raise ValueError("Number of workers cannot be less than 1.") # Set nthreads=1 when parsing mem_limit since it only depends on n_workers - if has_arg(parse_memory_limit, "logger"): - # TODO: Remove has_arg check after 2022.11.1 support is dropped - logger = logging.getLogger(__name__) - self.memory_limit = parse_memory_limit( - memory_limit=memory_limit, - nthreads=1, - total_cores=n_workers, - logger=logger, - ) - else: - self.memory_limit = parse_memory_limit( - memory_limit=memory_limit, nthreads=1, total_cores=n_workers - ) + logger = logging.getLogger(__name__) + self.memory_limit = parse_memory_limit( + memory_limit=memory_limit, + nthreads=1, + total_cores=n_workers, + logger=logger, + ) self.device_memory_limit = parse_device_memory_limit( device_memory_limit, device_index=nvml_device_index(0, CUDA_VISIBLE_DEVICES) ) diff --git a/dask_cuda/proxify_device_objects.py b/dask_cuda/proxify_device_objects.py index 923e7cf8e..a8b8a45df 100644 --- a/dask_cuda/proxify_device_objects.py +++ b/dask_cuda/proxify_device_objects.py @@ -19,7 +19,7 @@ def _register_incompatible_types(): """Lazy register types that ProxifyHostFile should unproxify on retrieval. It reads the config key "jit-unspill-incompatible" - (DASK_JIT_UNSPILL_INCOMPATIBLE), which should be a comma seperated + (DASK_JIT_UNSPILL_INCOMPATIBLE), which should be a comma separated list of types. The default value is: DASK_JIT_UNSPILL_INCOMPATIBLE="cupy.ndarray" """ @@ -51,8 +51,8 @@ def f(paths): def proxify_device_objects( obj: T, - proxied_id_to_proxy: MutableMapping[int, ProxyObject] = None, - found_proxies: List[ProxyObject] = None, + proxied_id_to_proxy: Optional[MutableMapping[int, ProxyObject]] = None, + found_proxies: Optional[List[ProxyObject]] = None, excl_proxies: bool = False, mark_as_explicit_proxies: bool = False, ) -> T: @@ -135,7 +135,9 @@ def unproxify_device_objects( pxy = obj._pxy_get(copy=True) if only_incompatible_types: if incompatible_types and isinstance(obj, incompatible_types): - obj = obj._pxy_deserialize(maybe_evict=False, proxy_detail=pxy) + obj = obj._pxy_deserialize( # type: ignore + maybe_evict=False, proxy_detail=pxy + ) elif not skip_explicit_proxies or not pxy.explicit_proxy: pxy.explicit_proxy = False obj = obj._pxy_deserialize(maybe_evict=False, proxy_detail=pxy) diff --git a/dask_cuda/proxify_host_file.py b/dask_cuda/proxify_host_file.py index f258776e5..04716a2ba 100644 --- a/dask_cuda/proxify_host_file.py +++ b/dask_cuda/proxify_host_file.py @@ -43,6 +43,7 @@ from .is_spillable_object import cudf_spilling_status from .proxify_device_objects import proxify_device_objects, unproxify_device_objects from .proxy_object import ProxyObject +from .utils import get_rmm_device_memory_usage T = TypeVar("T") @@ -163,7 +164,7 @@ class ProxiesOnDevice(Proxies): In this case the tally of the total device memory usage is incorrect. 
""" - def __init__(self): + def __init__(self) -> None: super().__init__() self.proxy_id_to_dev_mems: Dict[int, Set[DeviceMemoryId]] = {} self.dev_mem_to_proxy_ids: DefaultDict[DeviceMemoryId, Set[int]] = defaultdict( @@ -321,20 +322,45 @@ def validate(self): header, _ = pxy.obj assert header["serializer"] == pxy.serializer - def proxify(self, obj: T) -> Tuple[T, bool]: + def proxify(self, obj: T, duplicate_check=True) -> Tuple[T, bool]: """Proxify `obj` and add found proxies to the `Proxies` collections + Search through `obj` and wrap all CUDA device objects in ProxyObject. + If duplicate_check is True, identical CUDA device objects found in + `obj` are wrapped by the same ProxyObject. + Returns the proxified object and a boolean, which is `True` when one or more incompatible-types were found. + + Parameters + ---------- + obj + Object to search through or wrap in a ProxyObject. + duplicate_check + Make sure that identical CUDA device objects found in `obj` are + wrapped by the same ProxyObject. This check comes with a significant + overhead hence it is recommended setting to False when it is known + that no duplicate exist. + + Return + ------ + obj + The proxified object. + bool + Whether incompatible-types were found or not. """ + incompatible_type_found = False with self.lock: found_proxies: List[ProxyObject] = [] - # In order detect already proxied object, proxify_device_objects() - # needs a mapping from proxied objects to their proxy objects. - proxied_id_to_proxy = { - id(p._pxy_get().obj): p for p in self._dev.get_proxies() - } + if duplicate_check: + # In order to detect already proxied object, proxify_device_objects() + # needs a mapping from proxied objects to their proxy objects. + proxied_id_to_proxy = { + id(p._pxy_get().obj): p for p in self._dev.get_proxies() + } + else: + proxied_id_to_proxy = None ret = proxify_device_objects(obj, proxied_id_to_proxy, found_proxies) last_access = time.monotonic() for p in found_proxies: @@ -476,7 +502,7 @@ class ProxifyHostFile(MutableMapping): spill_on_demand: bool or None, default None Enables spilling when the RMM memory pool goes out of memory. If ``None``, the "spill-on-demand" config value are used, which defaults to True. - Notice, enabling this does nothing when RMM isn't availabe or not used. + Notice, enabling this does nothing when RMM isn't available or not used. gds_spilling: bool Enable GPUDirect Storage spilling. If ``None``, the "gds-spilling" config value are used, which defaults to ``False``. 
@@ -496,10 +522,10 @@ def __init__( *, device_memory_limit: int, memory_limit: int, - shared_filesystem: bool = None, - compatibility_mode: bool = None, - spill_on_demand: bool = None, - gds_spilling: bool = None, + shared_filesystem: Optional[bool] = None, + compatibility_mode: Optional[bool] = None, + spill_on_demand: Optional[bool] = None, + gds_spilling: Optional[bool] = None, ): if cudf_spilling_status(): warnings.warn( @@ -591,12 +617,16 @@ def oom(nbytes: int) -> bool: traceback.print_stack(file=f) f.seek(0) tb = f.read() + + dev_mem = get_rmm_device_memory_usage() + dev_msg = "" + if dev_mem is not None: + dev_msg = f"RMM allocs: {format_bytes(dev_mem)}, " + self.logger.warning( - "RMM allocation of %s failed, spill-on-demand couldn't " - "find any device memory to spill:\n%s\ntraceback:\n%s\n", - format_bytes(nbytes), - self.manager.pprint(), - tb, + f"RMM allocation of {format_bytes(nbytes)} failed, " + "spill-on-demand couldn't find any device memory to " + f"spill.\n{dev_msg}{self.manager}, traceback:\n{tb}\n" ) # Since we didn't find anything to spill, we give up. return False @@ -630,7 +660,7 @@ def evict(self) -> int: def fast(self): """Alternative access to `.evict()` used by Dask - Dask expects `.fast.evict()` to be availabe for manually triggering + Dask expects `.fast.evict()` to be available for manually triggering of CPU-to-Disk spilling. """ if len(self.manager._host) == 0: diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 80aaa7c43..21dc15ea1 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -46,7 +46,9 @@ def asproxy( - obj: object, serializers: Iterable[str] = None, subclass: Type["ProxyObject"] = None + obj: object, + serializers: Optional[Iterable[str]] = None, + subclass: Optional[Type["ProxyObject"]] = None, ) -> "ProxyObject": """Wrap `obj` in a ProxyObject object if it isn't already. @@ -344,7 +346,7 @@ class ProxyObject: Attributes ---------- _pxy: ProxyDetail - Details of all proxy information of the underlaying proxied object. + Details of all proxy information of the underlying proxied object. Access to _pxy is not pass-through to the proxied object, which is the case for most other access to the ProxyObject. 
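
As an illustrative aside (not part of the patch itself): ``asproxy()`` and the pass-through behaviour of ``ProxyObject`` described above can be exercised directly. A small sketch assuming cuDF is installed:

.. code-block:: python

    import cudf

    from dask_cuda import proxy_object

    df = cudf.DataFrame({"a": range(10)})

    # Wrap (and eagerly serialize) the dataframe; asproxy() is a no-op if the
    # object is already a ProxyObject.
    p = proxy_object.asproxy(df, serializers=("dask",))

    # Access is pass-through: this triggers deserialization of the proxied
    # object behind the scenes.
    assert int(p["a"].sum()) == sum(range(10))
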
@@ -380,7 +382,7 @@ def __del__(self): def _pxy_serialize( self, serializers: Iterable[str], - proxy_detail: ProxyDetail = None, + proxy_detail: Optional[ProxyDetail] = None, ) -> None: """Inplace serialization of the proxied object using the `serializers` @@ -410,7 +412,7 @@ def _pxy_serialize( self._pxy_cache.pop("device_memory_objects", None) def _pxy_deserialize( - self, maybe_evict: bool = True, proxy_detail: ProxyDetail = None + self, maybe_evict: bool = True, proxy_detail: Optional[ProxyDetail] = None ): """Inplace deserialization of the proxied object diff --git a/dask_cuda/tests/test_cudf_builtin_spilling.py b/dask_cuda/tests/test_cudf_builtin_spilling.py index 3e9519caa..d4c28ba06 100644 --- a/dask_cuda/tests/test_cudf_builtin_spilling.py +++ b/dask_cuda/tests/test_cudf_builtin_spilling.py @@ -34,7 +34,7 @@ @pytest.fixture def manager(request): - """Fixture to enable and make a spilling manager availabe""" + """Fixture to enable and make a spilling manager available""" kwargs = dict(getattr(request, "param", {})) set_global_manager(manager=SpillManager(**kwargs)) yield get_global_manager() @@ -77,6 +77,11 @@ def test_device_host_file_step_by_step(tmp_path, manager: SpillManager): tmpdir.mkdir() pdf = pandas.DataFrame({"a": [1, 2, 3]}) cdf = cudf.DataFrame({"a": [1, 2, 3]}) + + # Pandas will cache the result of probing this attribute. + # We trigger it here, to get consistent results from `safe_sizeof()` + hasattr(pdf, "__cuda_array_interface__") + dhf = DeviceHostFile( device_memory_limit=safe_sizeof(pdf), memory_limit=safe_sizeof(pdf), diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 951e02692..64950e2b6 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -1,11 +1,11 @@ from __future__ import absolute_import, division, print_function import os +import pkgutil import subprocess import sys from unittest.mock import patch -import pkg_resources import pytest from distributed import Client, wait @@ -25,10 +25,12 @@ @patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,3,7,8"}) def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811 nthreads = 4 - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9359", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9359", "--host", "127.0.0.1", @@ -62,10 +64,12 @@ def get_visible_devices(): def test_rmm_pool(loop): # noqa: F811 rmm = pytest.importorskip("rmm") - with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9369", "--host", "127.0.0.1", @@ -86,10 +90,12 @@ def test_rmm_pool(loop): # noqa: F811 def test_rmm_managed(loop): # noqa: F811 rmm = pytest.importorskip("rmm") - with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9369", "--host", "127.0.0.1", @@ -115,10 +121,12 @@ def test_rmm_async(loop): # noqa: F811 if driver_version < 11020 or runtime_version < 11020: pytest.skip("cudaMallocAsync not supported") - with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): with popen( [ - 
"dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9369", "--host", "127.0.0.1", @@ -138,10 +146,12 @@ def test_rmm_async(loop): # noqa: F811 def test_rmm_logging(loop): # noqa: F811 rmm = pytest.importorskip("rmm") - with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9369", "--host", "127.0.0.1", @@ -164,10 +174,12 @@ def test_rmm_logging(loop): # noqa: F811 @patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0"}) def test_dashboard_address(loop): # noqa: F811 - with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9369", "--dashboard-address", "127.0.0.1:9370", @@ -184,7 +196,9 @@ def test_dashboard_address(loop): # noqa: F811 def test_unknown_argument(): - ret = subprocess.run(["dask-cuda-worker", "--my-argument"], capture_output=True) + ret = subprocess.run( + ["dask", "cuda", "worker", "--my-argument"], capture_output=True + ) assert ret.returncode != 0 assert b"Scheduler address: --my-argument" in ret.stderr @@ -194,18 +208,20 @@ def test_pre_import(loop): # noqa: F811 module = None # Pick a module that isn't currently loaded - for m in pkg_resources.working_set: - if m.key not in sys.modules.keys(): - module = m.key + for m in pkgutil.iter_modules(): + if m.ispkg and m.name not in sys.modules.keys(): + module = m.name break if module is None: pytest.skip("No module found that isn't already loaded") - with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9369", "--pre-import", module, @@ -221,9 +237,9 @@ def test_pre_import(loop): # noqa: F811 @pytest.mark.timeout(20) @patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0"}) def test_pre_import_not_found(): - with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): ret = subprocess.run( - ["dask-cuda-worker", "127.0.0.1:9369", "--pre-import", "my_module"], + ["dask", "cuda", "worker", "127.0.0.1:9369", "--pre-import", "my_module"], capture_output=True, ) assert ret.returncode != 0 @@ -241,10 +257,12 @@ def test_cuda_mig_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": cuda_visible_devices}): nthreads = len(cuda_visible_devices) - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9359", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9359", "--host", "127.0.0.1", @@ -276,10 +294,12 @@ def test_cuda_visible_devices_uuid(loop): # noqa: F811 gpu_uuid = get_gpu_uuid_from_index(0) with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": gpu_uuid}): - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9359", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9359", "--host", "127.0.0.1", @@ -297,10 +317,12 @@ def test_cuda_visible_devices_uuid(loop): # noqa: F811 def test_rmm_track_allocations(loop): # noqa: F811 rmm = pytest.importorskip("rmm") - with popen(["dask-scheduler", 
"--port", "9369", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9369", "--host", "127.0.0.1", @@ -329,10 +351,12 @@ def test_rmm_track_allocations(loop): # noqa: F811 @patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0"}) def test_get_cluster_configuration(loop): # noqa: F811 pytest.importorskip("rmm") - with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9369", "--host", "127.0.0.1", @@ -360,10 +384,12 @@ def test_get_cluster_configuration(loop): # noqa: F811 @patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0"}) def test_worker_fraction_limits(loop): # noqa: F811 pytest.importorskip("rmm") - with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]): + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): with popen( [ - "dask-cuda-worker", + "dask", + "cuda", + "worker", "127.0.0.1:9369", "--host", "127.0.0.1", diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index dd92e2a61..413bf5bdd 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -1,5 +1,7 @@ import asyncio import multiprocessing as mp +import os +from unittest.mock import patch import numpy as np import pandas as pd @@ -74,10 +76,14 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions): expected = df1.merge(df2).set_index("key") ddf1 = dd.from_pandas(df1, npartitions=npartitions) ddf2 = dd.from_pandas(df2, npartitions=npartitions) - with dask.config.set(explicit_comms=True): - ddf3 = ddf1.merge(ddf2, on=["key"]).set_index("key") - got = ddf3.compute() - pd.testing.assert_frame_equal(got, expected) + + for batchsize in (-1, 1, 2): + with dask.config.set( + explicit_comms=True, explicit_comms_batchsize=batchsize + ): + ddf3 = ddf1.merge(ddf2, on=["key"]).set_index("key") + got = ddf3.compute() + pd.testing.assert_frame_equal(got, expected) def test_dataframe_merge_empty_partitions(): @@ -130,22 +136,29 @@ def _test_dataframe_shuffle(backend, protocol, n_workers): ddf = dd.from_pandas(df.copy(), npartitions=input_nparts).persist( workers=all_workers ) - ddf = explicit_comms_shuffle( - ddf, ["key"], npartitions=output_nparts - ).persist() - - assert ddf.npartitions == output_nparts - - # Check that each partition of `ddf` hashes to the same value - result = ddf.map_partitions( - check_partitions, output_nparts - ).compute() - assert all(result.to_list()) - - # Check the values of `ddf` (ignoring the row order) - expected = df.sort_values("key") - got = ddf.compute().sort_values("key") - assert_eq(got, expected) + # To reduce test runtime, we change the batchsizes here instead + # of using a test parameter. 
+ for batchsize in (-1, 1, 2): + with dask.config.set(explicit_comms_batchsize=batchsize): + ddf = explicit_comms_shuffle( + ddf, + ["key"], + npartitions=output_nparts, + batchsize=batchsize, + ).persist() + + assert ddf.npartitions == output_nparts + + # Check that each partition hashes to the same value + result = ddf.map_partitions( + check_partitions, output_nparts + ).compute() + assert all(result.to_list()) + + # Check the values (ignoring the row order) + expected = df.sort_values("key") + got = ddf.compute().sort_values("key") + assert_eq(got, expected) @pytest.mark.parametrize("nworkers", [1, 2, 3]) @@ -161,8 +174,9 @@ def test_dataframe_shuffle(backend, protocol, nworkers): assert not p.exitcode -def _test_dask_use_explicit_comms(): - def check_shuffle(in_cluster): +@pytest.mark.parametrize("in_cluster", [True, False]) +def test_dask_use_explicit_comms(in_cluster): + def check_shuffle(): """Check if shuffle use explicit-comms by search for keys named 'explicit-comms-shuffle' """ @@ -178,23 +192,28 @@ def check_shuffle(in_cluster): else: # If not in cluster, we cannot use explicit comms assert all(name not in str(key) for key in res.dask) - with LocalCluster( - protocol="tcp", - dashboard_address=None, - n_workers=2, - threads_per_worker=1, - processes=True, - ) as cluster: - with Client(cluster): - check_shuffle(True) - check_shuffle(False) - - -def test_dask_use_explicit_comms(): - p = mp.Process(target=_test_dask_use_explicit_comms) - p.start() - p.join() - assert not p.exitcode + if in_cluster: + # We check environment variables by setting an illegal batchsize + with patch.dict( + os.environ, + {"DASK_EXPLICIT_COMMS": "1", "DASK_EXPLICIT_COMMS_BATCHSIZE": "-2"}, + ): + dask.config.refresh() # Trigger re-read of the environment variables + with pytest.raises(ValueError, match="explicit-comms-batchsize"): + ddf.shuffle(on="key", npartitions=4, shuffle="tasks") + + if in_cluster: + with LocalCluster( + protocol="tcp", + dashboard_address=None, + n_workers=2, + threads_per_worker=1, + processes=True, + ) as cluster: + with Client(cluster): + check_shuffle() + else: + check_shuffle() def _test_dataframe_shuffle_merge(backend, protocol, n_workers): diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index 5e4070802..b0ac88234 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -1,9 +1,9 @@ import asyncio import os +import pkgutil import sys from unittest.mock import patch -import pkg_resources import pytest from dask.distributed import Client @@ -263,9 +263,9 @@ async def test_pre_import(): module = None # Pick a module that isn't currently loaded - for m in pkg_resources.working_set: - if m.key not in sys.modules.keys(): - module = m.key + for m in pkgutil.iter_modules(): + if m.ispkg and m.name not in sys.modules.keys(): + module = m.name break if module is None: diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 09b5c9b46..41399d673 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -1,4 +1,3 @@ -import re from typing import Iterable from unittest.mock import patch @@ -10,6 +9,7 @@ import dask.dataframe from dask.dataframe.shuffle import shuffle_group from dask.sizeof import sizeof +from dask.utils import format_bytes from distributed import Client from distributed.utils_test import gen_test from distributed.worker import get_worker @@ -239,7 +239,8 @@ def 
test_spill_on_demand(root_dir): @pytest.mark.parametrize("jit_unspill", [True, False]) -def test_local_cuda_cluster(jit_unspill): +@gen_test(timeout=20) +async def test_local_cuda_cluster(jit_unspill): """Testing spilling of a proxied cudf dataframe in a local cuda cluster""" cudf = pytest.importorskip("cudf") dask_cudf = pytest.importorskip("dask_cudf") @@ -256,14 +257,17 @@ def task(x): return x # Notice, setting `device_memory_limit=1B` to trigger spilling - with dask_cuda.LocalCUDACluster( - n_workers=1, device_memory_limit="1B", jit_unspill=jit_unspill + async with dask_cuda.LocalCUDACluster( + n_workers=1, + device_memory_limit="1B", + jit_unspill=jit_unspill, + asynchronous=True, ) as cluster: - with Client(cluster): + async with Client(cluster, asynchronous=True) as client: df = cudf.DataFrame({"a": range(10)}) ddf = dask_cudf.from_cudf(df, npartitions=1) ddf = ddf.map_partitions(task, meta=df.head()) - got = ddf.compute() + got = await client.compute(ddf) assert_frame_equal(got.to_pandas(), df.to_pandas()) @@ -277,7 +281,7 @@ def test_dataframes_share_dev_mem(root_dir): # Even though the two dataframe doesn't point to the same cudf.Buffer object assert view1["a"].data is not view2["a"].data # They still share the same underlying device memory - view1["a"].data.ptr == view2["a"].data.ptr + view1["a"].data.get_ptr(mode="read") == view2["a"].data.get_ptr(mode="read") dhf = ProxifyHostFile( worker_local_directory=root_dir, device_memory_limit=160, memory_limit=1000 @@ -381,15 +385,18 @@ def test_incompatible_types(root_dir): @pytest.mark.parametrize("npartitions", [1, 2, 3]) @pytest.mark.parametrize("compatibility_mode", [True, False]) -def test_compatibility_mode_dataframe_shuffle(compatibility_mode, npartitions): +@gen_test(timeout=20) +async def test_compatibility_mode_dataframe_shuffle(compatibility_mode, npartitions): cudf = pytest.importorskip("cudf") def is_proxy_object(x): return "ProxyObject" in str(type(x)) with dask.config.set(jit_unspill_compatibility_mode=compatibility_mode): - with dask_cuda.LocalCUDACluster(n_workers=1, jit_unspill=True) as cluster: - with Client(cluster): + async with dask_cuda.LocalCUDACluster( + n_workers=1, jit_unspill=True, asynchronous=True + ) as cluster: + async with Client(cluster, asynchronous=True) as client: ddf = dask.dataframe.from_pandas( cudf.DataFrame({"key": np.arange(10)}), npartitions=npartitions ) @@ -397,8 +404,8 @@ def is_proxy_object(x): # With compatibility mode on, we shouldn't encounter any proxy objects if compatibility_mode: - assert "ProxyObject" not in str(type(res.compute())) - res = res.map_partitions(is_proxy_object).compute() + assert "ProxyObject" not in str(type(await client.compute(res))) + res = await client.compute(res.map_partitions(is_proxy_object)) res = res.to_list() if compatibility_mode: @@ -448,25 +455,32 @@ def test_on_demand_debug_info(): if not hasattr(rmm.mr, "FailureCallbackResourceAdaptor"): pytest.skip("RMM doesn't implement FailureCallbackResourceAdaptor") - total_mem = get_device_total_memory() + rmm_pool_size = 2**20 def task(): - rmm.DeviceBuffer(size=total_mem + 1) + ( + rmm.DeviceBuffer(size=rmm_pool_size // 2), + rmm.DeviceBuffer(size=rmm_pool_size // 2), + rmm.DeviceBuffer(size=rmm_pool_size), # Trigger OOM + ) - with dask_cuda.LocalCUDACluster(n_workers=1, jit_unspill=True) as cluster: + with dask_cuda.LocalCUDACluster( + n_workers=1, + jit_unspill=True, + rmm_pool_size=rmm_pool_size, + rmm_maximum_pool_size=rmm_pool_size, + rmm_track_allocations=True, + ) as cluster: with Client(cluster) as 
client: # Warmup, which trigger the initialization of spill on demand client.submit(range, 10).result() # Submit too large RMM buffer - with pytest.raises( - MemoryError, match=r".*std::bad_alloc:.*CUDA error at:.*" - ): + with pytest.raises(MemoryError, match="Maximum pool size exceeded"): client.submit(task).result() log = str(client.get_worker_logs()) - assert re.search( - "WARNING - RMM allocation of .* failed, spill-on-demand", log - ) - assert re.search(": Empty", log) + size = format_bytes(rmm_pool_size) + assert f"WARNING - RMM allocation of {size} failed" in log + assert f"RMM allocs: {size}" in log assert "traceback:" in log diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 830b403d3..1a4abafe9 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -16,9 +16,10 @@ from dask.sizeof import sizeof from distributed import Client from distributed.protocol.serialize import deserialize, serialize +from distributed.utils_test import gen_test import dask_cuda -from dask_cuda import proxy_object +from dask_cuda import LocalCUDACluster, proxy_object from dask_cuda.disk_io import SpillToDiskFile from dask_cuda.proxify_device_objects import proxify_device_objects from dask_cuda.proxify_host_file import ProxifyHostFile @@ -282,7 +283,8 @@ def test_fixed_attribute_name(): @pytest.mark.parametrize("jit_unspill", [True, False]) -def test_spilling_local_cuda_cluster(jit_unspill): +@gen_test(timeout=20) +async def test_spilling_local_cuda_cluster(jit_unspill): """Testing spilling of a proxied cudf dataframe in a local cuda cluster""" cudf = pytest.importorskip("cudf") dask_cudf = pytest.importorskip("dask_cudf") @@ -299,14 +301,17 @@ def task(x): return x # Notice, setting `device_memory_limit=1B` to trigger spilling - with dask_cuda.LocalCUDACluster( - n_workers=1, device_memory_limit="1B", jit_unspill=jit_unspill + async with LocalCUDACluster( + n_workers=1, + device_memory_limit="1B", + jit_unspill=jit_unspill, + asynchronous=True, ) as cluster: - with Client(cluster): + async with Client(cluster, asynchronous=True) as client: df = cudf.DataFrame({"a": range(10)}) ddf = dask_cudf.from_cudf(df, npartitions=1) ddf = ddf.map_partitions(task, meta=df.head()) - got = ddf.compute() + got = await client.compute(ddf) if isinstance(got, pandas.Series): pytest.xfail( "BUG fixed by " @@ -395,7 +400,8 @@ def _pxy_deserialize(self): @pytest.mark.parametrize("send_serializers", [None, ("dask", "pickle"), ("cuda",)]) @pytest.mark.parametrize("protocol", ["tcp", "ucx"]) -def test_communicating_proxy_objects(protocol, send_serializers): +@gen_test(timeout=20) +async def test_communicating_proxy_objects(protocol, send_serializers): """Testing serialization of cuDF dataframe when communicating""" cudf = pytest.importorskip("cudf") @@ -413,10 +419,13 @@ def task(x): else: assert serializers_used == "dask" - with dask_cuda.LocalCUDACluster( - n_workers=1, protocol=protocol, enable_tcp_over_ucx=protocol == "ucx" + async with dask_cuda.LocalCUDACluster( + n_workers=1, + protocol=protocol, + enable_tcp_over_ucx=protocol == "ucx", + asynchronous=True, ) as cluster: - with Client(cluster) as client: + async with Client(cluster, asynchronous=True) as client: df = cudf.DataFrame({"a": range(10)}) df = proxy_object.asproxy( df, serializers=send_serializers, subclass=_PxyObjTest @@ -429,14 +438,14 @@ def task(x): df._pxy_get().assert_on_deserializing = False else: df._pxy_get().assert_on_deserializing = True - df = client.scatter(df) - client.submit(task, df).result() 
- client.shutdown() # Avoids a UCX shutdown error + df = await client.scatter(df) + await client.submit(task, df) @pytest.mark.parametrize("protocol", ["tcp", "ucx"]) @pytest.mark.parametrize("shared_fs", [True, False]) -def test_communicating_disk_objects(protocol, shared_fs): +@gen_test(timeout=20) +async def test_communicating_disk_objects(protocol, shared_fs): """Testing disk serialization of cuDF dataframe when communicating""" cudf = pytest.importorskip("cudf") ProxifyHostFile._spill_to_disk.shared_filesystem = shared_fs @@ -450,16 +459,18 @@ def task(x): else: assert serializer_used == "dask" - with dask_cuda.LocalCUDACluster( - n_workers=1, protocol=protocol, enable_tcp_over_ucx=protocol == "ucx" + async with dask_cuda.LocalCUDACluster( + n_workers=1, + protocol=protocol, + enable_tcp_over_ucx=protocol == "ucx", + asynchronous=True, ) as cluster: - with Client(cluster) as client: + async with Client(cluster, asynchronous=True) as client: df = cudf.DataFrame({"a": range(10)}) df = proxy_object.asproxy(df, serializers=("disk",), subclass=_PxyObjTest) df._pxy_get().assert_on_deserializing = False - df = client.scatter(df) - client.submit(task, df).result() - client.shutdown() # Avoids a UCX shutdown error + df = await client.scatter(df) + await client.submit(task, df) @pytest.mark.parametrize("array_module", ["numpy", "cupy"]) diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index a60c05e78..1a24d80b0 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -7,6 +7,7 @@ from contextlib import suppress from functools import singledispatch from multiprocessing import cpu_count +from typing import Optional import numpy as np import pynvml @@ -19,8 +20,6 @@ from distributed import Worker, wait from distributed.comm import parse_address -from .proxify_host_file import ProxifyHostFile - try: from nvtx import annotate as nvtx_annotate except ImportError: @@ -681,7 +680,9 @@ def get_gpu_uuid_from_index(device_index=0): def get_worker_config(dask_worker): - # assume homogenous cluster + from .proxify_host_file import ProxifyHostFile + + # assume homogeneous cluster plugin_vals = dask_worker.plugins.values() ret = {} @@ -822,3 +823,35 @@ def get_cluster_configuration(client): _get_cluster_configuration, client=client, asynchronous=client.asynchronous ) return data + + +def get_rmm_device_memory_usage() -> Optional[int]: + """Get current bytes allocated on current device through RMM + + Check the current RMM resource stack for resources such as + `StatisticsResourceAdaptor` and `TrackingResourceAdaptor` + that can report the current allocated bytes. Returns None, + if no such resources exist. 
+ + Return + ------ + nbytes: int or None + Number of bytes allocated on device through RMM or None + """ + + def get_rmm_memory_resource_stack(mr) -> list: + if hasattr(mr, "upstream_mr"): + return [mr] + get_rmm_memory_resource_stack(mr.upstream_mr) + return [mr] + + try: + import rmm + except ImportError: + return None + + for mr in get_rmm_memory_resource_stack(rmm.mr.get_current_device_resource()): + if isinstance(mr, rmm.mr.TrackingResourceAdaptor): + return mr.get_allocated_bytes() + if isinstance(mr, rmm.mr.StatisticsResourceAdaptor): + return mr.allocation_counts["current_bytes"] + return None diff --git a/dependencies.yaml b/dependencies.yaml new file mode 100644 index 000000000..9b471e6a4 --- /dev/null +++ b/dependencies.yaml @@ -0,0 +1,126 @@ +# Dependency list for https://github.com/rapidsai/dependency-file-generator +files: + all: + output: none + includes: + - build_python + - cudatoolkit + - develop + - docs + - py_version + - run_python + - test_python + test_python: + output: none + includes: + - cudatoolkit + - py_version + - test_python + checks: + output: none + includes: + - develop + - py_version + docs: + output: none + includes: + - cudatoolkit + - docs + - py_version +channels: + - rapidsai + - rapidsai-nightly + - dask/label/dev + - conda-forge + - nvidia +dependencies: + build_python: + common: + - output_types: [conda, requirements] + packages: + - setuptools>=64.0.0 + cudatoolkit: + specific: + - output_types: conda + matrices: + - matrix: + cuda: "11.2" + packages: + - cudatoolkit=11.2 + - matrix: + cuda: "11.4" + packages: + - cudatoolkit=11.4 + - matrix: + cuda: "11.5" + packages: + - cudatoolkit=11.5 + - matrix: + cuda: "11.8" + packages: + - cudatoolkit=11.8 + develop: + common: + - output_types: [conda, requirements] + packages: + - pre-commit + docs: + common: + - output_types: [conda, requirements] + packages: + - numpydoc + - sphinx + - sphinx-click + - sphinx_rtd_theme + py_version: + specific: + - output_types: conda + matrices: + - matrix: + py: "3.8" + packages: + - python=3.8 + - matrix: + py: "3.9" + packages: + - python=3.9 + - matrix: + py: "3.10" + packages: + - python=3.10 + - matrix: + packages: + - python>=3.8,<3.11 + run_python: + common: + - output_types: [conda, requirements] + packages: + - dask==2023.1.1 + - distributed==2023.1.1 + - numba>=0.54 + - numpy>=1.18.0 + - pandas>=1.0 + - pynvml>=11.0.0 + - zict>=0.1.3 + test_python: + common: + - output_types: [conda] + packages: + - cucim=23.02 + - cudf=23.02 + - dask-cudf=23.02 + - pytest + - pytest-cov + - ucx-proc=*=gpu + - ucx-py=0.30 + specific: + - output_types: conda + matrices: + - matrix: + arch: x86_64 + packages: + - numactl-devel-cos7-x86_64 + - matrix: + arch: aarch64 + packages: + - numactl-devel-cos7-aarch64 diff --git a/docs/Makefile b/docs/Makefile index 69fe55ecf..ba501f6f5 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -16,4 +16,4 @@ help: # Catch-all target: route all unknown targets to Sphinx using the new # "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). %: Makefile - @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) \ No newline at end of file + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/source/api.rst b/docs/source/api.rst index 10a3ed6d0..b9d9d6dfa 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -7,10 +7,19 @@ Cluster .. autoclass:: LocalCUDACluster :members: +CLI +--- + Worker ------- -.. 
click:: dask_cuda.cli.dask_cuda_worker:main - :prog: dask-cuda-worker +~~~~~~ +.. click:: dask_cuda.cli:worker + :prog: dask cuda + :nested: none + +Cluster configuration +~~~~~~~~~~~~~~~~~~~~~ +.. click:: dask_cuda.cli:config + :prog: dask cuda :nested: none Client initialization @@ -24,4 +33,3 @@ Explicit-comms .. currentmodule:: dask_cuda.explicit_comms.comms .. autoclass:: CommsContext :members: - diff --git a/docs/source/examples/best-practices.rst b/docs/source/examples/best-practices.rst index 242e90fff..84cc78b88 100644 --- a/docs/source/examples/best-practices.rst +++ b/docs/source/examples/best-practices.rst @@ -114,4 +114,3 @@ With UCX and NVLink, we greatly reduced the wall clock time to: ``347.43 ms +/- 0 | ucx://127.0.0.1:35954 1 | ucx://127.0.0.1:53584 ================================================================================ - diff --git a/docs/source/examples/ucx.rst b/docs/source/examples/ucx.rst index b9a367773..6230caf67 100644 --- a/docs/source/examples/ucx.rst +++ b/docs/source/examples/ucx.rst @@ -1,7 +1,7 @@ Enabling UCX communication ========================== -A CUDA cluster using UCX communication can be started automatically with LocalCUDACluster or manually with the ``dask-cuda-worker`` CLI tool. +A CUDA cluster using UCX communication can be started automatically with LocalCUDACluster or manually with the ``dask cuda worker`` CLI tool. In either case, a ``dask.distributed.Client`` must be made for the worker cluster using the same Dask UCX configuration; see `UCX Integration -- Configuration <../ucx.html#configuration>`_ for details on all available options. LocalCUDACluster with Automatic Configuration @@ -48,10 +48,10 @@ To connect a client to a cluster with all supported transports and an RMM pool: ) client = Client(cluster) -dask-cuda-worker with Automatic Configuration ---------------------------------------------- +``dask cuda worker`` with Automatic Configuration +------------------------------------------------- -When using ``dask-cuda-worker`` with UCX communication and automatic configuration, the scheduler, workers, and client must all be started manually, but without specifying any UCX transports explicitly. This is only supported in Dask-CUDA 22.02 and newer and requires UCX >= 1.11.1. +When using ``dask cuda worker`` with UCX communication and automatic configuration, the scheduler, workers, and client must all be started manually, but without specifying any UCX transports explicitly. This is only supported in Dask-CUDA 22.02 and newer and requires UCX >= 1.11.1. Scheduler ^^^^^^^^^ @@ -64,7 +64,7 @@ To start a Dask scheduler using UCX with automatic configuration and one GB of R $ DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True \ > DASK_DISTRIBUTED__RMM__POOL_SIZE=1GB \ - > dask-scheduler --protocol ucx --interface ib0 + > dask scheduler --protocol ucx --interface ib0 .. note:: The ``interface="ib0"`` is intentionally specified above to ensure RDMACM is used in systems that support InfiniBand. On systems that don't support InfiniBand or where RDMACM isn't required, the ``interface`` argument may be omitted or specified to listen on a different interface. @@ -79,7 +79,7 @@ To start workers with automatic UCX configuration and an RMM pool of 14GB per GP .. code-block:: bash $ UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda - > dask-cuda-worker ucx://:8786 \ + > dask cuda worker ucx://:8786 \ > --rmm-pool-size="14GB" \ > --interface="ib0" @@ -121,15 +121,15 @@ Alternatively, the ``with dask.config.set`` statement from the example above may .. 
note:: We specify ``UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda`` above for optimal performance with InfiniBand, see details `here `_. If not using InfiniBand, that option may be omitted. In UCX 1.12 and newer, that option is default and may be omitted as well even when using InfiniBand. -dask-cuda-worker with Manual Configuration +``dask cuda worker`` with Manual Configuration ------------------------------------------ -When using ``dask-cuda-worker`` with UCX communication and manual configuration, the scheduler, workers, and client must all be started manually, each using the same UCX configuration. +When using ``dask cuda worker`` with UCX communication and manual configuration, the scheduler, workers, and client must all be started manually, each using the same UCX configuration. Scheduler ^^^^^^^^^ -UCX configuration options will need to be specified for ``dask-scheduler`` as environment variables; see `Dask Configuration -- Environment Variables `_ for more details on the mapping between environment variables and options. +UCX configuration options will need to be specified for ``dask scheduler`` as environment variables; see `Dask Configuration -- Environment Variables `_ for more details on the mapping between environment variables and options. To start a Dask scheduler using UCX with all supported transports and an gigabyte RMM pool: @@ -141,19 +141,19 @@ To start a Dask scheduler using UCX with all supported transports and an gigabyt > DASK_DISTRIBUTED__COMM__UCX__INFINIBAND=True \ > DASK_DISTRIBUTED__COMM__UCX__RDMACM=True \ > DASK_DISTRIBUTED__RMM__POOL_SIZE=1GB \ - > dask-scheduler --protocol ucx --interface ib0 + > dask scheduler --protocol ucx --interface ib0 We communicate to the scheduler that we will be using UCX with the ``--protocol`` option, and that we will be using InfiniBand with the ``--interface`` option. Workers ^^^^^^^ -All UCX configuration options have analogous options in ``dask-cuda-worker``; see `API -- Worker <../api.html#worker>`_ for a complete list of these options. +All UCX configuration options have analogous options in ``dask cuda worker``; see `API -- Worker <../api.html#worker>`_ for a complete list of these options. To start a cluster with all supported transports and an RMM pool: .. code-block:: bash - $ dask-cuda-worker ucx://:8786 \ + $ dask cuda worker ucx://:8786 \ > --enable-tcp-over-ucx \ > --enable-nvlink \ > --enable-infiniband \ diff --git a/docs/source/examples/worker_count.rst b/docs/source/examples/worker_count.rst index 62954ffbe..401236723 100644 --- a/docs/source/examples/worker_count.rst +++ b/docs/source/examples/worker_count.rst @@ -20,14 +20,14 @@ This argument can be used on its own or in conjunction with ``CUDA_VISIBLE_DEVIC cluster = LocalCUDACluster(n_workers=2) # will use GPUs 0,1 cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="3,4,5", n_workers=2) # will use GPUs 3,4 -When using ``dask-cuda-worker``, ``CUDA_VISIBLE_DEVICES`` must be provided as an environment variable: +When using ``dask cuda worker``, ``CUDA_VISIBLE_DEVICES`` must be provided as an environment variable: .. code-block:: bash - $ dask-scheduler + $ dask scheduler distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786 - $ CUDA_VISIBLE_DEVICES=0,1 dask-cuda-worker 127.0.0.1:8786 + $ CUDA_VISIBLE_DEVICES=0,1 dask cuda worker 127.0.0.1:8786 GPUs can also be selected by their UUIDs, which can be acquired using `NVIDIA System Management Interface `_: @@ -46,4 +46,4 @@ These UUIDs can then be passed to ``CUDA_VISIBLE_DEVICES`` in place of a GPU ind .. 
code-block:: bash $ CUDA_VISIBLE_DEVICES="GPU-dae76d0e-3414-958a-8f3e-fc6682b36f31" \ - > dask-cuda-worker 127.0.0.1:8786 + > dask cuda worker 127.0.0.1:8786 diff --git a/docs/source/index.rst b/docs/source/index.rst index a43f29079..37ba12139 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -9,7 +9,7 @@ Motivation While Distributed can be used to leverage GPU workloads through libraries such as `cuDF `_, `CuPy `_, and `Numba `_, Dask-CUDA offers several unique features unavailable to Distributed: -- **Automatic instantiation of per-GPU workers** -- Using Dask-CUDA's LocalCUDACluster or ``dask-cuda-worker`` CLI will automatically launch one worker for each GPU available on the executing node, avoiding the need to explicitly select GPUs. +- **Automatic instantiation of per-GPU workers** -- Using Dask-CUDA's LocalCUDACluster or ``dask cuda worker`` CLI will automatically launch one worker for each GPU available on the executing node, avoiding the need to explicitly select GPUs. - **Automatic setting of CPU affinity** -- The setting of CPU affinity for each GPU is done automatically, preventing memory transfers from taking suboptimal paths. - **Automatic selection of InfiniBand devices** -- When UCX communication is enabled over InfiniBand, Dask-CUDA automatically selects the optimal InfiniBand device for each GPU (see `UCX Integration `_ for instructions on configuring UCX communication). - **Memory spilling from GPU** -- For memory-intensive workloads, Dask-CUDA supports spilling from GPU to host memory when a GPU reaches the default or user-specified memory utilization limit. diff --git a/docs/source/install.rst b/docs/source/install.rst index eb303346c..b8442b4ff 100644 --- a/docs/source/install.rst +++ b/docs/source/install.rst @@ -12,11 +12,11 @@ To use Dask-CUDA on your system, you will need: - A version of NVIDIA CUDA Toolkit compatible with the installed driver version; see Table 1 of `CUDA Compatibility -- Binary Compatibility `_ for an overview of CUDA Toolkit driver requirements Once the proper CUDA Toolkit version has been determined, it can be installed using along with Dask-CUDA using ``conda``. -To install the latest version of Dask-CUDA along with CUDA Toolkit 11.0: +To install the latest version of Dask-CUDA along with CUDA Toolkit 11.5: .. code-block:: bash - conda install -c rapidsai -c nvidia -c conda-forge dask-cuda cudatoolkit=11.0 + conda install -c rapidsai -c conda-forge -c nvidia dask-cuda cudatoolkit=11.5 Pip --- diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index ce9ea2f21..c5592b439 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -1,7 +1,7 @@ Quickstart ========== -A Dask-CUDA cluster can be created using either LocalCUDACluster or ``dask-cuda-worker`` from the command line. +A Dask-CUDA cluster can be created using either LocalCUDACluster or ``dask cuda worker`` from the command line. LocalCUDACluster ---------------- @@ -16,17 +16,17 @@ To create a Dask-CUDA cluster using all available GPUs and connect a Dask.distri cluster = LocalCUDACluster() client = Client(cluster) -dask-cuda-worker ----------------- +``dask cuda worker`` +-------------------- -To create an equivalent cluster from the command line, Dask-CUDA workers must be connected to a scheduler started with ``dask-scheduler``: +To create an equivalent cluster from the command line, Dask-CUDA workers must be connected to a scheduler started with ``dask scheduler``: .. 
code-block:: bash - $ dask-scheduler + $ dask scheduler distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786 - $ dask-cuda-worker 127.0.0.1:8786 + $ dask cuda worker 127.0.0.1:8786 To connect a client to this cluster: diff --git a/docs/source/spilling.rst b/docs/source/spilling.rst index ba8e7b93f..28f3562b9 100644 --- a/docs/source/spilling.rst +++ b/docs/source/spilling.rst @@ -19,17 +19,17 @@ Memory spilling can be disabled by setting ``device_memory_limit`` to 0: cluster = LocalCUDACluster(device_memory_limit=0) # spilling disabled -The same applies for ``dask-cuda-worker``, and spilling can be controlled by setting ``--device-memory-limit``: +The same applies for ``dask cuda worker``, and spilling can be controlled by setting ``--device-memory-limit``: .. code-block:: - $ dask-scheduler + $ dask scheduler distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786 - $ dask-cuda-worker --device-memory-limit 50000 - $ dask-cuda-worker --device-memory-limit 5GB - $ dask-cuda-worker --device-memory-limit 0.3 - $ dask-cuda-worker --device-memory-limit 0 + $ dask cuda worker --device-memory-limit 50000 + $ dask cuda worker --device-memory-limit 5GB + $ dask cuda worker --device-memory-limit 0.3 + $ dask cuda worker --device-memory-limit 0 JIT-Unspill @@ -65,19 +65,19 @@ Or set the worker argument ``--enable-jit-unspill​`` .. code-block:: - $ dask-scheduler + $ dask scheduler distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786 - $ dask-cuda-worker --enable-jit-unspill​ + $ dask cuda worker --enable-jit-unspill​ Or environment variable ``DASK_JIT_UNSPILL=True`` .. code-block:: - $ dask-scheduler + $ dask scheduler distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786 - $ DASK_JIT_UNSPILL=True dask-cuda-worker​ + $ DASK_JIT_UNSPILL=True dask cuda worker​ Limitations diff --git a/docs/source/ucx.rst b/docs/source/ucx.rst index fe9b95c4f..d9cacdc77 100644 --- a/docs/source/ucx.rst +++ b/docs/source/ucx.rst @@ -37,7 +37,7 @@ Automatic Beginning with Dask-CUDA 22.02 and assuming UCX >= 1.11.1, specifying UCX transports is now optional. -A local cluster can now be started with ``LocalCUDACluster(protocol="ucx")``, implying automatic UCX transport selection (``UCX_TLS=all``). Starting a cluster separately -- scheduler, workers and client as different processes -- is also possible, as long as Dask scheduler is created with ``dask-scheduler --protocol="ucx"`` and connecting a ``dask-cuda-worker`` to the scheduler will imply automatic UCX transport selection, but that requires the Dask scheduler and client to be started with ``DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True``. See `Enabling UCX communication `_ for more details examples of UCX usage with automatic configuration. +A local cluster can now be started with ``LocalCUDACluster(protocol="ucx")``, implying automatic UCX transport selection (``UCX_TLS=all``). Starting a cluster separately -- scheduler, workers and client as different processes -- is also possible, as long as Dask scheduler is created with ``dask scheduler --protocol="ucx"`` and connecting a ``dask cuda worker`` to the scheduler will imply automatic UCX transport selection, but that requires the Dask scheduler and client to be started with ``DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True``. See `Enabling UCX communication `_ for more details examples of UCX usage with automatic configuration. Configuring transports manually is still possible, please refer to the subsection below. 
@@ -97,7 +97,7 @@ this when using Dask-CUDA's UCX integration, processes launched via multiprocessing should use the start processes using the `"forkserver" `_ -method. When launching workers using `dask-cuda-worker `_, this can be +method. When launching workers using `dask cuda worker `_, this can be achieved by passing ``--multiprocessing-method forkserver`` as an argument. In user code, the method can be controlled with the ``distributed.worker.multiprocessing-method`` configuration key in @@ -127,8 +127,7 @@ therefore do something like the following: .. note:: - To confirm that no bad fork calls are occuring, start jobs with + To confirm that no bad fork calls are occurring, start jobs with ``UCX_IB_FORK_INIT=n``. UCX will produce a warning ``UCX WARN IB: ibv_fork_init() was disabled or failed, yet a fork() has been issued.`` if the application calls ``fork()``. - diff --git a/examples/ucx/dask_cuda_worker.sh b/examples/ucx/dask_cuda_worker.sh index f1ec98186..f139bfd6f 100644 --- a/examples/ucx/dask_cuda_worker.sh +++ b/examples/ucx/dask_cuda_worker.sh @@ -3,7 +3,7 @@ usage() { echo "usage: $0 [-a ] [-i ] [-r ] [-t ]" >&2 exit 1 - } + } # parse arguments rmm_pool_size=1GB @@ -46,7 +46,7 @@ if [[ $transport == *"ib"* ]]; then fi # initialize scheduler -dask-scheduler $scheduler_flags & +dask scheduler $scheduler_flags & # initialize workers -dask-cuda-worker $worker_flags +dask cuda worker $worker_flags diff --git a/pyproject.toml b/pyproject.toml index 4eec772de..58f156bb9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,18 +16,18 @@ readme = { file = "README.md", content-type = "text/markdown" } authors = [ { name = "NVIDIA Corporation" }, ] -license= { text = "Apache-2.0" } +license = { text = "Apache-2.0" } requires-python = ">=3.8" dependencies = [ - "dask ==2022.11.1", - "distributed ==2022.11.1", + "dask ==2023.1.1", + "distributed ==2023.1.1", "pynvml >=11.0.0", "numpy >=1.18.0", "numba >=0.54", "pandas >=1.0", "zict >=0.1.3", ] -classifiers=[ +classifiers = [ "Intended Audience :: Developers", "Topic :: Database", "Topic :: Scientific/Engineering", @@ -35,11 +35,15 @@ classifiers=[ "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", ] [project.scripts] -dask-cuda-worker = "dask_cuda.cli.dask_cuda_worker:go" -dask-config = "dask_cuda.cli.dask_config:go" +dask-cuda-worker = "dask_cuda.cli:worker" +dask-cuda-config = "dask_cuda.cli:config" + +[project.entry-points.dask_cli] +cuda = "dask_cuda.cli:cuda" [project.optional-dependencies] docs = [ @@ -54,6 +58,8 @@ test = [ [project.urls] Homepage = "https://github.com/rapidsai/dask-cuda" +Documentation = "https://docs.rapids.ai/api/dask-cuda/stable/" +Source = "https://github.com/rapidsai/dask-cuda" [tool.coverage.run] disable_warnings = [ @@ -120,6 +126,8 @@ filterwarnings = [ # tornado 6.2, remove when dask/distributed#6669 is fixed "ignore:clear_current is deprecated:DeprecationWarning:", "ignore:make_current is deprecated:DeprecationWarning:", + # remove after https://github.com/rapidsai/dask-cuda/issues/1087 is closed + "ignore:There is no current event loop:DeprecationWarning:tornado", ] [tool.setuptools] diff --git a/rtd/Makefile b/rtd/Makefile index 69fe55ecf..ba501f6f5 100644 --- a/rtd/Makefile +++ b/rtd/Makefile @@ -16,4 +16,4 @@ help: # Catch-all target: route all unknown targets to Sphinx using the new # "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). 
%: Makefile - @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) \ No newline at end of file + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
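
As an illustrative aside (not part of the patch itself): the new ``batchsize`` argument of the explicit-comms shuffle can be exercised roughly as the updated tests do. This sketch assumes the function lives at ``dask_cuda.explicit_comms.dataframe.shuffle.shuffle`` and uses a made-up pandas-backed dataframe:

.. code-block:: python

    import dask
    import dask.dataframe as dd
    import pandas as pd
    from dask.distributed import Client, LocalCluster

    from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle

    if __name__ == "__main__":
        with LocalCluster(n_workers=2, threads_per_worker=1, processes=True) as cluster:
            with Client(cluster):
                df = pd.DataFrame({"key": range(100), "payload": range(100)})
                ddf = dd.from_pandas(df, npartitions=8)

                # batchsize=1 (the default) favours robustness and low memory pressure;
                # batchsize=-1 shuffles all partitions in a single round. The same value
                # can instead be set via the ``explicit-comms-batchsize`` config key
                # (``DASK_EXPLICIT_COMMS_BATCHSIZE``).
                with dask.config.set(explicit_comms_batchsize=1):
                    shuffled = explicit_comms_shuffle(ddf, ["key"], npartitions=8, batchsize=1)
                    result = shuffled.compute()
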
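Similarly illustrative (not part of the patch itself): what the new ``get_rmm_device_memory_usage()`` helper reports when a tracking or statistics resource adaptor is active. The buffer size is arbitrary:

.. code-block:: python

    import rmm

    from dask_cuda.utils import get_rmm_device_memory_usage

    # Wrap the current RMM resource in a StatisticsResourceAdaptor so the helper
    # has something to report; without such an adaptor it returns None.
    rmm.mr.set_current_device_resource(
        rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource())
    )

    buf = rmm.DeviceBuffer(size=2**20)
    print(get_rmm_device_memory_usage())  # roughly 1 MiB currently allocated via RMM
    del buf
    print(get_rmm_device_memory_usage())  # back to 0
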