diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs index adcec769f247..a53447b53c31 100644 --- a/arrow-pyarrow-integration-testing/src/lib.rs +++ b/arrow-pyarrow-integration-testing/src/lib.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use arrow::array::new_empty_array; +use arrow::record_batch::{RecordBatchIterator, RecordBatchReader}; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::wrap_pyfunction; @@ -152,6 +153,20 @@ fn reader_return_errors(obj: PyArrowType) -> PyResult<() } } +#[pyfunction] +fn boxed_reader_roundtrip( + obj: PyArrowType, +) -> PyArrowType> { + let schema = obj.0.schema(); + let batches = obj + .0 + .collect::, ArrowError>>() + .unwrap(); + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema); + let reader: Box = Box::new(reader); + PyArrowType(reader) +} + #[pymodule] fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(double))?; @@ -166,5 +181,6 @@ fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> m.add_wrapped(wrap_pyfunction!(round_trip_record_batch))?; m.add_wrapped(wrap_pyfunction!(round_trip_record_batch_reader))?; m.add_wrapped(wrap_pyfunction!(reader_return_errors))?; + m.add_wrapped(wrap_pyfunction!(boxed_reader_roundtrip))?; Ok(()) } diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index e2e8d66c0f29..3be5b9ec52fe 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -409,6 +409,13 @@ def test_record_batch_reader(): got_batches = list(b) assert got_batches == batches + # Also try the boxed reader variant + a = pa.RecordBatchReader.from_batches(schema, batches) + b = rust.boxed_reader_roundtrip(a) + assert b.schema == schema + got_batches = list(b) + assert got_batches == batches + def test_record_batch_reader_error(): schema = pa.schema([('ints', pa.list_(pa.int32()))]) diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs index fb904c1908e6..f4d0585fa6b5 100644 --- a/arrow/src/lib.rs +++ b/arrow/src/lib.rs @@ -375,7 +375,8 @@ pub mod pyarrow; pub mod record_batch { pub use arrow_array::{ - RecordBatch, RecordBatchOptions, RecordBatchReader, RecordBatchWriter, + RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader, + RecordBatchWriter, }; } pub use arrow_array::temporal_conversions; diff --git a/arrow/src/pyarrow.rs b/arrow/src/pyarrow.rs index 0e9669c5e9fa..6063ae763228 100644 --- a/arrow/src/pyarrow.rs +++ b/arrow/src/pyarrow.rs @@ -15,15 +15,51 @@ // specific language governing permissions and limitations // under the License. -//! Pass Arrow objects from and to Python, using Arrow's +//! Pass Arrow objects from and to PyArrow, using Arrow's //! [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) //! and [pyo3](https://docs.rs/pyo3/latest/pyo3/). //! For underlying implementation, see the [ffi] module. +//! +//! One can use these to write Python functions that take and return PyArrow +//! objects, with automatic conversion to corresponding arrow-rs types. +//! +//! ```ignore +//! #[pyfunction] +//! fn double_array(array: PyArrowType) -> PyResult> { +//! let array = array.0; // Extract from PyArrowType wrapper +//! let array: Arc = make_array(array); // Convert ArrayData to ArrayRef +//! let array: &Int32Array = array.as_any().downcast_ref() +//! .ok_or_else(|| PyValueError::new_err("expected int32 array"))?; +//! let array: Int32Array = array.iter().map(|x| x.map(|x| x * 2)).collect(); +//! Ok(PyArrowType(array.into_data())) +//! } +//! ``` +//! +//! | pyarrow type | arrow-rs type | +//! |-----------------------------|--------------------------------------------------------------------| +//! | `pyarrow.DataType` | [DataType] | +//! | `pyarrow.Field` | [Field] | +//! | `pyarrow.Schema` | [Schema] | +//! | `pyarrow.Array` | [ArrayData] | +//! | `pyarrow.RecordBatch` | [RecordBatch] | +//! | `pyarrow.RecordBatchReader` | [ArrowArrayStreamReader] / `Box` (1) | +//! +//! (1) `pyarrow.RecordBatchReader` can be imported as [ArrowArrayStreamReader]. Either +//! [ArrowArrayStreamReader] or `Box` can be exported +//! as `pyarrow.RecordBatchReader`. (`Box` is typically +//! easier to create.) +//! +//! PyArrow has the notion of chunked arrays and tables, but arrow-rs doesn't +//! have these same concepts. A chunked table is instead represented with +//! `Vec`. A `pyarrow.Table` can be imported to Rust by calling +//! [pyarrow.Table.to_reader()](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_reader) +//! and then importing the reader as a [ArrowArrayStreamReader]. use std::convert::{From, TryFrom}; use std::ptr::{addr_of, addr_of_mut}; use std::sync::Arc; +use arrow_array::RecordBatchReader; use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::ffi::Py_uintptr_t; use pyo3::import_exception; @@ -256,6 +292,7 @@ impl ToPyArrow for RecordBatch { } } +/// Supports conversion from `pyarrow.RecordBatchReader` to [ArrowArrayStreamReader]. impl FromPyArrow for ArrowArrayStreamReader { fn from_pyarrow(value: &PyAny) -> PyResult { validate_class("RecordBatchReader", value)?; @@ -277,10 +314,13 @@ impl FromPyArrow for ArrowArrayStreamReader { } } -impl IntoPyArrow for ArrowArrayStreamReader { +/// Convert a [`RecordBatchReader`] into a `pyarrow.RecordBatchReader`. +impl IntoPyArrow for Box { + // We can't implement `ToPyArrow` for `T: RecordBatchReader + Send` because + // there is already a blanket implementation for `T: ToPyArrow`. fn into_pyarrow(self, py: Python) -> PyResult { let mut stream = FFI_ArrowArrayStream::empty(); - unsafe { export_reader_into_raw(Box::new(self), &mut stream) }; + unsafe { export_reader_into_raw(self, &mut stream) }; let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream; let module = py.import("pyarrow")?; @@ -292,8 +332,17 @@ impl IntoPyArrow for ArrowArrayStreamReader { } } -/// A newtype wrapper around a `T: PyArrowConvert` that implements -/// [`FromPyObject`] and [`IntoPy`] allowing usage with pyo3 macros +/// Convert a [`ArrowArrayStreamReader`] into a `pyarrow.RecordBatchReader`. +impl IntoPyArrow for ArrowArrayStreamReader { + fn into_pyarrow(self, py: Python) -> PyResult { + let boxed: Box = Box::new(self); + boxed.into_pyarrow(py) + } +} + +/// A newtype wrapper. When wrapped around a type `T: FromPyArrow`, it +/// implements `FromPyObject` for the PyArrow objects. When wrapped around a +/// `T: IntoPyArrow`, it implements `IntoPy` for the wrapped type. #[derive(Debug)] pub struct PyArrowType(pub T);