Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Patched 40.0.0 with Parquet memory limiting40 #37

Open
wants to merge 9 commits into
base: alamb/40.0.0_base
Choose a base branch
from
10 changes: 6 additions & 4 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use arrow::array::new_empty_array;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;

use arrow::array::{Array, ArrayData, ArrayRef, Int64Array, make_array};
use arrow::array::{make_array, Array, ArrayData, ArrayRef, Int64Array};
use arrow::compute::kernels;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ffi_stream::ArrowArrayStreamReader;
use arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType};
use arrow::pyarrow::{FromPyArrow, PyArrowException, PyArrowType, ToPyArrow};
use arrow::record_batch::RecordBatch;

fn to_py_err(err: ArrowError) -> PyErr {
Expand Down Expand Up @@ -88,7 +88,8 @@ fn substring(
let array = make_array(array.0);

// substring
let array = kernels::substring::substring(array.as_ref(), start, None).map_err(to_py_err)?;
let array =
kernels::substring::substring(array.as_ref(), start, None).map_err(to_py_err)?;

Ok(array.to_data().into())
}
Expand All @@ -99,7 +100,8 @@ fn concatenate(array: PyArrowType<ArrayData>, py: Python) -> PyResult<PyObject>
let array = make_array(array.0);

// concat
let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()]).map_err(to_py_err)?;
let array =
kernels::concat::concat(&[array.as_ref(), array.as_ref()]).map_err(to_py_err)?;

array.to_data().to_pyarrow(py)
}
Expand Down
99 changes: 34 additions & 65 deletions arrow/src/ffi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,19 @@
//! let reader = Box::new(FileReader::try_new(file).unwrap());
//!
//! // export it
//! let stream = Box::new(FFI_ArrowArrayStream::empty());
//! let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream;
//! unsafe { export_reader_into_raw(reader, stream_ptr) };
//! let mut stream = FFI_ArrowArrayStream::empty();
//! unsafe { export_reader_into_raw(reader, &mut stream) };
//!
//! // consumed and used by something else...
//!
//! // import it
//! let stream_reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() };
//! let stream_reader = unsafe { ArrowArrayStreamReader::from_raw(&mut stream).unwrap() };
//! let imported_schema = stream_reader.schema();
//!
//! let mut produced_batches = vec![];
//! for batch in stream_reader {
//! produced_batches.push(batch.unwrap());
//! }
//!
//! // (drop/release)
//! unsafe {
//! Box::from_raw(stream_ptr);
//! }
//! Ok(())
//! }
//! ```
Expand Down Expand Up @@ -105,6 +99,8 @@ pub struct FFI_ArrowArrayStream {
pub private_data: *mut c_void,
}

unsafe impl Send for FFI_ArrowArrayStream {}

// callback used to drop [FFI_ArrowArrayStream] when it is exported.
unsafe extern "C" fn release_stream(stream: *mut FFI_ArrowArrayStream) {
if stream.is_null() {
Expand Down Expand Up @@ -231,8 +227,7 @@ impl ExportedArrayStream {
let struct_array = StructArray::from(batch);
let array = FFI_ArrowArray::new(&struct_array.to_data());

unsafe { std::ptr::copy(addr_of!(array), out, 1) };
std::mem::forget(array);
unsafe { std::ptr::write_unaligned(out, array) };
0
} else {
let err = &next_batch.unwrap_err();
Expand Down Expand Up @@ -261,24 +256,21 @@ fn get_error_code(err: &ArrowError) -> i32 {
/// Struct used to fetch `RecordBatch` from the C Stream Interface.
/// Its main responsibility is to expose `RecordBatchReader` functionality
/// that requires [FFI_ArrowArrayStream].
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct ArrowArrayStreamReader {
stream: Arc<FFI_ArrowArrayStream>,
stream: FFI_ArrowArrayStream,
schema: SchemaRef,
}

/// Gets schema from a raw pointer of `FFI_ArrowArrayStream`. This is used when constructing
/// `ArrowArrayStreamReader` to cache schema.
fn get_stream_schema(stream_ptr: *mut FFI_ArrowArrayStream) -> Result<SchemaRef> {
let empty_schema = Arc::new(FFI_ArrowSchema::empty());
let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema;
let mut schema = FFI_ArrowSchema::empty();

let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr, schema_ptr) };

let ffi_schema = unsafe { Arc::from_raw(schema_ptr) };
let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr, &mut schema) };

if ret_code == 0 {
let schema = Schema::try_from(ffi_schema.as_ref()).unwrap();
let schema = Schema::try_from(&schema).unwrap();
Ok(Arc::new(schema))
} else {
Err(ArrowError::CDataInterface(format!(
Expand All @@ -291,21 +283,16 @@ impl ArrowArrayStreamReader {
/// Creates a new `ArrowArrayStreamReader` from a `FFI_ArrowArrayStream`.
/// This is used to import from the C Stream Interface.
#[allow(dead_code)]
pub fn try_new(stream: FFI_ArrowArrayStream) -> Result<Self> {
pub fn try_new(mut stream: FFI_ArrowArrayStream) -> Result<Self> {
if stream.release.is_none() {
return Err(ArrowError::CDataInterface(
"input stream is already released".to_string(),
));
}

let stream_ptr = Arc::into_raw(Arc::new(stream)) as *mut FFI_ArrowArrayStream;

let schema = get_stream_schema(stream_ptr)?;
let schema = get_stream_schema(&mut stream)?;

Ok(Self {
stream: unsafe { Arc::from_raw(stream_ptr) },
schema,
})
Ok(Self { stream, schema })
}

/// Creates a new `ArrowArrayStreamReader` from a raw pointer of `FFI_ArrowArrayStream`.
Expand All @@ -324,13 +311,12 @@ impl ArrowArrayStreamReader {
}

/// Get the last error from `ArrowArrayStreamReader`
fn get_stream_last_error(&self) -> Option<String> {
fn get_stream_last_error(&mut self) -> Option<String> {
self.stream.get_last_error?;

let stream_ptr = Arc::as_ptr(&self.stream) as *mut FFI_ArrowArrayStream;

let error_str = unsafe {
let c_str = self.stream.get_last_error.unwrap()(stream_ptr) as *mut c_char;
let c_str =
self.stream.get_last_error.unwrap()(&mut self.stream) as *mut c_char;
CString::from_raw(c_str).into_string()
};

Expand All @@ -346,26 +332,22 @@ impl Iterator for ArrowArrayStreamReader {
type Item = Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
let stream_ptr = Arc::as_ptr(&self.stream) as *mut FFI_ArrowArrayStream;

let empty_array = Arc::new(FFI_ArrowArray::empty());
let array_ptr = Arc::into_raw(empty_array) as *mut FFI_ArrowArray;
let mut array = FFI_ArrowArray::empty();

let ret_code = unsafe { self.stream.get_next.unwrap()(stream_ptr, array_ptr) };
let ret_code =
unsafe { self.stream.get_next.unwrap()(&mut self.stream, &mut array) };

if ret_code == 0 {
let ffi_array = unsafe { Arc::from_raw(array_ptr) };

// The end of stream has been reached
if ffi_array.is_released() {
if array.is_released() {
return None;
}

let schema_ref = self.schema();
let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref()).ok()?;

let data = ArrowArray {
array: ffi_array,
array: Arc::new(array),
schema: Arc::new(schema),
}
.to_data()
Expand All @@ -375,8 +357,6 @@ impl Iterator for ArrowArrayStreamReader {

Some(Ok(record_batch))
} else {
unsafe { Arc::from_raw(array_ptr) };

let last_error = self.get_stream_last_error();
let err = ArrowError::CDataInterface(last_error.unwrap());
Some(Err(err))
Expand Down Expand Up @@ -451,40 +431,33 @@ mod tests {
let reader = TestRecordBatchReader::new(schema.clone(), iter);

// Export a `RecordBatchReader` through `FFI_ArrowArrayStream`
let stream = Arc::new(FFI_ArrowArrayStream::empty());
let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream;

unsafe { export_reader_into_raw(reader, stream_ptr) };

let empty_schema = Arc::new(FFI_ArrowSchema::empty());
let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema;
let mut ffi_stream = FFI_ArrowArrayStream::empty();
unsafe { export_reader_into_raw(reader, &mut ffi_stream) };

// Get schema from `FFI_ArrowArrayStream`
let ret_code = unsafe { get_schema(stream_ptr, schema_ptr) };
let mut ffi_schema = FFI_ArrowSchema::empty();
let ret_code = unsafe { get_schema(&mut ffi_stream, &mut ffi_schema) };
assert_eq!(ret_code, 0);

let ffi_schema = unsafe { Arc::from_raw(schema_ptr) };

let exported_schema = Schema::try_from(ffi_schema.as_ref()).unwrap();
let exported_schema = Schema::try_from(&ffi_schema).unwrap();
assert_eq!(&exported_schema, schema.as_ref());

let ffi_schema = Arc::new(ffi_schema);

// Get array from `FFI_ArrowArrayStream`
let mut produced_batches = vec![];
loop {
let empty_array = Arc::new(FFI_ArrowArray::empty());
let array_ptr = Arc::into_raw(empty_array.clone()) as *mut FFI_ArrowArray;

let ret_code = unsafe { get_next(stream_ptr, array_ptr) };
let mut ffi_array = FFI_ArrowArray::empty();
let ret_code = unsafe { get_next(&mut ffi_stream, &mut ffi_array) };
assert_eq!(ret_code, 0);

// The end of stream has been reached
let ffi_array = unsafe { Arc::from_raw(array_ptr) };
if ffi_array.is_released() {
break;
}

let array = ArrowArray {
array: ffi_array,
array: Arc::new(ffi_array),
schema: ffi_schema.clone(),
}
.to_data()
Expand All @@ -496,7 +469,6 @@ mod tests {

assert_eq!(produced_batches, vec![batch.clone(), batch]);

unsafe { Arc::from_raw(stream_ptr) };
Ok(())
}

Expand All @@ -512,10 +484,8 @@ mod tests {
let reader = TestRecordBatchReader::new(schema.clone(), iter);

// Import through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader`
let stream = Arc::new(FFI_ArrowArrayStream::new(reader));
let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream;
let stream_reader =
unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() };
let stream = FFI_ArrowArrayStream::new(reader);
let stream_reader = ArrowArrayStreamReader::try_new(stream).unwrap();

let imported_schema = stream_reader.schema();
assert_eq!(imported_schema, schema);
Expand All @@ -527,7 +497,6 @@ mod tests {

assert_eq!(produced_batches, vec![batch.clone(), batch]);

unsafe { Arc::from_raw(stream_ptr) };
Ok(())
}

Expand Down
Loading