Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update IPC format to hold buffer_index #903

Merged
merged 7 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions vortex-array/src/array/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult};
use vortex_scalar::Scalar;

use crate::array::primitive::PrimitiveArray;
use crate::compute::unary::{scalar_at, subtract_scalar, SubtractScalarFn};
use crate::compute::unary::{scalar_at, scalar_at_unchecked, subtract_scalar, SubtractScalarFn};
use crate::compute::{search_sorted, SearchSortedSide};
use crate::encoding::ids;
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
Expand Down Expand Up @@ -72,8 +72,12 @@ impl ChunkedArray {

#[inline]
pub fn chunk(&self, idx: usize) -> VortexResult<Array> {
let chunk_start = usize::try_from(&scalar_at(&self.chunk_offsets(), idx)?)?;
let chunk_end = usize::try_from(&scalar_at(&self.chunk_offsets(), idx + 1)?)?;
if idx >= self.nchunks() {
vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks());
}

let chunk_start = usize::try_from(&scalar_at_unchecked(&self.chunk_offsets(), idx))?;
let chunk_end = usize::try_from(&scalar_at_unchecked(&self.chunk_offsets(), idx + 1))?;

// Offset the index since chunk_ends is child 0.
self.as_ref()
Expand Down
4 changes: 1 addition & 3 deletions vortex-array/src/array/null/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use vortex_dtype::DType;
use vortex_error::{VortexExpect as _, VortexResult};
Expand All @@ -26,7 +24,7 @@ impl NullArray {
DType::Null,
len,
NullMetadata { len },
Arc::new([]),
[].into(),
StatsSet::nulls(len, &DType::Null),
)
.vortex_expect("NullArray::new should never fail!")
Expand Down
9 changes: 9 additions & 0 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ impl Array {
ArrayChildrenIterator::new(self.clone())
}

/// Count the number of cumulative buffers encoded by self.
///
/// This is the total over the whole subtree rooted at this array: one for
/// this array's own buffer (if present) plus the buffers of every descendant.
pub fn cumulative_nbuffers(&self) -> usize {
    // Buffer owned directly by this array, if any.
    let own_buffer = usize::from(self.buffer().is_some());
    // Buffers contributed by the entire child subtrees, counted recursively.
    let child_buffers: usize = self
        .children()
        .iter()
        .map(|child| child.cumulative_nbuffers())
        .sum();
    own_buffer + child_buffers
}

/// Return the buffer offsets and the total length of all buffers, assuming the given alignment.
/// This includes all child buffers.
pub fn all_buffer_offsets(&self, alignment: usize) -> Vec<u64> {
Expand Down
44 changes: 8 additions & 36 deletions vortex-array/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use itertools::Itertools;
use log::warn;
use vortex_buffer::Buffer;
use vortex_dtype::{DType, Nullability};
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexExpect as _, VortexResult};
use vortex_error::{vortex_err, VortexError, VortexExpect as _, VortexResult};
use vortex_scalar::{PValue, Scalar, ScalarValue};

use crate::encoding::EncodingRef;
Expand All @@ -22,8 +22,7 @@ pub struct ArrayView {
len: usize,
flatbuffer: Buffer,
flatbuffer_loc: usize,
// TODO(ngates): create an RC'd vector that can be lazily sliced.
buffers: Vec<Buffer>,
buffers: Arc<[Buffer]>,
ctx: Arc<Context>,
// TODO(ngates): a store a Projection. A projected ArrayView contains the full fb::Array
// metadata, but only the buffers from the selected columns. Therefore we need to know
Expand Down Expand Up @@ -60,20 +59,13 @@ impl ArrayView {
.lookup_encoding(array.encoding())
.ok_or_else(|| vortex_err!(InvalidSerde: "Encoding ID out of bounds"))?;

if buffers.len() != Self::cumulative_nbuffers(array) {
vortex_bail!(InvalidSerde:
"Incorrect number of buffers {}, expected {}",
buffers.len(),
Self::cumulative_nbuffers(array)
)
}
let view = Self {
encoding,
dtype,
len,
flatbuffer,
flatbuffer_loc,
buffers,
buffers: buffers.into(),
ctx,
};

Expand Down Expand Up @@ -130,26 +122,13 @@ impl ArrayView {
Box::leak(Box::new(OpaqueEncoding(child.encoding())))
});

// Figure out how many buffers to skip...
// We store them depth-first.
let buffer_offset = self
.flatbuffer()
.children()
.ok_or_else(|| vortex_err!("flatbuffer children not found"))?
.iter()
.take(idx)
.map(|child| Self::cumulative_nbuffers(child))
.sum::<usize>()
+ self.has_buffer() as usize;
let buffer_count = Self::cumulative_nbuffers(child);

Ok(Self {
encoding,
dtype: dtype.clone(),
len,
flatbuffer: self.flatbuffer.clone(),
flatbuffer_loc,
buffers: self.buffers[buffer_offset..][0..buffer_count].to_vec(),
buffers: self.buffers.clone(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that we no longer need to slice, because each Array flatbuffer message now carries a buffer_index field, which is a global offset into the shared buffers list

ctx: self.ctx.clone(),
})
}
Expand All @@ -173,20 +152,13 @@ impl ArrayView {

/// Whether the current Array makes use of a buffer
pub fn has_buffer(&self) -> bool {
self.flatbuffer().has_buffer()
}

/// The number of buffers used by the current Array and all its children.
fn cumulative_nbuffers(array: fb::Array) -> usize {
let mut nbuffers = if array.has_buffer() { 1 } else { 0 };
for child in array.children().unwrap_or_default() {
nbuffers += Self::cumulative_nbuffers(child)
}
nbuffers
self.flatbuffer().buffer_index().is_some()
}

pub fn buffer(&self) -> Option<&Buffer> {
self.has_buffer().then(|| &self.buffers[0])
self.flatbuffer()
.buffer_index()
.map(|idx| &self.buffers[idx as usize])
}

pub fn statistics(&self) -> &dyn Statistics {
Expand Down
2 changes: 1 addition & 1 deletion vortex-flatbuffers/flatbuffers/vortex-array/array.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ enum Version: uint8 {

table Array {
version: Version = V0;
has_buffer: bool;
buffer_index: uint64 = null;
encoding: uint16;
metadata: [ubyte];
stats: ArrayStats;
Expand Down
24 changes: 12 additions & 12 deletions vortex-flatbuffers/src/generated/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl<'a> flatbuffers::Verifiable for Version {

impl flatbuffers::SimpleToVerifyInSlice for Version {}
pub enum ArrayOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why the generator stripped Eq this time, but I don't think it matters


pub struct Array<'a> {
pub _tab: flatbuffers::Table<'a>,
Expand All @@ -109,7 +109,7 @@ impl<'a> flatbuffers::Follow<'a> for Array<'a> {

impl<'a> Array<'a> {
pub const VT_VERSION: flatbuffers::VOffsetT = 4;
pub const VT_HAS_BUFFER: flatbuffers::VOffsetT = 6;
pub const VT_BUFFER_INDEX: flatbuffers::VOffsetT = 6;
pub const VT_ENCODING: flatbuffers::VOffsetT = 8;
pub const VT_METADATA: flatbuffers::VOffsetT = 10;
pub const VT_STATS: flatbuffers::VOffsetT = 12;
Expand All @@ -125,11 +125,11 @@ impl<'a> Array<'a> {
args: &'args ArrayArgs<'args>
) -> flatbuffers::WIPOffset<Array<'bldr>> {
let mut builder = ArrayBuilder::new(_fbb);
if let Some(x) = args.buffer_index { builder.add_buffer_index(x); }
if let Some(x) = args.children { builder.add_children(x); }
if let Some(x) = args.stats { builder.add_stats(x); }
if let Some(x) = args.metadata { builder.add_metadata(x); }
builder.add_encoding(args.encoding);
builder.add_has_buffer(args.has_buffer);
builder.add_version(args.version);
builder.finish()
}
Expand All @@ -143,11 +143,11 @@ impl<'a> Array<'a> {
unsafe { self._tab.get::<Version>(Array::VT_VERSION, Some(Version::V0)).unwrap()}
}
#[inline]
pub fn has_buffer(&self) -> bool {
pub fn buffer_index(&self) -> Option<u64> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<bool>(Array::VT_HAS_BUFFER, Some(false)).unwrap()}
unsafe { self._tab.get::<u64>(Array::VT_BUFFER_INDEX, None)}
}
#[inline]
pub fn encoding(&self) -> u16 {
Expand Down Expand Up @@ -187,7 +187,7 @@ impl flatbuffers::Verifiable for Array<'_> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<Version>("version", Self::VT_VERSION, false)?
.visit_field::<bool>("has_buffer", Self::VT_HAS_BUFFER, false)?
.visit_field::<u64>("buffer_index", Self::VT_BUFFER_INDEX, false)?
.visit_field::<u16>("encoding", Self::VT_ENCODING, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("metadata", Self::VT_METADATA, false)?
.visit_field::<flatbuffers::ForwardsUOffset<ArrayStats>>("stats", Self::VT_STATS, false)?
Expand All @@ -198,7 +198,7 @@ impl flatbuffers::Verifiable for Array<'_> {
}
pub struct ArrayArgs<'a> {
pub version: Version,
pub has_buffer: bool,
pub buffer_index: Option<u64>,
pub encoding: u16,
pub metadata: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
pub stats: Option<flatbuffers::WIPOffset<ArrayStats<'a>>>,
Expand All @@ -209,7 +209,7 @@ impl<'a> Default for ArrayArgs<'a> {
fn default() -> Self {
ArrayArgs {
version: Version::V0,
has_buffer: false,
buffer_index: None,
encoding: 0,
metadata: None,
stats: None,
Expand All @@ -228,8 +228,8 @@ impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> ArrayBuilder<'a, 'b, A> {
self.fbb_.push_slot::<Version>(Array::VT_VERSION, version, Version::V0);
}
#[inline]
pub fn add_has_buffer(&mut self, has_buffer: bool) {
self.fbb_.push_slot::<bool>(Array::VT_HAS_BUFFER, has_buffer, false);
pub fn add_buffer_index(&mut self, buffer_index: u64) {
self.fbb_.push_slot_always::<u64>(Array::VT_BUFFER_INDEX, buffer_index);
}
#[inline]
pub fn add_encoding(&mut self, encoding: u16) {
Expand Down Expand Up @@ -266,7 +266,7 @@ impl core::fmt::Debug for Array<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("Array");
ds.field("version", &self.version());
ds.field("has_buffer", &self.has_buffer());
ds.field("buffer_index", &self.buffer_index());
ds.field("encoding", &self.encoding());
ds.field("metadata", &self.metadata());
ds.field("stats", &self.stats());
Expand All @@ -275,7 +275,7 @@ impl core::fmt::Debug for Array<'_> {
}
}
pub enum ArrayStatsOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct ArrayStats<'a> {
pub _tab: flatbuffers::Table<'a>,
Expand Down
10 changes: 5 additions & 5 deletions vortex-flatbuffers/src/generated/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl flatbuffers::SimpleToVerifyInSlice for Type {}
pub struct TypeUnionTableOffset {}

pub enum NullOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Null<'a> {
pub _tab: flatbuffers::Table<'a>,
Expand Down Expand Up @@ -329,7 +329,7 @@ impl core::fmt::Debug for Null<'_> {
}
}
pub enum BoolOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Bool<'a> {
pub _tab: flatbuffers::Table<'a>,
Expand Down Expand Up @@ -540,7 +540,7 @@ impl core::fmt::Debug for Primitive<'_> {
}
}
pub enum DecimalOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Decimal<'a> {
pub _tab: flatbuffers::Table<'a>,
Expand Down Expand Up @@ -673,7 +673,7 @@ impl core::fmt::Debug for Decimal<'_> {
}
}
pub enum Utf8Offset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Utf8<'a> {
pub _tab: flatbuffers::Table<'a>,
Expand Down Expand Up @@ -770,7 +770,7 @@ impl core::fmt::Debug for Utf8<'_> {
}
}
pub enum BinaryOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Binary<'a> {
pub _tab: flatbuffers::Table<'a>,
Expand Down
2 changes: 1 addition & 1 deletion vortex-flatbuffers/src/generated/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
// @generated

use crate::scalar::*;
use crate::array::*;
use crate::dtype::*;
use crate::array::*;
use core::mem;
use core::cmp::Ordering;

Expand Down
1 change: 0 additions & 1 deletion vortex-serde/src/layouts/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl<W: VortexWrite> LayoutWriter<W> {

async fn write_metadata_arrays(&mut self) -> VortexResult<NestedLayout> {
let mut column_layouts = VecDeque::with_capacity(self.column_chunks.len());

for mut chunk in mem::take(&mut self.column_chunks) {
let len = chunk.byte_offsets.len() - 1;
let mut chunks: VecDeque<Layout> = chunk
Expand Down
20 changes: 16 additions & 4 deletions vortex-serde/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub enum IPCMessage<'a> {

pub struct IPCSchema<'a>(pub &'a DType);
pub struct IPCBatch<'a>(pub &'a Array);
pub struct IPCArray<'a>(pub &'a Array);
pub struct IPCArray<'a>(pub &'a Array, usize);
Copy link
Contributor Author

@a10y a10y Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the second argument here is the cumulative buffer index accumulated before this array, i.e. the number of buffers already assigned in the depth-first traversal

pub struct IPCPage<'a>(pub &'a Buffer);

pub struct IPCDType(pub DType);
Expand Down Expand Up @@ -87,13 +87,14 @@ impl<'a> WriteFlatBuffer for IPCBatch<'a> {
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
let array_data = self.0;
let array = Some(IPCArray(array_data).write_flatbuffer(fbb));
let array = Some(IPCArray(array_data, 0).write_flatbuffer(fbb));

let length = array_data.len() as u64;

// Walk the ColumnData depth-first to compute the buffer offsets.
let mut buffers = vec![];
let mut offset = 0;

for array_data in array_data.depth_first_traversal() {
if let Some(buffer) = array_data.buffer() {
let aligned_size = (buffer.len() + (ALIGNMENT - 1)) & !(ALIGNMENT - 1);
Expand Down Expand Up @@ -146,10 +147,21 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
),
};

// Assign buffer indices for all child arrays.
// The second tuple element holds the buffer_index for this Array subtree. If this array
// has a buffer, that is its buffer index. If it does not, that buffer index belongs
// to one of the children.
let child_buffer_offset = self.1 + if self.0.buffer().is_some() { 1 } else { 0 };

let children = column_data
.children()
.iter()
.map(|child| IPCArray(child).write_flatbuffer(fbb))
.scan(child_buffer_offset, |buffer_offset, child| {
// Update the number of buffers required.
let msg = IPCArray(child, *buffer_offset).write_flatbuffer(fbb);
*buffer_offset += child.cumulative_nbuffers();
Some(msg)
})
.collect_vec();
let children = Some(fbb.create_vector(&children));

Expand All @@ -159,7 +171,7 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
fbb,
&fba::ArrayArgs {
version: Default::default(),
has_buffer: column_data.buffer().is_some(),
buffer_index: self.0.buffer().is_some().then_some(self.1 as u64),
encoding,
metadata,
stats,
Expand Down
Loading