diff --git a/.github/labeler.yml b/.github/labeler.yml index e8c3f14dc..89814c09a 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -5,7 +5,7 @@ python: - 'dask_cuda/**' -gpuCI: +ci: - 'ci/**' conda: diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index bce48ebd8..59e188881 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -28,17 +28,17 @@ concurrency: jobs: conda-python-build: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.02 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.04 with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} date: ${{ inputs.date }} sha: ${{ inputs.sha }} docs-build: - if: ${{ startsWith(github.ref, 'refs/heads/branch-') }} + if: github.ref_type == 'branch' && github.event_name == 'push' needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.02 + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.04 with: build_type: branch node_type: "gpu-latest-1" @@ -48,7 +48,7 @@ jobs: upload-conda: needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.02 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.04 with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 3dee7d77f..abcd0c66c 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -18,26 +18,26 @@ jobs: - docs-build - wheel-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.02 + uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.04 checks: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.02 + uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.04 conda-python-build: needs: checks secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.02 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.04 with: build_type: pull-request conda-python-tests: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.02 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.04 with: build_type: pull-request docs-build: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.02 + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.04 with: build_type: pull-request node_type: "gpu-latest-1" diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 5c18a0b1c..3a6641d81 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -16,7 +16,7 @@ on: jobs: conda-python-tests: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.02 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.04 with: build_type: nightly branch: ${{ inputs.branch }} diff --git a/ci/checks/style.sh b/ci/checks/style.sh 
deleted file mode 100755 index 5d01f97d9..000000000 --- a/ci/checks/style.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/bash -# Copyright (c) 2020, NVIDIA CORPORATION. -################################################################################ -# dask-cuda Style Tester -################################################################################ - -# Ignore errors and set path -set +e -PATH=/opt/conda/bin:$PATH - -# Activate common conda env -. /opt/conda/etc/profile.d/conda.sh -conda activate rapids - -# Run pre-commit checks -pre-commit run --hook-stage manual --all-files diff --git a/ci/cpu/build.sh b/ci/cpu/build.sh deleted file mode 100755 index b1b279641..000000000 --- a/ci/cpu/build.sh +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env bash -# Copyright (c) 2019-2022, NVIDIA CORPORATION. -################################################################################ -# dask-cuda cpu build -################################################################################ -set -e - -# Set path and build parallel level -export PATH=/opt/conda/bin:/usr/local/cuda/bin:$PATH -export PARALLEL_LEVEL=${PARALLEL_LEVEL:-4} - -# Set home to the job's workspace -export HOME="$WORKSPACE" - -# Determine CUDA release version -export CUDA_REL=${CUDA_VERSION%.*} - -# Setup 'gpuci_conda_retry' for build retries (results in 2 total attempts) -export GPUCI_CONDA_RETRY_MAX=1 -export GPUCI_CONDA_RETRY_SLEEP=30 - -# Whether to keep `dask/label/dev` channel in the env. If INSTALL_DASK_MAIN=0, -# `dask/label/dev` channel is removed. -export INSTALL_DASK_MAIN=0 - -# Dask version to install when `INSTALL_DASK_MAIN=0` -export DASK_STABLE_VERSION="2023.1.1" - -# Switch to project root; also root of repo checkout -cd "$WORKSPACE" - -# While conda provides these during conda-build, they are also necessary during -# the setup.py build for PyPI -export GIT_DESCRIBE_TAG=`git describe --abbrev=0 --tags` -export GIT_DESCRIBE_NUMBER=`git rev-list ${GIT_DESCRIBE_TAG}..HEAD --count` - -# If nightly build, append current YYMMDD to version -if [[ "$BUILD_MODE" = "branch" && "$SOURCE_BRANCH" = branch-* ]] ; then - export VERSION_SUFFIX=`date +%y%m%d` -fi - -################################################################################ -# SETUP - Check environment -################################################################################ - -gpuci_logger "Check environment variables" -env - -gpuci_logger "Activate conda env" -. 
/opt/conda/etc/profile.d/conda.sh -conda activate rapids - -# Remove `rapidsai-nightly` & `dask/label/dev` channel if we are building main branch -if [ "$SOURCE_BRANCH" = "main" ]; then - conda config --system --remove channels rapidsai-nightly - conda config --system --remove channels dask/label/dev -elif [[ "${INSTALL_DASK_MAIN}" == 0 ]]; then -# Remove `dask/label/dev` channel if INSTALL_DASK_MAIN=0 - conda config --system --remove channels dask/label/dev -fi - -gpuci_logger "Check compiler versions" -python --version -$CC --version -$CXX --version - -gpuci_logger "Check conda environment" -conda info -conda config --show-sources -conda list --show-channel-urls - -# FIX Added to deal with Anancoda SSL verification issues during conda builds -conda config --set ssl_verify False - -# Install latest nightly version for dask and distributed if needed -if [[ "${INSTALL_DASK_MAIN}" == 1 ]]; then - gpuci_logger "Installing dask and distributed from dask nightly channel" - gpuci_mamba_retry install -c dask/label/dev \ - "dask/label/dev::dask" \ - "dask/label/dev::distributed" -else - gpuci_logger "gpuci_mamba_retry install conda-forge::dask==${DASK_STABLE_VERSION} conda-forge::distributed==${DASK_STABLE_VERSION} conda-forge::dask-core==${DASK_STABLE_VERSION} --force-reinstall" - gpuci_mamba_retry install conda-forge::dask==${DASK_STABLE_VERSION} conda-forge::distributed==${DASK_STABLE_VERSION} conda-forge::dask-core==${DASK_STABLE_VERSION} --force-reinstall -fi - - -################################################################################ -# BUILD - Package builds -################################################################################ - -# FIXME: Move boa install to gpuci/rapidsai -gpuci_mamba_retry install -c conda-forge boa - -gpuci_logger "Build conda pkg for dask-cuda" -gpuci_conda_retry mambabuild conda/recipes/dask-cuda --python=${PYTHON} - -rm -rf dist/ -python setup.py sdist bdist_wheel - -################################################################################ -# UPLOAD - Packages -################################################################################ - -gpuci_logger "Upload conda pkg..." -source ci/cpu/upload.sh - -gpuci_logger "Upload pypi pkg..." 
-source ci/cpu/upload-pypi.sh diff --git a/ci/cpu/upload-pypi.sh b/ci/cpu/upload-pypi.sh deleted file mode 100755 index 4eb6a2ad9..000000000 --- a/ci/cpu/upload-pypi.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/bash -set -e - - -if [ ${BUILD_MODE} != "branch" ]; then - echo "Skipping upload" - return 0 -fi - -if [ -z "$TWINE_PASSWORD" ]; then - echo "TWINE_PASSWORD not set" - return 0 -fi - -echo "Upload pypi" -twine upload --skip-existing -u ${TWINE_USERNAME:-rapidsai} dist/* diff --git a/ci/cpu/upload.sh b/ci/cpu/upload.sh deleted file mode 100755 index 5a40e1f98..000000000 --- a/ci/cpu/upload.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/bin/bash -# -# Adopted from https://github.com/tmcdonell/travis-scripts/blob/dfaac280ac2082cd6bcaba3217428347899f2975/update-accelerate-buildbot.sh - -set -e - -# Setup 'gpuci_retry' for upload retries (results in 4 total attempts) -export GPUCI_RETRY_MAX=3 -export GPUCI_RETRY_SLEEP=30 - -# Set default label options if they are not defined elsewhere -export LABEL_OPTION=${LABEL_OPTION:-"--label main"} - -# Skip uploads unless BUILD_MODE == "branch" -if [ ${BUILD_MODE} != "branch" ]; then - echo "Skipping upload" - return 0 -fi - -# Skip uploads if there is no upload key -if [ -z "$MY_UPLOAD_KEY" ]; then - echo "No upload key" - return 0 -fi - -################################################################################ -# SETUP - Get conda file output locations -################################################################################ - -gpuci_logger "Get conda file output locations" -export DASKCUDA_FILE=`conda build conda/recipes/dask-cuda --python=$PYTHON --output` - -################################################################################ -# UPLOAD - Conda packages -################################################################################ - -gpuci_logger "Starting conda uploads" - -test -e ${DASKCUDA_FILE} -echo "Upload Dask-cuda" -echo ${DASKCUDA_FILE} -gpuci_retry anaconda -t ${MY_UPLOAD_KEY} upload -u ${CONDA_USERNAME:-rapidsai} ${LABEL_OPTION} --skip-existing ${DASKCUDA_FILE} --no-progress diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh deleted file mode 100755 index 2d6f35f10..000000000 --- a/ci/gpu/build.sh +++ /dev/null @@ -1,129 +0,0 @@ -#!/bin/bash -# Copyright (c) 2020, NVIDIA CORPORATION. -############################################## -# dask-cuda GPU build and test script for CI # -############################################## -set -e -NUMARGS=$# -ARGS=$* - -# Arg parsing function -function hasArg { - (( ${NUMARGS} != 0 )) && (echo " ${ARGS} " | grep -q " $1 ") -} - -# Set path and build parallel level -export PATH=/opt/conda/bin:/usr/local/cuda/bin:$PATH -export PARALLEL_LEVEL=${PARALLEL_LEVEL:-4} -export CUDA_REL=${CUDA_VERSION%.*} -export CUDA_REL2=${CUDA//./} - -# Set home to the job's workspace -export HOME="$WORKSPACE" - -# Parse git describe -cd "$WORKSPACE" -export GIT_DESCRIBE_TAG=`git describe --tags` -export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'` -export UCX_PATH=$CONDA_PREFIX -export UCXPY_VERSION=0.30 -unset GIT_DESCRIBE_TAG - -# Enable NumPy's __array_function__ protocol (needed for NumPy 1.16.x, -# will possibly be enabled by default starting on 1.17) -export NUMPY_EXPERIMENTAL_ARRAY_FUNCTION=1 - -# Install dask and distributed from main branch. Usually needed during -# development time and disabled before a new dask-cuda release. 
-export INSTALL_DASK_MAIN=0 - -# Dask version to install when `INSTALL_DASK_MAIN=0` -export DASK_STABLE_VERSION="2023.1.1" - -# Temporary workaround for Jupyter errors. -# See https://github.com/rapidsai/dask-cuda/issues/1040 -export JUPYTER_PLATFORM_DIRS=1 - -################################################################################ -# SETUP - Check environment -################################################################################ - -gpuci_logger "Check environment" -env - -gpuci_logger "Check GPU usage" -nvidia-smi - -gpuci_logger "Activate conda env" -. /opt/conda/etc/profile.d/conda.sh -conda activate rapids - -conda info -conda config --show-sources -conda list --show-channel-urls - -# Installing cucim in order to test GDS spilling -gpuci_mamba_retry install "cudf=${MINOR_VERSION}" \ - "dask-cudf=${MINOR_VERSION}" \ - "ucx-py=${UCXPY_VERSION}" \ - "ucx-proc=*=gpu" \ - "cucim" - - -gpuci_logger "Check versions" -python --version -$CC --version -$CXX --version - -conda info -conda config --show-sources -conda list --show-channel-urls - -################################################################################ -# BUILD - Build dask-cuda -################################################################################ - -# TODO: Move boa install to gpuci/rapidsai -gpuci_mamba_retry install boa - -gpuci_logger "Build and install dask-cuda" -cd "${WORKSPACE}" -CONDA_BLD_DIR="${WORKSPACE}/.conda-bld" -gpuci_conda_retry mambabuild --croot "${CONDA_BLD_DIR}" conda/recipes/dask-cuda --python="${PYTHON}" -gpuci_mamba_retry install -c "${CONDA_BLD_DIR}" dask-cuda - -################################################################################ -# DASK - Install latest nightly version for dask and distributed if needed. -# Done after everything else to ensure packages are not downgraded. -################################################################################ -if [[ "${INSTALL_DASK_MAIN}" == 1 ]]; then - gpuci_logger "Installing dask and distributed from dask nightly channel" - gpuci_mamba_retry install -c dask/label/dev \ - "dask/label/dev::dask" \ - "dask/label/dev::distributed" -else - gpuci_logger "gpuci_mamba_retry install conda-forge::dask==${DASK_STABLE_VERSION} conda-forge::distributed==${DASK_STABLE_VERSION} conda-forge::dask-core==${DASK_STABLE_VERSION} --force-reinstall" - gpuci_mamba_retry install conda-forge::dask==${DASK_STABLE_VERSION} conda-forge::distributed==${DASK_STABLE_VERSION} conda-forge::dask-core==${DASK_STABLE_VERSION} --force-reinstall - conda config --system --remove channels dask/label/dev -fi - -################################################################################ -# TEST - Run pytests for ucx-py -################################################################################ - -if hasArg --skip-tests; then - gpuci_logger "Skipping Tests" -else - gpuci_logger "Python pytest for dask-cuda" - cd "$WORKSPACE" - ls dask_cuda/tests/ - DASK_CUDA_TEST_SINGLE_GPU=1 UCXPY_IFNAME=eth0 UCX_WARN_UNUSED_ENV_VARS=n UCX_MEMTYPE_CACHE=n pytest -vs --cache-clear --basetemp="$WORKSPACE/dask-cuda-tmp" --junitxml="$WORKSPACE/junit-dask-cuda.xml" --cov-config=.coveragerc --cov=dask_cuda --cov-report=xml:"$WORKSPACE/dask-cuda-coverage.xml" --cov-report term dask_cuda/tests/ - - logger "Run local benchmark..." 
- python dask_cuda/benchmarks/local_cudf_shuffle.py --partition-size="1 KiB" -d 0 --runs 1 --backend dask - python dask_cuda/benchmarks/local_cudf_shuffle.py --partition-size="1 KiB" -d 0 --runs 1 --backend explicit-comms -fi - -if [ -n "${CODECOV_TOKEN}" ]; then - codecov -t $CODECOV_TOKEN -fi diff --git a/ci/local/README.md b/ci/local/README.md deleted file mode 100644 index b121cfc05..000000000 --- a/ci/local/README.md +++ /dev/null @@ -1,58 +0,0 @@ -## Purpose - -This script is designed for developer and contributor use. This tool mimics the actions of gpuCI on your local machine. This allows you to test and even debug your code inside a gpuCI base container before pushing your code as a GitHub commit. -The script can be helpful in locally triaging and debugging RAPIDS continuous integration failures. - -## Requirements - -``` -nvidia-docker -``` - -## Usage - -``` -bash build.sh [-h] [-H] [-s] [-r ] [-i ] -Build and test your local repository using a base gpuCI Docker image - -where: - -H Show this help text - -r Path to repository (defaults to working directory) - -i Use Docker image (default is gpuci/rapidsai-base:cuda10.0-ubuntu16.04-gcc5-py3.6) - -s Skip building and testing and start an interactive shell in a container of the Docker image -``` - -Example Usage: -`bash build.sh -r ~/rapids/dask-cuda -i gpuci/rapidsai-base:cuda10.1-ubuntu16.04-gcc5-py3.6` - -For a full list of available gpuCI docker images, visit our [DockerHub](https://hub.docker.com/r/gpuci/rapidsai-base/tags) page. - -Style Check: -```bash -$ bash ci/local/build.sh -r ~/rapids/dask-cuda -s -$ . /opt/conda/etc/profile.d/conda.sh -$ conda activate rapids #Activate gpuCI conda environment -$ cd rapids -$ flake8 python -``` - -## Information - -There are some caveats to be aware of when using this script, especially if you plan on developing from within the container itself. - - -### Docker Image Build Repository - -The docker image will generate build artifacts in a folder on your machine located in the `root` directory of the repository you passed to the script. For the above example, the directory is named `~/rapids/dask-cuda/build_rapidsai-base_cuda10.1-ubuntu16.04-gcc5-py3.6/`. Feel free to remove this directory after the script is finished. - -*Note*: The script *will not* override your local build repository. Your local environment stays in tact. - - -### Where The User is Dumped - -The script will build your repository and run all tests. If any tests fail, it dumps the user into the docker container itself to allow you to debug from within the container. If all the tests pass as expected the container exits and is automatically removed. Remember to exit the container if tests fail and you do not wish to debug within the container itself. - - -### Container File Structure - -Your repository will be located in the `/rapids/` folder of the container. This folder is volume mounted from the local machine. Any changes to the code in this repository are replicated onto the local machine. The `cpp/build` and `python/build` directories within your repository is on a separate mount to avoid conflicting with your local build artifacts. 
diff --git a/ci/local/build.sh b/ci/local/build.sh deleted file mode 100755 index 20d867e8c..000000000 --- a/ci/local/build.sh +++ /dev/null @@ -1,145 +0,0 @@ -#!/bin/bash - -GIT_DESCRIBE_TAG=`git describe --tags` -MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'` - -DOCKER_IMAGE="gpuci/rapidsai:${MINOR_VERSION}-cuda10.1-devel-ubuntu16.04-py3.7" -REPO_PATH=${PWD} -RAPIDS_DIR_IN_CONTAINER="/rapids" -CPP_BUILD_DIR="cpp/build" -PYTHON_BUILD_DIR="python/build" -CONTAINER_SHELL_ONLY=0 - -SHORTHELP="$(basename "$0") [-h] [-H] [-s] [-r ] [-i ]" -LONGHELP="${SHORTHELP} -Build and test your local repository using a base gpuCI Docker image - -where: - -H Show this help text - -r Path to repository (defaults to working directory) - -i Use Docker image (default is ${DOCKER_IMAGE}) - -s Skip building and testing and start an interactive shell in a container of the Docker image -" - -# Limit GPUs available to container based on CUDA_VISIBLE_DEVICES -if [[ -z "${CUDA_VISIBLE_DEVICES}" ]]; then - NVIDIA_VISIBLE_DEVICES="all" -else - NVIDIA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES} -fi - -while getopts ":hHr:i:s" option; do - case ${option} in - r) - REPO_PATH=${OPTARG} - ;; - i) - DOCKER_IMAGE=${OPTARG} - ;; - s) - CONTAINER_SHELL_ONLY=1 - ;; - h) - echo "${SHORTHELP}" - exit 0 - ;; - H) - echo "${LONGHELP}" - exit 0 - ;; - *) - echo "ERROR: Invalid flag" - echo "${SHORTHELP}" - exit 1 - ;; - esac -done - -REPO_PATH_IN_CONTAINER="${RAPIDS_DIR_IN_CONTAINER}/$(basename "${REPO_PATH}")" -CPP_BUILD_DIR_IN_CONTAINER="${RAPIDS_DIR_IN_CONTAINER}/$(basename "${REPO_PATH}")/${CPP_BUILD_DIR}" -PYTHON_BUILD_DIR_IN_CONTAINER="${RAPIDS_DIR_IN_CONTAINER}/$(basename "${REPO_PATH}")/${PYTHON_BUILD_DIR}" - - -# BASE_CONTAINER_BUILD_DIR is named after the image name, allowing for -# multiple image builds to coexist on the local filesystem. This will -# be mapped to the typical BUILD_DIR inside of the container. Builds -# running in the container generate build artifacts just as they would -# in a bare-metal environment, and the host filesystem is able to -# maintain the host build in BUILD_DIR as well. -# FIXME: Fix the shellcheck complaints -# shellcheck disable=SC2001,SC2005,SC2046 -BASE_CONTAINER_BUILD_DIR=${REPO_PATH}/build_$(echo $(basename "${DOCKER_IMAGE}")|sed -e 's/:/_/g') -CPP_CONTAINER_BUILD_DIR=${BASE_CONTAINER_BUILD_DIR}/cpp -PYTHON_CONTAINER_BUILD_DIR=${BASE_CONTAINER_BUILD_DIR}/python -# Create build directories. This is to ensure correct owner for directories. 
If -# directories don't exist there is side effect from docker volume mounting creating build -# directories owned by root(volume mount point(s)) -mkdir -p "${REPO_PATH}/${CPP_BUILD_DIR}" -mkdir -p "${REPO_PATH}/${PYTHON_BUILD_DIR}" - -BUILD_SCRIPT="#!/bin/bash -set -e -WORKSPACE=${REPO_PATH_IN_CONTAINER} -PREBUILD_SCRIPT=${REPO_PATH_IN_CONTAINER}/ci/gpu/prebuild.sh -BUILD_SCRIPT=${REPO_PATH_IN_CONTAINER}/ci/gpu/build.sh -cd "\$WORKSPACE" -if [ -f \${PREBUILD_SCRIPT} ]; then - source \${PREBUILD_SCRIPT} -fi -yes | source \${BUILD_SCRIPT} -" - -if (( CONTAINER_SHELL_ONLY == 0 )); then - COMMAND="${CPP_BUILD_DIR_IN_CONTAINER}/build.sh || bash" -else - COMMAND="bash" -fi - -# Create the build dir for the container to mount, generate the build script inside of it -mkdir -p "${BASE_CONTAINER_BUILD_DIR}" -mkdir -p "${CPP_CONTAINER_BUILD_DIR}" -mkdir -p "${PYTHON_CONTAINER_BUILD_DIR}" -echo "${BUILD_SCRIPT}" > "${CPP_CONTAINER_BUILD_DIR}/build.sh" -chmod ugo+x "${CPP_CONTAINER_BUILD_DIR}/build.sh" - -# Mount passwd and group files to docker. This allows docker to resolve username and group -# avoiding these nags: -# * groups: cannot find name for group ID ID -# * I have no name!@id:/$ -# For ldap user user information is not present in system /etc/passwd and /etc/group files. -# Hence we generate dummy files for ldap users which docker uses to resolve username and group - -PASSWD_FILE="/etc/passwd" -GROUP_FILE="/etc/group" - -USER_FOUND=$(grep -wc "$(whoami)" < "$PASSWD_FILE") -if [ "$USER_FOUND" == 0 ]; then - echo "Local User not found, LDAP WAR for docker mounts activated. Creating dummy passwd and group" - echo "files to allow docker resolve username and group" - cp "$PASSWD_FILE" /tmp/passwd - PASSWD_FILE="/tmp/passwd" - cp "$GROUP_FILE" /tmp/group - GROUP_FILE="/tmp/group" - echo "$(whoami):x:$(id -u):$(id -g):$(whoami),,,:$HOME:$SHELL" >> "$PASSWD_FILE" - echo "$(whoami):x:$(id -g):" >> "$GROUP_FILE" -fi - -# Run the generated build script in a container -docker pull "${DOCKER_IMAGE}" - -DOCKER_MAJOR=$(docker -v|sed 's/[^[0-9]*\([0-9]*\).*/\1/') -GPU_OPTS="--gpus device=${NVIDIA_VISIBLE_DEVICES}" -if [ "$DOCKER_MAJOR" -lt 19 ] -then - GPU_OPTS="--runtime=nvidia -e NVIDIA_VISIBLE_DEVICES=${NVIDIA_VISIBLE_DEVICES}" -fi - -docker run --rm -it ${GPU_OPTS} \ - -u "$(id -u)":"$(id -g)" \ - -v "${REPO_PATH}":"${REPO_PATH_IN_CONTAINER}" \ - -v "${CPP_CONTAINER_BUILD_DIR}":"${CPP_BUILD_DIR_IN_CONTAINER}" \ - -v "${PYTHON_CONTAINER_BUILD_DIR}":"${PYTHON_BUILD_DIR_IN_CONTAINER}" \ - -v "$PASSWD_FILE":/etc/passwd:ro \ - -v "$GROUP_FILE":/etc/group:ro \ - --cap-add=SYS_PTRACE \ - "${DOCKER_IMAGE}" bash -c "${COMMAND}" diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index cab06b0ad..b73037951 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -31,9 +31,6 @@ function sed_runner() { sed -i.bak ''"$1"'' $2 && rm -f ${2}.bak } -# Update UCX-Py version -sed_runner "s/export UCXPY_VERSION=.*/export UCXPY_VERSION="${NEXT_UCXPY_VERSION}"/g" ci/gpu/build.sh - # Bump cudf and dask-cudf testing dependencies sed_runner "s/cudf=.*/cudf=${NEXT_SHORT_TAG}/g" dependencies.yaml sed_runner "s/dask-cudf=.*/dask-cudf=${NEXT_SHORT_TAG}/g" dependencies.yaml diff --git a/ci/test_python.sh b/ci/test_python.sh index bf221f498..b9610bcaf 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2023, NVIDIA CORPORATION. 
set -euo pipefail @@ -24,17 +24,18 @@ PYTHON_CHANNEL=$(rapids-download-conda-from-s3 python) RAPIDS_TESTS_DIR=${RAPIDS_TESTS_DIR:-"${PWD}/test-results"} RAPIDS_COVERAGE_DIR=${RAPIDS_COVERAGE_DIR:-"${PWD}/coverage-results"} mkdir -p "${RAPIDS_TESTS_DIR}" "${RAPIDS_COVERAGE_DIR}" -SUITEERROR=0 rapids-print-env rapids-mamba-retry install \ - -c "${PYTHON_CHANNEL}" \ + --channel "${PYTHON_CHANNEL}" \ dask-cuda rapids-logger "Check GPU usage" nvidia-smi +EXITCODE=0 +trap "EXITCODE=1" ERR set +e rapids-logger "pytest dask-cuda" @@ -53,12 +54,6 @@ timeout 30m pytest \ --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage.xml" \ --cov-report=term \ tests -exitcode=$? - -if (( ${exitcode} != 0 )); then - SUITEERROR=${exitcode} - echo "FAILED: 1 or more tests in dask-cuda" -fi popd rapids-logger "Run local benchmark" @@ -67,23 +62,12 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ -d 0 \ --runs 1 \ --backend dask -exitcode=$? - -if (( ${exitcode} != 0 )); then - SUITEERROR=${exitcode} - echo "FAILED: Local benchmark with dask comms" -fi python dask_cuda/benchmarks/local_cudf_shuffle.py \ --partition-size="1 KiB" \ -d 0 \ --runs 1 \ --backend explicit-comms -exitcode=$? - -if (( ${exitcode} != 0 )); then - SUITEERROR=${exitcode} - echo "FAILED: Local benchmark with explicit comms" -fi -exit ${SUITEERROR} +rapids-logger "Test script exiting with value: $EXITCODE" +exit ${EXITCODE} diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index cc26426d6..42988822c 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -1,13 +1,12 @@ -# Copyright (c) 2019-2022, NVIDIA CORPORATION. +# Copyright (c) 2019-2023, NVIDIA CORPORATION. # Usage: # conda build -c conda-forge . {% set data = load_file_data("pyproject.toml") %} -{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} -{% set number = environ.get('GIT_DESCRIBE_NUMBER', 0) %} +{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') %} {% set py_version = environ['CONDA_PY'] %} -{% set git_hash = environ.get('GIT_DESCRIBE_HASH', '') %} +{% set date_string = environ['RAPIDS_DATE_STRING'] %} package: name: dask-cuda @@ -17,8 +16,8 @@ source: git_url: ../../.. build: - number: {{ number }} - string: py{{ py_version }}_{{ git_hash }}_{{ number }} + number: {{ GIT_DESCRIBE_NUMBER }} + string: py{{ py_version }}_{{ date_string }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }} script: - {{ PYTHON }} -m pip install . 
-vv entry_points: @@ -34,6 +33,7 @@ requirements: - versioneer >=0.24 run: - python + - dask-core ==2023.3.2 {% for r in data.get("project", {}).get("dependencies", []) %} - {{ r }} {% endfor %} @@ -49,7 +49,12 @@ test: {% endfor %} about: - home: https://rapids.ai/ - license: Apache-2.0 - license_file: ../../../LICENSE - summary: dask-cuda library + home: {{ data.get("project", {}).get("urls", {}).get("Homepage", "") }} + license: {{ data.get("project", {}).get("license", {}).get("text", "") }} + license_file: + {% for e in data.get("tool", {}).get("setuptools", {}).get("license-files", []) %} + - ../../../{{ e }} + {% endfor %} + summary: {{ data.get("project", {}).get("description", "") }} + dev_url: {{ data.get("project", {}).get("urls", {}).get("Source", "") }} + doc_url: {{ data.get("project", {}).get("urls", {}).get("Documentation", "") }} diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index ed8e6ae9e..55207d08f 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -5,12 +5,18 @@ import dask +import dask.utils import dask.dataframe.core import dask.dataframe.shuffle +import dask.dataframe.multi +import dask.bag.core from ._version import get_versions from .cuda_worker import CUDAWorker -from .explicit_comms.dataframe.shuffle import get_rearrange_by_column_tasks_wrapper +from .explicit_comms.dataframe.shuffle import ( + get_rearrange_by_column_wrapper, + get_default_shuffle_algorithm, +) from .local_cuda_cluster import LocalCUDACluster from .proxify_device_objects import proxify_decorator, unproxify_decorator @@ -19,11 +25,14 @@ # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` -dask.dataframe.shuffle.rearrange_by_column_tasks = ( - get_rearrange_by_column_tasks_wrapper( - dask.dataframe.shuffle.rearrange_by_column_tasks - ) +dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper( + dask.dataframe.shuffle.rearrange_by_column ) +# We have to replace all modules that imports Dask's `get_default_shuffle_algorithm()` +# TODO: introduce a shuffle-algorithm dispatcher in Dask so we don't need this hack +dask.dataframe.shuffle.get_default_shuffle_algorithm = get_default_shuffle_algorithm +dask.dataframe.multi.get_default_shuffle_algorithm = get_default_shuffle_algorithm +dask.bag.core.get_default_shuffle_algorithm = get_default_shuffle_algorithm # Monkey patching Dask to make use of proxify and unproxify in compatibility mode diff --git a/dask_cuda/benchmarks/common.py b/dask_cuda/benchmarks/common.py index e734f882c..1335334ab 100644 --- a/dask_cuda/benchmarks/common.py +++ b/dask_cuda/benchmarks/common.py @@ -121,8 +121,12 @@ def run(client: Client, args: Namespace, config: Config): args.type == "gpu", args.rmm_pool_size, args.disable_rmm_pool, + args.enable_rmm_async, + args.enable_rmm_managed, + args.rmm_release_threshold, args.rmm_log_directory, args.enable_rmm_statistics, + args.enable_rmm_track_allocations, ) address_to_index, results, message_data = gather_bench_results(client, args, config) p2p_bw = peer_to_peer_bandwidths(message_data, address_to_index) diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 1d07df30c..d3ce666b2 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -98,6 +98,25 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] cluster_args.add_argument( "--disable-rmm-pool", action="store_true", help="Disable the RMM memory pool" ) + cluster_args.add_argument( + "--enable-rmm-managed", + 
action="store_true", + help="Enable RMM managed memory allocator", + ) + cluster_args.add_argument( + "--enable-rmm-async", + action="store_true", + help="Enable RMM async memory allocator (implies --disable-rmm-pool)", + ) + cluster_args.add_argument( + "--rmm-release-threshold", + default=None, + type=parse_bytes, + help="When --enable-rmm-async is set and the pool size grows beyond this " + "value, unused memory held by the pool will be released at the next " + "synchronization point. Can be an integer (bytes), or a string string (like " + "'4GB' or '5000M'). By default, this feature is disabled.", + ) cluster_args.add_argument( "--rmm-log-directory", default=None, @@ -112,6 +131,17 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] "This enables spilling implementations such as JIT-Unspill to provides more " "information on out-of-memory errors", ) + cluster_args.add_argument( + "--enable-rmm-track-allocations", + action="store_true", + help="When enabled, wraps the memory resource used by each worker with a " + "``rmm.mr.TrackingResourceAdaptor``, which tracks the amount of memory " + "allocated." + "NOTE: This option enables additional diagnostics to be collected and " + "reported by the Dask dashboard. However, there is significant overhead " + "associated with this and it should only be used for debugging and memory " + "profiling.", + ) cluster_args.add_argument( "--enable-tcp-over-ucx", default=None, @@ -191,6 +221,13 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] "since the workers are assumed to be started separately. Similarly the other " "cluster configuration options have no effect.", ) + group.add_argument( + "--dashboard-address", + default=None, + type=str, + help="Address on which to listen for diagnostics dashboard, ignored if " + "either ``--scheduler-address`` or ``--scheduler-file`` is specified.", + ) cluster_args.add_argument( "--shutdown-external-cluster-on-exit", default=False, @@ -298,7 +335,11 @@ def get_cluster_options(args): cluster_kwargs = { "connect_options": {"known_hosts": None}, - "scheduler_options": {"protocol": args.protocol, "port": 8786}, + "scheduler_options": { + "protocol": args.protocol, + "port": 8786, + "dashboard_address": args.dashboard_address, + }, "worker_class": "dask_cuda.CUDAWorker", "worker_options": { "protocol": args.protocol, @@ -315,6 +356,7 @@ def get_cluster_options(args): cluster_args = [] cluster_kwargs = { "protocol": args.protocol, + "dashboard_address": args.dashboard_address, "n_workers": len(args.devs.split(",")), "threads_per_worker": args.threads_per_worker, "CUDA_VISIBLE_DEVICES": args.devs, @@ -346,34 +388,58 @@ def setup_memory_pool( dask_worker=None, pool_size=None, disable_pool=False, + rmm_async=False, + rmm_managed=False, + release_threshold=None, log_directory=None, statistics=False, + rmm_track_allocations=False, ): import cupy import rmm + from rmm.allocators.cupy import rmm_cupy_allocator from dask_cuda.utils import get_rmm_log_file_name logging = log_directory is not None - if not disable_pool: + if rmm_async: + rmm.mr.set_current_device_resource( + rmm.mr.CudaAsyncMemoryResource( + initial_pool_size=pool_size, release_threshold=release_threshold + ) + ) + else: rmm.reinitialize( - pool_allocator=True, - devices=0, + pool_allocator=not disable_pool, + managed_memory=rmm_managed, initial_pool_size=pool_size, logging=logging, log_file_name=get_rmm_log_file_name(dask_worker, logging, log_directory), ) - 
cupy.cuda.set_allocator(rmm.rmm_cupy_allocator) + cupy.cuda.set_allocator(rmm_cupy_allocator) if statistics: rmm.mr.set_current_device_resource( rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource()) ) + if rmm_track_allocations: + rmm.mr.set_current_device_resource( + rmm.mr.TrackingResourceAdaptor(rmm.mr.get_current_device_resource()) + ) def setup_memory_pools( - client, is_gpu, pool_size, disable_pool, log_directory, statistics + client, + is_gpu, + pool_size, + disable_pool, + rmm_async, + rmm_managed, + release_threshold, + log_directory, + statistics, + rmm_track_allocations, ): if not is_gpu: return @@ -381,8 +447,12 @@ def setup_memory_pools( setup_memory_pool, pool_size=pool_size, disable_pool=disable_pool, + rmm_async=rmm_async, + rmm_managed=rmm_managed, + release_threshold=release_threshold, log_directory=log_directory, statistics=statistics, + rmm_track_allocations=rmm_track_allocations, ) # Create an RMM pool on the scheduler due to occasional deserialization # of CUDA objects. May cause issues with InfiniBand otherwise. @@ -390,8 +460,12 @@ def setup_memory_pools( setup_memory_pool, pool_size=1e9, disable_pool=disable_pool, + rmm_async=rmm_async, + rmm_managed=rmm_managed, + release_threshold=release_threshold, log_directory=log_directory, statistics=statistics, + rmm_track_allocations=rmm_track_allocations, ) diff --git a/dask_cuda/cli.py b/dask_cuda/cli.py index b7069d632..128da2078 100644 --- a/dask_cuda/cli.py +++ b/dask_cuda/cli.py @@ -145,6 +145,17 @@ def cuda(): incompatible with RMM pools and managed memory, trying to enable both will result in failure.""", ) +@click.option( + "--rmm-release-threshold", + default=None, + help="""When ``rmm.async`` is ``True`` and the pool size grows beyond this value, unused + memory held by the pool will be released at the next synchronization point. Can be + an integer (bytes), float (fraction of total device memory), string (like ``"5GB"`` + or ``"5000M"``) or ``None``. By default, this feature is disabled. + + .. 
note:: + This size is a per-worker configuration, and not cluster-wide.""", ) @@ -232,6 +243,12 @@ def cuda(): help="""Module that should be loaded by each worker process like ``"foo.bar"`` or ``"/path/to/foo.py"``.""", ) +@click.option( + "--death-timeout", + type=str, + default=None, + help="Seconds to wait for a scheduler before closing", +) @click.option( "--dashboard-prefix", type=str, @@ -312,6 +329,7 @@ def worker( rmm_maximum_pool_size, rmm_managed_memory, rmm_async, + rmm_release_threshold, rmm_log_directory, rmm_track_allocations, pid_file, @@ -383,6 +401,7 @@ def worker( rmm_maximum_pool_size, rmm_managed_memory, rmm_async, + rmm_release_threshold, rmm_log_directory, rmm_track_allocations, pid_file, diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 03b16b529..f12ad6780 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -47,6 +47,7 @@ def __init__( rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, + rmm_release_threshold=None, rmm_log_directory=None, rmm_track_allocations=False, pid_file=None, @@ -138,12 +139,6 @@ def del_pid_file(): "For installation instructions, please see " "https://github.com/rapidsai/rmm" ) # pragma: no cover - if rmm_async: - raise ValueError( - "RMM pool and managed memory are incompatible with asynchronous " - "allocator" - ) - else: if enable_nvlink: warnings.warn( @@ -215,12 +210,13 @@ def del_pid_file(): get_cpu_affinity(nvml_device_index(i, cuda_visible_devices(i))) ), RMMSetup( - rmm_pool_size, - rmm_maximum_pool_size, - rmm_managed_memory, - rmm_async, - rmm_log_directory, - rmm_track_allocations, + initial_pool_size=rmm_pool_size, + maximum_pool_size=rmm_maximum_pool_size, + managed_memory=rmm_managed_memory, + async_alloc=rmm_async, + release_threshold=rmm_release_threshold, + log_directory=rmm_log_directory, + track_allocations=rmm_track_allocations, ), PreImport(pre_import), }, diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 4b240d2f1..a444fce0b 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -9,7 +9,10 @@ from typing import Any, Callable, Dict, List, Optional, Set, TypeVar import dask +import dask.config import dask.dataframe +import dask.utils +import distributed.worker from dask.base import tokenize from dask.dataframe.core import DataFrame, Series, _concat as dd_concat, new_dd_object from dask.dataframe.shuffle import group_split_dispatch, hash_object_dispatch @@ -467,7 +470,7 @@ def shuffle( # Step (a): df = df.persist() # Make sure optimizations are apply on the existing graph - wait(df) # Make sure all keys has been materialized on workers + wait([df]) # Make sure all keys have been materialized on workers name = ( "explicit-comms-shuffle-" f"{tokenize(df, column_names, npartitions, ignore_index)}" ) @@ -534,7 +537,7 @@ def shuffle( # Create a distributed Dataframe from all the pieces divs = [None] * (len(dsk) + 1) ret = new_dd_object(dsk, name, df_meta, divs).persist() - wait(ret) + wait([ret]) # Release all temporary dataframes for fut in [*shuffle_result.values(), *dsk.values()]: @@ -542,7 +545,20 @@ def shuffle( return ret -def get_rearrange_by_column_tasks_wrapper(func): +def _use_explicit_comms() -> bool: + """Is explicit-comms enabled and available?""" + if dask.config.get("explicit-comms", False): + try: + # Make sure we have an active client. 
+ distributed.worker.get_client() + except (ImportError, ValueError): + pass + else: + return True + return False + + +def get_rearrange_by_column_wrapper(func): """Returns a function wrapper that dispatch the shuffle to explicit-comms. Notice, this is monkey patched into Dask at dask_cuda import @@ -552,23 +568,30 @@ def get_rearrange_by_column_tasks_wrapper(func): @functools.wraps(func) def wrapper(*args, **kwargs): - if dask.config.get("explicit-comms", False): - try: - import distributed.worker - - # Make sure we have an activate client. - distributed.worker.get_client() - except (ImportError, ValueError): - pass - else: - # Convert `*args, **kwargs` to a dict of `keyword -> values` - kw = func_sig.bind(*args, **kwargs) - kw.apply_defaults() - kw = kw.arguments - column = kw["column"] - if isinstance(column, str): - column = [column] - return shuffle(kw["df"], column, kw["npartitions"], kw["ignore_index"]) + if _use_explicit_comms(): + # Convert `*args, **kwargs` to a dict of `keyword -> values` + kw = func_sig.bind(*args, **kwargs) + kw.apply_defaults() + kw = kw.arguments + # Notice, we only overwrite the default and the "tasks" shuffle + # algorithm. The "disk" and "p2p" algorithm, we don't touch. + if kw["shuffle"] in ("tasks", None): + col = kw["col"] + if isinstance(col, str): + col = [col] + return shuffle(kw["df"], col, kw["npartitions"], kw["ignore_index"]) return func(*args, **kwargs) return wrapper + + +def get_default_shuffle_algorithm() -> str: + """Return the default shuffle algorithm used by Dask + + This changes the default shuffle algorithm from "p2p" to "tasks" + when explicit comms is enabled. + """ + ret = dask.config.get("dataframe.shuffle.algorithm", None) + if ret is None and _use_explicit_comms(): + return "tasks" + return dask.utils.get_default_shuffle_algorithm() diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index fa532b5f0..656f6140d 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -131,6 +131,14 @@ class LocalCUDACluster(LocalCluster): The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also incompatible with RMM pools and managed memory. Trying to enable both will result in an exception. + rmm_release_threshold: int, str or None, default None + When ``rmm.async is True`` and the pool size grows beyond this value, unused + memory held by the pool will be released at the next synchronization point. + Can be an integer (bytes), float (fraction of total device memory), string (like + ``"5GB"`` or ``"5000M"``) or ``None``. By default, this feature is disabled. + + .. note:: + This size is a per-worker configuration, and not cluster-wide. rmm_log_directory : str or None, default None Directory to write per-worker RMM log files to. The client and scheduler are not logged here. Can be a string (like ``"/path/to/logs/"``) or ``None`` to @@ -178,8 +186,12 @@ class LocalCUDACluster(LocalCluster): TypeError If InfiniBand or NVLink are enabled and ``protocol!="ucx"``. ValueError - If NVLink and RMM managed memory are both enabled, or if RMM pools / managed - memory and asynchronous allocator are both enabled. + If RMM pool, RMM managed memory or RMM async allocator are requested but RMM + cannot be imported. + If RMM managed memory and asynchronous allocator are both enabled. + If RMM maximum pool size is set but RMM pool size is not. + If RMM maximum pool size is set but RMM async allocator is used. + If RMM release threshold is set but the RMM async allocator is not being used. 
See Also -------- @@ -205,6 +217,7 @@ def __init__( rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, + rmm_release_threshold=None, rmm_log_directory=None, rmm_track_allocations=False, jit_unspill=None, @@ -247,7 +260,8 @@ def __init__( self.rmm_maximum_pool_size = rmm_maximum_pool_size self.rmm_managed_memory = rmm_managed_memory self.rmm_async = rmm_async - if rmm_pool_size is not None or rmm_managed_memory: + self.rmm_release_threshold = rmm_release_threshold + if rmm_pool_size is not None or rmm_managed_memory or rmm_async: try: import rmm # noqa F401 except ImportError: @@ -256,11 +270,6 @@ def __init__( "is not available. For installation instructions, please " "see https://github.com/rapidsai/rmm" ) # pragma: no cover - if rmm_async: - raise ValueError( - "RMM pool and managed memory are incompatible with asynchronous " - "allocator" - ) else: if enable_nvlink: warnings.warn( @@ -385,12 +394,13 @@ def new_worker_spec(self): get_cpu_affinity(nvml_device_index(0, visible_devices)) ), RMMSetup( - self.rmm_pool_size, - self.rmm_maximum_pool_size, - self.rmm_managed_memory, - self.rmm_async, - self.rmm_log_directory, - self.rmm_track_allocations, + initial_pool_size=self.rmm_pool_size, + maximum_pool_size=self.rmm_maximum_pool_size, + managed_memory=self.rmm_managed_memory, + async_alloc=self.rmm_async, + release_threshold=self.rmm_release_threshold, + log_directory=self.rmm_log_directory, + track_allocations=self.rmm_track_allocations, ), PreImport(self.pre_import), }, diff --git a/dask_cuda/proxy_object.py b/dask_cuda/proxy_object.py index 21dc15ea1..2f9c774dc 100644 --- a/dask_cuda/proxy_object.py +++ b/dask_cuda/proxy_object.py @@ -837,7 +837,10 @@ def obj_pxy_dask_serialize(obj: ProxyObject): header, frames = pxy.serialize(serializers=("dask", "pickle")) obj._pxy_set(pxy) - return {"proxied-header": header, "obj-pxy-detail": pxy.get_init_args()}, frames + return { + "proxied-header": header, + "obj-pxy-detail": pickle.dumps(pxy.get_init_args()), + }, frames @distributed.protocol.cuda.cuda_serialize.register(ProxyObject) @@ -860,7 +863,10 @@ def obj_pxy_cuda_serialize(obj: ProxyObject): # the worker's data store. header, frames = pxy.serialize(serializers=("cuda",)) - return {"proxied-header": header, "obj-pxy-detail": pxy.get_init_args()}, frames + return { + "proxied-header": header, + "obj-pxy-detail": pickle.dumps(pxy.get_init_args()), + }, frames @distributed.protocol.dask_deserialize.register(ProxyObject) @@ -872,7 +878,7 @@ def obj_pxy_dask_deserialize(header, frames): deserialized using the same serializers that were used when the object was serialized. 
""" - args = header["obj-pxy-detail"] + args = pickle.loads(header["obj-pxy-detail"]) if args["subclass"] is None: subclass = ProxyObject else: diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 64950e2b6..7a6207c06 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -131,6 +131,10 @@ def test_rmm_async(loop): # noqa: F811 "--host", "127.0.0.1", "--rmm-async", + "--rmm-pool-size", + "2 GB", + "--rmm-release-threshold", + "3 GB", "--no-dashboard", ] ): @@ -143,6 +147,11 @@ def test_rmm_async(loop): # noqa: F811 for v in memory_resource_type.values(): assert v is rmm.mr.CudaAsyncMemoryResource + ret = get_cluster_configuration(client) + wait(ret) + assert ret["[plugin] RMMSetup"]["initial_pool_size"] == 2000000000 + assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000 + def test_rmm_logging(loop): # noqa: F811 rmm = pytest.importorskip("rmm") @@ -422,3 +431,24 @@ def test_worker_fraction_limits(loop): # noqa: F811 ret["[plugin] RMMSetup"]["maximum_pool_size"] == (device_total_memory * 0.3) // 256 * 256 ) + + +@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0"}) +def test_worker_timeout(): + ret = subprocess.run( + [ + "dask", + "cuda", + "worker", + "192.168.1.100:7777", + "--death-timeout", + "1", + ], + text=True, + encoding="utf8", + capture_output=True, + ) + + assert "closing nanny at" in ret.stderr.lower() + assert "reason: nanny-close" in ret.stderr.lower() + assert ret.returncode == 0 diff --git a/dask_cuda/tests/test_device_host_file.py b/dask_cuda/tests/test_device_host_file.py index 59e066470..4a4807941 100644 --- a/dask_cuda/tests/test_device_host_file.py +++ b/dask_cuda/tests/test_device_host_file.py @@ -10,7 +10,6 @@ serialize, serialize_bytelist, ) -from distributed.protocol.pickle import HIGHEST_PROTOCOL from dask_cuda.device_host_file import DeviceHostFile, device_to_host, host_to_device @@ -189,10 +188,7 @@ def test_serialize_cupy_collection(collection, length, value): header, frames = serialize(obj, serializers=["pickle"], on_error="raise") - if HIGHEST_PROTOCOL >= 5: - assert len(frames) == (1 + len(obj.frames)) - else: - assert len(frames) == 1 + assert len(frames) == (1 + len(obj.frames)) obj2 = deserialize(header, frames) res = host_to_device(obj2) diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index 413bf5bdd..d1024ff69 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -11,7 +11,7 @@ from dask import dataframe as dd from dask.dataframe.shuffle import partitioning_index from dask.dataframe.utils import assert_eq -from distributed import Client, get_worker +from distributed import Client from distributed.deploy.local import LocalCluster import dask_cuda @@ -183,10 +183,10 @@ def check_shuffle(): name = "explicit-comms-shuffle" ddf = dd.from_pandas(pd.DataFrame({"key": np.arange(10)}), npartitions=2) with dask.config.set(explicit_comms=False): - res = ddf.shuffle(on="key", npartitions=4, shuffle="tasks") + res = ddf.shuffle(on="key", npartitions=4) assert all(name not in str(key) for key in res.dask) with dask.config.set(explicit_comms=True): - res = ddf.shuffle(on="key", npartitions=4, shuffle="tasks") + res = ddf.shuffle(on="key", npartitions=4) if in_cluster: assert any(name in str(key) for key in res.dask) else: # If not in cluster, we cannot use explicit comms @@ -200,7 +200,7 @@ def check_shuffle(): ): dask.config.refresh() # Trigger re-read of the 
environment variables with pytest.raises(ValueError, match="explicit-comms-batchsize"): - ddf.shuffle(on="key", npartitions=4, shuffle="tasks") + ddf.shuffle(on="key", npartitions=4) if in_cluster: with LocalCluster( @@ -314,8 +314,8 @@ def test_jit_unspill(protocol): def _test_lock_workers(scheduler_address, ranks): - async def f(_): - worker = get_worker() + async def f(info): + worker = info["worker"] if hasattr(worker, "running"): assert not worker.running worker.running = True diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index b0ac88234..f2e48783c 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -9,7 +9,6 @@ from dask.distributed import Client from distributed.system import MEMORY_LIMIT from distributed.utils_test import gen_test, raises_with_cause -from distributed.worker import get_worker from dask_cuda import CUDAWorker, LocalCUDACluster, utils from dask_cuda.initialize import initialize @@ -140,7 +139,9 @@ async def test_no_memory_limits_cluster(): ) as cluster: async with Client(cluster, asynchronous=True) as client: # Check that all workers use a regular dict as their "data store". - res = await client.run(lambda: isinstance(get_worker().data, dict)) + res = await client.run( + lambda dask_worker: isinstance(dask_worker.data, dict) + ) assert all(res.values()) @@ -161,7 +162,9 @@ async def test_no_memory_limits_cudaworker(): await new_worker await client.wait_for_workers(2) # Check that all workers use a regular dict as their "data store". - res = await client.run(lambda: isinstance(get_worker().data, dict)) + res = await client.run( + lambda dask_worker: isinstance(dask_worker.data, dict) + ) assert all(res.values()) await new_worker.close() @@ -231,6 +234,8 @@ async def test_rmm_async(): async with LocalCUDACluster( rmm_async=True, + rmm_pool_size="2GB", + rmm_release_threshold="3GB", asynchronous=True, ) as cluster: async with Client(cluster, asynchronous=True) as client: @@ -240,6 +245,10 @@ async def test_rmm_async(): for v in memory_resource_type.values(): assert v is rmm.mr.CudaAsyncMemoryResource + ret = await get_cluster_configuration(client) + assert ret["[plugin] RMMSetup"]["initial_pool_size"] == 2000000000 + assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000 + @gen_test(timeout=20) async def test_rmm_logging(): @@ -434,3 +443,13 @@ def test_print_cluster_config(capsys): assert "ucx" in captured.out assert "1 B" in captured.out assert "[plugin]" in captured.out + + +def test_death_timeout_raises(): + with pytest.raises(asyncio.exceptions.TimeoutError): + with LocalCUDACluster( + silence_logs=False, + death_timeout=1e-10, + dashboard_address=":0", + ): + pass diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 41399d673..50b2c51a5 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -12,7 +12,6 @@ from dask.utils import format_bytes from distributed import Client from distributed.utils_test import gen_test -from distributed.worker import get_worker import dask_cuda import dask_cuda.proxify_device_objects @@ -429,9 +428,9 @@ async def test_worker_force_spill_to_disk(): ddf = dask.dataframe.from_pandas(df, npartitions=1).persist() await ddf - async def f(): + async def f(dask_worker): """Trigger a memory_monitor() and reset memory_limit""" - w = get_worker() + w = dask_worker # Set a host memory limit that triggers spilling to disk 
w.memory_manager.memory_pause_fraction = False memory = w.monitor.proc.memory_info().rss @@ -443,7 +442,7 @@ async def f(): assert w.monitor.proc.memory_info().rss < memory - 10**7 w.memory_manager.memory_limit = memory * 10 # Un-limit - await client.submit(f) + client.run(f) log = str(await client.get_worker_logs()) # Check that the worker doesn't complain about unmanaged memory assert "Unmanaged memory use is high" not in log diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index f93b83ec7..bbd24d5ad 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -6,7 +6,7 @@ import dask from dask import array as da -from distributed import Client, get_worker, wait +from distributed import Client, wait from distributed.metrics import time from distributed.sizeof import sizeof from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401 @@ -57,21 +57,25 @@ def assert_device_host_file_size( ) -def worker_assert(total_size, device_chunk_overhead, serialized_chunk_overhead): +def worker_assert( + dask_worker, total_size, device_chunk_overhead, serialized_chunk_overhead +): assert_device_host_file_size( - get_worker().data, total_size, device_chunk_overhead, serialized_chunk_overhead + dask_worker.data, total_size, device_chunk_overhead, serialized_chunk_overhead ) -def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_overhead): +def delayed_worker_assert( + dask_worker, total_size, device_chunk_overhead, serialized_chunk_overhead +): start = time() while not device_host_file_size_matches( - get_worker().data, total_size, device_chunk_overhead, serialized_chunk_overhead + dask_worker.data, total_size, device_chunk_overhead, serialized_chunk_overhead ): sleep(0.01) if time() < start + 3: assert_device_host_file_size( - get_worker().data, + dask_worker.data, total_size, device_chunk_overhead, serialized_chunk_overhead, @@ -143,17 +147,23 @@ async def test_cupy_cluster_device_spill(params): await wait(xx) # Allow up to 1024 bytes overhead per chunk serialized - await client.run(worker_assert, x.nbytes, 1024, 1024) + await client.run( + lambda dask_worker: worker_assert(dask_worker, x.nbytes, 1024, 1024) + ) y = client.compute(x.sum()) res = await y assert (abs(res / x.size) - 0.5) < 1e-3 - await client.run(worker_assert, x.nbytes, 1024, 1024) - host_chunks = await client.run(lambda: len(get_worker().data.host)) + await client.run( + lambda dask_worker: worker_assert(dask_worker, x.nbytes, 1024, 1024) + ) + host_chunks = await client.run( + lambda dask_worker: len(dask_worker.data.host) + ) disk_chunks = await client.run( - lambda: len(get_worker().data.disk or list()) + lambda dask_worker: len(dask_worker.data.disk or list()) ) for hc, dc in zip(host_chunks.values(), disk_chunks.values()): if params["spills_to_disk"]: @@ -245,9 +255,11 @@ async def test_cudf_cluster_device_spill(params): del cdf - host_chunks = await client.run(lambda: len(get_worker().data.host)) + host_chunks = await client.run( + lambda dask_worker: len(dask_worker.data.host) + ) disk_chunks = await client.run( - lambda: len(get_worker().data.disk or list()) + lambda dask_worker: len(dask_worker.data.disk or list()) ) for hc, dc in zip(host_chunks.values(), disk_chunks.values()): if params["spills_to_disk"]: @@ -256,8 +268,12 @@ async def test_cudf_cluster_device_spill(params): assert hc > 0 assert dc == 0 - await client.run(worker_assert, nbytes, 32, 2048) + await client.run( + lambda dask_worker: worker_assert(dask_worker, nbytes, 32, 2048) 
+ ) del cdf2 - await client.run(delayed_worker_assert, 0, 0, 0) + await client.run( + lambda dask_worker: delayed_worker_assert(dask_worker, 0, 0, 0) + ) diff --git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py index ca17c097c..34e63f1b4 100644 --- a/dask_cuda/tests/test_utils.py +++ b/dask_cuda/tests/test_utils.py @@ -189,13 +189,16 @@ def test_parse_visible_devices(): uuids = [] for index in range(get_gpu_count()): handle = pynvml.nvmlDeviceGetHandleByIndex(index) - uuid = pynvml.nvmlDeviceGetUUID(handle).decode("utf-8") + try: + uuid = pynvml.nvmlDeviceGetUUID(handle).decode("utf-8") + except AttributeError: + uuid = pynvml.nvmlDeviceGetUUID(handle) assert parse_cuda_visible_device(index) == index assert parse_cuda_visible_device(uuid) == uuid indices.append(str(index)) - uuids.append(pynvml.nvmlDeviceGetUUID(handle).decode("utf-8")) + uuids.append(uuid) index_devices = ",".join(indices) with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": index_devices}): diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 1a24d80b0..468c37f47 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -46,6 +46,7 @@ def __init__( maximum_pool_size, managed_memory, async_alloc, + release_threshold, log_directory, track_allocations, ): @@ -54,20 +55,46 @@ def __init__( "`rmm_maximum_pool_size` was specified without specifying " "`rmm_pool_size`.`rmm_pool_size` must be specified to use RMM pool." ) + if async_alloc is True and managed_memory is True: + raise ValueError( + "`rmm_managed_memory` is incompatible with `rmm_async`." + ) + if async_alloc is True and maximum_pool_size is not None: + raise ValueError( + "`rmm_maximum_pool_size` is incompatible with `rmm_async`." + ) + if async_alloc is False and release_threshold is not None: + raise ValueError("`rmm_release_threshold` requires `rmm_async`.") self.initial_pool_size = initial_pool_size self.maximum_pool_size = maximum_pool_size self.managed_memory = managed_memory self.async_alloc = async_alloc + self.release_threshold = release_threshold self.logging = log_directory is not None self.log_directory = log_directory self.rmm_track_allocations = track_allocations def setup(self, worker=None): + if self.initial_pool_size is not None: + self.initial_pool_size = parse_device_memory_limit( + self.initial_pool_size, alignment_size=256 + ) + if self.async_alloc: import rmm - rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource()) + if self.release_threshold is not None: + self.release_threshold = parse_device_memory_limit( + self.release_threshold, alignment_size=256 + ) + + rmm.mr.set_current_device_resource( + rmm.mr.CudaAsyncMemoryResource( + initial_pool_size=self.initial_pool_size, + release_threshold=self.release_threshold, + ) + ) if self.logging: rmm.enable_logging( log_file_name=get_rmm_log_file_name( @@ -80,9 +107,6 @@ def setup(self, worker=None): pool_allocator = False if self.initial_pool_size is None else True if self.initial_pool_size is not None: - self.initial_pool_size = parse_device_memory_limit( - self.initial_pool_size, alignment_size=256 - ) if self.maximum_pool_size is not None: self.maximum_pool_size = parse_device_memory_limit( self.maximum_pool_size, alignment_size=256 @@ -676,7 +700,10 @@ def get_gpu_uuid_from_index(device_index=0): pynvml.nvmlInit() handle = pynvml.nvmlDeviceGetHandleByIndex(device_index) - return pynvml.nvmlDeviceGetUUID(handle).decode("utf-8") + try: + return pynvml.nvmlDeviceGetUUID(handle).decode("utf-8") + except AttributeError: + return 
pynvml.nvmlDeviceGetUUID(handle) def get_worker_config(dask_worker): diff --git a/dependencies.yaml b/dependencies.yaml index f2fc93652..b484afb5b 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -95,24 +95,27 @@ dependencies: common: - output_types: [conda, requirements] packages: - - dask==2023.1.1 - - distributed==2023.1.1 + - dask==2023.3.2 + - distributed==2023.3.2.1 - numba>=0.54 - - numpy>=1.18.0 - - pandas>=1.0 + - numpy>=1.21 + - pandas>=1.3,<1.6.0dev0 - pynvml>=11.0.0,<11.5 - zict>=0.1.3 + - output_types: [conda] + packages: + - dask-core==2023.3.2 test_python: common: - output_types: [conda] packages: - - cucim=23.02 - - cudf=23.02 - - dask-cudf=23.02 + - cucim=23.04 + - cudf=23.04 + - dask-cudf=23.04 - pytest - pytest-cov - ucx-proc=*=gpu - - ucx-py=0.30 + - ucx-py=0.31 specific: - output_types: conda matrices: diff --git a/pyproject.toml b/pyproject.toml index 5dafea823..6377693bd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,12 +19,12 @@ authors = [ license = { text = "Apache-2.0" } requires-python = ">=3.8" dependencies = [ - "dask ==2023.1.1", - "distributed ==2023.1.1", + "dask ==2023.3.2", + "distributed ==2023.3.2.1", "pynvml >=11.0.0,<11.5", - "numpy >=1.18.0", + "numpy >=1.21", "numba >=0.54", - "pandas >=1.0", + "pandas >=1.3,<1.6.0dev0", "zict >=0.1.3", ] classifiers = [