chore: port random access benchmark to layouts (#1246)
develop was a643065
layouts was this PR: 6dc6c53
```
# critcmp develop layouts
group                                     develop                                layouts
-----                                     -------                                -------
random-access/parquet-tokio-local-disk    1.01     94.0±0.44ms        ? ?/sec    1.00     93.3±0.62ms        ? ?/sec
random-access/vortex-local-fs             1.00   381.3±10.78µs        ? ?/sec    34.91    13.3±0.13ms        ? ?/sec
random-access/vortex-tokio-local-disk     1.00   349.5±12.64µs        ? ?/sec    29.46    10.3±0.14ms        ? ?/sec
```
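
The table compares two saved Criterion baselines, `develop` and `layouts`. Roughly, such a comparison can be reproduced as follows (a sketch; the `random_access` bench target name is inferred from `benches/random_access.rs`):

```
# Save a Criterion baseline at each commit, then compare them with critcmp.
git checkout a643065   # develop
cargo bench --bench random_access -- --save-baseline develop

git checkout 6dc6c53   # this PR (layouts)
cargo bench --bench random_access -- --save-baseline layouts

critcmp develop layouts
```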
danking authored Nov 7, 2024
1 parent a643065 commit 520ffc4
Showing 7 changed files with 61 additions and 143 deletions.
4 changes: 2 additions & 2 deletions bench-vortex/benches/compress_noci.rs
@@ -26,7 +26,7 @@ use vortex::array::{ChunkedArray, StructArray};
 use vortex::dtype::field::Field;
 use vortex::error::VortexResult;
 use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
-use vortex::sampling_compressor::{SamplingCompressor, ALL_COMPRESSORS_CONTEXT};
+use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
 use vortex::serde::layouts::{
     LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer, LayoutWriter,
 };
@@ -128,7 +128,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Arc<Vec<u8>>) -> VortexResult<
     let builder: LayoutBatchStreamBuilder<_> = LayoutBatchStreamBuilder::new(
         buf,
         LayoutDeserializer::new(
-            ALL_COMPRESSORS_CONTEXT.clone(),
+            ALL_ENCODINGS_CONTEXT.clone(),
             LayoutContext::default().into(),
         ),
     );
8 changes: 4 additions & 4 deletions bench-vortex/benches/random_access.rs
@@ -33,12 +33,12 @@ fn random_access_vortex(c: &mut Criterion) {
            .iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) })
     });
 
+    let local_fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
+    let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
     group.bench_function("vortex-local-fs", |b| {
-        let local_fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
-        let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
         b.to_async(Runtime::new().unwrap()).iter(|| async {
             black_box(
-                take_vortex_object_store(&local_fs, &local_fs_path, &INDICES)
+                take_vortex_object_store(local_fs.clone(), local_fs_path.clone(), &INDICES)
                     .await
                     .unwrap(),
             )
@@ -65,7 +65,7 @@ fn random_access_vortex(c: &mut Criterion) {
 
         b.to_async(Runtime::new().unwrap()).iter(|| async {
             black_box(
-                take_vortex_object_store(&r2_fs, &r2_path, &INDICES)
+                take_vortex_object_store(r2_fs.clone(), r2_path.clone(), &INDICES)
                     .await
                     .unwrap(),
             )
37 changes: 3 additions & 34 deletions bench-vortex/src/lib.rs
@@ -11,23 +11,11 @@ use itertools::Itertools;
 use log::LevelFilter;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
-use vortex::aliases::hash_set::HashSet;
 use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
 use vortex::compress::CompressionStrategy;
 use vortex::dtype::DType;
 use vortex::fastlanes::DeltaEncoding;
-use vortex::sampling_compressor::compressors::alp::ALPCompressor;
-use vortex::sampling_compressor::compressors::alp_rd::ALPRDCompressor;
-use vortex::sampling_compressor::compressors::bitpacked::BITPACK_WITH_PATCHES;
-use vortex::sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor;
-use vortex::sampling_compressor::compressors::dict::DictCompressor;
-use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
-use vortex::sampling_compressor::compressors::r#for::FoRCompressor;
-use vortex::sampling_compressor::compressors::roaring_bool::RoaringBoolCompressor;
-use vortex::sampling_compressor::compressors::runend::DEFAULT_RUN_END_COMPRESSOR;
-use vortex::sampling_compressor::compressors::sparse::SparseCompressor;
-use vortex::sampling_compressor::compressors::CompressorRef;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::{Array, Context, IntoArray};
 
@@ -51,22 +39,6 @@ pub static CTX: LazyLock<Arc<Context>> = LazyLock::new(|| {
     )
 });
 
-pub static COMPRESSORS: LazyLock<HashSet<CompressorRef<'static>>> = LazyLock::new(|| {
-    [
-        &ALPCompressor as CompressorRef<'static>,
-        &ALPRDCompressor,
-        &DictCompressor,
-        &BITPACK_WITH_PATCHES,
-        &FoRCompressor,
-        &FSSTCompressor,
-        &DateTimePartsCompressor,
-        &DEFAULT_RUN_END_COMPRESSOR,
-        &RoaringBoolCompressor,
-        &SparseCompressor,
-    ]
-    .into()
-});
-
 /// Creates a file if it doesn't already exist.
 /// NB: Does NOT modify the given path to ensure that it resides in the data directory.
 pub fn idempotent<T, E, P: IdempotentPath + ?Sized>(
@@ -172,10 +144,7 @@ pub fn fetch_taxi_data() -> Array {
 }
 
 pub fn compress_taxi_data() -> Array {
-    let uncompressed = fetch_taxi_data();
-    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
-
-    compressor.compress(&uncompressed).unwrap()
+    CompressionStrategy::compress(&SamplingCompressor::default(), &fetch_taxi_data()).unwrap()
 }
 
 pub struct CompressionRunStats {
@@ -235,7 +204,7 @@ mod test {
     use vortex::{Array, IntoCanonical};
 
     use crate::taxi_data::taxi_data_parquet;
-    use crate::{compress_taxi_data, setup_logger, COMPRESSORS};
+    use crate::{compress_taxi_data, setup_logger};
 
     #[ignore]
     #[test]
@@ -268,7 +237,7 @@ mod test {
         let file = File::open(taxi_data_parquet()).unwrap();
         let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
         let reader = builder.with_limit(1).build().unwrap();
-        let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
+        let compressor: &dyn CompressionStrategy = &SamplingCompressor::default();
 
         for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
             let struct_arrow: ArrowStructArray = record_batch.into();
2 changes: 1 addition & 1 deletion bench-vortex/src/public_bi_data.rs
@@ -13,7 +13,7 @@ use tokio::fs::File;
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::ChunkedArray;
 use vortex::error::VortexResult;
-use vortex::{Array, ArrayDType, ArrayTrait, IntoArray};
+use vortex::{Array, ArrayDType, IntoArray};
 
 use crate::data_downloads::{decompress_bz2, download_data, BenchmarkDataset, FileType};
 use crate::public_bi_data::PBIDataset::*;
145 changes: 47 additions & 98 deletions bench-vortex/src/reader.rs
@@ -11,7 +11,6 @@ use arrow_array::{
 };
 use arrow_select::concat::concat_batches;
 use arrow_select::take::take_record_batch;
-use bytes::{Bytes, BytesMut};
 use futures::stream;
 use itertools::Itertools;
 use log::info;
@@ -23,22 +22,17 @@ use parquet::file::metadata::RowGroupMetaData;
 use serde::{Deserialize, Serialize};
 use stream::StreamExt;
 use vortex::aliases::hash_map::HashMap;
-use vortex::array::{ChunkedArray, PrimitiveArray};
+use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
-use vortex::buffer::Buffer;
 use vortex::compress::CompressionStrategy;
-use vortex::dtype::DType;
-use vortex::error::{vortex_err, VortexResult};
-use vortex::sampling_compressor::SamplingCompressor;
-use vortex::serde::chunked_reader::ChunkedArrayReader;
-use vortex::serde::io::{ObjectStoreExt, VortexReadAt, VortexWrite};
-use vortex::serde::stream_reader::StreamArrayReader;
-use vortex::serde::stream_writer::StreamArrayWriter;
-use vortex::serde::DTypeReader;
-use vortex::stream::ArrayStreamExt;
-use vortex::{Array, ArrayDType, IntoArray, IntoCanonical};
 
-use crate::{COMPRESSORS, CTX};
+use vortex::error::VortexResult;
+use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
+use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt, VortexWrite};
+use vortex::serde::layouts::{
+    LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer, LayoutWriter,
+};
+use vortex::{Array, IntoArray, IntoCanonical};
 
 pub const BATCH_SIZE: usize = 65_536;
 
@@ -51,15 +45,18 @@ pub struct VortexFooter {
 
 pub async fn open_vortex(path: &Path) -> VortexResult<Array> {
     let file = tokio::fs::File::open(path).await.unwrap();
-    let reader = StreamArrayReader::try_new(file, CTX.clone())
-        .await?
-        .load_dtype()
-        .await?;
-    reader
-        .into_array_stream()
-        .collect_chunked()
-        .await
-        .map(IntoArray::into_array)
+
+    LayoutBatchStreamBuilder::new(
+        file,
+        LayoutDeserializer::new(
+            ALL_ENCODINGS_CONTEXT.clone(),
+            LayoutContext::default().into(),
+        ),
+    )
+    .build()
+    .await?
+    .read_all()
+    .await
 }
 
 pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
@@ -68,24 +65,11 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
 ) -> VortexResult<()> {
     let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;
 
-    let written = StreamArrayWriter::new(write)
-        .write_array_stream(chunked.array_stream())
+    LayoutWriter::new(write)
+        .write_array_columns(chunked)
+        .await?
+        .finalize()
         .await?;
-
-    let layout = written.array_layouts()[0].clone();
-    let mut w = written.into_inner();
-    let mut s = flexbuffers::FlexbufferSerializer::new();
-    VortexFooter {
-        byte_offsets: layout.chunks.byte_offsets,
-        row_offsets: layout.chunks.row_offsets,
-        dtype_range: layout.dtype.begin..layout.dtype.end,
-    }
-    .serialize(&mut s)?;
-    let footer_bytes = Buffer::from(Bytes::from(s.take_buffer()));
-    let footer_len = footer_bytes.len() as u64;
-    w.write_all(footer_bytes).await?;
-    w.write_all(footer_len.to_le_bytes()).await?;
-
     Ok(())
 }
 
@@ -102,17 +86,9 @@ pub fn read_parquet_to_vortex<P: AsRef<Path>>(parquet_path: P) -> VortexResult<C
     ChunkedArray::try_new(chunks, dtype)
 }
 
-pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedArray> {
+pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<Array> {
     let chunked = read_parquet_to_vortex(parquet_path)?;
-    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
-    let dtype = chunked.dtype().clone();
-    ChunkedArray::try_new(
-        chunked
-            .chunks()
-            .map(|x| compressor.compress(&x))
-            .collect::<VortexResult<Vec<_>>>()?,
-        dtype,
-    )
+    CompressionStrategy::compress(&SamplingCompressor::default(), &chunked.into_array())
 }
 
 pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResult<()> {
@@ -134,64 +110,37 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu
     Ok(())
 }
 
-pub async fn read_vortex_footer_format<R: VortexReadAt>(
-    reader: R,
-    len: u64,
-) -> VortexResult<ChunkedArrayReader<R>> {
-    let mut buf = BytesMut::with_capacity(8);
-    unsafe { buf.set_len(8) }
-    buf = reader.read_at_into(len - 8, buf).await?;
-    let footer_len = u64::from_le_bytes(buf.as_ref().try_into().unwrap()) as usize;
-
-    buf.reserve(footer_len - buf.len());
-    unsafe { buf.set_len(footer_len) }
-    buf = reader
-        .read_at_into(len - footer_len as u64 - 8, buf)
-        .await?;
-
-    let footer: VortexFooter = VortexFooter::deserialize(
-        flexbuffers::Reader::get_root(buf.as_ref()).map_err(|e| vortex_err!("{}", e))?,
-    )?;
-
-    let header_len = (footer.dtype_range.end - footer.dtype_range.start) as usize;
-    buf.reserve(header_len - buf.len());
-    unsafe { buf.set_len(header_len) }
-    buf = reader.read_at_into(footer.dtype_range.start, buf).await?;
-    let dtype = DTypeReader::new(buf).await?.read_dtype().await?;
-
-    ChunkedArrayReader::try_new(
+async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
+    reader: T,
+    indices: &[u64],
+) -> VortexResult<Array> {
+    LayoutBatchStreamBuilder::new(
         reader,
-        CTX.clone(),
-        dtype.into(),
-        PrimitiveArray::from(footer.byte_offsets).into_array(),
-        PrimitiveArray::from(footer.row_offsets).into_array(),
+        LayoutDeserializer::new(
+            ALL_ENCODINGS_CONTEXT.clone(),
+            LayoutContext::default().into(),
+        ),
     )
+    .with_indices(Array::from(indices.to_vec()))
+    .build()
+    .await?
+    .read_all()
+    .await
+    // For equivalence.... we decompress to make sure we're not cheating too much.
+    .and_then(IntoCanonical::into_canonical)
+    .map(Array::from)
 }
 
 pub async fn take_vortex_object_store(
-    fs: &Arc<dyn ObjectStore>,
-    path: &object_store::path::Path,
+    fs: Arc<dyn ObjectStore>,
+    path: object_store::path::Path,
     indices: &[u64],
 ) -> VortexResult<Array> {
-    let head = fs.head(path).await?;
-    let indices_array = indices.to_vec().into_array();
-    let taken = read_vortex_footer_format(fs.vortex_reader(path), head.size as u64)
-        .await?
-        .take_rows(&indices_array)
-        .await?;
-    // For equivalence.... we flatten to make sure we're not cheating too much.
-    Ok(taken.into_canonical()?.into())
+    take_vortex(ObjectStoreReadAt::new(fs.clone(), path), indices).await
 }
 
 pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Array> {
-    let len = File::open(path)?.metadata()?.len();
-    let indices_array = indices.to_vec().into_array();
-    let taken = read_vortex_footer_format(tokio::fs::File::open(path).await?, len)
-        .await?
-        .take_rows(&indices_array)
-        .await?;
-    // For equivalence.... we flatten to make sure we're not cheating too much.
-    Ok(taken.into_canonical()?.into())
+    take_vortex(tokio::fs::File::open(path).await?, indices).await
 }
 
 pub async fn take_parquet_object_store(
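
Both take paths above now converge on the layout-based `take_vortex` helper. A minimal caller might look like this sketch (it assumes `bench-vortex` exposes `reader` as a public module and that a `taxi.vortex` file exists; the path and indices are hypothetical):

```rust
use std::path::Path;

use bench_vortex::reader::take_vortex_tokio;
use vortex::error::VortexResult;

#[tokio::main]
async fn main() -> VortexResult<()> {
    // The indices are handed to the layout reader via with_indices, so the
    // reader performs the row selection rather than the caller.
    let taken = take_vortex_tokio(Path::new("taxi.vortex"), &[10, 12_345, 67_890]).await?;
    println!("took {} rows", taken.len());
    Ok(())
}
```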
6 changes: 3 additions & 3 deletions pyvortex/src/dataset.rs
@@ -12,7 +12,7 @@ use vortex::arrow::infer_schema;
 use vortex::dtype::field::Field;
 use vortex::dtype::DType;
 use vortex::error::VortexResult;
-use vortex::sampling_compressor::ALL_COMPRESSORS_CONTEXT;
+use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
 use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt};
 use vortex::serde::layouts::{
     LayoutBatchStream, LayoutBatchStreamBuilder, LayoutContext, LayoutDescriptorReader,
@@ -33,7 +33,7 @@ pub async fn layout_stream_from_reader<T: VortexReadAt + Unpin>(
     let mut builder = LayoutBatchStreamBuilder::new(
         reader,
         LayoutDeserializer::new(
-            ALL_COMPRESSORS_CONTEXT.clone(),
+            ALL_ENCODINGS_CONTEXT.clone(),
             LayoutContext::default().into(),
         ),
     )
@@ -64,7 +64,7 @@ pub async fn read_array_from_reader<T: VortexReadAt + Unpin + 'static>(
 
 pub async fn read_dtype_from_reader<T: VortexReadAt + Unpin>(reader: T) -> VortexResult<DType> {
     LayoutDescriptorReader::new(LayoutDeserializer::new(
-        ALL_COMPRESSORS_CONTEXT.clone(),
+        ALL_ENCODINGS_CONTEXT.clone(),
         LayoutContext::default().into(),
     ))
     .read_footer(&reader, reader.size().await)
2 changes: 1 addition & 1 deletion vortex-sampling-compressor/src/lib.rs
@@ -75,7 +75,7 @@ pub static FASTEST_COMPRESSORS: LazyLock<[CompressorRef<'static>; 7]> = LazyLock
     ]
 });
 
-pub static ALL_COMPRESSORS_CONTEXT: LazyLock<Arc<Context>> = LazyLock::new(|| {
+pub static ALL_ENCODINGS_CONTEXT: LazyLock<Arc<Context>> = LazyLock::new(|| {
     Arc::new(Context::default().with_encodings([
         &ALPEncoding as EncodingRef,
         &ByteBoolEncoding,
