feat: update IPC format to hold buffer_index (#903)
Fixes #786 

Updates the IPC format, removing the `Array.has_buffer` boolean field and
replacing it with a `buffer_index` field. The `buffer_index` for each array is
an optional `uint64` holding the index into the shared `Arc<[Buffer]>` at which
the given array's buffer is stored, or `null` if the array does not have a
buffer.
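
A minimal reader-side sketch of the scheme (the `Buffer` alias and struct layout below are simplified stand-ins for illustration, not the crate's real types), mirroring how a view resolves its buffer from the shared buffer list:

```rust
use std::sync::Arc;

// Stand-in for vortex_buffer::Buffer, purely for illustration.
type Buffer = Vec<u8>;

struct ArrayView {
    // Mirrors the new flatbuffer field `buffer_index: uint64 = null`.
    buffer_index: Option<u64>,
    // One buffer list shared by the view and all of its children.
    buffers: Arc<[Buffer]>,
}

impl ArrayView {
    fn buffer(&self) -> Option<&Buffer> {
        // A `null` buffer_index becomes `None`: this array owns no buffer.
        self.buffer_index.map(|idx| &self.buffers[idx as usize])
    }
}

fn main() {
    let buffers: Arc<[Buffer]> = vec![vec![1u8, 2, 3]].into();
    let with_buffer = ArrayView { buffer_index: Some(0), buffers: buffers.clone() };
    let without_buffer = ArrayView { buffer_index: None, buffers };
    assert_eq!(with_buffer.buffer().map(Vec::len), Some(3));
    assert!(without_buffer.buffer().is_none());
}
```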

Additional changes:

* Moves `children` from a `Vec<Array>` into an `Arc<[Array]>` to improve
cloning performance (see the sketch after this list)
* Uses `scalar_at_unchecked` in `ChunkedArray::chunk` to look up chunk
offsets, skipping the validity check, since chunk offsets are non-nullable
by construction
a10y authored Sep 24, 2024
1 parent e9f0d4d commit b2d4dce
Showing 10 changed files with 60 additions and 66 deletions.
10 changes: 7 additions & 3 deletions vortex-array/src/array/chunked/mod.rs
@@ -10,7 +10,7 @@ use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult};
use vortex_scalar::Scalar;

use crate::array::primitive::PrimitiveArray;
use crate::compute::unary::{scalar_at, subtract_scalar, SubtractScalarFn};
use crate::compute::unary::{scalar_at, scalar_at_unchecked, subtract_scalar, SubtractScalarFn};
use crate::compute::{search_sorted, SearchSortedSide};
use crate::encoding::ids;
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
@@ -72,8 +72,12 @@ impl ChunkedArray

#[inline]
pub fn chunk(&self, idx: usize) -> VortexResult<Array> {
let chunk_start = usize::try_from(&scalar_at(&self.chunk_offsets(), idx)?)?;
let chunk_end = usize::try_from(&scalar_at(&self.chunk_offsets(), idx + 1)?)?;
if idx >= self.nchunks() {
vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks());
}

let chunk_start = usize::try_from(&scalar_at_unchecked(&self.chunk_offsets(), idx))?;
let chunk_end = usize::try_from(&scalar_at_unchecked(&self.chunk_offsets(), idx + 1))?;

// Offset the index since chunk_ends is child 0.
self.as_ref()
4 changes: 1 addition & 3 deletions vortex-array/src/array/null/mod.rs
@@ -1,5 +1,3 @@
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use vortex_dtype::DType;
use vortex_error::{VortexExpect as _, VortexResult};
@@ -26,7 +24,7 @@ impl NullArray {
DType::Null,
len,
NullMetadata { len },
Arc::new([]),
[].into(),
StatsSet::nulls(len, &DType::Null),
)
.vortex_expect("NullArray::new should never fail!")
9 changes: 9 additions & 0 deletions vortex-array/src/lib.rs
@@ -120,6 +120,15 @@ impl Array {
ArrayChildrenIterator::new(self.clone())
}

/// Count the number of cumulative buffers encoded by self.
pub fn cumulative_nbuffers(&self) -> usize {
self.children()
.iter()
.map(|child| child.cumulative_nbuffers())
.sum::<usize>()
+ if self.buffer().is_some() { 1 } else { 0 }
}

/// Return the buffer offsets and the total length of all buffers, assuming the given alignment.
/// This includes all child buffers.
pub fn all_buffer_offsets(&self, alignment: usize) -> Vec<u64> {
44 changes: 8 additions & 36 deletions vortex-array/src/view.rs
@@ -6,7 +6,7 @@ use itertools::Itertools;
use log::warn;
use vortex_buffer::Buffer;
use vortex_dtype::{DType, Nullability};
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexExpect as _, VortexResult};
use vortex_error::{vortex_err, VortexError, VortexExpect as _, VortexResult};
use vortex_scalar::{PValue, Scalar, ScalarValue};

use crate::encoding::EncodingRef;
@@ -22,8 +22,7 @@ pub struct ArrayView {
len: usize,
flatbuffer: Buffer,
flatbuffer_loc: usize,
// TODO(ngates): create an RC'd vector that can be lazily sliced.
buffers: Vec<Buffer>,
buffers: Arc<[Buffer]>,
ctx: Arc<Context>,
// TODO(ngates): a store a Projection. A projected ArrayView contains the full fb::Array
// metadata, but only the buffers from the selected columns. Therefore we need to know
@@ -60,20 +59,13 @@ impl ArrayView {
.lookup_encoding(array.encoding())
.ok_or_else(|| vortex_err!(InvalidSerde: "Encoding ID out of bounds"))?;

if buffers.len() != Self::cumulative_nbuffers(array) {
vortex_bail!(InvalidSerde:
"Incorrect number of buffers {}, expected {}",
buffers.len(),
Self::cumulative_nbuffers(array)
)
}
let view = Self {
encoding,
dtype,
len,
flatbuffer,
flatbuffer_loc,
buffers,
buffers: buffers.into(),
ctx,
};

@@ -130,26 +122,13 @@ impl ArrayView {
Box::leak(Box::new(OpaqueEncoding(child.encoding())))
});

// Figure out how many buffers to skip...
// We store them depth-first.
let buffer_offset = self
.flatbuffer()
.children()
.ok_or_else(|| vortex_err!("flatbuffer children not found"))?
.iter()
.take(idx)
.map(|child| Self::cumulative_nbuffers(child))
.sum::<usize>()
+ self.has_buffer() as usize;
let buffer_count = Self::cumulative_nbuffers(child);

Ok(Self {
encoding,
dtype: dtype.clone(),
len,
flatbuffer: self.flatbuffer.clone(),
flatbuffer_loc,
buffers: self.buffers[buffer_offset..][0..buffer_count].to_vec(),
buffers: self.buffers.clone(),
ctx: self.ctx.clone(),
})
}
@@ -173,20 +152,13 @@ impl ArrayView {

/// Whether the current Array makes use of a buffer
pub fn has_buffer(&self) -> bool {
self.flatbuffer().has_buffer()
}

/// The number of buffers used by the current Array and all its children.
fn cumulative_nbuffers(array: fb::Array) -> usize {
let mut nbuffers = if array.has_buffer() { 1 } else { 0 };
for child in array.children().unwrap_or_default() {
nbuffers += Self::cumulative_nbuffers(child)
}
nbuffers
self.flatbuffer().buffer_index().is_some()
}

pub fn buffer(&self) -> Option<&Buffer> {
self.has_buffer().then(|| &self.buffers[0])
self.flatbuffer()
.buffer_index()
.map(|idx| &self.buffers[idx as usize])
}

pub fn statistics(&self) -> &dyn Statistics {
2 changes: 1 addition & 1 deletion vortex-flatbuffers/flatbuffers/vortex-array/array.fbs
@@ -6,7 +6,7 @@ enum Version: uint8 {

table Array {
version: Version = V0;
has_buffer: bool;
buffer_index: uint64 = null;
encoding: uint16;
metadata: [ubyte];
stats: ArrayStats;
24 changes: 12 additions & 12 deletions vortex-flatbuffers/src/generated/array.rs
@@ -93,7 +93,7 @@ impl<'a> flatbuffers::Verifiable for Version {

impl flatbuffers::SimpleToVerifyInSlice for Version {}
pub enum ArrayOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Array<'a> {
pub _tab: flatbuffers::Table<'a>,
@@ -109,7 +109,7 @@ impl<'a> flatbuffers::Follow<'a> for Array<'a> {

impl<'a> Array<'a> {
pub const VT_VERSION: flatbuffers::VOffsetT = 4;
pub const VT_HAS_BUFFER: flatbuffers::VOffsetT = 6;
pub const VT_BUFFER_INDEX: flatbuffers::VOffsetT = 6;
pub const VT_ENCODING: flatbuffers::VOffsetT = 8;
pub const VT_METADATA: flatbuffers::VOffsetT = 10;
pub const VT_STATS: flatbuffers::VOffsetT = 12;
@@ -125,11 +125,11 @@ impl<'a> Array<'a> {
args: &'args ArrayArgs<'args>
) -> flatbuffers::WIPOffset<Array<'bldr>> {
let mut builder = ArrayBuilder::new(_fbb);
if let Some(x) = args.buffer_index { builder.add_buffer_index(x); }
if let Some(x) = args.children { builder.add_children(x); }
if let Some(x) = args.stats { builder.add_stats(x); }
if let Some(x) = args.metadata { builder.add_metadata(x); }
builder.add_encoding(args.encoding);
builder.add_has_buffer(args.has_buffer);
builder.add_version(args.version);
builder.finish()
}
@@ -143,11 +143,11 @@ impl<'a> Array<'a> {
unsafe { self._tab.get::<Version>(Array::VT_VERSION, Some(Version::V0)).unwrap()}
}
#[inline]
pub fn has_buffer(&self) -> bool {
pub fn buffer_index(&self) -> Option<u64> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<bool>(Array::VT_HAS_BUFFER, Some(false)).unwrap()}
unsafe { self._tab.get::<u64>(Array::VT_BUFFER_INDEX, None)}
}
#[inline]
pub fn encoding(&self) -> u16 {
@@ -187,7 +187,7 @@ impl flatbuffers::Verifiable for Array<'_> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<Version>("version", Self::VT_VERSION, false)?
.visit_field::<bool>("has_buffer", Self::VT_HAS_BUFFER, false)?
.visit_field::<u64>("buffer_index", Self::VT_BUFFER_INDEX, false)?
.visit_field::<u16>("encoding", Self::VT_ENCODING, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("metadata", Self::VT_METADATA, false)?
.visit_field::<flatbuffers::ForwardsUOffset<ArrayStats>>("stats", Self::VT_STATS, false)?
@@ -198,7 +198,7 @@ impl flatbuffers::Verifiable for Array<'_> {
}
pub struct ArrayArgs<'a> {
pub version: Version,
pub has_buffer: bool,
pub buffer_index: Option<u64>,
pub encoding: u16,
pub metadata: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
pub stats: Option<flatbuffers::WIPOffset<ArrayStats<'a>>>,
@@ -209,7 +209,7 @@ impl<'a> Default for ArrayArgs<'a> {
fn default() -> Self {
ArrayArgs {
version: Version::V0,
has_buffer: false,
buffer_index: None,
encoding: 0,
metadata: None,
stats: None,
@@ -228,8 +228,8 @@ impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> ArrayBuilder<'a, 'b, A> {
self.fbb_.push_slot::<Version>(Array::VT_VERSION, version, Version::V0);
}
#[inline]
pub fn add_has_buffer(&mut self, has_buffer: bool) {
self.fbb_.push_slot::<bool>(Array::VT_HAS_BUFFER, has_buffer, false);
pub fn add_buffer_index(&mut self, buffer_index: u64) {
self.fbb_.push_slot_always::<u64>(Array::VT_BUFFER_INDEX, buffer_index);
}
#[inline]
pub fn add_encoding(&mut self, encoding: u16) {
@@ -266,7 +266,7 @@ impl core::fmt::Debug for Array<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("Array");
ds.field("version", &self.version());
ds.field("has_buffer", &self.has_buffer());
ds.field("buffer_index", &self.buffer_index());
ds.field("encoding", &self.encoding());
ds.field("metadata", &self.metadata());
ds.field("stats", &self.stats());
@@ -275,7 +275,7 @@ impl core::fmt::Debug for Array<'_> {
}
}
pub enum ArrayStatsOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct ArrayStats<'a> {
pub _tab: flatbuffers::Table<'a>,
10 changes: 5 additions & 5 deletions vortex-flatbuffers/src/generated/dtype.rs
@@ -250,7 +250,7 @@ impl flatbuffers::SimpleToVerifyInSlice for Type {}
pub struct TypeUnionTableOffset {}

pub enum NullOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Null<'a> {
pub _tab: flatbuffers::Table<'a>,
@@ -329,7 +329,7 @@ impl core::fmt::Debug for Null<'_> {
}
}
pub enum BoolOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Bool<'a> {
pub _tab: flatbuffers::Table<'a>,
@@ -540,7 +540,7 @@ impl core::fmt::Debug for Primitive<'_> {
}
}
pub enum DecimalOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Decimal<'a> {
pub _tab: flatbuffers::Table<'a>,
@@ -673,7 +673,7 @@ impl core::fmt::Debug for Decimal<'_> {
}
}
pub enum Utf8Offset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Utf8<'a> {
pub _tab: flatbuffers::Table<'a>,
@@ -770,7 +770,7 @@ impl core::fmt::Debug for Utf8<'_> {
}
}
pub enum BinaryOffset {}
#[derive(Copy, Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq)]

pub struct Binary<'a> {
pub _tab: flatbuffers::Table<'a>,
2 changes: 1 addition & 1 deletion vortex-flatbuffers/src/generated/message.rs
@@ -4,8 +4,8 @@
// @generated

use crate::scalar::*;
use crate::array::*;
use crate::dtype::*;
use crate::array::*;
use core::mem;
use core::cmp::Ordering;

1 change: 0 additions & 1 deletion vortex-serde/src/layouts/write/writer.rs
@@ -117,7 +117,6 @@ impl<W: VortexWrite> LayoutWriter<W> {

async fn write_metadata_arrays(&mut self) -> VortexResult<NestedLayout> {
let mut column_layouts = VecDeque::with_capacity(self.column_chunks.len());

for mut chunk in mem::take(&mut self.column_chunks) {
let len = chunk.byte_offsets.len() - 1;
let mut chunks: VecDeque<Layout> = chunk
20 changes: 16 additions & 4 deletions vortex-serde/src/messages.rs
@@ -18,7 +18,7 @@ pub enum IPCMessage<'a> {

pub struct IPCSchema<'a>(pub &'a DType);
pub struct IPCBatch<'a>(pub &'a Array);
pub struct IPCArray<'a>(pub &'a Array);
pub struct IPCArray<'a>(pub &'a Array, usize);
pub struct IPCPage<'a>(pub &'a Buffer);

pub struct IPCDType(pub DType);
@@ -87,13 +87,14 @@ impl<'a> WriteFlatBuffer for IPCBatch<'a> {
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
let array_data = self.0;
let array = Some(IPCArray(array_data).write_flatbuffer(fbb));
let array = Some(IPCArray(array_data, 0).write_flatbuffer(fbb));

let length = array_data.len() as u64;

// Walk the ColumnData depth-first to compute the buffer offsets.
let mut buffers = vec![];
let mut offset = 0;

for array_data in array_data.depth_first_traversal() {
if let Some(buffer) = array_data.buffer() {
let aligned_size = (buffer.len() + (ALIGNMENT - 1)) & !(ALIGNMENT - 1);
@@ -146,10 +147,21 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
),
};

// Assign buffer indices for all child arrays.
// The second tuple element holds the buffer_index for this Array subtree. If this array
// has a buffer, that is its buffer index. If it does not, that buffer index belongs
// to one of the children.
let child_buffer_offset = self.1 + if self.0.buffer().is_some() { 1 } else { 0 };

let children = column_data
.children()
.iter()
.map(|child| IPCArray(child).write_flatbuffer(fbb))
.scan(child_buffer_offset, |buffer_offset, child| {
// Update the number of buffers required.
let msg = IPCArray(child, *buffer_offset).write_flatbuffer(fbb);
*buffer_offset += child.cumulative_nbuffers();
Some(msg)
})
.collect_vec();
let children = Some(fbb.create_vector(&children));

@@ -159,7 +171,7 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
fbb,
&fba::ArrayArgs {
version: Default::default(),
has_buffer: column_data.buffer().is_some(),
buffer_index: self.0.buffer().is_some().then_some(self.1 as u64),
encoding,
metadata,
stats,