Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IPC Message clean up #1686

Merged
merged 6 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions vortex-array/src/data/viewed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use enum_iterator::all;
use itertools::Itertools;
use vortex_buffer::Buffer;
use vortex_dtype::{DType, Nullability, PType};
use vortex_error::{vortex_err, VortexExpect as _, VortexResult, VortexUnwrap};
use vortex_error::{vortex_err, VortexExpect as _, VortexResult};
use vortex_scalar::{Scalar, ScalarValue};

use crate::encoding::opaque::OpaqueEncoding;
Expand Down Expand Up @@ -105,8 +105,12 @@ impl ViewedArrayData {

pub fn buffer(&self) -> Option<&Buffer> {
self.flatbuffer()
.buffer_index()
.map(|idx| &self.buffers[usize::try_from(idx).vortex_unwrap()])
.buffers()
.and_then(|buffers| {
assert!(buffers.len() <= 1, "Array: expected at most one buffer");
(!buffers.is_empty()).then(|| buffers.get(0) as usize)
})
.map(|idx| &self.buffers[idx])
}
}

Expand Down
4 changes: 2 additions & 2 deletions vortex-dtype/src/serde/flatbuffers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,13 @@ mod test {
use std::sync::Arc;

use flatbuffers::root;
use vortex_flatbuffers::FlatBufferToBytes;
use vortex_flatbuffers::WriteFlatBufferExt;

use crate::nullability::Nullability;
use crate::{flatbuffers as fb, DType, PType, StructDType};

fn roundtrip_dtype(dtype: DType) {
let bytes = dtype.with_flatbuffer_bytes(|bytes| bytes.to_vec());
let bytes = dtype.write_flatbuffer_bytes();
let deserialized = DType::try_from(root::<fb::DType>(&bytes).unwrap()).unwrap();
assert_eq!(dtype, deserialized);
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-file/src/read/builder/initial_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use core::ops::Range;
use bytes::Bytes;
use flatbuffers::{root, root_unchecked};
use vortex_error::{vortex_bail, vortex_err, VortexResult, VortexUnwrap};
use vortex_flatbuffers::{footer, message};
use vortex_flatbuffers::{dtype as fbd, footer};
use vortex_io::VortexReadAt;

use crate::{LazyDType, EOF_SIZE, INITIAL_READ_SIZE, MAGIC_BYTES, VERSION};
Expand Down Expand Up @@ -136,7 +136,7 @@ pub async fn read_initial_bytes<R: VortexReadAt>(
// validate the schema and layout
let schema_loc = (schema_offset - initial_read_offset) as usize;
let layout_loc = (layout_offset - initial_read_offset) as usize;
root::<message::Schema>(&buf[schema_loc..layout_loc])?;
root::<fbd::DType>(&buf[schema_loc..layout_loc])?;
root::<footer::Layout>(&buf[layout_loc..ps_loc])?;

Ok(InitialRead {
Expand Down
19 changes: 7 additions & 12 deletions vortex-file/src/read/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use vortex_dtype::field::Field;
use vortex_dtype::flatbuffers::{extract_field, project_and_deserialize, resolve_field};
use vortex_dtype::{DType, FieldNames};
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexResult};
use vortex_flatbuffers::dtype::Struct_;
use vortex_flatbuffers::message;
use vortex_flatbuffers::dtype as fbd;

use crate::read::projection::Projection;
use crate::read::{LayoutPartId, MessageId};
Expand Down Expand Up @@ -238,10 +237,7 @@ fn field_names(bytes: &[u8], dtype_field: &SerializedDTypeField) -> VortexResult
}

fn project_dtype_bytes(bytes: &[u8], dtype_field: &SerializedDTypeField) -> VortexResult<DType> {
let fb_dtype = fb_schema(bytes)
.dtype()
.ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?;

let fb_dtype = fb_dtype(bytes);
match dtype_field {
SerializedDTypeField::Projection(projection) => match projection {
Projection::All => DType::try_from(fb_dtype),
Expand All @@ -251,15 +247,14 @@ fn project_dtype_bytes(bytes: &[u8], dtype_field: &SerializedDTypeField) -> Vort
}
}

fn fb_struct(bytes: &[u8]) -> VortexResult<Struct_> {
fb_schema(bytes)
.dtype()
.and_then(|d| d.type__as_struct_())
fn fb_struct(bytes: &[u8]) -> VortexResult<fbd::Struct_> {
fb_dtype(bytes)
.type__as_struct_()
.ok_or_else(|| vortex_err!("The top-level type should be a struct"))
}

fn fb_schema(bytes: &[u8]) -> message::Schema {
unsafe { root_unchecked::<message::Schema>(bytes) }
fn fb_dtype(bytes: &[u8]) -> fbd::DType {
unsafe { root_unchecked::<fbd::DType>(bytes) }
}

#[derive(Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/read/layouts/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ mod tests {
while let Some(chunk) = chunk_stream.try_next().await.unwrap() {
row_offset += chunk.len() as u64;
row_offsets.push(row_offset);
writer.write_batch(chunk).await.unwrap();
writer.write_array(chunk).await.unwrap();
byte_offsets.push(writer.tell());
}
let flat_layouts = byte_offsets
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/read/layouts/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ mod tests {
let mut writer = MessageWriter::new(Vec::new());
let array = PrimitiveArray::from((0..100).collect::<Vec<_>>()).into_array();
let len = array.len();
writer.write_batch(array).await.unwrap();
writer.write_array(array).await.unwrap();
let written = writer.into_inner();

let projection_scan = Scan::empty();
Expand Down
4 changes: 3 additions & 1 deletion vortex-file/src/write/layout.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bytes::Bytes;
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use vortex_flatbuffers::{footer as fb, WriteFlatBuffer};
use vortex_flatbuffers::{footer as fb, FlatBufferRoot, WriteFlatBuffer};
use vortex_ipc::stream_writer::ByteRange;

use crate::{LayoutId, CHUNKED_LAYOUT_ID, COLUMNAR_LAYOUT_ID, FLAT_LAYOUT_ID};
Expand Down Expand Up @@ -49,6 +49,8 @@ impl LayoutSpec {
}
}

impl FlatBufferRoot for LayoutSpec {}

impl WriteFlatBuffer for LayoutSpec {
type Target<'a> = fb::Layout<'a>;

Expand Down
4 changes: 3 additions & 1 deletion vortex-file/src/write/postscript.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use vortex_error::{vortex_bail, VortexResult};
use vortex_flatbuffers::{footer as fb, WriteFlatBuffer};
use vortex_flatbuffers::{footer as fb, FlatBufferRoot, WriteFlatBuffer};

#[derive(Debug)]
pub struct Postscript {
Expand All @@ -24,6 +24,8 @@ impl Postscript {
}
}

impl FlatBufferRoot for Postscript {}

impl WriteFlatBuffer for Postscript {
type Target<'a> = fb::Postscript<'a>;

Expand Down
34 changes: 12 additions & 22 deletions vortex-file/src/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@
use std::{io, iter, mem};

use bytes::Bytes;
use flatbuffers::FlatBufferBuilder;
use futures::TryStreamExt;
use itertools::Itertools;
use vortex_array::array::{ChunkedArray, StructArray};
use vortex_array::stats::{as_stat_bitset_bytes, ArrayStatistics, Stat};
use vortex_array::stream::ArrayStream;
use vortex_array::{ArrayData, ArrayLen};
use vortex_buffer::io_buf::IoBuf;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult};
use vortex_flatbuffers::WriteFlatBuffer;
use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt};
use vortex_io::VortexWrite;
use vortex_ipc::messages::writer::MessageWriter;
use vortex_ipc::messages::IPCSchema;
use vortex_ipc::stream_writer::ByteRange;

use crate::write::postscript::Postscript;
Expand Down Expand Up @@ -138,7 +135,7 @@ impl<W: VortexWrite> VortexFileWriter<W> {

pub async fn finalize(mut self) -> VortexResult<W> {
let top_level_layout = self.write_metadata_arrays().await?;
let schema_offset = self.msgs.tell();
let dtype_offset = self.msgs.tell();

// we want to write raw flatbuffers from here on out, not messages
let mut writer = self.msgs.into_inner();
Expand All @@ -152,15 +149,14 @@ impl<W: VortexWrite> VortexFileWriter<W> {
// we write an IPCSchema instead of a DType, which allows us to evolve / add to the schema later
// these bytes get deserialized as message::Schema
// NB: we don't wrap the IPCSchema in an IPCMessage, because we record the lengths/offsets in the footer
let schema = IPCSchema(&dtype);
let schema_len = write_fb_raw(&mut writer, schema).await?;
schema_offset + schema_len
let dtype_len = write_fb_raw(&mut writer, dtype).await?;
dtype_offset + dtype_len
};

// write the layout
write_fb_raw(&mut writer, top_level_layout).await?;

let footer = Postscript::try_new(schema_offset, layout_offset)?;
let footer = Postscript::try_new(dtype_offset, layout_offset)?;
let footer_len = write_fb_raw(&mut writer, footer).await?;
if footer_len > MAX_FOOTER_SIZE as u64 {
vortex_bail!(
Expand All @@ -182,20 +178,14 @@ impl<W: VortexWrite> VortexFileWriter<W> {
}

/// Write a flatbuffer to a writer and return the number of bytes written.
async fn write_fb_raw<W: VortexWrite, F: WriteFlatBuffer>(
async fn write_fb_raw<W: VortexWrite, F: WriteFlatBuffer + FlatBufferRoot>(
writer: &mut W,
fb: F,
) -> io::Result<u64> {
let mut fbb = FlatBufferBuilder::new();
let ps_fb = fb.write_flatbuffer(&mut fbb);
fbb.finish_minimal(ps_fb);

let (buffer, buffer_begin) = fbb.collapse();
let buffer_end = buffer.len();

let bytes = buffer.slice_owned(buffer_begin..buffer_end);
writer.write_all(bytes).await?;
Ok((buffer_end - buffer_begin) as u64)
let buffer = fb.write_flatbuffer_bytes();
let buffer_len = buffer.len();
writer.write_all(buffer).await?;
Ok(buffer_len as u64)
}

struct ColumnWriter {
Expand Down Expand Up @@ -240,7 +230,7 @@ impl ColumnWriter {
// clear the stats that we don't want to serialize into the file
retain_only_stats(&chunk, STATS_TO_WRITE);

msgs.write_batch(chunk).await?;
msgs.write_array(chunk).await?;
offsets.push(msgs.tell());
row_offsets.push(rows_written);
}
Expand Down Expand Up @@ -280,7 +270,7 @@ impl ColumnWriter {
let stat_bitset = as_stat_bitset_bytes(&present_stats);

let metadata_array_begin = msgs.tell();
msgs.write_batch(metadata_array).await?;
msgs.write_array(metadata_array).await?;
let metadata_array_end = msgs.tell();

let layouts = iter::once(LayoutSpec::flat(
Expand Down
1 change: 1 addition & 0 deletions vortex-flatbuffers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ file = ["ipc"]

[dependencies]
flatbuffers = { workspace = true }
vortex-buffer = { workspace = true }

[lints]
workspace = true
24 changes: 18 additions & 6 deletions vortex-flatbuffers/flatbuffers/vortex-array/array.fbs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
include "vortex-scalar/scalar.fbs";

enum Version: uint8 {
V0 = 0,
/// An ArrayData describes the hierarchy of an array as well as the locations of the data buffers that appear
/// immediately after the message in the byte stream.
table ArrayData {
/// The array's hierarchical definition.
array: Array;
/// The row count of the array.
row_count: uint64;
/// The locations of the data buffers of the array, in ascending order of offset.
buffers: [Buffer];
}

table Array {
version: Version = V0;
buffer_index: uint64 = null;
encoding: uint16;
metadata: [ubyte];
stats: ArrayStats;
children: [Array];
buffers: [uint16];
stats: ArrayStats;
}

table ArrayStats {
Expand All @@ -27,5 +33,11 @@ table ArrayStats {
uncompressed_size_in_bytes: uint64 = null;
}

table Buffer {
/// The length of the buffer in bytes.
length: uint64;
/// The length of any padding bytes written immediately following the buffer.
padding: uint16;
}

root_type Array;
root_type ArrayData;
28 changes: 3 additions & 25 deletions vortex-flatbuffers/flatbuffers/vortex-serde/message.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,10 @@ enum Compression: uint8 {
None = 0,
}

table Schema {
AdamGS marked this conversation as resolved.
Show resolved Hide resolved
dtype: DType;
}

struct Buffer {
offset: uint64;
padding: uint16;
compression: Compression;
}

table Batch {
array: Array;
length: uint64;
buffers: [Buffer];
buffer_size: uint64;
}

table Page {
buffer_size: uint32;
padding: uint16;
}

union MessageHeader {
Schema,
Batch,
Page,
ArrayData,
Buffer,
DType,
}

table Message {
Expand Down
Loading
Loading