-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: update IPC format to hold buffer_index #903
Changes from 6 commits
4afb454
2205678
dfbaba3
17b4ca9
1502d77
d2315ea
98f3dc4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -190,6 +190,7 @@ impl StructDType { | |
} | ||
}; | ||
|
||
// TODO: do this without the extra allocations. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. remove |
||
names.push(self.names[idx].clone()); | ||
dtypes.push(self.dtypes[idx].clone()); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,7 +93,7 @@ impl<'a> flatbuffers::Verifiable for Version { | |
|
||
impl flatbuffers::SimpleToVerifyInSlice for Version {} | ||
pub enum ArrayOffset {} | ||
#[derive(Copy, Clone, PartialEq, Eq)] | ||
#[derive(Copy, Clone, PartialEq)] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure why the generator stripped `Eq` |
||
|
||
pub struct Array<'a> { | ||
pub _tab: flatbuffers::Table<'a>, | ||
|
@@ -109,7 +109,7 @@ impl<'a> flatbuffers::Follow<'a> for Array<'a> { | |
|
||
impl<'a> Array<'a> { | ||
pub const VT_VERSION: flatbuffers::VOffsetT = 4; | ||
pub const VT_HAS_BUFFER: flatbuffers::VOffsetT = 6; | ||
pub const VT_BUFFER_INDEX: flatbuffers::VOffsetT = 6; | ||
pub const VT_ENCODING: flatbuffers::VOffsetT = 8; | ||
pub const VT_METADATA: flatbuffers::VOffsetT = 10; | ||
pub const VT_STATS: flatbuffers::VOffsetT = 12; | ||
|
@@ -125,11 +125,11 @@ impl<'a> Array<'a> { | |
args: &'args ArrayArgs<'args> | ||
) -> flatbuffers::WIPOffset<Array<'bldr>> { | ||
let mut builder = ArrayBuilder::new(_fbb); | ||
if let Some(x) = args.buffer_index { builder.add_buffer_index(x); } | ||
if let Some(x) = args.children { builder.add_children(x); } | ||
if let Some(x) = args.stats { builder.add_stats(x); } | ||
if let Some(x) = args.metadata { builder.add_metadata(x); } | ||
builder.add_encoding(args.encoding); | ||
builder.add_has_buffer(args.has_buffer); | ||
builder.add_version(args.version); | ||
builder.finish() | ||
} | ||
|
@@ -143,11 +143,11 @@ impl<'a> Array<'a> { | |
unsafe { self._tab.get::<Version>(Array::VT_VERSION, Some(Version::V0)).unwrap()} | ||
} | ||
#[inline] | ||
pub fn has_buffer(&self) -> bool { | ||
pub fn buffer_index(&self) -> Option<u64> { | ||
// Safety: | ||
// Created from valid Table for this object | ||
// which contains a valid value in this slot | ||
unsafe { self._tab.get::<bool>(Array::VT_HAS_BUFFER, Some(false)).unwrap()} | ||
unsafe { self._tab.get::<u64>(Array::VT_BUFFER_INDEX, None)} | ||
} | ||
#[inline] | ||
pub fn encoding(&self) -> u16 { | ||
|
@@ -187,7 +187,7 @@ impl flatbuffers::Verifiable for Array<'_> { | |
use self::flatbuffers::Verifiable; | ||
v.visit_table(pos)? | ||
.visit_field::<Version>("version", Self::VT_VERSION, false)? | ||
.visit_field::<bool>("has_buffer", Self::VT_HAS_BUFFER, false)? | ||
.visit_field::<u64>("buffer_index", Self::VT_BUFFER_INDEX, false)? | ||
.visit_field::<u16>("encoding", Self::VT_ENCODING, false)? | ||
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("metadata", Self::VT_METADATA, false)? | ||
.visit_field::<flatbuffers::ForwardsUOffset<ArrayStats>>("stats", Self::VT_STATS, false)? | ||
|
@@ -198,7 +198,7 @@ impl flatbuffers::Verifiable for Array<'_> { | |
} | ||
pub struct ArrayArgs<'a> { | ||
pub version: Version, | ||
pub has_buffer: bool, | ||
pub buffer_index: Option<u64>, | ||
pub encoding: u16, | ||
pub metadata: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>, | ||
pub stats: Option<flatbuffers::WIPOffset<ArrayStats<'a>>>, | ||
|
@@ -209,7 +209,7 @@ impl<'a> Default for ArrayArgs<'a> { | |
fn default() -> Self { | ||
ArrayArgs { | ||
version: Version::V0, | ||
has_buffer: false, | ||
buffer_index: None, | ||
encoding: 0, | ||
metadata: None, | ||
stats: None, | ||
|
@@ -228,8 +228,8 @@ impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> ArrayBuilder<'a, 'b, A> { | |
self.fbb_.push_slot::<Version>(Array::VT_VERSION, version, Version::V0); | ||
} | ||
#[inline] | ||
pub fn add_has_buffer(&mut self, has_buffer: bool) { | ||
self.fbb_.push_slot::<bool>(Array::VT_HAS_BUFFER, has_buffer, false); | ||
pub fn add_buffer_index(&mut self, buffer_index: u64) { | ||
self.fbb_.push_slot_always::<u64>(Array::VT_BUFFER_INDEX, buffer_index); | ||
} | ||
#[inline] | ||
pub fn add_encoding(&mut self, encoding: u16) { | ||
|
@@ -266,7 +266,7 @@ impl core::fmt::Debug for Array<'_> { | |
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { | ||
let mut ds = f.debug_struct("Array"); | ||
ds.field("version", &self.version()); | ||
ds.field("has_buffer", &self.has_buffer()); | ||
ds.field("buffer_index", &self.buffer_index()); | ||
ds.field("encoding", &self.encoding()); | ||
ds.field("metadata", &self.metadata()); | ||
ds.field("stats", &self.stats()); | ||
|
@@ -275,7 +275,7 @@ impl core::fmt::Debug for Array<'_> { | |
} | ||
} | ||
pub enum ArrayStatsOffset {} | ||
#[derive(Copy, Clone, PartialEq, Eq)] | ||
#[derive(Copy, Clone, PartialEq)] | ||
|
||
pub struct ArrayStats<'a> { | ||
pub _tab: flatbuffers::Table<'a>, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,7 @@ pub enum IPCMessage<'a> { | |
|
||
pub struct IPCSchema<'a>(pub &'a DType); | ||
pub struct IPCBatch<'a>(pub &'a Array); | ||
pub struct IPCArray<'a>(pub &'a Array); | ||
pub struct IPCArray<'a>(pub &'a Array, usize); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The second argument here is the cumulative buffer index observed before this array. |
||
pub struct IPCPage<'a>(pub &'a Buffer); | ||
|
||
pub struct IPCDType(pub DType); | ||
|
@@ -87,13 +87,15 @@ impl<'a> WriteFlatBuffer for IPCBatch<'a> { | |
fbb: &mut FlatBufferBuilder<'fb>, | ||
) -> WIPOffset<Self::Target<'fb>> { | ||
let array_data = self.0; | ||
let array = Some(IPCArray(array_data).write_flatbuffer(fbb)); | ||
let array = Some(IPCArray(array_data, 0).write_flatbuffer(fbb)); | ||
|
||
let length = array_data.len() as u64; | ||
|
||
// Walk the ColumnData depth-first to compute the buffer offsets. | ||
let mut buffers = vec![]; | ||
let mut offset = 0; | ||
|
||
// buffer offset should be saved instead of the has_buffers stuff | ||
for array_data in array_data.depth_first_traversal() { | ||
if let Some(buffer) = array_data.buffer() { | ||
let aligned_size = (buffer.len() + (ALIGNMENT - 1)) & !(ALIGNMENT - 1); | ||
|
@@ -146,10 +148,21 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { | |
), | ||
}; | ||
|
||
// Assign buffer indices for all child arrays. | ||
// The second tuple element holds the buffer_index for this Array subtree. If this array | ||
// has a buffer, that is its buffer index. If it does not, that buffer index belongs | ||
// to one of the children. | ||
let child_buffer_offset = self.1 + if self.0.buffer().is_some() { 1 } else { 0 }; | ||
|
||
let children = column_data | ||
.children() | ||
.iter() | ||
.map(|child| IPCArray(child).write_flatbuffer(fbb)) | ||
.scan(child_buffer_offset, |buffer_offset, child| { | ||
// Update the number of buffers required. | ||
let msg = IPCArray(child, *buffer_offset).write_flatbuffer(fbb); | ||
*buffer_offset += child.cumulative_nbuffers(); | ||
Some(msg) | ||
}) | ||
.collect_vec(); | ||
let children = Some(fbb.create_vector(&children)); | ||
|
||
|
@@ -159,7 +172,7 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> { | |
fbb, | ||
&fba::ArrayArgs { | ||
version: Default::default(), | ||
has_buffer: column_data.buffer().is_some(), | ||
buffer_index: self.0.buffer().is_some().then_some(self.1 as u64), | ||
encoding, | ||
metadata, | ||
stats, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that we no longer need to slice, because each Array flatbuffer message contains the `buffer_index` field, which is a global offset into the buffers list.