From f955dd76b47779d4f527efe25de417b1acbff4a7 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 15 Aug 2024 17:13:58 -0700 Subject: [PATCH] Rewrite remaining Python Arrow interop conversions using the C Data Interface (#16548) This PR rewrites all remaining parts of the Python interop code previously using Arrow C++ types to instead use the C Data Interface. With this change, we no longer require pyarrow in that part of the Cython code. There are further improvements that we should make to streamline the internals, but I would like to keep this changeset minimal since getting it merged unblocks progress on multiple fronts so that we can progress further in parallel. Contributes to #15193 Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Bradley Dice (https://github.com/bdice) - Yunsong Wang (https://github.com/PointKernel) URL: https://github.com/rapidsai/cudf/pull/16548 --- cpp/src/interop/arrow_utilities.cpp | 1 + cpp/src/interop/to_arrow_schema.cpp | 5 +- python/cudf/cudf/_lib/CMakeLists.txt | 6 +- .../cudf/cudf/_lib/pylibcudf/CMakeLists.txt | 5 +- python/cudf/cudf/_lib/pylibcudf/interop.pyx | 188 +++++++++--------- .../cudf/_lib/pylibcudf/libcudf/interop.pxd | 53 +++-- .../cudf/cudf/pylibcudf_tests/common/utils.py | 6 +- 7 files changed, 146 insertions(+), 118 deletions(-) diff --git a/cpp/src/interop/arrow_utilities.cpp b/cpp/src/interop/arrow_utilities.cpp index 4292552a800..3776daf41aa 100644 --- a/cpp/src/interop/arrow_utilities.cpp +++ b/cpp/src/interop/arrow_utilities.cpp @@ -98,6 +98,7 @@ ArrowType id_to_arrow_type(cudf::type_id id) ArrowType id_to_arrow_storage_type(cudf::type_id id) { switch (id) { + case cudf::type_id::TIMESTAMP_DAYS: return NANOARROW_TYPE_INT32; case cudf::type_id::TIMESTAMP_SECONDS: case cudf::type_id::TIMESTAMP_MILLISECONDS: case cudf::type_id::TIMESTAMP_MICROSECONDS: diff --git a/cpp/src/interop/to_arrow_schema.cpp b/cpp/src/interop/to_arrow_schema.cpp index b98ca8a7bed..5afed772656 100644 --- 
a/cpp/src/interop/to_arrow_schema.cpp +++ b/cpp/src/interop/to_arrow_schema.cpp @@ -170,8 +170,9 @@ int dispatch_to_arrow_type::operator()(column_view input, NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(out, NANOARROW_TYPE_LIST)); auto child = input.child(cudf::lists_column_view::child_column_index); ArrowSchemaInit(out->children[0]); - auto child_meta = - metadata.children_meta.empty() ? column_metadata{"element"} : metadata.children_meta[0]; + auto child_meta = metadata.children_meta.empty() + ? column_metadata{"element"} + : metadata.children_meta[cudf::lists_column_view::child_column_index]; out->flags = input.has_nulls() ? ARROW_FLAG_NULLABLE : 0; NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(out->children[0], child_meta.name.c_str())); diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 38b7e9ebe04..d32a2d8e3f8 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -64,9 +64,13 @@ rapids_cython_create_modules( target_link_libraries(strings_udf PUBLIC cudf_strings_udf) -set(targets_using_arrow_headers interop avro csv orc json parquet) +set(targets_using_arrow_headers avro csv orc json parquet) link_to_pyarrow_headers("${targets_using_arrow_headers}") +include(${rapids-cmake-dir}/export/find_package_root.cmake) +include(../../../../cpp/cmake/thirdparty/get_nanoarrow.cmake) +target_link_libraries(interop PUBLIC nanoarrow) + add_subdirectory(io) add_subdirectory(nvtext) add_subdirectory(pylibcudf) diff --git a/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt index df4591baa71..da32d530928 100644 --- a/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt +++ b/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt @@ -52,7 +52,10 @@ rapids_cython_create_modules( SOURCE_FILES "${cython_sources}" LINKED_LIBRARIES "${linked_libraries}" MODULE_PREFIX pylibcudf_ ASSOCIATED_TARGETS cudf ) -link_to_pyarrow_headers(pylibcudf_interop) + 
+include(${rapids-cmake-dir}/export/find_package_root.cmake) +include(../../../../../cpp/cmake/thirdparty/get_nanoarrow.cmake) +target_link_libraries(pylibcudf_interop PUBLIC nanoarrow) add_subdirectory(libcudf) add_subdirectory(strings) diff --git a/python/cudf/cudf/_lib/pylibcudf/interop.pyx b/python/cudf/cudf/_lib/pylibcudf/interop.pyx index adf7e1fd7e8..caa19724786 100644 --- a/python/cudf/cudf/_lib/pylibcudf/interop.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/interop.pyx @@ -1,11 +1,10 @@ # Copyright (c) 2023-2024, NVIDIA CORPORATION. -from cpython cimport pycapsule -from cython.operator cimport dereference -from libcpp.memory cimport shared_ptr, unique_ptr +from cpython.pycapsule cimport PyCapsule_GetPointer, PyCapsule_New +from libc.stdlib cimport free +from libcpp.memory cimport unique_ptr from libcpp.utility cimport move from libcpp.vector cimport vector -from pyarrow cimport lib as pa from dataclasses import dataclass, field from functools import singledispatch @@ -18,23 +17,14 @@ from cudf._lib.pylibcudf.libcudf.interop cimport ( ArrowArrayStream, ArrowSchema, column_metadata, - from_arrow as cpp_from_arrow, from_arrow_column as cpp_from_arrow_column, from_arrow_stream as cpp_from_arrow_stream, - to_arrow as cpp_to_arrow, -) -from cudf._lib.pylibcudf.libcudf.scalar.scalar cimport ( - fixed_point_scalar, - scalar, + to_arrow_host_raw, + to_arrow_schema_raw, ) from cudf._lib.pylibcudf.libcudf.table.table cimport table -from cudf._lib.pylibcudf.libcudf.wrappers.decimals cimport ( - decimal32, - decimal64, - decimal128, - scale_type, -) +from . cimport copying from .column cimport Column from .scalar cimport Scalar from .table cimport Table @@ -109,7 +99,9 @@ def from_arrow(pyarrow_object, *, DataType data_type=None): Union[Table, Scalar] The converted object of type corresponding to the input type in cudf. 
""" - raise TypeError("from_arrow only accepts Table and Scalar objects") + raise TypeError( + f"Unsupported type {type(pyarrow_object)} for conversion from arrow" + ) @from_arrow.register(pa.DataType) @@ -133,7 +125,7 @@ def _from_arrow_table(pyarrow_object, *, DataType data_type=None): raise ValueError("data_type may not be passed for tables") stream = pyarrow_object.__arrow_c_stream__() cdef ArrowArrayStream* c_stream = ( - pycapsule.PyCapsule_GetPointer(stream, "arrow_array_stream") + PyCapsule_GetPointer(stream, "arrow_array_stream") ) cdef unique_ptr[table] c_result @@ -146,54 +138,17 @@ def _from_arrow_table(pyarrow_object, *, DataType data_type=None): @from_arrow.register(pa.Scalar) def _from_arrow_scalar(pyarrow_object, *, DataType data_type=None): - cdef shared_ptr[pa.CScalar] arrow_scalar = pa.pyarrow_unwrap_scalar(pyarrow_object) - - cdef unique_ptr[scalar] c_result - with nogil: - c_result = move(cpp_from_arrow(dereference(arrow_scalar))) - - cdef Scalar result = Scalar.from_libcudf(move(c_result)) - - if result.type().id() != type_id.DECIMAL128: - if data_type is not None: - raise ValueError( - "dtype may not be passed for non-decimal types" - ) - return result - - if data_type is None: - raise ValueError( - "Decimal scalars must be constructed with a dtype" - ) - - cdef type_id tid = data_type.id() - - if tid == type_id.DECIMAL32: - result.c_obj.reset( - new fixed_point_scalar[decimal32]( - ( - result.c_obj.get() - ).value(), - scale_type(-pyarrow_object.type.scale), - result.c_obj.get().is_valid() - ) - ) - elif tid == type_id.DECIMAL64: - result.c_obj.reset( - new fixed_point_scalar[decimal64]( - ( - result.c_obj.get() - ).value(), - scale_type(-pyarrow_object.type.scale), - result.c_obj.get().is_valid() - ) - ) - elif tid != type_id.DECIMAL128: - raise ValueError( - "Decimal scalars may only be cast to decimals" - ) - - return result + if isinstance(pyarrow_object.type, pa.ListType) and pyarrow_object.as_py() is None: + # pyarrow doesn't correctly 
handle None values for list types, so + # we have to create this one manually. + # https://github.com/apache/arrow/issues/40319 + pa_array = pa.array([None], type=pyarrow_object.type) + else: + pa_array = pa.array([pyarrow_object]) + return copying.get_element( + from_arrow(pa_array, data_type=data_type), + 0, + ) @from_arrow.register(pa.Array) @@ -204,10 +159,10 @@ def _from_arrow_column(pyarrow_object, *, DataType data_type=None): schema, array = pyarrow_object.__arrow_c_array__() cdef ArrowSchema* c_schema = ( - pycapsule.PyCapsule_GetPointer(schema, "arrow_schema") + PyCapsule_GetPointer(schema, "arrow_schema") ) cdef ArrowArray* c_array = ( - pycapsule.PyCapsule_GetPointer(array, "arrow_array") + PyCapsule_GetPointer(array, "arrow_array") ) cdef unique_ptr[column] c_result @@ -238,7 +193,7 @@ def to_arrow(cudf_object, metadata=None): Union[pyarrow.Array, pyarrow.Table, pyarrow.Scalar] The converted object of type corresponding to the input type in PyArrow. """ - raise TypeError("to_arrow only accepts Table and Scalar objects") + raise TypeError(f"Unsupported type {type(cudf_object)} for conversion to arrow") @to_arrow.register(DataType) @@ -281,46 +236,83 @@ def _to_arrow_datatype(cudf_object, **kwargs): ) -@to_arrow.register(Table) -def _to_arrow_table(cudf_object, metadata=None): +cdef void _release_schema(object schema_capsule) noexcept: + """Release the ArrowSchema object stored in a PyCapsule.""" + cdef ArrowSchema* schema = PyCapsule_GetPointer( + schema_capsule, 'arrow_schema' + ) + if schema.release != NULL: + schema.release(schema) + + free(schema) + + +cdef void _release_array(object array_capsule) noexcept: + """Release the ArrowArray object stored in a PyCapsule.""" + cdef ArrowArray* array = PyCapsule_GetPointer( + array_capsule, 'arrow_array' + ) + if array.release != NULL: + array.release(array) + + free(array) + + +def _table_to_schema(Table tbl, metadata): if metadata is None: - metadata = [ColumnMetadata() for _ in 
range(len(cudf_object.columns()))] + metadata = [ColumnMetadata() for _ in range(len(tbl.columns()))] metadata = [ColumnMetadata(m) if isinstance(m, str) else m for m in metadata] - cdef vector[column_metadata] c_table_metadata - cdef shared_ptr[pa.CTable] c_table_result + + cdef vector[column_metadata] c_metadata + c_metadata.reserve(len(metadata)) for meta in metadata: - c_table_metadata.push_back(_metadata_to_libcudf(meta)) + c_metadata.push_back(_metadata_to_libcudf(meta)) + + cdef ArrowSchema* raw_schema_ptr with nogil: - c_table_result = move( - cpp_to_arrow(( cudf_object).view(), c_table_metadata) - ) + raw_schema_ptr = to_arrow_schema_raw(tbl.view(), c_metadata) - return pa.pyarrow_wrap_table(c_table_result) + return PyCapsule_New(raw_schema_ptr, 'arrow_schema', _release_schema) -@to_arrow.register(Scalar) -def _to_arrow_scalar(cudf_object, metadata=None): - # Note that metadata for scalars is primarily important for preserving - # information on nested types since names are otherwise irrelevant. 
- if metadata is None: - metadata = ColumnMetadata() - metadata = ColumnMetadata(metadata) if isinstance(metadata, str) else metadata - cdef column_metadata c_scalar_metadata = _metadata_to_libcudf(metadata) - cdef shared_ptr[pa.CScalar] c_scalar_result +def _table_to_host_array(Table tbl): + cdef ArrowArray* raw_host_array_ptr with nogil: - c_scalar_result = move( - cpp_to_arrow( - dereference(( cudf_object).c_obj), c_scalar_metadata - ) - ) + raw_host_array_ptr = to_arrow_host_raw(tbl.view()) + + return PyCapsule_New(raw_host_array_ptr, "arrow_array", _release_array) + + +class _TableWithArrowMetadata: + def __init__(self, tbl, metadata=None): + self.tbl = tbl + self.metadata = metadata - return pa.pyarrow_wrap_scalar(c_scalar_result) + def __arrow_c_array__(self, requested_schema=None): + return _table_to_schema(self.tbl, self.metadata), _table_to_host_array(self.tbl) + + +# TODO: In the long run we should get rid of the `to_arrow` functions in favor of using +# the protocols directly via `pa.table(cudf_object, schema=...)`. We can do the +# same for columns. We cannot do this for scalars since there is no corresponding +# protocol. Since this will require broader changes throughout the codebase, the current +# approach is to leverage the protocol internally but to continue exposing `to_arrow`. 
+@to_arrow.register(Table) +def _to_arrow_table(cudf_object, metadata=None): + test_table = _TableWithArrowMetadata(cudf_object, metadata) + return pa.table(test_table) @to_arrow.register(Column) def _to_arrow_array(cudf_object, metadata=None): """Create a PyArrow array from a pylibcudf column.""" - if metadata is None: - metadata = ColumnMetadata() - metadata = ColumnMetadata(metadata) if isinstance(metadata, str) else metadata - return to_arrow(Table([cudf_object]), [metadata])[0] + if metadata is not None: + metadata = [metadata] + return to_arrow(Table([cudf_object]), metadata)[0] + + +@to_arrow.register(Scalar) +def _to_arrow_scalar(cudf_object, metadata=None): + # Note that metadata for scalars is primarily important for preserving + # information on nested types since names are otherwise irrelevant. + return to_arrow(Column.from_scalar(cudf_object, 1), metadata=metadata)[0] diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd index 2151da28d4b..24d96b602dc 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd @@ -3,11 +3,11 @@ from libcpp.memory cimport shared_ptr, unique_ptr from libcpp.string cimport string from libcpp.vector cimport vector -from pyarrow.lib cimport CScalar, CTable from cudf._lib.types import cudf_to_np_types, np_to_cudf_types from cudf._lib.pylibcudf.libcudf.column.column cimport column +from cudf._lib.pylibcudf.libcudf.column.column_view cimport column_view from cudf._lib.pylibcudf.libcudf.scalar.scalar cimport scalar from cudf._lib.pylibcudf.libcudf.table.table cimport table from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view @@ -29,6 +29,9 @@ cdef extern from "cudf/interop.hpp" nogil: cdef struct ArrowArrayStream: void (*release)(ArrowArrayStream*) noexcept nogil + cdef struct ArrowDeviceArray: + ArrowArray array + cdef extern from "cudf/interop.hpp" namespace "cudf" \ nogil: @@ -38,27 
+41,49 @@ cdef extern from "cudf/interop.hpp" namespace "cudf" \ DLManagedTensor* to_dlpack(table_view input_table ) except + - cdef unique_ptr[table] from_arrow(CTable input) except + - cdef unique_ptr[scalar] from_arrow(CScalar input) except + - cdef cppclass column_metadata: column_metadata() except + column_metadata(string name_) except + string name vector[column_metadata] children_meta - cdef shared_ptr[CTable] to_arrow( - table_view input, - vector[column_metadata] metadata, - ) except + - - cdef shared_ptr[CScalar] to_arrow( - const scalar& input, - column_metadata metadata, - ) except + - cdef unique_ptr[table] from_arrow_stream(ArrowArrayStream* input) except + cdef unique_ptr[column] from_arrow_column( const ArrowSchema* schema, const ArrowArray* input ) except + + + +cdef extern from *: + # Rather than exporting the underlying functions directly to Cython, we expose + # these wrappers that handle the release to avoid needing to teach Cython how + # to handle unique_ptrs with custom deleters that aren't default constructible. + # This will go away once we introduce cudf::arrow_column (need a + # cudf::arrow_schema as well), see + # https://github.com/rapidsai/cudf/issues/16104. + """ + #include + #include + + ArrowSchema* to_arrow_schema_raw( + cudf::table_view const& input, + cudf::host_span metadata) { + return to_arrow_schema(input, metadata).release(); + } + + ArrowArray* to_arrow_host_raw( + cudf::table_view const& tbl, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) { + // Assumes the sync event is null and the data is already on the host. 
+ ArrowArray *arr = new ArrowArray(); + auto device_arr = cudf::to_arrow_host(tbl, stream, mr); + ArrowArrayMove(&device_arr->array, arr); + return arr; + } + """ + cdef ArrowSchema *to_arrow_schema_raw( + const table_view& tbl, + const vector[column_metadata]& metadata, + ) except + nogil + cdef ArrowArray* to_arrow_host_raw(const table_view& tbl) except + nogil diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py index e19ff58927f..acb2b5be85c 100644 --- a/python/cudf/cudf/pylibcudf_tests/common/utils.py +++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py @@ -44,7 +44,7 @@ def metadata_from_arrow_type( def assert_column_eq( lhs: pa.Array | plc.Column, rhs: pa.Array | plc.Column, - check_field_nullability=True, + check_field_nullability=False, ) -> None: """Verify that a pylibcudf array and PyArrow array are equal. @@ -59,7 +59,9 @@ def assert_column_eq( on child fields are equal. Useful for checking roundtripping of lossy formats like JSON that may not - preserve this information. + preserve this information. Also, our Arrow interop functions make different + choices by default than pyarrow field constructors since the interop functions + may make data-dependent choices. """ # Nested types require children metadata to be passed to the conversion function. if isinstance(lhs, (pa.Array, pa.ChunkedArray)) and isinstance(