Merge pull request #749 from rapidsai/branch-21.10
[RELEASE] dask-cuda v21.10
ajschmidt8 authored Oct 6, 2021
2 parents 1287a15 + bd00b47 commit 0bcf9dc
Showing 35 changed files with 1,616 additions and 627 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -1,6 +1,6 @@
 repos:
-- repo: https://github.com/timothycrosley/isort
-  rev: 5.0.7
+- repo: https://github.com/pycqa/isort
+  rev: 5.6.4
   hooks:
   - id: isort
 - repo: https://github.com/ambv/black
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
+# dask-cuda 21.10.00 (Date TBD)
+
+Please see https://github.com/rapidsai/dask-cuda/releases/tag/v21.10.00a for the latest changes to this development branch.
+
 # dask-cuda 21.08.00 (4 Aug 2021)

 ## 🐛 Bug Fixes
29 changes: 6 additions & 23 deletions ci/gpu/build.sh
@@ -54,18 +54,18 @@ conda list --show-channel-urls

 # Fixing Numpy version to avoid RuntimeWarning: numpy.ufunc size changed, may
 # indicate binary incompatibility. Expected 192 from C header, got 216 from PyObject
-gpuci_conda_retry install "cudatoolkit=$CUDA_REL" \
+gpuci_mamba_retry install "cudatoolkit=$CUDA_REL" \
     "cudf=${MINOR_VERSION}" "dask-cudf=${MINOR_VERSION}" \
-    "ucx-py=0.21.*" "ucx-proc=*=gpu" \
+    "ucx-py=0.22.*" "ucx-proc=*=gpu" \
     "rapids-build-env=$MINOR_VERSION.*"

 # Pin pytest-asyncio because latest versions modify the default asyncio
 # `event_loop_policy`. See https://github.com/dask/distributed/pull/4212 .
-gpuci_conda_retry install "pytest-asyncio=<0.14.0"
+gpuci_mamba_retry install "pytest-asyncio=<0.14.0"

 # https://docs.rapids.ai/maintainers/depmgmt/
-# gpuci_conda_retry remove -f rapids-build-env
-# gpuci_conda_retry install "your-pkg=1.0.0"
+# gpuci_mamba_retry remove -f rapids-build-env
+# gpuci_mamba_retry install "your-pkg=1.0.0"


 conda info
@@ -106,24 +106,7 @@ else
     gpuci_logger "Python pytest for dask-cuda"
     cd "$WORKSPACE"
     ls dask_cuda/tests/
-    UCXPY_IFNAME=eth0 UCX_WARN_UNUSED_ENV_VARS=n UCX_MEMTYPE_CACHE=n pytest -vs -Werror::DeprecationWarning -Werror::FutureWarning --cache-clear --basetemp="$WORKSPACE/dask-cuda-tmp" --junitxml="$WORKSPACE/junit-dask-cuda.xml" --cov-config=.coveragerc --cov=dask_cuda --cov-report=xml:"$WORKSPACE/dask-cuda-coverage.xml" --cov-report term dask_cuda/tests/
-
-    gpuci_logger "Running dask.distributed GPU tests"
-    # Test downstream packages, which requires Python v3.7
-    if [ $(python -c "import sys; print(sys.version_info[1])") -ge "7" ]; then
-        # Clone Distributed to avoid pytest cleanup fixture errors
-        # See https://github.com/dask/distributed/issues/4902
-        gpuci_logger "Clone Distributed"
-        git clone https://github.com/dask/distributed
-
-        gpuci_logger "Run Distributed Tests"
-        pytest --cache-clear -vs -Werror::DeprecationWarning -Werror::FutureWarning distributed/distributed/protocol/tests/test_cupy.py
-        pytest --cache-clear -vs -Werror::DeprecationWarning -Werror::FutureWarning distributed/distributed/protocol/tests/test_numba.py
-        pytest --cache-clear -vs -Werror::DeprecationWarning -Werror::FutureWarning distributed/distributed/protocol/tests/test_rmm.py
-        pytest --cache-clear -vs -Werror::DeprecationWarning -Werror::FutureWarning distributed/distributed/protocol/tests/test_collection_cuda.py
-        pytest --cache-clear -vs -Werror::DeprecationWarning -Werror::FutureWarning distributed/distributed/tests/test_nanny.py
-        pytest --cache-clear -vs -Werror::DeprecationWarning -Werror::FutureWarning distributed/distributed/diagnostics/tests/test_nvml.py
-    fi
+    DASK_CUDA_TEST_SINGLE_GPU=1 UCXPY_IFNAME=eth0 UCX_WARN_UNUSED_ENV_VARS=n UCX_MEMTYPE_CACHE=n pytest -vs -Werror::DeprecationWarning -Werror::FutureWarning --cache-clear --basetemp="$WORKSPACE/dask-cuda-tmp" --junitxml="$WORKSPACE/junit-dask-cuda.xml" --cov-config=.coveragerc --cov=dask_cuda --cov-report=xml:"$WORKSPACE/dask-cuda-coverage.xml" --cov-report term dask_cuda/tests/

     logger "Run local benchmark..."
     python dask_cuda/benchmarks/local_cudf_shuffle.py --partition-size="1 KiB" -d 0 --runs 1 --backend dask
4 changes: 2 additions & 2 deletions conda/recipes/dask-cuda/meta.yaml
@@ -27,8 +27,8 @@ requirements:
     - setuptools
   run:
     - python
-    - dask >=2.22.0,<=2021.07.1
-    - distributed >=2.22.0,<=2021.07.1
+    - dask=2021.09.1
+    - distributed=2021.09.1
     - pynvml >=8.0.3
     - numpy >=1.16.0
     - numba >=0.53.1
50 changes: 47 additions & 3 deletions dask_cuda/benchmarks/local_cudf_merge.py
@@ -1,6 +1,7 @@
 import contextlib
 import math
 from collections import defaultdict
+from json import dumps
 from time import perf_counter
 from warnings import filterwarnings

@@ -278,6 +279,8 @@ def main(args):
     print(f"broadcast | {broadcast}")
     print(f"protocol | {args.protocol}")
     print(f"device(s) | {args.devs}")
+    if args.device_memory_limit:
+        print(f"memory-limit | {format_bytes(args.device_memory_limit)}")
     print(f"rmm-pool | {(not args.disable_rmm_pool)}")
     print(f"frac-match | {args.frac_match}")
     if args.protocol == "ucx":
@@ -304,18 +307,59 @@ def main(args):
     if args.backend == "dask":
         if args.markdown:
             print("<details>\n<summary>Worker-Worker Transfer Rates</summary>\n\n```")
-        print("(w1,w2) | 25% 50% 75% (total nbytes)")
+        print("(w1,w2) | 25% 50% 75% (total nbytes)")
         print("-------------------------------")
         for (d1, d2), bw in sorted(bandwidths.items()):
             fmt = (
-                "(%s,%s) | %s %s %s (%s)"
+                "(%s,%s) | %s %s %s (%s)"
                 if args.multi_node or args.sched_addr
-                else "(%02d,%02d) | %s %s %s (%s)"
+                else "(%02d,%02d) | %s %s %s (%s)"
             )
             print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))
         if args.markdown:
             print("```\n</details>\n")

+    if args.benchmark_json:
+        bandwidths_json = {
+            "bandwidth_({d1},{d2})_{i}"
+            if args.multi_node or args.sched_addr
+            else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
+            for (d1, d2), bw in sorted(bandwidths.items())
+            for i, v in zip(
+                ["25%", "50%", "75%", "total_nbytes"],
+                [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
+            )
+        }
+
+        with open(args.benchmark_json, "a") as fp:
+            for data_processed, took in took_list:
+                fp.write(
+                    dumps(
+                        dict(
+                            {
+                                "backend": args.backend,
+                                "merge_type": args.type,
+                                "rows_per_chunk": args.chunk_size,
+                                "base_chunks": args.base_chunks,
+                                "other_chunks": args.other_chunks,
+                                "broadcast": broadcast,
+                                "protocol": args.protocol,
+                                "devs": args.devs,
+                                "device_memory_limit": args.device_memory_limit,
+                                "rmm_pool": not args.disable_rmm_pool,
+                                "tcp": args.enable_tcp_over_ucx,
+                                "ib": args.enable_infiniband,
+                                "nvlink": args.enable_nvlink,
+                                "data_processed": data_processed,
+                                "wall_clock": took,
+                                "throughput": data_processed / took,
+                            },
+                            **bandwidths_json,
+                        )
+                    )
+                    + "\n"
+                )
+
     if args.multi_node:
         client.shutdown()
         client.close()
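With this change, each run of the merge benchmark appends one flat JSON object per line to the file passed via --benchmark-json. A minimal sketch of consuming that output; the file name and the use of pandas are illustrative assumptions, not part of the commit:

import pandas as pd

# Each line appended by the benchmark is a self-contained JSON record.
# "merge-benchmark.jsonl" is an assumed example path.
df = pd.read_json("merge-benchmark.jsonl", lines=True)
print(df[["backend", "protocol", "data_processed", "wall_clock", "throughput"]])
print("mean throughput:", df["throughput"].mean())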
47 changes: 44 additions & 3 deletions dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -1,5 +1,6 @@
 import contextlib
 from collections import defaultdict
+from json import dumps
 from time import perf_counter as clock
 from warnings import filterwarnings

@@ -151,6 +152,8 @@ def main(args):
     print(f"in-parts | {args.in_parts}")
     print(f"protocol | {args.protocol}")
     print(f"device(s) | {args.devs}")
+    if args.device_memory_limit:
+        print(f"memory-limit | {format_bytes(args.device_memory_limit)}")
     print(f"rmm-pool | {(not args.disable_rmm_pool)}")
     if args.protocol == "ucx":
         print(f"tcp | {args.enable_tcp_over_ucx}")
@@ -176,18 +179,56 @@ def main(args):
     if args.backend == "dask":
         if args.markdown:
             print("<details>\n<summary>Worker-Worker Transfer Rates</summary>\n\n```")
-        print("(w1,w2) | 25% 50% 75% (total nbytes)")
+        print("(w1,w2) | 25% 50% 75% (total nbytes)")
         print("-------------------------------")
         for (d1, d2), bw in sorted(bandwidths.items()):
             fmt = (
-                "(%s,%s) | %s %s %s (%s)"
+                "(%s,%s) | %s %s %s (%s)"
                 if args.multi_node or args.sched_addr
-                else "(%02d,%02d) | %s %s %s (%s)"
+                else "(%02d,%02d) | %s %s %s (%s)"
             )
             print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))
         if args.markdown:
             print("```\n</details>\n")

+    if args.benchmark_json:
+        bandwidths_json = {
+            "bandwidth_({d1},{d2})_{i}"
+            if args.multi_node or args.sched_addr
+            else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
+            for (d1, d2), bw in sorted(bandwidths.items())
+            for i, v in zip(
+                ["25%", "50%", "75%", "total_nbytes"],
+                [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
+            )
+        }
+
+        with open(args.benchmark_json, "a") as fp:
+            for data_processed, took in took_list:
+                fp.write(
+                    dumps(
+                        dict(
+                            {
+                                "backend": args.backend,
+                                "partition_size": args.partition_size,
+                                "in_parts": args.in_parts,
+                                "protocol": args.protocol,
+                                "devs": args.devs,
+                                "device_memory_limit": args.device_memory_limit,
+                                "rmm_pool": not args.disable_rmm_pool,
+                                "tcp": args.enable_tcp_over_ucx,
+                                "ib": args.enable_infiniband,
+                                "nvlink": args.enable_nvlink,
+                                "data_processed": data_processed,
+                                "wall_clock": took,
+                                "throughput": data_processed / took,
+                            },
+                            **bandwidths_json,
+                        )
+                    )
+                    + "\n"
+                )
+
     if args.multi_node:
         client.shutdown()
         client.close()
79 changes: 42 additions & 37 deletions dask_cuda/benchmarks/local_cupy.py
@@ -1,6 +1,6 @@
 import asyncio
 from collections import defaultdict
-from json import dump
+from json import dumps
 from time import perf_counter as clock
 from warnings import filterwarnings

@@ -246,6 +246,8 @@ async def run(args):
     print(f"Ignore-size | {format_bytes(args.ignore_size)}")
     print(f"Protocol | {args.protocol}")
     print(f"Device(s) | {args.devs}")
+    if args.device_memory_limit:
+        print(f"Memory limit | {format_bytes(args.device_memory_limit)}")
     print(f"Worker Thread(s) | {args.threads_per_worker}")
     print("==========================")
     print("Wall-clock | npartitions")
@@ -266,37 +268,46 @@ async def run(args):
         print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))

     if args.benchmark_json:
-
-        d = {
-            "operation": args.operation,
-            "size": args.size,
-            "second_size": args.second_size,
-            "chunk_size": args.chunk_size,
-            "compute_size": size,
-            "compute_chunk_size": chunksize,
-            "ignore_size": format_bytes(args.ignore_size),
-            "protocol": args.protocol,
-            "devs": args.devs,
-            "threads_per_worker": args.threads_per_worker,
-            "times": [
-                {"wall_clock": took, "npartitions": npartitions}
-                for (took, npartitions) in took_list
-            ],
-            "bandwidths": {
-                f"({d1},{d2})"
-                if args.multi_node or args.sched_addr
-                else "(%02d,%02d)"
-                % (d1, d2): {
-                    "25%": bw[0],
-                    "50%": bw[1],
-                    "75%": bw[2],
-                    "total_nbytes": total_nbytes[(d1, d2)],
-                }
-                for (d1, d2), bw in sorted(bandwidths.items())
-            },
+        bandwidths_json = {
+            "bandwidth_({d1},{d2})_{i}"
+            if args.multi_node or args.sched_addr
+            else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
+            for (d1, d2), bw in sorted(bandwidths.items())
+            for i, v in zip(
+                ["25%", "50%", "75%", "total_nbytes"],
+                [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
+            )
         }
-        with open(args.benchmark_json, "w") as fp:
-            dump(d, fp, indent=2)
+
+        with open(args.benchmark_json, "a") as fp:
+            for took, npartitions in took_list:
+                fp.write(
+                    dumps(
+                        dict(
+                            {
+                                "operation": args.operation,
+                                "user_size": args.size,
+                                "user_second_size": args.second_size,
+                                "user_chunk_size": args.chunk_size,
+                                "compute_size": size,
+                                "compute_chunk_size": chunksize,
+                                "ignore_size": args.ignore_size,
+                                "protocol": args.protocol,
+                                "devs": args.devs,
+                                "device_memory_limit": args.device_memory_limit,
+                                "worker_threads": args.threads_per_worker,
+                                "rmm_pool": not args.disable_rmm_pool,
+                                "tcp": args.enable_tcp_over_ucx,
+                                "ib": args.enable_infiniband,
+                                "nvlink": args.enable_nvlink,
+                                "wall_clock": took,
+                                "npartitions": npartitions,
+                            },
+                            **bandwidths_json,
+                        )
+                    )
+                    + "\n"
+                )

     # An SSHCluster will not automatically shut down, we have to
     # ensure it does.
@@ -353,12 +364,6 @@ def parse_args():
             "type": int,
             "help": "Number of runs (default 3).",
         },
-        {
-            "name": "--benchmark-json",
-            "default": None,
-            "type": str,
-            "help": "Dump a JSON report of benchmarks (optional).",
-        },
     ]

     return parse_benchmark_args(
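Note the shift above in local_cupy.py: the old code wrote a single nested report with json.dump(d, fp, indent=2) to a file opened with "w", overwriting it on each invocation, while the new code appends one json.dumps record per run with mode "a", so results from repeated invocations accumulate in one file. A stdlib-only sketch of reading such a file back; the path is an assumed example:

import json

records = []
with open("cupy-benchmark.jsonl") as fp:  # assumed example path
    for line in fp:
        if line.strip():  # skip blank lines, if any
            records.append(json.loads(line))

# e.g. per-run wall-clock times
for r in records:
    print(r["operation"], r["npartitions"], r["wall_clock"])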
(Diffs for the remaining changed files are not shown here.)
