diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index bb65c1e9d..6f78f70be 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -28,7 +28,7 @@ concurrency: jobs: conda-python-build: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} @@ -38,7 +38,7 @@ jobs: if: github.ref_type == 'branch' needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12 with: arch: "amd64" branch: ${{ inputs.branch }} @@ -51,7 +51,7 @@ jobs: upload-conda: needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.12 with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 0cbf82387..0dac577db 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -18,26 +18,26 @@ jobs: - docs-build - wheel-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.12 checks: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.12 conda-python-build: needs: checks secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 with: build_type: pull-request conda-python-tests: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 with: build_type: pull-request docs-build: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12 with: build_type: pull-request node_type: "gpu-v100-latest-1" diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 9a5d1c626..ea6c93db2 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -16,7 +16,7 @@ on: jobs: conda-python-tests: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 with: build_type: nightly branch: ${{ inputs.branch }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 81e56cd48..55b9650e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,29 @@ +# dask-cuda 23.10.00 (11 Oct 2023) + +## 🐛 Bug Fixes + +- Monkeypatch protocol.loads ala dask/distributed#8216 ([#1247](https://github.com/rapidsai/dask-cuda/pull/1247)) [@wence-](https://github.com/wence-) +- Explicit-comms: preserve partition IDs ([#1240](https://github.com/rapidsai/dask-cuda/pull/1240)) [@madsbk](https://github.com/madsbk) +- Increase test timeouts further to reduce CI failures ([#1234](https://github.com/rapidsai/dask-cuda/pull/1234)) [@pentschev](https://github.com/pentschev) +- Use `conda mambabuild` not `mamba mambabuild` ([#1231](https://github.com/rapidsai/dask-cuda/pull/1231)) [@bdice](https://github.com/bdice) +- Increate timeouts of tests that frequently timeout in CI ([#1228](https://github.com/rapidsai/dask-cuda/pull/1228)) [@pentschev](https://github.com/pentschev) +- Adapt to non-string task keys in distributed ([#1225](https://github.com/rapidsai/dask-cuda/pull/1225)) [@wence-](https://github.com/wence-) +- Update `test_worker_timeout` ([#1223](https://github.com/rapidsai/dask-cuda/pull/1223)) [@pentschev](https://github.com/pentschev) +- Avoid importing `loads_function` from distributed ([#1220](https://github.com/rapidsai/dask-cuda/pull/1220)) [@rjzamora](https://github.com/rjzamora) + +## 🚀 New Features + +- Enable maximum pool size for RMM async allocator ([#1221](https://github.com/rapidsai/dask-cuda/pull/1221)) [@pentschev](https://github.com/pentschev) + +## 🛠️ Improvements + +- Pin `dask` and `distributed` for `23.10` release ([#1251](https://github.com/rapidsai/dask-cuda/pull/1251)) [@galipremsagar](https://github.com/galipremsagar) +- Update `test_spill.py` to avoid `FutureWarning`s ([#1243](https://github.com/rapidsai/dask-cuda/pull/1243)) [@pentschev](https://github.com/pentschev) +- Remove obsolete pytest `filterwarnings` ([#1241](https://github.com/rapidsai/dask-cuda/pull/1241)) [@pentschev](https://github.com/pentschev) +- Update image names ([#1233](https://github.com/rapidsai/dask-cuda/pull/1233)) [@AyodeAwe](https://github.com/AyodeAwe) +- Use `copy-pr-bot` ([#1227](https://github.com/rapidsai/dask-cuda/pull/1227)) [@ajschmidt8](https://github.com/ajschmidt8) +- Unpin `dask` and `distributed` for `23.10` development ([#1222](https://github.com/rapidsai/dask-cuda/pull/1222)) [@galipremsagar](https://github.com/galipremsagar) + # dask-cuda 23.08.00 (9 Aug 2023) ## 🐛 Bug Fixes diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index 6a542cfb9..6172b0bc6 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -1,3 +1,4 @@ +import gc import os from time import sleep @@ -58,7 +59,10 @@ def assert_device_host_file_size( def worker_assert( - dask_worker, total_size, device_chunk_overhead, serialized_chunk_overhead + total_size, + device_chunk_overhead, + serialized_chunk_overhead, + dask_worker=None, ): assert_device_host_file_size( dask_worker.data, total_size, device_chunk_overhead, serialized_chunk_overhead @@ -66,7 +70,10 @@ def worker_assert( def delayed_worker_assert( - dask_worker, total_size, device_chunk_overhead, serialized_chunk_overhead + total_size, + device_chunk_overhead, + serialized_chunk_overhead, + dask_worker=None, ): start = time() while not device_host_file_size_matches( @@ -82,6 +89,18 @@ def delayed_worker_assert( ) +def assert_host_chunks(spills_to_disk, dask_worker=None): + if spills_to_disk is False: + assert len(dask_worker.data.host) + + +def assert_disk_chunks(spills_to_disk, dask_worker=None): + if spills_to_disk is True: + assert len(dask_worker.data.disk or list()) > 0 + else: + assert len(dask_worker.data.disk or list()) == 0 + + @pytest.mark.parametrize( "params", [ @@ -122,7 +141,7 @@ def delayed_worker_assert( }, ], ) -@gen_test(timeout=120) +@gen_test(timeout=30) async def test_cupy_cluster_device_spill(params): cupy = pytest.importorskip("cupy") with dask.config.set( @@ -144,6 +163,8 @@ async def test_cupy_cluster_device_spill(params): ) as cluster: async with Client(cluster, asynchronous=True) as client: + await client.wait_for_workers(1) + rs = da.random.RandomState(RandomState=cupy.random.RandomState) x = rs.random(int(50e6), chunks=2e6) await wait(x) @@ -153,7 +174,10 @@ async def test_cupy_cluster_device_spill(params): # Allow up to 1024 bytes overhead per chunk serialized await client.run( - lambda dask_worker: worker_assert(dask_worker, x.nbytes, 1024, 1024) + worker_assert, + x.nbytes, + 1024, + 1024, ) y = client.compute(x.sum()) @@ -162,20 +186,19 @@ async def test_cupy_cluster_device_spill(params): assert (abs(res / x.size) - 0.5) < 1e-3 await client.run( - lambda dask_worker: worker_assert(dask_worker, x.nbytes, 1024, 1024) + worker_assert, + x.nbytes, + 1024, + 1024, ) - host_chunks = await client.run( - lambda dask_worker: len(dask_worker.data.host) + await client.run( + assert_host_chunks, + params["spills_to_disk"], ) - disk_chunks = await client.run( - lambda dask_worker: len(dask_worker.data.disk or list()) + await client.run( + assert_disk_chunks, + params["spills_to_disk"], ) - for hc, dc in zip(host_chunks.values(), disk_chunks.values()): - if params["spills_to_disk"]: - assert dc > 0 - else: - assert hc > 0 - assert dc == 0 @pytest.mark.parametrize( @@ -218,7 +241,7 @@ async def test_cupy_cluster_device_spill(params): }, ], ) -@gen_test(timeout=120) +@gen_test(timeout=30) async def test_cudf_cluster_device_spill(params): cudf = pytest.importorskip("cudf") @@ -243,6 +266,8 @@ async def test_cudf_cluster_device_spill(params): ) as cluster: async with Client(cluster, asynchronous=True) as client: + await client.wait_for_workers(1) + # There's a known issue with datetime64: # https://github.com/numpy/numpy/issues/4983#issuecomment-441332940 # The same error above happens when spilling datetime64 to disk @@ -264,26 +289,35 @@ async def test_cudf_cluster_device_spill(params): await wait(cdf2) del cdf + gc.collect() - host_chunks = await client.run( - lambda dask_worker: len(dask_worker.data.host) + await client.run( + assert_host_chunks, + params["spills_to_disk"], ) - disk_chunks = await client.run( - lambda dask_worker: len(dask_worker.data.disk or list()) + await client.run( + assert_disk_chunks, + params["spills_to_disk"], ) - for hc, dc in zip(host_chunks.values(), disk_chunks.values()): - if params["spills_to_disk"]: - assert dc > 0 - else: - assert hc > 0 - assert dc == 0 await client.run( - lambda dask_worker: worker_assert(dask_worker, nbytes, 32, 2048) + worker_assert, + nbytes, + 32, + 2048, ) del cdf2 - await client.run( - lambda dask_worker: delayed_worker_assert(dask_worker, 0, 0, 0) - ) + while True: + try: + await client.run( + delayed_worker_assert, + 0, + 0, + 0, + ) + except AssertionError: + gc.collect() + else: + break diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index a155dc593..1e244bb31 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -18,7 +18,7 @@ import distributed # noqa: required for dask.config.get("distributed.comm.ucx") from dask.config import canonical_name from dask.utils import format_bytes, parse_bytes -from distributed import Worker, wait +from distributed import Worker, WorkerPlugin, wait from distributed.comm import parse_address try: @@ -32,7 +32,7 @@ def nvtx_annotate(message=None, color="blue", domain=None): yield -class CPUAffinity: +class CPUAffinity(WorkerPlugin): def __init__(self, cores): self.cores = cores @@ -40,7 +40,7 @@ def setup(self, worker=None): os.sched_setaffinity(0, self.cores) -class RMMSetup: +class RMMSetup(WorkerPlugin): def __init__( self, initial_pool_size, @@ -135,7 +135,7 @@ def setup(self, worker=None): rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr)) -class PreImport: +class PreImport(WorkerPlugin): def __init__(self, libraries): if libraries is None: libraries = []