[Bug] Fix Arrow-FS parquet reader for larger files (#17099)
Follow-up to #16684

There is currently a bug in `dask_cudf.read_parquet(..., filesystem="arrow")` when the files are larger than the `"dataframe.parquet.minimum-partition-size"` config value. More specifically, when the files are not aggregated together, the output partitions are `pd.DataFrame` objects instead of `cudf.DataFrame` objects.
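
For context, a minimal sketch of how the regression can be observed, modeled on the new test below (the dataset path is illustrative; a `"1B"` threshold simply forces every file into its own partition):

import dask
import dask_cudf

with dask.config.set(
    {
        "dataframe.backend": "cudf",
        # A tiny threshold keeps files from being aggregated, which is the
        # code path where the bug appeared.
        "dataframe.parquet.minimum-partition-size": "1B",
    }
):
    ddf = dask_cudf.read_parquet("dataset/", filesystem="arrow")
    # Before this fix, computed partitions came back as pandas objects;
    # with the fix they are cudf.DataFrame.
    print(type(ddf.compute()))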

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #17099
rjzamora authored Oct 23, 2024
1 parent cff1296 commit 3126f77
Showing 3 changed files with 58 additions and 7 deletions.
30 changes: 24 additions & 6 deletions python/dask_cudf/dask_cudf/expr/_expr.py
@@ -12,7 +12,7 @@
 )
 from dask_expr._reductions import Reduction, Var
 from dask_expr.io.io import FusedParquetIO
-from dask_expr.io.parquet import ReadParquetPyarrowFS
+from dask_expr.io.parquet import FragmentWrapper, ReadParquetPyarrowFS
 
 from dask.dataframe.core import (
     _concat,
@@ -302,16 +302,34 @@ def _dataset_info(self):
         return dataset_info
 
     @staticmethod
-    def _table_to_pandas(
-        table,
-        index_name,
-        *args,
-    ):
+    def _table_to_pandas(table, index_name):
         df = cudf.DataFrame.from_arrow(table)
         if index_name is not None:
             df = df.set_index(index_name)
         return df
 
+    def _filtered_task(self, index: int):
+        columns = self.columns.copy()
+        index_name = self.index.name
+        if self.index is not None:
+            index_name = self.index.name
+        schema = self._dataset_info["schema"].remove_metadata()
+        if index_name:
+            if columns is None:
+                columns = list(schema.names)
+            columns.append(index_name)
+        return (
+            self._table_to_pandas,
+            (
+                self._fragment_to_table,
+                FragmentWrapper(self.fragments[index], filesystem=self.fs),
+                self.filters,
+                columns,
+                schema,
+            ),
+            index_name,
+        )
+
     def _tune_up(self, parent):
         if self._fusion_compression_factor >= 1:
             return
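The override above returns a nested dask task tuple: the inner tuple materializes a pyarrow Table for one fragment, and the outer call converts it with `cudf.DataFrame.from_arrow`, so each partition stays a `cudf.DataFrame` rather than falling back to pandas. A minimal, self-contained sketch of that evaluation pattern, with hypothetical helpers standing in for the methods above:

import cudf
import pyarrow as pa
from dask.core import get


def read_fragment(nrows):
    # Hypothetical stand-in for _fragment_to_table: produce a pyarrow.Table.
    return pa.table({"x": list(range(nrows)), "y": ["a"] * nrows})


def to_gpu_frame(table, index_name):
    # Mirrors _table_to_pandas above: Arrow -> cudf, with an optional index.
    df = cudf.DataFrame.from_arrow(table)
    if index_name is not None:
        df = df.set_index(index_name)
    return df


# dask evaluates the inner tuple first and feeds its result to the outer call,
# so the object produced for the partition is already a cudf.DataFrame.
graph = {"part-0": (to_gpu_frame, (read_fragment, 4), None)}
part = get(graph, "part-0")  # cudf.DataFrame with columns x and y
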
31 changes: 30 additions & 1 deletion python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -15,7 +15,11 @@
 import cudf
 
 import dask_cudf
-from dask_cudf.tests.utils import skip_dask_expr, xfail_dask_expr
+from dask_cudf.tests.utils import (
+    require_dask_expr,
+    skip_dask_expr,
+    xfail_dask_expr,
+)
 
 # Check if create_metadata_file is supported by
 # the current dask.dataframe version
@@ -615,3 +619,28 @@ def test_timezone_column(tmpdir):
     got = dask_cudf.read_parquet(path)
     expect = cudf.read_parquet(path)
     dd.assert_eq(got, expect)
+
+
+@require_dask_expr()
+@pytest.mark.skipif(
+    not dask_cudf.backends.PYARROW_GE_15,
+    reason="Requires pyarrow 15",
+)
+@pytest.mark.parametrize("min_part_size", ["1B", "1GB"])
+def test_read_parquet_arrow_filesystem(tmpdir, min_part_size):
+    tmp_path = str(tmpdir)
+    with dask.config.set(
+        {
+            "dataframe.backend": "cudf",
+            "dataframe.parquet.minimum-partition-size": min_part_size,
+        }
+    ):
+        dd.from_dict(
+            {"x": range(1000), "y": ["a", "b", "c", "d"] * 250},
+            npartitions=10,
+        ).to_parquet(tmp_path, write_index=False)
+        df = cudf.read_parquet(tmp_path)
+        ddf = dask_cudf.read_parquet(tmp_path, filesystem="arrow")
+        dd.assert_eq(df, ddf, check_index=False)
+        assert isinstance(ddf._meta, cudf.DataFrame)
+        assert isinstance(ddf.compute(), cudf.DataFrame)
4 changes: 4 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_s3.py
@@ -11,6 +11,8 @@
 
 from dask.dataframe import assert_eq
 
+import cudf
+
 import dask_cudf
 from dask_cudf.tests.utils import QUERY_PLANNING_ON
 
@@ -168,6 +170,8 @@ def test_read_parquet_filesystem(s3_base, s3so, pdf, filesystem):
         filesystem=filesystem,
     )
     assert df.b.sum().compute() == 9
+    assert isinstance(df._meta, cudf.DataFrame)
+    assert isinstance(df.compute(), cudf.DataFrame)
 
 
 def test_read_parquet_filesystem_explicit(s3_base, s3so, pdf):
