From f99b510ed1c6cea8c9ce5aa38e03903ca5080c86 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 15 Oct 2024 18:00:36 -0700 Subject: [PATCH 1/3] fix result when file-aggregation is absent --- python/dask_cudf/dask_cudf/expr/_expr.py | 27 ++++++++++++++++++- .../dask_cudf/io/tests/test_parquet.py | 27 ++++++++++++++++++- .../dask_cudf/dask_cudf/io/tests/test_s3.py | 4 +++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index af83a01da98..62f2804a9ec 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -8,7 +8,7 @@ from dask_expr._expr import Elemwise, Expr, RenameAxis, VarColumns from dask_expr._reductions import Reduction, Var from dask_expr.io.io import FusedParquetIO -from dask_expr.io.parquet import ReadParquetPyarrowFS +from dask_expr.io.parquet import FragmentWrapper, ReadParquetPyarrowFS from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty from dask.dataframe.dispatch import is_categorical_dtype @@ -99,6 +99,31 @@ def _table_to_pandas( df = df.set_index(index_name) return df + def _filtered_task(self, index: int): + columns = self.columns.copy() + index_name = self.index.name + if self.index is not None: + index_name = self.index.name + schema = self._dataset_info["schema"].remove_metadata() + if index_name: + if columns is None: + columns = list(schema.names) + columns.append(index_name) + return ( + self._table_to_pandas, + ( + self._fragment_to_table, + FragmentWrapper(self.fragments[index], filesystem=self.fs), + self.filters, + columns, + schema, + ), + index_name, + None, + None, + None, + ) + def _tune_up(self, parent): if self._fusion_compression_factor >= 1: return diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index 620a917109e..d4979801680 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -15,7 +15,11 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import skip_dask_expr, xfail_dask_expr +from dask_cudf.tests.utils import ( + require_dask_expr, + skip_dask_expr, + xfail_dask_expr, +) # Check if create_metadata_file is supported by # the current dask.dataframe version @@ -615,3 +619,24 @@ def test_timezone_column(tmpdir): got = dask_cudf.read_parquet(path) expect = cudf.read_parquet(path) dd.assert_eq(got, expect) + + +@require_dask_expr() +@pytest.mark.parametrize("min_part_size", ["1B", "1GB"]) +def test_read_parquet_arrow_filesystem(tmpdir, min_part_size): + tmp_path = str(tmpdir) + with dask.config.set( + { + "dataframe.backend": "cudf", + "dataframe.parquet.minimum-partition-size": min_part_size, + } + ): + dd.from_dict( + {"x": range(1000), "y": ["a", "b", "c", "d"] * 250}, + npartitions=10, + ).to_parquet(tmp_path, write_index=False) + df = cudf.read_parquet(tmp_path) + ddf = dask_cudf.read_parquet(tmp_path, filesystem="arrow") + dd.assert_eq(df, ddf, check_index=False) + assert isinstance(ddf._meta, cudf.DataFrame) + assert isinstance(ddf.compute(), cudf.DataFrame) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index cf8af82e112..90907f6fb99 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -11,6 +11,8 @@ from dask.dataframe import assert_eq +import cudf + import dask_cudf from dask_cudf.tests.utils import QUERY_PLANNING_ON @@ -168,6 +170,8 @@ def test_read_parquet_filesystem(s3_base, s3so, pdf, filesystem): filesystem=filesystem, ) assert df.b.sum().compute() == 9 + assert isinstance(df._meta, cudf.DataFrame) + assert isinstance(df.compute(), cudf.DataFrame) def test_read_parquet_filesystem_explicit(s3_base, s3so, pdf): From caf06e57cc8014bd7981722c0e9fb812893b786e Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 17 Oct 2024 09:08:37 -0700 Subject: [PATCH 2/3] skip old arrow version --- python/dask_cudf/dask_cudf/io/tests/test_parquet.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index efe8c24f45c..ae5ca480e31 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -622,6 +622,10 @@ def test_timezone_column(tmpdir): @require_dask_expr() +@pytest.mark.skipif( + not dask_cudf.backends.PYARROW_GE_15, + reason="Requires pyarrow 15", +) @pytest.mark.parametrize("min_part_size", ["1B", "1GB"]) def test_read_parquet_arrow_filesystem(tmpdir, min_part_size): tmp_path = str(tmpdir) From c93e19b57e3cd8632b87e3afb94405ad61b7cb7c Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 21 Oct 2024 07:23:03 -0700 Subject: [PATCH 3/3] remove unused positional args --- python/dask_cudf/dask_cudf/expr/_expr.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index ab996b9bd97..c7cf66fbffd 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -302,11 +302,7 @@ def _dataset_info(self): return dataset_info @staticmethod - def _table_to_pandas( - table, - index_name, - *args, - ): + def _table_to_pandas(table, index_name): df = cudf.DataFrame.from_arrow(table) if index_name is not None: df = df.set_index(index_name) @@ -332,9 +328,6 @@ def _filtered_task(self, index: int): schema, ), index_name, - None, - None, - None, ) def _tune_up(self, parent):