diff --git a/merlin/core/dispatch.py b/merlin/core/dispatch.py index f09ab341b..c03d59909 100644 --- a/merlin/core/dispatch.py +++ b/merlin/core/dispatch.py @@ -664,6 +664,9 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1): if isinstance(x, dd.DataFrame): # If input is a dask_cudf collection, convert # to a pandas-backed Dask collection + if hasattr(x, "to_backend"): + # Requires dask>=2023.1.1 + return x.to_backend("pandas") if cudf is None or not isinstance(x, dask_cudf.DataFrame): # Already a Pandas-backed collection return x @@ -676,6 +679,9 @@ def convert_data(x, cpu=True, to_collection=None, npartitions=1): return dd.from_pandas(_x, sort=False, npartitions=npartitions) if to_collection else _x elif cudf and dask_cudf: if isinstance(x, dd.DataFrame): + if hasattr(x, "to_backend"): + # Requires dask>=2023.1.1 + return x.to_backend("cudf") # If input is a Dask collection, convert to dask_cudf if isinstance(x, dask_cudf.DataFrame): # Already a cudf-backed Dask collection diff --git a/merlin/io/dataframe_engine.py b/merlin/io/dataframe_engine.py index 003a947b2..fdd7605d5 100644 --- a/merlin/io/dataframe_engine.py +++ b/merlin/io/dataframe_engine.py @@ -38,7 +38,16 @@ def to_ddf(self, columns=None, cpu=None): cpu = self.cpu if cpu is None else cpu # Move data from gpu to cpu if necessary - _ddf = self._move_ddf("cpu") if (cpu and not self.cpu) else self._ddf + if hasattr(self._ddf, "to_backend"): + # Requires dask>=2023.1.1 + if cpu: + _ddf = self._ddf.to_backend("pandas") + elif cpu is False: + _ddf = self._ddf.to_backend("cudf") + else: + _ddf = self._ddf + else: + _ddf = self._move_ddf("cpu") if (cpu and not self.cpu) else self._ddf if isinstance(columns, list): return _ddf[columns] @@ -49,14 +58,22 @@ def to_ddf(self, columns=None, cpu=None): def to_cpu(self): if self.cpu: return - self._ddf = self._move_ddf("cpu") + if hasattr(self._ddf, "to_backend"): + # Requires dask>=2023.1.1 + self._ddf = self._ddf.to_backend("pandas") + else: + self._ddf = self._move_ddf("cpu") self.cpu = True self.moved_collection = not self.moved_collection def to_gpu(self): if not self.cpu: return - self._ddf = self._move_ddf("gpu") + if hasattr(self._ddf, "to_backend"): + # Requires dask>=2023.1.1 + self._ddf = self._ddf.to_backend("cudf") + else: + self._ddf = self._move_ddf("gpu") self.cpu = False self.moved_collection = not self.moved_collection @@ -66,6 +83,7 @@ def num_rows(self): def _move_ddf(self, destination): """Move the collection between cpu and gpu memory.""" + # TODO: Remove this method when we pin to dask>=2013.1.1 _ddf = self._ddf if ( self.moved_collection