From 89392c0a9cc5f13ff97e8bea86ed992c2dd38060 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 9 Jan 2025 07:25:31 -0800 Subject: [PATCH 01/14] try importing dask_expr from dask.dataframe --- python/dask_cudf/dask_cudf/__init__.py | 6 -- python/dask_cudf/dask_cudf/_expr/__init__.py | 15 ++++- .../dask_cudf/dask_cudf/_expr/collection.py | 30 +++++++--- python/dask_cudf/dask_cudf/_expr/expr.py | 23 ++++++-- python/dask_cudf/dask_cudf/_expr/groupby.py | 31 +++++++--- python/dask_cudf/dask_cudf/backends.py | 59 ++++++++----------- python/dask_cudf/dask_cudf/io/parquet.py | 35 +++++++---- 7 files changed, 122 insertions(+), 77 deletions(-) diff --git a/python/dask_cudf/dask_cudf/__init__.py b/python/dask_cudf/dask_cudf/__init__.py index 863102103ed..bbed7ee9806 100644 --- a/python/dask_cudf/dask_cudf/__init__.py +++ b/python/dask_cudf/dask_cudf/__init__.py @@ -11,12 +11,6 @@ from ._version import __git_commit__, __version__ # noqa: F401 from .core import DataFrame, Index, Series, _deprecated_api, concat, from_cudf -if not (QUERY_PLANNING_ON := dd._dask_expr_enabled()): - raise ValueError( - "The legacy DataFrame API is not supported in dask_cudf>24.12. " - "Please enable query-planning, or downgrade to dask_cudf<=24.12" - ) - def read_csv(*args, **kwargs): with config.set({"dataframe.backend": "cudf"}): diff --git a/python/dask_cudf/dask_cudf/_expr/__init__.py b/python/dask_cudf/dask_cudf/_expr/__init__.py index 3c827d4ff59..e2ef0626d25 100644 --- a/python/dask_cudf/dask_cudf/_expr/__init__.py +++ b/python/dask_cudf/dask_cudf/_expr/__init__.py @@ -1 +1,14 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. + +try: + from dask.dataframe import dask_expr # noqa: F401 + +except ImportError: + # TODO: Remove when pinned to dask>2024.12.1 + import dask.dataframe as dd + + if not dd._dask_expr_enabled(): + raise ValueError( + "The legacy DataFrame API is not supported in dask_cudf>24.12. " + "Please enable query-planning, or downgrade to dask_cudf<=24.12" + ) diff --git a/python/dask_cudf/dask_cudf/_expr/collection.py b/python/dask_cudf/dask_cudf/_expr/collection.py index e8c9a970b7b..ab7cc634251 100644 --- a/python/dask_cudf/dask_cudf/_expr/collection.py +++ b/python/dask_cudf/dask_cudf/_expr/collection.py @@ -3,15 +3,27 @@ import warnings from functools import cached_property -from dask_expr import ( - DataFrame as DXDataFrame, - FrameBase, - Index as DXIndex, - Series as DXSeries, - get_collection_type, -) -from dask_expr._collection import new_collection -from dask_expr._util import _raise_if_object_series +try: + from dask.dataframe.dask_expr import ( + DataFrame as DXDataFrame, + FrameBase, + Index as DXIndex, + Series as DXSeries, + get_collection_type, + ) + from dask.dataframe.dask_expr._collection import new_collection + from dask.dataframe.dask_expr._util import _raise_if_object_series +except ImportError: + # TODO: Remove when pinned to dask>2024.12.1 + from dask_expr import ( + DataFrame as DXDataFrame, + FrameBase, + Index as DXIndex, + Series as DXSeries, + get_collection_type, + ) + from dask_expr._collection import new_collection + from dask_expr._util import _raise_if_object_series from dask import config from dask.dataframe.core import is_dataframe_like diff --git a/python/dask_cudf/dask_cudf/_expr/expr.py b/python/dask_cudf/dask_cudf/_expr/expr.py index 03d1da0d258..45ce9f58366 100644 --- a/python/dask_cudf/dask_cudf/_expr/expr.py +++ b/python/dask_cudf/dask_cudf/_expr/expr.py @@ -1,11 +1,24 @@ # Copyright (c) 2024-2025, NVIDIA CORPORATION. import functools -import dask_expr._shuffle as _shuffle_module -from dask_expr import new_collection -from dask_expr._cumulative import CumulativeBlockwise -from dask_expr._expr import Elemwise, Expr, RenameAxis, VarColumns -from dask_expr._reductions import Reduction, Var +try: + import dask.dataframe.dask_expr._shuffle as _shuffle_module + from dask.dataframe.dask_expr import new_collection + from dask.dataframe.dask_expr._cumulative import CumulativeBlockwise + from dask.dataframe.dask_expr._expr import ( + Elemwise, + Expr, + RenameAxis, + VarColumns, + ) + from dask.dataframe.dask_expr._reductions import Reduction, Var +except ImportError: + # TODO: Remove when pinned to dask>2024.12.1 + import dask_expr._shuffle as _shuffle_module + from dask_expr import new_collection + from dask_expr._cumulative import CumulativeBlockwise + from dask_expr._expr import Elemwise, Expr, RenameAxis, VarColumns + from dask_expr._reductions import Reduction, Var from dask.dataframe.dispatch import ( is_categorical_dtype, diff --git a/python/dask_cudf/dask_cudf/_expr/groupby.py b/python/dask_cudf/dask_cudf/_expr/groupby.py index a5cdd43169b..59ea3251fbf 100644 --- a/python/dask_cudf/dask_cudf/_expr/groupby.py +++ b/python/dask_cudf/dask_cudf/_expr/groupby.py @@ -3,15 +3,28 @@ import numpy as np import pandas as pd -from dask_expr._collection import new_collection -from dask_expr._groupby import ( - DecomposableGroupbyAggregation, - GroupBy as DXGroupBy, - GroupbyAggregation, - SeriesGroupBy as DXSeriesGroupBy, - SingleAggregation, -) -from dask_expr._util import is_scalar + +try: + from dask.dataframe.dask_expr._collection import new_collection + from dask.dataframe.dask_expr._groupby import ( + DecomposableGroupbyAggregation, + GroupBy as DXGroupBy, + GroupbyAggregation, + SeriesGroupBy as DXSeriesGroupBy, + SingleAggregation, + ) + from dask.dataframe.dask_expr._util import is_scalar +except ImportError: + # TODO: Remove when pinned to dask>2024.12.1 + from dask_expr._collection import new_collection + from dask_expr._groupby import ( + DecomposableGroupbyAggregation, + GroupBy as DXGroupBy, + GroupbyAggregation, + SeriesGroupBy as DXSeriesGroupBy, + SingleAggregation, + ) + from dask_expr._util import is_scalar from dask.dataframe.core import _concat from dask.dataframe.groupby import Aggregation diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index f33733d9583..662304c9062 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -566,20 +566,23 @@ class CudfBackendEntrypoint(DataFrameBackendEntrypoint): Examples -------- >>> import dask - >>> import dask_expr as dx + >>> import dask.dataframe as dd >>> with dask.config.set({"dataframe.backend": "cudf"}): - ... ddf = dx.from_dict({"a": range(10)}) + ... ddf = dd.from_dict({"a": range(10)}) >>> type(ddf._meta) """ @staticmethod def to_backend(data, **kwargs): - import dask_expr as dx - + try: + from dask.dataframe.dask_expr import new_collection + except ImportError: + # TODO: Remove when pinned to dask>2024.12.1 + from dask_expr import new_collection from dask_cudf._expr.expr import ToCudfBackend - return dx.new_collection(ToCudfBackend(data, kwargs)) + return new_collection(ToCudfBackend(data, kwargs)) @staticmethod def from_dict( @@ -590,10 +593,14 @@ def from_dict( columns=None, constructor=cudf.DataFrame, ): - import dask_expr as dx + try: + from dask.dataframe.dask_expr import from_dict + except ImportError: + # TODO: Remove when pinned to dask>2024.12.1 + from dask_expr import from_dict return _default_backend( - dx.from_dict, + from_dict, data, npartitions=npartitions, orient=orient, @@ -617,35 +624,15 @@ def read_csv( storage_options=None, **kwargs, ): - try: - # TODO: Remove when cudf is pinned to dask>2024.12.0 - import dask_expr as dx - from dask_expr.io.csv import ReadCSV - from fsspec.utils import stringify_path - - if not isinstance(path, str): - path = stringify_path(path) - return dx.new_collection( - ReadCSV( - path, - dtype_backend=dtype_backend, - storage_options=storage_options, - kwargs=kwargs, - header=header, - dataframe_backend="cudf", - ) - ) - except ImportError: - # Requires dask>2024.12.0 - from dask_cudf.io.csv import read_csv - - return read_csv( - path, - *args, - header=header, - storage_options=storage_options, - **kwargs, - ) + from dask_cudf.io.csv import read_csv + + return read_csv( + path, + *args, + header=header, + storage_options=storage_options, + **kwargs, + ) @staticmethod def read_json(*args, **kwargs): diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index a953dce787d..26a6e0809d7 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -11,14 +11,28 @@ import numpy as np import pandas as pd -from dask_expr._expr import Elemwise -from dask_expr._util import _convert_to_list -from dask_expr.io.io import FusedIO, FusedParquetIO -from dask_expr.io.parquet import ( - FragmentWrapper, - ReadParquetFSSpec, - ReadParquetPyarrowFS, -) + +try: + from dask.dataframe.dask_expr import new_collection + from dask.dataframe.dask_expr._expr import Elemwise + from dask.dataframe.dask_expr._util import _convert_to_list + from dask.dataframe.dask_expr.io.io import FusedIO, FusedParquetIO + from dask.dataframe.dask_expr.io.parquet import ( + FragmentWrapper, + ReadParquetFSSpec, + ReadParquetPyarrowFS, + ) +except ImportError: + # TODO: Remove when pinned to dask>2024.12.1 + from dask_expr import new_collection + from dask_expr._expr import Elemwise + from dask_expr._util import _convert_to_list + from dask_expr.io.io import FusedIO, FusedParquetIO + from dask_expr.io.parquet import ( + FragmentWrapper, + ReadParquetFSSpec, + ReadParquetPyarrowFS, + ) from dask._task_spec import Task from dask.dataframe.io.parquet.arrow import _filters_to_expression @@ -698,7 +712,6 @@ def read_parquet_expr( using the ``read`` key-word argument. """ - import dask_expr as dx from fsspec.utils import stringify_path from pyarrow import fs as pa_fs @@ -785,7 +798,7 @@ def read_parquet_expr( "parquet_file_extension is not supported when using the pyarrow filesystem." ) - return dx.new_collection( + return new_collection( NoOp( CudfReadParquetPyarrowFS( path, @@ -806,7 +819,7 @@ def read_parquet_expr( ) ) - return dx.new_collection( + return new_collection( NoOp( CudfReadParquetFSSpec( path, From 88e078d7fc209c8cfc425b40a500ed7afeda8eb0 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 14 Jan 2025 06:32:48 -0800 Subject: [PATCH 02/14] update the error message --- python/dask_cudf/dask_cudf/_expr/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/_expr/__init__.py b/python/dask_cudf/dask_cudf/_expr/__init__.py index e2ef0626d25..56e00001ec8 100644 --- a/python/dask_cudf/dask_cudf/_expr/__init__.py +++ b/python/dask_cudf/dask_cudf/_expr/__init__.py @@ -9,6 +9,6 @@ if not dd._dask_expr_enabled(): raise ValueError( - "The legacy DataFrame API is not supported in dask_cudf>24.12. " - "Please enable query-planning, or downgrade to dask_cudf<=24.12" + "The legacy DataFrame API is not supported for RAPIDS >24.12. " + "The 'dataframe.query-planning' config must be True or None." ) From 1f77ec46a6782760b3cef2e3aff35ae90a495fa1 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 14 Jan 2025 12:01:55 -0800 Subject: [PATCH 03/14] add basic shuffle support --- .../cudf_polars/experimental/parallel.py | 5 +- .../cudf_polars/experimental/shuffle.py | 267 ++++++++++++++++++ .../tests/experimental/test_shuffle.py | 48 ++++ 3 files changed, 318 insertions(+), 2 deletions(-) create mode 100644 python/cudf_polars/cudf_polars/experimental/shuffle.py create mode 100644 python/cudf_polars/tests/experimental/test_shuffle.py diff --git a/python/cudf_polars/cudf_polars/experimental/parallel.py b/python/cudf_polars/cudf_polars/experimental/parallel.py index 6843ed9ee2e..5a5eaab8b2f 100644 --- a/python/cudf_polars/cudf_polars/experimental/parallel.py +++ b/python/cudf_polars/cudf_polars/experimental/parallel.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 """Multi-partition Dask execution.""" @@ -10,7 +10,8 @@ from typing import TYPE_CHECKING, Any import cudf_polars.experimental.io -import cudf_polars.experimental.select # noqa: F401 +import cudf_polars.experimental.select +import cudf_polars.experimental.shuffle # noqa: F401 from cudf_polars.dsl.ir import IR, Cache, Filter, HStack, Projection, Select, Union from cudf_polars.dsl.traversal import CachingVisitor, traversal from cudf_polars.experimental.base import PartitionInfo, _concat, get_key_name diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py new file mode 100644 index 00000000000..e000a783295 --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py @@ -0,0 +1,267 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +"""Shuffle Logic.""" + +from __future__ import annotations + +import json +import operator +from typing import TYPE_CHECKING, Any + +import pyarrow as pa + +import pylibcudf as plc + +from cudf_polars.containers import Column, DataFrame +from cudf_polars.dsl.ir import IR +from cudf_polars.experimental.base import _concat, get_key_name +from cudf_polars.experimental.dispatch import generate_ir_tasks, lower_ir_node + +if TYPE_CHECKING: + from collections.abc import Hashable, MutableMapping + + from cudf_polars.dsl.expr import NamedExpr + from cudf_polars.experimental.dispatch import LowerIRTransformer + from cudf_polars.experimental.parallel import PartitionInfo + from cudf_polars.typing import Schema + + +class HashIndex(IR): + """Construct a hash-based index for shuffling.""" + + __slots__ = ("count", "keys") + _non_child = ("schema", "keys", "count") + keys: tuple[NamedExpr, ...] + """Columns to hash partition on.""" + count: int + """Number of output partitions.""" + + def __init__( + self, + schema: Schema, + keys: tuple[NamedExpr, ...], + count: int, + df: IR, + ): + self.schema = schema + self.keys = keys + self.count = count + self._non_child_args = (keys, next(iter(schema)), count) + self.children = (df,) + + @classmethod + def do_evaluate( + cls, + keys: tuple[NamedExpr, ...], + name: str, + count: int, + df: DataFrame, + ): + """Evaluate and return a dataframe.""" + partition_map = Column( + plc.binaryop.binary_operation( + plc.hashing.murmurhash3_x86_32( + DataFrame([expr.evaluate(df) for expr in keys]).table + ), + plc.interop.from_arrow(pa.scalar(count, type="uint32")), + plc.binaryop.BinaryOperator.PYMOD, + plc.types.DataType(plc.types.TypeId.UINT32), + ), + name=name, + ) + return DataFrame([partition_map]) + + +class ShuffleByIndex(IR): + """Suffle multi-partition data by a partition index.""" + + __slots__ = ("count", "options") + _non_child = ("schema", "options", "count") + options: dict[str, Any] + """Shuffling options.""" + count: int + """Number of output partitions.""" + + def __init__( + self, + schema: Schema, + options: dict[str, Any], + count: int, + df: IR, + partition_index: IR, + ): + self.schema = schema + self.options = options + self.count = count + self._non_child_args = (count,) + self.children = (df, partition_index) + + def get_hashable(self) -> Hashable: + """Hashable representation of the node.""" + return ( + type(self), + tuple(self.schema.items()), + json.dumps(self.options), + self.count, + self.children, + ) + + @classmethod + def do_evaluate( + cls, count: int, df: DataFrame, partition_index: DataFrame + ): # pragma: no cover + """Evaluate and return a dataframe.""" + # Single-partition logic is a no-op + return df + + +class ShuffleByHash(IR): + """Suffle data by hash partitioning.""" + + __slots__ = ("keys", "options") + _non_child = ("schema", "options", "keys") + keys: tuple[NamedExpr, ...] + """Columns to shuffle on.""" + options: dict[str, Any] + """Shuffling options.""" + + def __init__( + self, + schema: Schema, + keys: tuple[NamedExpr, ...], + options: dict[str, Any], + df: IR, + ): + self.schema = schema + self.keys = keys + self.options = options + self._non_child_args = () + self.children = (df,) + + def get_hashable(self) -> Hashable: + """Hashable representation of the node.""" + return ( + type(self), + tuple(self.schema.items()), + self.keys, + json.dumps(self.options), + self.children, + ) + + @classmethod + def do_evaluate(cls, df: DataFrame): # pragma: no cover + """Evaluate and return a dataframe.""" + # Single-partition logic is a no-op + return df + + +def _split_by_index( + df: DataFrame, + index: DataFrame, + count: int, +) -> dict[int, DataFrame]: + # Apply partitioning + assert len(index.column_map) == 1, "Partition index has too many columns." + t, offsets = plc.partitioning.partition( + df.table, + next(iter(index.column_map.values())).obj, + count, + ) + + # Split and return the partitioned result + return { + i: DataFrame.from_table( + split, + df.column_names, + ) + for i, split in enumerate(plc.copying.split(t, offsets[1:-1])) + } + + +def _simple_shuffle_graph( + name_out: str, + name_in: str, + name_index: str, + count_in: int, + count_out: int, +) -> MutableMapping[Any, Any]: + """Make a simple all-to-all shuffle graph.""" + # Simple all-to-all shuffle (for now) + split_name = f"split-{name_out}" + inter_name = f"inter-{name_out}" + + graph: MutableMapping[Any, Any] = {} + for part_out in range(count_out): + _concat_list = [] + for part_in in range(count_in): + graph[(split_name, part_in)] = ( + _split_by_index, + (name_in, part_in), + (name_index, part_in), + count_out, + ) + _concat_list.append((inter_name, part_out, part_in)) + graph[_concat_list[-1]] = ( + operator.getitem, + (split_name, part_in), + part_out, + ) + graph[(name_out, part_out)] = (_concat, _concat_list) + return graph + + +@lower_ir_node.register(ShuffleByHash) +def _( + ir: ShuffleByHash, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + from cudf_polars.experimental.parallel import PartitionInfo + + # Extract child partitioning + child, partition_info = rec(ir.children[0]) + pi = partition_info[child] + + # Add a HashIndex node + partition_index = HashIndex( + {"_shuffle_index": plc.types.DataType(plc.types.TypeId.UINT32)}, + ir.keys, + pi.count, + child, + ) + partition_info[partition_index] = PartitionInfo(count=pi.count) + + # Shuffle by the HashIndex node + new_node = ShuffleByIndex( + child.schema, + ir.options, + pi.count, + child, + partition_index, + ) + partition_info[new_node] = PartitionInfo(count=pi.count) + return new_node, partition_info + + +@generate_ir_tasks.register(HashIndex) +def _( + ir: HashIndex, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + # HashIndex is a partition-wise operation + from cudf_polars.experimental.parallel import _generate_ir_tasks_pwise + + return _generate_ir_tasks_pwise(ir, partition_info) + + +@generate_ir_tasks.register(ShuffleByIndex) +def _( + ir: ShuffleByIndex, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + # Use a simple all-to-all shuffle graph. + # TODO: Optionally use rapidsmp. + child_ir, index_ir = ir.children + return _simple_shuffle_graph( + get_key_name(ir), + get_key_name(child_ir), + get_key_name(index_ir), + partition_info[child_ir].count, + partition_info[ir].count, + ) diff --git a/python/cudf_polars/tests/experimental/test_shuffle.py b/python/cudf_polars/tests/experimental/test_shuffle.py new file mode 100644 index 00000000000..c22bfdfdfcf --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_shuffle.py @@ -0,0 +1,48 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars import Translator +from cudf_polars.dsl.expr import Col, NamedExpr +from cudf_polars.experimental.parallel import evaluate_dask +from cudf_polars.experimental.shuffle import ShuffleByHash + + +@pytest.fixture(scope="module") +def engine(): + return pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + executor_options={"max_rows_per_partition": 30000}, + ) + + +@pytest.fixture(scope="module") +def df(): + return pl.LazyFrame( + { + "a": [1, 2, 3, 4, 5, 6, 7], + "b": [1, 1, 1, 1, 1, 1, 1], + } + ) + + +def test_hash_shuffle(df, engine): + # Extract translated IR + qir = Translator(df._ldf.visit(), engine).translate_ir() + + # Add ShuffleByHash node + qir_shuffled = ShuffleByHash( + qir.schema, + (NamedExpr("a", Col(qir.schema["a"], "a")),), + {}, + qir, + ) + + # Check that Dask evaluation works + evaluate_dask(qir_shuffled) From 8c52fde1b0fdd527e48ba31621ff40789cfba70f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 15 Jan 2025 09:22:30 -0800 Subject: [PATCH 04/14] major revision --- .../cudf_polars/experimental/base.py | 12 +- .../cudf_polars/experimental/shuffle.py | 258 ++++++++---------- .../tests/experimental/test_shuffle.py | 39 ++- 3 files changed, 154 insertions(+), 155 deletions(-) diff --git a/python/cudf_polars/cudf_polars/experimental/base.py b/python/cudf_polars/cudf_polars/experimental/base.py index 8f660632df2..9b81280e4a3 100644 --- a/python/cudf_polars/cudf_polars/experimental/base.py +++ b/python/cudf_polars/cudf_polars/experimental/base.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 """Multi-partition base classes.""" @@ -12,6 +12,7 @@ from collections.abc import Iterator, Sequence from cudf_polars.containers import DataFrame + from cudf_polars.dsl.expr import NamedExpr from cudf_polars.dsl.nodebase import Node @@ -22,10 +23,15 @@ class PartitionInfo: This class only tracks the partition count (for now). """ - __slots__ = ("count",) + __slots__ = ("count", "partitioned_on") - def __init__(self, count: int): + def __init__( + self, + count: int, + partitioned_on: tuple[NamedExpr, ...] = (), + ): self.count = count + self.partitioned_on = partitioned_on def keys(self, node: Node) -> Iterator[tuple[str, int]]: """Return the partitioned keys for a given node.""" diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py index e000a783295..db632334f77 100644 --- a/python/cudf_polars/cudf_polars/experimental/shuffle.py +++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py @@ -6,13 +6,14 @@ import json import operator +from functools import reduce from typing import TYPE_CHECKING, Any import pyarrow as pa import pylibcudf as plc -from cudf_polars.containers import Column, DataFrame +from cudf_polars.containers import DataFrame from cudf_polars.dsl.ir import IR from cudf_polars.experimental.base import _concat, get_key_name from cudf_polars.experimental.dispatch import generate_ir_tasks, lower_ir_node @@ -26,117 +27,47 @@ from cudf_polars.typing import Schema -class HashIndex(IR): - """Construct a hash-based index for shuffling.""" +class Shuffle(IR): + """ + Suffle multi-partition data. - __slots__ = ("count", "keys") - _non_child = ("schema", "keys", "count") - keys: tuple[NamedExpr, ...] - """Columns to hash partition on.""" - count: int - """Number of output partitions.""" + Notes + ----- + A Shuffle node may have either one or two children. In both + cases, the first child corresponds to the DataFrame we are + shuffling. The optional second child corresponds to a distinct + DataFrame to extract the shuffle keys from. For example, it + may be useful to reference a distinct DataFrame in the case + of sorting. - def __init__( - self, - schema: Schema, - keys: tuple[NamedExpr, ...], - count: int, - df: IR, - ): - self.schema = schema - self.keys = keys - self.count = count - self._non_child_args = (keys, next(iter(schema)), count) - self.children = (df,) - - @classmethod - def do_evaluate( - cls, - keys: tuple[NamedExpr, ...], - name: str, - count: int, - df: DataFrame, - ): - """Evaluate and return a dataframe.""" - partition_map = Column( - plc.binaryop.binary_operation( - plc.hashing.murmurhash3_x86_32( - DataFrame([expr.evaluate(df) for expr in keys]).table - ), - plc.interop.from_arrow(pa.scalar(count, type="uint32")), - plc.binaryop.BinaryOperator.PYMOD, - plc.types.DataType(plc.types.TypeId.UINT32), - ), - name=name, - ) - return DataFrame([partition_map]) - - -class ShuffleByIndex(IR): - """Suffle multi-partition data by a partition index.""" - - __slots__ = ("count", "options") - _non_child = ("schema", "options", "count") - options: dict[str, Any] - """Shuffling options.""" - count: int - """Number of output partitions.""" - - def __init__( - self, - schema: Schema, - options: dict[str, Any], - count: int, - df: IR, - partition_index: IR, - ): - self.schema = schema - self.options = options - self.count = count - self._non_child_args = (count,) - self.children = (df, partition_index) - - def get_hashable(self) -> Hashable: - """Hashable representation of the node.""" - return ( - type(self), - tuple(self.schema.items()), - json.dumps(self.options), - self.count, - self.children, - ) - - @classmethod - def do_evaluate( - cls, count: int, df: DataFrame, partition_index: DataFrame - ): # pragma: no cover - """Evaluate and return a dataframe.""" - # Single-partition logic is a no-op - return df - - -class ShuffleByHash(IR): - """Suffle data by hash partitioning.""" + The type of argument `keys` controls whether or not hash + partitioning will be applied. If `keys` is a tuple, we + assume that the corresponding columns must be hashed. If + `keys` is a `NamedExpr`, we assume that the corresponding + column already contains a direct partition mapping. + """ __slots__ = ("keys", "options") - _non_child = ("schema", "options", "keys") - keys: tuple[NamedExpr, ...] - """Columns to shuffle on.""" + _non_child = ("schema", "keys", "options") + keys: tuple[NamedExpr, ...] | NamedExpr + """Keys to shuffle on.""" options: dict[str, Any] """Shuffling options.""" def __init__( self, schema: Schema, - keys: tuple[NamedExpr, ...], + keys: tuple[NamedExpr, ...] | NamedExpr, options: dict[str, Any], - df: IR, + *children: IR, ): self.schema = schema self.keys = keys self.options = options self._non_child_args = () - self.children = (df,) + self.children = children + if len(children) > 2: # pragma: no cover + raise ValueError(f"Expected a maximum of two children, got {children}") def get_hashable(self) -> Hashable: """Hashable representation of the node.""" @@ -155,16 +86,53 @@ def do_evaluate(cls, df: DataFrame): # pragma: no cover return df -def _split_by_index( +def _partition_dataframe( df: DataFrame, - index: DataFrame, + index: DataFrame | None, + keys: tuple[NamedExpr, ...] | NamedExpr, count: int, ) -> dict[int, DataFrame]: + """ + Partition an input DataFrame for shuffling. + + Parameters + ---------- + df + DataFrame to partition. + index + Optional DataFrame from which to extract partitioning + keys. If None, keys will be extracted from `df`. + keys + Shuffle key(s) to extract from index or df. + count + Total number of output partitions. + + Returns + ------- + A dictionary mapping between int partition indices and + DataFrame fragments. + """ + # Extract output-partition mapping + if isinstance(keys, tuple): + # Hash the specified keys to calculate the output + # partition for each row (partition_map) + partition_map = plc.binaryop.binary_operation( + plc.hashing.murmurhash3_x86_32( + DataFrame([expr.evaluate(index or df) for expr in keys]).table + ), + plc.interop.from_arrow(pa.scalar(count, type="uint32")), + plc.binaryop.BinaryOperator.PYMOD, + plc.types.DataType(plc.types.TypeId.UINT32), + ) + else: # pragma: no cover + # Specified key column already contains the + # output-partition index in each row + partition_map = keys.evaluate(index or df).obj + # Apply partitioning - assert len(index.column_map) == 1, "Partition index has too many columns." t, offsets = plc.partitioning.partition( df.table, - next(iter(index.column_map.values())).obj, + partition_map, count, ) @@ -181,12 +149,12 @@ def _split_by_index( def _simple_shuffle_graph( name_out: str, name_in: str, - name_index: str, + name_index: str | None, + keys: tuple[NamedExpr, ...] | NamedExpr, count_in: int, count_out: int, ) -> MutableMapping[Any, Any]: """Make a simple all-to-all shuffle graph.""" - # Simple all-to-all shuffle (for now) split_name = f"split-{name_out}" inter_name = f"inter-{name_out}" @@ -195,9 +163,10 @@ def _simple_shuffle_graph( _concat_list = [] for part_in in range(count_in): graph[(split_name, part_in)] = ( - _split_by_index, + _partition_dataframe, (name_in, part_in), - (name_index, part_in), + None if name_index is None else (name_index, part_in), + keys, count_out, ) _concat_list.append((inter_name, part_out, part_in)) @@ -210,58 +179,67 @@ def _simple_shuffle_graph( return graph -@lower_ir_node.register(ShuffleByHash) +@lower_ir_node.register(Shuffle) def _( - ir: ShuffleByHash, rec: LowerIRTransformer + ir: Shuffle, rec: LowerIRTransformer ) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + # Simple lower_ir_node handling for the default hash-based shuffle. + # More-complex logic (e.g. joining and sorting) should + # be handled separately. from cudf_polars.experimental.parallel import PartitionInfo - # Extract child partitioning - child, partition_info = rec(ir.children[0]) - pi = partition_info[child] - - # Add a HashIndex node - partition_index = HashIndex( - {"_shuffle_index": plc.types.DataType(plc.types.TypeId.UINT32)}, - ir.keys, - pi.count, - child, - ) - partition_info[partition_index] = PartitionInfo(count=pi.count) - - # Shuffle by the HashIndex node - new_node = ShuffleByIndex( - child.schema, - ir.options, - pi.count, - child, - partition_index, - ) - partition_info[new_node] = PartitionInfo(count=pi.count) - return new_node, partition_info + # Check ir.keys + if not isinstance(ir.keys, tuple): # pragma: no cover + raise NotImplementedError( + f"Default hash Shuffle does not support NamedExpr keys argument. Got {ir.keys}" + ) + # Extract child partitioning + children, _partition_info = zip(*(rec(c) for c in ir.children), strict=True) + partition_info = reduce(operator.or_, _partition_info) + pi = partition_info[children[0]] + + # Check child count + if len(children) > 1: # pragma: no cover + raise NotImplementedError( + f"Default hash Shuffle does not support multiple children. Got {children}" + ) -@generate_ir_tasks.register(HashIndex) -def _( - ir: HashIndex, partition_info: MutableMapping[IR, PartitionInfo] -) -> MutableMapping[Any, Any]: - # HashIndex is a partition-wise operation - from cudf_polars.experimental.parallel import _generate_ir_tasks_pwise + # Check if we are already shuffled or update partition_info + if ir.keys == pi.partitioned_on: + # Already shuffled! + new_node = children[0] + else: + new_node = ir.reconstruct(children) + partition_info[new_node] = PartitionInfo( + # Default shuffle preserves partition count + count=pi.count, + # Add partitioned_on info + partitioned_on=ir.keys, + ) - return _generate_ir_tasks_pwise(ir, partition_info) + return new_node, partition_info -@generate_ir_tasks.register(ShuffleByIndex) +@generate_ir_tasks.register(Shuffle) def _( - ir: ShuffleByIndex, partition_info: MutableMapping[IR, PartitionInfo] + ir: Shuffle, partition_info: MutableMapping[IR, PartitionInfo] ) -> MutableMapping[Any, Any]: # Use a simple all-to-all shuffle graph. + # TODO: Optionally use rapidsmp. - child_ir, index_ir = ir.children + if len(ir.children) > 1: # pragma: no cover + child_ir, index_ir = ir.children + index_name = get_key_name(index_ir) + else: + child_ir = ir.children[0] + index_name = None + return _simple_shuffle_graph( get_key_name(ir), get_key_name(child_ir), - get_key_name(index_ir), + index_name, + ir.keys, partition_info[child_ir].count, partition_info[ir].count, ) diff --git a/python/cudf_polars/tests/experimental/test_shuffle.py b/python/cudf_polars/tests/experimental/test_shuffle.py index c22bfdfdfcf..e2ab852fbcf 100644 --- a/python/cudf_polars/tests/experimental/test_shuffle.py +++ b/python/cudf_polars/tests/experimental/test_shuffle.py @@ -9,8 +9,8 @@ from cudf_polars import Translator from cudf_polars.dsl.expr import Col, NamedExpr -from cudf_polars.experimental.parallel import evaluate_dask -from cudf_polars.experimental.shuffle import ShuffleByHash +from cudf_polars.experimental.parallel import evaluate_dask, lower_ir_graph +from cudf_polars.experimental.shuffle import Shuffle @pytest.fixture(scope="module") @@ -26,8 +26,9 @@ def engine(): def df(): return pl.LazyFrame( { - "a": [1, 2, 3, 4, 5, 6, 7], - "b": [1, 1, 1, 1, 1, 1, 1], + "x": [1, 2, 3, 4, 5, 6, 7], + "y": [1, 1, 1, 1, 1, 1, 1], + "z": ["a", "b", "c", "d", "e", "f", "g"], } ) @@ -36,13 +37,27 @@ def test_hash_shuffle(df, engine): # Extract translated IR qir = Translator(df._ldf.visit(), engine).translate_ir() - # Add ShuffleByHash node - qir_shuffled = ShuffleByHash( - qir.schema, - (NamedExpr("a", Col(qir.schema["a"], "a")),), - {}, - qir, - ) + # Add first Shuffle node + keys = (NamedExpr("x", Col(qir.schema["x"], "x")),) + options = {} + qir1 = Shuffle(qir.schema, keys, options, qir) + + # Add second Shuffle node (on the same keys) + qir2 = Shuffle(qir.schema, keys, options, qir1) + + # Check that sequential shuffles on the same keys + # are replaced with a single shuffle node + partition_info = lower_ir_graph(qir2)[1] + assert len([node for node in partition_info if isinstance(node, Shuffle)]) == 1 + + # Add second Shuffle node (on different keys) + keys2 = (NamedExpr("z", Col(qir.schema["z"], "z")),) + qir3 = Shuffle(qir2.schema, keys2, options, qir2) + + # Check that we have an additional shuffle + # node after shuffling on different keys + partition_info = lower_ir_graph(qir3)[1] + assert len([node for node in partition_info if isinstance(node, Shuffle)]) == 2 # Check that Dask evaluation works - evaluate_dask(qir_shuffled) + evaluate_dask(qir3) From f714a510e8236b380ae263031722254e5ac1e350 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 15 Jan 2025 09:32:56 -0800 Subject: [PATCH 05/14] roll back unrelated changes --- python/dask_cudf/dask_cudf/__init__.py | 6 ++ python/dask_cudf/dask_cudf/_expr/__init__.py | 15 +---- .../dask_cudf/dask_cudf/_expr/collection.py | 30 +++------- python/dask_cudf/dask_cudf/_expr/expr.py | 23 ++------ python/dask_cudf/dask_cudf/_expr/groupby.py | 31 +++------- python/dask_cudf/dask_cudf/backends.py | 59 +++++++++++-------- python/dask_cudf/dask_cudf/io/parquet.py | 35 ++++------- 7 files changed, 77 insertions(+), 122 deletions(-) diff --git a/python/dask_cudf/dask_cudf/__init__.py b/python/dask_cudf/dask_cudf/__init__.py index bbed7ee9806..863102103ed 100644 --- a/python/dask_cudf/dask_cudf/__init__.py +++ b/python/dask_cudf/dask_cudf/__init__.py @@ -11,6 +11,12 @@ from ._version import __git_commit__, __version__ # noqa: F401 from .core import DataFrame, Index, Series, _deprecated_api, concat, from_cudf +if not (QUERY_PLANNING_ON := dd._dask_expr_enabled()): + raise ValueError( + "The legacy DataFrame API is not supported in dask_cudf>24.12. " + "Please enable query-planning, or downgrade to dask_cudf<=24.12" + ) + def read_csv(*args, **kwargs): with config.set({"dataframe.backend": "cudf"}): diff --git a/python/dask_cudf/dask_cudf/_expr/__init__.py b/python/dask_cudf/dask_cudf/_expr/__init__.py index 56e00001ec8..3c827d4ff59 100644 --- a/python/dask_cudf/dask_cudf/_expr/__init__.py +++ b/python/dask_cudf/dask_cudf/_expr/__init__.py @@ -1,14 +1 @@ -# Copyright (c) 2024-2025, NVIDIA CORPORATION. - -try: - from dask.dataframe import dask_expr # noqa: F401 - -except ImportError: - # TODO: Remove when pinned to dask>2024.12.1 - import dask.dataframe as dd - - if not dd._dask_expr_enabled(): - raise ValueError( - "The legacy DataFrame API is not supported for RAPIDS >24.12. " - "The 'dataframe.query-planning' config must be True or None." - ) +# Copyright (c) 2024, NVIDIA CORPORATION. diff --git a/python/dask_cudf/dask_cudf/_expr/collection.py b/python/dask_cudf/dask_cudf/_expr/collection.py index ab7cc634251..e8c9a970b7b 100644 --- a/python/dask_cudf/dask_cudf/_expr/collection.py +++ b/python/dask_cudf/dask_cudf/_expr/collection.py @@ -3,27 +3,15 @@ import warnings from functools import cached_property -try: - from dask.dataframe.dask_expr import ( - DataFrame as DXDataFrame, - FrameBase, - Index as DXIndex, - Series as DXSeries, - get_collection_type, - ) - from dask.dataframe.dask_expr._collection import new_collection - from dask.dataframe.dask_expr._util import _raise_if_object_series -except ImportError: - # TODO: Remove when pinned to dask>2024.12.1 - from dask_expr import ( - DataFrame as DXDataFrame, - FrameBase, - Index as DXIndex, - Series as DXSeries, - get_collection_type, - ) - from dask_expr._collection import new_collection - from dask_expr._util import _raise_if_object_series +from dask_expr import ( + DataFrame as DXDataFrame, + FrameBase, + Index as DXIndex, + Series as DXSeries, + get_collection_type, +) +from dask_expr._collection import new_collection +from dask_expr._util import _raise_if_object_series from dask import config from dask.dataframe.core import is_dataframe_like diff --git a/python/dask_cudf/dask_cudf/_expr/expr.py b/python/dask_cudf/dask_cudf/_expr/expr.py index 45ce9f58366..03d1da0d258 100644 --- a/python/dask_cudf/dask_cudf/_expr/expr.py +++ b/python/dask_cudf/dask_cudf/_expr/expr.py @@ -1,24 +1,11 @@ # Copyright (c) 2024-2025, NVIDIA CORPORATION. import functools -try: - import dask.dataframe.dask_expr._shuffle as _shuffle_module - from dask.dataframe.dask_expr import new_collection - from dask.dataframe.dask_expr._cumulative import CumulativeBlockwise - from dask.dataframe.dask_expr._expr import ( - Elemwise, - Expr, - RenameAxis, - VarColumns, - ) - from dask.dataframe.dask_expr._reductions import Reduction, Var -except ImportError: - # TODO: Remove when pinned to dask>2024.12.1 - import dask_expr._shuffle as _shuffle_module - from dask_expr import new_collection - from dask_expr._cumulative import CumulativeBlockwise - from dask_expr._expr import Elemwise, Expr, RenameAxis, VarColumns - from dask_expr._reductions import Reduction, Var +import dask_expr._shuffle as _shuffle_module +from dask_expr import new_collection +from dask_expr._cumulative import CumulativeBlockwise +from dask_expr._expr import Elemwise, Expr, RenameAxis, VarColumns +from dask_expr._reductions import Reduction, Var from dask.dataframe.dispatch import ( is_categorical_dtype, diff --git a/python/dask_cudf/dask_cudf/_expr/groupby.py b/python/dask_cudf/dask_cudf/_expr/groupby.py index 59ea3251fbf..a5cdd43169b 100644 --- a/python/dask_cudf/dask_cudf/_expr/groupby.py +++ b/python/dask_cudf/dask_cudf/_expr/groupby.py @@ -3,28 +3,15 @@ import numpy as np import pandas as pd - -try: - from dask.dataframe.dask_expr._collection import new_collection - from dask.dataframe.dask_expr._groupby import ( - DecomposableGroupbyAggregation, - GroupBy as DXGroupBy, - GroupbyAggregation, - SeriesGroupBy as DXSeriesGroupBy, - SingleAggregation, - ) - from dask.dataframe.dask_expr._util import is_scalar -except ImportError: - # TODO: Remove when pinned to dask>2024.12.1 - from dask_expr._collection import new_collection - from dask_expr._groupby import ( - DecomposableGroupbyAggregation, - GroupBy as DXGroupBy, - GroupbyAggregation, - SeriesGroupBy as DXSeriesGroupBy, - SingleAggregation, - ) - from dask_expr._util import is_scalar +from dask_expr._collection import new_collection +from dask_expr._groupby import ( + DecomposableGroupbyAggregation, + GroupBy as DXGroupBy, + GroupbyAggregation, + SeriesGroupBy as DXSeriesGroupBy, + SingleAggregation, +) +from dask_expr._util import is_scalar from dask.dataframe.core import _concat from dask.dataframe.groupby import Aggregation diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 662304c9062..f33733d9583 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -566,23 +566,20 @@ class CudfBackendEntrypoint(DataFrameBackendEntrypoint): Examples -------- >>> import dask - >>> import dask.dataframe as dd + >>> import dask_expr as dx >>> with dask.config.set({"dataframe.backend": "cudf"}): - ... ddf = dd.from_dict({"a": range(10)}) + ... ddf = dx.from_dict({"a": range(10)}) >>> type(ddf._meta) """ @staticmethod def to_backend(data, **kwargs): - try: - from dask.dataframe.dask_expr import new_collection - except ImportError: - # TODO: Remove when pinned to dask>2024.12.1 - from dask_expr import new_collection + import dask_expr as dx + from dask_cudf._expr.expr import ToCudfBackend - return new_collection(ToCudfBackend(data, kwargs)) + return dx.new_collection(ToCudfBackend(data, kwargs)) @staticmethod def from_dict( @@ -593,14 +590,10 @@ def from_dict( columns=None, constructor=cudf.DataFrame, ): - try: - from dask.dataframe.dask_expr import from_dict - except ImportError: - # TODO: Remove when pinned to dask>2024.12.1 - from dask_expr import from_dict + import dask_expr as dx return _default_backend( - from_dict, + dx.from_dict, data, npartitions=npartitions, orient=orient, @@ -624,15 +617,35 @@ def read_csv( storage_options=None, **kwargs, ): - from dask_cudf.io.csv import read_csv - - return read_csv( - path, - *args, - header=header, - storage_options=storage_options, - **kwargs, - ) + try: + # TODO: Remove when cudf is pinned to dask>2024.12.0 + import dask_expr as dx + from dask_expr.io.csv import ReadCSV + from fsspec.utils import stringify_path + + if not isinstance(path, str): + path = stringify_path(path) + return dx.new_collection( + ReadCSV( + path, + dtype_backend=dtype_backend, + storage_options=storage_options, + kwargs=kwargs, + header=header, + dataframe_backend="cudf", + ) + ) + except ImportError: + # Requires dask>2024.12.0 + from dask_cudf.io.csv import read_csv + + return read_csv( + path, + *args, + header=header, + storage_options=storage_options, + **kwargs, + ) @staticmethod def read_json(*args, **kwargs): diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 26a6e0809d7..a953dce787d 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -11,28 +11,14 @@ import numpy as np import pandas as pd - -try: - from dask.dataframe.dask_expr import new_collection - from dask.dataframe.dask_expr._expr import Elemwise - from dask.dataframe.dask_expr._util import _convert_to_list - from dask.dataframe.dask_expr.io.io import FusedIO, FusedParquetIO - from dask.dataframe.dask_expr.io.parquet import ( - FragmentWrapper, - ReadParquetFSSpec, - ReadParquetPyarrowFS, - ) -except ImportError: - # TODO: Remove when pinned to dask>2024.12.1 - from dask_expr import new_collection - from dask_expr._expr import Elemwise - from dask_expr._util import _convert_to_list - from dask_expr.io.io import FusedIO, FusedParquetIO - from dask_expr.io.parquet import ( - FragmentWrapper, - ReadParquetFSSpec, - ReadParquetPyarrowFS, - ) +from dask_expr._expr import Elemwise +from dask_expr._util import _convert_to_list +from dask_expr.io.io import FusedIO, FusedParquetIO +from dask_expr.io.parquet import ( + FragmentWrapper, + ReadParquetFSSpec, + ReadParquetPyarrowFS, +) from dask._task_spec import Task from dask.dataframe.io.parquet.arrow import _filters_to_expression @@ -712,6 +698,7 @@ def read_parquet_expr( using the ``read`` key-word argument. """ + import dask_expr as dx from fsspec.utils import stringify_path from pyarrow import fs as pa_fs @@ -798,7 +785,7 @@ def read_parquet_expr( "parquet_file_extension is not supported when using the pyarrow filesystem." ) - return new_collection( + return dx.new_collection( NoOp( CudfReadParquetPyarrowFS( path, @@ -819,7 +806,7 @@ def read_parquet_expr( ) ) - return new_collection( + return dx.new_collection( NoOp( CudfReadParquetFSSpec( path, From 6b0b9f11b1dff62a79d749e775a70fdb8261ade7 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 22 Jan 2025 08:55:22 -0800 Subject: [PATCH 06/14] address some code review --- python/cudf_polars/cudf_polars/experimental/base.py | 10 +++++----- python/cudf_polars/cudf_polars/experimental/shuffle.py | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python/cudf_polars/cudf_polars/experimental/base.py b/python/cudf_polars/cudf_polars/experimental/base.py index 9b81280e4a3..36c7745c3f4 100644 --- a/python/cudf_polars/cudf_polars/experimental/base.py +++ b/python/cudf_polars/cudf_polars/experimental/base.py @@ -17,13 +17,13 @@ class PartitionInfo: - """ - Partitioning information. - - This class only tracks the partition count (for now). - """ + """Partitioning information.""" __slots__ = ("count", "partitioned_on") + count: int + """Partition count.""" + partitioned_on: tuple[NamedExpr, ...] + """Columns the data is hash-partitioned on.""" def __init__( self, diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py index db632334f77..c345deabea9 100644 --- a/python/cudf_polars/cudf_polars/experimental/shuffle.py +++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py @@ -29,7 +29,7 @@ class Shuffle(IR): """ - Suffle multi-partition data. + Shuffle multi-partition data. Notes ----- @@ -206,7 +206,7 @@ def _( ) # Check if we are already shuffled or update partition_info - if ir.keys == pi.partitioned_on: + if len(children) == 1 and ir.keys == pi.partitioned_on: # Already shuffled! new_node = children[0] else: From c7b81e31048f02391daaa5b481f9168e4a594390 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 22 Jan 2025 14:42:50 -0800 Subject: [PATCH 07/14] check the result --- python/cudf_polars/tests/experimental/test_shuffle.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/cudf_polars/tests/experimental/test_shuffle.py b/python/cudf_polars/tests/experimental/test_shuffle.py index e2ab852fbcf..2781473984d 100644 --- a/python/cudf_polars/tests/experimental/test_shuffle.py +++ b/python/cudf_polars/tests/experimental/test_shuffle.py @@ -6,6 +6,7 @@ import pytest import polars as pl +from polars.testing import assert_frame_equal from cudf_polars import Translator from cudf_polars.dsl.expr import Col, NamedExpr @@ -60,4 +61,6 @@ def test_hash_shuffle(df, engine): assert len([node for node in partition_info if isinstance(node, Shuffle)]) == 2 # Check that Dask evaluation works - evaluate_dask(qir3) + result = evaluate_dask(qir3) + expect = qir.collect(engine="cpu") + assert_frame_equal(result, expect, check_column_order=False) From fd6e39c41c8ae4a96b6e7cce6e0e0901457fde23 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 22 Jan 2025 14:47:13 -0800 Subject: [PATCH 08/14] fix test --- python/cudf_polars/tests/experimental/test_shuffle.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cudf_polars/tests/experimental/test_shuffle.py b/python/cudf_polars/tests/experimental/test_shuffle.py index 2781473984d..c9486bb7408 100644 --- a/python/cudf_polars/tests/experimental/test_shuffle.py +++ b/python/cudf_polars/tests/experimental/test_shuffle.py @@ -61,6 +61,6 @@ def test_hash_shuffle(df, engine): assert len([node for node in partition_info if isinstance(node, Shuffle)]) == 2 # Check that Dask evaluation works - result = evaluate_dask(qir3) - expect = qir.collect(engine="cpu") + result = evaluate_dask(qir3).to_polars() + expect = df.collect(engine="cpu") assert_frame_equal(result, expect, check_column_order=False) From f02c146fbca328c09fc49afa3b1506abcba773e8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 23 Jan 2025 07:57:37 -0800 Subject: [PATCH 09/14] simplify Shuffle (only handle hash-based partitioning for now) --- .../cudf_polars/experimental/shuffle.py | 77 ++++++------------- 1 file changed, 24 insertions(+), 53 deletions(-) diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py index c345deabea9..711e1fae2fe 100644 --- a/python/cudf_polars/cudf_polars/experimental/shuffle.py +++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py @@ -33,23 +33,12 @@ class Shuffle(IR): Notes ----- - A Shuffle node may have either one or two children. In both - cases, the first child corresponds to the DataFrame we are - shuffling. The optional second child corresponds to a distinct - DataFrame to extract the shuffle keys from. For example, it - may be useful to reference a distinct DataFrame in the case - of sorting. - - The type of argument `keys` controls whether or not hash - partitioning will be applied. If `keys` is a tuple, we - assume that the corresponding columns must be hashed. If - `keys` is a `NamedExpr`, we assume that the corresponding - column already contains a direct partition mapping. + Only hash-based partitioning is supported (for now). """ __slots__ = ("keys", "options") _non_child = ("schema", "keys", "options") - keys: tuple[NamedExpr, ...] | NamedExpr + keys: tuple[NamedExpr, ...] """Keys to shuffle on.""" options: dict[str, Any] """Shuffling options.""" @@ -57,17 +46,15 @@ class Shuffle(IR): def __init__( self, schema: Schema, - keys: tuple[NamedExpr, ...] | NamedExpr, + keys: tuple[NamedExpr, ...], options: dict[str, Any], - *children: IR, + df: IR, ): self.schema = schema self.keys = keys self.options = options self._non_child_args = () - self.children = children - if len(children) > 2: # pragma: no cover - raise ValueError(f"Expected a maximum of two children, got {children}") + self.children = (df,) def get_hashable(self) -> Hashable: """Hashable representation of the node.""" @@ -88,22 +75,22 @@ def do_evaluate(cls, df: DataFrame): # pragma: no cover def _partition_dataframe( df: DataFrame, - index: DataFrame | None, - keys: tuple[NamedExpr, ...] | NamedExpr, + keys: tuple[NamedExpr, ...], count: int, ) -> dict[int, DataFrame]: """ Partition an input DataFrame for shuffling. + Notes + ----- + This utility only supports hash partitioning (for now). + Parameters ---------- df DataFrame to partition. - index - Optional DataFrame from which to extract partitioning - keys. If None, keys will be extracted from `df`. keys - Shuffle key(s) to extract from index or df. + Shuffle key(s). count Total number of output partitions. @@ -112,22 +99,16 @@ def _partition_dataframe( A dictionary mapping between int partition indices and DataFrame fragments. """ - # Extract output-partition mapping - if isinstance(keys, tuple): - # Hash the specified keys to calculate the output - # partition for each row (partition_map) - partition_map = plc.binaryop.binary_operation( - plc.hashing.murmurhash3_x86_32( - DataFrame([expr.evaluate(index or df) for expr in keys]).table - ), - plc.interop.from_arrow(pa.scalar(count, type="uint32")), - plc.binaryop.BinaryOperator.PYMOD, - plc.types.DataType(plc.types.TypeId.UINT32), - ) - else: # pragma: no cover - # Specified key column already contains the - # output-partition index in each row - partition_map = keys.evaluate(index or df).obj + # Hash the specified keys to calculate the output + # partition for each row + partition_map = plc.binaryop.binary_operation( + plc.hashing.murmurhash3_x86_32( + DataFrame([expr.evaluate(df) for expr in keys]).table + ), + plc.interop.from_arrow(pa.scalar(count, type="uint32")), + plc.binaryop.BinaryOperator.PYMOD, + plc.types.DataType(plc.types.TypeId.UINT32), + ) # Apply partitioning t, offsets = plc.partitioning.partition( @@ -149,8 +130,7 @@ def _partition_dataframe( def _simple_shuffle_graph( name_out: str, name_in: str, - name_index: str | None, - keys: tuple[NamedExpr, ...] | NamedExpr, + keys: tuple[NamedExpr, ...], count_in: int, count_out: int, ) -> MutableMapping[Any, Any]: @@ -165,7 +145,6 @@ def _simple_shuffle_graph( graph[(split_name, part_in)] = ( _partition_dataframe, (name_in, part_in), - None if name_index is None else (name_index, part_in), keys, count_out, ) @@ -228,18 +207,10 @@ def _( # Use a simple all-to-all shuffle graph. # TODO: Optionally use rapidsmp. - if len(ir.children) > 1: # pragma: no cover - child_ir, index_ir = ir.children - index_name = get_key_name(index_ir) - else: - child_ir = ir.children[0] - index_name = None - return _simple_shuffle_graph( get_key_name(ir), - get_key_name(child_ir), - index_name, + get_key_name(ir.children[0]), ir.keys, - partition_info[child_ir].count, + partition_info[ir.children[0]].count, partition_info[ir].count, ) From 8604e1b4d94bcc90261aedbcac04ab1805c689d5 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 23 Jan 2025 08:00:07 -0800 Subject: [PATCH 10/14] remove multi-child validation --- .../cudf_polars/experimental/shuffle.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py index 711e1fae2fe..986d057c08a 100644 --- a/python/cudf_polars/cudf_polars/experimental/shuffle.py +++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py @@ -167,25 +167,13 @@ def _( # be handled separately. from cudf_polars.experimental.parallel import PartitionInfo - # Check ir.keys - if not isinstance(ir.keys, tuple): # pragma: no cover - raise NotImplementedError( - f"Default hash Shuffle does not support NamedExpr keys argument. Got {ir.keys}" - ) - # Extract child partitioning children, _partition_info = zip(*(rec(c) for c in ir.children), strict=True) partition_info = reduce(operator.or_, _partition_info) pi = partition_info[children[0]] - # Check child count - if len(children) > 1: # pragma: no cover - raise NotImplementedError( - f"Default hash Shuffle does not support multiple children. Got {children}" - ) - # Check if we are already shuffled or update partition_info - if len(children) == 1 and ir.keys == pi.partitioned_on: + if ir.keys == pi.partitioned_on: # Already shuffled! new_node = children[0] else: From 9624396ebc399adca3f25d21d86937b20d6cd53f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 27 Jan 2025 09:03:23 -0800 Subject: [PATCH 11/14] address code review --- .../cudf_polars/experimental/shuffle.py | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py index 986d057c08a..39ee53045c6 100644 --- a/python/cudf_polars/cudf_polars/experimental/shuffle.py +++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py @@ -6,7 +6,6 @@ import json import operator -from functools import reduce from typing import TYPE_CHECKING, Any import pyarrow as pa @@ -167,25 +166,20 @@ def _( # be handled separately. from cudf_polars.experimental.parallel import PartitionInfo - # Extract child partitioning - children, _partition_info = zip(*(rec(c) for c in ir.children), strict=True) - partition_info = reduce(operator.or_, _partition_info) - pi = partition_info[children[0]] - - # Check if we are already shuffled or update partition_info - if ir.keys == pi.partitioned_on: - # Already shuffled! - new_node = children[0] - else: - new_node = ir.reconstruct(children) - partition_info[new_node] = PartitionInfo( - # Default shuffle preserves partition count - count=pi.count, - # Add partitioned_on info - partitioned_on=ir.keys, - ) - - return new_node, partition_info + (child,) = ir.children + + new_child, pi = rec(child) + if ir.keys == pi[new_child].partitioned_on: + # Already shuffled + return new_child, pi + new_node = ir.reconstruct([new_child]) + pi[new_node] = PartitionInfo( + # Default shuffle preserves partition count + count=pi[new_child].count, + # Add partitioned_on info + partitioned_on=ir.keys, + ) + return new_node, pi @generate_ir_tasks.register(Shuffle) From 264fcfdcb2e3f34cc401d5bc4cd563d83fb22c30 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 27 Jan 2025 09:11:53 -0800 Subject: [PATCH 12/14] avoid shuffling single partition --- python/cudf_polars/cudf_polars/experimental/shuffle.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py index 39ee53045c6..c531d3008cc 100644 --- a/python/cudf_polars/cudf_polars/experimental/shuffle.py +++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py @@ -68,8 +68,7 @@ def get_hashable(self) -> Hashable: @classmethod def do_evaluate(cls, df: DataFrame): # pragma: no cover """Evaluate and return a dataframe.""" - # Single-partition logic is a no-op - return df + raise NotImplementedError("Shuffle.do_evaluate is not supported.") def _partition_dataframe( @@ -169,7 +168,7 @@ def _( (child,) = ir.children new_child, pi = rec(child) - if ir.keys == pi[new_child].partitioned_on: + if pi[new_child].count == 1 or ir.keys == pi[new_child].partitioned_on: # Already shuffled return new_child, pi new_node = ir.reconstruct([new_child]) From 82f9c78088566bd846278a49ca5e5795f97e2057 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 27 Jan 2025 09:28:47 -0800 Subject: [PATCH 13/14] fix test bug --- python/cudf_polars/tests/experimental/test_shuffle.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cudf_polars/tests/experimental/test_shuffle.py b/python/cudf_polars/tests/experimental/test_shuffle.py index c9486bb7408..294557fd0d6 100644 --- a/python/cudf_polars/tests/experimental/test_shuffle.py +++ b/python/cudf_polars/tests/experimental/test_shuffle.py @@ -19,7 +19,7 @@ def engine(): return pl.GPUEngine( raise_on_fail=True, executor="dask-experimental", - executor_options={"max_rows_per_partition": 30000}, + executor_options={"max_rows_per_partition": 4}, ) @@ -63,4 +63,4 @@ def test_hash_shuffle(df, engine): # Check that Dask evaluation works result = evaluate_dask(qir3).to_polars() expect = df.collect(engine="cpu") - assert_frame_equal(result, expect, check_column_order=False) + assert_frame_equal(result, expect, check_row_order=False) From a502f71624d97ffc3413f949ee1e761a36992941 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 28 Jan 2025 06:25:34 -0800 Subject: [PATCH 14/14] turn do_evaluate back into a no-op --- .../cudf_polars/cudf_polars/experimental/shuffle.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py index c531d3008cc..d49f13375ed 100644 --- a/python/cudf_polars/cudf_polars/experimental/shuffle.py +++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py @@ -52,7 +52,7 @@ def __init__( self.schema = schema self.keys = keys self.options = options - self._non_child_args = () + self._non_child_args = (schema, keys, options) self.children = (df,) def get_hashable(self) -> Hashable: @@ -66,9 +66,16 @@ def get_hashable(self) -> Hashable: ) @classmethod - def do_evaluate(cls, df: DataFrame): # pragma: no cover + def do_evaluate( + cls, + schema: Schema, + keys: tuple[NamedExpr, ...], + options: dict[str, Any], + df: DataFrame, + ): # pragma: no cover """Evaluate and return a dataframe.""" - raise NotImplementedError("Shuffle.do_evaluate is not supported.") + # Single-partition Shuffle evaluation is a no-op + return df def _partition_dataframe(