Skip to content

Commit

Permalink
refactor: move Buffer enum to private inner struct (#1216)
Browse files Browse the repository at this point in the history
Follow up to #1212
  • Loading branch information
a10y authored Nov 5, 2024
1 parent 8282118 commit dd67f07
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 33 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ jobs:
- uses: ./.github/actions/cleanup
- uses: rui314/setup-mold@v1
- uses: ./.github/actions/setup-rust
- uses: spiraldb/actions/.github/actions/[email protected]
# Required to run benchmarks
- name: Install DuckDB
uses: opt-nc/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
dtype_range: layout.dtype.begin..layout.dtype.end,
}
.serialize(&mut s)?;
let footer_bytes = Buffer::Bytes(Bytes::from(s.take_buffer()));
let footer_bytes = Buffer::from(Bytes::from(s.take_buffer()));
let footer_len = footer_bytes.len() as u64;
w.write_all(footer_bytes).await?;
w.write_all(footer_len.to_le_bytes()).await?;
Expand Down
59 changes: 31 additions & 28 deletions vortex-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ mod string;
///
/// Buffers form the building blocks of all in-memory storage in Vortex.
#[derive(Debug, Clone)]
pub enum Buffer {
pub struct Buffer(Inner);

#[derive(Debug, Clone)]
enum Inner {
// TODO(ngates): we could add Aligned(Arc<AVec>) from aligned-vec package
/// A Buffer that wraps an Apache Arrow buffer
Arrow(ArrowBuffer),
Expand All @@ -44,35 +47,35 @@ impl Buffer {

/// Length of the buffer in bytes
pub fn len(&self) -> usize {
match self {
Self::Arrow(b) => b.len(),
Self::Bytes(b) => b.len(),
match &self.0 {
Inner::Arrow(b) => b.len(),
Inner::Bytes(b) => b.len(),
}
}

/// Predicate for empty buffers
pub fn is_empty(&self) -> bool {
match self {
Self::Arrow(b) => b.is_empty(),
Self::Bytes(b) => b.is_empty(),
match &self.0 {
Inner::Arrow(b) => b.is_empty(),
Inner::Bytes(b) => b.is_empty(),
}
}

#[allow(clippy::same_name_method)]
/// Return a new view on the buffer, but limited to the given index range.
pub fn slice(&self, range: Range<usize>) -> Self {
match self {
Self::Arrow(b) => {
Self::Arrow(b.slice_with_length(range.start, range.end - range.start))
}
Self::Bytes(b) => {
match &self.0 {
Inner::Arrow(b) => Buffer(Inner::Arrow(
b.slice_with_length(range.start, range.end - range.start),
)),
Inner::Bytes(b) => {
if range.is_empty() {
// bytes::Bytes::slice does not preserve alignment if the range is empty
let mut empty_b = b.clone();
empty_b.truncate(0);
Self::Bytes(empty_b)
Buffer(Inner::Bytes(empty_b))
} else {
Self::Bytes(b.slice(range))
Buffer(Inner::Bytes(b.slice(range)))
}
}
}
Expand All @@ -81,9 +84,9 @@ impl Buffer {
#[allow(clippy::same_name_method)]
/// Access the buffer as an immutable byte slice.
pub fn as_slice(&self) -> &[u8] {
match self {
Self::Arrow(b) => b.as_ref(),
Self::Bytes(b) => b.as_ref(),
match &self.0 {
Inner::Arrow(b) => b.as_ref(),
Inner::Bytes(b) => b.as_ref(),
}
}

Expand All @@ -98,18 +101,18 @@ impl Buffer {
/// This method will also fail if we attempt to pass a `T` that is not aligned to the `T` that
/// it was originally allocated with.
pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
match self {
Self::Arrow(buffer) => buffer.into_vec::<T>().map_err(Buffer::Arrow),
match self.0 {
Inner::Arrow(buffer) => buffer.into_vec::<T>().map_err(|b| Buffer(Inner::Arrow(b))),
// Cannot convert bytes into a mutable vec
Self::Bytes(_) => Err(self),
Inner::Bytes(_) => Err(self),
}
}

/// Convert a Buffer into an ArrowBuffer with no copying.
pub fn into_arrow(self) -> ArrowBuffer {
match self {
Buffer::Arrow(a) => a,
Buffer::Bytes(b) => ArrowBuffer::from_vec(Vec::<u8>::from(b)),
match self.0 {
Inner::Arrow(a) => a,
Inner::Bytes(b) => ArrowBuffer::from_vec(Vec::<u8>::from(b)),
}
}
}
Expand All @@ -131,32 +134,32 @@ impl AsRef<[u8]> for Buffer {
impl From<&[u8]> for Buffer {
fn from(value: &[u8]) -> Self {
// We prefer Arrow since it retains mutability
Self::Arrow(ArrowBuffer::from(value))
Buffer(Inner::Arrow(ArrowBuffer::from(value)))
}
}

impl<T: ArrowNativeType> From<Vec<T>> for Buffer {
fn from(value: Vec<T>) -> Self {
// We prefer Arrow since it retains mutability
Self::Arrow(ArrowBuffer::from_vec(value))
Buffer(Inner::Arrow(ArrowBuffer::from_vec(value)))
}
}

impl From<bytes::Bytes> for Buffer {
fn from(value: bytes::Bytes) -> Self {
Self::Bytes(value)
Buffer(Inner::Bytes(value))
}
}

impl From<ArrowBuffer> for Buffer {
fn from(value: ArrowBuffer) -> Self {
Self::Arrow(value)
Buffer(Inner::Arrow(value))
}
}

impl From<ArrowMutableBuffer> for Buffer {
fn from(value: ArrowMutableBuffer) -> Self {
Self::Arrow(ArrowBuffer::from(value))
Buffer(Inner::Arrow(ArrowBuffer::from(value)))
}
}

Expand Down
2 changes: 1 addition & 1 deletion vortex-serde/src/io/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl ObjectStoreExt for Arc<dyn ObjectStore> {
range: Range<usize>,
) -> VortexResult<impl VortexRead> {
let bytes = self.get_range(location, range).await?;
Ok(Cursor::new(Buffer::Bytes(bytes)))
Ok(Cursor::new(Buffer::from(bytes)))
}

fn vortex_reader(&self, location: &Path) -> impl VortexReadAt {
Expand Down
4 changes: 2 additions & 2 deletions vortex-serde/src/message_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ mod test {
let mut writer = MessageWriter::new(write);
block_on(async {
writer
.write_page(Buffer::Bytes(Bytes::from("somevalue")))
.write_page(Buffer::from(Bytes::from("somevalue")))
.await
})
.unwrap();
Expand All @@ -368,6 +368,6 @@ mod test {
let read_page = block_on(async { reader.maybe_read_page().await })
.unwrap()
.unwrap();
assert_eq!(read_page, Buffer::Bytes(Bytes::from("somevalue")));
assert_eq!(read_page, Buffer::from(Bytes::from("somevalue")));
}
}

0 comments on commit dd67f07

Please sign in to comment.