Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Layouts have buffers and there's self describing schema layout #1098

Merged
merged 4 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion vortex-flatbuffers/flatbuffers/vortex-serde/footer.fbs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
struct Buffer {
begin: uint64;
end: uint64;
}

table Layout {
children: [Layout];
encoding: uint16;
buffers: [Buffer];
children: [Layout];
metadata: [ubyte];
}

Expand Down
171 changes: 156 additions & 15 deletions vortex-flatbuffers/src/generated/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,130 @@ use core::cmp::Ordering;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};

// struct Buffer, aligned to 8
#[repr(transparent)]
#[derive(Clone, Copy, PartialEq)]
pub struct Buffer(pub [u8; 16]);
impl Default for Buffer {
fn default() -> Self {
Self([0; 16])
}
}
impl core::fmt::Debug for Buffer {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
f.debug_struct("Buffer")
.field("begin", &self.begin())
.field("end", &self.end())
.finish()
}
}

impl flatbuffers::SimpleToVerifyInSlice for Buffer {}
impl<'a> flatbuffers::Follow<'a> for Buffer {
type Inner = &'a Buffer;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
<&'a Buffer>::follow(buf, loc)
}
}
impl<'a> flatbuffers::Follow<'a> for &'a Buffer {
type Inner = &'a Buffer;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
flatbuffers::follow_cast_ref::<Buffer>(buf, loc)
}
}
impl<'b> flatbuffers::Push for Buffer {
type Output = Buffer;
#[inline]
unsafe fn push(&self, dst: &mut [u8], _written_len: usize) {
let src = ::core::slice::from_raw_parts(self as *const Buffer as *const u8, Self::size());
dst.copy_from_slice(src);
}
}

impl<'a> flatbuffers::Verifiable for Buffer {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.in_buffer::<Self>(pos)
}
}

impl<'a> Buffer {
#[allow(clippy::too_many_arguments)]
pub fn new(
begin: u64,
end: u64,
) -> Self {
let mut s = Self([0; 16]);
s.set_begin(begin);
s.set_end(end);
s
}

pub fn begin(&self) -> u64 {
let mut mem = core::mem::MaybeUninit::<<u64 as EndianScalar>::Scalar>::uninit();
// Safety:
// Created from a valid Table for this object
// Which contains a valid value in this slot
EndianScalar::from_little_endian(unsafe {
core::ptr::copy_nonoverlapping(
self.0[0..].as_ptr(),
mem.as_mut_ptr() as *mut u8,
core::mem::size_of::<<u64 as EndianScalar>::Scalar>(),
);
mem.assume_init()
})
}

pub fn set_begin(&mut self, x: u64) {
let x_le = x.to_little_endian();
// Safety:
// Created from a valid Table for this object
// Which contains a valid value in this slot
unsafe {
core::ptr::copy_nonoverlapping(
&x_le as *const _ as *const u8,
self.0[0..].as_mut_ptr(),
core::mem::size_of::<<u64 as EndianScalar>::Scalar>(),
);
}
}

pub fn end(&self) -> u64 {
let mut mem = core::mem::MaybeUninit::<<u64 as EndianScalar>::Scalar>::uninit();
// Safety:
// Created from a valid Table for this object
// Which contains a valid value in this slot
EndianScalar::from_little_endian(unsafe {
core::ptr::copy_nonoverlapping(
self.0[8..].as_ptr(),
mem.as_mut_ptr() as *mut u8,
core::mem::size_of::<<u64 as EndianScalar>::Scalar>(),
);
mem.assume_init()
})
}

pub fn set_end(&mut self, x: u64) {
let x_le = x.to_little_endian();
// Safety:
// Created from a valid Table for this object
// Which contains a valid value in this slot
unsafe {
core::ptr::copy_nonoverlapping(
&x_le as *const _ as *const u8,
self.0[8..].as_mut_ptr(),
core::mem::size_of::<<u64 as EndianScalar>::Scalar>(),
);
}
}

}

pub enum LayoutOffset {}
#[derive(Copy, Clone, PartialEq)]

Expand All @@ -25,9 +149,10 @@ impl<'a> flatbuffers::Follow<'a> for Layout<'a> {
}

impl<'a> Layout<'a> {
pub const VT_CHILDREN: flatbuffers::VOffsetT = 4;
pub const VT_ENCODING: flatbuffers::VOffsetT = 6;
pub const VT_METADATA: flatbuffers::VOffsetT = 8;
pub const VT_ENCODING: flatbuffers::VOffsetT = 4;
pub const VT_BUFFERS: flatbuffers::VOffsetT = 6;
pub const VT_CHILDREN: flatbuffers::VOffsetT = 8;
pub const VT_METADATA: flatbuffers::VOffsetT = 10;

#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
Expand All @@ -41,24 +166,32 @@ impl<'a> Layout<'a> {
let mut builder = LayoutBuilder::new(_fbb);
if let Some(x) = args.metadata { builder.add_metadata(x); }
if let Some(x) = args.children { builder.add_children(x); }
if let Some(x) = args.buffers { builder.add_buffers(x); }
builder.add_encoding(args.encoding);
builder.finish()
}


#[inline]
pub fn children(&self) -> Option<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout<'a>>>> {
pub fn encoding(&self) -> u16 {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout>>>>(Layout::VT_CHILDREN, None)}
unsafe { self._tab.get::<u16>(Layout::VT_ENCODING, Some(0)).unwrap()}
}
#[inline]
pub fn encoding(&self) -> u16 {
pub fn buffers(&self) -> Option<flatbuffers::Vector<'a, Buffer>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<u16>(Layout::VT_ENCODING, Some(0)).unwrap()}
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, Buffer>>>(Layout::VT_BUFFERS, None)}
}
#[inline]
pub fn children(&self) -> Option<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout<'a>>>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout>>>>(Layout::VT_CHILDREN, None)}
}
#[inline]
pub fn metadata(&self) -> Option<flatbuffers::Vector<'a, u8>> {
Expand All @@ -76,24 +209,27 @@ impl flatbuffers::Verifiable for Layout<'_> {
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, flatbuffers::ForwardsUOffset<Layout>>>>("children", Self::VT_CHILDREN, false)?
.visit_field::<u16>("encoding", Self::VT_ENCODING, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, Buffer>>>("buffers", Self::VT_BUFFERS, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, flatbuffers::ForwardsUOffset<Layout>>>>("children", Self::VT_CHILDREN, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("metadata", Self::VT_METADATA, false)?
.finish();
Ok(())
}
}
pub struct LayoutArgs<'a> {
pub children: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout<'a>>>>>,
pub encoding: u16,
pub buffers: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, Buffer>>>,
pub children: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<Layout<'a>>>>>,
pub metadata: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for LayoutArgs<'a> {
#[inline]
fn default() -> Self {
LayoutArgs {
children: None,
encoding: 0,
buffers: None,
children: None,
metadata: None,
}
}
Expand All @@ -104,15 +240,19 @@ pub struct LayoutBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> LayoutBuilder<'a, 'b, A> {
#[inline]
pub fn add_children(&mut self, children: flatbuffers::WIPOffset<flatbuffers::Vector<'b , flatbuffers::ForwardsUOffset<Layout<'b >>>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(Layout::VT_CHILDREN, children);
}
#[inline]
pub fn add_encoding(&mut self, encoding: u16) {
self.fbb_.push_slot::<u16>(Layout::VT_ENCODING, encoding, 0);
}
#[inline]
pub fn add_buffers(&mut self, buffers: flatbuffers::WIPOffset<flatbuffers::Vector<'b , Buffer>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(Layout::VT_BUFFERS, buffers);
}
#[inline]
pub fn add_children(&mut self, children: flatbuffers::WIPOffset<flatbuffers::Vector<'b , flatbuffers::ForwardsUOffset<Layout<'b >>>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(Layout::VT_CHILDREN, children);
}
#[inline]
pub fn add_metadata(&mut self, metadata: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(Layout::VT_METADATA, metadata);
}
Expand All @@ -134,8 +274,9 @@ impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> LayoutBuilder<'a, 'b, A> {
impl core::fmt::Debug for Layout<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("Layout");
ds.field("children", &self.children());
ds.field("encoding", &self.encoding());
ds.field("buffers", &self.buffers());
ds.field("children", &self.children());
ds.field("metadata", &self.metadata());
ds.finish()
}
Expand Down
1 change: 1 addition & 0 deletions vortex-serde/src/layouts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub const EOF_SIZE: usize = 8;
pub const FLAT_LAYOUT_ID: LayoutId = LayoutId(1);
pub const CHUNKED_LAYOUT_ID: LayoutId = LayoutId(2);
pub const COLUMN_LAYOUT_ID: LayoutId = LayoutId(3);
pub const INLINE_SCHEMA_LAYOUT_ID: LayoutId = LayoutId(4);

pub use read::*;
pub use write::*;
4 changes: 2 additions & 2 deletions vortex-serde/src/layouts/read/layouts/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ impl LayoutSpec for ColumnLayoutSpec {
fb_bytes: Bytes,
fb_loc: usize,
scan: Scan,
layout_serde: LayoutDeserializer,
layout_builder: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> Box<dyn LayoutReader> {
Box::new(ColumnLayout::new(
fb_bytes,
fb_loc,
scan,
layout_serde,
layout_builder,
message_cache,
))
}
Expand Down
30 changes: 9 additions & 21 deletions vortex-serde/src/layouts/read/layouts/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use bytes::Bytes;
use vortex::compute::slice;
use vortex::{Array, Context};
use vortex_error::{vortex_err, VortexExpect, VortexResult, VortexUnwrap};
use vortex_error::{vortex_err, VortexResult, VortexUnwrap};
use vortex_flatbuffers::footer;

use crate::layouts::read::cache::RelativeLayoutCache;
Expand Down Expand Up @@ -35,25 +35,14 @@ impl LayoutSpec for FlatLayoutSpec {
let tab = flatbuffers::Table::new(&fb_bytes, fb_loc);
footer::Layout::init_from_table(tab)
};
let flat_meta = fb_layout
.metadata()
.vortex_expect("FlatLayout must have metadata");
let begin = u64::from_le_bytes(
flat_meta.bytes()[0..8]
.try_into()
.map_err(|e| vortex_err!("Not a u64 {e}"))
.vortex_unwrap(),
);
let end = u64::from_le_bytes(
flat_meta.bytes()[8..16]
.try_into()
.map_err(|e| vortex_err!("Not a u64 {e}"))
.vortex_unwrap(),
);
let buf = fb_layout
.buffers()
.ok_or_else(|| vortex_err!("No buffers"))
.vortex_unwrap()
.get(0);
robert3005 marked this conversation as resolved.
Show resolved Hide resolved

Box::new(FlatLayout::new(
begin,
end,
ByteRange::new(buf.begin(), buf.end()),
scan,
layout_serde.ctx(),
message_cache,
Expand All @@ -73,14 +62,13 @@ pub struct FlatLayout {

impl FlatLayout {
pub fn new(
begin: u64,
end: u64,
range: ByteRange,
scan: Scan,
ctx: Arc<Context>,
cache: RelativeLayoutCache,
) -> Self {
Self {
range: ByteRange { begin, end },
range,
scan,
ctx,
cache,
Expand Down
Loading
Loading