diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml
index 08df9e563..3b0c15626 100644
--- a/conda/recipes/dask-cuda/meta.yaml
+++ b/conda/recipes/dask-cuda/meta.yaml
@@ -32,7 +32,7 @@ requirements:
     - tomli
   run:
     - python
-    - dask-core ==2023.9.2
+    - dask-core >=2023.9.2
     {% for r in data.get("project", {}).get("dependencies", []) %}
     - {{ r }}
     {% endfor %}
diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py
index 2218e47e5..9d6917ef6 100644
--- a/dask_cuda/__init__.py
+++ b/dask_cuda/__init__.py
@@ -21,8 +21,6 @@
 
 __version__ = "23.12.00"
 
-from . import compat
-
 # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
 dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
     dask.dataframe.shuffle.rearrange_by_column
diff --git a/dask_cuda/compat.py b/dask_cuda/compat.py
deleted file mode 100644
index 1c09337b2..000000000
--- a/dask_cuda/compat.py
+++ /dev/null
@@ -1,118 +0,0 @@
-import pickle
-
-import msgpack
-from packaging.version import Version
-
-import dask
-import distributed
-import distributed.comm.utils
-import distributed.protocol
-from distributed.comm.utils import OFFLOAD_THRESHOLD, nbytes, offload
-from distributed.protocol.core import (
-    Serialized,
-    decompress,
-    logger,
-    merge_and_deserialize,
-    msgpack_decode_default,
-    msgpack_opts,
-)
-
-if Version(distributed.__version__) >= Version("2023.8.1"):
-    # Monkey-patch protocol.core.loads (and its users)
-    async def from_frames(
-        frames, deserialize=True, deserializers=None, allow_offload=True
-    ):
-        """
-        Unserialize a list of Distributed protocol frames.
-        """
-        size = False
-
-        def _from_frames():
-            try:
-                # Patched code
-                return loads(
-                    frames, deserialize=deserialize, deserializers=deserializers
-                )
-                # end patched code
-            except EOFError:
-                if size > 1000:
-                    datastr = "[too large to display]"
-                else:
-                    datastr = frames
-                # Aid diagnosing
-                logger.error("truncated data stream (%d bytes): %s", size, datastr)
-                raise
-
-        if allow_offload and deserialize and OFFLOAD_THRESHOLD:
-            size = sum(map(nbytes, frames))
-        if (
-            allow_offload
-            and deserialize
-            and OFFLOAD_THRESHOLD
-            and size > OFFLOAD_THRESHOLD
-        ):
-            res = await offload(_from_frames)
-        else:
-            res = _from_frames()
-
-        return res
-
-    def loads(frames, deserialize=True, deserializers=None):
-        """Transform bytestream back into Python value"""
-
-        allow_pickle = dask.config.get("distributed.scheduler.pickle")
-
-        try:
-
-            def _decode_default(obj):
-                offset = obj.get("__Serialized__", 0)
-                if offset > 0:
-                    sub_header = msgpack.loads(
-                        frames[offset],
-                        object_hook=msgpack_decode_default,
-                        use_list=False,
-                        **msgpack_opts,
-                    )
-                    offset += 1
-                    sub_frames = frames[offset : offset + sub_header["num-sub-frames"]]
-                    if deserialize:
-                        if "compression" in sub_header:
-                            sub_frames = decompress(sub_header, sub_frames)
-                        return merge_and_deserialize(
-                            sub_header, sub_frames, deserializers=deserializers
-                        )
-                    else:
-                        return Serialized(sub_header, sub_frames)
-
-                offset = obj.get("__Pickled__", 0)
-                if offset > 0:
-                    sub_header = msgpack.loads(frames[offset])
-                    offset += 1
-                    sub_frames = frames[offset : offset + sub_header["num-sub-frames"]]
-                    # Patched code
-                    if "compression" in sub_header:
-                        sub_frames = decompress(sub_header, sub_frames)
-                    # end patched code
-                    if allow_pickle:
-                        return pickle.loads(
-                            sub_header["pickled-obj"], buffers=sub_frames
-                        )
-                    else:
-                        raise ValueError(
-                            "Unpickle on the Scheduler isn't allowed, "
-                            "set `distributed.scheduler.pickle=true`"
-                        )
-
-                return msgpack_decode_default(obj)
-
-            return msgpack.loads(
-                frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
-            )
-
-        except Exception:
-            logger.critical("Failed to deserialize", exc_info=True)
-            raise
-
-    distributed.protocol.loads = loads
-    distributed.protocol.core.loads = loads
-    distributed.comm.utils.from_frames = from_frames
diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py
index 7942f6547..b646a9294 100644
--- a/dask_cuda/device_host_file.py
+++ b/dask_cuda/device_host_file.py
@@ -17,7 +17,7 @@
     serialize_bytelist,
 )
 from distributed.sizeof import safe_sizeof
-from distributed.spill import CustomFile as KeyAsStringFile
+from distributed.spill import AnyKeyFile as KeyAsStringFile
 from distributed.utils import nbytes
 
 from .is_device_object import is_device_object
diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index bd6770225..7d8e1b194 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -164,6 +164,8 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
 @pytest.mark.parametrize("_partitions", [True, False])
 def test_dataframe_shuffle(backend, protocol, nworkers, _partitions):
     if backend == "cudf":
+        pytest.skip("Temporarily disable due to segfaults in libaws-cpp-sdk-core.so")
+
         pytest.importorskip("cudf")
 
     p = mp.Process(
@@ -259,6 +261,8 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
 @pytest.mark.parametrize("protocol", ["tcp", "ucx"])
 def test_dataframe_shuffle_merge(backend, protocol, nworkers):
     if backend == "cudf":
+        pytest.skip("Temporarily disable due to segfaults in libaws-cpp-sdk-core.so")
+
         pytest.importorskip("cudf")
     p = mp.Process(
         target=_test_dataframe_shuffle_merge, args=(backend, protocol, nworkers)
diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py
index 5d7762579..3298cf219 100644
--- a/dask_cuda/tests/test_local_cuda_cluster.py
+++ b/dask_cuda/tests/test_local_cuda_cluster.py
@@ -337,6 +337,7 @@ async def test_pre_import():
 
 
 # Intentionally not using @gen_test to skip cleanup checks
+@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/1265")
 def test_pre_import_not_found():
     async def _test_pre_import_not_found():
         with raises_with_cause(RuntimeError, None, ImportError, None):
@@ -491,6 +492,7 @@ def test_print_cluster_config(capsys):
     assert "[plugin]" in captured.out
 
 
+@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/1265")
 def test_death_timeout_raises():
     with pytest.raises(asyncio.exceptions.TimeoutError):
         with LocalCUDACluster(
diff --git a/dependencies.yaml b/dependencies.yaml
index 703c52074..1022b3a38 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -101,8 +101,8 @@ dependencies:
     common:
       - output_types: [conda, requirements]
         packages:
-          - dask==2023.9.2
-          - distributed==2023.9.2
+          - dask>=2023.9.2
+          - distributed>=2023.9.2
          - numba>=0.57
          - numpy>=1.21
          - pandas>=1.3,<1.6.0dev0
@@ -110,7 +110,7 @@
          - zict>=2.0.0
       - output_types: [conda]
         packages:
-          - dask-core==2023.9.2
+          - dask-core>=2023.9.2
   test_python:
     common:
       - output_types: [conda]
diff --git a/pyproject.toml b/pyproject.toml
index 0ceae5db4..2ebe09bc7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,8 +16,8 @@ authors = [
 license = { text = "Apache-2.0" }
 requires-python = ">=3.9"
 dependencies = [
-    "dask ==2023.9.2",
-    "distributed ==2023.9.2",
+    "dask >=2023.9.2",
+    "distributed >=2023.9.2",
     "pynvml >=11.0.0,<11.5",
     "numpy >=1.21",
     "numba >=0.57",