Skip to content

Commit

Permalink
Implement PyArrowType for Box<dyn RecordBatchReader + Send> (#4751)
Browse files Browse the repository at this point in the history
* implement for boxed rbr

* add docs
  • Loading branch information
wjones127 authored Sep 1, 2023
1 parent eeba0a3 commit 4927c1e
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 6 deletions.
16 changes: 16 additions & 0 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use std::sync::Arc;

use arrow::array::new_empty_array;
use arrow::record_batch::{RecordBatchIterator, RecordBatchReader};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
Expand Down Expand Up @@ -152,6 +153,20 @@ fn reader_return_errors(obj: PyArrowType<ArrowArrayStreamReader>) -> PyResult<()
}
}

#[pyfunction]
fn boxed_reader_roundtrip(
obj: PyArrowType<ArrowArrayStreamReader>,
) -> PyArrowType<Box<dyn RecordBatchReader + Send>> {
let schema = obj.0.schema();
let batches = obj
.0
.collect::<Result<Vec<RecordBatch>, ArrowError>>()
.unwrap();
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
PyArrowType(reader)
}

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(double))?;
Expand All @@ -166,5 +181,6 @@ fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()>
m.add_wrapped(wrap_pyfunction!(round_trip_record_batch))?;
m.add_wrapped(wrap_pyfunction!(round_trip_record_batch_reader))?;
m.add_wrapped(wrap_pyfunction!(reader_return_errors))?;
m.add_wrapped(wrap_pyfunction!(boxed_reader_roundtrip))?;
Ok(())
}
7 changes: 7 additions & 0 deletions arrow-pyarrow-integration-testing/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ def test_record_batch_reader():
got_batches = list(b)
assert got_batches == batches

# Also try the boxed reader variant
a = pa.RecordBatchReader.from_batches(schema, batches)
b = rust.boxed_reader_roundtrip(a)
assert b.schema == schema
got_batches = list(b)
assert got_batches == batches

def test_record_batch_reader_error():
schema = pa.schema([('ints', pa.list_(pa.int32()))])

Expand Down
3 changes: 2 additions & 1 deletion arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ pub mod pyarrow;

pub mod record_batch {
pub use arrow_array::{
RecordBatch, RecordBatchOptions, RecordBatchReader, RecordBatchWriter,
RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader,
RecordBatchWriter,
};
}
pub use arrow_array::temporal_conversions;
Expand Down
59 changes: 54 additions & 5 deletions arrow/src/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,51 @@
// specific language governing permissions and limitations
// under the License.

//! Pass Arrow objects from and to Python, using Arrow's
//! Pass Arrow objects from and to PyArrow, using Arrow's
//! [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html)
//! and [pyo3](https://docs.rs/pyo3/latest/pyo3/).
//! For underlying implementation, see the [ffi] module.
//!
//! One can use these to write Python functions that take and return PyArrow
//! objects, with automatic conversion to corresponding arrow-rs types.
//!
//! ```ignore
//! #[pyfunction]
//! fn double_array(array: PyArrowType<ArrayData>) -> PyResult<PyArrowType<ArrayData>> {
//! let array = array.0; // Extract from PyArrowType wrapper
//! let array: Arc<dyn Array> = make_array(array); // Convert ArrayData to ArrayRef
//! let array: &Int32Array = array.as_any().downcast_ref()
//! .ok_or_else(|| PyValueError::new_err("expected int32 array"))?;
//! let array: Int32Array = array.iter().map(|x| x.map(|x| x * 2)).collect();
//! Ok(PyArrowType(array.into_data()))
//! }
//! ```
//!
//! | pyarrow type | arrow-rs type |
//! |-----------------------------|--------------------------------------------------------------------|
//! | `pyarrow.DataType` | [DataType] |
//! | `pyarrow.Field` | [Field] |
//! | `pyarrow.Schema` | [Schema] |
//! | `pyarrow.Array` | [ArrayData] |
//! | `pyarrow.RecordBatch` | [RecordBatch] |
//! | `pyarrow.RecordBatchReader` | [ArrowArrayStreamReader] / `Box<dyn RecordBatchReader + Send>` (1) |
//!
//! (1) `pyarrow.RecordBatchReader` can be imported as [ArrowArrayStreamReader]. Either
//! [ArrowArrayStreamReader] or `Box<dyn RecordBatchReader + Send>` can be exported
//! as `pyarrow.RecordBatchReader`. (`Box<dyn RecordBatchReader + Send>` is typically
//! easier to create.)
//!
//! PyArrow has the notion of chunked arrays and tables, but arrow-rs doesn't
//! have these same concepts. A chunked table is instead represented with
//! `Vec<RecordBatch>`. A `pyarrow.Table` can be imported to Rust by calling
//! [pyarrow.Table.to_reader()](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_reader)
//! and then importing the reader as a [ArrowArrayStreamReader].
use std::convert::{From, TryFrom};
use std::ptr::{addr_of, addr_of_mut};
use std::sync::Arc;

use arrow_array::RecordBatchReader;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::ffi::Py_uintptr_t;
use pyo3::import_exception;
Expand Down Expand Up @@ -256,6 +292,7 @@ impl ToPyArrow for RecordBatch {
}
}

/// Supports conversion from `pyarrow.RecordBatchReader` to [ArrowArrayStreamReader].
impl FromPyArrow for ArrowArrayStreamReader {
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
validate_class("RecordBatchReader", value)?;
Expand All @@ -277,10 +314,13 @@ impl FromPyArrow for ArrowArrayStreamReader {
}
}

impl IntoPyArrow for ArrowArrayStreamReader {
/// Convert a [`RecordBatchReader`] into a `pyarrow.RecordBatchReader`.
impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
// We can't implement `ToPyArrow` for `T: RecordBatchReader + Send` because
// there is already a blanket implementation for `T: ToPyArrow`.
fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
let mut stream = FFI_ArrowArrayStream::empty();
unsafe { export_reader_into_raw(Box::new(self), &mut stream) };
unsafe { export_reader_into_raw(self, &mut stream) };

let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
let module = py.import("pyarrow")?;
Expand All @@ -292,8 +332,17 @@ impl IntoPyArrow for ArrowArrayStreamReader {
}
}

/// A newtype wrapper around a `T: PyArrowConvert` that implements
/// [`FromPyObject`] and [`IntoPy`] allowing usage with pyo3 macros
/// Convert a [`ArrowArrayStreamReader`] into a `pyarrow.RecordBatchReader`.
impl IntoPyArrow for ArrowArrayStreamReader {
fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
let boxed: Box<dyn RecordBatchReader + Send> = Box::new(self);
boxed.into_pyarrow(py)
}
}

/// A newtype wrapper. When wrapped around a type `T: FromPyArrow`, it
/// implements `FromPyObject` for the PyArrow objects. When wrapped around a
/// `T: IntoPyArrow`, it implements `IntoPy<PyObject>` for the wrapped type.
#[derive(Debug)]
pub struct PyArrowType<T>(pub T);

Expand Down

0 comments on commit 4927c1e

Please sign in to comment.