From ff9a1a2971ca6053b4107aad552d1403579d3da1 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 4 Oct 2023 09:26:24 -0700
Subject: [PATCH 1/5] Import `AnyKeyFile`, as per Distributed changes

https://github.com/dask/distributed/pull/8195 renamed `CustomFile` to
`AnyKeyFile`, so the import requires updating.
---
 dask_cuda/device_host_file.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py
index 7942f6547..b646a9294 100644
--- a/dask_cuda/device_host_file.py
+++ b/dask_cuda/device_host_file.py
@@ -17,7 +17,7 @@
     serialize_bytelist,
 )
 from distributed.sizeof import safe_sizeof
-from distributed.spill import CustomFile as KeyAsStringFile
+from distributed.spill import AnyKeyFile as KeyAsStringFile
 from distributed.utils import nbytes
 
 from .is_device_object import is_device_object

From 9f2476d22a16f5220470f227d6040fb4d716b8cc Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 4 Oct 2023 10:13:23 -0700
Subject: [PATCH 2/5] Remove `protocol.loads` compatibility fix

This change was introduced in https://github.com/rapidsai/dask-cuda/pull/1247
to include the fix from https://github.com/dask/distributed/pull/8216 in
Dask-CUDA 23.10, which pinned Distributed to 2023.9.2. Starting from
Distributed 2023.9.3 the fix is included upstream, so this compatibility
layer is no longer required.
---
 dask_cuda/__init__.py |   2 -
 dask_cuda/compat.py   | 118 ------------------------------------------
 2 files changed, 120 deletions(-)
 delete mode 100644 dask_cuda/compat.py

diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py
index 2218e47e5..9d6917ef6 100644
--- a/dask_cuda/__init__.py
+++ b/dask_cuda/__init__.py
@@ -21,8 +21,6 @@
 
 __version__ = "23.12.00"
 
-from . import compat
-
 # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
 dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
     dask.dataframe.shuffle.rearrange_by_column
diff --git a/dask_cuda/compat.py b/dask_cuda/compat.py
deleted file mode 100644
index 1c09337b2..000000000
--- a/dask_cuda/compat.py
+++ /dev/null
@@ -1,118 +0,0 @@
-import pickle
-
-import msgpack
-from packaging.version import Version
-
-import dask
-import distributed
-import distributed.comm.utils
-import distributed.protocol
-from distributed.comm.utils import OFFLOAD_THRESHOLD, nbytes, offload
-from distributed.protocol.core import (
-    Serialized,
-    decompress,
-    logger,
-    merge_and_deserialize,
-    msgpack_decode_default,
-    msgpack_opts,
-)
-
-if Version(distributed.__version__) >= Version("2023.8.1"):
-    # Monkey-patch protocol.core.loads (and its users)
-    async def from_frames(
-        frames, deserialize=True, deserializers=None, allow_offload=True
-    ):
-        """
-        Unserialize a list of Distributed protocol frames.
- """ - size = False - - def _from_frames(): - try: - # Patched code - return loads( - frames, deserialize=deserialize, deserializers=deserializers - ) - # end patched code - except EOFError: - if size > 1000: - datastr = "[too large to display]" - else: - datastr = frames - # Aid diagnosing - logger.error("truncated data stream (%d bytes): %s", size, datastr) - raise - - if allow_offload and deserialize and OFFLOAD_THRESHOLD: - size = sum(map(nbytes, frames)) - if ( - allow_offload - and deserialize - and OFFLOAD_THRESHOLD - and size > OFFLOAD_THRESHOLD - ): - res = await offload(_from_frames) - else: - res = _from_frames() - - return res - - def loads(frames, deserialize=True, deserializers=None): - """Transform bytestream back into Python value""" - - allow_pickle = dask.config.get("distributed.scheduler.pickle") - - try: - - def _decode_default(obj): - offset = obj.get("__Serialized__", 0) - if offset > 0: - sub_header = msgpack.loads( - frames[offset], - object_hook=msgpack_decode_default, - use_list=False, - **msgpack_opts, - ) - offset += 1 - sub_frames = frames[offset : offset + sub_header["num-sub-frames"]] - if deserialize: - if "compression" in sub_header: - sub_frames = decompress(sub_header, sub_frames) - return merge_and_deserialize( - sub_header, sub_frames, deserializers=deserializers - ) - else: - return Serialized(sub_header, sub_frames) - - offset = obj.get("__Pickled__", 0) - if offset > 0: - sub_header = msgpack.loads(frames[offset]) - offset += 1 - sub_frames = frames[offset : offset + sub_header["num-sub-frames"]] - # Patched code - if "compression" in sub_header: - sub_frames = decompress(sub_header, sub_frames) - # end patched code - if allow_pickle: - return pickle.loads( - sub_header["pickled-obj"], buffers=sub_frames - ) - else: - raise ValueError( - "Unpickle on the Scheduler isn't allowed, " - "set `distributed.scheduler.pickle=true`" - ) - - return msgpack_decode_default(obj) - - return msgpack.loads( - frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts - ) - - except Exception: - logger.critical("Failed to deserialize", exc_info=True) - raise - - distributed.protocol.loads = loads - distributed.protocol.core.loads = loads - distributed.comm.utils.from_frames = from_frames From 55bbf45ca61a59834a2a1a493abbf03b2ea5f8c3 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 23 Oct 2023 16:23:00 -0700 Subject: [PATCH 3/5] Unpin dask and distributed for 23.12 development --- conda/recipes/dask-cuda/meta.yaml | 2 +- dependencies.yaml | 6 +++--- pyproject.toml | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index 08df9e563..3b0c15626 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -32,7 +32,7 @@ requirements: - tomli run: - python - - dask-core ==2023.9.2 + - dask-core >=2023.9.2 {% for r in data.get("project", {}).get("dependencies", []) %} - {{ r }} {% endfor %} diff --git a/dependencies.yaml b/dependencies.yaml index 703c52074..1022b3a38 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -101,8 +101,8 @@ dependencies: common: - output_types: [conda, requirements] packages: - - dask==2023.9.2 - - distributed==2023.9.2 + - dask>=2023.9.2 + - distributed>=2023.9.2 - numba>=0.57 - numpy>=1.21 - pandas>=1.3,<1.6.0dev0 @@ -110,7 +110,7 @@ dependencies: - zict>=2.0.0 - output_types: [conda] packages: - - dask-core==2023.9.2 + - dask-core>=2023.9.2 test_python: common: - output_types: [conda] diff --git 
index 0ceae5db4..2ebe09bc7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,8 +16,8 @@ authors = [
 license = { text = "Apache-2.0" }
 requires-python = ">=3.9"
 dependencies = [
-    "dask ==2023.9.2",
-    "distributed ==2023.9.2",
+    "dask >=2023.9.2",
+    "distributed >=2023.9.2",
     "pynvml >=11.0.0,<11.5",
     "numpy >=1.21",
     "numba >=0.57",

From 8e60caa9c631fe8e8a6fc3cc4a55a120aedaf10e Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Thu, 26 Oct 2023 08:42:41 -0700
Subject: [PATCH 4/5] Mark failing tests to xfail

---
 dask_cuda/tests/test_local_cuda_cluster.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py
index 5d7762579..3298cf219 100644
--- a/dask_cuda/tests/test_local_cuda_cluster.py
+++ b/dask_cuda/tests/test_local_cuda_cluster.py
@@ -337,6 +337,7 @@ async def test_pre_import():
 
 
 # Intentionally not using @gen_test to skip cleanup checks
+@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/1265")
 def test_pre_import_not_found():
     async def _test_pre_import_not_found():
         with raises_with_cause(RuntimeError, None, ImportError, None):
@@ -491,6 +492,7 @@ def test_print_cluster_config(capsys):
     assert "[plugin]" in captured.out
 
 
+@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/1265")
 def test_death_timeout_raises():
     with pytest.raises(asyncio.exceptions.TimeoutError):
         with LocalCUDACluster(

From 6eec79fbd016b704ce4ee05d9a6f111fd60bf815 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Fri, 27 Oct 2023 02:01:19 -0700
Subject: [PATCH 5/5] Temporarily disable tests due to AWS segfaults

---
 dask_cuda/tests/test_explicit_comms.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index bd6770225..7d8e1b194 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -164,6 +164,8 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
 @pytest.mark.parametrize("_partitions", [True, False])
 def test_dataframe_shuffle(backend, protocol, nworkers, _partitions):
     if backend == "cudf":
+        pytest.skip("Temporarily disable due to segfaults in libaws-cpp-sdk-core.so")
+
         pytest.importorskip("cudf")
 
     p = mp.Process(
@@ -259,6 +261,8 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
 @pytest.mark.parametrize("protocol", ["tcp", "ucx"])
 def test_dataframe_shuffle_merge(backend, protocol, nworkers):
     if backend == "cudf":
+        pytest.skip("Temporarily disable due to segfaults in libaws-cpp-sdk-core.so")
+
         pytest.importorskip("cudf")
 
     p = mp.Process(
         target=_test_dataframe_shuffle_merge, args=(backend, protocol, nworkers)
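
A note on the testing pattern in patches 4 and 5: `@pytest.mark.xfail` lets a
test run and records its failure as expected, while `pytest.skip` stops the
test body before it runs, which is the only safe option when the failure mode
is a hard crash such as a segfault. A minimal standalone sketch of the two
mechanisms follows; the test names and bodies are hypothetical and not part of
this series:

import pytest


# xfail: the test body still executes; a failure is reported as "expected"
# and does not fail CI. `reason` links the tracking issue (as in patch 4).
@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/1265")
def test_known_failure():
    raise RuntimeError("failure tracked upstream")


# skip: pytest.skip() raises immediately, so nothing after it executes.
# xfail cannot intercept an interpreter-killing segfault, hence the skip
# in patch 5.
def test_crashing_backend():
    pytest.skip("Temporarily disable due to segfaults in libaws-cpp-sdk-core.so")
    # ...the code that would segfault is never reached...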