-
Notifications
You must be signed in to change notification settings - Fork 819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: export record batch through stream #4806
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -393,6 +393,23 @@ def test_sparse_union_python(): | |
del a | ||
del b | ||
|
||
def test_tensor_array(): | ||
tensor_type = pa.fixed_shape_tensor(pa.float32(), [2, 3]) | ||
inner = pa.array([float(x) for x in range(1, 7)] + [None] * 12, pa.float32()) | ||
storage = pa.FixedSizeListArray.from_arrays(inner, 6) | ||
f32_array = pa.ExtensionArray.from_storage(tensor_type, storage) | ||
|
||
# Round-tripping as an array gives back storage type, because arrow-rs has | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I must be missing something fundamental here, an array can only have the storage type?? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You mean because the extension metadata is in the field metadata, and thus separate from the Array? Arrays typically have a DataType associated with it, so this depends on whether your Arrow implementation has an It actually looks like Arrow C++ exports https://github.com/apache/arrow/blob/b7581fee01ed0d111d5a0361c2f05779aa3c33e8/cpp/src/arrow/c/bridge.cc#L189 So if arrow-rs had an |
||
# no notion of extension types. | ||
b = rust.round_trip_array(f32_array) | ||
assert b == f32_array.storage | ||
|
||
batch = pa.record_batch([f32_array], ["tensor"]) | ||
b = rust.round_trip_record_batch(batch) | ||
assert b == batch | ||
|
||
del b | ||
|
||
def test_record_batch_reader(): | ||
""" | ||
Python -> Rust -> Python | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,14 +59,14 @@ use std::convert::{From, TryFrom}; | |
use std::ptr::{addr_of, addr_of_mut}; | ||
use std::sync::Arc; | ||
|
||
use arrow_array::RecordBatchReader; | ||
use arrow_array::{RecordBatchIterator, RecordBatchReader}; | ||
use pyo3::exceptions::{PyTypeError, PyValueError}; | ||
use pyo3::ffi::Py_uintptr_t; | ||
use pyo3::import_exception; | ||
use pyo3::prelude::*; | ||
use pyo3::types::{PyDict, PyList, PyTuple}; | ||
use pyo3::types::{PyList, PyTuple}; | ||
|
||
use crate::array::{make_array, Array, ArrayData}; | ||
use crate::array::{make_array, ArrayData}; | ||
use crate::datatypes::{DataType, Field, Schema}; | ||
use crate::error::ArrowError; | ||
use crate::ffi; | ||
|
@@ -270,25 +270,13 @@ impl FromPyArrow for RecordBatch { | |
|
||
impl ToPyArrow for RecordBatch { | ||
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> { | ||
let mut py_arrays = vec![]; | ||
|
||
let schema = self.schema(); | ||
let columns = self.columns().iter(); | ||
|
||
for array in columns { | ||
py_arrays.push(array.to_data().to_pyarrow(py)?); | ||
} | ||
|
||
let py_schema = schema.to_pyarrow(py)?; | ||
|
||
let module = py.import("pyarrow")?; | ||
let class = module.getattr("RecordBatch")?; | ||
let args = (py_arrays,); | ||
let kwargs = PyDict::new(py); | ||
kwargs.set_item("schema", py_schema)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why doesn't this work? The schema is provided with the arrays? Is there some limitation of pyarrow's from_arrays method? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Basically, PyArrow considers there to be a mismatch if passed a schema with an extension type but the arrays passed are all storage arrays. I created an issue in PyArrow's tracker to fix this: In theory, this code should be fine, so we can consider this a workaround for a bug in PyArrow 😁 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps we could link to the upstream bug in the code? So that we can potentially avoid this at some point in the future |
||
let record = class.call_method("from_arrays", args, Some(kwargs))?; | ||
|
||
Ok(PyObject::from(record)) | ||
// arrow::ffi is implemented for Arrays not RecordBatches, so the next | ||
// best thing is to wrap it in a RecordBatchIterator and export that. | ||
tustvold marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let reader = | ||
RecordBatchIterator::new(vec![Ok(self.clone())], self.schema().clone()); | ||
let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader); | ||
let py_reader = reader.into_pyarrow(py)?; | ||
py_reader.call_method0(py, "read_next_batch") | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just as an observation, it is rather strange to me that extension arrays would be a first-class abstraction and in so doing obfuscate the underlying storage type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for systems where RecordBatch is a type exposed to end-users at the interface, the obfuscation is a feature, not a bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it is unfortunate that arrow doesn't have a clear separation between logical and physical types. IMO DataType is a physical type, whereas extension types are definitely in the category of logical types, it's a bit of a mess 😅