ENH: allow using open_arrow with PyCapsule protocol (without pyarrow dependency) #349

Merged
5 changes: 5 additions & 0 deletions CHANGES.md
@@ -7,6 +7,11 @@
- `read_arrow` and `open_arrow` now provide
[GeoArrow-compliant extension metadata](https://geoarrow.org/extension-types.html),
including the CRS, when using GDAL 3.8 or higher (#366).
- The `open_arrow` function can now be used without a `pyarrow` dependency. In
that case, specify `use_pyarrow=False` and the returned reader will be a
generic object implementing the [Arrow PyCapsule Protocol](https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html) (i.e. having an `__arrow_c_stream__`
method). This object can then be consumed by your Arrow implementation of choice
that supports this protocol (#349).
- Warn when reading from a multilayer file without specifying a layer (#362).


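For illustration, a minimal consumer-side sketch of the workflow this changelog entry describes (not part of the diff; it uses the `return_pyarrow=False` keyword as implemented in this PR, a hypothetical `data.gpkg` path, and pyarrow >= 14 as one possible protocol-aware consumer):

    import pyarrow as pa

    from pyogrio.raw import open_arrow

    # "data.gpkg" is a placeholder path; the stream must be consumed before
    # the context manager closes the underlying OGR dataset.
    with open_arrow("data.gpkg", return_pyarrow=False) as (meta, reader):
        # `reader` only exposes __arrow_c_stream__; any PyCapsule-aware
        # Arrow implementation can consume it.
        table = pa.table(reader)
    print(meta["crs"], table.num_rows)
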
65 changes: 56 additions & 9 deletions pyogrio/_io.pyx
@@ -18,6 +18,8 @@ from libc.string cimport strlen
from libc.math cimport isnan

cimport cython
from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer

import numpy as np
cimport numpy as np

@@ -1238,6 +1240,35 @@ def ogr_read(
field_data
)


cdef void pycapsule_array_stream_deleter(object stream_capsule) noexcept:
cdef ArrowArrayStream* stream = <ArrowArrayStream*>PyCapsule_GetPointer(
stream_capsule, 'arrow_array_stream'
)
# Do not invoke the deleter on a used/moved capsule
if stream.release != NULL:
stream.release(stream)

free(stream)


cdef object alloc_c_stream(ArrowArrayStream** c_stream):
c_stream[0] = <ArrowArrayStream*> malloc(sizeof(ArrowArrayStream))
# Ensure the capsule destructor doesn't call a random release pointer
c_stream[0].release = NULL
return PyCapsule_New(c_stream[0], 'arrow_array_stream', &pycapsule_array_stream_deleter)


class _ArrowStream:
def __init__(self, capsule):
self._capsule = capsule

def __arrow_c_stream__(self, requested_schema=None):
if requested_schema is not None:
raise NotImplementedError("requested_schema is not supported")
return self._capsule
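
For orientation, a minimal round-trip sketch of how such a wrapper travels through the PyCapsule protocol (not part of the diff; assumes pyarrow >= 15 for `RecordBatchReader.from_stream`, per the review discussion further down):

    import pyarrow as pa

    class CapsuleWrapper:
        # Same shape as _ArrowStream above: hold a stream capsule and
        # hand it out via the protocol method.
        def __init__(self, capsule):
            self._capsule = capsule

        def __arrow_c_stream__(self, requested_schema=None):
            if requested_schema is not None:
                raise NotImplementedError("requested_schema is not supported")
            return self._capsule

    # Produce a stream capsule from an in-memory table, wrap it, and let a
    # protocol-aware consumer import it again.
    src = pa.table({"a": [1, 2, 3]})
    reader = pa.RecordBatchReader.from_batches(src.schema, src.to_batches())
    wrapped = CapsuleWrapper(reader.__arrow_c_stream__())
    assert pa.RecordBatchReader.from_stream(wrapped).read_all().equals(src)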


@contextlib.contextmanager
def ogr_open_arrow(
str path,
@@ -1256,7 +1287,9 @@
str sql=None,
str sql_dialect=None,
int return_fids=False,
int batch_size=0):
int batch_size=0,
return_pyarrow=True,
):

cdef int err = 0
cdef const char *path_c = NULL
Expand All @@ -1267,7 +1300,7 @@ def ogr_open_arrow(
cdef char **fields_c = NULL
cdef const char *field_c = NULL
cdef char **options = NULL
cdef ArrowArrayStream stream
cdef ArrowArrayStream* stream
cdef ArrowSchema schema

IF CTE_GDAL_VERSION < (3, 6, 0):
@@ -1390,19 +1423,28 @@
# make sure layer is read from beginning
OGR_L_ResetReading(ogr_layer)

if not OGR_L_GetArrowStream(ogr_layer, &stream, options):
raise RuntimeError("Failed to open ArrowArrayStream from Layer")
# allocate the stream struct and wrap in capsule to ensure clean-up on error
if not return_pyarrow:
capsule = alloc_c_stream(&stream)
else:
stream = <ArrowArrayStream*> malloc(sizeof(ArrowArrayStream))

stream_ptr = <uintptr_t> &stream
if not OGR_L_GetArrowStream(ogr_layer, stream, options):
if return_pyarrow:
free(stream)
raise RuntimeError("Failed to open ArrowArrayStream from Layer")

if skip_features:
# only supported for GDAL >= 3.8.0; have to do this after getting
# the Arrow stream
OGR_L_SetNextByIndex(ogr_layer, skip_features)

# stream has to be consumed before the Dataset is closed
import pyarrow as pa
reader = pa.RecordBatchStreamReader._import_from_c(stream_ptr)
if return_pyarrow:
import pyarrow as pa
stream_ptr = <uintptr_t> stream
reader = pa.RecordBatchStreamReader._import_from_c(stream_ptr)
Member: maybe simplify a bit and remove line 1439?

Suggested change:

reader = pa.RecordBatchStreamReader._import_from_c(stream_ptr)
reader = pa.RecordBatchStreamReader._import_from_c(<uintptr_t> stream)

else:
reader = _ArrowStream(capsule)

meta = {
'crs': crs,
@@ -1413,10 +1455,11 @@
'fid_column': fid_column,
}

# stream has to be consumed before the Dataset is closed
yield meta, reader

finally:
if reader is not None:
if return_pyarrow and reader is not None:
# Mark reader as closed to prevent reading batches
reader.close()

@@ -1436,6 +1479,10 @@
GDALClose(ogr_dataset)
ogr_dataset = NULL

if return_pyarrow:
free(stream)


def ogr_read_bounds(
str path,
object layer=None,
1 change: 1 addition & 0 deletions pyogrio/_ogr.pxd
@@ -196,6 +196,7 @@ cdef extern from "arrow_bridge.h":

struct ArrowArrayStream:
int (*get_schema)(ArrowArrayStream* stream, ArrowSchema* out)
void (*release)(ArrowArrayStream*) noexcept nogil


cdef extern from "ogr_api.h":
38 changes: 34 additions & 4 deletions pyogrio/raw.py
@@ -361,17 +361,36 @@ def open_arrow(
sql_dialect=None,
return_fids=False,
batch_size=65_536,
return_pyarrow=True,
Contributor: Thoughts on return_pyarrow vs return_capsule? I'm not sure which is clearer.

Member Author: It doesn't return a capsule exactly, but a small wrapper object with the __arrow_c_stream__ method that will return the capsule.

Side question: would returning the capsule directly be more useful for you?

Member Author: (Although it would be nice to have a keyword name that is "positive" about what you want, instead of indicating that you don't want pyarrow... I just don't have good ideas for that at the moment.)

Contributor:

> (although it would be nice to have a keyword name that is "positive" about what you want ...)

I think this is more along the lines of what I was trying to ask.

I think having a wrapper object is preferable to raw capsules.

Member Author: We could maybe also consider removing the option entirely and always returning the capsule wrapper object. This API is still quite new and more of an advanced API, so we might be OK with still changing it.

Taking the example from the docstring:

with open_arrow(path) as source:
    meta, reader = source
    for table in reader:
        ....

If we always returned the wrapper object as implemented in the PR right now, people using this right now and expecting a pyarrow RecordBatchReader (or an iterable of pyarrow Tables) would need to change their code to:

with open_arrow(path) as source:
    meta, reader = source
    reader = pa.RecordBatchReader.from_stream(reader)
    for table in reader:
        ....

One problem with this, however, is that it requires pyarrow >= 15.0, so that's not ideal.

We could also add the basic methods of a RecordBatchReader to the wrapper object (__enter__, __iter__, read_next_batch, read_all, schema, etc.), just calling the equivalent method of the underlying pyarrow.RecordBatchReader, which we can instantiate lazily only when one of those methods is used; that still allows you to convert the wrapper with __arrow_c_stream__ without requiring pyarrow.

That keeps compatibility for people using the Python APIs of a RecordBatchReader (like in the code snippet above, to iterate through the stream). However, that unfortunately still doesn't work when passing this reader object to some other functionality of pyarrow that can accept a reader but passes it through to C++ code (e.g. writing a dataset with a RecordBatchReader as input).

Member Author: We could also just check if pyarrow is available: if so, return a RecordBatchReader (which will also have the __arrow_c_stream__ method for recent pyarrow), and if not, fall back on returning the wrapper object.

That keeps backwards compatibility perfectly, while allowing use without pyarrow (at the expense of a slightly confusing API, with different behaviour depending only on whether another package is installed).

Although that might also be an extra indirection in case you don't need the pyarrow RecordBatchReader, because what will be returned from __arrow_c_stream__ on the returned reader will be an ArrowArrayStream backed by Arrow wrapping the GDAL stream.

Contributor: I don't think the extra indirection would be a problem.

I tend to think that having multiple possible return types is much more confusing than it's worth for most users. I'm not really a fan of having it depend only on whether pyarrow is installed.

If we're going to have one function, having return_pyarrow=True is, I think, the best option. It's probably too much duplication here, but otherwise I would have considered an open_arrow_pycapsule_interface() or similar function.

Reviewer:

> We could also just check if pyarrow is available, if so return a RecordBatchReader (which will also have the __arrow_c_stream__ method for recent pyarrow), and if not fall back on returning the wrapper object.

Silently returning a different type depending on installed Python packages is generally error-prone for users. I would recommend avoiding this.

Member Author: Good points, agreed that it's better to not do any "smart" return value depending on what is installed.

I updated the PR (and added some docs) for the current state of having a return_pyarrow keyword with a default of True.

I am personally also still fine with changing the default to False, so that you have to ask explicitly for a pyarrow object (avoiding the one line to do it yourself), which also makes the keyword about what you want, not what you want to avoid.

Member: The keyword mentioned in the changelog is use_pyarrow (which I think I prefer) but the implementation is return_pyarrow.

**kwargs,
):
"""
Open OGR data source as a stream of pyarrow record batches.
Open OGR data source as a stream of Arrow record batches.

See docstring of `read` for parameters.

The RecordBatchStreamReader is reading from a stream provided by OGR and must not be
The RecordBatchReader is reading from a stream provided by OGR and must not be
accessed after the OGR dataset has been closed, i.e. after the context manager has
been closed.

By default this function returns a `pyarrow.RecordBatchReader`. Optionally,
you can use this function without a `pyarrow` dependency by specifying
``return_pyarrow=False``. In that case, the returned reader will be a
generic object implementing the `Arrow PyCapsule Protocol`_ (i.e. having
an `__arrow_c_stream__` method). This object can then be consumed by
your Arrow implementation of choice that supports this protocol.

.. _Arrow PyCapsule Protocol: https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

Other Parameters
----------------
batch_size : int (default: 65_536)
Maximum number of features to retrieve in a batch.
return_pyarrow : bool (default: True)
If False, return a generic ArrowStream object instead of a pyarrow
RecordBatchReader. This object needs to be passed to another library
supporting the Arrow PyCapsule Protocol to consume the stream of data.

Examples
--------

@@ -384,12 +403,22 @@
>>> for table in reader:
>>> geometries = shapely.from_wkb(table[meta["geometry_name"]])

Or without directly returning a pyarrow object:

>>> with open_arrow(path) as source:
>>> meta, stream = source
>>> reader = pa.RecordBatchReader.from_stream(stream)
>>> for table in reader:
>>> geometries = shapely.from_wkb(table[meta["geometry_name"]])

Returns
-------
(dict, pyarrow.RecordBatchStreamReader)
(dict, pyarrow.RecordBatchReader or ArrowStream)

Returns a tuple of meta information about the data source in a dict,
and a pyarrow RecordBatchStreamReader with data.
and a data stream object (a pyarrow RecordBatchReader if
`return_pyarrow` is set to True, otherwise a generic ArrowStream
object).

Meta is: {
"crs": "<crs>",
@@ -425,6 +454,7 @@
return_fids=return_fids,
dataset_kwargs=dataset_kwargs,
batch_size=batch_size,
return_pyarrow=return_pyarrow,
)
finally:
if buffer is not None:
16 changes: 15 additions & 1 deletion pyogrio/tests/test_arrow.py
@@ -4,8 +4,9 @@
import os

import pytest

import numpy as np

import pyogrio
from pyogrio import __gdal_version__, read_dataframe
from pyogrio.raw import open_arrow, read_arrow, write
from pyogrio.tests.conftest import requires_arrow_api
@@ -207,6 +208,19 @@ def test_read_arrow_geoarrow_metadata(naturalearth_lowres):
assert parsed_meta["crs"]["id"]["code"] == 4326


def test_open_arrow_capsule_protocol(naturalearth_lowres):
pyarrow = pytest.importorskip("pyarrow", minversion="14")

with open_arrow(naturalearth_lowres, return_pyarrow=False) as (meta, reader):
assert isinstance(meta, dict)
assert isinstance(reader, pyogrio._io._ArrowStream)

result = pyarrow.table(reader)

_, expected = read_arrow(naturalearth_lowres)
assert result.equals(expected)
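
A hypothetical companion test (not part of this diff) exercising the default `return_pyarrow=True` path in the same way:

    def test_open_arrow_default_pyarrow(naturalearth_lowres):
        # Hypothetical sketch: the default path returns a pyarrow reader;
        # the stream must be consumed before the context manager exits.
        pyarrow = pytest.importorskip("pyarrow")

        with open_arrow(naturalearth_lowres) as (meta, reader):
            assert isinstance(reader, pyarrow.RecordBatchReader)
            result = reader.read_all()

        _, expected = read_arrow(naturalearth_lowres)
        assert result.equals(expected)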


@contextlib.contextmanager
def use_arrow_context():
original = os.environ.get("PYOGRIO_USE_ARROW", None)