From 520ffc4569f3f9070eeea7e1031d8ccd72adf668 Mon Sep 17 00:00:00 2001
From: Dan King
Date: Thu, 7 Nov 2024 16:07:06 -0500
Subject: [PATCH] chore: port random access benchmark to layouts (#1246)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

develop was a643065102f24e1feeccb324ceeb0245a7405be6

layouts was this PR: 6dc6c53a477a67231a3bc425459c84f10dd8f0d1

```
# critcmp develop layouts
group                                     develop                          layouts
-----                                     -------                          -------
random-access/parquet-tokio-local-disk    1.01     94.0±0.44ms ? ?/sec     1.00     93.3±0.62ms ? ?/sec
random-access/vortex-local-fs             1.00    381.3±10.78µs ? ?/sec    34.91    13.3±0.13ms ? ?/sec
random-access/vortex-tokio-local-disk     1.00    349.5±12.64µs ? ?/sec    29.46    10.3±0.14ms ? ?/sec
```
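The layouts path is currently ~30x slower than the old footer-based path on the Vortex
random-access benchmarks; that regression is the number this benchmark now tracks. For
context, random access now goes through the layouts API end to end. A minimal sketch of
that read path, mirroring the new `take_vortex` helper in `bench-vortex/src/reader.rs`
below (the reader's generic bound is garbled in the diff and assumed here to be just
`VortexReadAt`):

```rust
use vortex::error::VortexResult;
use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
use vortex::serde::io::VortexReadAt;
use vortex::serde::layouts::{LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer};
use vortex::{Array, IntoCanonical};

/// Random access: read only `indices` from a serialized Vortex file.
async fn take_vortex<T: VortexReadAt>(reader: T, indices: &[u64]) -> VortexResult<Array> {
    LayoutBatchStreamBuilder::new(
        reader,
        LayoutDeserializer::new(
            ALL_ENCODINGS_CONTEXT.clone(),
            LayoutContext::default().into(),
        ),
    )
    // Push the row selection down into the layout reader.
    .with_indices(Array::from(indices.to_vec()))
    .build()
    .await?
    .read_all()
    .await
    // Decompress to canonical form so we measure end-to-end cost.
    .and_then(IntoCanonical::into_canonical)
    .map(Array::from)
}
```

Both `tokio::fs::File` and `ObjectStoreReadAt` implement the reader side, so the same
helper backs the local-disk and object-store benchmark variants.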
---
 bench-vortex/benches/compress_noci.rs |   4 +-
 bench-vortex/benches/random_access.rs |   8 +-
 bench-vortex/src/lib.rs               |  37 +------
 bench-vortex/src/public_bi_data.rs    |   2 +-
 bench-vortex/src/reader.rs            | 145 +++++++++-----------------
 pyvortex/src/dataset.rs               |   6 +-
 vortex-sampling-compressor/src/lib.rs |   2 +-
 7 files changed, 61 insertions(+), 143 deletions(-)

diff --git a/bench-vortex/benches/compress_noci.rs b/bench-vortex/benches/compress_noci.rs
index 15c03149ce..6d14a5b990 100644
--- a/bench-vortex/benches/compress_noci.rs
+++ b/bench-vortex/benches/compress_noci.rs
@@ -26,7 +26,7 @@ use vortex::array::{ChunkedArray, StructArray};
 use vortex::dtype::field::Field;
 use vortex::error::VortexResult;
 use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
-use vortex::sampling_compressor::{SamplingCompressor, ALL_COMPRESSORS_CONTEXT};
+use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
 use vortex::serde::layouts::{
     LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer, LayoutWriter,
 };
@@ -128,7 +128,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Arc<Vec<u8>>) -> VortexResult<
     let builder: LayoutBatchStreamBuilder<_> = LayoutBatchStreamBuilder::new(
         buf,
         LayoutDeserializer::new(
-            ALL_COMPRESSORS_CONTEXT.clone(),
+            ALL_ENCODINGS_CONTEXT.clone(),
             LayoutContext::default().into(),
         ),
     );
diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs
index 22a56a945b..dc39ab6d56 100644
--- a/bench-vortex/benches/random_access.rs
+++ b/bench-vortex/benches/random_access.rs
@@ -33,12 +33,12 @@ fn random_access_vortex(c: &mut Criterion) {
             .iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) })
     });
 
-    let local_fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
-    let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
     group.bench_function("vortex-local-fs", |b| {
+        let local_fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
+        let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
         b.to_async(Runtime::new().unwrap()).iter(|| async {
             black_box(
-                take_vortex_object_store(&local_fs, &local_fs_path, &INDICES)
+                take_vortex_object_store(local_fs.clone(), local_fs_path.clone(), &INDICES)
                     .await
                     .unwrap(),
             )
@@ -65,7 +65,7 @@ fn random_access_vortex(c: &mut Criterion) {
         b.to_async(Runtime::new().unwrap()).iter(|| async {
             black_box(
-                take_vortex_object_store(&r2_fs, &r2_path, &INDICES)
+                take_vortex_object_store(r2_fs.clone(), r2_path.clone(), &INDICES)
                     .await
                     .unwrap(),
             )
diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs
index e1232a2538..ff9338fb26 100644
--- a/bench-vortex/src/lib.rs
+++ b/bench-vortex/src/lib.rs
@@ -11,23 +11,11 @@ use itertools::Itertools;
 use log::LevelFilter;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
-use vortex::aliases::hash_set::HashSet;
 use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
 use vortex::compress::CompressionStrategy;
 use vortex::dtype::DType;
 use vortex::fastlanes::DeltaEncoding;
-use vortex::sampling_compressor::compressors::alp::ALPCompressor;
-use vortex::sampling_compressor::compressors::alp_rd::ALPRDCompressor;
-use vortex::sampling_compressor::compressors::bitpacked::BITPACK_WITH_PATCHES;
-use vortex::sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor;
-use vortex::sampling_compressor::compressors::dict::DictCompressor;
-use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
-use vortex::sampling_compressor::compressors::r#for::FoRCompressor;
-use vortex::sampling_compressor::compressors::roaring_bool::RoaringBoolCompressor;
-use vortex::sampling_compressor::compressors::runend::DEFAULT_RUN_END_COMPRESSOR;
-use vortex::sampling_compressor::compressors::sparse::SparseCompressor;
-use vortex::sampling_compressor::compressors::CompressorRef;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::{Array, Context, IntoArray};
 
@@ -51,22 +39,6 @@ pub static CTX: LazyLock<Arc<Context>> = LazyLock::new(|| {
     )
 });
 
-pub static COMPRESSORS: LazyLock<HashSet<CompressorRef<'static>>> = LazyLock::new(|| {
-    [
-        &ALPCompressor as CompressorRef<'static>,
-        &ALPRDCompressor,
-        &DictCompressor,
-        &BITPACK_WITH_PATCHES,
-        &FoRCompressor,
-        &FSSTCompressor,
-        &DateTimePartsCompressor,
-        &DEFAULT_RUN_END_COMPRESSOR,
-        &RoaringBoolCompressor,
-        &SparseCompressor,
-    ]
-    .into()
-});
-
 /// Creates a file if it doesn't already exist.
 /// NB: Does NOT modify the given path to ensure that it resides in the data directory.
 pub fn idempotent(
@@ -172,10 +144,7 @@ pub fn fetch_taxi_data() -> Array {
 }
 
 pub fn compress_taxi_data() -> Array {
-    let uncompressed = fetch_taxi_data();
-    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
-
-    compressor.compress(&uncompressed).unwrap()
+    CompressionStrategy::compress(&SamplingCompressor::default(), &fetch_taxi_data()).unwrap()
 }
 
 pub struct CompressionRunStats {
@@ -235,7 +204,7 @@ mod test {
     use vortex::{Array, IntoCanonical};
 
     use crate::taxi_data::taxi_data_parquet;
-    use crate::{compress_taxi_data, setup_logger, COMPRESSORS};
+    use crate::{compress_taxi_data, setup_logger};
 
     #[ignore]
     #[test]
@@ -268,7 +237,7 @@ mod test {
         let file = File::open(taxi_data_parquet()).unwrap();
         let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
         let reader = builder.with_limit(1).build().unwrap();
-        let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
+        let compressor: &dyn CompressionStrategy = &SamplingCompressor::default();
 
         for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
             let struct_arrow: ArrowStructArray = record_batch.into();
diff --git a/bench-vortex/src/public_bi_data.rs b/bench-vortex/src/public_bi_data.rs
index 6d973729ee..9529e03a43 100644
--- a/bench-vortex/src/public_bi_data.rs
+++ b/bench-vortex/src/public_bi_data.rs
@@ -13,7 +13,7 @@ use tokio::fs::File;
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::ChunkedArray;
 use vortex::error::VortexResult;
-use vortex::{Array, ArrayDType, ArrayTrait, IntoArray};
+use vortex::{Array, ArrayDType, IntoArray};
 
 use crate::data_downloads::{decompress_bz2, download_data, BenchmarkDataset, FileType};
 use crate::public_bi_data::PBIDataset::*;
diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs
index 01e703d29d..cfb202d132 100644
--- a/bench-vortex/src/reader.rs
+++ b/bench-vortex/src/reader.rs
@@ -11,7 +11,6 @@ use arrow_array::{
 };
 use arrow_select::concat::concat_batches;
 use arrow_select::take::take_record_batch;
-use bytes::{Bytes, BytesMut};
 use futures::stream;
 use itertools::Itertools;
 use log::info;
@@ -23,22 +22,17 @@ use parquet::file::metadata::RowGroupMetaData;
 use serde::{Deserialize, Serialize};
 use stream::StreamExt;
 use vortex::aliases::hash_map::HashMap;
-use vortex::array::{ChunkedArray, PrimitiveArray};
+use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
-use vortex::buffer::Buffer;
 use vortex::compress::CompressionStrategy;
 use vortex::dtype::DType;
-use vortex::error::{vortex_err, VortexResult};
-use vortex::sampling_compressor::SamplingCompressor;
-use vortex::serde::chunked_reader::ChunkedArrayReader;
-use vortex::serde::io::{ObjectStoreExt, VortexReadAt, VortexWrite};
-use vortex::serde::stream_reader::StreamArrayReader;
-use vortex::serde::stream_writer::StreamArrayWriter;
-use vortex::serde::DTypeReader;
-use vortex::stream::ArrayStreamExt;
-use vortex::{Array, ArrayDType, IntoArray, IntoCanonical};
-
-use crate::{COMPRESSORS, CTX};
+use vortex::error::VortexResult;
+use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
+use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt, VortexWrite};
+use vortex::serde::layouts::{
+    LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer, LayoutWriter,
+};
+use vortex::{Array, IntoArray, IntoCanonical};
 
 pub const BATCH_SIZE: usize = 65_536;
 
@@ -51,15 +45,18 @@ pub struct VortexFooter {
 
 pub async fn open_vortex(path: &Path) -> VortexResult<Array> {
     let file = tokio::fs::File::open(path).await.unwrap();
-    let reader = StreamArrayReader::try_new(file, CTX.clone())
-        .await?
-        .load_dtype()
-        .await?;
-    reader
-        .into_array_stream()
-        .collect_chunked()
-        .await
-        .map(IntoArray::into_array)
+
+    LayoutBatchStreamBuilder::new(
+        file,
+        LayoutDeserializer::new(
+            ALL_ENCODINGS_CONTEXT.clone(),
+            LayoutContext::default().into(),
+        ),
+    )
+    .build()
+    .await?
+    .read_all()
+    .await
 }
 
 pub async fn rewrite_parquet_as_vortex(
     parquet_path: PathBuf,
     write: W,
@@ -68,24 +65,11 @@ ) -> VortexResult<()> {
     let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;
 
-    let written = StreamArrayWriter::new(write)
-        .write_array_stream(chunked.array_stream())
+    LayoutWriter::new(write)
+        .write_array_columns(chunked)
+        .await?
+        .finalize()
         .await?;
-
-    let layout = written.array_layouts()[0].clone();
-    let mut w = written.into_inner();
-    let mut s = flexbuffers::FlexbufferSerializer::new();
-    VortexFooter {
-        byte_offsets: layout.chunks.byte_offsets,
-        row_offsets: layout.chunks.row_offsets,
-        dtype_range: layout.dtype.begin..layout.dtype.end,
-    }
-    .serialize(&mut s)?;
-    let footer_bytes = Buffer::from(Bytes::from(s.take_buffer()));
-    let footer_len = footer_bytes.len() as u64;
-    w.write_all(footer_bytes).await?;
-    w.write_all(footer_len.to_le_bytes()).await?;
-
     Ok(())
 }
 
@@ -102,17 +86,9 @@ pub fn read_parquet_to_vortex<P: AsRef<Path>>(parquet_path: P) -> VortexResult<ChunkedArray> {
-pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedArray> {
+pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<Array> {
     let chunked = read_parquet_to_vortex(parquet_path)?;
-    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
-    let dtype = chunked.dtype().clone();
-    ChunkedArray::try_new(
-        chunked
-            .chunks()
-            .map(|x| compressor.compress(&x))
-            .collect::<VortexResult<Vec<_>>>()?,
-        dtype,
-    )
+    CompressionStrategy::compress(&SamplingCompressor::default(), &chunked.into_array())
 }
 
 pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResult<()> {
@@ -134,64 +110,37 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResult<()> {
     Ok(())
 }
 
-pub async fn read_vortex_footer_format<R: VortexReadAt>(
-    reader: R,
-    len: u64,
-) -> VortexResult<ChunkedArrayReader<R>> {
-    let mut buf = BytesMut::with_capacity(8);
-    unsafe { buf.set_len(8) }
-    buf = reader.read_at_into(len - 8, buf).await?;
-    let footer_len = u64::from_le_bytes(buf.as_ref().try_into().unwrap()) as usize;
-
-    buf.reserve(footer_len - buf.len());
-    unsafe { buf.set_len(footer_len) }
-    buf = reader
-        .read_at_into(len - footer_len as u64 - 8, buf)
-        .await?;
-
-    let footer: VortexFooter = VortexFooter::deserialize(
-        flexbuffers::Reader::get_root(buf.as_ref()).map_err(|e| vortex_err!("{}", e))?,
-    )?;
-
-    let header_len = (footer.dtype_range.end - footer.dtype_range.start) as usize;
-    buf.reserve(header_len - buf.len());
-    unsafe { buf.set_len(header_len) }
-    buf = reader.read_at_into(footer.dtype_range.start, buf).await?;
-    let dtype = DTypeReader::new(buf).await?.read_dtype().await?;
-
-    ChunkedArrayReader::try_new(
+async fn take_vortex<T: VortexReadAt>(
+    reader: T,
+    indices: &[u64],
+) -> VortexResult<Array> {
+    LayoutBatchStreamBuilder::new(
         reader,
-        CTX.clone(),
-        dtype.into(),
-        PrimitiveArray::from(footer.byte_offsets).into_array(),
-        PrimitiveArray::from(footer.row_offsets).into_array(),
+        LayoutDeserializer::new(
+            ALL_ENCODINGS_CONTEXT.clone(),
+            LayoutContext::default().into(),
+        ),
     )
+    .with_indices(Array::from(indices.to_vec()))
+    .build()
+    .await?
+    .read_all()
+    .await
+    // For equivalence.... we decompress to make sure we're not cheating too much.
+    .and_then(IntoCanonical::into_canonical)
+    .map(Array::from)
 }
 
 pub async fn take_vortex_object_store(
-    fs: &Arc<dyn ObjectStore>,
-    path: &object_store::path::Path,
+    fs: Arc<dyn ObjectStore>,
+    path: object_store::path::Path,
     indices: &[u64],
 ) -> VortexResult<Array> {
-    let head = fs.head(path).await?;
-    let indices_array = indices.to_vec().into_array();
-    let taken = read_vortex_footer_format(fs.vortex_reader(path), head.size as u64)
-        .await?
-        .take_rows(&indices_array)
-        .await?;
-    // For equivalence.... we flatten to make sure we're not cheating too much.
-    Ok(taken.into_canonical()?.into())
+    take_vortex(ObjectStoreReadAt::new(fs.clone(), path), indices).await
 }
 
 pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Array> {
-    let len = File::open(path)?.metadata()?.len();
-    let indices_array = indices.to_vec().into_array();
-    let taken = read_vortex_footer_format(tokio::fs::File::open(path).await?, len)
-        .await?
-        .take_rows(&indices_array)
-        .await?;
-    // For equivalence.... we flatten to make sure we're not cheating too much.
-    Ok(taken.into_canonical()?.into())
+    take_vortex(tokio::fs::File::open(path).await?, indices).await
 }
 
 pub async fn take_parquet_object_store(
diff --git a/pyvortex/src/dataset.rs b/pyvortex/src/dataset.rs
index b1dc425b8f..d34744cbc5 100644
--- a/pyvortex/src/dataset.rs
+++ b/pyvortex/src/dataset.rs
@@ -12,7 +12,7 @@ use vortex::arrow::infer_schema;
 use vortex::dtype::field::Field;
 use vortex::dtype::DType;
 use vortex::error::VortexResult;
-use vortex::sampling_compressor::ALL_COMPRESSORS_CONTEXT;
+use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
 use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt};
 use vortex::serde::layouts::{
     LayoutBatchStream, LayoutBatchStreamBuilder, LayoutContext, LayoutDescriptorReader,
@@ -33,7 +33,7 @@ pub async fn layout_stream_from_reader(
     let mut builder = LayoutBatchStreamBuilder::new(
         reader,
         LayoutDeserializer::new(
-            ALL_COMPRESSORS_CONTEXT.clone(),
+            ALL_ENCODINGS_CONTEXT.clone(),
             LayoutContext::default().into(),
         ),
     )
@@ -64,7 +64,7 @@ pub async fn read_array_from_reader(
 
 pub async fn read_dtype_from_reader<T: VortexReadAt>(reader: T) -> VortexResult<DType> {
     LayoutDescriptorReader::new(LayoutDeserializer::new(
-        ALL_COMPRESSORS_CONTEXT.clone(),
+        ALL_ENCODINGS_CONTEXT.clone(),
         LayoutContext::default().into(),
     ))
     .read_footer(&reader, reader.size().await)
diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs
index f0f8578f64..06c3774655 100644
--- a/vortex-sampling-compressor/src/lib.rs
+++ b/vortex-sampling-compressor/src/lib.rs
@@ -75,7 +75,7 @@ pub static FASTEST_COMPRESSORS: LazyLock<[CompressorRef<'static>; 7]> = LazyLock
     ]
 });
 
-pub static ALL_COMPRESSORS_CONTEXT: LazyLock<Arc<Context>> = LazyLock::new(|| {
+pub static ALL_ENCODINGS_CONTEXT: LazyLock<Arc<Context>> = LazyLock::new(|| {
     Arc::new(Context::default().with_encodings([
         &ALPEncoding as EncodingRef,
         &ByteBoolEncoding,