Skip to content

Commit

Permalink
Rewrite remaining Python Arrow interop conversions using the C Data Interface (#16548)
Browse files Browse the repository at this point in the history

This PR rewrites all remaining parts of the Python interop code previously using Arrow C++ types to instead use the C Data Interface. With this change, we no longer require pyarrow in that part of the Cython code. There are further improvements that we should make to streamline the internals, but I would like to keep this changeset minimal since merging it unblocks work on multiple fronts that can then proceed in parallel.

Contributes to #15193

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Yunsong Wang (https://github.com/PointKernel)

URL: #16548
  • Loading branch information
vyasr authored Aug 16, 2024
1 parent 155edde commit f955dd7
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 118 deletions.
1 change: 1 addition & 0 deletions cpp/src/interop/arrow_utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ ArrowType id_to_arrow_type(cudf::type_id id)
ArrowType id_to_arrow_storage_type(cudf::type_id id)
{
switch (id) {
case cudf::type_id::TIMESTAMP_DAYS: return NANOARROW_TYPE_INT32;
case cudf::type_id::TIMESTAMP_SECONDS:
case cudf::type_id::TIMESTAMP_MILLISECONDS:
case cudf::type_id::TIMESTAMP_MICROSECONDS:
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/interop/to_arrow_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ int dispatch_to_arrow_type::operator()<cudf::list_view>(column_view input,
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(out, NANOARROW_TYPE_LIST));
auto child = input.child(cudf::lists_column_view::child_column_index);
ArrowSchemaInit(out->children[0]);
auto child_meta =
metadata.children_meta.empty() ? column_metadata{"element"} : metadata.children_meta[0];
auto child_meta = metadata.children_meta.empty()
? column_metadata{"element"}
: metadata.children_meta[cudf::lists_column_view::child_column_index];

out->flags = input.has_nulls() ? ARROW_FLAG_NULLABLE : 0;
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(out->children[0], child_meta.name.c_str()));
Expand Down
6 changes: 5 additions & 1 deletion python/cudf/cudf/_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ rapids_cython_create_modules(

target_link_libraries(strings_udf PUBLIC cudf_strings_udf)

set(targets_using_arrow_headers interop avro csv orc json parquet)
set(targets_using_arrow_headers avro csv orc json parquet)
link_to_pyarrow_headers("${targets_using_arrow_headers}")

include(${rapids-cmake-dir}/export/find_package_root.cmake)
include(../../../../cpp/cmake/thirdparty/get_nanoarrow.cmake)
target_link_libraries(interop PUBLIC nanoarrow)

add_subdirectory(io)
add_subdirectory(nvtext)
add_subdirectory(pylibcudf)
Expand Down
5 changes: 4 additions & 1 deletion python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ rapids_cython_create_modules(
SOURCE_FILES "${cython_sources}"
LINKED_LIBRARIES "${linked_libraries}" MODULE_PREFIX pylibcudf_ ASSOCIATED_TARGETS cudf
)
link_to_pyarrow_headers(pylibcudf_interop)

include(${rapids-cmake-dir}/export/find_package_root.cmake)
include(../../../../../cpp/cmake/thirdparty/get_nanoarrow.cmake)
target_link_libraries(pylibcudf_interop PUBLIC nanoarrow)

add_subdirectory(libcudf)
add_subdirectory(strings)
Expand Down
188 changes: 90 additions & 98 deletions python/cudf/cudf/_lib/pylibcudf/interop.pyx
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# Copyright (c) 2023-2024, NVIDIA CORPORATION.

from cpython cimport pycapsule
from cython.operator cimport dereference
from libcpp.memory cimport shared_ptr, unique_ptr
from cpython.pycapsule cimport PyCapsule_GetPointer, PyCapsule_New
from libc.stdlib cimport free
from libcpp.memory cimport unique_ptr
from libcpp.utility cimport move
from libcpp.vector cimport vector
from pyarrow cimport lib as pa

from dataclasses import dataclass, field
from functools import singledispatch
Expand All @@ -18,23 +17,14 @@ from cudf._lib.pylibcudf.libcudf.interop cimport (
ArrowArrayStream,
ArrowSchema,
column_metadata,
from_arrow as cpp_from_arrow,
from_arrow_column as cpp_from_arrow_column,
from_arrow_stream as cpp_from_arrow_stream,
to_arrow as cpp_to_arrow,
)
from cudf._lib.pylibcudf.libcudf.scalar.scalar cimport (
fixed_point_scalar,
scalar,
to_arrow_host_raw,
to_arrow_schema_raw,
)
from cudf._lib.pylibcudf.libcudf.table.table cimport table
from cudf._lib.pylibcudf.libcudf.wrappers.decimals cimport (
decimal32,
decimal64,
decimal128,
scale_type,
)

from . cimport copying
from .column cimport Column
from .scalar cimport Scalar
from .table cimport Table
Expand Down Expand Up @@ -109,7 +99,9 @@ def from_arrow(pyarrow_object, *, DataType data_type=None):
Union[Table, Scalar]
The converted object of type corresponding to the input type in cudf.
"""
raise TypeError("from_arrow only accepts Table and Scalar objects")
raise TypeError(
f"Unsupported type {type(pyarrow_object)} for conversion from arrow"
)


@from_arrow.register(pa.DataType)
Expand All @@ -133,7 +125,7 @@ def _from_arrow_table(pyarrow_object, *, DataType data_type=None):
raise ValueError("data_type may not be passed for tables")
stream = pyarrow_object.__arrow_c_stream__()
cdef ArrowArrayStream* c_stream = (
<ArrowArrayStream*>pycapsule.PyCapsule_GetPointer(stream, "arrow_array_stream")
<ArrowArrayStream*>PyCapsule_GetPointer(stream, "arrow_array_stream")
)

cdef unique_ptr[table] c_result
Expand All @@ -146,54 +138,17 @@ def _from_arrow_table(pyarrow_object, *, DataType data_type=None):

@from_arrow.register(pa.Scalar)
def _from_arrow_scalar(pyarrow_object, *, DataType data_type=None):
    """Convert a pyarrow Scalar to a pylibcudf Scalar.

    The scalar is routed through the array conversion path: it is wrapped
    in a one-element pyarrow array, converted, and the single element is
    extracted from the resulting column.
    """
    if isinstance(pyarrow_object.type, pa.ListType) and pyarrow_object.as_py() is None:
        # pyarrow doesn't correctly handle None values for list types, so
        # we have to create this one manually.
        # https://github.com/apache/arrow/issues/40319
        pa_array = pa.array([None], type=pyarrow_object.type)
    else:
        pa_array = pa.array([pyarrow_object])
    return copying.get_element(
        from_arrow(pa_array, data_type=data_type),
        0,
    )


@from_arrow.register(pa.Array)
Expand All @@ -204,10 +159,10 @@ def _from_arrow_column(pyarrow_object, *, DataType data_type=None):

schema, array = pyarrow_object.__arrow_c_array__()
cdef ArrowSchema* c_schema = (
<ArrowSchema*>pycapsule.PyCapsule_GetPointer(schema, "arrow_schema")
<ArrowSchema*>PyCapsule_GetPointer(schema, "arrow_schema")
)
cdef ArrowArray* c_array = (
<ArrowArray*>pycapsule.PyCapsule_GetPointer(array, "arrow_array")
<ArrowArray*>PyCapsule_GetPointer(array, "arrow_array")
)

cdef unique_ptr[column] c_result
Expand Down Expand Up @@ -238,7 +193,7 @@ def to_arrow(cudf_object, metadata=None):
Union[pyarrow.Array, pyarrow.Table, pyarrow.Scalar]
The converted object of type corresponding to the input type in PyArrow.
"""
raise TypeError("to_arrow only accepts Table and Scalar objects")
raise TypeError(f"Unsupported type {type(cudf_object)} for conversion to arrow")


@to_arrow.register(DataType)
Expand Down Expand Up @@ -281,46 +236,83 @@ def _to_arrow_datatype(cudf_object, **kwargs):
)


@to_arrow.register(Table)
def _to_arrow_table(cudf_object, metadata=None):
cdef void _release_schema(object schema_capsule) noexcept:
    """PyCapsule destructor: release and free the wrapped ArrowSchema."""
    cdef ArrowSchema* raw_schema = <ArrowSchema*>PyCapsule_GetPointer(
        schema_capsule, 'arrow_schema'
    )
    # Per the Arrow C Data Interface, a non-NULL release callback means the
    # structure still owns resources; call it before freeing the struct itself.
    if raw_schema.release != NULL:
        raw_schema.release(raw_schema)
    free(raw_schema)


cdef void _release_array(object array_capsule) noexcept:
    """PyCapsule destructor: release and free the wrapped ArrowArray."""
    cdef ArrowArray* raw_array = <ArrowArray*>PyCapsule_GetPointer(
        array_capsule, 'arrow_array'
    )
    # A NULL release callback marks an already-released Arrow structure;
    # only invoke it when still set, then free the heap-allocated struct.
    if raw_array.release != NULL:
        raw_array.release(raw_array)
    free(raw_array)


def _table_to_schema(Table tbl, metadata):
    """Build an 'arrow_schema' PyCapsule describing ``tbl``'s columns.

    Parameters
    ----------
    tbl : Table
        The pylibcudf table whose schema is exported.
    metadata : list or None
        Per-column ColumnMetadata (plain strings are accepted as column
        names). When None, default (empty) metadata is used for every column.
    """
    if metadata is None:
        metadata = [ColumnMetadata() for _ in range(len(tbl.columns()))]
    metadata = [ColumnMetadata(m) if isinstance(m, str) else m for m in metadata]

    cdef vector[column_metadata] c_metadata
    c_metadata.reserve(len(metadata))
    for meta in metadata:
        c_metadata.push_back(_metadata_to_libcudf(meta))

    cdef ArrowSchema* raw_schema_ptr
    with nogil:
        raw_schema_ptr = to_arrow_schema_raw(tbl.view(), c_metadata)

    # Ownership of the raw pointer transfers to the capsule; _release_schema
    # releases and frees it when the capsule is destroyed.
    return PyCapsule_New(<void*>raw_schema_ptr, 'arrow_schema', _release_schema)


def _table_to_host_array(Table tbl):
    """Copy ``tbl`` to host memory and wrap it in an 'arrow_array' PyCapsule."""
    cdef ArrowArray* raw_host_array_ptr
    with nogil:
        raw_host_array_ptr = to_arrow_host_raw(tbl.view())

    # The capsule takes ownership; _release_array releases and frees the
    # ArrowArray when the capsule is destroyed.
    return PyCapsule_New(<void*>raw_host_array_ptr, "arrow_array", _release_array)


class _TableWithArrowMetadata:
def __init__(self, tbl, metadata=None):
self.tbl = tbl
self.metadata = metadata

return pa.pyarrow_wrap_scalar(c_scalar_result)
def __arrow_c_array__(self, requested_schema=None):
return _table_to_schema(self.tbl, self.metadata), _table_to_host_array(self.tbl)


# TODO: In the long run we should get rid of the `to_arrow` functions in favor of using
# the protocols directly via `pa.table(cudf_object, schema=...)` directly. We can do the
# same for columns. We cannot do this for scalars since there is no corresponding
# protocol. Since this will require broader changes throughout the codebase, the current
# approach is to leverage the protocol internally but to continue exposing `to_arrow`.
@to_arrow.register(Table)
def _to_arrow_table(cudf_object, metadata=None):
    """Convert a pylibcudf Table to a pyarrow Table via the PyCapsule protocol."""
    wrapped = _TableWithArrowMetadata(cudf_object, metadata)
    return pa.table(wrapped)


@to_arrow.register(Column)
def _to_arrow_array(cudf_object, metadata=None):
    """Create a PyArrow array from a pylibcudf column.

    The column is wrapped in a single-column Table, converted through the
    Table path, and the lone resulting pyarrow column is returned.
    """
    # _table_to_schema accepts None metadata; otherwise it expects one
    # metadata entry per column, so wrap the single entry in a list.
    if metadata is not None:
        metadata = [metadata]
    return to_arrow(Table([cudf_object]), metadata)[0]


@to_arrow.register(Scalar)
def _to_arrow_scalar(cudf_object, metadata=None):
    """Convert a pylibcudf Scalar to a pyarrow Scalar.

    Metadata for scalars primarily matters for nested types, where child
    names must be preserved; names are otherwise irrelevant.
    """
    single_row = Column.from_scalar(cudf_object, 1)
    return to_arrow(single_row, metadata=metadata)[0]
53 changes: 39 additions & 14 deletions python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
from libcpp.memory cimport shared_ptr, unique_ptr
from libcpp.string cimport string
from libcpp.vector cimport vector
from pyarrow.lib cimport CScalar, CTable

from cudf._lib.types import cudf_to_np_types, np_to_cudf_types

from cudf._lib.pylibcudf.libcudf.column.column cimport column
from cudf._lib.pylibcudf.libcudf.column.column_view cimport column_view
from cudf._lib.pylibcudf.libcudf.scalar.scalar cimport scalar
from cudf._lib.pylibcudf.libcudf.table.table cimport table
from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view
Expand All @@ -29,6 +29,9 @@ cdef extern from "cudf/interop.hpp" nogil:
cdef struct ArrowArrayStream:
void (*release)(ArrowArrayStream*) noexcept nogil

cdef struct ArrowDeviceArray:
ArrowArray array


cdef extern from "cudf/interop.hpp" namespace "cudf" \
nogil:
Expand All @@ -38,27 +41,49 @@ cdef extern from "cudf/interop.hpp" namespace "cudf" \
DLManagedTensor* to_dlpack(table_view input_table
) except +

cdef unique_ptr[table] from_arrow(CTable input) except +
cdef unique_ptr[scalar] from_arrow(CScalar input) except +

cdef cppclass column_metadata:
column_metadata() except +
column_metadata(string name_) except +
string name
vector[column_metadata] children_meta

cdef shared_ptr[CTable] to_arrow(
table_view input,
vector[column_metadata] metadata,
) except +

cdef shared_ptr[CScalar] to_arrow(
const scalar& input,
column_metadata metadata,
) except +

cdef unique_ptr[table] from_arrow_stream(ArrowArrayStream* input) except +
cdef unique_ptr[column] from_arrow_column(
const ArrowSchema* schema,
const ArrowArray* input
) except +


cdef extern from *:
# Rather than exporting the underlying functions directly to Cython, we expose
# these wrappers that handle the release to avoid needing to teach Cython how
# to handle unique_ptrs with custom deleters that aren't default constructible.
# This will go away once we introduce cudf::arrow_column (need a
# cudf::arrow_schema as well), see
# https://github.com/rapidsai/cudf/issues/16104.
"""
#include <nanoarrow/nanoarrow.h>
#include <nanoarrow/nanoarrow_device.h>
ArrowSchema* to_arrow_schema_raw(
cudf::table_view const& input,
cudf::host_span<cudf::column_metadata const> metadata) {
return to_arrow_schema(input, metadata).release();
}
ArrowArray* to_arrow_host_raw(
cudf::table_view const& tbl,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) {
// Assumes the sync event is null and the data is already on the host.
ArrowArray *arr = new ArrowArray();
auto device_arr = cudf::to_arrow_host(tbl, stream, mr);
ArrowArrayMove(&device_arr->array, arr);
return arr;
}
"""
cdef ArrowSchema *to_arrow_schema_raw(
const table_view& tbl,
const vector[column_metadata]& metadata,
) except + nogil
cdef ArrowArray* to_arrow_host_raw(const table_view& tbl) except + nogil
Loading

0 comments on commit f955dd7

Please sign in to comment.