diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index bcf0c7261..f5c7a0af4 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -43,7 +43,7 @@ jobs: arch: "amd64" branch: ${{ inputs.branch }} build_type: ${{ inputs.build_type || 'branch' }} - container_image: "rapidsai/ci:latest" + container_image: "rapidsai/ci-conda:latest" date: ${{ inputs.date }} node_type: "gpu-v100-latest-1" run_script: "ci/build_docs.sh" @@ -60,7 +60,7 @@ jobs: wheel-build: runs-on: ubuntu-latest container: - image: rapidsai/ci:latest + image: rapidsai/ci-conda:latest defaults: run: shell: bash diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 835fe9c0c..f7846c226 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -42,13 +42,13 @@ jobs: build_type: pull-request node_type: "gpu-v100-latest-1" arch: "amd64" - container_image: "rapidsai/ci:latest" + container_image: "rapidsai/ci-conda:latest" run_script: "ci/build_docs.sh" wheel-build: needs: checks runs-on: ubuntu-latest container: - image: rapidsai/ci:latest + image: rapidsai/ci-conda:latest defaults: run: shell: bash diff --git a/ci/build_python.sh b/ci/build_python.sh index 4124a4c5a..d4a28497d 100755 --- a/ci/build_python.sh +++ b/ci/build_python.sh @@ -11,7 +11,7 @@ rapids-print-env rapids-logger "Begin py build" -rapids-mamba-retry mambabuild \ +rapids-conda-retry mambabuild \ conda/recipes/dask-cuda rapids-upload-conda-to-s3 python diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index 41b977375..08df9e563 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -32,7 +32,7 @@ requirements: - tomli run: - python - - dask-core >=2023.7.1 + - dask-core ==2023.9.2 {% for r in data.get("project", {}).get("dependencies", []) %} - {{ r }} {% endfor %} diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index c33fae213..0d72efd89 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -21,6 +21,7 @@ __version__ = "23.10.00" +from . import compat # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper( diff --git a/dask_cuda/compat.py b/dask_cuda/compat.py new file mode 100644 index 000000000..1c09337b2 --- /dev/null +++ b/dask_cuda/compat.py @@ -0,0 +1,118 @@ +import pickle + +import msgpack +from packaging.version import Version + +import dask +import distributed +import distributed.comm.utils +import distributed.protocol +from distributed.comm.utils import OFFLOAD_THRESHOLD, nbytes, offload +from distributed.protocol.core import ( + Serialized, + decompress, + logger, + merge_and_deserialize, + msgpack_decode_default, + msgpack_opts, +) + +if Version(distributed.__version__) >= Version("2023.8.1"): + # Monkey-patch protocol.core.loads (and its users) + async def from_frames( + frames, deserialize=True, deserializers=None, allow_offload=True + ): + """ + Unserialize a list of Distributed protocol frames. + """ + size = False + + def _from_frames(): + try: + # Patched code + return loads( + frames, deserialize=deserialize, deserializers=deserializers + ) + # end patched code + except EOFError: + if size > 1000: + datastr = "[too large to display]" + else: + datastr = frames + # Aid diagnosing + logger.error("truncated data stream (%d bytes): %s", size, datastr) + raise + + if allow_offload and deserialize and OFFLOAD_THRESHOLD: + size = sum(map(nbytes, frames)) + if ( + allow_offload + and deserialize + and OFFLOAD_THRESHOLD + and size > OFFLOAD_THRESHOLD + ): + res = await offload(_from_frames) + else: + res = _from_frames() + + return res + + def loads(frames, deserialize=True, deserializers=None): + """Transform bytestream back into Python value""" + + allow_pickle = dask.config.get("distributed.scheduler.pickle") + + try: + + def _decode_default(obj): + offset = obj.get("__Serialized__", 0) + if offset > 0: + sub_header = msgpack.loads( + frames[offset], + object_hook=msgpack_decode_default, + use_list=False, + **msgpack_opts, + ) + offset += 1 + sub_frames = frames[offset : offset + sub_header["num-sub-frames"]] + if deserialize: + if "compression" in sub_header: + sub_frames = decompress(sub_header, sub_frames) + return merge_and_deserialize( + sub_header, sub_frames, deserializers=deserializers + ) + else: + return Serialized(sub_header, sub_frames) + + offset = obj.get("__Pickled__", 0) + if offset > 0: + sub_header = msgpack.loads(frames[offset]) + offset += 1 + sub_frames = frames[offset : offset + sub_header["num-sub-frames"]] + # Patched code + if "compression" in sub_header: + sub_frames = decompress(sub_header, sub_frames) + # end patched code + if allow_pickle: + return pickle.loads( + sub_header["pickled-obj"], buffers=sub_frames + ) + else: + raise ValueError( + "Unpickle on the Scheduler isn't allowed, " + "set `distributed.scheduler.pickle=true`" + ) + + return msgpack_decode_default(obj) + + return msgpack.loads( + frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts + ) + + except Exception: + logger.critical("Failed to deserialize", exc_info=True) + raise + + distributed.protocol.loads = loads + distributed.protocol.core.loads = loads + distributed.comm.utils.from_frames = from_frames diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 0ca1c48ee..854115fe0 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -328,7 +328,7 @@ async def shuffle_task( ignore_index: bool, num_rounds: int, batchsize: int, -) -> List[DataFrame]: +) -> Dict[int, DataFrame]: """Explicit-comms shuffle task This function is running on each worker participating in the shuffle. @@ -360,8 +360,8 @@ async def shuffle_task( Returns ------- - partitions: list of DataFrames - List of dataframe-partitions + partitions: dict + dict that maps each Partition ID to a dataframe-partition """ proxify = get_proxify(s["worker"]) @@ -387,14 +387,13 @@ async def shuffle_task( ) # Finally, we concatenate the output dataframes into the final output partitions - ret = [] + ret = {} while out_part_id_to_dataframe_list: - ret.append( - proxify( - dd_concat( - out_part_id_to_dataframe_list.popitem()[1], - ignore_index=ignore_index, - ) + part_id, dataframe_list = out_part_id_to_dataframe_list.popitem() + ret[part_id] = proxify( + dd_concat( + dataframe_list, + ignore_index=ignore_index, ) ) # For robustness, we yield this task to give Dask a chance to do bookkeeping @@ -529,9 +528,12 @@ def shuffle( dsk = {} for rank in ranks: - for i, part_id in enumerate(rank_to_out_part_ids[rank]): + for part_id in rank_to_out_part_ids[rank]: dsk[(name, part_id)] = c.client.submit( - getitem, shuffle_result[rank], i, workers=[c.worker_addresses[rank]] + getitem, + shuffle_result[rank], + part_id, + workers=[c.worker_addresses[rank]], ) # Create a distributed Dataframe from all the pieces diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index 1a15370b5..ae4e3332c 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -93,7 +93,7 @@ def check_partitions(df, npartitions): return True -def _test_dataframe_shuffle(backend, protocol, n_workers): +def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions): if backend == "cudf": cudf = pytest.importorskip("cudf") @@ -112,6 +112,9 @@ def _test_dataframe_shuffle(backend, protocol, n_workers): if backend == "cudf": df = cudf.DataFrame.from_pandas(df) + if _partitions: + df["_partitions"] = 0 + for input_nparts in range(1, 5): for output_nparts in range(1, 5): ddf = dd.from_pandas(df.copy(), npartitions=input_nparts).persist( @@ -123,33 +126,45 @@ def _test_dataframe_shuffle(backend, protocol, n_workers): with dask.config.set(explicit_comms_batchsize=batchsize): ddf = explicit_comms_shuffle( ddf, - ["key"], + ["_partitions"] if _partitions else ["key"], npartitions=output_nparts, batchsize=batchsize, ).persist() assert ddf.npartitions == output_nparts - # Check that each partition hashes to the same value - result = ddf.map_partitions( - check_partitions, output_nparts - ).compute() - assert all(result.to_list()) - - # Check the values (ignoring the row order) - expected = df.sort_values("key") - got = ddf.compute().sort_values("key") - assert_eq(got, expected) + if _partitions: + # If "_partitions" is the hash key, we expect all but + # the first partition to be empty + assert_eq(ddf.partitions[0].compute(), df) + assert all( + len(ddf.partitions[i].compute()) == 0 + for i in range(1, ddf.npartitions) + ) + else: + # Check that each partition hashes to the same value + result = ddf.map_partitions( + check_partitions, output_nparts + ).compute() + assert all(result.to_list()) + + # Check the values (ignoring the row order) + expected = df.sort_values("key") + got = ddf.compute().sort_values("key") + assert_eq(got, expected) @pytest.mark.parametrize("nworkers", [1, 2, 3]) @pytest.mark.parametrize("backend", ["pandas", "cudf"]) @pytest.mark.parametrize("protocol", ["tcp", "ucx"]) -def test_dataframe_shuffle(backend, protocol, nworkers): +@pytest.mark.parametrize("_partitions", [True, False]) +def test_dataframe_shuffle(backend, protocol, nworkers, _partitions): if backend == "cudf": pytest.importorskip("cudf") - p = mp.Process(target=_test_dataframe_shuffle, args=(backend, protocol, nworkers)) + p = mp.Process( + target=_test_dataframe_shuffle, args=(backend, protocol, nworkers, _partitions) + ) p.start() p.join() assert not p.exitcode diff --git a/dask_cuda/tests/test_from_array.py b/dask_cuda/tests/test_from_array.py new file mode 100644 index 000000000..33f27d6fe --- /dev/null +++ b/dask_cuda/tests/test_from_array.py @@ -0,0 +1,18 @@ +import pytest + +import dask.array as da +from distributed import Client + +from dask_cuda import LocalCUDACluster + +pytest.importorskip("ucp") +cupy = pytest.importorskip("cupy") + + +@pytest.mark.parametrize("protocol", ["ucx", "tcp"]) +def test_ucx_from_array(protocol): + N = 10_000 + with LocalCUDACluster(protocol=protocol) as cluster: + with Client(cluster): + val = da.from_array(cupy.arange(N), chunks=(N // 10,)).sum().compute() + assert val == (N * (N - 1)) // 2 diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 53282bef1..c779a39ef 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -400,7 +400,7 @@ def _pxy_deserialize(self): @pytest.mark.parametrize("send_serializers", [None, ("dask", "pickle"), ("cuda",)]) @pytest.mark.parametrize("protocol", ["tcp", "ucx"]) -@gen_test(timeout=60) +@gen_test(timeout=120) async def test_communicating_proxy_objects(protocol, send_serializers): """Testing serialization of cuDF dataframe when communicating""" cudf = pytest.importorskip("cudf") diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index cd36cb781..6a542cfb9 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -103,11 +103,12 @@ def delayed_worker_assert( }, { # This test setup differs from the one above as Distributed worker - # pausing is enabled and thus triggers `DeviceHostFile.evict()` + # spilling fraction is very low and thus forcefully triggers + # `DeviceHostFile.evict()` "device_memory_limit": int(200e6), "memory_limit": int(200e6), - "host_target": None, - "host_spill": None, + "host_target": False, + "host_spill": 0.01, "host_pause": False, "spills_to_disk": True, }, @@ -121,10 +122,17 @@ def delayed_worker_assert( }, ], ) -@gen_test(timeout=30) +@gen_test(timeout=120) async def test_cupy_cluster_device_spill(params): cupy = pytest.importorskip("cupy") - with dask.config.set({"distributed.worker.memory.terminate": False}): + with dask.config.set( + { + "distributed.worker.memory.terminate": False, + "distributed.worker.memory.pause": params["host_pause"], + "distributed.worker.memory.spill": params["host_spill"], + "distributed.worker.memory.target": params["host_target"], + } + ): async with LocalCUDACluster( n_workers=1, scheduler_port=0, @@ -133,9 +141,6 @@ async def test_cupy_cluster_device_spill(params): asynchronous=True, device_memory_limit=params["device_memory_limit"], memory_limit=params["memory_limit"], - memory_target_fraction=params["host_target"], - memory_spill_fraction=params["host_spill"], - memory_pause_fraction=params["host_pause"], ) as cluster: async with Client(cluster, asynchronous=True) as client: @@ -194,11 +199,12 @@ async def test_cupy_cluster_device_spill(params): }, { # This test setup differs from the one above as Distributed worker - # pausing is enabled and thus triggers `DeviceHostFile.evict()` + # spilling fraction is very low and thus forcefully triggers + # `DeviceHostFile.evict()` "device_memory_limit": int(50e6), "memory_limit": int(50e6), - "host_target": None, - "host_spill": None, + "host_target": False, + "host_spill": 0.01, "host_pause": False, "spills_to_disk": True, }, @@ -212,7 +218,7 @@ async def test_cupy_cluster_device_spill(params): }, ], ) -@gen_test(timeout=30) +@gen_test(timeout=120) async def test_cudf_cluster_device_spill(params): cudf = pytest.importorskip("cudf") @@ -221,16 +227,19 @@ async def test_cudf_cluster_device_spill(params): "distributed.comm.compression": False, "distributed.worker.memory.terminate": False, "distributed.worker.memory.spill-compression": False, + "distributed.worker.memory.pause": params["host_pause"], + "distributed.worker.memory.spill": params["host_spill"], + "distributed.worker.memory.target": params["host_target"], } ): async with LocalCUDACluster( n_workers=1, + scheduler_port=0, + silence_logs=False, + dashboard_address=None, + asynchronous=True, device_memory_limit=params["device_memory_limit"], memory_limit=params["memory_limit"], - memory_target_fraction=params["host_target"], - memory_spill_fraction=params["host_spill"], - memory_pause_fraction=params["host_pause"], - asynchronous=True, ) as cluster: async with Client(cluster, asynchronous=True) as client: diff --git a/dependencies.yaml b/dependencies.yaml index 46500c172..c684f79cd 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -101,8 +101,8 @@ dependencies: common: - output_types: [conda, requirements] packages: - - dask>=2023.7.1 - - distributed>=2023.7.1 + - dask==2023.9.2 + - distributed==2023.9.2 - numba>=0.57 - numpy>=1.21 - pandas>=1.3,<1.6.0dev0 @@ -110,7 +110,7 @@ dependencies: - zict>=2.0.0 - output_types: [conda] packages: - - dask-core>=2023.7.1 + - dask-core==2023.9.2 test_python: common: - output_types: [conda] diff --git a/pyproject.toml b/pyproject.toml index 73777b316..4435df92a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,8 +16,8 @@ authors = [ license = { text = "Apache-2.0" } requires-python = ">=3.9" dependencies = [ - "dask >=2023.7.1", - "distributed >=2023.7.1", + "dask ==2023.9.2", + "distributed ==2023.9.2", "pynvml >=11.0.0,<11.5", "numpy >=1.21", "numba >=0.57", @@ -116,15 +116,8 @@ skip = [ filterwarnings = [ "error::DeprecationWarning", "error::FutureWarning", - "ignore::DeprecationWarning:pkg_resources", - "ignore:distutils Version classes are deprecated.*:DeprecationWarning:", - # tornado 6.2, remove when dask/distributed#6669 is fixed - "ignore:clear_current is deprecated:DeprecationWarning:", - "ignore:make_current is deprecated:DeprecationWarning:", # remove after https://github.com/rapidsai/dask-cuda/issues/1087 is closed "ignore:There is no current event loop:DeprecationWarning:tornado", - # remove after unpinning Dask/Distributed 2023.3.2 - "ignore:.*np.bool.*:DeprecationWarning:", ] [tool.setuptools]