From 9cc907122077d18e5128e7da36685fdeb82fef41 Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Fri, 15 Nov 2024 20:28:26 -0500 Subject: [PATCH] Use pylibcudf contiguous split APIs in cudf python (#17246) Part of #15162 Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/cudf/pull/17246 --- python/cudf/cudf/_lib/copying.pxd | 10 - python/cudf/cudf/_lib/copying.pyx | 213 +++++++----------- .../pylibcudf/pylibcudf/contiguous_split.pxd | 1 + .../pylibcudf/pylibcudf/contiguous_split.pyx | 3 +- 4 files changed, 87 insertions(+), 140 deletions(-) delete mode 100644 python/cudf/cudf/_lib/copying.pxd diff --git a/python/cudf/cudf/_lib/copying.pxd b/python/cudf/cudf/_lib/copying.pxd deleted file mode 100644 index 14c7d2066d8..00000000000 --- a/python/cudf/cudf/_lib/copying.pxd +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. - -from pylibcudf.libcudf.contiguous_split cimport packed_columns - - -cdef class _CPackedColumns: - cdef packed_columns c_obj - cdef object column_names - cdef object column_dtypes - cdef object index_names diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index 8b4d6199600..4dfb12d8ab3 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -2,37 +2,31 @@ import pickle -from libc.stdint cimport uint8_t, uintptr_t from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.utility cimport move -from libcpp.vector cimport vector - -from rmm.pylibrmm.device_buffer cimport DeviceBuffer - import pylibcudf import cudf -from cudf.core.buffer import Buffer, acquire_spill_lock, as_buffer - +from cudf.core.buffer import acquire_spill_lock, as_buffer +from cudf.core.abc import Serializable from cudf._lib.column cimport Column from cudf._lib.scalar import as_device_scalar from cudf._lib.scalar cimport DeviceScalar 
-from cudf._lib.utils cimport table_view_from_table from cudf._lib.reduce import minmax -from cudf.core.abc import Serializable from libcpp.memory cimport make_unique -cimport pylibcudf.libcudf.contiguous_split as cpp_contiguous_split from pylibcudf.libcudf.column.column cimport column from pylibcudf.libcudf.column.column_view cimport column_view from pylibcudf.libcudf.types cimport size_type -from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_table_view +from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table +import pylibcudf as plc +from pylibcudf.contiguous_split cimport PackedColumns as PlcPackedColumns def _gather_map_is_valid( @@ -331,54 +325,37 @@ def get_element(Column input_column, size_type index): ) -cdef class _CPackedColumns: - - @staticmethod - def from_py_table(input_table, keep_index=True): - """ - Construct a ``PackedColumns`` object from a ``cudf.DataFrame``. - """ - import cudf.core.dtypes - - cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns) - - if keep_index and ( - not isinstance(input_table.index, cudf.RangeIndex) - or input_table.index.start != 0 - or input_table.index.stop != len(input_table) - or input_table.index.step != 1 - ): - input_table_view = table_view_from_table(input_table) - p.index_names = input_table._index_names - else: - input_table_view = table_view_from_table( - input_table, ignore_index=True) - - p.column_names = input_table._column_names - p.column_dtypes = {} - for name, col in input_table._column_labels_and_values: - if isinstance(col.dtype, cudf.core.dtypes._BaseDtype): - p.column_dtypes[name] = col.dtype - - p.c_obj = move(cpp_contiguous_split.pack(input_table_view)) +class PackedColumns(Serializable): + """ + A packed representation of a Frame, with all columns residing + in a single GPU memory buffer. 
+ """ - return p + def __init__( + self, + PlcPackedColumns data, + object column_names = None, + object index_names = None, + object column_dtypes = None + ): + self._metadata, self._gpu_data = data.release() + self.column_names=column_names + self.index_names=index_names + self.column_dtypes=column_dtypes - @property - def gpu_data_ptr(self): - return int(self.c_obj.gpu_data.get()[0].data()) + def __reduce__(self): + return self.deserialize, self.serialize() @property - def gpu_data_size(self): - return int(self.c_obj.gpu_data.get()[0].size()) + def __cuda_array_interface__(self): + return self._gpu_data.__cuda_array_interface__ def serialize(self): header = {} frames = [] - gpu_data = as_buffer( - data=self.gpu_data_ptr, - size=self.gpu_data_size, + data = self._gpu_data.obj.ptr, + size = self._gpu_data.obj.size, owner=self, exposed=True ) @@ -388,65 +365,83 @@ cdef class _CPackedColumns: header["column-names"] = self.column_names header["index-names"] = self.index_names - if self.c_obj.metadata.get()[0].data() != NULL: - header["metadata"] = list( - - self.c_obj.metadata.get()[0].data() - ) - - column_dtypes = {} + header["metadata"] = self._metadata.tobytes() for name, dtype in self.column_dtypes.items(): dtype_header, dtype_frames = dtype.serialize() - column_dtypes[name] = ( + self.column_dtypes[name] = ( dtype_header, (len(frames), len(frames) + len(dtype_frames)), ) frames.extend(dtype_frames) - header["column-dtypes"] = column_dtypes - + header["column-dtypes"] = self.column_dtypes + header["type-serialized"] = pickle.dumps(type(self)) return header, frames - @staticmethod - def deserialize(header, frames): - cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns) - - gpu_data = Buffer.deserialize(header["data"], frames) - - dbuf = DeviceBuffer( - ptr=gpu_data.get_ptr(mode="write"), - size=gpu_data.nbytes - ) - - cdef cpp_contiguous_split.packed_columns data - data.metadata = move( - make_unique[vector[uint8_t]]( - move(header.get("metadata", 
[])) - ) - ) - data.gpu_data = move(dbuf.c_obj) - - p.c_obj = move(data) - p.column_names = header["column-names"] - p.index_names = header["index-names"] - + @classmethod + def deserialize(cls, header, frames): column_dtypes = {} for name, dtype in header["column-dtypes"].items(): dtype_header, (start, stop) = dtype column_dtypes[name] = pickle.loads( dtype_header["type-serialized"] ).deserialize(dtype_header, frames[start:stop]) - p.column_dtypes = column_dtypes + return cls( + plc.contiguous_split.pack( + plc.contiguous_split.unpack_from_memoryviews( + memoryview(header["metadata"]), + plc.gpumemoryview(frames[0]), + ) + ), + header["column-names"], + header["index-names"], + column_dtypes, + ) - return p + @classmethod + def from_py_table(cls, input_table, keep_index=True): + if keep_index and ( + not isinstance(input_table.index, cudf.RangeIndex) + or input_table.index.start != 0 + or input_table.index.stop != len(input_table) + or input_table.index.step != 1 + ): + columns = input_table._index._columns + input_table._columns + index_names = input_table._index_names + else: + columns = input_table._columns + index_names = None + + column_names = input_table._column_names + column_dtypes = {} + for name, col in input_table._column_labels_and_values: + if isinstance( + col.dtype, + (cudf.core.dtypes._BaseDtype, cudf.core.dtypes.CategoricalDtype) + ): + column_dtypes[name] = col.dtype + + return cls( + plc.contiguous_split.pack( + plc.Table( + [ + col.to_pylibcudf(mode="read") for col in columns + ] + ) + ), + column_names, + index_names, + column_dtypes, + ) def unpack(self): - output_table = cudf.DataFrame._from_data(*data_from_table_view( - cpp_contiguous_split.unpack(self.c_obj), - self, + output_table = cudf.DataFrame._from_data(*data_from_pylibcudf_table( + plc.contiguous_split.unpack_from_memoryviews( + self._metadata, + self._gpu_data + ), self.column_names, self.index_names )) - for name, dtype in self.column_dtypes.items(): output_table._data[name] = ( 
output_table._data[name]._with_type_metadata(dtype) @@ -455,46 +450,6 @@ cdef class _CPackedColumns: return output_table -class PackedColumns(Serializable): - """ - A packed representation of a Frame, with all columns residing - in a single GPU memory buffer. - """ - - def __init__(self, data): - self._data = data - - def __reduce__(self): - return self.deserialize, self.serialize() - - @property - def __cuda_array_interface__(self): - return { - "data": (self._data.gpu_data_ptr, False), - "shape": (self._data.gpu_data_size,), - "strides": None, - "typestr": "|u1", - "version": 0 - } - - def serialize(self): - header, frames = self._data.serialize() - header["type-serialized"] = pickle.dumps(type(self)) - - return header, frames - - @classmethod - def deserialize(cls, header, frames): - return cls(_CPackedColumns.deserialize(header, frames)) - - @classmethod - def from_py_table(cls, input_table, keep_index=True): - return cls(_CPackedColumns.from_py_table(input_table, keep_index)) - - def unpack(self): - return self._data.unpack() - - def pack(input_table, keep_index=True): """ Pack the columns of a cudf Frame into a single GPU memory buffer. 
diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pxd b/python/pylibcudf/pylibcudf/contiguous_split.pxd index 2a10cb5b3d5..3745e893c3e 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pxd +++ b/python/pylibcudf/pylibcudf/contiguous_split.pxd @@ -12,6 +12,7 @@ cdef class PackedColumns: @staticmethod cdef PackedColumns from_libcudf(unique_ptr[packed_columns] data) + cpdef tuple release(self) cpdef PackedColumns pack(Table input) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index 94873e079c9..2a40d42e6e9 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -63,6 +63,7 @@ cdef class HostBuffer: def __releasebuffer__(self, Py_buffer *buffer): pass + cdef class PackedColumns: """Column data in a serialized format. @@ -87,7 +88,7 @@ cdef class PackedColumns: out.c_obj = move(data) return out - def release(self): + cpdef tuple release(self): """Releases and returns the underlying serialized metadata and gpu data. The ownership of the memory are transferred to the returned buffers. After