Merge remote-tracking branch 'upstream/branch-23.12' into update-distributed-breaking-changes
pentschev committed Oct 12, 2023
2 parents 9f2476d + 2ffd1d6 commit 3b070f2
Showing 6 changed files with 103 additions and 43 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build.yaml
@@ -28,7 +28,7 @@ concurrency:
jobs:
conda-python-build:
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@cuda-120-arm
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
@@ -38,7 +38,7 @@ jobs:
if: github.ref_type == 'branch'
needs: [conda-python-build]
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@cuda-120-arm
uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12
with:
arch: "amd64"
branch: ${{ inputs.branch }}
@@ -51,7 +51,7 @@ jobs:
upload-conda:
needs: [conda-python-build]
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@cuda-120-arm
uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.12
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
10 changes: 5 additions & 5 deletions .github/workflows/pr.yaml
@@ -18,26 +18,26 @@ jobs:
- docs-build
- wheel-build
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@cuda-120-arm
uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.12
checks:
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@cuda-120-arm
uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.12
conda-python-build:
needs: checks
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@cuda-120-arm
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12
with:
build_type: pull-request
conda-python-tests:
needs: conda-python-build
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@cuda-120-arm
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12
with:
build_type: pull-request
docs-build:
needs: conda-python-build
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@cuda-120-arm
uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12
with:
build_type: pull-request
node_type: "gpu-v100-latest-1"
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -16,7 +16,7 @@ on:
jobs:
conda-python-tests:
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@cuda-120-arm
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12
with:
build_type: nightly
branch: ${{ inputs.branch }}
26 changes: 26 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,29 @@
# dask-cuda 23.10.00 (11 Oct 2023)

## 🐛 Bug Fixes

- Monkeypatch protocol.loads ala dask/distributed#8216 ([#1247](https://github.com/rapidsai/dask-cuda/pull/1247)) [@wence-](https://github.com/wence-)
- Explicit-comms: preserve partition IDs ([#1240](https://github.com/rapidsai/dask-cuda/pull/1240)) [@madsbk](https://github.com/madsbk)
- Increase test timeouts further to reduce CI failures ([#1234](https://github.com/rapidsai/dask-cuda/pull/1234)) [@pentschev](https://github.com/pentschev)
- Use `conda mambabuild` not `mamba mambabuild` ([#1231](https://github.com/rapidsai/dask-cuda/pull/1231)) [@bdice](https://github.com/bdice)
- Increase timeouts of tests that frequently timeout in CI ([#1228](https://github.com/rapidsai/dask-cuda/pull/1228)) [@pentschev](https://github.com/pentschev)
- Adapt to non-string task keys in distributed ([#1225](https://github.com/rapidsai/dask-cuda/pull/1225)) [@wence-](https://github.com/wence-)
- Update `test_worker_timeout` ([#1223](https://github.com/rapidsai/dask-cuda/pull/1223)) [@pentschev](https://github.com/pentschev)
- Avoid importing `loads_function` from distributed ([#1220](https://github.com/rapidsai/dask-cuda/pull/1220)) [@rjzamora](https://github.com/rjzamora)

## 🚀 New Features

- Enable maximum pool size for RMM async allocator ([#1221](https://github.com/rapidsai/dask-cuda/pull/1221)) [@pentschev](https://github.com/pentschev)

## 🛠️ Improvements

- Pin `dask` and `distributed` for `23.10` release ([#1251](https://github.com/rapidsai/dask-cuda/pull/1251)) [@galipremsagar](https://github.com/galipremsagar)
- Update `test_spill.py` to avoid `FutureWarning`s ([#1243](https://github.com/rapidsai/dask-cuda/pull/1243)) [@pentschev](https://github.com/pentschev)
- Remove obsolete pytest `filterwarnings` ([#1241](https://github.com/rapidsai/dask-cuda/pull/1241)) [@pentschev](https://github.com/pentschev)
- Update image names ([#1233](https://github.com/rapidsai/dask-cuda/pull/1233)) [@AyodeAwe](https://github.com/AyodeAwe)
- Use `copy-pr-bot` ([#1227](https://github.com/rapidsai/dask-cuda/pull/1227)) [@ajschmidt8](https://github.com/ajschmidt8)
- Unpin `dask` and `distributed` for `23.10` development ([#1222](https://github.com/rapidsai/dask-cuda/pull/1222)) [@galipremsagar](https://github.com/galipremsagar)

# dask-cuda 23.08.00 (9 Aug 2023)

## 🐛 Bug Fixes
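The "Enable maximum pool size for RMM async allocator" entry above (#1221) refers to capping the pool used by the cudaMallocAsync-backed allocator. Below is a minimal sketch of how that option is typically passed to LocalCUDACluster, assuming a CUDA-capable machine with dask-cuda 23.10 or later; the pool sizes are illustrative only, not values taken from this commit.

from distributed import Client
from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # rmm_async selects the cudaMallocAsync-based allocator on each worker;
    # rmm_pool_size sets its initial size, and rmm_maximum_pool_size caps how
    # far it may grow (honoured for the async allocator as of 23.10).
    with LocalCUDACluster(
        rmm_async=True,
        rmm_pool_size="2GB",
        rmm_maximum_pool_size="4GB",
    ) as cluster:
        with Client(cluster) as client:
            print(list(client.scheduler_info()["workers"]))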
94 changes: 64 additions & 30 deletions dask_cuda/tests/test_spill.py
@@ -1,3 +1,4 @@
import gc
import os
from time import sleep

@@ -58,15 +59,21 @@ def assert_device_host_file_size(


def worker_assert(
dask_worker, total_size, device_chunk_overhead, serialized_chunk_overhead
total_size,
device_chunk_overhead,
serialized_chunk_overhead,
dask_worker=None,
):
assert_device_host_file_size(
dask_worker.data, total_size, device_chunk_overhead, serialized_chunk_overhead
)


def delayed_worker_assert(
dask_worker, total_size, device_chunk_overhead, serialized_chunk_overhead
total_size,
device_chunk_overhead,
serialized_chunk_overhead,
dask_worker=None,
):
start = time()
while not device_host_file_size_matches(
@@ -82,6 +89,18 @@ def delayed_worker_assert(
)


def assert_host_chunks(spills_to_disk, dask_worker=None):
if spills_to_disk is False:
assert len(dask_worker.data.host)


def assert_disk_chunks(spills_to_disk, dask_worker=None):
if spills_to_disk is True:
assert len(dask_worker.data.disk or list()) > 0
else:
assert len(dask_worker.data.disk or list()) == 0


@pytest.mark.parametrize(
"params",
[
@@ -122,7 +141,7 @@ def delayed_worker_assert(
},
],
)
@gen_test(timeout=120)
@gen_test(timeout=30)
async def test_cupy_cluster_device_spill(params):
cupy = pytest.importorskip("cupy")
with dask.config.set(
@@ -144,6 +163,8 @@ async def test_cupy_cluster_device_spill(params):
) as cluster:
async with Client(cluster, asynchronous=True) as client:

await client.wait_for_workers(1)

rs = da.random.RandomState(RandomState=cupy.random.RandomState)
x = rs.random(int(50e6), chunks=2e6)
await wait(x)
@@ -153,7 +174,10 @@ async def test_cupy_cluster_device_spill(params):

# Allow up to 1024 bytes overhead per chunk serialized
await client.run(
lambda dask_worker: worker_assert(dask_worker, x.nbytes, 1024, 1024)
worker_assert,
x.nbytes,
1024,
1024,
)

y = client.compute(x.sum())
@@ -162,20 +186,19 @@ async def test_cupy_cluster_device_spill(params):
assert (abs(res / x.size) - 0.5) < 1e-3

await client.run(
lambda dask_worker: worker_assert(dask_worker, x.nbytes, 1024, 1024)
worker_assert,
x.nbytes,
1024,
1024,
)
host_chunks = await client.run(
lambda dask_worker: len(dask_worker.data.host)
await client.run(
assert_host_chunks,
params["spills_to_disk"],
)
disk_chunks = await client.run(
lambda dask_worker: len(dask_worker.data.disk or list())
await client.run(
assert_disk_chunks,
params["spills_to_disk"],
)
for hc, dc in zip(host_chunks.values(), disk_chunks.values()):
if params["spills_to_disk"]:
assert dc > 0
else:
assert hc > 0
assert dc == 0


@pytest.mark.parametrize(
@@ -218,7 +241,7 @@ async def test_cupy_cluster_device_spill(params):
},
],
)
@gen_test(timeout=120)
@gen_test(timeout=30)
async def test_cudf_cluster_device_spill(params):
cudf = pytest.importorskip("cudf")

@@ -243,6 +266,8 @@ async def test_cudf_cluster_device_spill(params):
) as cluster:
async with Client(cluster, asynchronous=True) as client:

await client.wait_for_workers(1)

# There's a known issue with datetime64:
# https://github.com/numpy/numpy/issues/4983#issuecomment-441332940
# The same error above happens when spilling datetime64 to disk
@@ -264,26 +289,35 @@ async def test_cudf_cluster_device_spill(params):
await wait(cdf2)

del cdf
gc.collect()

host_chunks = await client.run(
lambda dask_worker: len(dask_worker.data.host)
await client.run(
assert_host_chunks,
params["spills_to_disk"],
)
disk_chunks = await client.run(
lambda dask_worker: len(dask_worker.data.disk or list())
await client.run(
assert_disk_chunks,
params["spills_to_disk"],
)
for hc, dc in zip(host_chunks.values(), disk_chunks.values()):
if params["spills_to_disk"]:
assert dc > 0
else:
assert hc > 0
assert dc == 0

await client.run(
lambda dask_worker: worker_assert(dask_worker, nbytes, 32, 2048)
worker_assert,
nbytes,
32,
2048,
)

del cdf2

await client.run(
lambda dask_worker: delayed_worker_assert(dask_worker, 0, 0, 0)
)
while True:
try:
await client.run(
delayed_worker_assert,
0,
0,
0,
)
except AssertionError:
gc.collect()
else:
break
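The test changes above replace lambdas that closed over dask_worker with plain functions handed straight to client.run, matching distributed's convention of injecting the worker through a dask_worker= keyword and forwarding any extra positional arguments. A minimal sketch of that pattern, with a hypothetical assert_minimum_keys helper and a CPU-only LocalCluster purely for illustration:

from distributed import Client, LocalCluster


def assert_minimum_keys(n, dask_worker=None):
    # distributed fills in `dask_worker` on the worker side when the callable
    # declares that keyword; the positional argument `n` is forwarded by
    # client.run, so no lambda wrapper is needed.
    assert len(dask_worker.data) >= n


if __name__ == "__main__":
    with LocalCluster(n_workers=2, processes=False) as cluster:
        with Client(cluster) as client:
            futures = client.scatter(list(range(4)))  # place a few keys on the workers
            client.run(assert_minimum_keys, 0)  # runs on every worker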
8 changes: 4 additions & 4 deletions dask_cuda/utils.py
@@ -18,7 +18,7 @@
import distributed # noqa: required for dask.config.get("distributed.comm.ucx")
from dask.config import canonical_name
from dask.utils import format_bytes, parse_bytes
from distributed import Worker, wait
from distributed import Worker, WorkerPlugin, wait
from distributed.comm import parse_address

try:
@@ -32,15 +32,15 @@ def nvtx_annotate(message=None, color="blue", domain=None):
yield


class CPUAffinity:
class CPUAffinity(WorkerPlugin):
def __init__(self, cores):
self.cores = cores

def setup(self, worker=None):
os.sched_setaffinity(0, self.cores)


class RMMSetup:
class RMMSetup(WorkerPlugin):
def __init__(
self,
initial_pool_size,
@@ -135,7 +135,7 @@ def setup(self, worker=None):
rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr))


class PreImport:
class PreImport(WorkerPlugin):
def __init__(self, libraries):
if libraries is None:
libraries = []
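The utils.py change above makes CPUAffinity, RMMSetup, and PreImport inherit from distributed's WorkerPlugin, which newer distributed releases expect of objects registered as worker plugins. A minimal sketch of the same pattern, using a hypothetical PreImportExample plugin and the era-appropriate register_worker_plugin call (newer distributed also offers Client.register_plugin):

import importlib

from distributed import Client, LocalCluster, WorkerPlugin


class PreImportExample(WorkerPlugin):
    # Illustrative plugin: import the given modules on each worker at setup.
    def __init__(self, libraries):
        self.libraries = libraries

    def setup(self, worker=None):
        # Called once on each worker when the plugin is registered.
        for library in self.libraries:
            importlib.import_module(library)


if __name__ == "__main__":
    with LocalCluster(n_workers=1, processes=False) as cluster:
        with Client(cluster) as client:
            client.register_worker_plugin(PreImportExample(["math"]))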
